package com.ververica.cdc.connectors.sqlserver.source;

import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MSSQLServerContainer;

/* loaded from: input_file:com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceITCase.class */
public class SqlServerSourceITCase extends SqlServerSourceTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(SqlServerSourceITCase.class);

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        testSqlServerParallelSource(1, SqlServerSourceTestBase.FailoverType.NONE, SqlServerSourceTestBase.FailoverPhase.NEVER, new String[]{"dbo.customers"});
    }

    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        testSqlServerParallelSource(4, SqlServerSourceTestBase.FailoverType.NONE, SqlServerSourceTestBase.FailoverPhase.NEVER, new String[]{"dbo.customers"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        testSqlServerParallelSource(SqlServerSourceTestBase.FailoverType.TM, SqlServerSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{"dbo.customers"});
    }

    @Test
    public void testTaskManagerFailoverInBinlogPhase() throws Exception {
        testSqlServerParallelSource(SqlServerSourceTestBase.FailoverType.TM, SqlServerSourceTestBase.FailoverPhase.STREAM, new String[]{"dbo.customers"});
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        testSqlServerParallelSource(SqlServerSourceTestBase.FailoverType.JM, SqlServerSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{"dbo.customers"});
    }

    @Test
    public void testJobManagerFailoverInBinlogPhase() throws Exception {
        testSqlServerParallelSource(SqlServerSourceTestBase.FailoverType.JM, SqlServerSourceTestBase.FailoverPhase.STREAM, new String[]{"dbo.customers"});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        testSqlServerParallelSource(1, SqlServerSourceTestBase.FailoverType.JM, SqlServerSourceTestBase.FailoverPhase.SNAPSHOT, new String[]{"dbo.customers"});
    }

    private void testSqlServerParallelSource(SqlServerSourceTestBase.FailoverType failoverType, SqlServerSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        testSqlServerParallelSource(4, failoverType, failoverPhase, strArr);
    }

    private void testSqlServerParallelSource(int i, SqlServerSourceTestBase.FailoverType failoverType, SqlServerSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        initializeSqlServerTable("customer");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        String[] strArr2 = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        create.executeSql(String.format("CREATE TABLE customers ( id INT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (id) not enforced) WITH ( 'connector' = 'sqlserver-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = 'true', 'scan.incremental.snapshot.chunk.size' = '4')", MSSQL_SERVER_CONTAINER.getHost(), MSSQL_SERVER_CONTAINER.getMappedPort(MSSQLServerContainer.MS_SQL_SERVER_PORT.intValue()), MSSQL_SERVER_CONTAINER.getUsername(), MSSQL_SERVER_CONTAINER.getPassword(), "customer", getTableNameRegex(strArr)));
        TableResult executeSql = create.executeSql("select * from customers");
        CloseableIterator collect = executeSql.collect();
        JobID jobID = ((JobClient) executeSql.getJobClient().get()).getJobID();
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            arrayList.addAll(Arrays.asList(strArr2));
        }
        if (failoverPhase == SqlServerSourceTestBase.FailoverPhase.SNAPSHOT && collect.hasNext()) {
            triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(100L);
            });
        }
        LOG.info("snapshot data start");
        assertEqualsInAnyOrder(arrayList, fetchRows(collect, arrayList.size()));
        for (String str : strArr) {
            makeFirstPartChangeStreamEvents("customer." + str);
        }
        if (failoverPhase == SqlServerSourceTestBase.FailoverPhase.STREAM) {
            triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(200L);
            });
        }
        for (String str2 : strArr) {
            makeSecondPartBinlogEvents("customer." + str2);
        }
        String[] strArr3 = {"-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Hangzhou, 123567891234]", "+U[103, user_3, Shanghai, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        ArrayList arrayList2 = new ArrayList();
        for (int i3 = 0; i3 < strArr.length; i3++) {
            arrayList2.addAll(Arrays.asList(strArr3));
        }
        assertEqualsInAnyOrder(arrayList2, fetchRows(collect, arrayList2.size()));
        ((JobClient) executeSql.getJobClient().get()).cancel().get();
    }

    private void makeFirstPartChangeStreamEvents(String str) {
        executeSql("UPDATE " + str + " SET address = 'Hangzhou' where id = 103");
        executeSql("DELETE FROM " + str + " where id = 102");
        executeSql("INSERT INTO " + str + " VALUES(102, 'user_2','Shanghai','123567891234')");
        executeSql("UPDATE " + str + " SET address = 'Shanghai' where id = 103");
    }

    private void makeSecondPartBinlogEvents(String str) {
        executeSql("UPDATE " + str + " SET address = 'Hangzhou' where id = 1010");
        executeSql("INSERT INTO " + str + " VALUES(2001, 'user_22','Shanghai','123567891234')");
        executeSql("INSERT INTO " + str + " VALUES(2002, 'user_23','Shanghai','123567891234')");
        executeSql("INSERT INTO " + str + " VALUES(2003, 'user_24','Shanghai','123567891234')");
    }

    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private static List<String> fetchRows(Iterator<Row> it, int i) {
        ArrayList arrayList = new ArrayList(i);
        while (i > 0 && it.hasNext()) {
            arrayList.add(it.next().toString());
            i--;
        }
        return arrayList;
    }

    private String getTableNameRegex(String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        return strArr.length == 1 ? strArr[0] : String.format("(%s)", StringUtils.join(strArr, ","));
    }
}
