/*
 * Decompiled with CFR 0.152.
 */
package io.axway.iron.spi.kafka;

import io.axway.iron.spi.storage.TransactionStore;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

class KafkaTransactionStore
implements TransactionStore {
    private static final int PARTITION = 0;
    private static final int CONSTANT_KEY = 0;
    private static final int RETRIES = 5;
    private static final int PRODUCER_BUFFER_MEMORY = 0x2000000;
    private static final String CONSUMER_SESSION_TIMEOUT = "30000";
    private final Consumer<Integer, byte[]> m_consumer;
    private final Producer<Integer, byte[]> m_producer;
    private final String m_topicName;
    private final TopicPartition m_topicPartition;

    KafkaTransactionStore(Properties kafkaProperties, String topicName) {
        this.m_topicName = topicName.trim();
        if (this.m_topicName.isEmpty()) {
            throw new IllegalArgumentException("Topic name can't be null");
        }
        this.m_topicPartition = new TopicPartition(this.m_topicName, 0);
        UUID uuid = UUID.randomUUID();
        Properties producerKafkaProperties = (Properties)kafkaProperties.clone();
        producerKafkaProperties.put("acks", "all");
        producerKafkaProperties.put("retries", (Object)5);
        producerKafkaProperties.put("batch.size", (Object)1);
        producerKafkaProperties.put("linger.ms", (Object)1);
        producerKafkaProperties.put("buffer.memory", (Object)0x2000000);
        producerKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerKafkaProperties.put("client.id", "ironClient-" + uuid);
        this.m_producer = new KafkaProducer(producerKafkaProperties);
        Properties consumerKafkaProperties = (Properties)kafkaProperties.clone();
        consumerKafkaProperties.put("max.poll.records", (Object)1);
        consumerKafkaProperties.put("auto.offset.reset", "earliest");
        consumerKafkaProperties.put("enable.auto.commit", "false");
        consumerKafkaProperties.put("session.timeout.ms", CONSUMER_SESSION_TIMEOUT);
        consumerKafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerKafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerKafkaProperties.put("group.id", "ironGroup-" + uuid);
        this.m_consumer = new KafkaConsumer(consumerKafkaProperties);
        this.m_consumer.subscribe(Collections.singletonList(this.m_topicName));
    }

    public OutputStream createTransactionOutput() throws IOException {
        return new ByteArrayOutputStream(){

            @Override
            public void close() throws IOException {
                super.close();
                KafkaTransactionStore.this.m_producer.send(new ProducerRecord(KafkaTransactionStore.this.m_topicName, Integer.valueOf(0), (Object)0, (Object)this.toByteArray()));
            }
        };
    }

    public void seekTransactionPoll(long latestProcessedTransactionId) {
        this.m_consumer.seek(this.m_topicPartition, latestProcessedTransactionId + 1L);
    }

    public TransactionStore.TransactionInput pollNextTransaction(long timeout, TimeUnit unit) {
        Iterator iterator = this.m_consumer.poll(unit.toMillis(timeout)).iterator();
        if (!iterator.hasNext()) {
            return null;
        }
        final ConsumerRecord firstRecord = (ConsumerRecord)iterator.next();
        if (iterator.hasNext()) {
            throw new IllegalStateException("Kafka should not return more than one record");
        }
        return new TransactionStore.TransactionInput(){

            public InputStream getInputStream() throws IOException {
                return new ByteArrayInputStream((byte[])firstRecord.value());
            }

            public long getTransactionId() {
                return firstRecord.offset();
            }
        };
    }
}

