/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.embedded;

import io.debezium.config.Configuration;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;

/**
 * Test that runs a {@link FileStreamSourceConnector} through the connector harness provided by
 * {@link AbstractConnectorTest}.
 *
 * <p>The test appends generated lines to an input file, starts the connector against that file and
 * checks that every consumed record carries the next expected line, including after the connector
 * has been stopped and started again.
 */
public class EmbeddedEngineTest extends AbstractConnectorTest {

    /** Number of lines appended to, and then expected from, the input file in each step. */
    private static final int NUMBER_OF_LINES = 10;

    /** Number of additional append-and-consume rounds performed while the connector is running. */
    private static final int NUMBER_OF_ROUNDS = 4;

    /** Absolute path of the input file handed to the connector. */
    private static final Path TEST_FILE_PATH =
            Testing.Files.createTestingPath("file-connector-input.txt").toAbsolutePath();

    /** Charset used when writing lines to the input file. */
    private static final Charset UTF8 = StandardCharsets.UTF_8;

    /** The input file, recreated before each test. */
    private File inputFile;

    /** 1-based number of the line that the next consumed record is expected to contain. */
    private int nextConsumedLineNumber;

    /** Total number of lines appended to the input file so far in the current test. */
    private int linesAdded;

    /** Connector configuration holding the "file" and "topic" settings. */
    private Configuration connectorConfig;

    /**
     * Resets the line counters, recreates the input file and rebuilds the connector configuration.
     *
     * @throws Exception if the testing file cannot be deleted or created
     */
    @Before
    public void beforeEach() throws Exception {
        nextConsumedLineNumber = 1;
        linesAdded = 0;

        // Remove any file left behind by a previous run before creating a fresh one.
        Testing.Files.delete(TEST_FILE_PATH);
        inputFile = Testing.Files.createTestingFile(TEST_FILE_PATH);

        connectorConfig = Configuration.create()
                .with("file", TEST_FILE_PATH)
                .with("topic", "topicX")
                .build();
    }

    /**
     * Appends lines before and while the connector runs, then stops and restarts the connector and
     * verifies that the lines appended while it was stopped are consumed in order.
     *
     * @throws Exception if appending to the file or consuming records fails
     */
    @Test
    public void shouldStartAndUseFileConnectorUsingMemoryOffsetStorage() throws Exception {
        // Seed the file before the connector is started.
        appendLinesToSource(NUMBER_OF_LINES);

        start(FileStreamSourceConnector.class, connectorConfig);

        // The seeded lines must all be consumed, and nothing more.
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();

        // Keep appending while the connector is running; each batch must be consumed in order.
        for (int round = 0; round < NUMBER_OF_ROUNDS; ++round) {
            appendLinesToSource(NUMBER_OF_LINES);
            consumeLines(NUMBER_OF_LINES);
            assertNoRecordsToConsume();
        }

        stopConnector();

        // Lines appended while the connector is stopped must not produce records yet.
        appendLinesToSource(NUMBER_OF_LINES);
        assertNoRecordsToConsume();

        start(FileStreamSourceConnector.class, connectorConfig);

        // nextConsumedLineNumber is not reset here, so this passes only if consumption resumes
        // with the first line that was not consumed before the connector was stopped.
        consumeLines(NUMBER_OF_LINES);
        assertNoRecordsToConsume();
    }

    /**
     * Appends the given number of generated lines to the input file and advances
     * {@code linesAdded} accordingly.
     *
     * @param numberOfLines how many lines to append
     * @throws IOException if the file cannot be written
     */
    protected void appendLinesToSource(int numberOfLines) throws IOException {
        CharSequence[] lines = new CharSequence[numberOfLines];
        for (int i = 0; i != numberOfLines; ++i) {
            // Line numbers are 1-based and continue from the lines already written.
            lines[i] = generateLine(linesAdded + i + 1);
        }
        Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
        linesAdded += numberOfLines;
    }

    /**
     * Builds the text of the line with the given number.
     *
     * @param lineNumber the line number to embed in the text
     * @return the generated line content
     */
    protected String generateLine(int lineNumber) {
        return "Generated line number " + lineNumber;
    }

    /**
     * Consumes the given number of records and asserts that each record's value, converted to a
     * string, equals the next expected generated line.
     *
     * @param numberOfLines how many records to consume
     * @throws InterruptedException if interrupted while consuming records
     */
    protected void consumeLines(int numberOfLines) throws InterruptedException {
        consumeRecords(numberOfLines, sourceRecord -> {
            String line = sourceRecord.value().toString();
            Assertions.assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber));
            ++nextConsumedLineNumber;
        });
    }
}

