/*
 * Decompiled with CFR 0.152.
 */
package cascading.local.tap.splunk;

import cascading.flow.FlowProcess;
import cascading.local.tap.splunk.SplunkCSV;
import cascading.local.tap.splunk.SplunkIndexTap;
import cascading.local.tap.splunk.SplunkRawDelimited;
import cascading.local.tap.splunk.SplunkRawLine;
import cascading.local.tap.splunk.SplunkScheme;
import cascading.local.tap.splunk.SplunkSearchTap;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import com.splunk.Index;
import com.splunk.SDKTestCase;
import com.splunk.Service;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;

/**
 * Integration tests that write tuples into a temporary Splunk index through a
 * {@link SplunkIndexTap} and read them back through an index or search tap.
 *
 * <p>A Splunk container is started once per class via Testcontainers; each test
 * gets its own freshly created index, which is removed again in {@link #tearDown()}.
 */
public class SplunkTapIntegrationTest
extends SDKTestCase {
    /** Container ports exposed to the host: 8000 and 8089 (8089 is the port {@link #setUp()} connects to). */
    public static final Integer[] PORTS = new Integer[]{8000, 8089};

    /** Splunk container shared by every test in this class. */
    @ClassRule
    public static GenericContainer splunk = new GenericContainer("splunk/splunk:7.3.4")
        .withExposedPorts(PORTS)
        .withEnv("SPLUNK_START_ARGS", "--accept-license")
        .withEnv("SPLUNK_PASSWORD", "helloworld")
        .withEnv("NO_HEALTHCHECK", "true")
        .withStartupTimeout(Duration.ofMinutes(3L));

    /** Upper bound on the number of events written per test; see {@link #repeat(Supplier, int)}. */
    private static final int TOTAL_EVENTS = 100;

    /** Name of the temporary index created for the current test. */
    private String indexName;
    /** Handle on the temporary index created for the current test. */
    private Index index;

    /**
     * Connects to the containerised Splunk instance and creates a temporary index,
     * waiting until the index is visible through the service.
     *
     * @throws Exception if connecting or creating the index fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        HashMap<String, Object> args = new HashMap<>();
        args.put("host", splunk.getContainerIpAddress());
        args.put("port", splunk.getMappedPort(8089));
        args.put("username", "admin");
        args.put("password", "helloworld");
        service = new Service(args);
        super.setUp();
        this.indexName = SplunkTapIntegrationTest.createTemporaryName();
        this.index = (Index) service.getIndexes().create(this.indexName);
        SplunkTapIntegrationTest.assertEventuallyTrue(() -> service.getIndexes().containsKey(this.indexName));
    }

    /**
     * Removes the temporary index if it was created and still exists.
     *
     * @throws Exception if removing the index or the parent tear-down fails
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            // index may still be null when setUp failed before the index was created
            if (service != null && this.index != null && service.getIndexes().containsKey(this.indexName)) {
                this.index.remove();
            }
        } finally {
            // always run the parent clean-up, even when removing the index fails
            super.tearDown();
        }
    }

    /** Writes delimited events, reads them back as CSV with an explicit field list, and counts the rows. */
    @Test
    public void writeDelimitedReadCSV() throws IOException {
        Fields sinkFields = timeValueFields();
        Fields sourceFields = SplunkCSV.DEFAULTS
            .append(SplunkCSV._INDEXTIME)
            .append(SplunkCSV._SUBSECOND)
            .append(SplunkCSV.TIMESTARTPOS)
            .append(SplunkCSV.TIMEENDPOS);
        this.writeRead(
            () -> this.indexTap(new SplunkRawDelimited(sinkFields)),
            () -> this.indexTap(new SplunkCSV(sourceFields)),
            (count, next) -> count + 1,
            1,
            100);
    }

    /** Writes delimited events, reads them back as CSV restricted to {@code _time} and {@code _raw}, and counts the rows. */
    @Test
    public void writeDelimitedReadCSVNarrow() throws IOException {
        this.writeRead(
            () -> this.indexTap(new SplunkRawDelimited(timeValueFields())),
            () -> this.indexTap(new SplunkCSV(SplunkCSV._TIME.append(SplunkCSV._RAW))),
            (count, next) -> count + 1,
            1,
            100);
    }

    /** Writes delimited events, reads them back as CSV declared with {@link Fields#ALL}, and counts the rows. */
    @Test
    public void writeDelimitedReadCSVAll() throws IOException {
        this.writeRead(
            () -> this.indexTap(new SplunkRawDelimited(timeValueFields())),
            () -> this.indexTap(new SplunkCSV(Fields.ALL)),
            (count, next) -> count + 1,
            1,
            100);
    }

    /**
     * Same as {@link #writeDelimitedReadCSVAll()} but writes in three separate batches;
     * 100 / 3 events per batch yields 99 events in total.
     */
    @Test
    public void writeDelimitedReadCSVAllMulti() throws IOException {
        this.writeRead(
            () -> this.indexTap(new SplunkRawDelimited(timeValueFields())),
            () -> this.indexTap(new SplunkCSV(Fields.ALL)),
            (count, next) -> count + 1,
            3,
            99);
    }

    /**
     * Writes and reads with the same delimited scheme and sums the field at position 1;
     * 4950 is the sum of the written values 0..99.
     */
    @Test
    public void writeDelimitedReadDelimited() throws IOException {
        SplunkRawDelimited scheme = new SplunkRawDelimited(timeValueFields());
        this.writeRead(
            () -> this.indexTap(scheme),
            () -> this.indexTap(scheme),
            (count, next) -> count + next.getInteger(Integer.valueOf(1)),
            1,
            4950);
    }

    /**
     * Writes and reads with the same raw-line scheme and sums the field at position 0
     * (presumably the {@code num} source field; confirm against {@link SplunkRawLine}).
     */
    @Test
    public void writeLineReadLine() throws IOException {
        Fields sourceFields = new Fields("num", Long.TYPE).append(new Fields("line", String.class));
        SplunkRawLine scheme = new SplunkRawLine(sourceFields, timeValueFields());
        this.writeRead(
            () -> this.indexTap(scheme),
            () -> this.indexTap(scheme),
            (count, next) -> count + next.getInteger(Integer.valueOf(0)),
            1,
            4950);
    }

    /** Writes through the index tap, reads through a search tap over the same index, and sums the field at position 1. */
    @Test
    public void writeDelimitedSearchDelimited() throws IOException {
        SplunkRawDelimited scheme = new SplunkRawDelimited(timeValueFields());
        this.writeRead(
            () -> this.indexTap(scheme),
            () -> new SplunkSearchTap((SplunkScheme) scheme, service, String.format("index=%s *", this.indexName)),
            (count, next) -> count + next.getInteger(Integer.valueOf(1)),
            1,
            4950);
    }

    /**
     * Writes events through {@code sink}, waits until the index reports them, then reads
     * every tuple from {@code source} and folds it with {@code aggregate}.
     *
     * @param sink       supplies a fresh tap to write to; called once per iteration
     * @param source     supplies the tap to read from; called once
     * @param aggregate  folds each tuple read into the running total, starting at 0
     * @param iterations number of separate write batches
     * @param test       expected final value of the running total
     * @throws IOException if opening, reading, writing or closing a tap fails
     */
    protected void writeRead(Supplier<Tap> sink, Supplier<Tap> source, BiFunction<Integer, TupleEntry, Integer> aggregate, int iterations, int test) throws IOException {
        // the freshly created index must be empty before anything is written
        Assert.assertTrue("index should have no search results before writing", this.getResultCountOfIndex(service, this.indexName) == 0);
        Assert.assertTrue("index should have no events before writing", this.getTotalEventCount(this.index) == 0);

        int totalItems = this.repeat(sink, iterations);

        // the written events only become visible after some delay, so poll until both counts match
        SplunkTapIntegrationTest.assertEventuallyTrue(() -> this.getResultCountOfIndex(service, this.indexName) == totalItems);
        SplunkTapIntegrationTest.assertEventuallyTrue(() -> this.getTotalEventCount(this.index) == totalItems);

        Tap input = source.get();
        int count = 0;
        // close the iterator when done so the underlying read resources are released
        try (TupleEntryIterator iterator = input.openForRead(FlowProcess.nullFlowProcess())) {
            while (iterator.hasNext()) {
                TupleEntry next = iterator.next();
                System.out.println("next = " + next);
                count = aggregate.apply(count, next);
            }
        }
        Assert.assertEquals("aggregate over the tuples read back", test, count);
    }

    /**
     * Writes {@code TOTAL_EVENTS / iterations} tuples per iteration, each iteration through
     * a newly supplied tap. Every tuple holds the current time in milliseconds and a
     * consecutive value starting at 0.
     *
     * @param sink       supplies the tap to write to; called once per iteration
     * @param iterations number of write batches
     * @return the number of tuples written, which is below {@code TOTAL_EVENTS} when
     *         {@code iterations} does not divide it evenly
     * @throws IOException if opening, writing to or closing a tap fails
     */
    private int repeat(Supplier<Tap> sink, int iterations) throws IOException {
        int itemsPerIteration = TOTAL_EVENTS / iterations;
        for (int j = 0; j < iterations; ++j) {
            Tap output = sink.get();
            try (TupleEntryCollector collector = output.openForWrite(FlowProcess.nullFlowProcess())) {
                for (int i = 0; i < itemsPerIteration; ++i) {
                    collector.add(new Tuple(new Object[]{System.currentTimeMillis(), i + j * itemsPerIteration}));
                }
            }
        }
        return itemsPerIteration * iterations;
    }

    /** Builds the two-field declaration ({@code time} as long, {@code value} as int) used when writing events. */
    private static Fields timeValueFields() {
        return new Fields("time", Long.TYPE).append(new Fields("value", Integer.TYPE));
    }

    /** Creates a tap over the current test's temporary index using the given scheme. */
    private Tap indexTap(SplunkScheme scheme) {
        return new SplunkIndexTap(scheme, service, this.indexName);
    }
}

