package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:com/azure/messaging/servicebus/SynchronousReceiveWork.class */
/**
 * Represents one synchronous "receive up to N messages" request.
 *
 * <p>Messages handed to {@link #next(ServiceBusReceivedMessage)} are forwarded to the downstream
 * {@link FluxSink} supplied at construction. The work becomes terminal when any of the following
 * happens:</p>
 * <ul>
 *   <li>the requested number of messages has been delivered,</li>
 *   <li>the downstream sink is cancelled,</li>
 *   <li>an error is reported through {@link #error(Throwable)},</li>
 *   <li>{@link #timeout()} is invoked, or</li>
 *   <li>no further message arrives within {@link #TIMEOUT_BETWEEN_MESSAGES} of the previous one.</li>
 * </ul>
 *
 * <p>State flags are {@code volatile} because the idle-timeout callback runs on a Reactor timer
 * thread, while the remaining methods are invoked by whoever drives this work item.</p>
 */
class SynchronousReceiveWork implements AutoCloseable {
    /** Maximum quiet period allowed after a message before the work is completed early. */
    private static final Duration TIMEOUT_BETWEEN_MESSAGES = Duration.ofMillis(1000);

    private final ClientLogger logger = new ClientLogger(SynchronousReceiveWork.class);

    /**
     * Internal signal stream: every delivered message is also pushed here so that the idle timer
     * can be restarted. {@code messageReceivedSink} must be initialised after the processor.
     */
    private final DirectProcessor<ServiceBusReceivedMessage> emitterProcessor = DirectProcessor.create();
    private final FluxSink<ServiceBusReceivedMessage> messageReceivedSink = this.emitterProcessor.sink();

    private final long id;
    private final AtomicInteger remaining;
    private final int numberToReceive;
    private final Duration timeout;
    private final FluxSink<ServiceBusReceivedMessage> emitter;
    private final Disposable nextMessageSubscriber;

    // Written from timer callbacks and read by the driver of this work item, hence volatile.
    private volatile boolean processingStarted;
    private volatile boolean workTimedOut = false;
    private volatile Throwable error = null;

    /**
     * Creates a new work item and subscribes the idle timer.
     *
     * <p>The timer is armed only once the first message has been delivered through
     * {@link #next(ServiceBusReceivedMessage)}; every subsequent message replaces the running
     * timer with a fresh one.</p>
     *
     * @param id Identifier of this work item; only used for logging and {@link #getId()}.
     * @param numberToReceive Number of messages requested by the caller.
     * @param timeout Overall timeout associated with this work item. It is only stored here and
     *     exposed through {@link #getTimeout()}.
     * @param emitter Downstream sink that receives the messages and the terminal signal.
     */
    public SynchronousReceiveWork(long id, int numberToReceive, Duration timeout,
            FluxSink<ServiceBusReceivedMessage> emitter) {
        this.id = id;
        this.remaining = new AtomicInteger(numberToReceive);
        this.numberToReceive = numberToReceive;
        this.timeout = timeout;
        this.emitter = emitter;

        // switchOnNext cancels the previous interval whenever a new message arrives, so only the
        // timer started by the most recent message can ever fire.
        this.nextMessageSubscriber = Flux.switchOnNext(
                this.emitterProcessor.map(message -> Flux.interval(TIMEOUT_BETWEEN_MESSAGES)))
            .handle((tick, sink) -> {
                sink.next(tick);
                onIdleTimeout();
            })
            .subscribe();
    }

    /**
     * Gets the identifier of this work item.
     *
     * @return The id passed to the constructor.
     */
    public long getId() {
        return this.id;
    }

    /**
     * Gets the overall timeout associated with this work item.
     *
     * @return The timeout passed to the constructor.
     */
    public Duration getTimeout() {
        return this.timeout;
    }

    /**
     * Gets the number of messages originally requested.
     *
     * @return The requested message count.
     */
    public int getNumberOfEvents() {
        return this.numberToReceive;
    }

    /**
     * Gets the number of messages still outstanding.
     *
     * @return The requested count minus the number of messages successfully published.
     */
    int getRemaining() {
        return this.remaining.get();
    }

    /**
     * Indicates whether this work item has finished and must not receive further messages.
     *
     * @return {@code true} if the downstream sink was cancelled, all requested messages were
     *     delivered, an error was recorded, or a timeout (overall or idle) occurred.
     */
    public boolean isTerminal() {
        // "<= 0" rather than "== 0": next() decrements unconditionally, so an extra delivery must
        // not make a finished work item look live again.
        return this.emitter.isCancelled()
            || this.remaining.get() <= 0
            || this.error != null
            || this.workTimedOut;
    }

    /**
     * Publishes a message downstream and restarts the idle timer.
     *
     * <p>The outstanding counter is decremented only when both publications succeed. Any
     * exception raised while publishing is logged and routed to {@link #error(Throwable)}.</p>
     *
     * @param serviceBusReceivedMessage Message to publish.
     */
    public void next(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        try {
            this.emitter.next(serviceBusReceivedMessage);
            this.messageReceivedSink.next(serviceBusReceivedMessage);
            this.remaining.decrementAndGet();
        } catch (Exception e) {
            this.logger.warning("Exception occurred while publishing downstream.", e);
            error(e);
        }
    }

    /**
     * Completes the downstream sink and releases the idle timer.
     */
    public void complete() {
        this.logger.info("[{}]: Completing task.", this.id);
        this.emitter.complete();
        close();
    }

    /**
     * Marks the work as timed out, completes the downstream sink and releases the idle timer.
     */
    public void timeout() {
        this.logger.info("[{}]: Work timeout occurred. Completing the work.", this.id);
        // Publish the flag before signalling completion so that anyone reacting to the
        // completion already observes a terminal work item.
        this.workTimedOut = true;
        this.emitter.complete();
        close();
    }

    /**
     * Records the failure, propagates it to the downstream sink and releases the idle timer.
     *
     * @param th Error that terminated this work item.
     */
    public void error(Throwable th) {
        this.error = th;
        this.emitter.error(th);
        close();
    }

    /**
     * Gets the error that terminated this work item.
     *
     * @return The recorded error, or {@code null} if none was reported.
     */
    public Throwable getError() {
        return this.error;
    }

    /**
     * Flags that processing of this work item has begun.
     */
    public void startedProcessing() {
        this.processingStarted = true;
    }

    /**
     * Indicates whether {@link #startedProcessing()} has been called.
     *
     * @return {@code true} once processing has been flagged as started.
     */
    public boolean isProcessingStarted() {
        return this.processingStarted;
    }

    /**
     * Disposes the idle-timer subscription. Safe to call more than once.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.nextMessageSubscriber.isDisposed()) {
            this.nextMessageSubscriber.dispose();
        }
    }

    /**
     * Invoked by the idle timer when no message followed the previous one in time.
     *
     * <p>Marks the work terminal, completes the downstream sink and disposes the timer so that
     * the interval does not keep ticking (and re-logging) after the work is finished.</p>
     */
    private void onIdleTimeout() {
        this.logger.info("[{}]: Timeout between the messages occurred. Completing the work.", this.id);
        this.workTimedOut = true;
        this.emitter.complete();
        close();
    }
}
