/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.impl.single;

import java.io.File;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.BackgroundResourceReleaser;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.MappedFileUtil;
import net.openhft.chronicle.queue.QueueTestCommon;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.GcControls;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.rollcycles.TestRollCycles;
import net.openhft.chronicle.testframework.FlakyTestRunner;
import net.openhft.chronicle.testframework.Waiters;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.WireType;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public final class AppenderFileHandleLeakTest
extends QueueTestCommon {
    /** Size of the worker pool used by the concurrent tests: two threads per available processor. */
    private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;

    /** Per-worker message count; matches the 50-iteration loops in the concurrent tests. */
    private static final int MESSAGES_PER_THREAD = 50;

    /** Wall-clock time source handed to queues by the GC and manual-release tests. */
    private static final SystemTimeProvider SYSTEM_TIME_PROVIDER = SystemTimeProvider.INSTANCE;

    /** Roll cycle whose file-name pattern is used to compute the expected current cycle file. */
    private static final RollCycle ROLL_CYCLE = TestRollCycles.TEST_SECONDLY;

    /** Formats an instant into the roll-cycle file name (without extension), always in UTC. */
    private static final DateTimeFormatter ROLL_CYCLE_FORMATTER =
            DateTimeFormatter.ofPattern(ROLL_CYCLE.format()).withZone(ZoneId.of("UTC"));

    /** Fixed pool running the reader/writer tasks; shut down in {@link #assertReferencesReleased()}. */
    private final ExecutorService threadPool =
            Executors.newFixedThreadPool(THREAD_COUNT, new NamedThreadFactory("test"));

    /** Listener registered on every queue built by {@link #createQueue(TimeProvider)}. */
    private final TrackingStoreFileListener storeFileListener = new TrackingStoreFileListener();

    /** Manually advanced clock, seeded from the system clock when the test instance is created. */
    private final AtomicLong currentTime = new AtomicLong(System.currentTimeMillis());

    /** Directory of the queue under test; assigned in {@link #setUp()}. */
    private File queuePath;

    /**
     * Reads a message from the start of the queue with a freshly created tailer and checks
     * that its integer payload is non-negative.
     *
     * @param queue                    queue to read from
     * @param manuallyReleaseResources when {@code true} the tailer is closed explicitly before
     *                                 the enclosing try-with-resources closes it again
     * @param refHolder                receives the tailer once a message has been read
     */
    private static void readMessage(ChronicleQueue queue, boolean manuallyReleaseResources, Consumer<ExcerptTailer> refHolder) {
        final Bytes<?> payload = Bytes.elasticByteBuffer();
        try (ExcerptTailer reader = queue.createTailer()) {
            // Keep rewinding and reading until something lands in the buffer.
            while (payload.isEmpty()) {
                reader.toStart().readBytes(payload);
            }
            refHolder.accept(reader);
            Assert.assertTrue(payload.readInt() >= 0);
            if (manuallyReleaseResources) {
                reader.close();
            }
        } finally {
            // The buffer is released whether or not the read/assert succeeded.
            payload.releaseLast();
        }
    }

    /**
     * Appends a single message whose payload is the given integer.
     *
     * @param j     value written as the message body
     * @param queue queue whose appender is acquired for the write
     */
    private static void writeMessage(int j, ChronicleQueue queue) {
        final ExcerptAppender writer = queue.acquireAppender();
        writer.writeBytes(out -> out.writeInt(j));
    }

    /**
     * Skips the test unless running on Linux, disables file shrinking through the
     * {@code chronicle.queue.disableFileShrinking} system property and allocates the
     * temporary queue directory.
     */
    @Before
    public void setUp() {
        final boolean onLinux = OS.isLinux();
        Assume.assumeTrue(onLinux);
        System.setProperty("chronicle.queue.disableFileShrinking", "true");
        queuePath = getTmpDir();
    }

    /**
     * Writes and reads messages from {@code THREAD_COUNT} pool threads without closing the
     * tailers explicitly, then drops all tailer references, closes the queue, waits for two
     * GC cycles and asserts that no queue file is still mapped.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     * @throws TimeoutException     if a worker does not finish within one minute
     * @throws ExecutionException   if a worker task fails
     */
    @Test
    public void appenderAndTailerResourcesShouldBeCleanedUpByGarbageCollection() throws InterruptedException, TimeoutException, ExecutionException {
        try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) {
            GcControls.requestGcCycle();
            Thread.sleep(100L);
            // Workers add to this list concurrently, so it must be thread-safe.
            final List<ExcerptTailer> gcGuard = Collections.synchronizedList(new LinkedList<>());
            final List<Future<Boolean>> futures = new LinkedList<>();
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(threadPool.submit(() -> {
                    for (int j = 0; j < MESSAGES_PER_THREAD; ++j) {
                        writeMessage(j, queue);
                        readMessage(queue, false, gcGuard::add);
                    }
                    GcControls.requestGcCycle();
                    return Boolean.TRUE;
                }));
            }
            for (Future<Boolean> future : futures) {
                Assert.assertTrue(future.get(1L, TimeUnit.MINUTES));
            }
            Assert.assertFalse(gcGuard.isEmpty());
            // Drop the strong references so the tailers become eligible for collection.
            gcGuard.clear();
        }
        GcControls.waitForGcCycle();
        GcControls.waitForGcCycle();
        Assert.assertTrue(queueFilesAreAllClosed());
    }

    /**
     * JUnit entry point that runs {@link #tailerResourcesCanBeReleasedManually0()} through
     * {@link FlakyTestRunner}.
     *
     * @throws Exception whatever the runner propagates from the wrapped test
     */
    @Test
    public void tailerResourcesCanBeReleasedManually() throws Exception {
        FlakyTestRunner
                .builder(this::tailerResourcesCanBeReleasedManually0)
                .build()
                .run();
    }

    /**
     * Writes and reads messages from {@code THREAD_COUNT} pool threads, closing every tailer
     * explicitly, and asserts that no queue file is still mapped once the queue is closed,
     * even though the tailer references are still held.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     * @throws TimeoutException     if a worker does not finish within one minute
     * @throws ExecutionException   if a worker task fails
     */
    public void tailerResourcesCanBeReleasedManually0() throws InterruptedException, TimeoutException, ExecutionException {
        GcControls.requestGcCycle();
        Thread.sleep(100L);
        try (ChronicleQueue queue = createQueue(SYSTEM_TIME_PROVIDER)) {
            final List<Future<Boolean>> futures = new LinkedList<>();
            // Workers add to this list concurrently, so it must be thread-safe.
            final List<ExcerptTailer> gcGuard = Collections.synchronizedList(new LinkedList<>());
            for (int i = 0; i < THREAD_COUNT; ++i) {
                futures.add(threadPool.submit(() -> {
                    for (int j = 0; j < MESSAGES_PER_THREAD; ++j) {
                        writeMessage(j, queue);
                        readMessage(queue, true, gcGuard::add);
                    }
                    return Boolean.TRUE;
                }));
            }
            for (Future<Boolean> future : futures) {
                Assert.assertTrue(future.get(1L, TimeUnit.MINUTES));
            }
            Assert.assertFalse(gcGuard.isEmpty());
        }
        Assert.assertTrue(queueFilesAreAllClosed());
    }

    /**
     * Writes ten messages while advancing the test clock by 500 ms per message, then reads
     * them all back with one tailer and asserts that the number of distinct files acquired
     * while reading equals the number acquired while writing, and that no queue file remains
     * mapped after the queue is closed.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void tailerShouldReleaseFileHandlesAsQueueRolls() throws InterruptedException {
        System.gc();
        Thread.sleep(100L);
        final int messagesPerThread = 10;
        try (ChronicleQueue queue = createQueue(currentTime::get)) {
            for (int j = 0; j < messagesPerThread; ++j) {
                writeMessage(j, queue);
                currentTime.addAndGet(500L);
            }
            BackgroundResourceReleaser.releasePendingResources();
            final int acquiredBefore = storeFileListener.acquiredCounts.size();
            storeFileListener.reset();
            final ExcerptTailer tailer = queue.createTailer();
            tailer.toStart();
            int messageCount = 0;
            // Tolerate a few empty reads before concluding the queue is drained.
            int notFoundAttempts = 5;
            while (notFoundAttempts > 0) {
                // try-with-resources closes the context on every path without hijacking
                // loop control or swallowing exceptions, unlike a 'continue' inside finally.
                try (DocumentContext ctx = tailer.readingDocument()) {
                    if (ctx.isPresent()) {
                        ++messageCount;
                    } else {
                        --notFoundAttempts;
                    }
                }
            }
            Assert.assertEquals(messagesPerThread, messageCount);
            BackgroundResourceReleaser.releasePendingResources();
            Jvm.debug().on(getClass(), "storeFileListener " + storeFileListener);
            Assert.assertEquals(acquiredBefore, storeFileListener.acquiredCounts.size());
        }
        Assert.assertTrue(queueFilesAreAllClosed());
    }

    /**
     * JUnit entry point that runs {@link #appenderShouldOnlyKeepCurrentRollCycleOpen()} through
     * {@link FlakyTestRunner} configured with a maximum of three iterations.
     */
    @Test
    public void appenderShouldOnlyKeepCurrentRollCycleOpen_deflaked() {
        FlakyTestRunner
                .builder(this::appenderShouldOnlyKeepCurrentRollCycleOpen)
                .withMaxIterations(3)
                .build()
                .run();
    }

    /**
     * Writes ten messages, advancing a fixed-start clock by one second after each, and checks
     * after every write that only the roll-cycle file for the current clock value is open.
     */
    public void appenderShouldOnlyKeepCurrentRollCycleOpen() {
        final AtomicLong clock = new AtomicLong(1661323015000L);
        try (ChronicleQueue queue = createQueue(clock::get)) {
            int written = 0;
            while (written < 10) {
                writeMessage(written, queue);
                assertOnlyCurrentRollCycleIsOpen(clock.get());
                // Step one second forward for the next write.
                clock.addAndGet(1000L);
                written++;
            }
        }
    }

    /**
     * JUnit entry point that runs {@link #tailerShouldOnlyKeepCurrentRollCycleOpen()} through
     * {@link FlakyTestRunner} configured with a maximum of three iterations.
     */
    @Test
    public void tailerShouldOnlyKeepCurrentRollCycleOpen_deflaked() {
        FlakyTestRunner
                .builder(this::tailerShouldOnlyKeepCurrentRollCycleOpen)
                .withMaxIterations(3)
                .build()
                .run();
    }

    /**
     * Writes ten messages one second apart (by the test clock), closes the queue and verifies
     * all files are released, then reopens the queue with the clock rewound and reads the
     * messages back, checking after each read that only the roll-cycle file for the current
     * clock value is open.
     */
    public void tailerShouldOnlyKeepCurrentRollCycleOpen() {
        final long startTime = 1661323015000L;
        final int messageCount = 10;
        final AtomicLong timeProvider = new AtomicLong(startTime);
        try (ChronicleQueue queue = createQueue(timeProvider::get)) {
            for (int j = 0; j < messageCount; ++j) {
                writeMessage(j, queue);
                timeProvider.addAndGet(1000L);
            }
        }
        Assert.assertTrue(queueFilesAreAllClosed());

        // Rewind the clock so reading starts at the first cycle that was written.
        timeProvider.set(startTime);
        // Resources are closed in reverse order: the tailer first, then the queue.
        try (ChronicleQueue queue = createQueue(timeProvider::get);
             ExcerptTailer tailer = queue.createTailer()) {
            IntStream.range(0, messageCount).forEach(i -> {
                tailer.readBytes(b -> Assert.assertEquals(i, b.readInt()));
                assertOnlyCurrentRollCycleIsOpen(timeProvider.get());
                timeProvider.addAndGet(1000L);
            });
        }
    }

    /**
     * Releases pending background resources, waits for a GC cycle, and then polls (up to
     * 5500 ms, every 1000 ms) until {@link #onlyCurrentRollCycleIsOpen(String)} holds for the
     * roll-cycle file derived from {@code timestamp}.
     *
     * @param timestamp epoch milliseconds identifying the roll cycle expected to be open
     */
    private void assertOnlyCurrentRollCycleIsOpen(long timestamp) {
        BackgroundResourceReleaser.releasePendingResources();
        GcControls.waitForGcCycle();

        final String cycleFileName = ROLL_CYCLE_FORMATTER.format(Instant.ofEpochMilli(timestamp)) + ".cq4";
        final String cycleFilePath = queuePath.toPath()
                .toAbsolutePath()
                .resolve(cycleFileName)
                .toString();

        Waiters.builder(() -> onlyCurrentRollCycleIsOpen(cycleFilePath))
                .message("Files that are not the table store or the current roll cycle (" + cycleFileName + ") remain open")
                .maxTimeToWaitMs(5500L)
                .checkIntervalMs(1000L)
                .run();
    }

    /**
     * Checks whether, among the mapped files under the queue directory (ignoring entries
     * ending in {@code metadata.cq4t}), exactly one is open and it is the given file.
     * Every offending entry is logged as a warning when the check fails.
     *
     * @param absolutePathToCurrentRollCycle absolute path of the only file allowed to be open
     * @return {@code true} if that file, and nothing else, is open
     */
    private boolean onlyCurrentRollCycleIsOpen(String absolutePathToCurrentRollCycle) {
        final String queueDir = queuePath.getAbsolutePath();
        final Set<String> mapped = MappedFileUtil.getAllMappedFiles();
        final List<String> openCycleFiles = mapped.stream()
                .filter(entry -> entry.contains(queueDir))
                .filter(entry -> !entry.endsWith("metadata.cq4t"))
                .collect(Collectors.toList());

        if (openCycleFiles.size() == 1 && openCycleFiles.contains(absolutePathToCurrentRollCycle)) {
            return true;
        }
        for (String entry : openCycleFiles) {
            Jvm.warn().on(AppenderFileHandleLeakTest.class, "Found file open:\n" + entry);
        }
        return false;
    }

    /**
     * Shuts the worker pool down, requires it to terminate within five seconds, and then
     * delegates to the superclass check.
     *
     * @throws AssertionError if the pool does not terminate in time or the wait is interrupted
     */
    @Override
    public void assertReferencesReleased() {
        threadPool.shutdownNow();
        try {
            Assert.assertTrue(threadPool.awaitTermination(5L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
        super.assertReferencesReleased();
    }

    /**
     * Waits for a GC cycle and then reports whether any mapped file still refers to the
     * queue directory. Each such entry is logged as an error.
     *
     * @return {@code true} if no mapped file path contains the queue directory path
     */
    private boolean queueFilesAreAllClosed() {
        GcControls.waitForGcCycle();
        final String queueDir = queuePath.getAbsolutePath();
        final List<String> stillMapped = MappedFileUtil.getAllMappedFiles().stream()
                .filter(entry -> entry.contains(queueDir))
                .collect(Collectors.toList());
        for (String entry : stillMapped) {
            Jvm.error().on(AppenderFileHandleLeakTest.class, "Found open queue file: " + entry);
        }
        return stillMapped.isEmpty();
    }

    /**
     * Builds a binary queue at {@code queuePath} using the secondly test roll cycle, the
     * {@code BINARY_LIGHT} wire type, the tracking store-file listener and the supplied clock.
     *
     * @param timeProvider clock the queue uses
     * @return the newly built queue; the caller is responsible for closing it
     */
    private ChronicleQueue createQueue(TimeProvider timeProvider) {
        return SingleChronicleQueueBuilder.binary(queuePath)
                .rollCycle(TestRollCycles.TEST_SECONDLY)
                .wireType(WireType.BINARY_LIGHT)
                .storeFileListener(storeFileListener)
                .timeProvider(timeProvider)
                .build();
    }

    /**
     * Store-file listener that counts, per file name, how many acquire and release
     * notifications it has received.
     */
    private static final class TrackingStoreFileListener implements StoreFileListener {
        /** Acquire notifications received, keyed by file name. */
        private final Map<String, Integer> acquiredCounts = new HashMap<>();
        /** Release notifications received, keyed by file name. */
        private final Map<String, Integer> releasedCounts = new HashMap<>();

        private TrackingStoreFileListener() {
        }

        /** Increments the acquire count for the file's name; {@code cycle} is ignored. */
        public void onAcquired(int cycle, File file) {
            acquiredCounts.merge(file.getName(), 1, Integer::sum);
        }

        /** Increments the release count for the file's name; {@code cycle} is ignored. */
        public void onReleased(int cycle, File file) {
            releasedCounts.merge(file.getName(), 1, Integer::sum);
        }

        /** Forgets every count recorded so far. */
        void reset() {
            acquiredCounts.clear();
            releasedCounts.clear();
        }

        /** Summarises the number of distinct acquired/released names and the mismatches. */
        @Override
        public String toString() {
            final int acquired = acquiredCounts.size();
            final int released = releasedCounts.size();
            return String.format("%nacquired: %d%nreleased: %d%ndiffs:%n%s%n", acquired, released, buildDiffs());
        }

        /**
         * Lists names present in one map but absent from the other, each followed by its
         * count in parentheses.
         */
        private String buildDiffs() {
            final StringBuilder report = new StringBuilder();
            report.append("acquired but not released:\n");
            // One set is reused for both directions, as in the original implementation.
            final Set<String> unmatched = new HashSet<>(acquiredCounts.keySet());
            unmatched.removeAll(releasedCounts.keySet());
            for (String name : unmatched) {
                report.append(name).append("(").append(acquiredCounts.get(name)).append(")\n");
            }
            report.append("released but not acquired:\n");
            unmatched.clear();
            unmatched.addAll(releasedCounts.keySet());
            unmatched.removeAll(acquiredCounts.keySet());
            for (String name : unmatched) {
                report.append(name).append("(").append(releasedCounts.get(name)).append(")\n");
            }
            return report.toString();
        }
    }
}

