package ai.tripl.arc.load;

import ai.tripl.arc.api.API;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventDataBatch;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.impl.RetryExponential;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;
import scala.Serializable;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.runtime.ObjectRef;

/* compiled from: AzureEventHubsLoad.scala */
/* loaded from: input_file:ai/tripl/arc/load/AzureEventHubsLoad$$anonfun$load$2.class */
public final class AzureEventHubsLoad$$anonfun$load$2 extends AbstractFunction1<Iterator<Row>, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final API.AzureEventHubsLoad load$1;
    public final LongAccumulator recordAccumulator$1;
    public final LongAccumulator bytesAccumulator$1;
    public final LongAccumulator batchAccumulator$1;

    public final void apply(Iterator<Row> iterator) {
        ConnectionStringBuilder sasKey = new ConnectionStringBuilder().setNamespaceName(this.load$1.namespaceName()).setEventHubName(this.load$1.eventHubName()).setSasKeyName(this.load$1.sharedAccessSignatureKeyName()).setSasKey(this.load$1.sharedAccessSignatureKey());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        EventHubClient createSync = EventHubClient.createSync(sasKey.toString(), new RetryExponential(Duration.ofSeconds(this.load$1.retryMinBackoff()), Duration.ofSeconds(this.load$1.retryMaxBackoff()), this.load$1.retryCount(), "Custom"), newSingleThreadExecutor);
        ObjectRef create = ObjectRef.create(createSync.createBatch());
        this.batchAccumulator$1.add(1L);
        try {
            iterator.foreach(new AzureEventHubsLoad$$anonfun$load$2$$anonfun$apply$1(this, createSync, create));
            if (((EventDataBatch) create.elem).getSize() > 0) {
                createSync.sendSync((EventDataBatch) create.elem);
            }
        } finally {
            createSync.closeSync();
            newSingleThreadExecutor.shutdown();
        }
    }

    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        apply((Iterator<Row>) obj);
        return BoxedUnit.UNIT;
    }

    public AzureEventHubsLoad$$anonfun$load$2(API.AzureEventHubsLoad azureEventHubsLoad, LongAccumulator longAccumulator, LongAccumulator longAccumulator2, LongAccumulator longAccumulator3) {
        this.load$1 = azureEventHubsLoad;
        this.recordAccumulator$1 = longAccumulator;
        this.bytesAccumulator$1 = longAccumulator2;
        this.batchAccumulator$1 = longAccumulator3;
    }
}
