package io.axoniq.eventstore.client.util;

import io.axoniq.eventstore.grpc.NodeInfo;
import io.axoniq.ext.io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/eventstore/client/util/Broadcaster.class */
public class Broadcaster<T> {
    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
    private final Collection<NodeInfo> destinations;
    private final Action<T> action;
    private final Consumer<T> onNextCallback;

    /* loaded from: input_file:io/axoniq/eventstore/client/util/Broadcaster$Action.class */
    public interface Action<T> {
        void perform(NodeInfo nodeInfo, StreamObserver<T> streamObserver);
    }

    public Broadcaster(Collection<NodeInfo> collection, Action<T> action, Consumer<T> consumer) {
        this.destinations = collection;
        this.action = action;
        this.onNextCallback = consumer;
    }

    public void broadcast(int i, TimeUnit timeUnit) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(this.destinations.size());
        this.destinations.forEach(nodeInfo -> {
            this.action.perform(nodeInfo, new StreamObserver<T>() { // from class: io.axoniq.eventstore.client.util.Broadcaster.1
                @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
                public void onNext(T t) {
                    Broadcaster.this.onNextCallback.accept(t);
                }

                @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
                public void onError(Throwable th) {
                    Broadcaster.logger.warn("Error from: {}:{} - {}", new Object[]{nodeInfo.getHostName(), Integer.valueOf(nodeInfo.getGrpcPort()), GrpcExceptionParser.parse(th).toString()});
                    countDownLatch.countDown();
                }

                @Override // io.axoniq.ext.io.grpc.stub.StreamObserver
                public void onCompleted() {
                    countDownLatch.countDown();
                }
            });
        });
        if (timeUnit != null) {
            countDownLatch.await(i, timeUnit);
        }
    }
}
