/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded.async;

import ch.qos.logback.classic.Level;
import io.debezium.DebeziumException;
import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.doc.FixFor;
import io.debezium.embedded.DebeziumEngineTestUtils;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.async.AsyncEmbeddedEngine;
import io.debezium.embedded.async.AsyncEngineConfig;
import io.debezium.embedded.async.DebeziumAsyncEngineTestUtils;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests of {@link AsyncEmbeddedEngine}: each test builds an engine through
 * {@code AsyncEmbeddedEngine.AsyncEngineBuilder}, runs it on a background executor and
 * observes it through latches, counters and the {@code TestEngineConnectorCallback}.
 */
public class AsyncEmbeddedEngineTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEmbeddedEngineTest.class);
    // NOTE(review): not referenced in the visible code; the test bodies use the literal 10
    // (presumably inlined from this constant by the compiler) - confirm.
    private static final int NUMBER_OF_LINES = 10;
    // File passed to the engine as "offset.storage.file.filename"; deleted before every test.
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath((String)"file-connector-offsets.txt").toAbsolutePath();
    // Input file handed to the file source connector via the "file" property; recreated before every test.
    private static final Path TEST_FILE_PATH = Testing.Files.createTestingPath((String)"file-connector-input.txt").toAbsolutePath();
    // NOTE(review): not referenced in the visible code.
    private static final long TEST_TASK_MANAGEMENT_TIMEOUT_MS = 1000L;
    // Set by TestEngineConnectorCallback.connectorStarted() and cleared by connectorStopped().
    protected static final AtomicBoolean isEngineRunning = new AtomicBoolean(false);
    // Incremented by TestEngineConnectorCallback.taskStarted() and decremented by taskStopped().
    protected static final AtomicInteger runningTasks = new AtomicInteger(0);
    // Engine under test; assigned by each test method.
    protected DebeziumEngine<SourceRecord> engine;
    // Single-threaded executor on which most tests invoke engine.run().
    protected ExecutorService engineExecSrv = Executors.newFixedThreadPool(1);
    // Handle of the file created at TEST_FILE_PATH by beforeEach().
    private File inputFile;
    // Running total of lines appended to inputFile; reset to 0 by beforeEach().
    private int linesAdded;

    /**
     * Prepares a clean slate for every test: removes the input and offset files left over
     * from a previous run, creates a fresh input file and resets the shared counters.
     */
    @Before
    public void beforeEach() throws Exception {
        // Remove leftovers first so that the new input file starts empty.
        Testing.Files.delete(TEST_FILE_PATH);
        Testing.Files.delete(OFFSET_STORE_PATH);
        inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);
        linesAdded = 0;

        // The flags are static and therefore shared between tests.
        runningTasks.set(0);
        isEngineRunning.set(false);
    }

    /**
     * Starts the engine on a file source that already holds one batch of lines, waits for the
     * first batch to be delivered, then appends five more batches and expects all of them to be
     * delivered before the engine is stopped.
     */
    @Test
    public void testEngineBasicLifecycle() throws Exception {
        final int linesPerBatch = 10;
        final int extraBatches = 5;

        final Properties config = new Properties();
        config.setProperty("name", "debezium-engine");
        config.setProperty("tasks.max", "1");
        config.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        config.setProperty("topic", "testTopic");

        // Lines written before the engine starts are what the first delivered batch must contain.
        appendLinesToSource(linesPerBatch);

        final CountDownLatch firstBatchSeen = new CountDownLatch(1);
        final CountDownLatch allBatchesSeen = new CountDownLatch(extraBatches + 1);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isGreaterThanOrEqualTo(linesPerBatch);
                    final int completeGroups = records.size() / linesPerBatch;
                    for (SourceRecord record : records) {
                        committer.markProcessed(record);
                    }
                    committer.markBatchFinished();
                    firstBatchSeen.countDown();
                    // One count per full group of lines contained in this batch.
                    for (int group = 0; group < completeGroups; group++) {
                        allBatchesSeen.countDown();
                    }
                })
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        firstBatchSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(firstBatchSeen.getCount()).isEqualTo(0L);

        for (int batch = 0; batch < extraBatches; batch++) {
            appendLinesToSource(linesPerBatch);
            Thread.sleep(10L);
        }

        allBatchesSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(allBatchesSeen.getCount()).isEqualTo(0L);
        stopEngine();
    }

    /**
     * Runs the multi-task test connector with five tasks and waits until the consumer has
     * counted all expected records, then stops the engine.
     *
     * <p>The engine runs on an executor created by this test; the executor is always shut
     * down in a {@code finally} block so that its worker thread does not outlive the test.
     */
    @Test
    public void testRunMultipleTasks() throws Exception {
        final int numberOfTasks = 5;
        // NOTE(review): total asserted by the test; presumably 10 records per task - confirm
        // against MultiTaskSimpleSourceConnector.
        final int expectedRecords = 50;

        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", String.valueOf(numberOfTasks));
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.MultiTaskSimpleSourceConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);

        AtomicInteger recordsRead = new AtomicInteger(0);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        this.engine = builder.using(props).notifying((records, committer) -> {
            for (SourceRecord record : records) {
                recordsRead.incrementAndGet();
                committer.markProcessed(record);
            }
        }).using(this.getClass().getClassLoader()).build();

        ExecutorService exec = Executors.newFixedThreadPool(1);
        try {
            exec.execute(() -> {
                LoggingContext.forConnector(this.getClass().getSimpleName(), "", "engine");
                this.engine.run();
            });
            Awaitility.await()
                    .alias("Haven't read all the records in time")
                    .pollInterval(100L, TimeUnit.MILLISECONDS)
                    .atMost(1L, TimeUnit.SECONDS)
                    .until(() -> recordsRead.get() == expectedRecords);
            this.stopEngine();
        }
        finally {
            // stopEngine() only knows about engineExecSrv, so this executor must be released here.
            exec.shutdownNow();
        }
    }

    /**
     * Runs a connector whose tasks randomly fail during start and verifies, through the
     * {@code runningTasks} counter, that at least one task started and that afterwards all
     * started tasks were stopped again and the engine itself stopped.
     *
     * <p>The engine runs on an executor created by this test; the executor is always shut
     * down in a {@code finally} block so that its worker thread does not outlive the test.
     */
    @Test
    public void testTasksAreStoppedIfSomeFailsToStart() {
        final int numberOfTasks = 10;

        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", String.valueOf(numberOfTasks));
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.RandomlyFailingDuringStartConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        // NOTE(review): unlike RECORD_PROCESSING_ORDER elsewhere in this class, this key is used
        // without .name() - confirm the constant is a String and not a Field.
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        this.engine = builder.using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {})
                .using(this.getClass().getClassLoader())
                .build();

        ExecutorService exec = Executors.newFixedThreadPool(1);
        try {
            exec.execute(() -> {
                LoggingContext.forConnector(this.getClass().getSimpleName(), "", "engine");
                this.engine.run();
            });
            Awaitility.await()
                    .alias("At least some tasks haven't started on time")
                    .pollInterval(10L, TimeUnit.MILLISECONDS)
                    .atMost(1L, TimeUnit.SECONDS)
                    .until(() -> runningTasks.get() > 0);
            Awaitility.await()
                    .alias("Tasks haven't been stopped on time")
                    .pollInterval(10L, TimeUnit.MILLISECONDS)
                    .atMost(5L, TimeUnit.SECONDS)
                    .until(() -> runningTasks.get() <= 0);
            this.waitForEngineToStop();
        }
        finally {
            // Nothing else references this executor, so release its worker thread here.
            exec.shutdownNow();
        }
    }

    /**
     * Verifies that the completion callback is invoked with {@code success == true} and no
     * error once an engine that has processed all 20 input lines is stopped.
     */
    @Test
    public void testCompletionCallbackCalledUponSuccess() throws Exception {
        final Properties config = new Properties();
        config.setProperty("name", "debezium-engine");
        config.setProperty("tasks.max", "1");
        config.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        config.setProperty("topic", "testTopic");
        appendLinesToSource(10);

        final AtomicInteger processedRecords = new AtomicInteger();
        final CountDownLatch batchesSeen = new CountDownLatch(2);
        final CountDownLatch successReported = new CountDownLatch(1);

        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .using((success, message, error) -> {
                    // Only a clean completion releases the latch.
                    if (success && error == null) {
                        successReported.countDown();
                    }
                })
                .notifying((records, committer) -> {
                    for (SourceRecord record : records) {
                        committer.markProcessed(record);
                        processedRecords.incrementAndGet();
                    }
                    committer.markBatchFinished();
                    batchesSeen.countDown();
                })
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        // Second batch of lines, written while the engine is already running.
        appendLinesToSource(10);
        batchesSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(processedRecords.get()).isEqualTo(20);

        stopEngine();
        successReported.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(successReported.getCount()).isEqualTo(0L);
    }

    /**
     * Verifies that the completion callback is invoked with {@code success == false} and an
     * {@link InterruptedException} as the error when the engine runs the
     * {@code InterruptedConnector} test connector.
     */
    @Test
    public void testCompletionCallbackCalledUponFailure() throws Exception {
        final Properties config = new Properties();
        config.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        config.setProperty("tasks.max", "1");
        config.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.InterruptedConnector.class.getName());
        config.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.put("batch.count", 1);
        config.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        final CountDownLatch failureReported = new CountDownLatch(1);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .using((success, message, error) -> {
                    // Only the expected kind of failure releases the latch.
                    if (!success && error instanceof InterruptedException) {
                        failureReported.countDown();
                    }
                })
                .notifying((records, committer) -> {})
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        failureReported.await(2000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(failureReported.getCount()).isEqualTo(0L);
    }

    /**
     * Verifies that stopping the engine while a task is still inside {@code start()} is
     * rejected with an {@link IllegalStateException}, and that the engine can be stopped
     * normally once the task has finished starting.
     *
     * <p>The task blocks in {@code start()} on {@code WaitInTaskStartTask.continueLatch};
     * the test releases it after the rejected stop attempt.
     */
    @Test
    @FixFor(value={"DBZ-2534"})
    public void testCannotStopWhileTasksAreStarting() throws Exception {
        // The latches are static and one-shot: re-arm them so the test does not depend on
        // whether it (or a subclass run of it) has already executed in this JVM.
        WaitInTaskStartTask.taskStartingLatch = new CountDownLatch(1);
        WaitInTaskStartTask.continueLatch = new CountDownLatch(1);

        Properties props = new Properties();
        props.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        props.setProperty("tasks.max", "1");
        props.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), WaitInTaskStartConnector.class.getName());
        props.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        props.put("batch.count", 1);
        // NOTE(review): unlike RECORD_PROCESSING_ORDER elsewhere in this class, this key is used
        // without .name() - confirm the constant is a String and not a Field.
        props.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        this.engine = builder.using(props)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {})
                .build();
        this.engineExecSrv.submit(() -> {
            LoggingContext.forConnector(this.getClass().getSimpleName(), "", "engine");
            this.engine.run();
        });

        // Best-effort wait for the task to enter start(); the outcome is checked by the
        // assertion on the stop attempt below.
        WaitInTaskStartTask.taskStartingLatch.await(100L, TimeUnit.MILLISECONDS);

        Assertions.assertThatThrownBy(this::stopEngine)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Cannot stop engine while tasks are starting, this may lead to leaked resource. Wait for the tasks to be fully started.");

        // Let the task finish starting, then a regular stop must succeed.
        WaitInTaskStartTask.continueLatch.countDown();
        this.waitForTasksToStart(1);
        this.stopEngine();
    }

    /**
     * Verifies that stopping an engine a second time, after it has fully stopped, fails with
     * an {@link IllegalStateException} carrying the "already shut down" message.
     */
    @Test
    public void testCannotStopAlreadyStoppedEngine() throws Exception {
        final Properties config = new Properties();
        config.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "testing-connector");
        config.setProperty("tasks.max", "1");
        config.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), DebeziumAsyncEngineTestUtils.NoOpConnector.class.getName());
        config.put("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.put("batch.count", 1);
        config.put(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS, "10");

        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {})
                .build();
        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        // First stop: regular shutdown of a running engine.
        waitForTasksToStart(1);
        stopEngine();
        waitForEngineToStop();

        // Second stop: must be rejected.
        Assertions.assertThatThrownBy(this::stopEngine)
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("Engine has been already shut down.");
    }

    /**
     * Runs the engine with a predicate-guarded filter transform followed by a regex router
     * and checks that every delivered record carries the re-routed topic name.
     */
    @Test
    public void testExecuteSmt() throws Exception {
        final int linesPerBatch = 10;
        final int extraBatches = 5;

        final Properties config = new Properties();
        config.setProperty("name", "debezium-engine");
        config.setProperty("tasks.max", "1");
        config.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        config.setProperty("topic", "testTopic");
        // Predicate and transform chain: "filter" (guarded by the predicate), then "router".
        config.setProperty("predicates", "filter");
        config.setProperty("predicates.filter.type", DebeziumEngineTestUtils.FilterPredicate.class.getName());
        config.setProperty("transforms", "filter, router");
        config.setProperty("transforms.filter.type", "io.debezium.embedded.DebeziumEngineTestUtils$FilterTransform");
        config.setProperty("transforms.filter.predicate", "filter");
        config.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
        config.setProperty("transforms.router.regex", "(.*)");
        config.setProperty("transforms.router.replacement", "routing_smt_$1");
        appendLinesToSource(linesPerBatch);

        final CountDownLatch firstBatchSeen = new CountDownLatch(1);
        final CountDownLatch allBatchesSeen = new CountDownLatch(extraBatches);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isGreaterThanOrEqualTo(9);
                    // Check all topics before anything is marked as processed.
                    for (SourceRecord routed : records) {
                        Assertions.assertThat(routed.topic()).isEqualTo("routing_smt_testTopic");
                    }
                    final int completeGroups = records.size() / linesPerBatch;
                    for (SourceRecord record : records) {
                        committer.markProcessed(record);
                    }
                    committer.markBatchFinished();
                    firstBatchSeen.countDown();
                    for (int group = 0; group < completeGroups; group++) {
                        allBatchesSeen.countDown();
                    }
                })
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        firstBatchSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(firstBatchSeen.getCount()).isEqualTo(0L);

        for (int batch = 0; batch < extraBatches; batch++) {
            appendLinesToSource(linesPerBatch);
            Thread.sleep(10L);
        }

        allBatchesSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(allBatchesSeen.getCount()).isEqualTo(0L);
        stopEngine();
    }

    /**
     * Runs {@code SimpleSourceConnector} with "error.retriable.on" set to "5, 7" and expects
     * ten single-record batches to reach the consumer within five seconds.
     */
    @Test
    public void testPollingIsRetriedUponFailure() throws Exception {
        final Properties config = new Properties();
        config.setProperty("name", "debezium-engine");
        config.setProperty("tasks.max", "1");
        config.setProperty("connector.class", SimpleSourceConnector.class.getName());
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("error.retriable.on", "5, 7");

        final CountDownLatch batchesSeen = new CountDownLatch(10);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    // Every batch is expected to hold exactly one record.
                    Assertions.assertThat(records.size()).isEqualTo(1);
                    committer.markProcessed(records.get(0));
                    committer.markBatchFinished();
                    batchesSeen.countDown();
                })
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        batchesSeen.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat(batchesSeen.getCount()).isEqualTo(0L);
        stopEngine();
    }

    /**
     * Same setup as the retry test but with the maximum number of retries limited to one:
     * only six of the ten expected batches may arrive, after which the engine must stop by
     * itself and log the failure together with its state transitions.
     */
    @Test
    public void testConnectorFailsIfMaxRetriesExceeded() throws Exception {
        final Properties config = new Properties();
        config.setProperty("name", "debezium-engine");
        config.setProperty("tasks.max", "1");
        config.setProperty("connector.class", SimpleSourceConnector.class.getName());
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("error.retriable.on", "5, 7");
        config.setProperty(EmbeddedEngineConfig.ERRORS_MAX_RETRIES.name(), "1");

        final CountDownLatch batchesSeen = new CountDownLatch(10);
        // Attach the interceptor before the engine is built so that no log line is missed.
        final LogInterceptor engineLog = new LogInterceptor(AsyncEmbeddedEngine.class);
        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(config)
                .using(new TestEngineConnectorCallback())
                .notifying((records, committer) -> {
                    Assertions.assertThat(records.size()).isEqualTo(1);
                    committer.markProcessed(records.get(0));
                    committer.markBatchFinished();
                    batchesSeen.countDown();
                })
                .build();

        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        // The latch is not expected to reach zero: four batches must remain undelivered.
        batchesSeen.await(5L, TimeUnit.SECONDS);
        Assertions.assertThat(batchesSeen.getCount()).isEqualTo(4L);

        waitForEngineToStop();
        Assertions.assertThat(engineLog.containsErrorMessage("Engine has failed with")).isTrue();
        Assertions.assertThat(engineLog.containsMessage("Engine state has changed from 'POLLING_TASKS' to 'STOPPING'")).isTrue();
        Assertions.assertThat(engineLog.containsMessage("Engine state has changed from 'STOPPING' to 'STOPPED'")).isTrue();
    }

    /**
     * Runs the consumer-based lifecycle scenario with the record processing order set to
     * {@code ORDERED}.
     */
    @Test
    public void testEngineBasicLifecycleConsumerSequentially() throws Exception {
        final Properties config = new Properties();
        // Engine-level settings.
        config.setProperty("name", "debezium-engine");
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty(AsyncEngineConfig.RECORD_PROCESSING_ORDER.name(), "ORDERED");
        // Connector-level settings.
        config.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        config.setProperty("tasks.max", "1");
        config.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        config.setProperty("topic", "testTopic");

        runEngineBasicLifecycleWithConsumer(config);
    }

    /**
     * Runs the consumer-based lifecycle scenario with the record processing order set to
     * {@code UNORDERED}.
     */
    @Test
    public void testEngineBasicLifecycleConsumerNonSequentially() throws Exception {
        final Properties config = new Properties();
        // Engine-level settings.
        config.setProperty("name", "debezium-engine");
        config.setProperty("offset.storage.file.filename", OFFSET_STORE_PATH.toAbsolutePath().toString());
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty(AsyncEngineConfig.RECORD_PROCESSING_ORDER.name(), "UNORDERED");
        // Connector-level settings.
        config.setProperty("connector.class", FileStreamSourceConnector.class.getName());
        config.setProperty("tasks.max", "1");
        config.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
        config.setProperty("topic", "testTopic");

        runEngineBasicLifecycleWithConsumer(config);
    }

    /**
     * Shared scenario for the consumer-based lifecycle tests: starts an engine whose
     * per-record consumer counts down a latch, feeds 60 lines in total (10 before start,
     * 5 x 10 afterwards) and expects every one of them to reach the consumer.
     *
     * @param props engine configuration, including the record processing order under test
     * @throws IOException if lines cannot be appended to the input file
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    private void runEngineBasicLifecycleWithConsumer(Properties props) throws IOException, InterruptedException {
        final int linesPerBatch = 10;
        final int extraBatches = 5;

        // Capture DEBUG output of the engine for the processor check at the end.
        final LogInterceptor engineLog = new LogInterceptor(AsyncEmbeddedEngine.class);
        engineLog.setLoggerLevel(AsyncEmbeddedEngine.class, Level.DEBUG);

        appendLinesToSource(linesPerBatch);
        final CountDownLatch recordsSeen = new CountDownLatch(60);

        AsyncEmbeddedEngine.AsyncEngineBuilder builder = new AsyncEmbeddedEngine.AsyncEngineBuilder();
        engine = builder.using(props)
                .using(new TestEngineConnectorCallback())
                .notifying(record -> recordsSeen.countDown())
                .build();
        engineExecSrv.submit(() -> {
            LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
            engine.run();
        });

        for (int batch = 0; batch < extraBatches; batch++) {
            appendLinesToSource(linesPerBatch);
            Thread.sleep(10L);
        }

        recordsSeen.await(1L, TimeUnit.SECONDS);
        Assertions.assertThat(recordsSeen.getCount()).isEqualTo(0L);
        // NOTE(review): this assertion has no terminal check (e.g. isTrue()), so it currently
        // verifies nothing. Before completing it, confirm which processor class name is logged
        // for each processing order - the same text is used for ORDERED and UNORDERED.
        Assertions.assertThat(engineLog.containsMessage("Using io.debezium.embedded.async.AsyncEmbeddedEngine$ParallelSmtConsumerProcessor processor"));
        stopEngine();
    }

    /**
     * Closes the engine and waits up to one second for the running flag to be cleared.
     * If closing fails with an {@link IOException}, or the flag is not cleared in time,
     * a warning is logged and the engine executor is shut down forcibly. Any other
     * exception thrown by {@code close()} propagates to the caller.
     */
    protected void stopEngine() {
        try {
            engine.close();
            Awaitility.await()
                    .atMost(1L, TimeUnit.SECONDS)
                    .until(() -> !isEngineRunning.get());
        } catch (IOException closeFailure) {
            LOGGER.warn("Failed during engine stop", closeFailure);
            engineExecSrv.shutdownNow();
        } catch (ConditionTimeoutException stillRunning) {
            LOGGER.warn("Engine has not stopped on time");
            engineExecSrv.shutdownNow();
        }
    }

    /**
     * Waits up to one second for the running flag to be set by the connector callback.
     *
     * <p>The poll interval must be shorter than the timeout: Awaitility uses the poll
     * interval as the default poll delay and rejects a timeout that is not greater than
     * that delay, so polling every 1000 ms within a 1 s timeout could never succeed.
     */
    protected void waitForEngineToStart() {
        Awaitility.await()
                .alias("Engine haven't started on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(1L, TimeUnit.SECONDS)
                .until(() -> isEngineRunning.get());
    }

    /**
     * Waits up to ten seconds, checking once per second, until the running flag has been
     * cleared by the connector callback.
     */
    protected void waitForEngineToStop() {
        Awaitility.await()
                .alias("Engine haven't stopped on time")
                .pollInterval(1000L, TimeUnit.MILLISECONDS)
                .atMost(10L, TimeUnit.SECONDS)
                .until(() -> !isEngineRunning.get());
    }

    /**
     * Waits up to one second until at least the given number of tasks has been reported
     * as started by the connector callback.
     *
     * @param minRunningTasks minimal number of running tasks to wait for
     */
    protected void waitForTasksToStart(int minRunningTasks) {
        Awaitility.await()
                .alias("Tasks haven't started on time")
                .pollInterval(10L, TimeUnit.MILLISECONDS)
                .atMost(1000L, TimeUnit.MILLISECONDS)
                .until(() -> runningTasks.get() >= minRunningTasks);
    }

    /**
     * Appends lines to the input file and adds the count reported by the helper to the
     * running total.
     *
     * @param numberOfLines number of lines to append
     * @throws IOException if writing to the input file fails
     */
    protected void appendLinesToSource(int numberOfLines) throws IOException {
        final int appended = DebeziumEngineTestUtils.appendLinesToSource(inputFile, numberOfLines, linesAdded);
        linesAdded += appended;
    }

    /**
     * Connector callback that mirrors the engine lifecycle into the static
     * {@code isEngineRunning} flag and {@code runningTasks} counter of the test class.
     */
    public static class TestEngineConnectorCallback implements DebeziumEngine.ConnectorCallback {

        /** Marks the engine as running (no-op when the flag is already set). */
        public void connectorStarted() {
            isEngineRunning.compareAndSet(false, true);
        }

        /** Marks the engine as not running. */
        public void connectorStopped() {
            isEngineRunning.set(false);
        }

        /** Counts one more running task. */
        public void taskStarted() {
            runningTasks.incrementAndGet();
        }

        /** Counts one running task less. */
        public void taskStopped() {
            runningTasks.decrementAndGet();
        }
    }

    /**
     * Variant of {@link SimpleSourceConnector} whose task class is
     * {@link WaitInTaskStartTask}.
     */
    static class WaitInTaskStartConnector extends SimpleSourceConnector {

        WaitInTaskStartConnector() {
        }

        /** Returns the task class whose {@code start()} waits on a latch. */
        public Class<? extends Task> taskClass() {
            return WaitInTaskStartTask.class;
        }
    }

    /**
     * Task whose {@code start()} signals that it has been entered and then waits (for at
     * most one second) until the test allows it to continue. {@code poll()} never
     * produces any records.
     */
    static class WaitInTaskStartTask
    extends SimpleSourceConnector.SimpleConnectorTask {
        /** Counted down as soon as {@code start()} is entered. */
        public static CountDownLatch taskStartingLatch = new CountDownLatch(1);
        /** Counted down by the test to let {@code start()} return. */
        public static CountDownLatch continueLatch = new CountDownLatch(1);

        WaitInTaskStartTask() {
        }

        /**
         * Signals the start and blocks until released or until one second has elapsed.
         *
         * @param props task configuration (not used)
         * @throws DebeziumException if the waiting thread is interrupted
         */
        public void start(Map<String, String> props) {
            taskStartingLatch.countDown();
            try {
                continueLatch.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and keep the cause so the failure stays diagnosable.
                Thread.currentThread().interrupt();
                throw new DebeziumException("Waiting for continuation of start was interrupted.", e);
            }
        }

        /** Returns a new empty list on every call. */
        public List<SourceRecord> poll() throws InterruptedException {
            return new ArrayList<>();
        }
    }
}

