package org.apache.storm.eventhubs.spout;

import java.time.Instant;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/spout/SimplePartitionManager.class */
/**
 * Partition manager that tracks the offset of the most recently received event
 * and persists it to an {@link IStateStore} on {@link #checkpoint()}.
 *
 * <p>{@link #ack(String)} is a no-op and {@link #fail(String)} only logs, so this
 * implementation performs no replay of failed messages by itself.
 */
public class SimplePartitionManager implements IPartitionManager {
    private static final Logger logger = LoggerFactory.getLogger(SimplePartitionManager.class);
    private static final String statePathPrefix = "/eventhubspout";

    protected final IEventHubReceiver receiver;
    protected final EventHubSpoutConfig config;
    private final String partitionId;
    private final IStateStore stateStore;
    private final String statePath;

    /** Offset of the last event returned by {@link #receive()}. */
    protected String lastOffset = FieldConstants.DefaultStartingOffset;
    /** Offset most recently written to the state store by {@link #checkpoint()}. */
    protected String committedOffset = FieldConstants.DefaultStartingOffset;

    /**
     * Creates a manager for a single partition.
     *
     * @param eventHubSpoutConfig spout configuration; used to build the state path and
     *                            to read the enqueue-time filter in {@link #open()}
     * @param partitionId         identifier of the partition, used as the last path segment
     *                            of the state path
     * @param iStateStore         store the offset is read from and saved to
     * @param iEventHubReceiver   receiver that events are pulled from
     */
    public SimplePartitionManager(EventHubSpoutConfig eventHubSpoutConfig, String partitionId, IStateStore iStateStore, IEventHubReceiver iEventHubReceiver) {
        this.receiver = iEventHubReceiver;
        this.config = eventHubSpoutConfig;
        this.partitionId = partitionId;
        this.stateStore = iStateStore;
        // The state path depends on config and partitionId, so it must be computed only
        // after both are assigned. A field initializer would run before this constructor
        // body and dereference a still-null config.
        this.statePath = getPartitionStatePath();
    }

    /**
     * Reads the stored offset for this partition and opens the receiver with a matching filter.
     *
     * <p>If no offset is stored, the default starting offset is used. When the effective
     * offset is the default starting offset and the configured enqueue-time filter is
     * non-zero, the receiver is opened with a time-based filter; otherwise it is opened
     * with an offset-based filter.
     *
     * @throws Exception if reading the state or opening the receiver fails
     */
    @Override
    public void open() throws Exception {
        String offset = this.stateStore.readData(this.statePath);
        logger.info("read offset from state store: {}", offset);
        if (offset == null) {
            offset = FieldConstants.DefaultStartingOffset;
        }

        // The enqueue-time filter is only consulted when there is no stored offset to resume from.
        boolean startFromEnqueueTime = offset.equals(FieldConstants.DefaultStartingOffset)
                && this.config.getEnqueueTimeFilter() != 0;

        EventHubFilter filter;
        if (startFromEnqueueTime) {
            filter = new EventHubFilter(Instant.ofEpochMilli(this.config.getEnqueueTimeFilter()));
        } else {
            filter = new EventHubFilter(offset);
        }
        this.receiver.open(filter);
    }

    /**
     * Closes the receiver and then checkpoints the completed offset.
     */
    @Override
    public void close() {
        this.receiver.close();
        checkpoint();
    }

    /**
     * Saves the completed offset to the state store, unless it equals the offset
     * that was already committed.
     */
    @Override
    public void checkpoint() {
        String completedOffset = getCompletedOffset();
        if (this.committedOffset.equals(completedOffset)) {
            // Nothing new to persist.
            return;
        }
        logger.info("saving state {}", completedOffset);
        this.stateStore.saveData(this.statePath, completedOffset);
        this.committedOffset = completedOffset;
    }

    /**
     * Returns the offset that {@link #checkpoint()} should persist.
     *
     * @return the offset of the last received event
     */
    protected String getCompletedOffset() {
        return this.lastOffset;
    }

    /**
     * Pulls the next event from the receiver and records its offset as the last offset.
     *
     * @return the received event, or {@code null} if the receiver returned none
     */
    @Override
    public EventDataWrap receive() {
        EventDataWrap eventData = this.receiver.receive();
        if (eventData != null) {
            this.lastOffset = eventData.getEventData().getSystemProperties().getOffset();
        }
        return eventData;
    }

    /**
     * No-op: this implementation keeps no per-message state to acknowledge.
     *
     * @param str identifier passed by the caller; ignored
     */
    @Override
    public void ack(String str) {
    }

    /**
     * Logs the failure at warn level; no retry is performed here.
     *
     * @param str identifier passed by the caller; included in the log message
     */
    @Override
    public void fail(String str) {
        logger.warn("fail on {}", str);
    }

    /**
     * Builds the state-store path for this partition from the topology name,
     * namespace, entity path and partition id, and logs it.
     *
     * @return the state path for this partition
     */
    private String getPartitionStatePath() {
        String path = statePathPrefix
                + "/" + this.config.getTopologyName()
                + "/" + this.config.getNamespace()
                + "/" + this.config.getEntityPath()
                + "/partitions/" + this.partitionId;
        logger.info("partition state path: {}", path);
        return path;
    }

    /**
     * Returns the metrics reported by the underlying receiver.
     *
     * @return the receiver's metrics data
     */
    @Override
    public Map<String, Object> getMetricsData() {
        return this.receiver.getMetricsData();
    }
}
