package com.ververica.cdc.connectors.sqlserver.source.read.fetch;

import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.base.dialect.JdbcDataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceTestBase;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
import com.ververica.cdc.connectors.sqlserver.source.config.SqlServerSourceConfigFactory;
import com.ververica.cdc.connectors.sqlserver.source.dialect.SqlServerDialect;
import com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
import com.ververica.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils;
import com.ververica.cdc.connectors.sqlserver.testutils.RecordsFormatter;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerPartition;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionSchema;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;
import org.testcontainers.containers.MSSQLServerContainer;

/** Tests reading snapshot splits of a SQL Server table while its rows are inserted, updated or deleted during the scan. */
public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {

    /* loaded from: input_file:com/ververica/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest$MakeChangeEventTaskContext.class */
    static class MakeChangeEventTaskContext extends SqlServerSourceFetchTaskContext {
        private final Supplier<Boolean> makeChangeEventFunction;

        public MakeChangeEventTaskContext(JdbcSourceConfig jdbcSourceConfig, SqlServerDialect sqlServerDialect, SqlServerConnection sqlServerConnection, SqlServerConnection sqlServerConnection2, Supplier<Boolean> supplier) {
            super(jdbcSourceConfig, sqlServerDialect, sqlServerConnection, sqlServerConnection2);
            this.makeChangeEventFunction = supplier;
        }

        public EventDispatcher.SnapshotReceiver<SqlServerPartition> getSnapshotReceiver() {
            final EventDispatcher.SnapshotReceiver snapshotReceiver = super.getSnapshotReceiver();
            return new EventDispatcher.SnapshotReceiver<SqlServerPartition>() { // from class: com.ververica.cdc.connectors.sqlserver.source.read.fetch.SqlServerScanFetchTaskTest.MakeChangeEventTaskContext.1
                public void changeRecord(SqlServerPartition sqlServerPartition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext, ConnectHeaders connectHeaders) throws InterruptedException {
                    snapshotReceiver.changeRecord(sqlServerPartition, dataCollectionSchema, operation, obj, struct, offsetContext, connectHeaders);
                }

                public void completeSnapshot() throws InterruptedException {
                    snapshotReceiver.completeSnapshot();
                    MakeChangeEventTaskContext.this.makeChangeEventFunction.get();
                    Thread.sleep(10000L);
                }
            };
        }
    }

    /**
     * Reads one snapshot split of {@code customer.dbo.customers} while a mix of updates, a delete
     * and an insert is executed by the task context, and checks the formatted rows.
     */
    @Test
    public void testChangingDataInSnapshotScan() throws Exception {
        initializeSqlServerTable("customer");

        final SqlServerSourceConfigFactory factory =
                getConfigFactory("customer", new String[] {"dbo.customers"}, 10);
        final SqlServerSourceConfig sourceConfig = factory.create(0);
        final SqlServerDialect dialect = new SqlServerDialect(factory);

        final String tableId = "customer.dbo.customers";
        final String[] changingDataSql = {
            "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
            "DELETE FROM " + tableId + " where id = 102",
            "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
            "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
            "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
            "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111"
        };

        final List<String> expected =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Hangzhou, 123567891234]",
                        "+I[111, user_6, Hangzhou, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]");

        final List<SnapshotSplit> splits = getSnapshotSplits(sourceConfig, dialect);
        final MakeChangeEventTaskContext taskContext =
                new MakeChangeEventTaskContext(
                        sourceConfig,
                        dialect,
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        () -> executeSql(sourceConfig, changingDataSql));
        final DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("address", DataTypes.STRING()),
                        DataTypes.FIELD("phone_number", DataTypes.STRING()));

        final List<String> actual = readTableSnapshotSplits(splits, taskContext, 1, rowType);
        assertEqualsInAnyOrder(expected, actual);
    }

    /**
     * Reads one snapshot split of {@code customer.dbo.customers} while two rows are inserted by
     * the task context, and checks that the formatted rows include the inserted ones.
     */
    @Test
    public void testInsertDataInSnapshotScan() throws Exception {
        initializeSqlServerTable("customer");

        final SqlServerSourceConfigFactory factory =
                getConfigFactory("customer", new String[] {"dbo.customers"}, 10);
        final SqlServerSourceConfig sourceConfig = factory.create(0);
        final SqlServerDialect dialect = new SqlServerDialect(factory);

        final String tableId = "customer.dbo.customers";
        final String[] insertDataSql = {
            "INSERT INTO " + tableId + " VALUES(112, 'user_12','Shanghai','123567891234')",
            "INSERT INTO " + tableId + " VALUES(113, 'user_13','Shanghai','123567891234')"
        };

        final List<String> expected =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[112, user_12, Shanghai, 123567891234]",
                        "+I[113, user_13, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]");

        final List<SnapshotSplit> splits = getSnapshotSplits(sourceConfig, dialect);
        final MakeChangeEventTaskContext taskContext =
                new MakeChangeEventTaskContext(
                        sourceConfig,
                        dialect,
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        () -> executeSql(sourceConfig, insertDataSql));
        final DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("address", DataTypes.STRING()),
                        DataTypes.FIELD("phone_number", DataTypes.STRING()));

        final List<String> actual = readTableSnapshotSplits(splits, taskContext, 1, rowType);
        assertEqualsInAnyOrder(expected, actual);
    }

    /**
     * Reads one snapshot split of {@code customer.dbo.customers} while two rows are deleted by the
     * task context, and checks that the formatted rows no longer contain the deleted ones.
     */
    @Test
    public void testDeleteDataInSnapshotScan() throws Exception {
        initializeSqlServerTable("customer");

        final SqlServerSourceConfigFactory factory =
                getConfigFactory("customer", new String[] {"dbo.customers"}, 10);
        final SqlServerSourceConfig sourceConfig = factory.create(0);
        final SqlServerDialect dialect = new SqlServerDialect(factory);

        final String tableId = "customer.dbo.customers";
        final String[] deleteDataSql = {
            "DELETE FROM " + tableId + " where id = 101",
            "DELETE FROM " + tableId + " where id = 102"
        };

        final List<String> expected =
                Arrays.asList(
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]");

        final List<SnapshotSplit> splits = getSnapshotSplits(sourceConfig, dialect);
        final MakeChangeEventTaskContext taskContext =
                new MakeChangeEventTaskContext(
                        sourceConfig,
                        dialect,
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        SqlServerConnectionUtils.createSqlServerConnection(
                                sourceConfig.getDbzConnectorConfig()),
                        () -> executeSql(sourceConfig, deleteDataSql));
        final DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("address", DataTypes.STRING()),
                        DataTypes.FIELD("phone_number", DataTypes.STRING()));

        final List<String> actual = readTableSnapshotSplits(splits, taskContext, 1, rowType);
        assertEqualsInAnyOrder(expected, actual);
    }

    /**
     * Reads the first {@code i} snapshot splits through an {@link IncrementalSourceScanFetcher}
     * and returns the collected records in formatted form.
     *
     * <p>After all splits are read, the fetcher is closed and its executor service is asserted to
     * be non-null and terminated.
     *
     * @param list snapshot splits to read from; entries {@code 0 .. i-1} are used
     * @param sqlServerSourceFetchTaskContext task context the fetcher runs with and whose dialect
     *     creates the fetch tasks
     * @param i number of splits to read
     * @param dataType row type handed to the formatter
     * @return the formatted records of all splits read
     * @throws Exception if polling or closing the fetcher fails
     */
    private List<String> readTableSnapshotSplits(List<SnapshotSplit> list, SqlServerSourceFetchTaskContext sqlServerSourceFetchTaskContext, int i, DataType dataType) throws Exception {
        final IncrementalSourceScanFetcher scanFetcher =
                new IncrementalSourceScanFetcher(sqlServerSourceFetchTaskContext, 0);
        final List<SourceRecord> collected = new ArrayList<>();

        for (int splitIndex = 0; splitIndex < i; splitIndex++) {
            final SnapshotSplit split = list.get(splitIndex);
            // A new task is only submitted once the fetcher reports the previous one as finished.
            if (scanFetcher.isFinished()) {
                scanFetcher.submitTask(
                        sqlServerSourceFetchTaskContext
                                .getDataSourceDialect()
                                .createFetchTask(split));
            }

            // Drain the current split: a null poll result marks the end of its records. Without
            // this exit condition the loop would never terminate.
            Iterator<SourceRecords> batches;
            while ((batches = scanFetcher.pollSplitRecords()) != null) {
                while (batches.hasNext()) {
                    collected.addAll(batches.next().getSourceRecordList());
                }
            }
        }

        scanFetcher.close();
        Assert.assertNotNull(scanFetcher.getExecutorService());
        Assert.assertTrue(scanFetcher.getExecutorService().isTerminated());
        return formatResult(collected, dataType);
    }

    /**
     * Formats source records with a {@link RecordsFormatter} created for the given row type.
     *
     * @param list records to format
     * @param dataType row type the formatter is created with
     * @return the formatter's output for {@code list}
     */
    private List<String> formatResult(List<SourceRecord> list, DataType dataType) {
        final RecordsFormatter formatter = new RecordsFormatter(dataType);
        return formatter.format(list);
    }

    /**
     * Generates the snapshot splits of every configured table.
     *
     * <p>Each entry of the config's table list is prefixed with the first configured database
     * name and parsed into a {@link TableId}; the dialect's chunk splitter then produces the
     * splits table by table, in table-list order.
     *
     * @param sqlServerSourceConfig source config supplying the database and table lists
     * @param jdbcDataSourceDialect dialect used to create the chunk splitter
     * @return all generated splits
     */
    private List<SnapshotSplit> getSnapshotSplits(SqlServerSourceConfig sqlServerSourceConfig, JdbcDataSourceDialect jdbcDataSourceDialect) {
        final String databaseName = sqlServerSourceConfig.getDatabaseList().get(0);
        final List<TableId> tableIds =
                sqlServerSourceConfig.getTableList().stream()
                        .map(tableName -> TableId.parse(databaseName + "." + tableName))
                        .collect(Collectors.toList());

        final ChunkSplitter chunkSplitter =
                jdbcDataSourceDialect.createChunkSplitter(sqlServerSourceConfig);
        final List<SnapshotSplit> snapshotSplits = new ArrayList<>();
        for (TableId tableId : tableIds) {
            snapshotSplits.addAll(chunkSplitter.generateSplits(tableId));
        }
        return snapshotSplits;
    }

    /**
     * Builds a source config factory that connects to {@code MSSQL_SERVER_CONTAINER} using the
     * container's host, mapped SQL Server port, username and password.
     *
     * @param str database name; becomes the single entry of the database list
     * @param strArr table list
     * @param i split size
     * @return the configured factory
     */
    public static SqlServerSourceConfigFactory getConfigFactory(String str, String[] strArr, int i) {
        return new SqlServerSourceConfigFactory()
                .hostname(MSSQL_SERVER_CONTAINER.getHost())
                .port(MSSQL_SERVER_CONTAINER.getMappedPort(MSSQLServerContainer.MS_SQL_SERVER_PORT))
                .username(MSSQL_SERVER_CONTAINER.getUsername())
                .password(MSSQL_SERVER_CONTAINER.getPassword())
                .databaseList(new String[] {str})
                .tableList(strArr)
                .splitSize(i);
    }

    /**
     * Executes the given statements in a single transaction on a freshly created connection.
     *
     * <p>The connection is always closed, including when a statement or the commit fails.
     *
     * @param sqlServerSourceConfig config whose Debezium connector config is used to open the
     *     connection
     * @param strArr SQL statements to execute
     * @return {@code true} if all statements were executed and committed; {@code false} if a
     *     {@link SQLException} occurred (the error is logged)
     */
    private boolean executeSql(SqlServerSourceConfig sqlServerSourceConfig, String[] strArr) {
        // try-with-resources guarantees close() on every path; previously the connection was
        // only closed after a successful commit.
        try (SqlServerConnection connection =
                SqlServerConnectionUtils.createSqlServerConnection(
                        sqlServerSourceConfig.getDbzConnectorConfig())) {
            connection.setAutoCommit(false);
            connection.execute(strArr);
            connection.commit();
            return true;
        } catch (SQLException e) {
            LOG.error("Failed to execute sql statements.", e);
            return false;
        }
    }
}
