package org.apache.storm.eventhubs.trident;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.storm.eventhubs.spout.EventDataWrap;
import org.apache.storm.eventhubs.spout.EventHubReceiverImpl;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import org.apache.storm.eventhubs.spout.FieldConstants;
import org.apache.storm.eventhubs.spout.IEventHubReceiver;
import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IPartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.class */
/**
 * Trident partitioned-spout emitter that reads batches of events from EventHub partitions.
 *
 * <p>One {@link ITridentPartitionManager} is created lazily per partition id and cached for the
 * lifetime of this emitter. The per-batch metadata exchanged with Trident is a map holding three
 * string entries: {@link FieldConstants#Offset} (offset the batch was read from),
 * {@code "nextOffset"} (offset of the last event emitted, or the start offset when the batch was
 * empty) and {@code "count"} (number of events in the batch, as a decimal string).
 *
 * <p>The manager cache is a plain {@link HashMap} and no method here synchronizes access to it.
 */
public class TransactionalTridentEventHubEmitter implements IPartitionedTridentSpout.Emitter<Partitions, Partition, Map<String, Object>> {
    private static final Logger logger = LoggerFactory.getLogger(TransactionalTridentEventHubEmitter.class);

    /** Metadata key under which the offset to resume from is stored. */
    private static final String NEXT_OFFSET_KEY = "nextOffset";
    /** Metadata key under which the batch size is stored (as a decimal string). */
    private static final String COUNT_KEY = "count";

    private final int batchSize;
    private final EventHubSpoutConfig spoutConfig;
    /** Partition managers keyed by partition id; populated on first use of a partition. */
    private final Map<String, ITridentPartitionManager> pmMap = new HashMap<String, ITridentPartitionManager>();
    private final ITridentPartitionManagerFactory pmFactory;
    private final IEventHubReceiverFactory recvFactory;

    /**
     * Creates an emitter that uses the default factories and takes its batch size from
     * {@code spoutConfig.getReceiverCredits()}.
     *
     * @param spoutConfig spout configuration handed to receivers and partition managers
     */
    public TransactionalTridentEventHubEmitter(EventHubSpoutConfig spoutConfig) {
        this(spoutConfig, spoutConfig.getReceiverCredits(), null, null);
    }

    /**
     * Creates an emitter with an explicit batch size and optional custom factories.
     *
     * @param spoutConfig spout configuration handed to receivers and partition managers
     * @param batchSize maximum number of events requested per new batch
     * @param pmFactory factory for partition managers; when {@code null} a factory producing
     *     {@link TridentPartitionManager} instances is used
     * @param recvFactory factory for receivers; when {@code null} a factory producing
     *     {@link EventHubReceiverImpl} instances is used
     */
    public TransactionalTridentEventHubEmitter(final EventHubSpoutConfig spoutConfig, int batchSize, ITridentPartitionManagerFactory pmFactory, IEventHubReceiverFactory recvFactory) {
        this.spoutConfig = spoutConfig;
        this.batchSize = batchSize;
        this.pmFactory = (pmFactory != null) ? pmFactory : defaultPartitionManagerFactory(spoutConfig);
        this.recvFactory = (recvFactory != null) ? recvFactory : defaultReceiverFactory();
    }

    /**
     * Builds the default partition-manager factory. Created in a static context so the returned
     * object holds only the configuration, not a reference to an emitter instance.
     */
    private static ITridentPartitionManagerFactory defaultPartitionManagerFactory(final EventHubSpoutConfig spoutConfig) {
        return new ITridentPartitionManagerFactory() {
            @Override
            public ITridentPartitionManager create(IEventHubReceiver receiver) {
                return new TridentPartitionManager(spoutConfig, receiver);
            }
        };
    }

    /**
     * Builds the default receiver factory. Created in a static context so the returned object
     * does not hold a reference to an emitter instance.
     */
    private static IEventHubReceiverFactory defaultReceiverFactory() {
        return new IEventHubReceiverFactory() {
            @Override
            public IEventHubReceiver create(EventHubSpoutConfig config, String partitionId) {
                return new EventHubReceiverImpl(config, partitionId);
            }
        };
    }

