/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka;

import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.kafka.FailedMsgRetryManager;
import org.apache.storm.kafka.SpoutConfig;

/**
 * {@link FailedMsgRetryManager} that schedules retries of failed offsets with an
 * exponentially growing delay.
 *
 * <p>The delay for retry number {@code n} (1-based) is
 * {@code retryInitialDelayMs * retryDelayMultiplier^(n - 1)}, capped at
 * {@code retryDelayMaxMs}. Two structures are kept in step:
 * <ul>
 *   <li>{@code records} maps each failed offset to its latest retry record;</li>
 *   <li>{@code waiting} orders the records that are awaiting a retry by the
 *       time at which they become due.</li>
 * </ul>
 *
 * <p>{@link #prepare(SpoutConfig, Map)} must be called before any other method,
 * because it is what creates both structures.
 */
public class ExponentialBackoffMsgRetryManager
implements FailedMsgRetryManager {
    private long retryInitialDelayMs;
    private double retryDelayMultiplier;
    private long retryDelayMaxMs;
    private int retryLimit;
    /** Records awaiting retry, ordered by ascending retry time. */
    private Queue<MessageRetryRecord> waiting;
    /** Latest retry record for every offset that has failed and not been acked or cleared. */
    private Map<Long, MessageRetryRecord> records;

    /**
     * Copies the retry settings from the spout configuration and creates empty
     * bookkeeping structures.
     *
     * @param spoutConfig source of the initial delay, multiplier, maximum delay and retry limit
     * @param stormConf   not used by this implementation
     */
    @Override
    public void prepare(SpoutConfig spoutConfig, Map stormConf) {
        this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs;
        this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier;
        this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs;
        this.retryLimit = spoutConfig.retryLimit;
        this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
        this.records = new ConcurrentHashMap<Long, MessageRetryRecord>();
    }

    /**
     * Registers a failure of {@code offset} and schedules its next retry.
     *
     * <p>The first failure creates a record with retry number 1; each further
     * failure replaces the record with one whose retry number is one higher.
     *
     * @param offset the offset that failed
     */
    @Override
    public void failed(Long offset) {
        MessageRetryRecord oldRecord = this.records.get(offset);
        MessageRetryRecord newRecord;
        if (oldRecord == null) {
            newRecord = new MessageRetryRecord(offset);
        } else {
            // The previous record may still be queued (the offset failed again before
            // its retry started). Drop it so the queue never holds two entries for one
            // offset; otherwise the stale, earlier retry time would win and bypass the
            // longer backoff computed for the new record.
            this.waiting.remove(oldRecord);
            newRecord = oldRecord.createNextRetryRecord();
        }
        this.records.put(offset, newRecord);
        this.waiting.add(newRecord);
    }

    /**
     * Forgets all retry state for {@code offset}. Does nothing if the offset is not tracked.
     *
     * @param offset the offset that was acknowledged
     */
    @Override
    public void acked(Long offset) {
        MessageRetryRecord record = this.records.remove(offset);
        if (record != null) {
            this.waiting.remove(record);
        }
    }

    /**
     * Marks the retry of {@code offset} as started by taking it out of the waiting
     * queue. Its record is kept so that a later failure continues the backoff sequence.
     *
     * @param offset the offset being retried
     * @throws IllegalStateException if the offset has no record or is not currently waiting
     */
    @Override
    public void retryStarted(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        if (record == null || !this.waiting.contains(record)) {
            throw new IllegalStateException("cannot retry a message that has not failed");
        }
        this.waiting.remove(record);
    }

    /**
     * Returns the offset whose retry is due first, without removing it from the queue.
     *
     * <p>Queue heads that are due but no longer have an entry in {@code records} are
     * discarded along the way.
     *
     * @return the next offset that is due for retry, or {@code null} if none is due yet
     */
    @Override
    public Long nextFailedMessageToRetry() {
        MessageRetryRecord first;
        // Iterative rather than recursive so that a long run of stale heads cannot
        // exhaust the stack.
        while ((first = this.waiting.peek()) != null
                && System.currentTimeMillis() >= first.retryTimeUTC) {
            if (this.records.containsKey(first.offset)) {
                return first.offset;
            }
            // poll() removes exactly the head; remove(Object) would remove whichever
            // element it finds first that is equal by offset.
            this.waiting.poll();
        }
        return null;
    }

    /**
     * Tells whether {@code offset} is waiting for a retry and its retry time has been reached.
     *
     * @param offset the offset to check
     * @return {@code true} if the offset has a record, that record is in the waiting
     *         queue, and the current time is at or past its retry time
     */
    @Override
    public boolean shouldReEmitMsg(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        return record != null && this.waiting.contains(record) && System.currentTimeMillis() >= record.retryTimeUTC;
    }

    /**
     * Tells whether {@code offset} may be retried again.
     *
     * @param offset the offset to check
     * @return {@code true} if the offset has no record, if {@code retryLimit} is zero
     *         or negative, or if the record's retry number is still below {@code retryLimit}
     */
    @Override
    public boolean retryFurther(Long offset) {
        MessageRetryRecord record = this.records.get(offset);
        return record == null || this.retryLimit <= 0 || this.retryLimit > record.retryNum;
    }

    /**
     * Drops the retry state of every tracked offset strictly below {@code kafkaOffset}.
     *
     * @param kafkaOffset exclusive upper bound of the offsets to drop
     * @return the offsets that were dropped; empty if none qualified
     */
    @Override
    public Set<Long> clearOffsetsBefore(Long kafkaOffset) {
        HashSet<Long> invalidOffsets = new HashSet<Long>();
        // Removing while iterating is safe here because prepare() backs records with a
        // ConcurrentHashMap, whose key-set iterator tolerates concurrent modification.
        for (Long offset : this.records.keySet()) {
            if (offset >= kafkaOffset) {
                continue;
            }
            MessageRetryRecord record = this.records.remove(offset);
            if (record == null) {
                continue;
            }
            this.waiting.remove(record);
            invalidOffsets.add(offset);
        }
        return invalidOffsets;
    }

    /**
     * Adds two longs, clamping to {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE}
     * instead of wrapping around on overflow.
     */
    private static long addSaturating(long base, long delta) {
        if (delta > 0 && base > Long.MAX_VALUE - delta) {
            return Long.MAX_VALUE;
        }
        if (delta < 0 && base < Long.MIN_VALUE - delta) {
            return Long.MIN_VALUE;
        }
        return base + delta;
    }

    /** Orders retry records by ascending retry time. */
    private static class RetryTimeComparator
    implements Comparator<MessageRetryRecord> {
        private RetryTimeComparator() {
        }

        @Override
        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
            return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
        }
    }

    /**
     * Immutable snapshot of one scheduled retry of an offset. Two records are equal
     * when they refer to the same offset, regardless of retry number or retry time.
     */
    private class MessageRetryRecord {
        private final long offset;
        /** 1-based count of retries scheduled for this offset so far. */
        private final int retryNum;
        /** Wall-clock time (epoch milliseconds) at which the retry becomes due. */
        private final long retryTimeUTC;

        /** Creates the record for the first retry of {@code offset}. */
        public MessageRetryRecord(long offset) {
            this(offset, 1);
        }

        private MessageRetryRecord(long offset, int retryNum) {
            this.offset = offset;
            this.retryNum = retryNum;
            // Saturate instead of overflowing: with a very large retryDelayMaxMs a plain
            // addition would wrap to a negative time and make the retry due immediately.
            this.retryTimeUTC = ExponentialBackoffMsgRetryManager.addSaturating(
                    System.currentTimeMillis(), this.calculateRetryDelay());
        }

        /** Returns a new record for the same offset with the retry number incremented by one. */
        public MessageRetryRecord createNextRetryRecord() {
            return new MessageRetryRecord(this.offset, this.retryNum + 1);
        }

        /**
         * Computes {@code retryInitialDelayMs * retryDelayMultiplier^(retryNum - 1)},
         * clamped to {@link Long#MAX_VALUE} and then capped at {@code retryDelayMaxMs}.
         */
        private long calculateRetryDelay() {
            double delayMultiplier = Math.pow(ExponentialBackoffMsgRetryManager.this.retryDelayMultiplier, this.retryNum - 1);
            double delay = (double)ExponentialBackoffMsgRetryManager.this.retryInitialDelayMs * delayMultiplier;
            long delayThisRetryMs = delay >= (double)Long.MAX_VALUE ? Long.MAX_VALUE : (long)delay;
            return Math.min(delayThisRetryMs, ExponentialBackoffMsgRetryManager.this.retryDelayMaxMs);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof MessageRetryRecord && this.offset == ((MessageRetryRecord)other).offset;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(this.offset).hashCode();
        }
    }
}

