/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.performance.core;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.Sizeable;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

public class ChangeEventQueuePerf {

    /**
     * Measures the average time for several producer threads to push events through one shared
     * {@link ChangeEventQueue} while a single consumer thread drains it.
     *
     * <p>Each benchmark invocation starts {@link #TOTAL_PRODUCERS} producer threads, every one of
     * which enqueues {@link #TOTAL_RECORDS_PER_PRODUCER} references to {@link #EVENT}, plus one
     * consumer thread that polls until it has counted all of them. The queue is built once per
     * trial and reused across invocations; the threads are rebuilt for every invocation.
     */
    @Fork(1)
    @State(Scope.Thread)
    @Warmup(iterations = 5, time = 1)
    @Measurement(iterations = 5, time = 1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @BenchmarkMode(Mode.AverageTime)
    public static class MultiWriterQueuePerf {
        /** Single event instance, built once and reused for every enqueue call. */
        private static final DataChangeEvent EVENT = new DataChangeEvent(new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(), "dummy", Schema.STRING_SCHEMA,
                "Change Data Capture Even via Debezium"));
        /** Number of events each producer thread enqueues per invocation. */
        private static final int TOTAL_RECORDS_PER_PRODUCER = 1_000_000;
        /** Number of producer threads started per invocation. */
        private static final int TOTAL_PRODUCERS = 10;

        /** Poll interval handed to the queue builder, in milliseconds; injected by JMH. */
        @Param({"10", "50", "500"})
        long pollIntervalMillis;
        private ChangeEventQueue<DataChangeEvent> changeEventQueue;
        private Thread[] producers;
        private Thread consumer;

        /**
         * Builds the queue under test. Despite its name this runs once per trial
         * ({@link Level#Trial}), not once per invocation.
         */
        @Setup(Level.Trial)
        public void setupInvocation() {
            // The explicit type argument avoids a raw Builder and an unchecked assignment.
            this.changeEventQueue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(Duration.ofMillis(this.pollIntervalMillis))
                    .maxQueueSize(8192)
                    .maxBatchSize(2048)
                    .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                    // NOTE(review): 0 presumably disables the byte-size bound - confirm against ChangeEventQueue.
                    .maxQueueSizeInBytes(0L)
                    .build();
        }

        /**
         * Creates (but does not start) fresh producer and consumer threads before every invocation,
         * since a {@link Thread} cannot be started twice.
         */
        @Setup(Level.Invocation)
        public void setup() {
            this.producers = new Thread[TOTAL_PRODUCERS];
            for (int i = 0; i < TOTAL_PRODUCERS; ++i) {
                this.producers[i] = new Thread(() -> {
                    try {
                        for (int j = 0; j < TOTAL_RECORDS_PER_PRODUCER; ++j) {
                            this.changeEventQueue.enqueue(EVENT);
                        }
                    } catch (InterruptedException e) {
                        // Interrupted (e.g. by teardown): keep the interrupt status and let the thread end.
                        Thread.currentThread().interrupt();
                    }
                });
            }

            // Widen before multiplying so the total cannot overflow int if the constants grow.
            final long recordsToPoll = (long) TOTAL_PRODUCERS * TOTAL_RECORDS_PER_PRODUCER;
            this.consumer = new Thread(() -> {
                long polled = 0L;
                try {
                    while (polled < recordsToPoll) {
                        polled += this.changeEventQueue.poll().size();
                    }
                } catch (InterruptedException e) {
                    // Leave the loop on interrupt; swallowing it here would make the thread
                    // impossible to stop once the producers are gone.
                    Thread.currentThread().interrupt();
                }
            });
        }

        /**
         * Starts all producers and the consumer, then blocks until every thread has finished.
         *
         * @throws InterruptedException if the benchmark thread is interrupted while joining
         */
        @Benchmark
        public void benchmarkChangeEventQueue() throws InterruptedException {
            for (Thread producer : this.producers) {
                producer.start();
            }
            this.consumer.start();
            for (Thread producer : this.producers) {
                producer.join();
            }
            this.consumer.join();
        }

        /**
         * Interrupts every worker thread after each invocation. After a normal invocation the
         * threads have already been joined, so this only has an effect when the benchmark method
         * exited before its joins completed.
         */
        @TearDown(Level.Invocation)
        public void teardown() {
            for (Thread producer : this.producers) {
                producer.interrupt();
            }
            this.consumer.interrupt();
        }
    }

