package io.openlineage.client.transports;

import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecord;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import datahub.spark2.shaded.o.a.c.lang3.StringUtils;
import datahub.spark2.shaded.org.slf4j.Logger;
import datahub.spark2.shaded.org.slf4j.LoggerFactory;
import io.acryl.shaded.com.google.common.util.concurrent.FutureCallback;
import io.acryl.shaded.com.google.common.util.concurrent.Futures;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import io.openlineage.client.transports.Transport;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import lombok.NonNull;

/* loaded from: input_file:io/openlineage/client/transports/KinesisTransport.class */
/**
 * {@link Transport} implementation that publishes OpenLineage run events to an AWS Kinesis
 * stream through a {@link KinesisProducer}.
 *
 * <p>Each event is serialized to JSON and submitted as a single {@link UserRecord}. The outcome
 * reported by the producer is only logged, from a callback executed on a dedicated
 * single-thread executor.
 *
 * <p>NOTE(review): the executor created by the constructors is never shut down in this class;
 * confirm whether the owning component is expected to release it (and the producer).
 */
public class KinesisTransport extends Transport {
    private static final Logger log = LoggerFactory.getLogger(KinesisTransport.class);

    /** Session name passed to the STS assume-role credentials provider. */
    private static final String ROLE_SESSION_NAME = "OLProducer";

    private final String streamName;
    private final String region;
    private final String roleArn;
    private final KinesisProducer producer;
    /** Executor on which the success/failure callbacks of submitted records are run. */
    private final Executor listeningExecutor;

    /**
     * Creates a transport that uses the supplied, already configured producer.
     *
     * @param kinesisProducer producer used to submit records; must not be {@code null}
     * @param kinesisConfig configuration supplying the stream name, region and role ARN; must not
     *     be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public KinesisTransport(@NonNull KinesisProducer kinesisProducer, @NonNull KinesisConfig kinesisConfig) {
        super(Transport.Type.KINESIS);
        if (kinesisProducer == null) {
            throw new NullPointerException("kinesisProducer is marked non-null but is null");
        }
        if (kinesisConfig == null) {
            throw new NullPointerException("kinesisConfig is marked non-null but is null");
        }
        this.streamName = kinesisConfig.getStreamName();
        this.region = kinesisConfig.getRegion();
        this.roleArn = kinesisConfig.getRoleArn();
        this.producer = kinesisProducer;
        this.listeningExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Creates a transport together with its own {@link KinesisProducer}.
     *
     * <p>The producer configuration is built from {@code kinesisConfig.getProperties()}, the
     * configured region is applied on top of it, and, when a non-blank role ARN is configured,
     * credentials are obtained through an STS assume-role provider.
     *
     * @param kinesisConfig transport configuration; must not be {@code null}
     * @throws NullPointerException if {@code kinesisConfig} is {@code null}
     */
    public KinesisTransport(@NonNull KinesisConfig kinesisConfig) {
        super(Transport.Type.KINESIS);
        if (kinesisConfig == null) {
            throw new NullPointerException("kinesisConfig is marked non-null but is null");
        }
        this.streamName = kinesisConfig.getStreamName();
        this.region = kinesisConfig.getRegion();
        this.roleArn = kinesisConfig.getRoleArn();

        KinesisProducerConfiguration producerConfig =
                KinesisProducerConfiguration.fromProperties(kinesisConfig.getProperties());
        // The explicitly configured region overrides whatever the raw properties contained.
        producerConfig.setRegion(this.region);
        if (StringUtils.isNotBlank(this.roleArn)) {
            producerConfig.setCredentialsProvider(
                    new STSAssumeRoleSessionCredentialsProvider.Builder(this.roleArn, ROLE_SESSION_NAME).build());
        }
        this.producer = new KinesisProducer(producerConfig);
        this.listeningExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Serializes the run event to JSON and submits it to the configured Kinesis stream.
     *
     * <p>The record's partition key is {@code "<job namespace>:<job name>"}. The result delivered
     * to the callback is logged (debug on success, error on failure) and is not propagated to the
     * caller.
     *
     * @param runEvent event to publish; must not be {@code null}
     * @throws NullPointerException if {@code runEvent} is {@code null}
     */
    @Override // io.openlineage.client.transports.Transport
    public void emit(@NonNull OpenLineage.RunEvent runEvent) {
        if (runEvent == null) {
            throw new NullPointerException("runEvent is marked non-null but is null");
        }
        final String json = OpenLineageClientUtils.toJson(runEvent);
        String partitionKey = runEvent.getJob().getNamespace() + ":" + runEvent.getJob().getName();
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default charset,
        // which can corrupt non-ASCII characters in the JSON payload on non-UTF-8 hosts.
        ByteBuffer payload = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
        UserRecord userRecord = new UserRecord(this.streamName, partitionKey, payload);

        Futures.addCallback(this.producer.addUserRecord(userRecord), new FutureCallback<UserRecordResult>() {
            @Override // io.acryl.shaded.com.google.common.util.concurrent.FutureCallback
            public void onSuccess(UserRecordResult userRecordResult) {
                log.debug("Success to send to Kinesis lineage event: {}", json);
            }

            @Override // io.acryl.shaded.com.google.common.util.concurrent.FutureCallback
            public void onFailure(Throwable th) {
                log.error("Failed to send to Kinesis lineage event: {}", json, th);
            }
        }, this.listeningExecutor);
    }
}
