package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxyExtended;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsScope;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:lib/amazon-kinesis-client-1.7.3.jar:com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.class */
class ProcessTask implements ITask {
    private static final Log LOG = LogFactory.getLog(ProcessTask.class);
    private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator";
    private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
    private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
    private static final String MILLIS_BEHIND_LATEST_METRIC = "MillisBehindLatest";
    private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
    private final ShardInfo shardInfo;
    private final IRecordProcessor recordProcessor;
    private final RecordProcessorCheckpointer recordProcessorCheckpointer;
    private final KinesisDataFetcher dataFetcher;
    private final TaskType taskType = TaskType.PROCESS;
    private final StreamConfig streamConfig;
    private final long backoffTimeMillis;
    private final Shard shard;

    public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor iRecordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher kinesisDataFetcher, long j, boolean z) {
        this.shardInfo = shardInfo;
        this.recordProcessor = iRecordProcessor;
        this.recordProcessorCheckpointer = recordProcessorCheckpointer;
        this.dataFetcher = kinesisDataFetcher;
        this.streamConfig = streamConfig;
        this.backoffTimeMillis = j;
        IKinesisProxy streamProxy = this.streamConfig.getStreamProxy();
        if (z || !(streamProxy instanceof IKinesisProxyExtended)) {
            this.shard = null;
        } else {
            this.shard = ((IKinesisProxyExtended) streamProxy).getShard(this.shardInfo.getShardId());
        }
        if (this.shard != null || z) {
            return;
        }
        LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records in the event of resharding will not be dropped during deaggregation of Amazon Kinesis records.");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public TaskResult call() {
        long currentTimeMillis = System.currentTimeMillis();
        IMetricsScope metricsScope = MetricsHelper.getMetricsScope();
        metricsScope.addDimension(MetricsHelper.SHARD_ID_DIMENSION_NAME, this.shardInfo.getShardId());
        metricsScope.addData(RECORDS_PROCESSED_METRIC, 0.0d, StandardUnit.Count, MetricsLevel.SUMMARY);
        metricsScope.addData(DATA_BYTES_PROCESSED_METRIC, 0.0d, StandardUnit.Bytes, MetricsLevel.SUMMARY);
        RuntimeException runtimeException = null;
        try {
        } catch (RuntimeException e) {
            LOG.error("ShardId " + this.shardInfo.getShardId() + ": Caught exception: ", e);
            runtimeException = e;
            try {
                Thread.sleep(this.backoffTimeMillis);
            } catch (InterruptedException e2) {
                LOG.debug(this.shardInfo.getShardId() + ": Sleep was interrupted", e2);
            }
        }
        if (this.dataFetcher.isShardEndReached()) {
            LOG.info("Reached end of shard " + this.shardInfo.getShardId());
            return new TaskResult(null, true);
        }
        GetRecordsResult recordsResult = getRecordsResult();
        List<Record> records = recordsResult.getRecords();
        if (records.isEmpty()) {
            LOG.debug("Kinesis didn't return any records for shard " + this.shardInfo.getShardId());
            long idleTimeInMilliseconds = this.streamConfig.getIdleTimeInMilliseconds() - (System.currentTimeMillis() - currentTimeMillis);
            if (idleTimeInMilliseconds > 0) {
                long max = Math.max(idleTimeInMilliseconds, this.streamConfig.getIdleTimeInMilliseconds());
                try {
                    LOG.debug("Sleeping for " + max + " ms since there were no new records in shard " + this.shardInfo.getShardId());
                    Thread.sleep(max);
                } catch (InterruptedException e3) {
                    LOG.debug("ShardId " + this.shardInfo.getShardId() + ": Sleep was interrupted");
                }
            }
        } else {
            metricsScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.Count, MetricsLevel.SUMMARY);
        }
        if (!records.isEmpty() && records.get(0).getClass().equals(Record.class)) {
            records = this.shard != null ? UserRecord.deaggregate(records, new BigInteger(this.shard.getHashKeyRange().getStartingHashKey()), new BigInteger(this.shard.getHashKeyRange().getEndingHashKey())) : UserRecord.deaggregate(records);
        }
        this.recordProcessorCheckpointer.setLargestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(metricsScope, records, this.recordProcessorCheckpointer.getLastCheckpointValue(), this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue()));
        if (!records.isEmpty() || this.streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
            LOG.debug("Calling application processRecords() with " + records.size() + " records from " + this.shardInfo.getShardId());
            ProcessRecordsInput withMillisBehindLatest = new ProcessRecordsInput().withRecords(records).withCheckpointer(this.recordProcessorCheckpointer).withMillisBehindLatest(recordsResult.getMillisBehindLatest());
            long currentTimeMillis2 = System.currentTimeMillis();
            try {
                try {
                    this.recordProcessor.processRecords(withMillisBehindLatest);
                    MetricsHelper.addLatencyPerShard(this.shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis2, MetricsLevel.SUMMARY);
                } catch (Exception e4) {
                    LOG.error("ShardId " + this.shardInfo.getShardId() + ": Application processRecords() threw an exception when processing shard ", e4);
                    LOG.error("ShardId " + this.shardInfo.getShardId() + ": Skipping over the following data records: " + records);
                    MetricsHelper.addLatencyPerShard(this.shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis2, MetricsLevel.SUMMARY);
                }
            } catch (Throwable th) {
                MetricsHelper.addLatencyPerShard(this.shardInfo.getShardId(), RECORD_PROCESSOR_PROCESS_RECORDS_METRIC, currentTimeMillis2, MetricsLevel.SUMMARY);
                throw th;
            }
        }
        return new TaskResult(runtimeException);
    }

    @Override // com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask
    public TaskType getTaskType() {
        return this.taskType;
    }

    private ExtendedSequenceNumber filterAndGetMaxExtendedSequenceNumber(IMetricsScope iMetricsScope, List<Record> list, ExtendedSequenceNumber extendedSequenceNumber, ExtendedSequenceNumber extendedSequenceNumber2) {
        ExtendedSequenceNumber extendedSequenceNumber3 = extendedSequenceNumber2;
        ListIterator<Record> listIterator = list.listIterator();
        while (listIterator.hasNext()) {
            Record next = listIterator.next();
            ExtendedSequenceNumber extendedSequenceNumber4 = new ExtendedSequenceNumber(next.getSequenceNumber(), next instanceof UserRecord ? Long.valueOf(((UserRecord) next).getSubSequenceNumber()) : null);
            if (extendedSequenceNumber4.compareTo(extendedSequenceNumber) <= 0) {
                listIterator.remove();
                LOG.debug("removing record with ESN " + extendedSequenceNumber4 + " because the ESN is <= checkpoint (" + extendedSequenceNumber + ")");
            } else {
                if (extendedSequenceNumber3 == null || extendedSequenceNumber3.compareTo(extendedSequenceNumber4) < 0) {
                    extendedSequenceNumber3 = extendedSequenceNumber4;
                }
                iMetricsScope.addData(DATA_BYTES_PROCESSED_METRIC, next.getData().limit(), StandardUnit.Bytes, MetricsLevel.SUMMARY);
            }
        }
        return extendedSequenceNumber3;
    }

    private GetRecordsResult getRecordsResult() {
        try {
            return getRecordsResultAndRecordMillisBehindLatest();
        } catch (ExpiredIteratorException e) {
            LOG.info("ShardId " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException - restarting after greatest seqNum passed to customer", e);
            MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1.0d, StandardUnit.Count, MetricsLevel.SUMMARY);
            this.dataFetcher.advanceIteratorTo(this.recordProcessorCheckpointer.getLargestPermittedCheckpointValue().getSequenceNumber(), this.streamConfig.getInitialPositionInStream());
            try {
                return getRecordsResultAndRecordMillisBehindLatest();
            } catch (ExpiredIteratorException e2) {
                LOG.error("Shard " + this.shardInfo.getShardId() + ": getRecords threw ExpiredIteratorException with a fresh iterator.", e2);
                throw e2;
            }
        }
    }

    private GetRecordsResult getRecordsResultAndRecordMillisBehindLatest() {
        GetRecordsResult records = this.dataFetcher.getRecords(this.streamConfig.getMaxRecords());
        if (records == null) {
            return new GetRecordsResult().withRecords(Collections.emptyList());
        }
        if (records.getMillisBehindLatest() != null) {
            MetricsHelper.getMetricsScope().addData(MILLIS_BEHIND_LATEST_METRIC, records.getMillisBehindLatest().longValue(), StandardUnit.Milliseconds, MetricsLevel.SUMMARY);
        }
        return records;
    }
}