    /**
     * Measures the average time for one producer thread to push {@link #TOTAL_RECORDS} events
     * through a {@link ChangeEventQueue} while one consumer thread drains it.
     *
     * <p>The queue is built once per trial; the two threads are rebuilt before every invocation
     * and run to completion inside the benchmark method.
     */
    @Fork(1)
    @State(Scope.Thread)
    @Warmup(iterations = 5, time = 1)
    @Measurement(iterations = 5, time = 1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @BenchmarkMode(Mode.AverageTime)
    public static class QueuePerf {
        /** Number of events enqueued by the producer and counted by the consumer per invocation. */
        private static final int TOTAL_RECORDS = 10_000_000;
        /** Single event instance, built once and reused for every enqueue call. */
        private static final DataChangeEvent EVENT = new DataChangeEvent(new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(), "dummy", Schema.STRING_SCHEMA,
                "Change Data Capture Even via Debezium"));

        /** Poll interval handed to the queue builder, in milliseconds; injected by JMH. */
        @Param({"10", "50", "500"})
        long pollIntervalMillis;
        private ChangeEventQueue<DataChangeEvent> changeEventQueue;
        private Thread producer;
        private Thread consumer;

        /**
         * Builds the queue under test. Despite its name this runs once per trial
         * ({@link Level#Trial}), not once per invocation.
         */
        @Setup(Level.Trial)
        public void setupInvocation() {
            // The explicit type argument avoids a raw Builder and an unchecked assignment.
            this.changeEventQueue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(Duration.ofMillis(this.pollIntervalMillis))
                    .maxQueueSize(8192)
                    .maxBatchSize(2048)
                    .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                    // NOTE(review): 0 presumably disables the byte-size bound - confirm against ChangeEventQueue.
                    .maxQueueSizeInBytes(0L)
                    .build();
        }

        /**
         * Creates (but does not start) a fresh producer and consumer thread before every
         * invocation, since a {@link Thread} cannot be started twice.
         */
        @Setup(Level.Invocation)
        public void setup() {
            this.producer = new Thread(() -> {
                try {
                    for (int sent = 0; sent < TOTAL_RECORDS; ++sent) {
                        this.changeEventQueue.enqueue(EVENT);
                    }
                } catch (InterruptedException e) {
                    // Interruption is the cancellation signal sent by teardown, not an error:
                    // keep the interrupt status and end the thread without an uncaught exception.
                    Thread.currentThread().interrupt();
                }
            });
            this.consumer = new Thread(() -> {
                long polled = 0L;
                try {
                    while (polled < TOTAL_RECORDS) {
                        polled += this.changeEventQueue.poll().size();
                    }
                } catch (InterruptedException e) {
                    // Same cancellation handling as the producer.
                    Thread.currentThread().interrupt();
                }
            });
        }

        /**
         * Starts the producer and the consumer, then blocks until both have finished.
         *
         * @throws InterruptedException if the benchmark thread is interrupted while joining
         */
        @Benchmark
        public void benchmarkChangeEventQueue() throws InterruptedException {
            this.producer.start();
            this.consumer.start();
            this.producer.join();
            this.consumer.join();
        }

        /**
         * Interrupts both worker threads after each invocation. After a normal invocation the
         * threads have already been joined, so this only has an effect when the benchmark method
         * exited before its joins completed.
         */
        @TearDown(Level.Invocation)
        public void teardown() {
            this.producer.interrupt();
            this.consumer.interrupt();
        }
    }

    /**
     * Measures the throughput of {@link ChangeEventQueue#poll()} on the benchmark thread while a
     * background producer thread keeps enqueuing {@link #EVENT} for the whole trial.
     */
    @Fork(1)
    @State(Scope.Thread)
    @Warmup(iterations = 2, time = 5)
    @Measurement(iterations = 2, time = 5)
    @OutputTimeUnit(TimeUnit.SECONDS)
    @BenchmarkMode(Mode.Throughput)
    public static class ConsumerPerf {
        /** Single event instance, built once and reused for every enqueue call. */
        private static final DataChangeEvent EVENT = new DataChangeEvent(new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(), "dummy", Schema.STRING_SCHEMA,
                "Change Data Capture Even via Debezium"));

