/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.channel;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.Mocker;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.QueueTestCommon;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.channel.PipeHandler;
import net.openhft.chronicle.queue.channel.Says;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.channel.ChannelHandler;
import net.openhft.chronicle.wire.channel.ChannelHeader;
import net.openhft.chronicle.wire.channel.ChronicleChannel;
import net.openhft.chronicle.wire.channel.ChronicleContext;
import net.openhft.chronicle.wire.channel.ChronicleGatewayMain;
import net.openhft.chronicle.wire.channel.RedirectHeader;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class PipeHandlerTest
extends QueueTestCommon {
    /** Parameter of the current run; forwarded to {@code buffered(...)} when the tests build their contexts. */
    private final boolean buffered;

    /**
     * Creates the test instance for one parameter combination.
     *
     * @param buffered the single boolean argument of the parameter set for this run
     */
    public PipeHandlerTest(boolean buffered) {
        this.buffered = buffered;
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one with
     * {@code true} and one with {@code false}.
     *
     * @return a two-element list, each element holding a single boolean argument
     */
    @Parameterized.Parameters(name = "buffered: {0}")
    public static List<Object[]> combinations() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    /** Deletes the {@code test-q} directory before each test method. */
    @Before
    public void deleteQueues() {
        String[] staleDirs = {"test-q"};
        IOTools.deleteDirWithFiles(staleDirs);
    }

    /** Runs the shared scenario over an {@code internal://} context; currently disabled by {@code @Ignore}. */
    @Test
    @Ignore
    public void internal() {
        try (ChronicleContext context = ChronicleContext.newContext("internal://").buffered(this.buffered)) {
            doTest(context);
        }
    }

    /**
     * Runs the shared scenario with a client context and a separately started
     * gateway that both use the same fixed TCP URL.
     *
     * @throws IOException declared by the original signature
     */
    @Test
    public void clientOnly() throws IOException {
        final String gatewayUrl = "tcp://localhost:65441";
        String[] workDirs = {"target/client", "target/gw"};
        IOTools.deleteDirWithFiles(workDirs);

        try (ChronicleContext context = ChronicleContext.newContext(gatewayUrl).name("target/client").buffered(this.buffered)) {
            ChronicleGatewayMain gateway = new ChronicleGatewayMain(gatewayUrl);
            gateway.name("target/gw");
            // Registered with the context before starting; presumably so the context
            // closes the gateway when it is closed itself - confirm against addCloseable.
            context.addCloseable(gateway);
            gateway.start();
            doTest(context);
        }
    }

    /** Runs the shared scenario against a context created on {@code tcp://:0}. */
    @Test
    public void server() {
        String[] workDirs = {"target/server"};
        IOTools.deleteDirWithFiles(workDirs);
        try (ChronicleContext context = ChronicleContext.newContext("tcp://:0").name("target/server").buffered(this.buffered)) {
            doTest(context);
        }
    }

    /**
     * Runs the shared scenario through a gateway whose outgoing header is replaced
     * by a {@link RedirectHeader} listing two alternative URLs. Disabled; see the
     * issue linked in {@code @Ignore}. Skipped when the JVM is in debug mode.
     *
     * @throws IOException declared by the original signature
     */
    @Ignore("https://github.com/OpenHFT/Chronicle-Queue/issues/1390")
    @Test
    public void redirectedServer() throws IOException {
        String[] workDirs = {"target/zero", "target/one", "target/client"};
        IOTools.deleteDirWithFiles(workDirs);
        Assume.assumeFalse(Jvm.isDebug());

        // No gateway in this test is started on this address; it is only listed
        // as the first redirect candidate.
        final String unservedUrl = "tcp://localhost:65329";
        final String targetUrl = "tcp://localhost:65330";
        final String redirectingUrl = "tcp://localhost:65331";

        try (ChronicleGatewayMain gateway0 = new ChronicleGatewayMain(targetUrl).buffered(this.buffered)) {
            gateway0.name("target/zero");
            gateway0.start();

            try (ChronicleGatewayMain gateway1 = new ChronicleGatewayMain(redirectingUrl) {
                @Override
                protected ChannelHeader replaceOutHeader(ChannelHeader channelHeader) {
                    // Always answer with a redirect, regardless of the incoming header.
                    return new RedirectHeader(Arrays.asList(unservedUrl, targetUrl));
                }
            }.buffered(this.buffered)) {
                gateway1.name("target/one");
                gateway1.start();

                // The client connects to the redirecting gateway only.
                try (ChronicleContext context = ChronicleContext.newContext(redirectingUrl).name("target/client").buffered(this.buffered)) {
                    doTest(context);
                }
            }
        }
    }

    /**
     * Shared scenario: opens a channel that both publishes to and subscribes from
     * {@code test-q}, sends one {@code say} message and checks it is read back,
     * then checks the test-message round trip.
     *
     * @param context the context used to create the channel
     */
    private void doTest(ChronicleContext context) {
        ChannelHandler pipe = new PipeHandler().subscribe("test-q").publish("test-q");
        ChronicleChannel channel = context.newChannelSupplier(pipe).get();

        Says writer = channel.methodWriter(Says.class);
        writer.say("Hello World");

        StringBuilder eventType = new StringBuilder();
        String text = channel.readOne(eventType, String.class);
        Assert.assertEquals("say: Hello World", eventType + ": " + text);

        // Nothing else was published, so the next document must be absent.
        try (DocumentContext empty = channel.readingDocument()) {
            Assert.assertFalse(empty.isPresent());
            Assert.assertFalse(empty.isMetaData());
        }

        final long now = SystemTimeProvider.CLOCK.currentTimeNanos();
        channel.testMessage(now);

        // After the test message, a meta-data document is expected.
        try (DocumentContext meta = channel.readingDocument()) {
            Assert.assertTrue(meta.isPresent());
            Assert.assertTrue(meta.isMetaData());
        }

        final long echoed = channel.lastTestMessage();
        Assert.assertEquals(now, echoed);
    }

    /**
     * Opens three channels on the same queue, each with a {@link SaysFilter} for a
     * different prefix, publishes six messages through the first two channels and
     * checks which messages each channel's reader receives.
     *
     * <p>Expected deliveries: the empty prefix matches every message (6), while the
     * {@code "2 "} and {@code "3 "} prefixes match two messages each.
     */
    @Test(timeout = 20000L)
    public void filtered() {
        String url = "tcp://:0";
        IOTools.deleteDirWithFiles(new String[]{"target/filtered"});
        try (ChronicleContext context = ChronicleContext.newContext(url).name("target/filtered").buffered(this.buffered);
             ChronicleChannel channel1 = context.newChannelSupplier(PipeHandlerTest.createPipeHandler().filter(new SaysFilter(""))).get();
             ChronicleChannel channel2 = context.newChannelSupplier(PipeHandlerTest.createPipeHandler().filter(new SaysFilter("2 "))).get();
             ChronicleChannel channel3 = context.newChannelSupplier(PipeHandlerTest.createPipeHandler().filter(new SaysFilter("3 "))).get()) {
            Says says1 = channel1.methodWriter(Says.class);
            Says says2 = channel2.methodWriter(Says.class);
            says1.say("1 Hi one");
            says1.say("2 Hi two");
            says1.say("3 Hi three");
            says2.say("1 Bye one");
            says2.say("2 Bye two");
            says2.say("3 Bye three");

            // All three readers record into one shared, typed queue; the per-reader
            // prefix ("1 - ", "2 - ", "3 - ") identifies which channel delivered it.
            LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
            MethodReader reader1 = channel1.methodReader(Mocker.queuing(Says.class, "1 - ", q));
            MethodReader reader2 = channel2.methodReader(Mocker.queuing(Says.class, "2 - ", q));
            MethodReader reader3 = channel3.methodReader(Mocker.queuing(Says.class, "3 - ", q));
            readN(reader1, 6);
            readN(reader2, 2);
            readN(reader3, 2);

            // Sorting through a TreeSet makes the comparison independent of arrival order.
            String actual = new TreeSet<>(q).stream().collect(Collectors.joining("\n"));
            Assert.assertEquals(
                    "1 - say[1 Bye one]\n"
                            + "1 - say[1 Hi one]\n"
                            + "1 - say[2 Bye two]\n"
                            + "1 - say[2 Hi two]\n"
                            + "1 - say[3 Bye three]\n"
                            + "1 - say[3 Hi three]\n"
                            + "2 - say[2 Bye two]\n"
                            + "2 - say[2 Hi two]\n"
                            + "3 - say[3 Bye three]\n"
                            + "3 - say[3 Hi three]",
                    actual);
        }
    }

    /**
     * Writes three messages directly to the queue, then subscribes through a
     * channel whose subscription index controller is {@link ToLastMessage} and
     * checks that the only message delivered is the last one written.
     */
    @Test(timeout = 20000L)
    public void testsSubscriptionIndexController() {
        String url = "tcp://:0";
        IOTools.deleteDirWithFiles(new String[]{"target/fromIndex"});
        try (ChronicleContext context = ChronicleContext.newContext(url).name("target/fromIndex").buffered(this.buffered);
             // Block size: 512 GiB where sparse files are supported, otherwise 64 MiB.
             SingleChronicleQueue cq = ChronicleQueue.singleBuilder(context.toFile("test-q"))
                     .blockSize(OS.isSparseFileSupported() ? 512L << 30 : 64L << 20)
                     .sourceId(1)
                     .build()) {
            Says says = cq.methodWriter(Says.class);
            says.say("1 Hi one");
            says.say("2 Hi two");
            says.say("3 Hi three");

            try (ChronicleChannel channel1 = context.newChannelSupplier(PipeHandlerTest.createPipeHandler().subscriptionIndexController(new ToLastMessage())).get()) {
                LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
                MethodReader reader1 = channel1.methodReader(Mocker.queuing(Says.class, "", q));
                readN(reader1, 1);
                Assert.assertEquals("[say[3 Hi three]]", q.toString());
            }
        }
    }

    /**
     * Builds a handler that subscribes to and publishes on {@code test-q}, using
     * source id 1 for both directions.
     *
     * @return the configured handler
     */
    private static PipeHandler createPipeHandler() {
        final String queueName = "test-q";
        final int sourceId = 1;
        return new PipeHandler()
                .subscribe(queueName)
                .publish(queueName)
                .publishSourceId(sourceId)
                .subscribeSourceId(sourceId);
    }

    /**
     * Polls the reader until it has reported {@code n} successful reads, pausing
     * 1 ms after every poll that does not complete the count. There is no
     * internal time limit; callers rely on the JUnit timeout.
     *
     * @param reader the reader to poll
     * @param n      number of successful {@code readOne()} calls to wait for
     */
    private void readN(MethodReader reader, int n) {
        for (int seen = 0; ; Jvm.pause(1L)) {
            if (reader.readOne()) {
                seen++;
            }
            if (seen >= n) {
                return;
            }
        }
    }

    /**
     * Predicate over a {@link Wire} that accepts a message when the text of its
     * {@code say} field begins with a given prefix.
     */
    static class SaysFilter extends SelfDescribingMarshallable implements Predicate<Wire> {
        /** Required prefix of the {@code say} text; an empty string accepts any non-null text. */
        private final String start;

        /**
         * @param start prefix the {@code say} text must begin with
         */
        public SaysFilter(String start) {
            this.start = start;
        }

        /**
         * @param wire the wire to inspect
         * @return {@code true} if the {@code say} text is non-null and starts with the prefix
         */
        @Override
        public boolean test(Wire wire) {
            final String said = wire.read("say").text();
            return said != null && said.startsWith(this.start);
        }
    }

    /**
     * Subscription index controller used by {@code testsSubscriptionIndexController},
     * which expects that only the last message already in the queue is delivered.
     *
     * <p>It moves the tailer to the end, opens and closes one document reading
     * backward, then switches to forward and opens and closes another document.
     */
    public static class ToLastMessage extends SelfDescribingMarshallable implements Consumer<ExcerptTailer> {
        /**
         * Repositions the given tailer; the documents opened here are closed
         * immediately without reading their content.
         *
         * @param excerptTailer the tailer to reposition
         */
        @Override
        public void accept(ExcerptTailer excerptTailer) {
            // Empty try-with-resources: the document is opened only for its effect
            // on the tailer position and is closed straight away (null-safe).
            try (DocumentContext ignored = excerptTailer.toEnd().direction(TailerDirection.BACKWARD).readingDocument()) {
                // intentionally empty
            }
            // NOTE(review): direction(FORWARD) is applied twice, as in the original;
            // the second call looks redundant - confirm before removing.
            try (DocumentContext ignored = excerptTailer.direction(TailerDirection.FORWARD).direction(TailerDirection.FORWARD).readingDocument()) {
                // intentionally empty
            }
        }
    }
}

