/*
 * Decompiled with CFR 0.152.
 */
package org.birchframework.framework.kafka;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.birchframework.configuration.BirchProperties;
import org.birchframework.framework.kafka.KafkaSendResult;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

/**
 * Convenience wrapper around a {@link KafkaTemplate} offering blocking, asynchronous and
 * transactional variants of sending a record to a topic.
 *
 * <p>Blocking variants wait at most the configured sender wait time for the broker
 * acknowledgement and report the outcome through a {@link KafkaSendResult} instead of
 * propagating send failures as exceptions.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
@Component
@ConditionalOnClass(value={KafkaTemplate.class})
@EnableConfigurationProperties(value={BirchProperties.class})
public class KafkaSender<K extends String, V extends Serializable> {
    /** Template every send is delegated to. */
    private final KafkaTemplate<K, V> kafkaTemplate;
    /** Upper bound, in milliseconds, that the blocking variants wait for a send to complete. */
    private final long waitTimeMillis;

    /**
     * Creates the sender and applies the sender properties to the given template.
     *
     * @param theTemplate   template used for all send operations; its
     *                      {@code allowNonTransactional} flag is overwritten from the properties
     * @param theProperties source of the sender wait time and the non-transactional flag
     */
    public KafkaSender(KafkaTemplate<K, V> theTemplate, BirchProperties theProperties) {
        this.kafkaTemplate = theTemplate;
        BirchProperties.Kafka.Sender aSenderProperties = theProperties.getKafka().getSender();
        this.waitTimeMillis = aSenderProperties.getWaitTime().toMillis();
        this.kafkaTemplate.setAllowNonTransactional(aSenderProperties.isAllowNonTransactional());
    }

    /**
     * Sends a keyed record without an explicit partition and waits for the outcome.
     *
     * @param topic target topic
     * @param key   record key
     * @param data  record value
     * @return the send outcome; never empty
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Optional<KafkaSendResult<K, V>> send(String topic, K key, V data) throws InterruptedException {
        return this.send(topic, null, key, data);
    }

    /**
     * Sends a record without key or explicit partition and waits for the outcome.
     *
     * @param topic target topic
     * @param data  record value
     * @return the send outcome; never empty
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Optional<KafkaSendResult<K, V>> send(String topic, V data) throws InterruptedException {
        return this.send(topic, null, null, data);
    }

    /**
     * Sends a record and blocks up to the configured wait time for its completion.
     *
     * <p>An {@link ExecutionException} or {@link TimeoutException} is not propagated; it is
     * recorded on the returned result ({@code hasError} set, {@code exception} populated).
     *
     * @param topic     target topic
     * @param partition target partition, or {@code null}
     * @param key       record key, or {@code null}
     * @param data      record value
     * @return the send outcome; never empty
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @SuppressFBWarnings(value={"NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"})
    public Optional<KafkaSendResult<K, V>> send(@NonNull String topic, @Nullable Integer partition, @Nullable K key, @NonNull V data) throws InterruptedException {
        KafkaSendResult<K, V> aResult = new KafkaSendResult<>();
        try {
            aResult.result = this.kafkaTemplate.send(topic, partition, key, data).get(this.waitTimeMillis, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException | TimeoutException e) {
            aResult.hasError = true;
            aResult.exception = e;
        }
        // InterruptedException is deliberately not caught: it propagates to the caller unchanged,
        // and the partially filled result would be discarded anyway.
        return Optional.of(aResult);
    }

    /**
     * Sends a record inside {@link KafkaTemplate#executeInTransaction} and blocks up to the
     * configured wait time for its completion.
     *
     * <p>If waiting fails (interruption, execution failure or timeout) the failure is recorded
     * on the returned result and an empty {@link SendResult} (null record, null metadata) is
     * stored as its {@code result}; the transaction callback itself returns normally. When the
     * failure was an interruption, the calling thread's interrupt status is restored before
     * this method returns.
     *
     * @param topic     target topic
     * @param partition target partition, or {@code null}
     * @param key       record key, or {@code null}
     * @param data      record value
     * @return the send outcome; never empty
     */
    public Optional<KafkaSendResult<K, V>> sendTransactional(@NonNull String topic, @Nullable Integer partition, @Nullable K key, @NonNull V data) {
        KafkaSendResult<K, V> aReturnValue = new KafkaSendResult<>();
        try {
            aReturnValue.result = this.kafkaTemplate.executeInTransaction(operations -> {
                SendResult<K, V> aResult;
                try {
                    aResult = operations.send(topic, partition, key, data).get(this.waitTimeMillis, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException | ExecutionException | TimeoutException e) {
                    aReturnValue.hasError = true;
                    aReturnValue.exception = e;
                    aResult = new SendResult<>(null, null);
                }
                return aResult;
            });
        }
        finally {
            // The signature cannot rethrow InterruptedException, so re-assert the interrupt flag
            // for the caller. This is deferred until the template call has finished so that the
            // template's own transaction handling runs exactly as it did before.
            if (aReturnValue.exception instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        return Optional.of(aReturnValue);
    }

    /**
     * Sends a record without key or explicit partition and does not wait for the outcome.
     * Both success and failure are ignored (no-op callbacks).
     *
     * @param topic target topic
     * @param data  record value
     */
    public void sendAsync(String topic, V data) {
        this.sendAsync(topic, null, null, data, r -> {}, e -> {});
    }

    /**
     * Sends a record without key or explicit partition and notifies the given callbacks.
     *
     * @param topic           target topic
     * @param data            record value
     * @param successCallback invoked with the send result on success
     * @param failureCallback invoked with the cause on failure
     */
    public void sendAsync(String topic, V data, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        this.sendAsync(topic, null, null, data, successCallback, failureCallback);
    }

    /**
     * Sends a keyed record without explicit partition and does not wait for the outcome.
     * Both success and failure are ignored (no-op callbacks).
     *
     * @param topic target topic
     * @param key   record key
     * @param data  record value
     */
    public void sendAsync(String topic, K key, V data) {
        this.sendAsync(topic, key, data, r -> {}, e -> {});
    }

    /**
     * Sends a keyed record without explicit partition and notifies the given callbacks.
     *
     * @param topic           target topic
     * @param key             record key
     * @param data            record value
     * @param successCallback invoked with the send result on success
     * @param failureCallback invoked with the cause on failure
     */
    public void sendAsync(String topic, K key, V data, SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        this.sendAsync(topic, null, key, data, successCallback, failureCallback);
    }

    /**
     * Sends a record through the template and registers the callbacks on the returned future.
     *
     * @param topic           target topic
     * @param partition       target partition, or {@code null}
     * @param key             record key, or {@code null}
     * @param data            record value
     * @param successCallback invoked with the send result on success
     * @param failureCallback invoked with the cause on failure
     */
    @SuppressFBWarnings(value={"NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE"})
    public void sendAsync(@NonNull String topic, @Nullable Integer partition, @Nullable K key, @NonNull V data, @NonNull SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        ListenableFuture<SendResult<K, V>> aFuture = this.kafkaTemplate.send(topic, partition, key, data);
        aFuture.addCallback(successCallback, failureCallback);
    }

    /**
     * Sends a record inside {@link KafkaTemplate#executeInTransaction} and registers the
     * callbacks on the future obtained from that call; the send is not awaited here.
     *
     * @param topic           target topic
     * @param partition       target partition, or {@code null}
     * @param key             record key, or {@code null}
     * @param data            record value
     * @param successCallback invoked with the send result on success
     * @param failureCallback invoked with the cause on failure
     * @throws IllegalArgumentException if the template returns a {@code null} future
     */
    public void sendAsyncTransactional(@NonNull String topic, @Nullable Integer partition, @Nullable K key, @NonNull V data, @NonNull SuccessCallback<SendResult<K, V>> successCallback, FailureCallback failureCallback) {
        ListenableFuture<SendResult<K, V>> aFuture = this.kafkaTemplate.executeInTransaction(operations -> operations.send(topic, partition, key, data));
        Assert.notNull(aFuture, "Future returned by Kafka Template is null");
        aFuture.addCallback(successCallback, failureCallback);
    }
}

