/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.FlakyTestRunner;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.AbstractReferenceCounted;
import net.openhft.chronicle.core.io.BackgroundResourceReleaser;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueTestBase;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.GcControls;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.WireType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public final class AppenderFileHandleLeakTest
extends ChronicleQueueTestBase {
    private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;
    private static final int MESSAGES_PER_THREAD = 50;
    private static final SystemTimeProvider SYSTEM_TIME_PROVIDER = SystemTimeProvider.INSTANCE;
    private final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT, (ThreadFactory)new NamedThreadFactory("test"));
    private final List<String> lastFileHandles = new ArrayList<String>();
    private TrackingStoreFileListener storeFileListener = new TrackingStoreFileListener();
    private AtomicLong currentTime = new AtomicLong(System.currentTimeMillis());
    private File queuePath;

    private static Matcher<Integer> withinDelta(final int expected, final int delta) {
        return new TypeSafeMatcher<Integer>(){
            private int actual;

            protected boolean matchesSafely(Integer actual) {
                this.actual = actual;
                return Math.abs(actual - expected) < delta;
            }

            public void describeTo(Description description) {
                description.appendText(String.format("actual %d was not within %d of %d", this.actual, delta, expected));
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void readMessage(ChronicleQueue queue, boolean manuallyReleaseResources, Consumer<ExcerptTailer> refHolder) {
        Bytes bytes = Bytes.elasticByteBuffer();
        try (ExcerptTailer tailer = queue.createTailer();){
            while (bytes.isEmpty()) {
                tailer.toStart().readBytes(bytes);
            }
            refHolder.accept(tailer);
            Assert.assertTrue((Math.signum(bytes.readInt()) >= 0.0f ? 1 : 0) != 0);
            if (manuallyReleaseResources) {
                tailer.close();
            }
        }
        finally {
            bytes.releaseLast();
        }
    }

    private static void writeMessage(int j, ChronicleQueue queue) {
        ExcerptAppender appender = queue.acquireAppender();
        appender.writeBytes(b -> b.writeInt(j));
    }

    @Before
    public void setUp() {
        System.setProperty("chronicle.queue.disableFileShrinking", "true");
        this.queuePath = this.getTmpDir();
    }

    @Test
    public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, IOException, TimeoutException, ExecutionException {
        Assume.assumeThat((Object)OS.isLinux(), (Matcher)CoreMatchers.is((Object)true));
        GcControls.requestGcCycle();
        Thread.sleep(100L);
        LinkedList gcGuard = new LinkedList();
        long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
        ArrayList<String> fileHandlesAtStart = new ArrayList<String>(this.lastFileHandles);
        try (ChronicleQueue queue = this.createQueue((TimeProvider)SYSTEM_TIME_PROVIDER);){
            LinkedList<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(this.threadPool.submit(() -> {
                    for (int j = 0; j < 50; ++j) {
                        AppenderFileHandleLeakTest.writeMessage(j, queue);
                        AppenderFileHandleLeakTest.readMessage(queue, false, gcGuard::add);
                    }
                    GcControls.requestGcCycle();
                    return Boolean.TRUE;
                }));
            }
            for (Future future : futures) {
                Assert.assertTrue((boolean)((Boolean)future.get(1L, TimeUnit.MINUTES)));
            }
            Assert.assertFalse((boolean)gcGuard.isEmpty());
            gcGuard.clear();
        }
        GcControls.waitForGcCycle();
        GcControls.waitForGcCycle();
        this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
    }

    @Test
    @Ignore(value="Flaky")
    public void tailerResourcesCanBeReleasedManually() throws Exception {
        FlakyTestRunner.run(this::tailerResourcesCanBeReleasedManually0);
    }

    public void tailerResourcesCanBeReleasedManually0() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        Assume.assumeTrue((boolean)OS.isLinux());
        GcControls.requestGcCycle();
        Thread.sleep(100L);
        try (ChronicleQueue queue = this.createQueue((TimeProvider)SYSTEM_TIME_PROVIDER);){
            long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
            ArrayList<String> fileHandlesAtStart = new ArrayList<String>(this.lastFileHandles);
            LinkedList<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();
            LinkedList gcGuard = new LinkedList();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(this.threadPool.submit(() -> {
                    for (int j = 0; j < 50; ++j) {
                        AppenderFileHandleLeakTest.writeMessage(j, queue);
                        AppenderFileHandleLeakTest.readMessage(queue, true, gcGuard::add);
                    }
                    return Boolean.TRUE;
                }));
            }
            for (Future future : futures) {
                Assert.assertTrue((boolean)((Boolean)future.get(1L, TimeUnit.MINUTES)));
            }
            this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
            Assert.assertFalse((boolean)gcGuard.isEmpty());
        }
    }

    @Test
    public void tailerShouldReleaseFileHandlesAsQueueRolls() throws IOException, InterruptedException, ExecutionException, TimeoutException {
        Assume.assumeThat((Object)OS.isLinux(), (Matcher)CoreMatchers.is((Object)true));
        System.gc();
        Thread.sleep(100L);
        int messagesPerThread = 10;
        try (ChronicleQueue queue = this.createQueue(this.currentTime::get);){
            long openFileHandleCount = this.countFileHandlesOfCurrentProcess();
            ArrayList<String> fileHandlesAtStart = new ArrayList<String>(this.lastFileHandles);
            for (int j = 0; j < 10; ++j) {
                AppenderFileHandleLeakTest.writeMessage(j, queue);
                this.currentTime.addAndGet(500L);
            }
            this.waitForFileHandleCountToDrop(openFileHandleCount, fileHandlesAtStart);
            fileHandlesAtStart.clear();
            long tailerOpenFileHandleCount = this.countFileHandlesOfCurrentProcess();
            int acquiredBefore = this.storeFileListener.acquiredCounts.size();
            this.storeFileListener.reset();
            ExcerptTailer tailer = queue.createTailer();
            tailer.toStart();
            int messageCount = 0;
            int notFoundAttempts = 5;
            while (true) {
                DocumentContext ctx = tailer.readingDocument();
                Throwable throwable = null;
                try {
                    if (!ctx.isPresent()) {
                        if (--notFoundAttempts <= 0) break;
                        continue;
                    }
                    ++messageCount;
                    continue;
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (ctx == null) continue;
                    if (throwable != null) {
                        try {
                            ctx.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                        continue;
                    }
                    ctx.close();
                    continue;
                }
                break;
            }
            Assert.assertEquals((long)10L, (long)messageCount);
            BackgroundResourceReleaser.releasePendingResources();
            LOGGER.info("storeFileListener {}", (Object)this.storeFileListener);
            Assert.assertEquals((long)acquiredBefore, (long)this.storeFileListener.acquiredCounts.size());
            this.waitForFileHandleCountToDrop(tailerOpenFileHandleCount - 1L, fileHandlesAtStart);
        }
    }

    @After
    public void checkRegisteredBytes() throws InterruptedException {
        this.threadPool.shutdownNow();
        Assert.assertTrue((boolean)this.threadPool.awaitTermination(5L, TimeUnit.SECONDS));
        AbstractReferenceCounted.assertReferencesReleased();
    }

    private void waitForFileHandleCountToDrop(long startFileHandleCount, List<String> fileHandlesAtStart) throws IOException {
        long failAt = System.currentTimeMillis() + 10000L;
        while (System.currentTimeMillis() < failAt) {
            System.gc();
            BackgroundResourceReleaser.releasePendingResources();
            if (this.countFileHandlesOfCurrentProcess() <= startFileHandleCount + 2L) {
                return;
            }
            Thread.yield();
        }
        ArrayList<String> fileHandlesAtEnd = new ArrayList<String>(this.lastFileHandles);
        fileHandlesAtEnd.removeAll(fileHandlesAtStart);
        Assert.fail((String)("File handle count did not drop for queue in directory " + this.queuePath.getAbsolutePath() + ", remaining handles:\n" + fileHandlesAtEnd));
    }

    private long countFileHandlesOfCurrentProcess() throws IOException {
        this.lastFileHandles.clear();
        try (Stream<Path> fileHandles = Files.list(Paths.get("/proc/self/fd", new String[0]));){
            fileHandles.map(p -> {
                try {
                    return p.toRealPath(new LinkOption[0]);
                }
                catch (IOException e) {
                    return p;
                }
            }).filter(p -> p.toString().contains(this.queuePath.getName())).map(p -> p.toFile().getName()).forEach(this.lastFileHandles::add);
        }
        fileHandles = Files.list(Paths.get("/proc/self/fd", new String[0]));
        var2_2 = null;
        try {
            long l = fileHandles.count();
            return l;
        }
        catch (Throwable throwable) {
            var2_2 = throwable;
            throw throwable;
        }
        finally {
            if (fileHandles != null) {
                if (var2_2 != null) {
                    try {
                        fileHandles.close();
                    }
                    catch (Throwable throwable) {
                        var2_2.addSuppressed(throwable);
                    }
                } else {
                    fileHandles.close();
                }
            }
        }
    }

    private ChronicleQueue createQueue(TimeProvider timeProvider) {
        return SingleChronicleQueueBuilder.binary((File)this.queuePath).rollCycle((RollCycle)RollCycles.TEST_SECONDLY).wireType(WireType.BINARY_LIGHT).storeFileListener((StoreFileListener)this.storeFileListener).timeProvider(timeProvider).build();
    }

    private static final class TrackingStoreFileListener
    implements StoreFileListener {
        private final Map<String, Integer> acquiredCounts = new HashMap<String, Integer>();
        private final Map<String, Integer> releasedCounts = new HashMap<String, Integer>();

        private TrackingStoreFileListener() {
        }

        public void onAcquired(int cycle, File file) {
            this.acquiredCounts.put(file.getName(), this.acquiredCounts.getOrDefault(file.getName(), 0) + 1);
        }

        public void onReleased(int cycle, File file) {
            this.releasedCounts.put(file.getName(), this.releasedCounts.getOrDefault(file.getName(), 0) + 1);
        }

        void reset() {
            this.acquiredCounts.clear();
            this.releasedCounts.clear();
        }

        public String toString() {
            return String.format("%nacquired: %d%nreleased: %d%ndiffs:%n%s%n", this.acquiredCounts.size(), this.releasedCounts.size(), this.buildDiffs());
        }

        private String buildDiffs() {
            StringBuilder builder = new StringBuilder();
            builder.append("acquired but not released:\n");
            HashSet<String> keyDiff = new HashSet<String>(this.acquiredCounts.keySet());
            keyDiff.removeAll(this.releasedCounts.keySet());
            keyDiff.forEach(k -> builder.append((String)k).append("(").append(this.acquiredCounts.get(k)).append(")\n"));
            builder.append("released but not acquired:\n");
            keyDiff.clear();
            keyDiff.addAll(this.releasedCounts.keySet());
            keyDiff.removeAll(this.acquiredCounts.keySet());
            keyDiff.forEach(k -> builder.append((String)k).append("(").append(this.releasedCounts.get(k)).append(")\n"));
            return builder.toString();
        }
    }
}

