package org.apache.storm.eventhubs.bolt;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.storm.eventhubs.spout.EventHubException;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/eventhubs/bolt/EventHubBolt.class */
/**
 * Storm bolt that serializes every incoming tuple into an {@link EventData} and sends it
 * to an Event Hub.
 *
 * <p>When the configuration reports partition mode, tuples are sent through a
 * {@link PartitionSender} created for the partition whose id is this task's index;
 * otherwise they are sent through the {@link EventHubClient} directly. A tuple is acked
 * after a successful send and failed when the send raises a {@link ServiceBusException}
 * or an {@link EventHubException}.
 */
public class EventHubBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger logger = LoggerFactory.getLogger(EventHubBolt.class);

    /** Collector handed over in {@link #prepare}; used to ack/fail tuples and report errors. */
    protected OutputCollector collector;
    /** Partition-bound sender; only created in partition mode, {@code null} otherwise. */
    protected PartitionSender sender;
    /** Event Hub client created in {@link #prepare}; {@code null} before prepare and after cleanup. */
    protected EventHubClient ehClient;
    /** Bolt configuration (connection string, entity path, partition mode, data format). */
    protected EventHubBoltConfig boltConfig;

    /**
     * Creates a bolt whose configuration is built from the two given strings.
     *
     * @param str  first argument forwarded to {@code EventHubBoltConfig(String, String)}
     * @param str2 second argument forwarded to {@code EventHubBoltConfig(String, String)}
     */
    public EventHubBolt(String str, String str2) {
        this.boltConfig = new EventHubBoltConfig(str, str2);
    }

    /**
     * Creates a bolt whose configuration is built from the given values.
     * All arguments are forwarded, in order, to the five-argument
     * {@code EventHubBoltConfig} constructor.
     *
     * @param str  first constructor argument
     * @param str2 second constructor argument
     * @param str3 third constructor argument
     * @param str4 fourth constructor argument
     * @param z    fifth constructor argument (presumably the partition-mode flag; verify
     *             against {@code EventHubBoltConfig})
     */
    public EventHubBolt(String str, String str2, String str3, String str4, boolean z) {
        this.boltConfig = new EventHubBoltConfig(str, str2, str3, str4, z);
    }

    /**
     * Creates a bolt that uses the supplied configuration as-is.
     *
     * @param eventHubBoltConfig configuration to use
     */
    public EventHubBolt(EventHubBoltConfig eventHubBoltConfig) {
        this.boltConfig = eventHubBoltConfig;
    }

    /**
     * Stores the collector and opens the Event Hub client, plus a partition sender when
     * partition mode is enabled (partition id = this task's index).
     *
     * @param map             topology configuration (not read by this bolt)
     * @param topologyContext context used to obtain this task's index
     * @param outputCollector collector used for ack/fail and error reporting
     * @throws RuntimeException wrapping any exception raised while creating the client
     *                          or the sender; the error is reported to the collector first
     */
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;

        // Compute the partition id once; null means "not in partition mode".
        final String partitionId = this.boltConfig.getPartitionMode()
                ? Integer.toString(topologyContext.getThisTaskIndex())
                : null;

        // The connection string is deliberately NOT logged: Event Hubs connection strings
        // typically embed credentials (e.g. a shared access key).
        logger.info("creating sender: entityPath={}, partitionId={}", this.boltConfig.getEntityPath(), partitionId);

        try {
            this.ehClient = EventHubClient.createFromConnectionStringSync(this.boltConfig.getConnectionString());
            if (partitionId != null) {
                this.sender = this.ehClient.createPartitionSenderSync(partitionId);
            }
        } catch (Exception e) {
            outputCollector.reportError(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Serializes the tuple with the configured data format and sends it synchronously.
     * Acks the tuple on success; on {@link ServiceBusException} or
     * {@link EventHubException} the error is reported and the tuple is failed.
     *
     * @param tuple tuple to forward to the Event Hub
     */
    public void execute(Tuple tuple) {
        try {
            EventData eventData = new EventData(this.boltConfig.getEventDataFormat().serialize(tuple));
            if (this.boltConfig.getPartitionMode()) {
                if (this.sender == null) {
                    throw new EventHubException("Sender is null");
                }
                this.sender.sendSync(eventData);
            } else {
                if (this.ehClient == null) {
                    throw new EventHubException("ehclient is null");
                }
                this.ehClient.sendSync(eventData);
            }
            this.collector.ack(tuple);
        } catch (ServiceBusException | EventHubException e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        }
    }

    /**
     * Closes the partition sender (if any) and then the Event Hub client (if any).
     * Each resource is closed independently so that a failure closing the sender, or the
     * absence of a sender in non-partition mode, never prevents the client from being
     * closed. Failures are logged, not rethrown.
     */
    public void cleanup() {
        if (this.sender != null) {
            try {
                this.sender.close().get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can observe it.
                Thread.currentThread().interrupt();
                logger.error("Exception occured during cleanup phase", e);
            } catch (ExecutionException e) {
                logger.error("Exception during sender cleanup phase", e);
            } finally {
                this.sender = null;
            }
        }

        if (this.ehClient != null) {
            try {
                this.ehClient.closeSync();
            } catch (Exception e) {
                // Best-effort shutdown: log and continue.
                logger.error("Exception during ehclient cleanup phase", e);
            } finally {
                this.ehClient = null;
            }
        }

        logger.info("Eventhub Bolt cleaned up");
    }

    /**
     * Declares no output fields: this bolt does not emit tuples.
     *
     * @param outputFieldsDeclarer unused
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
