/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesUtil;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.GcControls;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.WireType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public final class AppenderFileHandleLeakTest {
    private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;
    private static final int MESSAGES_PER_THREAD = 50;
    private static final SystemTimeProvider SYSTEM_TIME_PROVIDER = SystemTimeProvider.INSTANCE;
    private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private final List<Path> lastFileHandles = new ArrayList<Path>();
    private TrackingStoreFileListener storeFileListener = new TrackingStoreFileListener();
    private AtomicLong currentTime = new AtomicLong(System.currentTimeMillis());
    private File queuePath;

    private static Matcher<Integer> withinDelta(final int expected, final int delta) {
        return new TypeSafeMatcher<Integer>(){
            private int actual;

            protected boolean matchesSafely(Integer actual) {
                this.actual = actual;
                return Math.abs(actual - expected) < delta;
            }

            public void describeTo(Description description) {
                description.appendText(String.format("actual %d was not within %d of %d", this.actual, delta, expected));
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void readMessage(SingleChronicleQueue queue, boolean manuallyReleaseResources, Consumer<ExcerptTailer> refHolder) {
        Bytes bytes = Bytes.elasticByteBuffer();
        try {
            ExcerptTailer tailer = queue.createTailer();
            while (bytes.isEmpty()) {
                tailer.toStart().readBytes(bytes);
            }
            refHolder.accept(tailer);
            Assert.assertThat((Object)(Math.signum(bytes.readInt()) >= 0.0f ? 1 : 0), (Matcher)CoreMatchers.is((Object)true));
            if (manuallyReleaseResources) {
                try {
                    ((SingleChronicleQueueExcerpts.StoreTailer)tailer).releaseResources();
                }
                catch (RuntimeException runtimeException) {
                    // empty catch block
                }
            }
        }
        finally {
            bytes.release();
        }
    }

    private static void writeMessage(int j, SingleChronicleQueue queue) {
        ExcerptAppender appender = queue.acquireAppender();
        appender.writeBytes(b -> b.writeInt(j));
    }

    @Before
    public void setUp() throws Exception {
        this.queuePath = DirectoryUtils.tempDir(AppenderFileHandleLeakTest.class.getSimpleName());
    }

    @Test
    public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws Exception {
        System.gc();
        Thread.sleep(100L);
        Assume.assumeThat((Object)OS.isLinux(), (Matcher)CoreMatchers.is((Object)true));
        LinkedList gcGuard = new LinkedList();
        long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
        ArrayList<Path> fileHandlesAtStart = new ArrayList<Path>(this.lastFileHandles);
        try (SingleChronicleQueue queue = this.createQueue((TimeProvider)SYSTEM_TIME_PROVIDER);){
            LinkedList<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(this.threadPool.submit(() -> {
                    for (int j = 0; j < 50; ++j) {
                        AppenderFileHandleLeakTest.writeMessage(j, queue);
                        AppenderFileHandleLeakTest.readMessage(queue, false, gcGuard::add);
                    }
                    GcControls.requestGcCycle();
                    return Boolean.TRUE;
                }));
            }
            for (Future future : futures) {
                Assert.assertThat(future.get(1L, TimeUnit.MINUTES), (Matcher)CoreMatchers.is((Object)true));
            }
            Assert.assertFalse((boolean)gcGuard.isEmpty());
            gcGuard.clear();
        }
        GcControls.waitForGcCycle();
        GcControls.waitForGcCycle();
        this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
    }

    @Test
    public void tailerResourcesCanBeReleasedManually() throws Exception {
        System.gc();
        Thread.sleep(100L);
        Assume.assumeThat((Object)OS.isLinux(), (Matcher)CoreMatchers.is((Object)true));
        try (SingleChronicleQueue queue = this.createQueue((TimeProvider)SYSTEM_TIME_PROVIDER);){
            long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
            ArrayList<Path> fileHandlesAtStart = new ArrayList<Path>(this.lastFileHandles);
            LinkedList<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
            LinkedList gcGuard = new LinkedList();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(this.threadPool.submit(() -> {
                    for (int j = 0; j < 50; ++j) {
                        AppenderFileHandleLeakTest.writeMessage(j, queue);
                        AppenderFileHandleLeakTest.readMessage(queue, true, gcGuard::add);
                    }
                    return Boolean.TRUE;
                }));
            }
            for (Future future : futures) {
                Assert.assertThat(future.get(1L, TimeUnit.MINUTES), (Matcher)CoreMatchers.is((Object)true));
            }
            this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
            Assert.assertFalse((boolean)gcGuard.isEmpty());
        }
    }

    @Test
    public void tailerShouldReleaseFileHandlesAsQueueRolls() throws Exception {
        System.gc();
        Thread.sleep(100L);
        Assume.assumeThat((Object)OS.isLinux(), (Matcher)CoreMatchers.is((Object)true));
        int messagesPerThread = 10;
        try (SingleChronicleQueue queue = this.createQueue(this.currentTime::get);){
            long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
            ArrayList<Path> fileHandlesAtStart = new ArrayList<Path>(this.lastFileHandles);
            LinkedList<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(this.threadPool.submit(() -> {
                    for (int j = 0; j < 10; ++j) {
                        AppenderFileHandleLeakTest.writeMessage(j, queue);
                        this.currentTime.addAndGet(100L);
                    }
                    return Boolean.TRUE;
                }));
            }
            for (Future future : futures) {
                Assert.assertThat(future.get(1L, TimeUnit.MINUTES), (Matcher)CoreMatchers.is((Object)true));
            }
            this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
            fileHandlesAtStart.clear();
            long tailerOpenFileHandleCount = this.countFileHandlesOfCurrentProcess();
            ExcerptTailer tailer = queue.createTailer();
            tailer.toStart();
            int expectedMessageCount = THREAD_COUNT * 10;
            int messageCount = 0;
            this.storeFileListener.reset();
            while (true) {
                DocumentContext ctx = tailer.readingDocument();
                Throwable throwable = null;
                try {
                    if (!ctx.isPresent()) break;
                    ++messageCount;
                    continue;
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (ctx == null) continue;
                    if (throwable != null) {
                        try {
                            ctx.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    ctx.close();
                    continue;
                }
                break;
            }
            Assert.assertThat((Object)messageCount, (Matcher)CoreMatchers.is((Object)expectedMessageCount));
            Assert.assertThat((String)this.storeFileListener.toString(), (Object)this.storeFileListener.releasedCount, (Matcher)CoreMatchers.is(AppenderFileHandleLeakTest.withinDelta(this.storeFileListener.acquiredCount, 3)));
            this.waitForFileHandleCountToDrop(tailerOpenFileHandleCount, fileHandlesAtStart);
        }
    }

    @After
    public void checkRegisteredBytes() throws Exception {
        this.threadPool.shutdownNow();
        Assert.assertTrue((boolean)this.threadPool.awaitTermination(5L, TimeUnit.SECONDS));
        BytesUtil.checkRegisteredBytes();
    }

    private void waitForFileHandleCountToDrop(long startFileHandleCount, List<Path> fileHandlesAtStart) throws IOException {
        long failAt = System.currentTimeMillis() + 60000L;
        while (System.currentTimeMillis() < failAt) {
            if (this.countFileHandlesOfCurrentProcess() >= startFileHandleCount + 5L) continue;
            return;
        }
        ArrayList<Path> fileHandlesAtEnd = new ArrayList<Path>(this.lastFileHandles);
        fileHandlesAtEnd.removeAll(fileHandlesAtStart);
        Assert.fail((String)("File handle count did not drop for queue in directory " + this.queuePath.getAbsolutePath() + ", remaining handles:\n" + fileHandlesAtEnd));
    }

    private long countFileHandlesOfCurrentProcess() throws IOException {
        this.lastFileHandles.clear();
        try (Stream<Path> fileHandles = Files.list(Paths.get("/proc/self/fd", new String[0]));){
            fileHandles.map(p -> {
                try {
                    return p.toRealPath(new LinkOption[0]);
                }
                catch (IOException e) {
                    return p;
                }
            }).filter(p -> p.toString().contains(this.queuePath.getName())).forEach(this.lastFileHandles::add);
        }
        fileHandles = Files.list(Paths.get("/proc/self/fd", new String[0]));
        var2_2 = null;
        try {
            long l = fileHandles.count();
            return l;
        }
        catch (Throwable throwable) {
            var2_2 = throwable;
            throw throwable;
        }
        finally {
            if (fileHandles != null) {
                if (var2_2 != null) {
                    try {
                        fileHandles.close();
                    }
                    catch (Throwable throwable) {
                        var2_2.addSuppressed(throwable);
                    }
                } else {
                    fileHandles.close();
                }
            }
        }
    }

    private SingleChronicleQueue createQueue(TimeProvider timeProvider) {
        return SingleChronicleQueueBuilder.binary((File)this.queuePath).rollCycle((RollCycle)RollCycles.TEST_SECONDLY).wireType(WireType.BINARY_LIGHT).storeFileListener((StoreFileListener)this.storeFileListener).timeProvider(timeProvider).build();
    }

    private static final class TrackingStoreFileListener
    implements StoreFileListener {
        private final Map<String, Integer> acquiredCounts = new HashMap<String, Integer>();
        private final Map<String, Integer> releasedCounts = new HashMap<String, Integer>();
        private int acquiredCount = 0;
        private int releasedCount = 0;

        private TrackingStoreFileListener() {
        }

        public void onAcquired(int cycle, File file) {
            this.acquiredCounts.put(file.getName(), this.acquiredCounts.getOrDefault(file.getName(), 0) + 1);
            ++this.acquiredCount;
        }

        public void onReleased(int cycle, File file) {
            this.releasedCounts.put(file.getName(), this.releasedCounts.getOrDefault(file.getName(), 0) + 1);
            ++this.releasedCount;
        }

        void reset() {
            this.acquiredCounts.clear();
            this.releasedCounts.clear();
            this.acquiredCount = 0;
            this.releasedCount = 0;
        }

        public String toString() {
            return String.format("%nacquired: %d%nreleased: %d%ndiffs:%n%s%n", this.acquiredCount, this.releasedCount, this.buildDiffs());
        }

        private String buildDiffs() {
            StringBuilder builder = new StringBuilder();
            builder.append("acquired but not released:\n");
            HashSet<String> keyDiff = new HashSet<String>(this.acquiredCounts.keySet());
            keyDiff.removeAll(this.releasedCounts.keySet());
            keyDiff.forEach(k -> builder.append((String)k).append("(").append(this.acquiredCounts.get(k)).append(")\n"));
            builder.append("released but not acquired:\n");
            keyDiff.clear();
            keyDiff.addAll(this.releasedCounts.keySet());
            keyDiff.removeAll(this.acquiredCounts.keySet());
            keyDiff.forEach(k -> builder.append((String)k).append("(").append(this.releasedCounts.get(k)).append(")\n"));
            return builder.toString();
        }
    }
}

