package org.apache.storm.kinesis.spout;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kinesis/spout/KinesisRecordsManager.class */
class KinesisRecordsManager {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
    private transient ZKConnection zkConnection;
    private transient KinesisConnection kinesisConnection;
    private final transient KinesisConfig kinesisConfig;
    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap();
    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap();
    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap();
    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap();
    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap();
    private transient Map<String, String> shardIteratorPerShard = new HashMap();
    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap();
    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap();
    private transient long lastCommitTime;
    private transient boolean deactivated;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KinesisRecordsManager(KinesisConfig kinesisConfig) {
        this.kinesisConfig = kinesisConfig;
        this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
        this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize(int i, int i2) {
        this.deactivated = false;
        this.lastCommitTime = System.currentTimeMillis();
        this.kinesisConnection.initialize();
        this.zkConnection.initialize();
        List<Shard> shardsForStream = this.kinesisConnection.getShardsForStream(this.kinesisConfig.getStreamName());
        LOG.info("myTaskIndex is " + i);
        LOG.info("totalTasks is " + i2);
        int i3 = i;
        while (true) {
            int i4 = i3;
            if (i4 >= shardsForStream.size()) {
                initializeFetchedSequenceNumbers();
                refreshShardIteratorsForNewRecords();
                return;
            } else {
                LOG.info("Shard id " + shardsForStream.get(i4).getShardId() + " assigned to task " + i);
                this.toEmitPerShard.put(shardsForStream.get(i4).getShardId(), new LinkedList<>());
                i3 = i4 + i2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void next(SpoutOutputCollector spoutOutputCollector) {
        if (shouldCommit()) {
            commit();
        }
        KinesisMessageId nextFailedMessageToRetry = this.kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
        if (nextFailedMessageToRetry != null) {
            BigInteger bigInteger = new BigInteger(nextFailedMessageToRetry.getSequenceNumber());
            if (this.failedPerShard.containsKey(nextFailedMessageToRetry.getShardId()) && this.failedPerShard.get(nextFailedMessageToRetry.getShardId()).contains(bigInteger)) {
                if (!this.failedandFetchedRecords.containsKey(nextFailedMessageToRetry)) {
                    fetchFailedRecords(nextFailedMessageToRetry);
                }
                if (emitFailedRecord(spoutOutputCollector, nextFailedMessageToRetry)) {
                    this.failedPerShard.get(nextFailedMessageToRetry.getShardId()).remove(bigInteger);
                    this.kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(nextFailedMessageToRetry);
                    return;
                }
                LOG.warn("failedMessageEmitted not called on retrier for " + nextFailedMessageToRetry + ". This can happen a few times but should not happen infinitely");
            } else {
                LOG.warn("failedPerShard does not contain " + nextFailedMessageToRetry + ". This should never happen.");
            }
        }
        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
        if (getUncommittedRecordsCount().longValue() >= this.kinesisConfig.getMaxUncommittedRecords().longValue()) {
            LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning");
        } else {
            if (this.toEmitPerShard.isEmpty()) {
                LOG.warn("No shard is assigned to this task. Hence not emitting any tuple.");
                return;
            }
            if (shouldFetchNewRecords()) {
                fetchNewRecords();
            }
            emitNewRecord(spoutOutputCollector);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ack(KinesisMessageId kinesisMessageId) {
        String shardId = kinesisMessageId.getShardId();
        BigInteger bigInteger = new BigInteger(kinesisMessageId.getSequenceNumber());
        LOG.debug("Ack received for shardId: {} sequenceNumber: {}", shardId, bigInteger);
        if (!this.ackedPerShard.containsKey(shardId)) {
            this.ackedPerShard.put(shardId, new TreeSet<>());
        }
        this.ackedPerShard.get(shardId).add(bigInteger);
        if (this.emittedPerShard.containsKey(shardId)) {
            this.emittedPerShard.get(shardId).remove(bigInteger);
        }
        if (this.failedPerShard.containsKey(shardId)) {
            this.failedPerShard.get(shardId).remove(bigInteger);
        }
        if (this.failedandFetchedRecords.containsKey(kinesisMessageId)) {
            this.kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
            this.failedandFetchedRecords.remove(kinesisMessageId);
        }
        if (this.deactivated) {
            commit();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void fail(KinesisMessageId kinesisMessageId) {
        String shardId = kinesisMessageId.getShardId();
        BigInteger bigInteger = new BigInteger(kinesisMessageId.getSequenceNumber());
        LOG.debug("Fail received for shardId: {} sequenceNumber: {}", shardId, bigInteger);
        if (this.kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
            if (!this.failedPerShard.containsKey(shardId)) {
                this.failedPerShard.put(shardId, new TreeSet<>());
            }
            this.failedPerShard.get(shardId).add(bigInteger);
            this.emittedPerShard.get(shardId).remove(bigInteger);
        } else {
            ack(kinesisMessageId);
        }
        if (this.deactivated) {
            commit();
        }
    }

    void commit() {
        for (String str : this.toEmitPerShard.keySet()) {
            if (this.ackedPerShard.containsKey(str)) {
                BigInteger bigInteger = null;
                if (this.failedPerShard.containsKey(str) && !this.failedPerShard.get(str).isEmpty()) {
                    bigInteger = this.failedPerShard.get(str).first();
                }
                if (this.emittedPerShard.containsKey(str) && !this.emittedPerShard.get(str).isEmpty()) {
                    BigInteger first = this.emittedPerShard.get(str).first();
                    if (bigInteger == null || bigInteger.compareTo(first) == 1) {
                        bigInteger = first;
                    }
                }
                Iterator<BigInteger> it = this.ackedPerShard.get(str).iterator();
                BigInteger bigInteger2 = null;
                while (it.hasNext()) {
                    BigInteger next = it.next();
                    if (bigInteger != null && bigInteger.compareTo(next) != 1) {
                        break;
                    }
                    bigInteger2 = next;
                    it.remove();
                }
                if (bigInteger2 != null) {
                    HashMap hashMap = new HashMap();
                    hashMap.put("committedSequenceNumber", bigInteger2.toString());
                    LOG.debug("Committing sequence number {} for shardId {}", bigInteger2.toString(), str);
                    this.zkConnection.commitState(this.kinesisConfig.getStreamName(), str, hashMap);
                }
            }
        }
        this.lastCommitTime = System.currentTimeMillis();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void activate() {
        LOG.info("Activate called");
        this.deactivated = false;
        this.kinesisConnection.initialize();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deactivate() {
        LOG.info("Deactivate called");
        this.deactivated = true;
        commit();
        this.kinesisConnection.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        commit();
        this.kinesisConnection.shutdown();
        this.zkConnection.shutdown();
    }

    private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
        if (!this.shardIteratorPerFailedMessage.containsKey(kinesisMessageId)) {
            refreshShardIteratorForFailedRecord(kinesisMessageId);
        }
        String str = this.shardIteratorPerFailedMessage.get(kinesisMessageId);
        LOG.debug("Fetching failed records for shard id :{} at sequence number {} using shardIterator {}", new Object[]{kinesisMessageId.getShardId(), kinesisMessageId.getSequenceNumber(), str});
        try {
            GetRecordsResult fetchRecords = this.kinesisConnection.fetchRecords(str);
            if (fetchRecords != null) {
                List<Record> records = fetchRecords.getRecords();
                LOG.debug("Records size from fetchFailedRecords is {}", Integer.valueOf(records.size()));
                this.shardIteratorPerFailedMessage.put(kinesisMessageId, fetchRecords.getNextShardIterator());
                if (records.size() == 0) {
                    LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                    Thread.sleep(1000L);
                } else {
                    for (Record record : records) {
                        KinesisMessageId kinesisMessageId2 = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
                        if (this.failedPerShard.get(kinesisMessageId.getShardId()).contains(new BigInteger(kinesisMessageId2.getSequenceNumber()))) {
                            this.failedandFetchedRecords.put(kinesisMessageId2, record);
                            this.shardIteratorPerFailedMessage.remove(kinesisMessageId2);
                        }
                    }
                }
            }
        } catch (ExpiredIteratorException e) {
            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
            refreshShardIteratorForFailedRecord(kinesisMessageId);
        } catch (ProvisionedThroughputExceededException e2) {
            try {
                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", e2);
                Thread.sleep(1000L);
            } catch (InterruptedException e3) {
                LOG.warn("Thread interrupted exception", e3);
            }
        } catch (InterruptedException e4) {
            LOG.warn("Thread interrupted while sleeping", e4);
        }
    }

    private void fetchNewRecords() {
        for (Map.Entry<String, LinkedList<Record>> entry : this.toEmitPerShard.entrySet()) {
            String key = entry.getKey();
            try {
                String str = this.shardIteratorPerShard.get(key);
                LOG.debug("Fetching new records for shard id :{} using shardIterator {} after sequence number {}", new Object[]{key, str, this.fetchedSequenceNumberPerShard.get(key)});
                GetRecordsResult fetchRecords = this.kinesisConnection.fetchRecords(str);
                if (fetchRecords != null) {
                    List records = fetchRecords.getRecords();
                    LOG.debug("Records size from fetchNewRecords is {}", Integer.valueOf(records.size()));
                    this.shardIteratorPerShard.put(key, fetchRecords.getNextShardIterator());
                    if (records.size() == 0) {
                        LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                        Thread.sleep(1000L);
                    } else {
                        entry.getValue().addAll(records);
                        this.fetchedSequenceNumberPerShard.put(key, ((Record) records.get(records.size() - 1)).getSequenceNumber());
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("Thread interrupted while sleeping", e);
            } catch (ProvisionedThroughputExceededException e2) {
                try {
                    LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", e2);
                    Thread.sleep(1000L);
                } catch (InterruptedException e3) {
                    LOG.warn("Thread interrupted exception", e3);
                }
            } catch (ExpiredIteratorException e4) {
                LOG.warn("shardIterator for shardId " + key + " has expired. Refreshing shardIterator");
                refreshShardIteratorForNewRecords(key);
            }
        }
    }

    private void emitNewRecord(SpoutOutputCollector spoutOutputCollector) {
        Record pollFirst;
        for (Map.Entry<String, LinkedList<Record>> entry : this.toEmitPerShard.entrySet()) {
            String key = entry.getKey();
            LinkedList<Record> value = entry.getValue();
            do {
                pollFirst = value.pollFirst();
                if (pollFirst != null) {
                }
            } while (!emitRecord(spoutOutputCollector, pollFirst, new KinesisMessageId(this.kinesisConfig.getStreamName(), key, pollFirst.getSequenceNumber())));
            return;
        }
    }

    private boolean emitFailedRecord(SpoutOutputCollector spoutOutputCollector, KinesisMessageId kinesisMessageId) {
        if (this.failedandFetchedRecords.containsKey(kinesisMessageId)) {
            return emitRecord(spoutOutputCollector, this.failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
        }
        return false;
    }

    private boolean emitRecord(SpoutOutputCollector spoutOutputCollector, Record record, KinesisMessageId kinesisMessageId) {
        boolean z = false;
        List<Object> tuple = this.kinesisConfig.getRecordToTupleMapper().getTuple(record);
        if (tuple == null || tuple.size() <= 0) {
            LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
            ack(kinesisMessageId);
        } else {
            spoutOutputCollector.emit(tuple, kinesisMessageId);
            if (!this.emittedPerShard.containsKey(kinesisMessageId.getShardId())) {
                this.emittedPerShard.put(kinesisMessageId.getShardId(), new TreeSet<>());
            }
            this.emittedPerShard.get(kinesisMessageId.getShardId()).add(new BigInteger(record.getSequenceNumber()));
            z = true;
        }
        return z;
    }

    private boolean shouldCommit() {
        return System.currentTimeMillis() - this.lastCommitTime >= this.kinesisConfig.getZkInfo().getCommitIntervalMs().longValue();
    }

    private void initializeFetchedSequenceNumbers() {
        for (String str : this.toEmitPerShard.keySet()) {
            Map<Object, Object> readState = this.zkConnection.readState(this.kinesisConfig.getStreamName(), str);
            if (readState != null) {
                Object obj = readState.get("committedSequenceNumber");
                LOG.info("State read is committedSequenceNumber: " + obj + " shardId:" + str);
                if (obj != null) {
                    this.fetchedSequenceNumberPerShard.put(str, (String) obj);
                }
            }
        }
    }

    private void refreshShardIteratorsForNewRecords() {
        Iterator<String> it = this.toEmitPerShard.keySet().iterator();
        while (it.hasNext()) {
            refreshShardIteratorForNewRecords(it.next());
        }
    }

    private void refreshShardIteratorForNewRecords(String str) {
        String str2 = this.fetchedSequenceNumberPerShard.get(str);
        String shardIterator = this.kinesisConnection.getShardIterator(this.kinesisConfig.getStreamName(), str, str2 == null ? this.kinesisConfig.getShardIteratorType() : ShardIteratorType.AFTER_SEQUENCE_NUMBER, str2, this.kinesisConfig.getTimestamp());
        if (shardIterator == null || shardIterator.isEmpty()) {
            return;
        }
        LOG.warn("Refreshing shard iterator for new records for shardId " + str + " with shardIterator " + shardIterator);
        this.shardIteratorPerShard.put(str, shardIterator);
    }

    private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
        String shardIterator = this.kinesisConnection.getShardIterator(this.kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
        if (shardIterator == null || shardIterator.isEmpty()) {
            return;
        }
        LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + shardIterator);
        this.shardIteratorPerFailedMessage.put(kinesisMessageId, shardIterator);
    }

    private Long getUncommittedRecordsCount() {
        Long l = 0L;
        Iterator<Map.Entry<String, TreeSet<BigInteger>>> it = this.emittedPerShard.entrySet().iterator();
        while (it.hasNext()) {
            l = Long.valueOf(l.longValue() + it.next().getValue().size());
        }
        Iterator<Map.Entry<String, TreeSet<BigInteger>>> it2 = this.ackedPerShard.entrySet().iterator();
        while (it2.hasNext()) {
            l = Long.valueOf(l.longValue() + it2.next().getValue().size());
        }
        Iterator<Map.Entry<String, TreeSet<BigInteger>>> it3 = this.failedPerShard.entrySet().iterator();
        while (it3.hasNext()) {
            l = Long.valueOf(l.longValue() + it3.next().getValue().size());
        }
        LOG.debug("Returning uncommittedRecordsCount as {}", l);
        return l;
    }

    private boolean shouldFetchNewRecords() {
        boolean z = true;
        Iterator<Map.Entry<String, LinkedList<Record>>> it = this.toEmitPerShard.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!it.next().getValue().isEmpty()) {
                z = false;
                break;
            }
        }
        return z;
    }
}
