/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.spout;

import java.util.Map;
import java.util.logging.Logger;
import org.apache.storm.spout.CheckPointState;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.StateFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout that emits {@code (txid, action)} tuples on the {@link #CHECKPOINT_STREAM_ID} stream.
 *
 * <p>The spout is always in one of two modes. It starts in recovery mode and re-enters it whenever
 * an emitted tuple fails. Once a recovery ack produces no further state change it switches to
 * checkpoint mode, in which a new step is started whenever the current transaction is not yet
 * committed or the checkpoint interval has elapsed.
 *
 * <p>A step is marked "in progress" right after its tuple is emitted, and no further tuple is
 * emitted for that mode until the flag is cleared again from {@link #ack(Object)} or
 * {@link #fail(Object)}.
 *
 * <p>The current transaction state is kept in a {@link KeyValueState} under a fixed key and is
 * committed on every transition.
 */
public class CheckpointSpout extends BaseRichSpout {
    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    public static final String CHECKPOINT_FIELD_TXID = "txid";
    public static final String CHECKPOINT_FIELD_ACTION = "action";
    private static final Logger LOG = Logger.getLogger(CheckpointSpout.class.getName());
    /** Key under which the current {@link CheckPointState} is stored in {@link #checkpointState}. */
    private static final String TX_STATE_KEY = "__state";
    /** Topology configuration key holding the requested checkpoint interval. */
    private static final String CHECKPOINT_INTERVAL_CONF_KEY = "topology.state.checkpoint.interval.ms";
    /** Lower bound applied to the configured checkpoint interval. */
    private static final int MIN_CHECKPOINT_INTERVAL_MS = 100;

    private TopologyContext context;
    private SpoutOutputCollector collector;
    /** Wall-clock time ({@link System#currentTimeMillis()}) at which the last checkpoint was started. */
    private long lastCheckpointTs;
    private int checkpointInterval;
    /** Value handed to {@code Utils.sleep} when {@link #nextTuple()} has nothing to emit. */
    private int sleepInterval;
    private boolean recoveryStepInProgress;
    private boolean checkpointStepInProgress;
    /** {@code true} while in recovery mode, {@code false} while in checkpoint mode. */
    private boolean recovering;
    private KeyValueState<String, CheckPointState> checkpointState;
    private CheckPointState curTxState;

    /**
     * Tells whether a tuple was emitted on the checkpoint stream.
     *
     * @param input the tuple to inspect
     * @return {@code true} if the tuple's source stream id equals {@link #CHECKPOINT_STREAM_ID}
     */
    public static boolean isCheckpoint(Tuple input) {
        return CHECKPOINT_STREAM_ID.equals(input.getSourceStreamId());
    }

    /**
     * Reads the checkpoint interval and the persisted checkpoint state from the given
     * configuration and context, then delegates to
     * {@link #open(TopologyContext, SpoutOutputCollector, int, KeyValueState)}.
     *
     * @param conf   topology configuration
     * @param ctx    topology context of this task
     * @param cllctr collector used to emit checkpoint tuples
     */
    @Override
    public void open(Map<String, Object> conf, TopologyContext ctx, SpoutOutputCollector cllctr) {
        open(ctx, cllctr, loadCheckpointInterval(conf), loadCheckpointState(conf, ctx));
    }

    /**
     * Initializes the spout from already-resolved collaborators.
     *
     * <p>The spout starts in recovery mode with no step in progress. The sleep interval is one
     * tenth of the checkpoint interval (integer division).
     *
     * @param ctx            topology context of this task
     * @param collctr        collector used to emit checkpoint tuples
     * @param checkPInterval checkpoint interval; used as-is, no lower bound is applied here
     * @param checkPState    state store; the current transaction state is read from it under
     *                       {@code TX_STATE_KEY}
     */
    void open(TopologyContext ctx, SpoutOutputCollector collctr, int checkPInterval, KeyValueState<String, CheckPointState> checkPState) {
        context = ctx;
        collector = collctr;
        checkpointInterval = checkPInterval;
        sleepInterval = checkPInterval / 10;
        checkpointState = checkPState;
        curTxState = checkPState.get(TX_STATE_KEY);
        lastCheckpointTs = 0L;
        recoveryStepInProgress = false;
        checkpointStepInProgress = false;
        recovering = true;
    }

    /**
     * Performs at most one step per call: a recovery step if one is pending, otherwise a
     * checkpoint step if one is due. When neither applies it idles via
     * {@code Utils.sleep(sleepInterval)}.
     */
    @Override
    public void nextTuple() {
        if (shouldRecover()) {
            handleRecovery();
            startProgress();
        } else if (shouldCheckpoint()) {
            doCheckpoint();
            startProgress();
        } else {
            Utils.sleep(sleepInterval);
        }
    }

    /**
     * Handles the ack of a previously emitted checkpoint tuple.
     *
     * <p>The ack is only acted upon when its id matches the current transaction id; a mismatch is
     * logged and otherwise ignored. In both cases the in-progress flag is cleared afterwards.
     *
     * @param msgId message id of the acked tuple; cast to {@link Number} and compared by
     *              {@code longValue()}
     */
    @Override
    public void ack(Object msgId) {
        LOG.fine(() -> String.format("Got ack with txid %s, current txState %s", msgId, curTxState));
        if (curTxState.getTxid() == ((Number) msgId).longValue()) {
            if (recovering) {
                handleRecoveryAck();
            } else {
                handleCheckpointAck();
            }
        } else {
            LOG.warning(() -> String.format("Ack msgid %s, txState.txid %d mismatch", msgId, curTxState.getTxid()));
        }
        resetProgress();
    }

    /**
     * Handles the failure of a previously emitted checkpoint tuple by switching to recovery mode
     * (if not already in it) and clearing the in-progress flag.
     *
     * @param msgId message id of the failed tuple; only used for logging
     */
    @Override
    public void fail(Object msgId) {
        LOG.fine(() -> String.format("Got fail with msgid %s", msgId));
        if (!recovering) {
            LOG.fine("Checkpoint failed, will trigger recovery");
            recovering = true;
        }
        resetProgress();
    }

    /**
     * Declares the checkpoint stream with the fields {@link #CHECKPOINT_FIELD_TXID} and
     * {@link #CHECKPOINT_FIELD_ACTION}.
     *
     * @param declarer declarer to register the stream with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(CHECKPOINT_STREAM_ID, new Fields(CHECKPOINT_FIELD_TXID, CHECKPOINT_FIELD_ACTION));
    }

    /**
     * Obtains the state store for this task and makes sure it contains a transaction state.
     *
     * <p>The store namespace is {@code <componentId>-<taskId>}. If nothing is stored under
     * {@code TX_STATE_KEY} yet, a state with txid {@code -1} and status {@code COMMITTED} is
     * written and committed.
     *
     * @param conf topology configuration, passed through to {@code StateFactory.getState}
     * @param ctx  topology context of this task
     * @return the state store, guaranteed to hold an entry under {@code TX_STATE_KEY}
     */
    private KeyValueState<String, CheckPointState> loadCheckpointState(Map<String, Object> conf, TopologyContext ctx) {
        String namespace = ctx.getThisComponentId() + "-" + ctx.getThisTaskId();
        // The factory does not return a parameterized KeyValueState, so the generic cast cannot be
        // verified by the compiler; the suppression is limited to this single declaration.
        @SuppressWarnings("unchecked")
        KeyValueState<String, CheckPointState> state =
                (KeyValueState<String, CheckPointState>) StateFactory.getState(namespace, conf, ctx);
        CheckPointState existing = state.get(TX_STATE_KEY);
        if (existing == null) {
            CheckPointState initial = new CheckPointState(-1L, CheckPointState.State.COMMITTED);
            state.put(TX_STATE_KEY, initial);
            state.commit();
            LOG.fine(() -> String.format("Initialized checkpoint spout state with txState %s", initial));
        } else {
            LOG.fine(() -> String.format("Got checkpoint spout state %s", existing));
        }
        return state;
    }

    /**
     * Reads the checkpoint interval from the topology configuration.
     *
     * @param topoConf topology configuration
     * @return the configured interval, but never less than {@code MIN_CHECKPOINT_INTERVAL_MS};
     *         a missing key or a {@code null} value yields that minimum
     * @throws ClassCastException if the configured value is not a {@link Number}
     */
    private int loadCheckpointInterval(Map<String, Object> topoConf) {
        // A missing key and a key explicitly mapped to null are both treated as "not configured",
        // instead of failing with a NullPointerException on the latter.
        Object configured = topoConf.get(CHECKPOINT_INTERVAL_CONF_KEY);
        int requested = configured == null ? 0 : ((Number) configured).intValue();
        int interval = Math.max(MIN_CHECKPOINT_INTERVAL_MS, requested);
        LOG.info(() -> String.format("Checkpoint interval is %d millis", interval));
        return interval;
    }

    /** Returns {@code true} when in recovery mode and no recovery step is awaiting ack/fail. */
    private boolean shouldRecover() {
        return recovering && !recoveryStepInProgress;
    }

    /**
     * Returns {@code true} when in checkpoint mode, no checkpoint step is awaiting ack/fail, and
     * either the current transaction is not yet {@code COMMITTED} or the checkpoint interval has
     * elapsed.
     */
    private boolean shouldCheckpoint() {
        if (recovering || checkpointStepInProgress) {
            return false;
        }
        // A non-committed transaction is continued regardless of the interval.
        return curTxState.getState() != CheckPointState.State.COMMITTED || checkpointIntervalElapsed();
    }

    /** Returns {@code true} if more than {@link #checkpointInterval} ms passed since {@link #lastCheckpointTs}. */
    private boolean checkpointIntervalElapsed() {
        return System.currentTimeMillis() - lastCheckpointTs > (long) checkpointInterval;
    }

    /** Emits the action that {@code nextAction(true)} yields for the current transaction state. */
    private void handleRecovery() {
        LOG.fine("In recovery");
        CheckPointState.Action action = curTxState.nextAction(true);
        emit(curTxState.getTxid(), action);
    }

    /**
     * Advances the recovery after a matching ack. If {@code nextState(true)} returns a different
     * instance it is persisted; if it returns the very same instance, recovery is considered
     * complete and the spout switches to checkpoint mode.
     */
    private void handleRecoveryAck() {
        CheckPointState nextState = curTxState.nextState(true);
        // Intentional reference comparison: the same instance means there is nothing left to do.
        if (curTxState != nextState) {
            saveTxState(nextState);
        } else {
            LOG.fine(() -> String.format("Recovery complete, current state %s", curTxState));
            recovering = false;
        }
    }

    /**
     * Runs one checkpoint step. When the current transaction is {@code COMMITTED}, the state is
     * first advanced via {@code nextState(false)}, persisted, and the checkpoint timestamp is
     * refreshed. The action for the (possibly updated) current state is then emitted.
     */
    private void doCheckpoint() {
        LOG.fine("In checkpoint");
        if (curTxState.getState() == CheckPointState.State.COMMITTED) {
            saveTxState(curTxState.nextState(false));
            lastCheckpointTs = System.currentTimeMillis();
        }
        CheckPointState.Action action = curTxState.nextAction(false);
        emit(curTxState.getTxid(), action);
    }

    /** Persists the state that {@code nextState(false)} yields after a matching checkpoint ack. */
    private void handleCheckpointAck() {
        CheckPointState nextState = curTxState.nextState(false);
        saveTxState(nextState);
    }

    /**
     * Emits a {@code (txid, action)} tuple on the checkpoint stream, using the txid as message id
     * so that {@link #ack(Object)} can match it against the current transaction.
     *
     * @param txid   transaction id to emit
     * @param action action to emit
     */
    private void emit(long txid, CheckPointState.Action action) {
        LOG.fine(() -> String.format("Current state %s, emitting txid %d, action %s", curTxState, txid, action));
        collector.emit(CHECKPOINT_STREAM_ID, new Values(txid, action), txid);
    }

    /**
     * Stores and commits the given state under {@code TX_STATE_KEY}, then makes it the current
     * transaction state.
     *
     * @param txState the new transaction state
     */
    private void saveTxState(CheckPointState txState) {
        LOG.fine(() -> String.format("saveTxState, current state %s -> new state %s", curTxState, txState));
        checkpointState.put(TX_STATE_KEY, txState);
        checkpointState.commit();
        // Only updated after the commit call returned.
        curTxState = txState;
    }

    /** Marks the step of the current mode as in progress. */
    private void startProgress() {
        if (recovering) {
            recoveryStepInProgress = true;
        } else {
            checkpointStepInProgress = true;
        }
    }

    /**
     * Clears the in-progress flag of the mode the spout is in <em>at the time of the call</em>.
     * Because {@link #ack(Object)} and {@link #fail(Object)} may flip {@link #recovering} before
     * calling this, the flag cleared can belong to the new mode rather than the one whose tuple
     * was acked or failed; the other flag is left untouched.
     */
    private void resetProgress() {
        if (recovering) {
            recoveryStepInProgress = false;
        } else {
            checkpointStepInProgress = false;
        }
    }
}

