/*
 * Decompiled with CFR 0.152.
 */
package io.airlift.log;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multisets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.Threads;
import io.airlift.log.BufferedHandler;
import io.airlift.log.MessageOutput;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
import java.util.logging.ErrorManager;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import javax.annotation.Nullable;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IteratorAssert;
import org.assertj.core.api.ListAssert;
import org.testng.annotations.Test;

public class TestBufferedHandler {
    /**
     * Checks that a single published record reaches the output without the handler being closed first.
     * <p>
     * One record is published, the test waits up to five seconds for the output to observe its first
     * flush attempt, and the flushed messages must then be exactly that record as rendered by the
     * testing formatter.
     *
     * @throws Exception if waiting for the first flush attempt times out or is interrupted
     */
    @Test
    public void testIdleFlush() throws Exception {
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                100,
                100);
        handler.initialize();

        LogRecord published = TestBufferedHandler.logRecord(Level.INFO, "TestLogger", "Test");
        handler.publish(published);
        output.awaitFirstFlushAttempt(5L, TimeUnit.SECONDS);

        List<String> flushed = output.getFlushedMessages();
        String expected = TestBufferedHandler.testingFormatter().format(published);
        Assertions.assertThat(flushed)
                .as("Buffer should flush when idle, even if still less than recordFlushCount")
                .containsExactly(expected);

