/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.IOTools;
import net.openhft.chronicle.core.util.Time;
import net.openhft.chronicle.queue.BufferMode;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.QueueTestCommon;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class OvertakeTest
extends QueueTestCommon {
    private String path;
    private long a_index;
    private int messages = 500;

    private static long doReadBad(@NotNull ExcerptTailer tailer, int expected, boolean additionalClose) {
        int[] i = new int[]{0};
        long t_index = 0L;
        while (true) {
            DocumentContext dc = tailer.readingDocument();
            Throwable throwable = null;
            try {
                if (!dc.isPresent()) break;
                t_index = tailer.index();
                dc.wire().read("log").marshallable(m -> {
                    String msg = m.read("msg").text();
                    Assert.assertNotNull((Object)msg);
                    i[0] = i[0] + 1;
                });
                if (!additionalClose) continue;
                dc.close();
                continue;
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (dc == null) continue;
                if (throwable != null) {
                    try {
                        dc.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                dc.close();
                continue;
            }
            break;
        }
        Assert.assertEquals((long)expected, (long)i[0]);
        return t_index;
    }

    @Before
    public void before() {
        this.path = OS.getTarget() + "/" + this.getClass().getSimpleName() + "-" + Time.uniqueId();
        try (SingleChronicleQueue appender_queue = ChronicleQueue.singleBuilder((String)this.path).testBlockSize().writeBufferMode(BufferMode.None).build();){
            ExcerptAppender appender = appender_queue.acquireAppender();
            for (int i = 0; i < this.messages; ++i) {
                long l = i;
                appender.writeDocument(wireOut -> wireOut.write((CharSequence)"log").marshallable(m -> {
                    m.write((CharSequence)"msg").text("hello world ola multi-verse");
                    m.write((CharSequence)"ts").int64(l);
                }));
            }
            this.a_index = appender.lastIndexAppended();
        }
    }

    @Test
    public void appendAndTail() {
        try (SingleChronicleQueue tailer_queue = ChronicleQueue.singleBuilder((String)this.path).testBlockSize().writeBufferMode(BufferMode.None).build();){
            ExcerptTailer tailer = tailer_queue.createTailer();
            tailer = tailer.toStart();
            long t_index = OvertakeTest.doReadBad(tailer, this.messages, false);
            Assert.assertEquals((long)this.a_index, (long)t_index);
            tailer = tailer_queue.createTailer();
            tailer = tailer.toStart();
            t_index = OvertakeTest.doReadBad(tailer, this.messages, true);
            Assert.assertEquals((long)this.a_index, (long)t_index);
        }
    }

    @After
    public void after() {
        try {
            IOTools.deleteDirWithFiles((String)this.path, (int)2);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void threadingTest() throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService execService = Executors.newFixedThreadPool(2, (ThreadFactory)new NamedThreadFactory("test"));
        SynchronousQueue<Long> sync = new SynchronousQueue<Long>();
        MyAppender myapp = new MyAppender(sync);
        Future<Long> f = execService.submit(myapp);
        try (SingleChronicleQueue tailer_queue = ChronicleQueue.singleBuilder((String)this.path).testBlockSize().writeBufferMode(BufferMode.None).build();){
            long t_index = 0L;
            MyTailer mytailer = new MyTailer((ChronicleQueue)tailer_queue, t_index, sync);
            Future<Long> f2 = execService.submit(mytailer);
            t_index = f2.get(10L, TimeUnit.SECONDS);
            this.a_index = f.get(10L, TimeUnit.SECONDS);
            Assert.assertEquals((long)this.a_index, (long)t_index);
        }
        execService.shutdown();
        execService.awaitTermination(1L, TimeUnit.SECONDS);
    }

    class MyTailer
    implements Callable<Long> {
        ChronicleQueue queue;
        long startIndex;
        SynchronousQueue<Long> sync;

        MyTailer(ChronicleQueue q, long s, SynchronousQueue<Long> sync) {
            this.queue = q;
            this.startIndex = s;
            this.sync = sync;
        }

        @Override
        public Long call() throws InterruptedException {
            ExcerptTailer tailer = this.queue.createTailer();
            tailer.moveToIndex(this.startIndex);
            Long fromWriter = this.sync.take();
            long index = OvertakeTest.doReadBad(tailer, OvertakeTest.this.messages + 50, false);
            if (index != fromWriter) {
                // empty if block
            }
            this.sync.put(index);
            fromWriter = this.sync.take();
            index = OvertakeTest.doReadBad(tailer, 50, false);
            if (index != fromWriter) {
                // empty if block
            }
            return index;
        }
    }

    class MyAppender
    implements Callable<Long> {
        ExcerptAppender appender;
        SynchronousQueue<Long> sync;

        MyAppender(SynchronousQueue<Long> sync) {
            this.sync = sync;
        }

        @Override
        public Long call() throws InterruptedException {
            try (SingleChronicleQueue queue = ChronicleQueue.singleBuilder((String)OvertakeTest.this.path).writeBufferMode(BufferMode.None).build();){
                this.appender = queue.acquireAppender();
                for (int i = 0; i < 50; ++i) {
                    this.appender.writeDocument(wireOut -> wireOut.write((CharSequence)"log").marshallable(m -> m.write((CharSequence)"msg").text("hello world2 ")));
                }
                long index = this.appender.lastIndexAppended();
                this.sync.put(index);
                Long fromReader = this.sync.take();
                if (index != fromReader) {
                    // empty if block
                }
                for (int i = 0; i < 50; ++i) {
                    this.appender.writeDocument(wireOut -> wireOut.write((CharSequence)"log").marshallable(m -> m.write((CharSequence)"msg").text("hello world2 ")));
                }
                index = this.appender.lastIndexAppended();
                this.sync.put(index);
                Long l = index;
                return l;
            }
        }
    }
}