        /** Poll interval handed to the queue builder, in milliseconds; injected by JMH. */
        @Param({"10", "50", "500"})
        private long pollIntervalMillis;
        private ChangeEventQueue<DataChangeEvent> changeEventQueue;
        private Thread producer;

        /**
         * Builds the queue once per trial and starts the background producer, which enqueues
         * events in an endless loop until it is interrupted.
         */
        @Setup(Level.Trial)
        public void setup() {
            // The explicit type argument avoids a raw Builder and an unchecked assignment.
            this.changeEventQueue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(Duration.ofMillis(this.pollIntervalMillis))
                    .maxQueueSize(8192)
                    .maxBatchSize(2048)
                    .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                    // NOTE(review): 0 presumably disables the byte-size bound - confirm against ChangeEventQueue.
                    .maxQueueSizeInBytes(0L)
                    .build();
            this.producer = new Thread(() -> {
                try {
                    while (true) {
                        this.changeEventQueue.enqueue(EVENT);
                    }
                } catch (InterruptedException e) {
                    // Interrupted by teardown: keep the interrupt status and let the thread end.
                    Thread.currentThread().interrupt();
                }
            });
            this.producer.start();
        }

        /**
         * The measured operation: one {@code poll()} call on the shared queue. The polled result
         * is discarded.
         *
         * @throws InterruptedException if the benchmark thread is interrupted inside {@code poll()}
         */
        @Benchmark
        public void benchmarkConsumer() throws InterruptedException {
            this.changeEventQueue.poll();
        }

        /** Stops the background producer at the end of the trial by interrupting it. */
        @TearDown(Level.Trial)
        public void teardown() {
            this.producer.interrupt();
        }
    }

    /**
     * Measures the throughput of {@code enqueue} on the benchmark thread while a background
     * consumer thread keeps polling the same {@link ChangeEventQueue} for the whole trial.
     */
    @Fork(1)
    @State(Scope.Thread)
    @Warmup(iterations = 2, time = 5)
    @Measurement(iterations = 2, time = 5)
    @OutputTimeUnit(TimeUnit.SECONDS)
    @BenchmarkMode(Mode.Throughput)
    public static class ProducerPerf {
        /** Single event instance, built once and reused for every enqueue call. */
        private static final DataChangeEvent EVENT = new DataChangeEvent(new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(), "dummy", Schema.STRING_SCHEMA,
                "Change Data Capture Even via Debezium"));

        /** Poll interval handed to the queue builder, in milliseconds; injected by JMH. */
        @Param({"10", "50", "500"})
        private long pollIntervalMillis;
        private ChangeEventQueue<DataChangeEvent> changeEventQueue;
        private Thread consumer;

        /**
         * Builds the queue once per trial and starts the background consumer, which polls in an
         * endless loop until it is interrupted.
         */
        @Setup(Level.Trial)
        public void setup() {
            // The explicit type argument avoids a raw Builder and an unchecked assignment.
            this.changeEventQueue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(Duration.ofMillis(this.pollIntervalMillis))
                    .maxQueueSize(8192)
                    .maxBatchSize(2048)
                    .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                    // NOTE(review): 0 presumably disables the byte-size bound - confirm against ChangeEventQueue.
                    .maxQueueSizeInBytes(0L)
                    .build();
            this.consumer = new Thread(() -> {
                try {
                    while (true) {
                        this.changeEventQueue.poll();
                    }
                } catch (InterruptedException e) {
                    // Interrupted by teardown: keep the interrupt status and let the thread end.
                    Thread.currentThread().interrupt();
                }
            });
            this.consumer.start();
        }

        /**
         * The measured operation: one {@code enqueue} of the shared {@link #EVENT}.
         *
         * @throws InterruptedException if the benchmark thread is interrupted inside {@code enqueue}
         */
        @Benchmark
        public void benchmarkProducer() throws InterruptedException {
            this.changeEventQueue.enqueue(EVENT);
        }

        /** Stops the background consumer at the end of the trial by interrupting it. */
        @TearDown(Level.Trial)
        public void teardown() {
            this.consumer.interrupt();
        }
    }
}