        handler.close();
    }

    /**
     * Publishes one record fewer than {@code maxBufferSize}, closes the handler, and checks that the
     * flushed messages are exactly the published records, in publication order, as rendered by the
     * testing formatter.
     */
    @Test
    public void testLoggingSequence() {
        int maxBufferSize = 100;
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                100,
                maxBufferSize);
        handler.initialize();

        // Stay one below the buffer size so nothing should have to be dropped
        int recordCount = maxBufferSize - 1;
        List<LogRecord> published = new ArrayList<>();
        for (int sequence = 0; sequence < recordCount; sequence++) {
            LogRecord next = TestBufferedHandler.logRecord(Level.INFO, "TestLogger", String.valueOf(sequence));
            published.add(next);
            handler.publish(next);
        }
        handler.close();

        List<String> expected = new ArrayList<>(published.size());
        for (LogRecord record : published) {
            expected.add(TestBufferedHandler.testingFormatter().format(record));
        }
        Assertions.assertThat(output.getFlushedMessages())
                .as("Every record should be present if the buffer size is not exceeded")
                .containsExactlyElementsOf(expected);
    }

    /**
     * Publishes 1000 records from a single thread to a handler whose last two constructor arguments
     * are both 2, then closes it.
     * <p>
     * The handler must report at least one dropped message, and the number of drops reconstructed
     * from the flushed output must equal the handler's own dropped-message count.
     */
    @Test
    public void testLoggingOverloadSingleThread() {
        final int recordCount = 1000;
        final String loggerName = "TestLogger";

        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                2,
                2);
        handler.initialize();

        for (int sequence = 0; sequence < recordCount; sequence++) {
            handler.publish(TestBufferedHandler.logRecord(Level.INFO, loggerName, String.valueOf(sequence)));
        }
        handler.close();

        Assertions.assertThat((long) handler.getDroppedMessages())
                .as("Test control check to make sure that it is dropping some messages")
                .isGreaterThan(0L);

        int reconstructedDrops = TestBufferedHandler.assertLogStreamContents(
                output.getFlushedMessages(),
                loggerName,
                recordCount,
                String::valueOf);
        Assertions.assertThat(reconstructedDrops).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Same overload scenario as the single-threaded variant, but with two submitter threads that each
     * publish 1000 records under their own logger name ("A-TestLogger" and "B-TestLogger").
     * <p>
     * After both submitters finish and the handler is closed, the handler must report at least one
     * drop, and the drops reconstructed per logger from the flushed output must add up to the
     * handler's dropped-message count.
     */
    @Test
    public void testLoggingOverloadMultiThread() {
        final int recordsPerLogger = 1000;
        List<String> loggerNames = List.of("A-TestLogger", "B-TestLogger");

        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                2,
                2);
        handler.initialize();

        ExecutorService submitters = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        for (String loggerName : loggerNames) {
            submitters.execute(() -> {
                for (int sequence = 0; sequence < recordsPerLogger; sequence++) {
                    handler.publish(TestBufferedHandler.logRecord(Level.INFO, loggerName, String.valueOf(sequence)));
                }
            });
        }
        boolean submittersFinished = MoreExecutors.shutdownAndAwaitTermination(submitters, 10L, TimeUnit.SECONDS);
        Assertions.assertThat(submittersFinished).isTrue();
        handler.close();

        Assertions.assertThat((long) handler.getDroppedMessages())
                .as("Test control check to make sure that it is dropping some messages")
                .isGreaterThan(0L);

        // Each logger's stream is validated independently; the per-logger drops are then summed
        int reconstructedDrops = 0;
        for (String loggerName : loggerNames) {
            reconstructedDrops += TestBufferedHandler.assertLogStreamContents(
                    output.getFlushedMessages(),
                    loggerName,
                    recordsPerLogger,
                    String::valueOf);
        }
        Assertions.assertThat(reconstructedDrops).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Runs two submitter threads against an output that initially throws from both
     * {@code writeMessage} and {@code flush}.
     * <p>
     * Write failures are switched off once the first write attempt has been observed, and flush
     * failures once the first flush attempt has been observed. After the submitters finish and the
     * handler is closed, the drops reconstructed per logger from the flushed output must add up to
     * the handler's dropped-message count.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException if the first write or flush attempt is not seen within five seconds
     */
    @Test
    public void testMultiThreadErrorRetry() throws InterruptedException, TimeoutException {
        final int recordsPerLogger = 1000;
        List<String> loggerNames = List.of("A-TestLogger", "B-TestLogger");

        TestingMessageOutput output = new TestingMessageOutput()
                .setThrowOnWrite(true)
                .setThrowOnFlush(true);
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                50,
                100);
        handler.initialize();

        ExecutorService submitters = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        for (String loggerName : loggerNames) {
            submitters.execute(() -> {
                for (int sequence = 0; sequence < recordsPerLogger; sequence++) {
                    handler.publish(TestBufferedHandler.logRecord(Level.INFO, loggerName, String.valueOf(sequence)));
                }
            });
        }

        // Let at least one write fail before allowing writes to succeed
        output.awaitFirstWriteAttempt(5L, TimeUnit.SECONDS);
        output.setThrowOnWrite(false);
        // Likewise let at least one flush fail before allowing flushes to succeed
        output.awaitFirstFlushAttempt(5L, TimeUnit.SECONDS);
        output.setThrowOnFlush(false);

        boolean submittersFinished = MoreExecutors.shutdownAndAwaitTermination(submitters, 10L, TimeUnit.SECONDS);
        Assertions.assertThat(submittersFinished).isTrue();
        handler.close();

        int reconstructedDrops = 0;
        for (String loggerName : loggerNames) {
            reconstructedDrops += TestBufferedHandler.assertLogStreamContents(
                    output.getFlushedMessages(),
                    loggerName,
                    recordsPerLogger,
                    String::valueOf);
        }
        Assertions.assertThat(reconstructedDrops).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Exercises closing the handler while writes are failing and other threads keep publishing.
     * <p>
     * Two records ("A-TestLogger" and "B-TestLogger") are published before {@code initialize()} on a
     * handler whose last constructor argument is 1, against an output that throws on every write.
     * Two spam tasks and a {@code close()} call are then submitted concurrently. Once the handler
     * reports that its terminal message was dequeued and two further write attempts have been
     * observed, write failures are switched off; all three tasks must then finish within five
     * seconds each, and the flushed output must account for the single A and the single B record.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException if a write attempt or a submitted task is not observed in time
     * @throws ExecutionException if one of the submitted tasks failed
     */
    @Test
    public void testCapacityErrorRetryDuringClose() throws InterruptedException, TimeoutException, ExecutionException {
        TestingMessageOutput testingMessageOutput = new TestingMessageOutput().setThrowOnWrite(true);
        BufferedHandler bufferedHandler = new BufferedHandler(
                testingMessageOutput,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                10,
                1);
        // These two records are published before the handler is initialized
        bufferedHandler.publish(TestBufferedHandler.logRecord(Level.INFO, "A-TestLogger", "1"));
        bufferedHandler.publish(TestBufferedHandler.logRecord(Level.INFO, "B-TestLogger", "1"));
        bufferedHandler.initialize();

        ExecutorService executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        try {
            Future<?> spamFutureA = executor.submit(() -> {
                for (int i = 0; i < 100; ++i) {
                    bufferedHandler.publish(TestBufferedHandler.logRecord(Level.INFO, "Spam-TestLogger", String.valueOf(i)));
                }
            });
            Future<?> spamFutureB = executor.submit(() -> {
                for (int i = 0; i < 100; ++i) {
                    bufferedHandler.publish(TestBufferedHandler.logRecord(Level.INFO, "Spam-TestLogger", String.valueOf(i)));
                }
            });
            Future<?> closeFuture = executor.submit(() -> bufferedHandler.close());

            // Poll every 20 ms until the handler reports that the terminal message was dequeued
            while (!bufferedHandler.isTerminalMessageDequeued()) {
                Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.MILLISECONDS);
            }

            // Require two further write attempts. The await result is checked so that a timeout
            // fails the test instead of silently falling through.
            for (int attempt = 1; attempt <= 2; attempt++) {
                if (!testingMessageOutput.getNextWriteAttemptLatch().await(5L, TimeUnit.SECONDS)) {
                    throw new TimeoutException("Timed out waiting for write attempt " + attempt + " after the terminal message was dequeued");
                }
            }
            testingMessageOutput.setThrowOnWrite(false);

            spamFutureA.get(5L, TimeUnit.SECONDS);
            spamFutureB.get(5L, TimeUnit.SECONDS);
            closeFuture.get(5L, TimeUnit.SECONDS);

            TestBufferedHandler.assertLogStreamContents(testingMessageOutput.getFlushedMessages(), "A-TestLogger", 1, String::valueOf);
            TestBufferedHandler.assertLogStreamContents(testingMessageOutput.getFlushedMessages(), "B-TestLogger", 1, String::valueOf);
        }
        finally {
            // Do not leave submitter threads behind, whether the test passed or failed
            executor.shutdownNow();
        }
    }

    /**
     * Publishes one record, closes the handler, then publishes a second record.
     * <p>
     * The handler's dropped-message count must stay at zero and the flushed output must contain only
     * the record that was published before {@code close()}.
     */
    @Test
    public void testIgnoreWriteAfterClose() {
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(
                output,
                TestBufferedHandler.testingFormatter(),
                TestBufferedHandler::serializeMultiset,
                new ErrorManager(),
                RateLimiter.create(10.0),
                Duration.ofSeconds(5L),
                2,
                2);
        handler.initialize();

        LogRecord beforeClose = TestBufferedHandler.logRecord(Level.INFO, "TestLogger", "Test message");
        handler.publish(beforeClose);
        handler.close();

        LogRecord afterClose = TestBufferedHandler.logRecord(Level.INFO, "TestLogger", "Test message after close");
        handler.publish(afterClose);

        Assertions.assertThat((long) handler.getDroppedMessages())
                .as("Messages after close are ignored and not counted as dropped")
                .isZero();
        String expected = TestBufferedHandler.testingFormatter().format(beforeClose);
        Assertions.assertThat(output.getFlushedMessages()).containsExactly(expected);
    }

    /**
     * Creates a formatter that renders a record as the serialized form of a {@code LogEntry} built
     * from the record's logger name and message.
     *
     * @return a new formatter instance
     */
    private static Formatter testingFormatter() {
        return new Formatter() {

            @Override
            public String format(LogRecord logRecord) {
                LogEntry entry = new LogEntry(logRecord.getLoggerName(), logRecord.getMessage());
                return entry.serialize();
            }
        };
    }

    /**
     * Builds a {@link LogRecord} with the given level and message and assigns it the given logger name.
     *
     * @param level the record's level
     * @param loggerName the logger name to set on the record
     * @param message the record's message
     * @return the newly created record
     */
    private static LogRecord logRecord(Level level, String loggerName, String message) {
        LogRecord created = new LogRecord(level, message);
        created.setLoggerName(loggerName);
        return created;
    }

    /**
     * Walks the flushed output for one logger and checks that each of the expected messages is
     * accounted for, either as a real entry with the expected text or as one occurrence of the logger
     * name inside a drop summary.
     * <p>
     * For each expected index, a new item is pulled from the filtered stream only when no drop
     * summary occurrences are pending. An entry must carry the message produced by
     * {@code indexToExpectedMessage} for that index; a drop summary is copied into a mutable multiset
     * and one occurrence of {@code targetLoggerName} is consumed from it per index. After the last
     * index, no drop summary occurrences may remain.
     *
     * @param actualMessages the serialized messages captured from the output
     * @param targetLoggerName the logger whose entries and drops are being verified
     * @param expectedMessageCount how many messages were submitted under that logger
     * @param indexToExpectedMessage maps a submission index to the expected message text
     * @return the number of indices that were accounted for by drop summaries
     */
    private static int assertLogStreamContents(List<String> actualMessages, String targetLoggerName, int expectedMessageCount, IntFunction<String> indexToExpectedMessage) {
        Predicate<String> matchesTarget = targetLoggerName::equals;
        Iterator<EntryOrDropSummary> remaining = TestBufferedHandler.deserializeMessages(actualMessages, matchesTarget).iterator();

        Multiset<String> pendingDrops = HashMultiset.create();
        int droppedSoFar = 0;
        for (int index = 0; index < expectedMessageCount; index++) {
            boolean matchedEntry = false;
            if (pendingDrops.isEmpty()) {
                Assertions.assertThat(remaining).as("More entries expected in the result").hasNext();
                EntryOrDropSummary next = remaining.next();
                if (next.entry() != null) {
                    Assertions.assertThat(next.entry().message())
                            .as("Verify that the message contents match the value sequence")
                            .isEqualTo(indexToExpectedMessage.apply(index));
                    matchedEntry = true;
                }
                else {
                    pendingDrops = HashMultiset.create(next.dropSummary());
                }
            }
            if (!matchedEntry) {
                // This index was not a real entry, so it must be covered by the current drop summary
                boolean removed = pendingDrops.remove(targetLoggerName);
                Assertions.assertThat(removed).as("Expected to contain this logger name next in sequence").isTrue();
                droppedSoFar++;
            }
        }

        Assertions.assertThat((Collection<String>) pendingDrops)
                .as("Should not have more drop summary entries than total submitted")
                .isEmpty();
        return droppedSoFar;
    }

    /**
     * Converts a multiset into an immutable map from each distinct element to its occurrence count.
     *
     * @param multiset the multiset to convert
     * @return an immutable map with one entry per distinct element
     */
    private static <T> Map<T, Integer> toMap(Multiset<T> multiset) {
        ImmutableMap.Builder<T, Integer> counts = ImmutableMap.builder();
        for (Multiset.Entry<T> entry : multiset.entrySet()) {
            counts.put(entry.getElement(), entry.getCount());
        }
        return counts.build();
    }

    /**
     * Renders a multiset as text with one {@code element=count} pair per distinct element, pairs
     * separated by a newline character.
     *
     * @param multiset the multiset to serialize
     * @return the joined {@code element=count} lines
     */
    private static String serializeMultiset(Multiset<String> multiset) {
        Map<String, Integer> countsByElement = TestBufferedHandler.toMap(multiset);
        Joiner.MapJoiner lineJoiner = Joiner.on('\n').withKeyValueSeparator('=');
        return lineJoiner.join(countsByElement);
    }

    /**
     * Parses newline-separated {@code element=count} pairs back into a multiset; this is the inverse
     * of {@code serializeMultiset}.
     * <p>
     * Empty lines are skipped, so the empty string (which is what serializing an empty multiset
     * yields) parses to an empty multiset instead of being rejected as a malformed entry.
     *
     * @param serializedMultimap the text to parse
     * @return an immutable multiset containing each element the stated number of times
     * @throws IllegalArgumentException if a non-empty line is not a single {@code key=value} pair or a
     *         key is repeated
     * @throws NumberFormatException if a count is not a valid integer
     */
    private static Multiset<String> deserializeMultiset(String serializedMultimap) {
        Map<String, String> countsByElement = Splitter.on('\n')
                .omitEmptyStrings()
                .withKeyValueSeparator('=')
                .split(serializedMultimap);
        ImmutableMultiset.Builder<String> builder = ImmutableMultiset.builder();
        for (Map.Entry<String, String> entry : countsByElement.entrySet()) {
            builder.addCopies(entry.getKey(), Integer.parseInt(entry.getValue()));
        }
        return builder.build();
    }

    /**
     * Decodes serialized messages and keeps only what is relevant to the loggers accepted by
     * {@code loggerNameFilter}.
     * <p>
     * A message whose logger name equals the {@code BufferedHandler} class name is treated as a drop
     * summary: its payload is parsed as a multiset, restricted to the accepted logger names, and kept
     * only if something remains. Any other message is kept as a plain entry when its logger name is
     * accepted by the filter.
     *
     * @param messages the serialized messages, in output order
     * @param loggerNameFilter selects the logger names of interest
     * @return the relevant entries and drop summaries, in their original relative order
     */
    private static List<EntryOrDropSummary> deserializeMessages(List<String> messages, Predicate<String> loggerNameFilter) {
        String dropSummaryLoggerName = BufferedHandler.class.getName();
        ImmutableList.Builder<EntryOrDropSummary> relevant = ImmutableList.builder();
        for (String serialized : messages) {
            LogEntry logEntry = LogEntry.deserialize(serialized);
            if (logEntry.loggerName().equals(dropSummaryLoggerName)) {
                Multiset<String> matchingDrops = Multisets.filter(TestBufferedHandler.deserializeMultiset(logEntry.message()), loggerNameFilter);
                EntryOrDropSummary summary = EntryOrDropSummary.forDropSummary(matchingDrops);
                // A summary that mentions none of the accepted loggers carries no information here
                if (!summary.dropSummary().isEmpty()) {
                    relevant.add(summary);
                }
            }
            else if (loggerNameFilter.apply(logEntry.loggerName())) {
                relevant.add(EntryOrDropSummary.forEntry(logEntry));
            }
        }
        return relevant.build();
    }

    /**
     * In-memory {@link MessageOutput} used by the tests.
     * <p>
     * Written messages are held in a staging queue and moved to the flushed queue on {@code flush()}
     * or on the first {@code close()}. Writes and flushes can be made to throw on demand, and latches
     * let a test wait until a write or flush attempt has happened.
     */
    private static class TestingMessageOutput
    implements MessageOutput {
        // Set by the first close(); later writes and flushes are rejected
        private final AtomicBoolean closed = new AtomicBoolean();
        // Messages accepted by writeMessage but not yet flushed
        private final Queue<String> writeMessages = new LinkedBlockingQueue<String>();
        // Messages moved out of writeMessages by flush() or close()
        private final Queue<String> flushedMessages = new LinkedBlockingQueue<String>();
        // Latches handed out by getNextWriteAttemptLatch, released on the next write attempt
        private final BlockingQueue<CountDownLatch> nextWriteAttemptLatches = new LinkedBlockingQueue<CountDownLatch>();
        private final CountDownLatch firstWriteAttemptLatch = new CountDownLatch(1);
        private final CountDownLatch firstFlushAttemptLatch = new CountDownLatch(1);
        private final AtomicBoolean throwOnWrite = new AtomicBoolean();
        private final AtomicBoolean throwOnFlush = new AtomicBoolean();

        private TestingMessageOutput() {
        }

        /**
         * Controls whether {@code writeMessage} throws a {@link RuntimeException}.
         *
         * @return this instance, for chaining
         */
        public TestingMessageOutput setThrowOnWrite(boolean shouldThrow) {
            throwOnWrite.set(shouldThrow);
            return this;
        }

        /**
         * Controls whether {@code flush} throws a {@link RuntimeException}.
         *
         * @return this instance, for chaining
         */
        public TestingMessageOutput setThrowOnFlush(boolean shouldThrow) {
            throwOnFlush.set(shouldThrow);
            return this;
        }

        /**
         * Registers and returns a fresh latch that is counted down by the next write attempt.
         */
        public CountDownLatch getNextWriteAttemptLatch() {
            CountDownLatch nextAttempt = new CountDownLatch(1);
            nextWriteAttemptLatches.add(nextAttempt);
            return nextAttempt;
        }

        /**
         * Waits until {@code writeMessage} has been attempted at least once.
         *
         * @throws TimeoutException if no attempt happens within the given time
         */
        public void awaitFirstWriteAttempt(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
            boolean attempted = firstWriteAttemptLatch.await(timeout, timeUnit);
            if (!attempted) {
                throw new TimeoutException();
            }
        }

        /**
         * Waits until {@code flush} has been attempted at least once.
         *
         * @throws TimeoutException if no attempt happens within the given time
         */
        public void awaitFirstFlushAttempt(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
            boolean attempted = firstFlushAttemptLatch.await(timeout, timeUnit);
            if (!attempted) {
                throw new TimeoutException();
            }
        }

        /**
         * Returns an immutable snapshot of the messages flushed so far.
         */
        public List<String> getFlushedMessages() {
            return ImmutableList.copyOf(flushedMessages);
        }

        /**
         * Removes every latch currently queued and counts each one down.
         */
        private static void signal(BlockingQueue<CountDownLatch> latches) {
            List<CountDownLatch> waiting = new ArrayList<>();
            latches.drainTo(waiting);
            for (CountDownLatch latch : waiting) {
                latch.countDown();
            }
        }

        /**
         * Stages the message, decoded as UTF-8, unless the output is closed or configured to throw.
         * The write-attempt latches are released whether or not the write succeeds.
         */
        public void writeMessage(byte[] message) {
            try {
                Preconditions.checkState(!closed.get(), "Already closed");
                if (throwOnWrite.get()) {
                    throw new RuntimeException();
                }
                String decoded = new String(message, StandardCharsets.UTF_8);
                writeMessages.offer(decoded);
            }
            finally {
                firstWriteAttemptLatch.countDown();
                TestingMessageOutput.signal(nextWriteAttemptLatches);
            }
        }

        /**
         * Moves all staged messages to the flushed queue unless the output is closed or configured to
         * throw. The first-flush latch is released whether or not the flush succeeds.
         */
        public void flush() {
            try {
                Preconditions.checkState(!closed.get(), "Already closed");
                if (throwOnFlush.get()) {
                    throw new RuntimeException();
                }
                flushInternal();
            }
            finally {
                firstFlushAttemptLatch.countDown();
            }
        }

        /**
         * Marks the output closed and flushes staged messages; only the first call has any effect.
         */
        public void close() {
            if (closed.compareAndSet(false, true)) {
                flushInternal();
            }
        }

        // Drains the staging queue into the flushed queue, preserving order
        private void flushInternal() {
            for (String staged = writeMessages.poll(); staged != null; staged = writeMessages.poll()) {
                flushedMessages.add(staged);
            }
        }
    }

    /**
     * Holds either a decoded log entry or a drop summary, never both and never neither.
     * A non-null drop summary is stored as an immutable copy.
     */
    private record EntryOrDropSummary(@Nullable LogEntry entry, @Nullable Multiset<String> dropSummary) {
        private EntryOrDropSummary {
            boolean hasEntry = entry != null;
            boolean hasDropSummary = dropSummary != null;
            Preconditions.checkArgument(hasEntry != hasDropSummary, "Exactly one of the values must be non-null");
            if (hasDropSummary) {
                dropSummary = ImmutableMultiset.copyOf(dropSummary);
            }
        }

        /**
         * Wraps a log entry; the drop summary component is left null.
         */
        public static EntryOrDropSummary forEntry(LogEntry entry) {
            return new EntryOrDropSummary(entry, null);
        }

        /**
         * Wraps a drop summary; the entry component is left null.
         */
        public static EntryOrDropSummary forDropSummary(Multiset<String> dropSummary) {
            return new EntryOrDropSummary(null, dropSummary);
        }
    }

    /**
     * A logger name and message pair with a simple {@code loggerName:message} text encoding.
     * <p>
     * Decoding splits on the first {@code ':'} only, so a message that itself contains colons
     * survives a serialize/deserialize round trip. A logger name containing {@code ':'} is not
     * supported by this encoding.
     */
    private record LogEntry(String loggerName, String message) {
        private LogEntry {
            Objects.requireNonNull(loggerName, "loggerName is null");
            Objects.requireNonNull(message, "message is null");
        }

        /**
         * Encodes this entry as {@code loggerName:message}.
         *
         * @return the encoded text
         */
        public String serialize() {
            return Joiner.on(':').join(this.loggerName, this.message);
        }

        /**
         * Decodes text produced by {@link #serialize()}.
         *
         * @param serialized the encoded text
         * @return the decoded entry
         * @throws IllegalArgumentException if {@code serialized} contains no {@code ':'} separator
         */
        public static LogEntry deserialize(String serialized) {
            // limit(2) keeps everything after the first separator as the message, colons included
            List<String> parts = Splitter.on(':').limit(2).splitToList(serialized);
            Preconditions.checkArgument(parts.size() == 2, "Serialized log entry has no ':' separator: %s", serialized);
            return new LogEntry(parts.get(0), parts.get(1));
        }
    }
}

