/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.kafka;

import io.debezium.annotation.ThreadSafe;
import io.debezium.document.Document;
import io.debezium.document.DocumentSerdes;
import io.debezium.kafka.KafkaServer;
import io.debezium.kafka.ZookeeperServer;
import io.debezium.util.IoUtil;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class KafkaCluster {
    /** Default for the delete-upon-shutdown setting; the matching field below starts with the same value. */
    public static final boolean DEFAULT_DELETE_DATA_UPON_SHUTDOWN = true;
    /** Default for the delete-prior-to-startup setting; the matching field below starts with the same value. */
    public static final boolean DEFAULT_DELETE_DATA_PRIOR_TO_STARTUP = false;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCluster.class);
    /** Brokers of this cluster, keyed by the broker id they were created with. */
    private final ConcurrentMap<Integer, KafkaServer> kafkaServers = new ConcurrentHashMap<Integer, KafkaServer>();
    /** The single ZooKeeper server; brokers obtain its connection through a method reference. */
    private final ZookeeperServer zkServer = new ZookeeperServer();
    /** Root data directory; when still {@code null} at startup a directory under the temp-file location is used. */
    private volatile File dataDir = null;
    /** Whether shutdown() removes broker data and the root data directory. */
    private volatile boolean deleteDataUponShutdown = true;
    /** Whether startup() wipes an explicitly configured data directory before starting. */
    private volatile boolean deleteDataPriorToStartup = false;
    /** Set at the end of startup() and cleared by shutdown(); configuration methods refuse to run while true. */
    private volatile boolean running = false;
    /** Broker configuration applied to every broker, or {@code null} when none was supplied. */
    private volatile Properties kafkaConfig = null;
    /** First broker port requested via withPorts(); a negative value means ports are not assigned by this class. */
    private volatile int startingKafkaPort = -1;
    /** Next port to hand to a broker; only consulted when startingKafkaPort is non-negative. */
    private final AtomicLong nextKafkaPort = new AtomicLong(this.startingKafkaPort);

    /**
     * Chooses whether {@link #shutdown()} removes the cluster's data.
     *
     * @param delete {@code true} to delete the data on shutdown, {@code false} to keep it
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster deleteDataUponShutdown(boolean delete) {
        if (!this.running) {
            this.deleteDataUponShutdown = delete;
            return this;
        }
        throw new IllegalStateException("Unable to change cluster settings when running");
    }

    /**
     * Chooses whether {@link #startup()} wipes the data directory before starting.
     * The flag is only consulted by {@code startup()} when a data directory has been set.
     *
     * @param delete {@code true} to delete existing data before startup
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster deleteDataPriorToStartup(boolean delete) {
        if (!this.running) {
            this.deleteDataPriorToStartup = delete;
            return this;
        }
        throw new IllegalStateException("Unable to change cluster settings when running");
    }

    /**
     * Adds {@code count} new brokers to this (stopped) cluster.
     * <p>
     * Broker ids are allocated from 1 upward, skipping ids that are already in use, so the
     * method may be called more than once. Each new broker receives the current data
     * directory, Kafka configuration and, when explicit ports were requested, the next port.
     *
     * @param count the number of brokers to add; values of zero or less add nothing
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster addBrokers(int count) {
        if (this.running) {
            throw new IllegalStateException("Unable to add a broker when the cluster is already running");
        }
        AtomicLong added = new AtomicLong();
        // The candidate id advances on every iteration, independently of how many brokers were
        // created. Deriving the id from 'added' would retry an occupied id forever.
        for (int candidateId = 1; added.intValue() < count; ++candidateId) {
            this.kafkaServers.computeIfAbsent(candidateId, id -> {
                // Only reached when the id is free, so this counts brokers actually created.
                added.incrementAndGet();
                KafkaServer server = new KafkaServer(this.zkServer::getConnection, id);
                if (this.dataDir != null) {
                    server.setStateDirectory(this.dataDir);
                }
                if (this.kafkaConfig != null) {
                    server.setProperties(this.kafkaConfig);
                }
                if (this.startingKafkaPort >= 0) {
                    server.setPort((int)this.nextKafkaPort.getAndIncrement());
                }
                return server;
            });
        }
        return this;
    }

    /**
     * Sets the root directory under which ZooKeeper and broker data are stored.
     *
     * @param dataDir the directory to use; {@code null} lets {@link #startup()} pick a
     *            directory under the temp-file location; a path that does not exist yet is accepted
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     * @throws IllegalArgumentException if the path exists but is not a readable and writable directory
     */
    public KafkaCluster usingDirectory(File dataDir) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        // Any one failed requirement must reject an existing path, so the checks are OR-ed.
        if (dataDir != null && dataDir.exists()
                && (!dataDir.isDirectory() || !dataDir.canWrite() || !dataDir.canRead())) {
            throw new IllegalArgumentException("The directory must be readable and writable");
        }
        this.dataDir = dataDir;
        return this;
    }

    /**
     * Sets the Kafka configuration applied to every existing and future broker.
     * A {@code null} or empty argument leaves the current configuration untouched.
     *
     * @param properties the broker properties; they are copied, so later changes to the
     *            argument are not seen by the cluster
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster withKafkaConfiguration(Properties properties) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        if (properties != null && !properties.isEmpty()) {
            // Fill the copy before publishing it through the volatile field so that no
            // other thread can observe an empty or half-filled configuration.
            Properties copy = new Properties();
            copy.putAll((Map<?, ?>)properties);
            this.kafkaConfig = copy;
            this.kafkaServers.values().forEach(kafka -> kafka.setProperties(copy));
        }
        return this;
    }

    /**
     * Sets the ZooKeeper port and the first port to use for the brokers.
     * <p>
     * When {@code firstKafkaPort} is non-negative the port counter is reset to it and every
     * existing broker is given a consecutive port; brokers added later continue the sequence.
     * A negative value leaves the ports of existing brokers as they are.
     *
     * @param zkPort the port passed to the ZooKeeper server
     * @param firstKafkaPort the port for the first broker, or a negative number to not assign broker ports here
     * @return this cluster, for call chaining
     * @throws IllegalStateException if the cluster is running
     */
    public KafkaCluster withPorts(int zkPort, int firstKafkaPort) {
        if (this.running) {
            throw new IllegalStateException("Unable to change cluster settings when running");
        }
        this.zkServer.setPort(zkPort);
        this.startingKafkaPort = firstKafkaPort;
        if (firstKafkaPort >= 0) {
            this.nextKafkaPort.set(firstKafkaPort);
            this.kafkaServers.values().forEach(kafka -> kafka.setPort((int)this.nextKafkaPort.getAndIncrement()));
        }
        return this;
    }

    /**
     * Reports whether the cluster is currently started.
     *
     * @return {@code true} between a completed {@link #startup()} and the next {@link #shutdown()}
     */
    public boolean isRunning() {
        final boolean started = this.running;
        return started;
    }

    /**
     * Starts ZooKeeper and then every broker; does nothing when already running.
     * <p>
     * When no data directory was configured, a {@code cluster} directory next to a freshly
     * created temp file is used. When one was configured and delete-prior-to-startup is set,
     * it is wiped first. ZooKeeper state goes under {@code zk}, and each broker under
     * {@code kafka/broker<id>}.
     *
     * @return this cluster, for call chaining
     * @throws IOException if existing data cannot be deleted, or if a callee reports one
     * @throws RuntimeException if the temporary directory cannot be created
     */
    public synchronized KafkaCluster startup() throws IOException {
        if (this.running) {
            return this;
        }
        // Read the volatile field once so every path below derives from the same directory.
        File clusterDir = this.dataDir;
        if (clusterDir == null) {
            try {
                // The temp file is only used to discover the temp directory's location.
                File temp = File.createTempFile("kafka", "suffix");
                clusterDir = new File(temp.getParentFile(), "cluster");
                this.dataDir = clusterDir;
                clusterDir.mkdirs();
                temp.delete();
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to create temporary directory", e);
            }
        } else if (this.deleteDataPriorToStartup) {
            IoUtil.delete(clusterDir);
            clusterDir.mkdirs();
        }
        this.zkServer.setStateDirectory(new File(clusterDir, "zk"));
        File kafkaDir = new File(clusterDir, "kafka");
        this.kafkaServers.values().forEach(server -> server.setStateDirectory(new File(kafkaDir, "broker" + server.brokerId())));
        this.zkServer.startup();
        LOGGER.debug("Starting {} brokers", this.kafkaServers.size());
        this.kafkaServers.values().forEach(KafkaServer::startup);
        this.running = true;
        return this;
    }

    /**
     * Stops every broker and then ZooKeeper; does nothing when the cluster is not running.
     * <p>
     * Failures while stopping a server are logged and do not abort the shutdown. When
     * delete-upon-shutdown is set, broker data and the root data directory are removed
     * afterwards. The cluster is marked as stopped even if that clean-up throws.
     *
     * @return this cluster, for call chaining
     */
    public synchronized KafkaCluster shutdown() {
        if (!this.running) {
            return this;
        }
        try {
            this.kafkaServers.values().forEach(this::shutdownReliably);
        }
        finally {
            try {
                this.zkServer.shutdown(this.deleteDataUponShutdown);
            }
            catch (Throwable t) {
                LOGGER.error("Error while shutting down {}", this.zkServer, t);
            }
            finally {
                try {
                    if (this.deleteDataUponShutdown) {
                        try {
                            this.kafkaServers.values().forEach(KafkaServer::deleteData);
                        }
                        finally {
                            try {
                                IoUtil.delete(this.dataDir);
                            }
                            catch (IOException e) {
                                LOGGER.error("Error while deleting cluster data", e);
                            }
                        }
                    }
                }
                finally {
                    // The servers are stopped by now, so the flag must be cleared even when
                    // deleting broker data throws; otherwise the cluster could never be restarted.
                    this.running = false;
                }
            }
        }
        return this;
    }

    /**
     * Creates the named topics by delegating to the first broker of the cluster.
     * Nothing is created when the cluster has no brokers.
     *
     * @param topics the names of the topics to create
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(String ... topics) {
        LOGGER.debug("Creating topics: {}", Arrays.toString(topics));
        if (this.running) {
            // Only the first broker in iteration order is asked to create the topics.
            this.kafkaServers.values().stream().limit(1L).forEach(broker -> broker.createTopics(topics));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /**
     * Creates the named topics; set-based convenience form of {@link #createTopics(String...)}.
     *
     * @param topics the names of the topics to create
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(Set<String> topics) {
        final String[] names = topics.toArray(new String[topics.size()]);
        this.createTopics(names);
    }

    /**
     * Creates the named topics with the given layout by delegating to the first broker.
     * Nothing is created when the cluster has no brokers.
     *
     * @param numPartitions the number of partitions for each topic
     * @param replicationFactor the replication factor for each topic
     * @param topics the names of the topics to create
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(int numPartitions, int replicationFactor, String ... topics) {
        LOGGER.debug("Creating topics with {} partitions and {} replicas each: {}", numPartitions, replicationFactor, Arrays.toString(topics));
        if (this.running) {
            this.kafkaServers.values().stream().limit(1L).forEach(broker -> broker.createTopics(numPartitions, replicationFactor, topics));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /**
     * Creates the named topics with the given layout; set-based convenience form of
     * {@link #createTopics(int, int, String...)}.
     *
     * @param numPartitions the number of partitions for each topic
     * @param replicationFactor the replication factor for each topic
     * @param topics the names of the topics to create
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopics(int numPartitions, int replicationFactor, Set<String> topics) {
        final String[] names = topics.toArray(new String[topics.size()]);
        this.createTopics(numPartitions, replicationFactor, names);
    }

    /**
     * Creates a single topic by delegating to the first broker of the cluster.
     * Nothing is created when the cluster has no brokers.
     *
     * @param topic the name of the topic
     * @param numPartitions the number of partitions for the topic
     * @param replicationFactor the replication factor for the topic
     * @throws IllegalStateException if the cluster is not running
     */
    public void createTopic(String topic, int numPartitions, int replicationFactor) {
        LOGGER.debug("Creating topic '{}' with {} partitions and {} replicas", topic, numPartitions, replicationFactor);
        if (this.running) {
            this.kafkaServers.values().stream().limit(1L).forEach(broker -> broker.createTopic(topic, numPartitions, replicationFactor));
            return;
        }
        throw new IllegalStateException("The cluster must be running to create topics");
    }

    /**
     * Passes every directory used by the cluster to the given function: first the ZooKeeper
     * snapshot directory, then the ZooKeeper log directory, then each broker's state directory.
     *
     * @param consumer the function invoked with each directory
     */
    void onEachDirectory(Consumer<File> consumer) {
        final File zkSnapshots = this.zkServer.getSnapshotDirectory();
        consumer.accept(zkSnapshots);
        final File zkLogs = this.zkServer.getLogDirectory();
        consumer.accept(zkLogs);
        for (KafkaServer broker : this.kafkaServers.values()) {
            consumer.accept(broker.getStateDirectory());
        }
    }

    /**
     * Builds the comma-separated list of broker connection strings.
     *
     * @return the connections of all brokers joined with {@code ","}; empty when there are no brokers
     */
    public String brokerList() {
        final StringJoiner connections = new StringJoiner(",");
        for (KafkaServer broker : this.kafkaServers.values()) {
            connections.add(broker.getConnection());
        }
        return connections.toString();
    }

    /**
     * Returns the port reported by the ZooKeeper server.
     *
     * @return the ZooKeeper port
     */
    public int zkPort() {
        final int port = this.zkServer.getPort();
        return port;
    }

    /**
     * Shuts down one broker, logging rather than propagating any failure so that the
     * remaining servers can still be stopped.
     *
     * @param server the broker to stop
     */
    private void shutdownReliably(KafkaServer server) {
        final boolean deleteData = this.deleteDataUponShutdown;
        try {
            server.shutdown(deleteData);
        }
        catch (Throwable failure) {
            LOGGER.error("Error while shutting down {}", server, failure);
        }
    }

    /**
     * Provides access to the producer and consumer helpers of a running cluster.
     *
     * @return a new {@link Usage} bound to this cluster
     * @throws IllegalStateException if the cluster is not running
     */
    public Usage useTo() {
        if (this.running) {
            return new Usage();
        }
        throw new IllegalStateException("Unable to use the cluster it is not running");
    }

    public class Usage {
        /**
         * Builds the consumer configuration for connecting to this cluster, with auto-commit disabled.
         *
         * @param groupId the consumer group id; required
         * @param clientId the client id, or {@code null} to leave it unset
         * @param autoOffsetReset the offset reset strategy, or {@code null} to leave it unset
         * @return a new, mutable set of consumer properties
         * @throws IllegalArgumentException if {@code groupId} is {@code null}
         */
        public Properties getConsumerProperties(String groupId, String clientId, OffsetResetStrategy autoOffsetReset) {
            if (groupId == null) {
                throw new IllegalArgumentException("The groupId is required");
            }
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", KafkaCluster.this.brokerList());
            props.setProperty("group.id", groupId);
            props.setProperty("enable.auto.commit", Boolean.FALSE.toString());
            if (autoOffsetReset != null) {
                // Locale.ROOT keeps the value stable; e.g. a Turkish default locale would
                // otherwise lower-case "EARLIEST" with a dotless i.
                props.setProperty("auto.offset.reset", autoOffsetReset.toString().toLowerCase(Locale.ROOT));
            }
            if (clientId != null) {
                props.setProperty("client.id", clientId);
            }
            return props;
        }

        /**
         * Builds the producer configuration for connecting to this cluster, using {@code acks=1}.
         *
         * @param clientId the client id, or {@code null} to leave it unset
         * @return a new, mutable set of producer properties
         */
        public Properties getProducerProperties(String clientId) {
            final Properties config = new Properties();
            final String brokers = KafkaCluster.this.brokerList();
            config.setProperty("bootstrap.servers", brokers);
            config.setProperty("acks", String.valueOf(1));
            if (clientId == null) {
                return config;
            }
            config.setProperty("client.id", clientId);
            return config;
        }

        /**
         * Creates a producer the caller drives directly; every write is sent and flushed
         * before {@code write} returns. The caller must close the returned producer.
         *
         * @param producerName the name used as the producer's client id
         * @param keySerializer the serializer for keys
         * @param valueSerializer the serializer for values
         * @return the interactive producer
         */
        public <K, V> InteractiveProducer<K, V> createProducer(String producerName, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            final Properties config = this.getProducerProperties(producerName);
            final KafkaProducer<K, V> delegate = new KafkaProducer<>(config, keySerializer, valueSerializer);
            return new InteractiveProducer<K, V>(){

                @Override
                public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
                    delegate.send(record);
                    delegate.flush();
                    return this;
                }

                @Override
                public void close() {
                    delegate.close();
                }
            };
        }

        /**
         * Creates an interactive producer with string keys and {@link Document} values.
         *
         * @param producerName the name used as the producer's client id
         * @return the interactive producer; the caller must close it
         */
        public InteractiveProducer<String, Document> createProducer(String producerName) {
            final Serializer<String> keySerializer = new StringSerializer();
            final Serializer<Document> valueSerializer = new DocumentSerdes();
            return this.createProducer(producerName, keySerializer, valueSerializer);
        }

        /**
         * Creates an interactive consumer for a single topic.
         *
         * @param groupId the consumer group id
         * @param clientId the client id
         * @param topicName the topic to subscribe to
         * @param keyDeserializer the deserializer for keys
         * @param valueDeserializer the deserializer for values
         * @param completion run when the consumer thread ends; may be {@code null}
         * @return the interactive consumer
         */
        public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, String topicName, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Runnable completion) {
            return this.createConsumer(groupId, clientId, Collections.singleton(topicName), keyDeserializer, valueDeserializer, completion);
        }

        /**
         * Creates an interactive consumer that reads the given topics on a background thread,
         * starting from the earliest offset, until the returned consumer is closed.
         *
         * @param groupId the consumer group id
         * @param clientId the client id
         * @param topicNames the topics to subscribe to
         * @param keyDeserializer the deserializer for keys
         * @param valueDeserializer the deserializer for values
         * @param completion run when the consumer thread ends; may be {@code null}
         * @return the interactive consumer; {@code close()} asks the background thread to stop
         */
        public <K, V> InteractiveConsumer<K, V> createConsumer(String groupId, String clientId, Set<String> topicNames, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Runnable completion) {
            // Records not yet handed out through nextRecord().
            final LinkedBlockingQueue<ConsumerRecord<K, V>> consumed = new LinkedBlockingQueue<>();
            // Every record ever read. It is written by the consumer thread and streamed by
            // callers, so it has to be a concurrent collection.
            final LinkedBlockingQueue<ConsumerRecord<K, V>> allMessages = new LinkedBlockingQueue<>();
            // Must start as true: the poll loop checks this flag before its first poll.
            final AtomicBoolean keepReading = new AtomicBoolean(true);
            final OffsetCommitCallback offsetCommitCallback = null;
            this.consume(groupId, clientId, OffsetResetStrategy.EARLIEST, keyDeserializer, valueDeserializer, keepReading::get, offsetCommitCallback, completion, topicNames, record -> {
                // Record in the history first, so anything a caller takes is already in streamAll().
                allMessages.add(record);
                consumed.add(record);
            });
            return new InteractiveConsumer<K, V>(){

                @Override
                public ConsumerRecord<K, V> nextRecord() throws InterruptedException {
                    return consumed.take();
                }

                @Override
                public ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException {
                    return consumed.poll(timeout, unit);
                }

                @Override
                public void close() {
                    keepReading.set(false);
                }

                @Override
                public Stream<ConsumerRecord<K, V>> stream() {
                    return consumed.stream();
                }

                @Override
                public Stream<ConsumerRecord<K, V>> streamAll() {
                    return allMessages.stream();
                }
            };
        }

        /**
         * Creates an interactive consumer of string keys and {@link Document} values for one topic.
         *
         * @param groupId the consumer group id
         * @param clientId the client id
         * @param topicName the topic to subscribe to
         * @param completion run when the consumer thread ends; may be {@code null}
         * @return the interactive consumer
         */
        public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, String topicName, Runnable completion) {
            final Set<String> singleTopic = Collections.singleton(topicName);
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Document> valueDeserializer = new DocumentSerdes();
            return this.createConsumer(groupId, clientId, singleTopic, keyDeserializer, valueDeserializer, completion);
        }

        /**
         * Creates an interactive consumer of string keys and {@link Document} values for several topics.
         *
         * @param groupId the consumer group id
         * @param clientId the client id
         * @param topicNames the topics to subscribe to
         * @param completion run when the consumer thread ends; may be {@code null}
         * @return the interactive consumer
         */
        public InteractiveConsumer<String, Document> createConsumer(String groupId, String clientId, Set<String> topicNames, Runnable completion) {
            final Deserializer<String> keyDeserializer = new StringDeserializer();
            final Deserializer<Document> valueDeserializer = new DocumentSerdes();
            return this.createConsumer(groupId, clientId, topicNames, keyDeserializer, valueDeserializer, completion);
        }

        /**
         * Runs the given function on a new thread with an interactive producer of string keys
         * and {@link Document} values; the producer is closed when the function returns.
         * <p>
         * The type parameters are unused and kept only so existing callers keep compiling.
         *
         * @param producerName the name used for the producer's client id and its thread
         * @param producer the function that writes messages
         */
        public <K, V> void produce(String producerName, Consumer<InteractiveProducer<String, Document>> producer) {
            // The serializers are typed by the concrete String/Document pair the consumer
            // function expects, not by the unrelated K and V of this method.
            this.produce(producerName, new StringSerializer(), new DocumentSerdes(), producer);
        }

        /**
         * Runs the given function on a new thread named {@code <producerName>-thread}, handing
         * it an interactive producer that sends and flushes on every write. The producer is
         * closed when the function returns or throws.
         *
         * @param producerName the name used for the producer's client id and its thread
         * @param keySerializer the serializer for keys
         * @param valueSerializer the serializer for values
         * @param producer the function that writes messages
         */
        public <K, V> void produce(String producerName, Serializer<K> keySerializer, Serializer<V> valueSerializer, Consumer<InteractiveProducer<K, V>> producer) {
            final Properties config = this.getProducerProperties(producerName);
            final KafkaProducer<K, V> delegate = new KafkaProducer<>(config, keySerializer, valueSerializer);
            final InteractiveProducer<K, V> handle = new InteractiveProducer<K, V>(){

                @Override
                public InteractiveProducer<K, V> write(ProducerRecord<K, V> record) {
                    delegate.send(record);
                    delegate.flush();
                    return this;
                }

                @Override
                public void close() {
                    delegate.close();
                }
            };
            final Runnable task = () -> {
                try {
                    producer.accept(handle);
                }
                finally {
                    handle.close();
                }
            };
            final Thread worker = new Thread(task, producerName + "-thread");
            worker.start();
        }

        /**
         * Sends {@code messageCount} messages on a new thread named {@code <producerName>-thread},
         * flushing after each one, then closes the producer and runs the completion callback.
         *
         * @param producerName the name used for the producer's client id and its thread
         * @param messageCount the number of messages to send; zero or a negative number sends nothing
         * @param keySerializer the serializer for keys
         * @param valueSerializer the serializer for values
         * @param completionCallback run after the producer is closed, even on failure; may be {@code null}
         * @param messageSupplier supplies each message to send
         */
        public <K, V> void produce(String producerName, int messageCount, Serializer<K> keySerializer, Serializer<V> valueSerializer, Runnable completionCallback, Supplier<ProducerRecord<K, V>> messageSupplier) {
            Properties props = this.getProducerProperties(producerName);
            Thread t = new Thread(() -> {
                LOGGER.debug("Starting producer {} to write {} messages", producerName, messageCount);
                try (KafkaProducer<K, V> producer = new KafkaProducer<>(props, keySerializer, valueSerializer)) {
                    // '<' rather than '!=' so that a negative count cannot start a runaway loop.
                    for (int i = 0; i < messageCount; ++i) {
                        ProducerRecord<K, V> record = messageSupplier.get();
                        producer.send(record);
                        producer.flush();
                        LOGGER.debug("Producer {}: sent message {}", producerName, record);
                    }
                }
                finally {
                    if (completionCallback != null) {
                        completionCallback.run();
                    }
                    LOGGER.debug("Stopping producer {}", producerName);
                }
            });
            t.setName(producerName + "-thread");
            t.start();
        }

        /**
         * Asynchronously sends messages with string keys and string values under a random producer name.
         *
         * @param messageCount the number of messages to send
         * @param completionCallback run when the producer is done; may be {@code null}
         * @param messageSupplier supplies each message to send
         */
        public void produceStrings(int messageCount, Runnable completionCallback, Supplier<ProducerRecord<String, String>> messageSupplier) {
            // A single serializer instance serves both keys and values.
            final StringSerializer shared = new StringSerializer();
            final String producerName = UUID.randomUUID().toString();
            this.produce(producerName, messageCount, shared, shared, completionCallback, messageSupplier);
        }

        /**
         * Asynchronously sends messages with string keys and {@link Document} values under a random producer name.
         *
         * @param messageCount the number of messages to send
         * @param completionCallback run when the producer is done; may be {@code null}
         * @param messageSupplier supplies each message to send
         */
        public void produceDocuments(int messageCount, Runnable completionCallback, Supplier<ProducerRecord<String, Document>> messageSupplier) {
            final Serializer<String> keys = new StringSerializer();
            final Serializer<Document> values = new DocumentSerdes();
            final String producerName = UUID.randomUUID().toString();
            this.produce(producerName, messageCount, keys, values, completionCallback, messageSupplier);
        }

        /**
         * Asynchronously sends messages with string keys and integer values under a random producer name.
         *
         * @param messageCount the number of messages to send
         * @param completionCallback run when the producer is done; may be {@code null}
         * @param messageSupplier supplies each message to send
         */
        public void produceIntegers(int messageCount, Runnable completionCallback, Supplier<ProducerRecord<String, Integer>> messageSupplier) {
            final Serializer<String> keys = new StringSerializer();
            final Serializer<Integer> values = new IntegerSerializer();
            final String producerName = UUID.randomUUID().toString();
            this.produce(producerName, messageCount, keys, values, completionCallback, messageSupplier);
        }

        /**
         * Asynchronously sends consecutive integers to a topic, each keyed by its decimal string.
         * The counter is incremented before use, so the first value sent is {@code initialValue + 1}.
         *
         * @param topic the topic to write to
         * @param messageCount the number of messages to send
         * @param initialValue the value the counter starts from
         * @param completionCallback run when the producer is done; may be {@code null}
         */
        public void produceIntegers(String topic, int messageCount, int initialValue, Runnable completionCallback) {
            final AtomicLong sequence = new AtomicLong(initialValue);
            final Supplier<ProducerRecord<String, Integer>> nextMessage = () -> {
                final long current = sequence.incrementAndGet();
                final String key = Long.toString(current);
                return new ProducerRecord<String, Integer>(topic, key, (int)current);
            };
            this.produceIntegers(messageCount, completionCallback, nextMessage);
        }

        /**
         * Asynchronously sends supplied string values to a topic, keyed "1", "2", ... in send order.
         *
         * @param topic the topic to write to
         * @param messageCount the number of messages to send
         * @param completionCallback run when the producer is done; may be {@code null}
         * @param valueSupplier supplies the value of each message
         */
        public void produceStrings(String topic, int messageCount, Runnable completionCallback, Supplier<String> valueSupplier) {
            final AtomicLong sequence = new AtomicLong(0L);
            final Supplier<ProducerRecord<String, String>> nextMessage = () -> {
                final String key = Long.toString(sequence.incrementAndGet());
                return new ProducerRecord<String, String>(topic, key, valueSupplier.get());
            };
            this.produceStrings(messageCount, completionCallback, nextMessage);
        }

        /**
         * Asynchronously sends supplied {@link Document} values to a topic, keyed "1", "2", ... in send order.
         *
         * @param topic the topic to write to
         * @param messageCount the number of messages to send
         * @param completionCallback run when the producer is done; may be {@code null}
         * @param valueSupplier supplies the value of each message
         */
        public void produceDocuments(String topic, int messageCount, Runnable completionCallback, Supplier<Document> valueSupplier) {
            final AtomicLong sequence = new AtomicLong(0L);
            final Supplier<ProducerRecord<String, Document>> nextMessage = () -> {
                final String key = Long.toString(sequence.incrementAndGet());
                return new ProducerRecord<String, Document>(topic, key, valueSupplier.get());
            };
            this.produceDocuments(messageCount, completionCallback, nextMessage);
        }

        /**
         * Starts a thread named {@code <clientId>-thread} that subscribes to the topics and keeps
         * polling while {@code continuation} returns {@code true}, passing each record to
         * {@code consumerFunction}. When a commit callback is given, an asynchronous commit is
         * issued after every record. The consumer is closed and {@code completion} run when the loop ends.
         *
         * @param groupId the consumer group id; required
         * @param clientId the client id, also used in the thread name
         * @param autoOffsetReset the offset reset strategy; may be {@code null}
         * @param keyDeserializer the deserializer for keys
         * @param valueDeserializer the deserializer for values
         * @param continuation checked before each poll; polling stops once it returns {@code false}
         * @param offsetCommitCallback the callback for asynchronous commits, or {@code null} to never commit
         * @param completion run when the thread finishes, even on failure; may be {@code null}
         * @param topics the topics to subscribe to
         * @param consumerFunction receives each consumed record
         */
        public <K, V> void consume(String groupId, String clientId, OffsetResetStrategy autoOffsetReset, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, BooleanSupplier continuation, OffsetCommitCallback offsetCommitCallback, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<K, V>> consumerFunction) {
            final Properties config = this.getConsumerProperties(groupId, clientId, autoOffsetReset);
            final Runnable pollLoop = () -> {
                LOGGER.debug("Starting consumer {} to read messages", clientId);
                try (KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer)) {
                    kafkaConsumer.subscribe(new ArrayList<String>(topics));
                    while (continuation.getAsBoolean()) {
                        for (ConsumerRecord<K, V> message : kafkaConsumer.poll(10L)) {
                            LOGGER.debug("Consumer {}: consuming message {}", clientId, message);
                            consumerFunction.accept(message);
                            if (offsetCommitCallback != null) {
                                kafkaConsumer.commitAsync(offsetCommitCallback);
                            }
                        }
                    }
                }
                finally {
                    if (completion != null) {
                        completion.run();
                    }
                    LOGGER.debug("Stopping consumer {}", clientId);
                }
            };
            final Thread worker = new Thread(pollLoop, clientId + "-thread");
            worker.start();
        }

        /**
         * Consumes messages with string keys and {@link Document} values from the earliest offset,
         * using one random UUID as both group id and client id, and never committing offsets.
         *
         * @param continuation polling continues while this returns {@code true}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param topics the topics to subscribe to
         * @param consumerFunction receives each consumed record
         */
        public void consumeDocuments(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, Document>> consumerFunction) {
            final Deserializer<String> keys = new StringDeserializer();
            final Deserializer<Document> values = new DocumentSerdes();
            final String id = UUID.randomUUID().toString();
            final OffsetCommitCallback noCommits = null;
            this.consume(id, id, OffsetResetStrategy.EARLIEST, keys, values, continuation, noCommits, completion, topics, consumerFunction);
        }

        /**
         * Consumes messages with string keys and string values from the earliest offset,
         * using one random UUID as both group id and client id, and never committing offsets.
         *
         * @param continuation polling continues while this returns {@code true}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param topics the topics to subscribe to
         * @param consumerFunction receives each consumed record
         */
        public void consumeStrings(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, String>> consumerFunction) {
            // A single deserializer instance serves both keys and values.
            final StringDeserializer shared = new StringDeserializer();
            final String id = UUID.randomUUID().toString();
            final OffsetCommitCallback noCommits = null;
            this.consume(id, id, OffsetResetStrategy.EARLIEST, shared, shared, continuation, noCommits, completion, topics, consumerFunction);
        }

        /**
         * Consumes messages with string keys and integer values from the earliest offset,
         * using one random UUID as both group id and client id, and never committing offsets.
         *
         * @param continuation polling continues while this returns {@code true}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param topics the topics to subscribe to
         * @param consumerFunction receives each consumed record
         */
        public void consumeIntegers(BooleanSupplier continuation, Runnable completion, Collection<String> topics, Consumer<ConsumerRecord<String, Integer>> consumerFunction) {
            final Deserializer<String> keys = new StringDeserializer();
            final Deserializer<Integer> values = new IntegerDeserializer();
            final String id = UUID.randomUUID().toString();
            final OffsetCommitCallback noCommits = null;
            this.consume(id, id, OffsetResetStrategy.EARLIEST, keys, values, continuation, noCommits, completion, topics, consumerFunction);
        }

        /**
         * Consumes string messages from one topic until {@code count} of them have been accepted
         * by the predicate or the timeout elapses, whichever comes first.
         *
         * @param topicName the topic to read
         * @param count the number of accepted messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param consumer called with each key and value; a message is counted only when it returns {@code true}
         */
        public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, BiPredicate<String, String> consumer) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < (long)count;
            final BooleanSupplier keepGoing = this.continueIfNotExpired(moreWanted, timeout, unit);
            final Set<String> topics = Collections.singleton(topicName);
            this.consumeStrings(keepGoing, completion, topics, record -> {
                final boolean counts = consumer.test(record.key(), record.value());
                if (counts) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes {@link Document} messages from one topic until {@code count} of them have been
         * accepted by the predicate or the timeout elapses, whichever comes first.
         *
         * @param topicName the topic to read
         * @param count the number of accepted messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param consumer called with each key and value; a message is counted only when it returns {@code true}
         */
        public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, BiPredicate<String, Document> consumer) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < (long)count;
            final BooleanSupplier keepGoing = this.continueIfNotExpired(moreWanted, timeout, unit);
            final Set<String> topics = Collections.singleton(topicName);
            this.consumeDocuments(keepGoing, completion, topics, record -> {
                final boolean counts = consumer.test(record.key(), record.value());
                if (counts) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes integer messages from one topic until {@code count} of them have been accepted
         * by the predicate or the timeout elapses, whichever comes first.
         *
         * @param topicName the topic to read
         * @param count the number of accepted messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         * @param consumer called with each key and value; a message is counted only when it returns {@code true}
         */
        public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion, BiPredicate<String, Integer> consumer) {
            final AtomicLong accepted = new AtomicLong();
            final BooleanSupplier moreWanted = () -> accepted.get() < (long)count;
            final BooleanSupplier keepGoing = this.continueIfNotExpired(moreWanted, timeout, unit);
            final Set<String> topics = Collections.singleton(topicName);
            this.consumeIntegers(keepGoing, completion, topics, record -> {
                final boolean counts = consumer.test(record.key(), record.value());
                if (counts) {
                    accepted.incrementAndGet();
                }
            });
        }

        /**
         * Consumes string messages from one topic until {@code count} messages were read or the
         * timeout elapses; every message is counted.
         *
         * @param topicName the topic to read
         * @param count the number of messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         */
        public void consumeStrings(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
            final BiPredicate<String, String> acceptAll = (ignoredKey, ignoredValue) -> true;
            this.consumeStrings(topicName, count, timeout, unit, completion, acceptAll);
        }

        /**
         * Consumes {@link Document} messages from one topic until {@code count} messages were read
         * or the timeout elapses; every message is counted.
         *
         * @param topicName the topic to read
         * @param count the number of messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         */
        public void consumeDocuments(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
            final BiPredicate<String, Document> acceptAll = (ignoredKey, ignoredValue) -> true;
            this.consumeDocuments(topicName, count, timeout, unit, completion, acceptAll);
        }

        /**
         * Consumes integer messages from one topic until {@code count} messages were read or the
         * timeout elapses; every message is counted.
         *
         * @param topicName the topic to read
         * @param count the number of messages after which consumption stops
         * @param timeout the maximum time to keep consuming
         * @param unit the unit of {@code timeout}
         * @param completion run when the consumer thread ends; may be {@code null}
         */
        public void consumeIntegers(String topicName, int count, long timeout, TimeUnit unit, Runnable completion) {
            final BiPredicate<String, Integer> acceptAll = (ignoredKey, ignoredValue) -> true;
            this.consumeIntegers(topicName, count, timeout, unit, completion, acceptAll);
        }

        /**
         * Wraps a continuation so that it also stops once a time limit has passed. The clock
         * starts on the first call of the returned supplier, not when this method is called.
         *
         * @param continuation the underlying condition; evaluated first on every call
         * @param timeout the time limit; a non-positive value lets the limit expire at once or shortly after
         * @param unit the unit of {@code timeout}
         * @return a supplier that is {@code true} while {@code continuation} is {@code true}
         *         and the deadline has not passed
         */
        protected BooleanSupplier continueIfNotExpired(final BooleanSupplier continuation, final long timeout, final TimeUnit unit) {
            return new BooleanSupplier(){
                // 0 means "deadline not computed yet".
                long stopTime = 0L;

                @Override
                public boolean getAsBoolean() {
                    if (this.stopTime == 0L) {
                        long now = System.currentTimeMillis();
                        long budget = unit.toMillis(timeout);
                        // Saturate instead of overflowing: a huge timeout such as Long.MAX_VALUE
                        // must mean "practically never", not a negative deadline already passed.
                        this.stopTime = budget > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + budget;
                    }
                    return continuation.getAsBoolean() && System.currentTimeMillis() <= this.stopTime;
                }
            };
        }
    }

    /**
     * A consumer whose records are pulled by the caller one at a time or inspected as streams.
     *
     * @param <K> the type of message keys
     * @param <V> the type of message values
     */
    public static interface InteractiveConsumer<K, V>
    extends Closeable {
        /**
         * Returns the value of the next record, waiting as {@link #nextRecord()} does.
         *
         * @return the value of the next record
         * @throws InterruptedException if interrupted while waiting
         */
        default V nextValue() throws InterruptedException {
            final ConsumerRecord<K, V> next = this.nextRecord();
            return next.value();
        }

        /**
         * Returns the next record.
         *
         * @return the next record
         * @throws InterruptedException if interrupted while waiting
         */
        ConsumerRecord<K, V> nextRecord() throws InterruptedException;

        /**
         * Returns the value of the next record obtained within the given time.
         *
         * @param timeout the maximum time to wait
         * @param unit the unit of {@code timeout}
         * @return the value, or {@code null} when {@link #nextRecord(long, TimeUnit)} returned {@code null}
         * @throws InterruptedException if interrupted while waiting
         */
        default V nextValue(long timeout, TimeUnit unit) throws InterruptedException {
            final ConsumerRecord<K, V> next = this.nextRecord(timeout, unit);
            if (next == null) {
                return null;
            }
            return next.value();
        }

        /**
         * Returns the next record obtained within the given time.
         *
         * @param timeout the maximum time to wait
         * @param unit the unit of {@code timeout}
         * @return the next record; {@link #nextValue(long, TimeUnit)} tolerates a {@code null} result
         * @throws InterruptedException if interrupted while waiting
         */
        ConsumerRecord<K, V> nextRecord(long timeout, TimeUnit unit) throws InterruptedException;

        /** @return a stream of records, as defined by the implementation */
        Stream<ConsumerRecord<K, V>> stream();

        /** @return a stream of all records, as defined by the implementation */
        Stream<ConsumerRecord<K, V>> streamAll();

        /** Closes this consumer; unlike {@link Closeable#close()} this declares no checked exception. */
        @Override
        void close();
    }

    /**
     * A producer whose messages are written by the caller one at a time.
     *
     * @param <K> the type of message keys
     * @param <V> the type of message values
     */
    public static interface InteractiveProducer<K, V>
    extends Closeable {
        /**
         * Writes one message built from the given topic, key and value.
         *
         * @param topic the topic to write to
         * @param key the message key
         * @param value the message value
         * @return the result of {@link #write(ProducerRecord)}, for call chaining
         */
        default InteractiveProducer<K, V> write(String topic, K key, V value) {
            final ProducerRecord<K, V> message = new ProducerRecord<K, V>(topic, key, value);
            return this.write(message);
        }

        /**
         * Writes one message.
         *
         * @param record the message to write
         * @return a producer for further writes, for call chaining
         */
        InteractiveProducer<K, V> write(ProducerRecord<K, V> record);

        /** Closes this producer; unlike {@link Closeable#close()} this declares no checked exception. */
        @Override
        void close();
    }
}

