/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.kafka;

import cascading.CascadingTestCase;
import cascading.flow.FlowProcess;
import cascading.local.tap.kafka.KafkaScheme;
import cascading.local.tap.kafka.KafkaTap;
import cascading.local.tap.kafka.TextKafkaScheme;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import java.io.IOException;
import java.util.function.Predicate;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;

/**
 * Integration test that writes tuples to a Kafka topic through a {@link KafkaTap} and reads
 * them back, using a Kafka broker started by Testcontainers.
 */
public class KafkaTapIntegrationTest extends CascadingTestCase {
    /** Number of tuples written by {@link #handle}, and therefore expected back when reading. */
    private static final int TUPLE_COUNT = 100;

    /** Kafka broker container shared by every test in this class. */
    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer("5.1.0");

    /**
     * Round-trips tuples using the no-argument {@link TextKafkaScheme} and checks that the
     * value at position 3 of every tuple read back is a {@link String}.
     */
    @Test
    public void writeRead() throws Exception {
        handle(new TextKafkaScheme(), tupleEntry -> tupleEntry.getObject(3) instanceof String, "my-test-default");
    }

    /**
     * Round-trips tuples using a {@link TextKafkaScheme} built from explicitly typed source
     * fields and checks that the value at position 3 (declared below as the {@code Integer}
     * field "key") of every tuple read back is an {@link Integer}.
     */
    @Test
    public void writeReadTyped() throws Exception {
        Fields sourceFields = new Fields("topic", String.class)
            .append(new Fields("partition", Integer.class))
            .append(new Fields("offset", Long.class))
            .append(new Fields("key", Integer.class)
                .append(new Fields("value", Integer.class))
                .append(new Fields("timestamp", Long.class))
                .append(new Fields("tsType", String.class)));
        handle(new TextKafkaScheme(sourceFields), tupleEntry -> tupleEntry.getObject(3) instanceof Integer, "my-test-typed");
    }

    /**
     * Writes {@link #TUPLE_COUNT} tuples to the topic {@code <topic>-topic}, then reads three times:
     * <ol>
     *   <li>with the writing tap, expecting every tuple to satisfy {@code predicate};</li>
     *   <li>with the same tap again, expecting nothing to be returned;</li>
     *   <li>with a new tap using a different client id and the topic string
     *       {@code /<topic>-.*&#47;}, expecting all tuples again.</li>
     * </ol>
     *
     * @param text      scheme used by both taps
     * @param predicate check applied to each entry of the first read; counting stops at the
     *                  first entry that fails it, which makes the count assertion fail
     * @param topic     topic name prefix
     * @throws IOException if opening or closing a collector or iterator fails
     */
    private void handle(TextKafkaScheme text, Predicate<TupleEntry> predicate, String topic) throws IOException {
        String hostname = kafka.getBootstrapServers();
        KafkaTap tap = new KafkaTap((KafkaScheme) text, hostname, "test-client", new String[]{topic + "-topic"});

        try (TupleEntryCollector collector = tap.openForWrite(FlowProcess.nullFlowProcess())) {
            for (int i = 0; i < TUPLE_COUNT; ++i) {
                collector.add(new Tuple(i, i));
            }
        }

        int count = 0;
        try (TupleEntryIterator iterator = tap.openForRead(FlowProcess.nullFlowProcess())) {
            while (iterator.hasNext() && predicate.test((TupleEntry) iterator.next())) {
                ++count;
            }
        }
        assertEquals(TUPLE_COUNT, count);

        // Reading again with the same tap must yield nothing.
        // NOTE(review): presumably because the consumer for this client id has already
        // consumed the records -- confirm against KafkaTap.
        assertEquals(0, countEntries(tap));

        // NOTE(review): the slash-delimited topic string looks like a pattern matching every
        // "<topic>-*" topic -- confirm against KafkaTap's topic handling.
        tap = new KafkaTap((KafkaScheme) text, hostname, "test-client-2", new String[]{"/" + topic + "-.*/"});
        assertEquals(TUPLE_COUNT, countEntries(tap));
    }

    /**
     * Opens {@code tap} for reading and counts entries until the iterator is exhausted or
     * returns {@code null}; the iterator is always closed.
     */
    private static int countEntries(KafkaTap tap) throws IOException {
        int count = 0;
        try (TupleEntryIterator iterator = tap.openForRead(FlowProcess.nullFlowProcess())) {
            while (iterator.hasNext() && iterator.next() != null) {
                ++count;
            }
        }
        return count;
    }
}