    /**
     * Closes every cached partition manager and empties the cache.
     *
     * <p>A runtime failure while closing one manager is logged and does not prevent the remaining
     * managers from being closed; the first such failure is rethrown once all have been attempted.
     */
    public void close() {
        RuntimeException firstFailure = null;
        for (Map.Entry<String, ITridentPartitionManager> entry : pmMap.entrySet()) {
            try {
                entry.getValue().close();
            } catch (RuntimeException e) {
                logger.error("failed to close partition manager for partition {}", entry.getKey(), e);
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        // Drop the closed managers so a later call cannot hand out an already-closed instance.
        pmMap.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Returns the cached manager for the partition, creating and caching one (with a fresh
     * receiver from the receiver factory) on first use.
     */
    private ITridentPartitionManager getOrCreatePartitionManager(Partition partition) {
        String partitionId = partition.getId();
        ITridentPartitionManager manager = pmMap.get(partitionId);
        if (manager == null) {
            IEventHubReceiver receiver = recvFactory.create(spoutConfig, partitionId);
            manager = pmFactory.create(receiver);
            pmMap.put(partitionId, manager);
        }
        return manager;
    }

    /** Deserializes one event with the configured scheme and emits the result. */
    private void emitEvent(TridentCollector collector, EventDataWrap event) {
        collector.emit(spoutConfig.getEventDataScheme().deserialize(event.getEventData()));
    }

    /**
     * Re-emits a previously emitted batch described by {@code partitionMeta}.
     *
     * <p>The batch is re-read from the stored offset with the stored count. If the number of
     * events received differs from the stored count, an error is logged and nothing is emitted.
     *
     * @param tx transaction attempt (not used by this implementation)
     * @param collector collector the deserialized events are emitted to
     * @param partition partition to read from
     * @param partitionMeta metadata of the batch to replay; must contain string values under
     *     {@link FieldConstants#Offset} and {@code "count"}
     * @throws NumberFormatException if the stored count is missing or not a decimal integer
     */
    public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, Map<String, Object> partitionMeta) {
        String offset = (String) partitionMeta.get(FieldConstants.Offset);
        int expectedCount = Integer.parseInt((String) partitionMeta.get(COUNT_KEY));
        logger.info("re-emit for partition {}, offset={}, count={}", partition.getId(), offset, expectedCount);

        List<EventDataWrap> batch = getOrCreatePartitionManager(partition).receiveBatch(offset, expectedCount);
        if (batch.size() != expectedCount) {
            // A replay must reproduce the original batch exactly; a partial batch is not emitted.
            logger.error("failed to refetch eventhub messages, new count={}", batch.size());
            return;
        }
        for (EventDataWrap event : batch) {
            emitEvent(collector, event);
        }
    }

    /**
     * Emits a new batch of at most {@code batchSize} events and returns its metadata.
     *
     * <p>Reading starts at the {@code "nextOffset"} value of {@code lastPartitionMeta}; when the
     * metadata is {@code null} or holds no non-null {@code "nextOffset"}, reading starts at
     * {@link FieldConstants#DefaultStartingOffset}.
     *
     * @param tx transaction attempt (not used by this implementation)
     * @param collector collector the deserialized events are emitted to
     * @param partition partition to read from
     * @param lastPartitionMeta metadata returned for the previous batch, or {@code null}
     * @return metadata for the emitted batch: start offset, next offset and event count
     */
    public Map<String, Object> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, Partition partition, Map<String, Object> lastPartitionMeta) {
        ITridentPartitionManager manager = getOrCreatePartitionManager(partition);

        String startOffset = FieldConstants.DefaultStartingOffset;
        if (lastPartitionMeta != null) {
            Object storedNext = lastPartitionMeta.get(NEXT_OFFSET_KEY);
            if (storedNext != null) {
                startOffset = (String) storedNext;
            }
        }

        // Stays equal to the start offset when no events are received.
        String nextOffset = startOffset;
        List<EventDataWrap> batch = manager.receiveBatch(startOffset, batchSize);
        for (EventDataWrap event : batch) {
            nextOffset = event.getMessageId().getOffset();
            emitEvent(collector, event);
        }

        Map<String, Object> meta = new HashMap<String, Object>();
        meta.put(FieldConstants.Offset, startOffset);
        meta.put(NEXT_OFFSET_KEY, nextOffset);
        meta.put(COUNT_KEY, String.valueOf(batch.size()));
        return meta;
    }

    /**
     * Returns the partition list held by {@code partitions}, in the order it provides.
     *
     * @param partitions partition information for the spout
     * @return the result of {@code partitions.getPartitions()}
     */
    public List<Partition> getOrderedPartitions(Partitions partitions) {
        return partitions.getPartitions();
    }

    /**
     * Does nothing: partition managers are created on demand when a partition is first read.
     *
     * @param partitionResponsibilities partitions assigned to this emitter (ignored)
     */
    public void refreshPartitions(List<Partition> partitionResponsibilities) {
    }
}
