/*
 * Decompiled with CFR 0.152.
 */
package io.axway.iron.spi.kafka;

import io.axway.iron.spi.StoreNamePrefixManagement;
import io.axway.iron.spi.storage.TransactionStore;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Publisher;

public class KafkaTransactionStore
implements TransactionStore {
    private static final int PARTITION = 0;
    private static final int CONSTANT_KEY = 0;
    private static final int RETRIES = 5;
    private static final int PRODUCER_BUFFER_MEMORY = 0x2000000;
    private static final String CONSUMER_SESSION_TIMEOUT = "30000";
    private static final long NO_SEEK = -1L;
    private final String m_topicName;
    private final TopicPartition m_topicPartition;
    private final Producer<Integer, byte[]> m_producer;
    private final AtomicLong m_pendingSeek = new AtomicLong(-1L);
    private final Flowable<TransactionStore.TransactionInput> m_transactionsFlow;
    private final StoreNamePrefixManagement m_prefixManagement = new StoreNamePrefixManagement();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    KafkaTransactionStore(Properties kafkaProperties, String topicName) {
        this.m_topicName = topicName;
        this.m_topicPartition = new TopicPartition(this.m_topicName, 0);
        UUID uuid = UUID.randomUUID();
        this.createKafkaTopic((Properties)kafkaProperties.clone(), topicName);
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(null);
            Properties producerKafkaProperties = (Properties)kafkaProperties.clone();
            producerKafkaProperties.put("acks", "all");
            producerKafkaProperties.put("retries", (Object)5);
            producerKafkaProperties.put("batch.size", (Object)1);
            producerKafkaProperties.put("buffer.memory", (Object)0x2000000);
            producerKafkaProperties.put("client.id", "ironClient-" + uuid);
            this.m_producer = new KafkaProducer(producerKafkaProperties, (Serializer)new IntegerSerializer(), (Serializer)new ByteArraySerializer());
        }
        finally {
            Thread.currentThread().setContextClassLoader(classLoader);
        }
        this.m_transactionsFlow = Flowable.generate(() -> {
            KafkaConsumer kafkaConsumer;
            ClassLoader ccl = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(null);
                Properties consumerKafkaProperties = (Properties)kafkaProperties.clone();
                consumerKafkaProperties.put("max.poll.records", (Object)1);
                consumerKafkaProperties.put("auto.offset.reset", "earliest");
                consumerKafkaProperties.put("enable.auto.commit", "false");
                consumerKafkaProperties.put("session.timeout.ms", CONSUMER_SESSION_TIMEOUT);
                consumerKafkaProperties.put("group.id", "ironGroup-" + uuid);
                kafkaConsumer = new KafkaConsumer(consumerKafkaProperties, (Deserializer)new IntegerDeserializer(), (Deserializer)new ByteArrayDeserializer());
            }
            finally {
                Thread.currentThread().setContextClassLoader(ccl);
            }
            kafkaConsumer.assign(List.of(this.m_topicPartition));
            return kafkaConsumer;
        }, (consumer, emitter) -> {
            long seek = this.m_pendingSeek.getAndSet(-1L);
            if (seek > -1L) {
                consumer.seek(this.m_topicPartition, seek);
            }
            emitter.onNext((Object)consumer.poll(Duration.ofMillis(Long.parseLong(CONSUMER_SESSION_TIMEOUT) / 5L)));
        }, KafkaConsumer::close).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).concatMap(Flowable::fromIterable).map(this::prepareTransactionInput);
    }

    public OutputStream createTransactionOutput(String storeName) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(){

            @Override
            public void close() throws IOException {
                super.close();
                KafkaTransactionStore.this.m_producer.send(new ProducerRecord(KafkaTransactionStore.this.m_topicName, Integer.valueOf(0), (Object)0, (Object)this.toByteArray()));
            }
        };
        this.m_prefixManagement.writeNamePrefix(storeName, (OutputStream)outputStream);
        return outputStream;
    }

    public Publisher<TransactionStore.TransactionInput> allTransactions() {
        return this.m_transactionsFlow;
    }

    public void seekTransaction(BigInteger latestProcessedTransactionId) {
        if (!BigInteger.ZERO.equals(latestProcessedTransactionId)) {
            this.m_pendingSeek.set(latestProcessedTransactionId.longValueExact() + 1L);
        }
    }

    public void close() {
        this.m_producer.flush();
        this.m_producer.close();
    }

    public void lockReadonly(boolean readonly) {
    }

    public boolean isReadonlyLockSet() {
        return false;
    }

    private void createKafkaTopic(Properties kafkaProperties, String topicName) {
        block9: {
            try (AdminClient adminClient = AdminClient.create((Properties)kafkaProperties);){
                adminClient.createTopics(List.of(new NewTopic(topicName, 1, 1))).all().get();
            }
            catch (Exception e) {
                Throwable current = e;
                while (!(current instanceof TopicExistsException) && current.getCause() != null) {
                    current = current.getCause();
                }
                if (current instanceof TopicExistsException) break block9;
                throw new IllegalStateException(e);
            }
        }
    }

    private TransactionStore.TransactionInput prepareTransactionInput(final ConsumerRecord<Integer, byte[]> record) throws IOException {
        final ByteArrayInputStream inputStream = new ByteArrayInputStream((byte[])record.value());
        final String storeName = StoreNamePrefixManagement.readStoreName((InputStream)inputStream);
        return new TransactionStore.TransactionInput(){

            public InputStream getInputStream() {
                return inputStream;
            }

            public BigInteger getTransactionId() {
                return BigInteger.valueOf(record.offset());
            }

            public String storeName() {
                return storeName;
            }
        };
    }
}

