/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.Connect;
import io.debezium.embedded.DebeziumEngineTestUtils;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.EmbeddedEngineHeader;
import io.debezium.embedded.InterruptedConnector;
import io.debezium.embedded.InterruptingOffsetStore;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import io.debezium.util.Throwables;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

public class EmbeddedEngineTest
extends AbstractConnectorTest {
    private static final int NUMBER_OF_LINES = 10;
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath((String)"file-connector-input.txt").toAbsolutePath();
    private static final Charset UTF8 = StandardCharsets.UTF_8;
    private File inputFile;
    private int nextConsumedLineNumber;
    private int linesAdded;
    private Configuration connectorConfig;

    @Before
    public void beforeEach() throws Exception {
        this.nextConsumedLineNumber = 1;
        this.linesAdded = 0;
        Testing.Files.delete((Path)TEST_FILE_PATH);
        this.inputFile = Testing.Files.createTestingFile((Path)TEST_FILE_PATH);
        this.connectorConfig = ((Configuration.Builder)Configuration.create().with("file", (Object)TEST_FILE_PATH)).with("topic", "topicX").build();
    }

    @Test
    public void verifyNonAsciiContentHandledCorrectly() throws Exception {
        this.linesAdded += DebeziumEngineTestUtils.appendLinesToSource(this.inputFile, "\u00d1 \u00f1", 10, this.linesAdded);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        CountDownLatch firstLatch = new CountDownLatch(1);
        DebeziumEngine engine = DebeziumEngine.create(Json.class, Json.class, Json.class).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            for (ChangeEvent record : records) {
                Assertions.assertThat((String)((String)record.value())).contains(new CharSequence[]{"\u00d1"});
            }
            for (ChangeEvent r : records) {
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    public void interruptedTaskShutsDown() throws Exception {
        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), InterruptedConnector.class.getName());
        props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), (Object)0);
        props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("offset.flush.interval.ms", "0");
        CountDownLatch firstLatch = new CountDownLatch(1);
        DebeziumEngine engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            if (error != null) {
                this.logger.error("Error while shutting down", error);
            }
            firstLatch.countDown();
        }).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
    }

    @Test
    public void interruptedOffsetCommitShutsDown() throws Exception {
        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
        props.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), InterruptingOffsetStore.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", (Object)1);
        props.put("offset.flush.interval.ms", "0");
        CountDownLatch firstLatch = new CountDownLatch(1);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).using(OffsetCommitPolicy.always()).notifying((records, committer) -> {
            for (RecordChangeEvent record : records) {
                committer.markProcessed((Object)record);
            }
            committer.markBatchFinished();
        }).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            if (error != null) {
                this.logger.error("Error while shutting down", error);
            }
            firstLatch.countDown();
        }).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
    }

    @Test
    public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
        this.appendLinesToSource(10);
        this.start(FileStreamSourceConnector.class, this.connectorConfig);
        this.consumeLines(10);
        this.assertNoRecordsToConsume();
        for (int i = 1; i != 5; ++i) {
            this.appendLinesToSource(10);
            this.consumeLines(10);
            this.assertNoRecordsToConsume();
        }
        this.stopConnector();
        this.appendLinesToSource(10);
        this.assertNoRecordsToConsume();
        this.start(FileStreamSourceConnector.class, this.connectorConfig);
        this.consumeLines(10);
        this.assertNoRecordsToConsume();
    }

    @Test
    @FixFor(value={"DBZ-1080"})
    public void shouldWorkToUseCustomChangeConsumer() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), FileStreamSourceConnector.class.getName());
        props.put(EmbeddedEngineConfig.OFFSET_FLUSH_INTERVAL_MS.name(), (Object)0);
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.put("topic", "topicX");
        props.put("offset.flush.interval.ms", "0");
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r : records) {
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    public void shouldRunDebeziumEngine() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("transforms", "header");
        props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        ConnectHeaders expectedHeaders = new ConnectHeaders();
        expectedHeaders.addString("headerKey", "headerValue");
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((arg_0, arg_1) -> EmbeddedEngineTest.lambda$shouldRunDebeziumEngine$10((Headers)expectedHeaders, firstLatch, allLatch, arg_0, arg_1)).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-2897"})
    public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
        this.appendLinesToSource(10);
        String TEST_TOPIC = "topicX";
        String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
        Long EXPECTED_CUSTOM_OFFSET = 1L;
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", TEST_TOPIC);
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r : records) {
                DebeziumEngine.Offsets offsets = committer.buildOffsets();
                offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, (Object)EXPECTED_CUSTOM_OFFSET);
                this.logger.info(((SourceRecord)r.record()).sourceOffset().toString());
                committer.markProcessed((Object)r, offsets);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        SafeObjectInputStream inputStream = new SafeObjectInputStream(Files.newInputStream(OFFSET_STORE_PATH.toAbsolutePath(), new OpenOption[0]));
        Object obj = inputStream.readObject();
        Map raw = (Map)obj;
        Set fileOffsetStoreEntrySingleton = raw.entrySet();
        Assertions.assertThat((int)fileOffsetStoreEntrySingleton.size()).isEqualTo(1);
        Map.Entry fileOffsetEntry = fileOffsetStoreEntrySingleton.iterator().next();
        ByteBuffer offsetJsonString = fileOffsetEntry.getValue() != null ? ByteBuffer.wrap((byte[])fileOffsetEntry.getValue()) : null;
        JsonDeserializer jsonDeserializer = new JsonDeserializer();
        JsonNode partitionToOffsetMap = jsonDeserializer.deserialize(TEST_TOPIC, offsetJsonString.array());
        Long actualOffset = partitionToOffsetMap.get(CUSTOM_SOURCE_OFFSET_PARTITION).asLong();
        Assertions.assertThat((Long)actualOffset).isEqualTo((Object)EXPECTED_CUSTOM_OFFSET);
        this.stopConnector();
    }

    @Test
    public void shouldExecuteSmt() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("predicates", "filter");
        props.setProperty("predicates.filter.type", FilterPredicate.class.getName());
        props.setProperty("transforms", "filter, router");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
        props.setProperty("transforms.router.regex", "(.*)");
        props.setProperty("transforms.router.replacement", "trf$1");
        props.setProperty("transforms.filter.type", "io.debezium.embedded.EmbeddedEngineTest$FilterTransform");
        props.setProperty("transforms.filter.predicate", "filter");
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(5);
        DebeziumEngine engine = DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(9);
            records.forEach(r -> Assertions.assertThat((String)((SourceRecord)r.record()).topic()).isEqualTo((Object)"trftopicX"));
            Integer groupCount = records.size() / 10;
            for (RecordChangeEvent r2 : records) {
                Assertions.assertThat((String)((String)((SourceRecord)r2.record()).value())).isNotEqualTo((Object)"Generated line number 1");
                committer.markProcessed((Object)r2);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test(expected=DebeziumException.class)
    public void invalidSmt() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("transforms", "router");
        props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.Regex");
        props.setProperty("transforms.router.regex", "(.*)");
        props.setProperty("transforms.router.replacement", "trf$1");
        DebeziumEngine.create((ChangeEventFormat)ChangeEventFormat.of(Connect.class)).using(props).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).build();
    }

    @Test
    @FixFor(value={"DBZ-1807"})
    public void shouldRunDebeziumEngineWithJson() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("converter.schemas.enable", "false");
        props.setProperty("transforms", "header");
        props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        EmbeddedEngineHeader expectedHeader = new EmbeddedEngineHeader("headerKey", (Object)"\"headerValue\"");
        DebeziumEngine engine = DebeziumEngine.create(Json.class, Json.class, Json.class).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            int groupCount = records.size() / 10;
            for (ChangeEvent r : records) {
                Assertions.assertThat((String)((String)r.key())).isNull();
                Assertions.assertThat((String)((String)r.value())).startsWith((CharSequence)"\"Generated line number ");
                List headers = r.headers();
                Assertions.assertThat((List)headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && ((String)h.getValue()).equals(expectedHeader.getValue()));
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-5926"})
    public void shouldRunDebeziumEngineWithMismatchedTypes() throws Exception {
        this.appendLinesToSource(10);
        Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
        props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "0");
        props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        props.setProperty("topic", "topicX");
        props.setProperty("converter.schemas.enable", "false");
        props.setProperty("transforms", "header");
        props.setProperty("transforms.header.type", AddHeaderTransform.class.getName());
        CountDownLatch firstLatch = new CountDownLatch(1);
        CountDownLatch allLatch = new CountDownLatch(6);
        EmbeddedEngineHeader expectedHeader = new EmbeddedEngineHeader("headerKey", (Object)"\"headerValue\"".getBytes(StandardCharsets.UTF_8));
        DebeziumEngine engine = DebeziumEngine.create(Json.class, JsonByteArray.class, JsonByteArray.class).using(props).notifying((records, committer) -> {
            Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
            int groupCount = records.size() / 10;
            for (ChangeEvent r : records) {
                Assertions.assertThat((String)((String)r.key())).isNull();
                Assertions.assertThat((String)new String((byte[])r.value(), Charsets.UTF_8)).startsWith((CharSequence)"\"Generated line number ");
                List headers = r.headers();
                Assertions.assertThat((List)headers).hasSize(1);
                Assertions.assertThat((List)headers).allMatch(h -> h.getKey().equals(expectedHeader.getKey()) && Arrays.equals((byte[])h.getValue(), (byte[])expectedHeader.getValue()));
                committer.markProcessed((Object)r);
            }
            committer.markBatchFinished();
            firstLatch.countDown();
            for (int i = 0; i < groupCount; ++i) {
                allLatch.countDown();
            }
        }).using(this.getClass().getClassLoader()).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        firstLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)firstLatch.getCount()).isEqualTo(0L);
        for (int i = 0; i < 5; ++i) {
            this.appendLinesToSource(10);
            Thread.sleep(10L);
        }
        allLatch.await(5000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((long)allLatch.getCount()).isEqualTo(0L);
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-5583"})
    public void verifyBadCommitPolicyClassName() {
        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
        props.put(EmbeddedEngineConfig.OFFSET_COMMIT_POLICY.name(), "badclassname");
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        AtomicBoolean exceptionCaught = new AtomicBoolean(false);
        DebeziumEngine engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> {}).using(this.getClass().getClassLoader()).using((success, message, error) -> {
            Throwable rootCause = Throwables.getRootCause((Throwable)error);
            Assertions.assertThat((Throwable)rootCause).isInstanceOf(ClassNotFoundException.class);
            Assertions.assertThat((String)rootCause.getMessage()).contains(new CharSequence[]{"badclassname"});
            exceptionCaught.set(true);
        }).build();
        engine.run();
        Assertions.assertThat((boolean)exceptionCaught.get()).isTrue();
    }

    @Test
    @FixFor(value={"DBZ-4720"})
    public void validationThrowsException() throws Exception {
        this.appendLinesToSource(10);
        AtomicReference errorReference = new AtomicReference();
        this.start(FileStreamSourceConnector.class, Configuration.from((Properties)new Properties()), (success, message, error) -> {
            if (message != null) {
                errorReference.set(message);
            }
        });
        this.assertNoRecordsToConsume();
        Assertions.assertThat((String)((String)errorReference.get())).isNotNull();
        Assertions.assertThat((String)((String)errorReference.get())).contains(new CharSequence[]{"Connector configuration is not valid. "});
        Assertions.assertThat((boolean)this.isEngineRunning.get()).isFalse();
    }

    @Test
    @FixFor(value={"DBZ-7099"})
    public void shouldHandleNoDefaultOffsetFlushInterval() throws IOException, InterruptedException {
        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), SimpleSourceConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS.name(), "10");
        CountDownLatch engineRunning = new CountDownLatch(1);
        CountDownLatch engineStopped = new CountDownLatch(1);
        AtomicBoolean engineSucceeded = new AtomicBoolean(false);
        DebeziumEngine engine = DebeziumEngine.create(Json.class).using(props).notifying((records, committer) -> engineRunning.countDown()).using(this.getClass().getClassLoader()).using(new DebeziumEngine.ConnectorCallback(){

            public void connectorStarted() {
                EmbeddedEngineTest.this.isEngineRunning.compareAndExchange(false, true);
            }
        }).using((success, message, error) -> {
            engineSucceeded.set(success);
            engineStopped.countDown();
        }).build();
        ExecutorService exec = Executors.newFixedThreadPool(1);
        exec.execute(() -> {
            LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
            engine.run();
        });
        engineRunning.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((boolean)this.isEngineRunning.get()).isTrue();
        engine.close();
        engineStopped.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((boolean)engineSucceeded.get()).isTrue();
    }

    protected void appendLinesToSource(int numberOfLines) throws IOException {
        this.linesAdded += DebeziumEngineTestUtils.appendLinesToSource(this.inputFile, numberOfLines, this.linesAdded);
    }

    protected void consumeLines(int numberOfLines) throws InterruptedException {
        this.consumeRecords(numberOfLines, 3, record -> {
            String line = record.value().toString();
            Assertions.assertThat((String)line).isEqualTo((Object)DebeziumEngineTestUtils.generateLine(this.nextConsumedLineNumber));
            ++this.nextConsumedLineNumber;
        }, false);
    }

    private static /* synthetic */ void lambda$shouldRunDebeziumEngine$10(Headers expectedHeaders, CountDownLatch firstLatch, CountDownLatch allLatch, List records, DebeziumEngine.RecordCommitter committer) throws InterruptedException {
        Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(10);
        Integer groupCount = records.size() / 10;
        for (RecordChangeEvent r : records) {
            Assertions.assertThat((Iterable)((SourceRecord)r.record()).headers()).isEqualTo((Object)expectedHeaders);
            committer.markProcessed((Object)r);
        }
        committer.markBatchFinished();
        firstLatch.countDown();
        for (int i = 0; i < groupCount; ++i) {
            allLatch.countDown();
        }
    }

    public static class AddHeaderTransform
    implements Transformation<SourceRecord> {
        public void configure(Map<String, ?> configs) {
        }

        public SourceRecord apply(SourceRecord record) {
            ConnectHeaders headers = new ConnectHeaders();
            headers.addString("headerKey", "headerValue");
            record = record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), (Iterable)headers);
            return record;
        }

        public ConfigDef config() {
            return new ConfigDef();
        }

        public void close() {
        }
    }

    public static class FilterPredicate
    implements Predicate<SourceRecord> {
        public ConfigDef config() {
            return new ConfigDef();
        }

        public boolean test(SourceRecord sourceRecord) {
            return sourceRecord.value().equals("Generated line number 1");
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
        }
    }

    public static class FilterTransform
    implements Transformation<SourceRecord> {
        public void configure(Map<String, ?> configs) {
        }

        public SourceRecord apply(SourceRecord record) {
            String payload = (String)record.value();
            return payload.equals("Generated line number 1") || payload.equals("Generated line number 2") ? null : record;
        }

        public ConfigDef config() {
            return new ConfigDef();
        }

        public void close() {
        }
    }
}

