package org.apache.storm.eventhubs.spout;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.ReducedMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/spout/EventHubReceiverImpl.class */
public class EventHubReceiverImpl implements IEventHubReceiver {
    private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
    private final String connectionString;
    private final String entityName;
    private final String partitionId;
    private final String consumerGroupName;
    private final int receiverTimeoutInMillis;
    private PartitionReceiver receiver;
    private EventHubClient ehClient;
    private ReducedMetric receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
    private CountMetric receiveApiCallCount = new CountMetric();
    private CountMetric receiveMessageCount = new CountMetric();

    public EventHubReceiverImpl(EventHubSpoutConfig eventHubSpoutConfig, String str) {
        this.connectionString = eventHubSpoutConfig.getConnectionString();
        this.entityName = eventHubSpoutConfig.getEntityPath();
        this.partitionId = str;
        this.consumerGroupName = eventHubSpoutConfig.getConsumerGroupName();
        this.receiverTimeoutInMillis = eventHubSpoutConfig.getReceiverTimeoutInMillis();
    }

    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public void open(IEventFilter iEventFilter) throws EventHubException {
        logger.info(new StringBuilder().append("creating eventhub receiver: partitionId=").append(this.partitionId).append(", filter=").append(iEventFilter.getOffset()).toString() != null ? iEventFilter.getOffset() : Long.toString(iEventFilter.getTime().toEpochMilli()));
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.ehClient = EventHubClient.createFromConnectionStringSync(this.connectionString);
            if (iEventFilter.getOffset() != null) {
                this.receiver = this.ehClient.createEpochReceiverSync(this.consumerGroupName, this.partitionId, iEventFilter.getOffset(), false, 1L);
            } else {
                if (iEventFilter.getTime() == null) {
                    throw new RuntimeException("Eventhub receiver must have an offset or time to be created");
                }
                this.receiver = this.ehClient.createEpochReceiverSync(this.consumerGroupName, this.partitionId, iEventFilter.getTime(), 1L);
            }
            if (this.receiver != null) {
                this.receiver.setReceiveTimeout(Duration.ofMillis(this.receiverTimeoutInMillis));
            }
            logger.info("created eventhub receiver, time taken(ms): " + (System.currentTimeMillis() - currentTimeMillis));
        } catch (IOException e) {
            logger.error("Exception in creating ehclient" + e.toString());
            throw new EventHubException(e);
        } catch (ServiceBusException e2) {
            logger.error("Exception in creating Receiver" + e2.toString());
            throw new EventHubException((Throwable) e2);
        }
    }

    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public void close() {
        if (this.receiver != null) {
            try {
                this.receiver.close().whenComplete((r5, th) -> {
                    if (th != null) {
                        try {
                            logger.error("Exception during receiver close phase" + th.toString());
                        } catch (Exception e) {
                            logger.error("Exception during ehclient close phase" + e.toString());
                            return;
                        }
                    }
                    this.ehClient.closeSync();
                }).get();
            } catch (InterruptedException e) {
                logger.error("Exception occured during close phase" + e.toString());
            } catch (ExecutionException e2) {
                logger.error("Exception occured during close phase" + e2.toString());
            }
            logger.info("closed eventhub receiver: partitionId=" + this.partitionId);
            this.receiver = null;
            this.ehClient = null;
        }
    }

    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public boolean isOpen() {
        return this.receiver != null;
    }

    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public EventDataWrap receive() {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Iterable receiveSync = this.receiver.receiveSync(1);
            this.receiveApiLatencyMean.update(Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            this.receiveApiCallCount.incr();
            if (receiveSync == null || receiveSync.spliterator().getExactSizeIfKnown() == 0) {
                return null;
            }
            this.receiveMessageCount.incr();
            EventData eventData = (EventData) receiveSync.iterator().next();
            return EventDataWrap.create(eventData, new MessageId(this.partitionId, eventData.getSystemProperties().getOffset(), eventData.getSystemProperties().getSequenceNumber()));
        } catch (ServiceBusException e) {
            logger.error("Exception occured during receive" + e.toString());
            return null;
        }
    }

    @Override // org.apache.storm.eventhubs.spout.IEventHubReceiver
    public Map<String, Object> getMetricsData() {
        HashMap hashMap = new HashMap();
        hashMap.put(this.partitionId + "/receiveApiLatencyMean", this.receiveApiLatencyMean.getValueAndReset());
        hashMap.put(this.partitionId + "/receiveApiCallCount", this.receiveApiCallCount.getValueAndReset());
        hashMap.put(this.partitionId + "/receiveMessageCount", this.receiveMessageCount.getValueAndReset());
        return hashMap;
    }
}
