/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.kinesis;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.inject.Named;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;

/**
 * Change consumer, registered under the CDI name {@code kinesis}, that forwards
 * every received change event to an Amazon Kinesis stream with one
 * {@code PutRecord} call per event.
 *
 * <p>Configuration read by this class:
 * <ul>
 *   <li>{@code debezium.sink.kinesis.region} - region of the Kinesis client
 *       (read with {@code Config.getValue}, i.e. without a default);</li>
 *   <li>{@code debezium.sink.kinesis.endpoint} - optional endpoint override;</li>
 *   <li>{@code debezium.sink.kinesis.credentials.profile} - credentials profile
 *       name, defaults to {@code default};</li>
 *   <li>{@code debezium.sink.kinesis.null.key} - partition key used for events
 *       whose key is {@code null}, defaults to {@code default}.</li>
 * </ul>
 *
 * <p>If a {@link KinesisClient} qualified with {@link CustomConsumerBuilder} is
 * resolvable, it is used as-is and the region/endpoint/profile settings are
 * not read.
 */
@Named("kinesis")
@Dependent
public class KinesisChangeConsumer
extends BaseChangeConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisChangeConsumer.class);

    private static final String PROP_PREFIX = "debezium.sink.kinesis.";
    private static final String PROP_REGION_NAME = PROP_PREFIX + "region";
    private static final String PROP_ENDPOINT_NAME = PROP_PREFIX + "endpoint";

    /** Payload sent for events that carry no value. */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    private String region;
    private Optional<String> endpointOverride;

    @ConfigProperty(name = PROP_PREFIX + "credentials.profile", defaultValue = "default")
    String credentialsProfile;

    @ConfigProperty(name = PROP_PREFIX + "null.key", defaultValue = "default")
    String nullKey;

    /** Client used for all writes; stays {@code null} until {@link #connect()} succeeds. */
    private KinesisClient client = null;

    @Inject
    @CustomConsumerBuilder
    Instance<KinesisClient> customClient;

    /**
     * Initialises {@link #client}: prefers an injected custom client and
     * otherwise builds one from the configured region, credentials profile and
     * optional endpoint override.
     */
    @PostConstruct
    void connect() {
        if (customClient.isResolvable()) {
            client = customClient.get();
            LOGGER.info("Obtained custom configured KinesisClient '{}'", client);
            return;
        }

        final Config config = ConfigProvider.getConfig();
        region = config.getValue(PROP_REGION_NAME, String.class);
        endpointOverride = config.getOptionalValue(PROP_ENDPOINT_NAME, String.class);

        final KinesisClientBuilder builder = KinesisClient.builder()
                .region(Region.of(region))
                .credentialsProvider(ProfileCredentialsProvider.create(credentialsProfile));
        // The builder is mutated in place, so the returned reference is not needed.
        endpointOverride.ifPresent(endpoint -> builder.endpointOverride(URI.create(endpoint)));

        client = builder.build();
        LOGGER.info("Using default KinesisClient '{}'", client);
    }

    /**
     * Closes the Kinesis client on a best-effort basis; failures are logged and
     * not propagated.
     */
    @PreDestroy
    void close() {
        if (client == null) {
            // connect() never completed, so there is nothing to release.
            return;
        }
        try {
            client.close();
        }
        catch (Exception e) {
            // Passing the exception as the last argument logs its stack trace;
            // the message therefore carries no "{}" placeholder.
            LOGGER.warn("Exception while closing Kinesis client", e);
        }
    }

    /**
     * Sends each event of the batch to Kinesis, in order, marking every event
     * as processed right after its {@code PutRecord} call returns and marking
     * the batch as finished at the end.
     *
     * <p>The partition key is the event key converted with {@code getString},
     * or {@link #nullKey} when the key is {@code null}. The stream name is
     * derived from the event destination through {@code streamNameMapper}.
     *
     * @param records   events to publish
     * @param committer callback used to acknowledge individual events and the batch
     * @throws InterruptedException declared by the consumer contract
     */
    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);

            final Object key = record.key();
            final String partitionKey = key != null ? getString(key) : nullKey;

            // An event may carry no value (presumably a delete tombstone - confirm
            // against the connector setup). A null cannot be wrapped into SdkBytes,
            // so such events are published with an empty payload instead of
            // failing the whole batch.
            final Object value = record.value();
            final byte[] payload = value != null ? getBytes(value) : EMPTY_PAYLOAD;

            final PutRecordRequest putRecord = PutRecordRequest.builder()
                    .partitionKey(partitionKey)
                    .streamName(streamNameMapper.map(record.destination()))
                    .data(SdkBytes.fromByteArray(payload))
                    .build();
            client.putRecord(putRecord);
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }
}

