package org.eclipse.hono.client.pubsub.publisher;

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/hono/client/pubsub/publisher/PubSubPublisherClientImpl.class */
public final class PubSubPublisherClientImpl implements PubSubPublisherClient {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublisherClientImpl.class);
    private final Vertx vertx;
    private Publisher publisher;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PubSubPublisherClientImpl(Vertx vertx, String str, String str2, CredentialsProvider credentialsProvider) throws ClientErrorException {
        this.vertx = (Vertx) Objects.requireNonNull(vertx);
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        try {
            Publisher.Builder enableMessageOrdering = Publisher.newBuilder(TopicName.of(str, str2)).setEnableMessageOrdering(true);
            Optional ofNullable = Optional.ofNullable(credentialsProvider);
            Objects.requireNonNull(enableMessageOrdering);
            ofNullable.ifPresent(enableMessageOrdering::setCredentialsProvider);
            this.publisher = enableMessageOrdering.build();
        } catch (IOException e) {
            this.publisher = null;
            LOG.warn("error initializing publisher client", e);
            throw new ClientErrorException(503, "failed to create publisher for Pub/Sub", e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.vertx.executeBlocking(promise -> {
            if (this.publisher == null) {
                promise.complete();
                return;
            }
            try {
                this.publisher.shutdown();
                this.publisher.awaitTermination(5L, TimeUnit.SECONDS);
                promise.complete();
            } catch (InterruptedException e) {
                LOG.debug("timed out waiting for shut down of publisher", e);
                Thread.currentThread().interrupt();
                promise.fail(e);
            }
        }, false);
    }

    @Override // org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClient
    public Future<String> publish(PubsubMessage pubsubMessage) {
        final Promise promise = Promise.promise();
        final Context orCreateContext = this.vertx.getOrCreateContext();
        ApiFutures.addCallback(this.publisher.publish(pubsubMessage), new ApiFutureCallback<String>() { // from class: org.eclipse.hono.client.pubsub.publisher.PubSubPublisherClientImpl.1
            public void onSuccess(String str) {
                Context context = orCreateContext;
                Promise promise2 = promise;
                context.runOnContext(r5 -> {
                    promise2.complete(str);
                });
            }

            public void onFailure(Throwable th) {
                Context context = orCreateContext;
                Promise promise2 = promise;
                context.runOnContext(r8 -> {
                    PubSubPublisherClientImpl.LOG.debug("error publishing messages to Pub/Sub", th);
                    promise2.fail(new ServerErrorException(503, th));
                });
            }
        }, MoreExecutors.directExecutor());
        return promise.future();
    }
}
