package io.datarouter.util.buffer;

import io.datarouter.instrumentation.count.Counters;
import io.datarouter.util.concurrent.ThreadTool;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:io/datarouter/util/buffer/FlushingSetBuffer.class */
public class FlushingSetBuffer<T> {
    private static final Logger logger = LoggerFactory.getLogger(FlushingSetBuffer.class);
    private static final Duration MAX_BUFFER_FLUSH_CHECK_INTERVAL = Duration.ofMillis(100);
    private final Set<T> set = new HashSet();
    private final String name;
    private final int maxSize;
    private final Duration maxBufferedDuration;
    private final Consumer<Set<T>> flushConsumer;
    private long timeOfBufferExpirationMs;
    private Clock clock;

    /* loaded from: input_file:io/datarouter/util/buffer/FlushingSetBuffer$FlushingSetBufferBuilder.class */
    public static class FlushingSetBufferBuilder<T> {
        private static final Duration DEFAULT_MAX_BUFFERED_DURATION = Duration.ofSeconds(30);
        private static final int DEFAULT_MAX_SIZE = 10000;
        private String name = FlushingSetBuffer.class.getSimpleName();
        private Duration maxBufferedDuration = DEFAULT_MAX_BUFFERED_DURATION;
        private int maxSize = DEFAULT_MAX_SIZE;
        private Clock clock = Clock.systemDefaultZone();
        private Consumer<Set<T>> flushConsumer;

        public FlushingSetBufferBuilder<T> withName(String str) {
            this.name = str;
            return this;
        }

        public FlushingSetBufferBuilder<T> withMaxBufferedDuration(Duration duration) {
            this.maxBufferedDuration = duration;
            return this;
        }

        public FlushingSetBufferBuilder<T> withMaxSize(int i) {
            this.maxSize = i;
            return this;
        }

        public FlushingSetBufferBuilder<T> withClock(Clock clock) {
            this.clock = clock;
            return this;
        }

        public FlushingSetBufferBuilder<T> withFlushConsumer(Consumer<Set<T>> consumer) {
            this.flushConsumer = consumer;
            return this;
        }

        public FlushingSetBuffer<T> build() {
            return new FlushingSetBuffer<>(this.name, this.maxSize, this.maxBufferedDuration, this.flushConsumer, this.clock);
        }
    }

    /* loaded from: input_file:io/datarouter/util/buffer/FlushingSetBuffer$FlushingSetBufferTests.class */
    public static class FlushingSetBufferTests {
        @Test
        public void testMaxSize() {
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicInteger atomicInteger2 = new AtomicInteger(0);
            FlushingSetBuffer<T> build = new FlushingSetBufferBuilder().withMaxSize(5).withMaxBufferedDuration(Duration.ofDays(1L)).withFlushConsumer(set -> {
                atomicInteger.incrementAndGet();
                atomicInteger2.addAndGet(set.size());
            }).build();
            Assert.assertEquals(atomicInteger.get(), 0);
            Assert.assertEquals(atomicInteger2.get(), 0);
            build.add("item 1");
            build.add("item 2");
            build.add("item 3");
            build.add("item 4");
            build.add("item 5");
            build.add("item 6");
            Assert.assertEquals(atomicInteger.get(), 1);
            Assert.assertEquals(atomicInteger2.get(), 5);
        }

        @Test
        public void testBufferedDuration() {
            Clock fixed = Clock.fixed(Instant.now(), ZoneId.systemDefault());
            Duration ofHours = Duration.ofHours(1L);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            FlushingSetBuffer<T> build = new FlushingSetBufferBuilder().withClock(fixed).withMaxSize(Integer.MAX_VALUE).withMaxBufferedDuration(ofHours).withFlushConsumer(set -> {
                atomicInteger.incrementAndGet();
            }).build();
            Assert.assertEquals(atomicInteger.get(), 0);
            ((FlushingSetBuffer) build).clock = Clock.offset(fixed, ofHours);
            ThreadTool.sleep(FlushingSetBuffer.MAX_BUFFER_FLUSH_CHECK_INTERVAL.toMillis());
            Assert.assertEquals(atomicInteger.get(), 1);
        }
    }

    private FlushingSetBuffer(String str, int i, Duration duration, Consumer<Set<T>> consumer, Clock clock) {
        this.name = str;
        this.maxSize = i;
        this.maxBufferedDuration = duration;
        this.flushConsumer = consumer;
        this.clock = clock;
        this.timeOfBufferExpirationMs = Clock.offset(this.clock, duration).millis();
        long min = Math.min(MAX_BUFFER_FLUSH_CHECK_INTERVAL.toMillis(), duration.toMillis());
        Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(this::flushIfBufferExpired, min, min, TimeUnit.MILLISECONDS);
    }

    public synchronized boolean add(T t) {
        if (t == null || !this.set.add(t)) {
            return false;
        }
        if (this.set.size() < this.maxSize) {
            return true;
        }
        flush("max size reached");
        return true;
    }

    private synchronized void flushIfBufferExpired() {
        if (this.clock.millis() >= this.timeOfBufferExpirationMs) {
            flush("buffer expired");
        }
    }

    private synchronized void flush(String str) {
        try {
            this.flushConsumer.accept(this.set);
            Counters.inc("Datarouter buffer " + this.name + " " + str);
        } catch (Exception e) {
            Counters.inc("Datarouter buffer " + this.name + " failed on " + str);
            logger.error("Failed to flush buffer because the flush consumer failed.", e);
        }
        this.set.clear();
        this.timeOfBufferExpirationMs = Clock.offset(this.clock, this.maxBufferedDuration).millis();
    }
}
