/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.function.BooleanConsumer;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.TestLogger;
import io.debezium.pipeline.txmetadata.TransactionStatus;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.fest.assertions.BooleanAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractConnectorTest
implements Testing {
    @Rule
    public TestRule skipTestRule = new SkipTestRule();
    protected static final Path OFFSET_STORE_PATH = Testing.Files.createTestingPath((String)"file-connector-offsets.txt").toAbsolutePath();
    private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
    private ExecutorService executor;
    protected EmbeddedEngine engine;
    private BlockingQueue<SourceRecord> consumedLines;
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5L);
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private CountDownLatch latch;
    private JsonConverter keyJsonConverter = new JsonConverter();
    private JsonConverter valueJsonConverter = new JsonConverter();
    private JsonDeserializer keyJsonDeserializer = new JsonDeserializer();
    private JsonDeserializer valueJsonDeserializer = new JsonDeserializer();
    private boolean skipAvroValidation = false;
    @Rule
    public TestRule logTestName = new TestLogger(this.logger);

    @Before
    public final void initializeConnectorTestFramework() {
        LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"test");
        this.keyJsonConverter = new JsonConverter();
        this.valueJsonConverter = new JsonConverter();
        this.keyJsonDeserializer = new JsonDeserializer();
        this.valueJsonDeserializer = new JsonDeserializer();
        Configuration converterConfig = Configuration.create().build();
        Configuration deserializerConfig = Configuration.create().build();
        this.keyJsonConverter.configure(converterConfig.asMap(), true);
        this.valueJsonConverter.configure(converterConfig.asMap(), false);
        this.keyJsonDeserializer.configure(deserializerConfig.asMap(), true);
        this.valueJsonDeserializer.configure(deserializerConfig.asMap(), false);
        this.resetBeforeEachTest();
        this.consumedLines = new ArrayBlockingQueue<SourceRecord>(this.getMaximumEnqueuedRecordCount());
        Testing.Files.delete((Path)OFFSET_STORE_PATH);
        OFFSET_STORE_PATH.getParent().toFile().mkdirs();
    }

    @After
    public final void stopConnector() {
        this.stopConnector(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopConnector(BooleanConsumer callback) {
        try {
            this.logger.info("Stopping the connector");
            if (this.engine != null && this.engine.isRunning()) {
                this.logger.info("Stopping the engine");
                this.engine.stop();
                try {
                    this.engine.await(60L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    this.logger.warn("Engine has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (this.executor != null) {
                this.logger.info("Interrupting the engine");
                List<Runnable> neverRunTasks = this.executor.shutdownNow();
                Assertions.assertThat(neverRunTasks).isEmpty();
                try {
                    while (!this.executor.awaitTermination(60L, TimeUnit.SECONDS)) {
                    }
                }
                catch (InterruptedException e) {
                    this.logger.warn("Executor has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (this.engine != null && this.engine.isRunning()) {
                this.logger.info("Waiting for engine to stop");
                try {
                    while (!this.engine.await(60L, TimeUnit.SECONDS)) {
                    }
                }
                catch (InterruptedException e) {
                    this.logger.warn("Connector has not stopped on time");
                    Thread.currentThread().interrupt();
                }
            }
            if (callback != null) {
                callback.accept(this.engine != null && this.engine.isRunning());
            }
        }
        finally {
            this.engine = null;
            this.executor = null;
        }
    }

    protected int getMaximumEnqueuedRecordCount() {
        return 100;
    }

    protected EmbeddedEngine.CompletionCallback loggingCompletion() {
        return (success, msg, error) -> {
            if (success) {
                this.logger.info(msg);
            } else {
                this.logger.error(msg, error);
            }
        };
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig) {
        this.start(connectorClass, connectorConfig, (DebeziumEngine.CompletionCallback)this.loggingCompletion(), null);
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, Predicate<SourceRecord> isStopRecord) {
        this.start(connectorClass, connectorConfig, (DebeziumEngine.CompletionCallback)this.loggingCompletion(), isStopRecord);
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback) {
        this.start(connectorClass, connectorConfig, callback, null);
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord) {
        this.start(connectorClass, connectorConfig, callback, isStopRecord, x -> {});
    }

    protected void start(Class<? extends SourceConnector> connectorClass, Configuration connectorConfig, DebeziumEngine.CompletionCallback callback, Predicate<SourceRecord> isStopRecord, Consumer<SourceRecord> recordArrivedListener) {
        block3: {
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.copy((Configuration)connectorConfig).with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with(EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName())).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)).build();
            this.latch = new CountDownLatch(1);
            EmbeddedEngine.CompletionCallback wrapperCallback = (success, msg, error) -> {
                try {
                    if (callback != null) {
                        callback.handle(success, msg, error);
                    }
                }
                finally {
                    if (!success) {
                        this.latch.countDown();
                    }
                }
                Testing.debug((Object)"Stopped connector");
            };
            EmbeddedEngine.ConnectorCallback connectorCallback = new EmbeddedEngine.ConnectorCallback(){

                public void taskStarted() {
                    AbstractConnectorTest.this.latch.countDown();
                }
            };
            this.engine = EmbeddedEngine.create().using(config).notifying(record -> {
                if (isStopRecord != null && isStopRecord.test((SourceRecord)record)) {
                    this.logger.error("Stopping connector after record as requested");
                    throw new ConnectException("Stopping connector after record as requested");
                }
                if (!this.engine.isRunning() || Thread.currentThread().isInterrupted()) {
                    return;
                }
                while (!this.consumedLines.offer((SourceRecord)record)) {
                    if (this.engine.isRunning() && !Thread.currentThread().isInterrupted()) continue;
                    return;
                }
                recordArrivedListener.accept((SourceRecord)record);
            }).using(this.getClass().getClassLoader()).using(wrapperCallback).using(connectorCallback).build();
            Assertions.assertThat((Object)this.executor).isNull();
            this.executor = Executors.newFixedThreadPool(1);
            this.executor.execute(() -> {
                LoggingContext.forConnector((String)this.getClass().getSimpleName(), (String)"", (String)"engine");
                this.engine.run();
            });
            try {
                if (!this.latch.await(5L, TimeUnit.MINUTES)) {
                    this.logger.warn("The connector did not finish starting its task(s) or complete in the expected amount of time");
                }
            }
            catch (InterruptedException e) {
                if (!Thread.interrupted()) break block3;
                Assert.fail((String)"Interrupted while waiting for engine startup");
            }
        }
    }

    protected void setConsumeTimeout(long timeout, TimeUnit unit) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("The timeout may not be negative");
        }
        this.pollTimeoutInMs = unit.toMillis(timeout);
    }

    protected SourceRecord consumeRecord() throws InterruptedException {
        return this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
    }

    protected int consumeRecords(int numberOfRecords) throws InterruptedException {
        return this.consumeRecords(numberOfRecords, null);
    }

    protected int consumeRecords(int numberOfRecords, int breakAfterNulls, Consumer<SourceRecord> recordConsumer, boolean assertRecords) throws InterruptedException {
        int recordsConsumed = 0;
        int nullReturn = 0;
        while (recordsConsumed < numberOfRecords) {
            SourceRecord record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record != null) {
                nullReturn = 0;
                ++recordsConsumed;
                if (recordConsumer != null) {
                    recordConsumer.accept(record);
                }
                if (Testing.Debug.isEnabled()) {
                    Testing.debug((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more)"));
                    this.debug(record);
                } else if (Testing.Print.isEnabled()) {
                    Testing.print((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more)"));
                    this.print(record);
                }
                if (!assertRecords) continue;
                VerifyRecord.isValid((SourceRecord)record, (boolean)this.skipAvroValidation);
                continue;
            }
            if (++nullReturn < breakAfterNulls) continue;
            return recordsConsumed;
        }
        return recordsConsumed;
    }

    protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordConsumer) throws InterruptedException {
        return this.consumeRecords(numberOfRecords, 3, recordConsumer, true);
    }

    protected SourceRecords consumeRecordsByTopic(int numRecords, int breakAfterNulls) throws InterruptedException {
        SourceRecords records = new SourceRecords();
        this.consumeRecords(numRecords, breakAfterNulls, records::add, true);
        return records;
    }

    protected SourceRecords consumeRecordsByTopic(int numRecords) throws InterruptedException {
        SourceRecords records = new SourceRecords();
        this.consumeRecords(numRecords, records::add);
        return records;
    }

    protected SourceRecords consumeRecordsByTopic(int numRecords, boolean assertRecords) throws InterruptedException {
        SourceRecords records = new SourceRecords();
        this.consumeRecords(numRecords, 3, records::add, assertRecords);
        return records;
    }

    protected SourceRecords consumeDmlRecordsByTopic(int numDmlRecords) throws InterruptedException {
        SourceRecords records = new SourceRecords();
        this.consumeDmlRecordsByTopic(numDmlRecords, records::add);
        return records;
    }

    protected int consumeDmlRecordsByTopic(int numberDmlRecords, Consumer<SourceRecord> recordConsumer) throws InterruptedException {
        return this.consumeDmlRecordsByTopic(numberDmlRecords, 3, recordConsumer, true);
    }

    protected int consumeDmlRecordsByTopic(int numberOfRecords, int breakAfterNulls, Consumer<SourceRecord> recordConsumer, boolean assertRecords) throws InterruptedException {
        String txId;
        String status;
        Struct value;
        SourceRecord record;
        int recordsConsumed = 0;
        int nullReturn = 0;
        LinkedHashSet<String> endTransactions = new LinkedHashSet<String>();
        while (recordsConsumed < numberOfRecords) {
            record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record != null) {
                nullReturn = 0;
                value = (Struct)record.value();
                if (this.isTransactionRecord(record)) {
                    status = value.getString("status");
                    if (status.equals(TransactionStatus.BEGIN.name())) {
                        endTransactions.add(value.getString("id"));
                    } else {
                        endTransactions.remove(value.getString("id"));
                    }
                } else {
                    txId = value.getStruct("source").getInt64("txId").toString();
                    ((BooleanAssert)Assertions.assertThat((boolean)endTransactions.contains(txId)).as("DML record txId " + txId + " not in open transaction set")).isTrue();
                    ++recordsConsumed;
                }
                if (recordConsumer != null) {
                    recordConsumer.accept(record);
                }
                if (Testing.Debug.isEnabled()) {
                    Testing.debug((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more), " + endTransactions.size() + " active transactions"));
                    this.debug(record);
                } else if (Testing.Print.isEnabled()) {
                    Testing.print((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more), " + endTransactions.size() + " active transactions"));
                    this.print(record);
                }
                if (!assertRecords) continue;
                VerifyRecord.isValid((SourceRecord)record);
                continue;
            }
            if (++nullReturn < breakAfterNulls) continue;
            return recordsConsumed;
        }
        while (!endTransactions.isEmpty()) {
            record = this.consumedLines.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
            if (record != null) {
                nullReturn = 0;
                value = (Struct)record.value();
                if (this.isTransactionRecord(record)) {
                    status = value.getString("status");
                    if (status.equals(TransactionStatus.END.name())) {
                        endTransactions.remove(value.getString("id"));
                    } else {
                        endTransactions.add(value.getString("id"));
                    }
                } else {
                    txId = value.getStruct("source").getInt64("txId").toString();
                    ((BooleanAssert)Assertions.assertThat((boolean)endTransactions.contains(txId)).as("DML record txId " + txId + " not in open transaction set")).isTrue();
                    ++recordsConsumed;
                }
                if (recordConsumer != null) {
                    recordConsumer.accept(record);
                }
                if (Testing.Debug.isEnabled()) {
                    Testing.debug((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more), " + endTransactions.size() + " active transactions"));
                    this.debug(record);
                } else if (Testing.Print.isEnabled()) {
                    Testing.print((Object)("Consumed record " + recordsConsumed + " / " + numberOfRecords + " (" + (numberOfRecords - recordsConsumed) + " more), " + endTransactions.size() + " active transactions"));
                    this.print(record);
                }
                if (!assertRecords) continue;
                VerifyRecord.isValid((SourceRecord)record);
                continue;
            }
            if (++nullReturn < breakAfterNulls) continue;
            return recordsConsumed;
        }
        return recordsConsumed;
    }

    protected boolean isTransactionRecord(SourceRecord record) {
        return record != null && record.topic().endsWith(".transaction") && record.keySchema().name().equals("io.debezium.connector.common.TransactionMetadataKey");
    }

    protected int consumeAvailableRecords(Consumer<SourceRecord> recordConsumer) {
        LinkedList records = new LinkedList();
        this.consumedLines.drainTo(records);
        if (recordConsumer != null) {
            records.forEach(recordConsumer);
        }
        return records.size();
    }

    protected boolean waitForAvailableRecords(long timeout, TimeUnit unit) {
        Assertions.assertThat((long)timeout).isGreaterThanOrEqualTo(0L);
        long now = System.currentTimeMillis();
        long stop = now + unit.toMillis(timeout);
        while (System.currentTimeMillis() < stop && this.consumedLines.isEmpty()) {
        }
        return !this.consumedLines.isEmpty();
    }

    protected void skipAvroValidation() {
        this.skipAvroValidation = true;
    }

    protected void assertConnectorIsRunning() {
        Assertions.assertThat((boolean)this.engine.isRunning()).isTrue();
    }

    protected void assertConnectorNotRunning() {
        Assertions.assertThat((this.engine != null && this.engine.isRunning() ? 1 : 0) != 0).isFalse();
    }

    protected void assertNoRecordsToConsume() {
        Assertions.assertThat((boolean)this.consumedLines.isEmpty()).isTrue();
    }

    protected void assertOnlyTransactionRecordsToConsume() {
        this.consumedLines.iterator().forEachRemaining(r -> Assertions.assertThat((boolean)this.isTransactionRecord((SourceRecord)r)).isTrue());
    }

    protected void assertKey(SourceRecord record, String pkField, int pk) {
        VerifyRecord.hasValidKey((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertInsert(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidInsert((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertUpdate(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidUpdate((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertDelete(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidDelete((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertSourceQuery(SourceRecord record, String query) {
        VerifyRecord.hasValidSourceQuery((SourceRecord)record, (String)query);
    }

    protected void assertHasNoSourceQuery(SourceRecord record) {
        VerifyRecord.hasNoSourceQuery((SourceRecord)record);
    }

    protected void assertTombstone(SourceRecord record, String pkField, int pk) {
        VerifyRecord.isValidTombstone((SourceRecord)record, (String)pkField, (int)pk);
    }

    protected void assertTombstone(SourceRecord record) {
        VerifyRecord.isValidTombstone((SourceRecord)record);
    }

    protected void assertOffset(SourceRecord record, Map<String, ?> expectedOffset) {
        Map offset = record.sourceOffset();
        Assertions.assertThat((Map)offset).isEqualTo(expectedOffset);
    }

    protected void assertOffset(SourceRecord record, String offsetField, Object expectedValue) {
        Map offset = record.sourceOffset();
        Object value = offset.get(offsetField);
        this.assertSameValue(value, expectedValue);
    }

    protected void assertValueField(SourceRecord record, String fieldPath, Object expectedValue) {
        VerifyRecord.assertValueField((SourceRecord)record, (String)fieldPath, (Object)expectedValue);
    }

    private void assertSameValue(Object actual, Object expected) {
        VerifyRecord.assertSameValue((Object)actual, (Object)expected);
    }

    protected void assertSchemaMatchesStruct(SchemaAndValue value) {
        VerifyRecord.schemaMatchesStruct((SchemaAndValue)value);
    }

    protected void assertSchemaMatchesStruct(Struct struct, Schema schema) {
        VerifyRecord.schemaMatchesStruct((Struct)struct, (Schema)schema);
    }

    protected void assertEngineIsRunning() {
        ((BooleanAssert)Assertions.assertThat((boolean)this.engine.isRunning()).as("Engine should not fail due to an exception")).isTrue();
    }

    protected void validate(SourceRecord record) {
        VerifyRecord.isValid((SourceRecord)record);
    }

    protected void print(SourceRecord record) {
        VerifyRecord.print((SourceRecord)record);
    }

    protected void debug(SourceRecord record) {
        VerifyRecord.debug((SourceRecord)record);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int numErrors) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isEqualTo(numErrors);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field, int minErrorsInclusive, int maxErrorsInclusive) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isGreaterThanOrEqualTo(minErrorsInclusive);
        Assertions.assertThat((int)value.errorMessages().size()).isLessThanOrEqualTo(maxErrorsInclusive);
    }

    protected void assertConfigurationErrors(Config config, io.debezium.config.Field field) {
        ConfigValue value = this.configValue(config, field.name());
        Assertions.assertThat((int)value.errorMessages().size()).isGreaterThan(0);
    }

    protected void assertNoConfigurationErrors(Config config, io.debezium.config.Field ... fields) {
        for (io.debezium.config.Field field : fields) {
            ConfigValue value = this.configValue(config, field.name());
            if (value == null || value.errorMessages().isEmpty()) continue;
            Assert.fail((String)("Error messages on field '" + field.name() + "': " + value.errorMessages()));
        }
    }

    protected ConfigValue configValue(Config config, String fieldName) {
        return config.configValues().stream().filter(value -> value.name().equals(fieldName)).findFirst().orElse(null);
    }

    protected <T> Map<String, Object> readLastCommittedOffset(Configuration config, Map<String, T> partition) {
        return this.readLastCommittedOffsets(config, Arrays.asList(partition)).get(partition);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected <T> Map<Map<String, T>, Map<String, Object>> readLastCommittedOffsets(Configuration config, Collection<Map<String, T>> partitions) {
        config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)config.edit().with(EmbeddedEngine.ENGINE_NAME, "testing-connector")).with("offset.storage.file.filename", (Object)OFFSET_STORE_PATH)).with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, 0)).build();
        String engineName = config.getString(EmbeddedEngine.ENGINE_NAME);
        Converter keyConverter = (Converter)config.getInstance(EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS, Converter.class);
        keyConverter.configure(config.subset(EmbeddedEngine.INTERNAL_KEY_CONVERTER_CLASS.name() + ".", true).asMap(), true);
        Converter valueConverter = (Converter)config.getInstance(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS, Converter.class);
        Configuration valueConverterConfig = config;
        if (valueConverter instanceof JsonConverter) {
            valueConverterConfig = ((Configuration.Builder)config.edit().with(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS + ".schemas.enable", false)).build();
        }
        valueConverter.configure(valueConverterConfig.subset(EmbeddedEngine.INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);
        Map embeddedConfig = config.asMap(EmbeddedEngine.ALL_FIELDS);
        embeddedConfig.put("key.converter", JsonConverter.class.getName());
        embeddedConfig.put("value.converter", JsonConverter.class.getName());
        EmbeddedEngine.EmbeddedConfig workerConfig = new EmbeddedEngine.EmbeddedConfig(embeddedConfig);
        FileOffsetBackingStore offsetStore = new FileOffsetBackingStore();
        offsetStore.configure((WorkerConfig)workerConfig);
        offsetStore.start();
        try {
            OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl((OffsetBackingStore)offsetStore, engineName, keyConverter, valueConverter);
            Map map = offsetReader.offsets(partitions);
            return map;
        }
        finally {
            offsetStore.stop();
        }
    }

    protected String assertBeginTransaction(SourceRecord record) {
        Struct begin = (Struct)record.value();
        Struct beginKey = (Struct)record.key();
        Map offset = record.sourceOffset();
        Assertions.assertThat((String)begin.getString("status")).isEqualTo((Object)"BEGIN");
        Assertions.assertThat((Long)begin.getInt64("event_count")).isNull();
        String txId = begin.getString("id");
        Assertions.assertThat((String)beginKey.getString("id")).isEqualTo((Object)txId);
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)txId);
        return txId;
    }

    protected void assertEndTransaction(SourceRecord record, String expectedTxId, long expectedEventCount, Map<String, Number> expectedPerTableCount) {
        Struct end = (Struct)record.value();
        Struct endKey = (Struct)record.key();
        Map offset = record.sourceOffset();
        Assertions.assertThat((String)end.getString("status")).isEqualTo((Object)"END");
        Assertions.assertThat((String)end.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat((Long)end.getInt64("event_count")).isEqualTo(expectedEventCount);
        Assertions.assertThat((String)endKey.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat(end.getArray("data_collections").stream().map(x -> (Struct)x).collect(Collectors.toMap(x -> x.getString("data_collection"), x -> x.getInt64("event_count")))).isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> (String)x.getKey(), x -> ((Number)x.getValue()).longValue())));
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)expectedTxId);
    }

    protected void assertRecordTransactionMetadata(SourceRecord record, String expectedTxId, long expectedTotalOrder, long expectedCollectionOrder) {
        Struct change = ((Struct)record.value()).getStruct("transaction");
        Map offset = record.sourceOffset();
        Assertions.assertThat((String)change.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat((Long)change.getInt64("total_order")).isEqualTo(expectedTotalOrder);
        Assertions.assertThat((Long)change.getInt64("data_collection_order")).isEqualTo(expectedCollectionOrder);
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)expectedTxId);
    }

    public static int waitTimeForRecords() {
        return Integer.parseInt(System.getProperty("debezium.test.records.waittime", "2"));
    }

    public static void waitForSnapshotToBeCompleted(String connector, String server) throws InterruptedException {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        Awaitility.await().alias("Streaming was not started on time").pollInterval(100L, TimeUnit.MILLISECONDS).atMost((long)(AbstractConnectorTest.waitTimeForRecords() * 30), TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> (boolean)((Boolean)mbeanServer.getAttribute(AbstractConnectorTest.getSnapshotMetricsObjectName(connector, server), "SnapshotCompleted")));
    }

    public static void waitForStreamingRunning(String connector, String server) throws InterruptedException {
        AbstractConnectorTest.waitForStreamingRunning(connector, server, "streaming");
    }

    public static void waitForStreamingRunning(String connector, String server, String contextName) {
        Awaitility.await().alias("Streaming was not started on time").pollInterval(100L, TimeUnit.MILLISECONDS).atMost((long)(AbstractConnectorTest.waitTimeForRecords() * 30), TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> AbstractConnectorTest.isStreamingRunning(connector, server, contextName));
    }

    public static void waitForConnectorShutdown(String connector, String server) {
        Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost((long)(AbstractConnectorTest.waitTimeForRecords() * 30), TimeUnit.SECONDS).until(() -> !AbstractConnectorTest.isStreamingRunning(connector, server));
    }

    public static boolean isStreamingRunning(String connector, String server) {
        return AbstractConnectorTest.isStreamingRunning(connector, server, "streaming");
    }

    public static boolean isStreamingRunning(String connector, String server, String contextName) {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            return (Boolean)mbeanServer.getAttribute(AbstractConnectorTest.getStreamingMetricsObjectName(connector, server, contextName), "Connected");
        }
        catch (JMException jMException) {
            return false;
        }
    }

    public static ObjectName getSnapshotMetricsObjectName(String connector, String server) throws MalformedObjectNameException {
        return new ObjectName("debezium." + connector + ":type=connector-metrics,context=snapshot,server=" + server);
    }

    public static ObjectName getStreamingMetricsObjectName(String connector, String server) throws MalformedObjectNameException {
        return AbstractConnectorTest.getStreamingMetricsObjectName(connector, server, "streaming");
    }

    public static ObjectName getStreamingMetricsObjectName(String connector, String server, String context) throws MalformedObjectNameException {
        return new ObjectName("debezium." + connector + ":type=connector-metrics,context=" + context + ",server=" + server);
    }

    protected class SourceRecords {
        private final List<SourceRecord> records = new ArrayList<SourceRecord>();
        private final Map<String, List<SourceRecord>> recordsByTopic = new HashMap<String, List<SourceRecord>>();
        private final Map<String, List<SourceRecord>> ddlRecordsByDbName = new HashMap<String, List<SourceRecord>>();

        protected SourceRecords() {
        }

        public void add(SourceRecord record) {
            this.records.add(record);
            this.recordsByTopic.computeIfAbsent(record.topic(), topicName -> new ArrayList()).add(record);
            String dbName = this.getAffectedDatabase(record);
            if (dbName != null) {
                this.ddlRecordsByDbName.computeIfAbsent(dbName, key -> new ArrayList()).add(record);
            }
        }

        protected String getAffectedDatabase(SourceRecord record) {
            Field dbField;
            Struct value = (Struct)record.value();
            if (value != null && (dbField = value.schema().field("databaseName")) != null) {
                return value.getString(dbField.name());
            }
            return null;
        }

        public List<SourceRecord> ddlRecordsForDatabase(String dbName) {
            return this.ddlRecordsByDbName.get(dbName);
        }

        public Set<String> databaseNames() {
            return this.ddlRecordsByDbName.keySet();
        }

        public List<SourceRecord> recordsForTopic(String topicName) {
            return this.recordsByTopic.get(topicName);
        }

        public Set<String> topics() {
            return this.recordsByTopic.keySet();
        }

        public void forEachInTopic(String topic, Consumer<SourceRecord> consumer) {
            this.recordsForTopic(topic).forEach(consumer);
        }

        public void forEach(Consumer<SourceRecord> consumer) {
            this.records.forEach(consumer);
        }

        public List<SourceRecord> allRecordsInOrder() {
            return Collections.unmodifiableList(this.records);
        }

        public void print() {
            Testing.print((Object)("" + this.topics().size() + " topics: " + this.topics()));
            this.recordsByTopic.forEach((? super K k, ? super V v) -> Testing.print((Object)(" - topic:'" + k + "'; # of events = " + v.size())));
            Testing.print((Object)"Records:");
            this.records.forEach(AbstractConnectorTest.this::print);
        }
    }
}

