/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.InterruptedConnector;
import io.debezium.embedded.InterruptingOffsetStore;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.Json;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import io.debezium.util.Throwables;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;

public class EmbeddedEngineTest
extends AbstractConnectorTest {
    private static final int NUMBER_OF_LINES = 10;
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath((String)"file-connector-input.txt").toAbsolutePath();
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    private File inputFile;
    private int nextConsumedLineNumber;
    private int linesAdded;
    private Configuration connectorConfig;

    @Before
    public void beforeEach() throws Exception {
        this.nextConsumedLineNumber = 1;
        this.linesAdded = 0;
        Testing.Files.delete((Path)TEST_FILE_PATH);
        this.inputFile = Testing.Files.createTestingFile((Path)TEST_FILE_PATH);
        this.connectorConfig = ((Configuration.Builder)Configuration.create().with("file", (Object)TEST_FILE_PATH)).with("topic", "topicX").build();
    }

    @Test
    public void verifyNonAsciiContentHandledCorrectly() throws Exception {
        this.appendLinesToSource("\u00d1 \u00f1", 10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        CountDownLatch firstLatch = new CountDownLatch(1);
        DebeziumEngine engine = DebeziumEngine.create(Json.class, Json.class).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            for (ChangeEvent record : records) {
                Assertions.assertThat((String)((String)record.value())).contains("\u00d1");
            }
            for (ChangeEvent r : records) {
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    public void interruptedTaskShutsDown() throws Exception {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, InterruptedConnector.class)).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)).with(EmbeddedEngine.OFFSET_STORAGE, InterruptingOffsetStore.class)).build();
        CountDownLatch firstLatch = new CountDownLatch(1);
        this.engine = EmbeddedEngine.create().using(config).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            if (error != null) {
                this.logger.error("Error while shutting down", error);
            }
            firstLatch.countDown();
        }).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            this.engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
    }

    @Test
    public void interruptedOffsetCommitShutsDown() throws Exception {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with("batch.count", 1)).with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, SimpleSourceConnector.class)).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_STORAGE, InterruptingOffsetStore.class)).build();
        CountDownLatch firstLatch = new CountDownLatch(1);
        this.engine = EmbeddedEngine.create().using(config).using(OffsetCommitPolicy.always()).notifying((records, committer) -> {
            for (SourceRecord record : records) {
                committer.markProcessed((Object)record);
            }
            committer.markBatchFinished();
        }).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            if (error != null) {
                this.logger.error("Error while shutting down", error);
            }
            firstLatch.countDown();
        }).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            this.engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
    }

    @Test
    public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
        this.appendLinesToSource(10);
        this.start(FileStreamSourceConnector.class, this.connectorConfig);
        this.consumeLines(10);
        this.assertNoRecordsToConsume();
        for (int i = 1; i != 5; ++i) {
            this.appendLinesToSource(10);
            this.consumeLines(10);
            this.assertNoRecordsToConsume();
        }
        this.stopConnector();
        this.appendLinesToSource(10);
        this.assertNoRecordsToConsume();
        this.start(FileStreamSourceConnector.class, this.connectorConfig);
        this.consumeLines(10);
        this.assertNoRecordsToConsume();
    }

    @Test
    @FixFor(value={"DBZ-1080"})
    public void shouldWorkToUseCustomChangeConsumer() throws Exception {
        this.appendLinesToSource(10);
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.copy((Configuration)this.connectorConfig).with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, FileStreamSourceConnector.class)).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)).build();
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        this.engine = EmbeddedEngine.create().using(config).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (SourceRecord r : records) {
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            this.engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    public void shouldRunDebeziumEngine() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r : records) {
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-2897"})
    public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
        this.appendLinesToSource(10);
        String TEST_TOPIC = "topicX";
        String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
        Long EXPECTED_CUSTOM_OFFSET = 1L;
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", TEST_TOPIC);
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r : records) {
                DebeziumEngine.Offsets offsets = committer.buildOffsets();
                offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, (Object)EXPECTED_CUSTOM_OFFSET);
                this.logger.info(((SourceRecord)r.record()).sourceOffset().toString());
                committer.markProcessed((Object)r, offsets);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        SafeObjectInputStream inputStream = new SafeObjectInputStream(Files.newInputStream(OFFSET_STORE_PATH.toAbsolutePath(), new OpenOption[0]));
        Object obj = inputStream.readObject();
        Map raw = (Map)obj;
        Set fileOffsetStoreEntrySingleton = raw.entrySet();
        Assertions.assertThat((int)fileOffsetStoreEntrySingleton.size()).isEqualTo(1);
        Map.Entry fileOffsetEntry = fileOffsetStoreEntrySingleton.iterator().next();
        ByteBuffer offsetJsonString = fileOffsetEntry.getValue() != null ? ByteBuffer.wrap((byte[])fileOffsetEntry.getValue()) : null;
        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        JsonNode partitionToOffsetMap = jsonDeserializer.deserialize(TEST_TOPIC, offsetJsonString.array());
        Long actualOffset = partitionToOffsetMap.get(CUSTOM_SOURCE_OFFSET_PARTITION).asLong();
        Assertions.assertThat((Long)actualOffset).isEqualTo((Object)EXPECTED_CUSTOM_OFFSET);
        this.stopConnector();
    }

    @Test
    public void shouldExecuteSmt() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("transforms", "filter, router");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
        props.setProperty("transforms.router.regex", "(.*)");
        props.setProperty("transforms.router.replacement", "trf$1");
        props.setProperty("transforms.filter.type", "io.debezium.embedded.EmbeddedEngineTest$FilterTransform");
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(5);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(9);
            records.forEach(r -> Assertions.assertThat((String)((SourceRecord)r.record()).topic()).isEqualTo((Object)"trftopicX"));
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r2 : records) {
                Assertions.assertThat((String)((String)((SourceRecord)r2.record()).value())).isNotEqualTo((Object)"Generated line number 1");
                committer.markProcessed((Object)r2);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test(expected=DebeziumException.class)
    public void invalidSmt() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("transforms", "router");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.Regex");
        props.setProperty("transforms.router.regex", "(.*)");
        props.setProperty("transforms.router.replacement", "trf$1");
        DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).build();
    }

    @Test
    @FixFor(value={"DBZ-1807"})
    public void shouldRunDebeziumEngineWithJson() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("converter.schemas.enable", "false");
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        DebeziumEngine engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (ChangeEvent r : records) {
                Assertions.assertThat((String)((String)r.key())).isNull();
                Assertions.assertThat((String)((String)r.value())).startsWith("\"Generated line number ");
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    protected void appendLinesToSource(int numberOfLines) throws IOException {
        Object[] lines = new CharSequence[numberOfLines];
        for (int i = 0; i != numberOfLines; ++i) {
            lines[i] = this.generateLine(this.linesAdded + i + 1);
        }
        Files.write(this.inputFile.toPath(), (Iterable<? extends CharSequence>)Collect.arrayListOf((Object[])lines), UTF8, StandardOpenOption.APPEND);
        this.linesAdded += numberOfLines;
    }

    protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException {
        Object[] lines = new CharSequence[numberOfLines];
        for (int i = 0; i != numberOfLines; ++i) {
            lines[i] = this.generateLine(linePrefix, this.linesAdded + i + 1);
        }
        Files.write(this.inputFile.toPath(), (Iterable<? extends CharSequence>)Collect.arrayListOf((Object[])lines), UTF8, StandardOpenOption.APPEND);
        this.linesAdded += numberOfLines;
    }

    protected String generateLine(int lineNumber) {
        return this.generateLine("Generated line number ", lineNumber);
    }

    protected String generateLine(String linePrefix, int lineNumber) {
        return linePrefix + lineNumber;
    }

    protected void consumeLines(int numberOfLines) throws InterruptedException {
        this.consumeRecords(numberOfLines, 3, record -> {
            String line = record.value().toString();
            Assertions.assertThat((String)line).isEqualTo((Object)this.generateLine(this.nextConsumedLineNumber));
            ++this.nextConsumedLineNumber;
        }, false);
    }

    @Test
    @FixFor(value={"DBZ-5583"})
    public void verifyBadCommitPolicyClassName() {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, SimpleSourceConnector.class)).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_COMMIT_POLICY, "badclassname")).build();
        AtomicBoolean exceptionCaught = new AtomicBoolean(false);
        this.engine = EmbeddedEngine.create().using(config).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            Throwable rootCause = Throwables.getRootCause((Throwable)error);
            Assertions.assertThat((Throwable)rootCause).isInstanceOf(ClassNotFoundException.class);
            Assertions.assertThat((String)rootCause.getMessage()).contains("badclassname");
            exceptionCaught.set(true);
        }).build();
        this.engine.run();
        Assertions.assertThat((boolean)exceptionCaught.get()).isTrue();
    }

    public static class FilterTransform
    implements Transformation<SourceRecord> {
        public void configure(Map<String, ?> configs) {
        }

        public SourceRecord apply(SourceRecord record) {
            return ((String)record.value()).equals("Generated line number 1") ? null : record;
        }

        public ConfigDef config() {
            return null;
        }

        public void close() {
        }
    }
}

