package com.zanox.vertx.mods;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.zanox.vertx.mods.exception.KinesisException;
import com.zanox.vertx.mods.internal.EventProperties;
import com.zanox.vertx.mods.internal.KinesisProperties;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.vertx.java.busmods.BusModBase;
import org.vertx.java.core.Context;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;

/* loaded from: input_file:com/zanox/vertx/mods/KinesisMessageProcessor.class */
public class KinesisMessageProcessor extends BusModBase implements Handler<Message<JsonObject>> {
    private int retryCounter;
    private AmazonKinesisAsyncClient kinesisAsyncClient;
    private String streamName;
    private String partitionKey;
    private String region;

    public void handle(Message<JsonObject> message) {
        try {
            sendMessageToKinesis(message);
        } catch (KinesisException e) {
            this.logger.error(e);
        }
    }

    public void start() {
        super.start();
        this.kinesisAsyncClient = createClient();
        this.vertx.eventBus().registerHandler(getMandatoryStringConfig("address"), this);
    }

    public void stop() {
        if (this.kinesisAsyncClient != null) {
            this.kinesisAsyncClient.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public AmazonKinesisAsyncClient createClient() {
        int optionalIntConfig = getOptionalIntConfig(KinesisProperties.CONNECTION_TIMEOUT, 50000);
        int optionalIntConfig2 = getOptionalIntConfig(KinesisProperties.MAX_CONNECTION, 50);
        RetryPolicy retryPolicy = ClientConfiguration.DEFAULT_RETRY_POLICY;
        int optionalIntConfig3 = getOptionalIntConfig(KinesisProperties.SOCKET_TIMEOUT, 50000);
        boolean optionalBooleanConfig = getOptionalBooleanConfig(KinesisProperties.USE_REAPER, true);
        String optionalStringConfig = getOptionalStringConfig(KinesisProperties.USER_AGENT, ClientConfiguration.DEFAULT_USER_AGENT);
        String optionalStringConfig2 = getOptionalStringConfig(KinesisProperties.ENDPOINT, null);
        this.streamName = getMandatoryStringConfig(KinesisProperties.STREAM_NAME);
        this.partitionKey = getMandatoryStringConfig(KinesisProperties.PARTITION_KEY);
        this.region = getMandatoryStringConfig(KinesisProperties.REGION);
        this.logger.info(" --- Stream name: " + this.streamName);
        this.logger.info(" --- Partition key: " + this.partitionKey);
        this.logger.info(" --- Region: " + this.region);
        if (optionalStringConfig2 != null) {
            this.logger.info(" --- Endpoint: " + optionalStringConfig2);
        }
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setConnectionTimeout(optionalIntConfig);
        clientConfiguration.setMaxConnections(optionalIntConfig2);
        clientConfiguration.setRetryPolicy(retryPolicy);
        clientConfiguration.setSocketTimeout(optionalIntConfig3);
        clientConfiguration.setUseReaper(optionalBooleanConfig);
        clientConfiguration.setUserAgent(optionalStringConfig);
        AmazonKinesisAsyncClient amazonKinesisAsyncClient = new AmazonKinesisAsyncClient(new DefaultAWSCredentialsProviderChain(), clientConfiguration);
        amazonKinesisAsyncClient.setRegion(RegionUtils.getRegion(this.region));
        if (optionalStringConfig2 != null) {
            amazonKinesisAsyncClient.setEndpoint(optionalStringConfig2);
        }
        return amazonKinesisAsyncClient;
    }

    protected void sendMessageToKinesis(Message<JsonObject> message) throws KinesisException {
        if (this.kinesisAsyncClient == null) {
            throw new KinesisException("AmazonKinesisAsyncClient is not initialized");
        }
        if (!isValid(((JsonObject) message.body()).getString(EventProperties.PAYLOAD))) {
            this.logger.error("Invalid message provided.");
            return;
        }
        JsonObject jsonObject = (JsonObject) message.body();
        this.logger.debug(" --- Got event " + message.toString());
        this.logger.debug(" --- Got body + " + jsonObject.toString());
        byte[] binary = jsonObject.getBinary(EventProperties.PAYLOAD);
        if (binary == null) {
            this.logger.debug(" --- Payload is null, trying to get the payload as String");
            binary = jsonObject.getString(EventProperties.PAYLOAD).getBytes();
        }
        this.logger.debug("Binary payload size: " + binary.length);
        String string = jsonObject.getString(KinesisProperties.PARTITION_KEY);
        String str = string != null ? string : this.partitionKey;
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(this.streamName);
        putRecordRequest.setPartitionKey(str);
        this.logger.debug("Writing to streamName " + this.streamName + " using partitionkey " + str);
        putRecordRequest.setData(ByteBuffer.wrap(binary));
        this.retryCounter = 0;
        sendUsingAsyncClient(putRecordRequest, message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendUsingAsyncClient(final PutRecordRequest putRecordRequest, final Message<JsonObject> message) {
        if (this.retryCounter == 3) {
            sendError(message, "Failed sending message to Kinesis");
        }
        final Context currentContext = this.vertx.currentContext();
        this.kinesisAsyncClient.putRecordAsync(putRecordRequest, new AsyncHandler<PutRecordRequest, PutRecordResult>() { // from class: com.zanox.vertx.mods.KinesisMessageProcessor.1
            public void onSuccess(PutRecordRequest putRecordRequest2, PutRecordResult putRecordResult) {
                Context context = currentContext;
                Message message2 = message;
                context.runOnContext(r7 -> {
                    KinesisMessageProcessor.this.logger.debug("Sent message to Kinesis: " + putRecordResult.toString());
                    KinesisMessageProcessor.this.sendOK(message2);
                });
            }

            public void onError(Exception exc) {
                Context context = currentContext;
                PutRecordRequest putRecordRequest2 = putRecordRequest;
                Message message2 = message;
                context.runOnContext(r11 -> {
                    KinesisMessageProcessor.access$008(KinesisMessageProcessor.this);
                    KinesisMessageProcessor.this.kinesisAsyncClient = KinesisMessageProcessor.this.createClient();
                    KinesisMessageProcessor.this.logger.info("Failed sending message to Kinesis, retry: " + KinesisMessageProcessor.this.retryCounter + " ... ", exc);
                    KinesisMessageProcessor.this.vertx.setTimer(500L, l -> {
                        KinesisMessageProcessor.this.sendUsingAsyncClient(putRecordRequest2, message2);
                    });
                });
            }
        });
    }

    private boolean isValid(String str) {
        return (str == null || str.isEmpty()) ? false : true;
    }

    static /* synthetic */ int access$008(KinesisMessageProcessor kinesisMessageProcessor) {
        int i = kinesisMessageProcessor.retryCounter;
        kinesisMessageProcessor.retryCounter = i + 1;
        return i;
    }
}
