/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.processors;

import ch.qos.logback.classic.Level;
import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public abstract class AbstractReselectProcessorTest<T extends SourceConnector>
extends AbstractConnectorTest {
    protected abstract Class<T> getConnectorClass();

    protected abstract JdbcConnection databaseConnection();

    protected abstract Configuration.Builder getConfigurationBuilder();

    protected abstract String topicName();

    protected abstract String tableName();

    protected abstract String reselectColumnsList();

    protected abstract void createTable() throws Exception;

    protected abstract void dropTable() throws Exception;

    protected abstract String getInsertWithValue();

    protected abstract String getInsertWithNullValue();

    protected abstract void waitForStreamingStarted() throws InterruptedException;

    @Before
    public void beforeEach() throws Exception {
        this.createTable();
        this.databaseConnection().setAutoCommit(false);
    }

    @After
    public void afterEach() throws Exception {
        this.stopConnector();
        this.assertNoRecordsToConsume();
        this.dropTable();
    }

    @Test
    @FixFor(value={"DBZ-4321"})
    public void testNoColumnsReselectedWhenNullAndUnavailableColumnsAreDisabled() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        this.databaseConnection().execute(new String[]{this.getInsertWithNullValue()});
        Configuration config = this.getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.null.values", "false").with("reselector.reselect.unavailable.values", "false").with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
        this.start(this.getConnectorClass(), config);
        this.assertConnectorIsRunning();
        this.waitForStreamingStarted();
        Assertions.assertThat((boolean)interceptor.containsMessage("disables both null and unavailable columns, no-reselection will occur")).isTrue();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicReselectWhenNotNullSnapshot();
        List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(this.topicName());
        SourceRecord record = tableRecords.get(0);
        Struct after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidRead((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isNull();
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
    }

    @Test
    @FixFor(value={"DBZ-4321"})
    public void testNoColumnsReselectedWhenNotNullSnapshot() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        this.databaseConnection().execute(new String[]{this.getInsertWithValue()});
        Configuration config = this.getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
        this.start(this.getConnectorClass(), config);
        this.assertConnectorIsRunning();
        this.waitForStreamingStarted();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicReselectWhenNotNullSnapshot();
        List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(this.topicName());
        SourceRecord record = tableRecords.get(0);
        Struct after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidRead((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"one");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
        Assertions.assertThat((boolean)interceptor.containsMessage("No columns require re-selection.")).isTrue();
    }

    @Test
    @FixFor(value={"DBZ-4321"})
    public void testNoColumnsReselectedWhenNotNullStreaming() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(ReselectColumnsPostProcessor.class);
        interceptor.setLoggerLevel(ReselectColumnsPostProcessor.class, Level.DEBUG);
        Configuration config = this.getConfigurationBuilder().with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
        this.start(this.getConnectorClass(), config);
        this.assertConnectorIsRunning();
        this.waitForStreamingStarted();
        this.databaseConnection().execute(new String[]{this.getInsertWithValue()});
        this.databaseConnection().execute(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", this.tableName())});
        this.databaseConnection().execute(new String[]{String.format("DELETE FROM %s WHERE id = 1", this.tableName())});
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicReselectWhenNotNullStreaming();
        List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(this.topicName());
        SourceRecord record = tableRecords.get(0);
        Struct after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidInsert((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"one");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
        record = tableRecords.get(1);
        after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidUpdate((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"two");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
        record = tableRecords.get(2);
        after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidDelete((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after).isNull();
        record = tableRecords.get(3);
        VerifyRecord.isValidTombstone((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)record.value()).isNull();
        Assertions.assertThat((boolean)interceptor.containsMessage("No columns require re-selection.")).isTrue();
    }

    @Test
    @FixFor(value={"DBZ-4321"})
    public void testColumnsReselectedWhenValueIsNullSnapshot() throws Exception {
        this.databaseConnection().execute(new String[]{this.getInsertWithNullValue()});
        this.databaseConnection().execute(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", this.tableName())});
        Configuration config = this.getConfigurationBuilder().with("snapshot.mode", "initial").with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
        this.start(this.getConnectorClass(), config);
        this.assertConnectorIsRunning();
        this.waitForStreamingStarted();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicReselectWhenNullSnapshot();
        List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(this.topicName());
        SourceRecord record = tableRecords.get(0);
        Struct after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidRead((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"two");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
    }

    @Test
    @FixFor(value={"DBZ-4321"})
    public void testColumnsReselectedWhenValueIsNullStreaming() throws Exception {
        Configuration config = this.getConfigurationBuilder().with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
        this.start(this.getConnectorClass(), config);
        this.assertConnectorIsRunning();
        this.waitForStreamingStarted();
        this.databaseConnection().executeWithoutCommitting(new String[]{this.getInsertWithNullValue()});
        this.databaseConnection().executeWithoutCommitting(new String[]{String.format("UPDATE %s SET data = 'two' where id = 1", this.tableName())});
        this.databaseConnection().commit();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopicReselectWhenNullStreaming();
        List<SourceRecord> tableRecords = sourceRecords.recordsForTopic(this.topicName());
        SourceRecord record = tableRecords.get(0);
        Struct after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidInsert((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"two");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
        record = tableRecords.get(1);
        after = ((Struct)record.value()).getStruct("after");
        VerifyRecord.isValidUpdate((SourceRecord)record, (String)this.fieldName("id"), (int)1);
        Assertions.assertThat((Object)after.get(this.fieldName("id"))).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get(this.fieldName("data"))).isEqualTo((Object)"two");
        Assertions.assertThat((Object)after.get(this.fieldName("data2"))).isEqualTo((Object)1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNotNullSnapshot() throws InterruptedException {
        return this.consumeRecordsByTopic(1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNotNullStreaming() throws InterruptedException {
        return this.consumeRecordsByTopic(4);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNullSnapshot() throws InterruptedException {
        return this.consumeRecordsByTopic(1);
    }

    protected AbstractConnectorTest.SourceRecords consumeRecordsByTopicReselectWhenNullStreaming() throws InterruptedException {
        return this.consumeRecordsByTopic(2);
    }

    protected String fieldName(String fieldName) {
        return fieldName;
    }
}

