package org.apache.pulsar.io.kinesis;

import com.google.common.base.Preconditions;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

@Connector(name = "kinesis", type = IOType.SOURCE, help = "A source connector that copies messages from Kinesis to Pulsar", configClass = KinesisSourceConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/kinesis/KinesisSource.class */
public class KinesisSource extends AbstractAwsConnector implements Source<byte[]> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KinesisSource.class);
    private LinkedBlockingQueue<KinesisRecord> queue;
    private KinesisSourceConfig kinesisSourceConfig;
    private ConfigsBuilder configsBuilder;
    private ShardRecordProcessorFactory recordProcessorFactory;
    private String workerId;
    private Scheduler scheduler;
    private Thread schedulerThread;
    private Throwable threadEx;

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.scheduler.shutdown();
    }

    @Override // org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        this.kinesisSourceConfig = (KinesisSourceConfig) IOConfigUtils.loadWithSecrets(map, KinesisSourceConfig.class, sourceContext);
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSourceConfig.getAwsKinesisStreamName()), "empty kinesis-stream name");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSourceConfig.getAwsEndpoint()) || StringUtils.isNotBlank(this.kinesisSourceConfig.getAwsRegion()), "Either the aws-end-point or aws-region must be set");
        Preconditions.checkArgument(StringUtils.isNotBlank(this.kinesisSourceConfig.getAwsCredentialPluginParam()), "empty aws-credential param");
        if (this.kinesisSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            Preconditions.checkArgument(this.kinesisSourceConfig.getStartAtTime() != null, "Timestamp must be specified");
        }
        this.queue = new LinkedBlockingQueue<>(this.kinesisSourceConfig.getReceiveQueueSize());
        this.workerId = InetAddress.getLocalHost().getCanonicalHostName() + ParameterizedMessage.ERROR_MSG_SEPARATOR + UUID.randomUUID();
        org.apache.pulsar.io.aws.AwsCredentialProviderPlugin createCredentialProvider = createCredentialProvider(this.kinesisSourceConfig.getAwsCredentialPluginName(), this.kinesisSourceConfig.getAwsCredentialPluginParam());
        KinesisAsyncClient buildKinesisAsyncClient = this.kinesisSourceConfig.buildKinesisAsyncClient(createCredentialProvider);
        this.recordProcessorFactory = new KinesisRecordProcessorFactory(this.queue, this.kinesisSourceConfig);
        this.configsBuilder = new ConfigsBuilder(this.kinesisSourceConfig.getAwsKinesisStreamName(), this.kinesisSourceConfig.getApplicationName(), buildKinesisAsyncClient, this.kinesisSourceConfig.buildDynamoAsyncClient(createCredentialProvider), this.kinesisSourceConfig.buildCloudwatchAsyncClient(createCredentialProvider), this.workerId, this.recordProcessorFactory);
        RetrievalConfig retrievalConfig = this.configsBuilder.retrievalConfig();
        if (!this.kinesisSourceConfig.isUseEnhancedFanOut()) {
            retrievalConfig.retrievalSpecificConfig(new PollingConfig(this.kinesisSourceConfig.getAwsKinesisStreamName(), buildKinesisAsyncClient));
        }
        retrievalConfig.initialPositionInStreamExtended(this.kinesisSourceConfig.getStreamStartPosition());
        this.scheduler = new Scheduler(this.configsBuilder.checkpointConfig(), this.configsBuilder.coordinatorConfig(), this.configsBuilder.leaseManagementConfig(), this.configsBuilder.lifecycleConfig(), this.configsBuilder.metricsConfig(), this.configsBuilder.processorConfig(), retrievalConfig);
        this.schedulerThread = new Thread(this.scheduler);
        this.schedulerThread.setDaemon(true);
        this.threadEx = null;
        this.schedulerThread.setUncaughtExceptionHandler((thread, th) -> {
            this.threadEx = th;
        });
        this.schedulerThread.start();
    }

    @Override // org.apache.pulsar.io.core.Source
    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public Record<byte[]> read2() throws Exception {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            log.warn("Got interrupted when trying to fetch out of the queue");
            if (this.threadEx != null) {
                log.error("error from scheduler", this.threadEx);
            }
            throw e;
        }
    }
}
