package io.camunda.zeebe.client.impl.worker;

import io.camunda.zeebe.client.api.ZeebeFuture;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.command.StreamJobsCommandStep1;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.response.StreamJobsResponse;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.client.impl.Loggers;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

/**
 * {@link JobStreamer} implementation that keeps a single job stream open for one job type and
 * worker name.
 *
 * <p>When the stream completes with an error and the streamer has not been closed, the same
 * command is re-sent after a delay obtained from the configured {@link BackoffSupplier}. All
 * mutable state is guarded by {@code streamLock}.
 */
@ThreadSafe
final class JobStreamerImpl implements JobStreamer {
    private static final Logger LOGGER = Loggers.JOB_WORKER_LOGGER;
    private final JobClient jobClient;
    private final String jobType;
    private final String workerName;
    private final Duration timeout;
    private final List<String> fetchVariables;
    private final List<String> tenantIds;
    private final Duration requestTimeout;
    private final BackoffSupplier backoffSupplier;
    private final ScheduledExecutorService executor;
    private final Lock streamLock = new ReentrantLock();

    /** Future of the currently open stream; cancelled on close and before re-opening. */
    @GuardedBy("streamLock")
    private ZeebeFuture<StreamJobsResponse> streamControl;

    /** The command last passed to {@link #open(FinalCommandStep)}; re-sent on retry. */
    @GuardedBy("streamLock")
    private FinalCommandStep<StreamJobsResponse> command;

    /** Set once by {@link #close()}; never reset. */
    @GuardedBy("streamLock")
    private boolean isClosed;

    /** Last retry delay in milliseconds, fed back into the backoff supplier. */
    @GuardedBy("streamLock")
    private long retryDelay;

    /**
     * Creates a streamer; no stream is opened until {@link #openStreamer(Consumer)} is called.
     *
     * @param jobClient client used to build the stream command
     * @param jobType job type to stream
     * @param workerName worker name sent with the stream command
     * @param timeout job timeout passed to the stream command
     * @param fetchVariables variables to fetch; when {@code null}, no fetch-variables restriction
     *     is set on the command
     * @param tenantIds tenant ids passed to the stream command
     * @param requestTimeout request timeout passed to the stream command
     * @param backoffSupplier computes the delay before re-creating a failed stream
     * @param executor runs stream-completion handling and scheduled retries
     */
    public JobStreamerImpl(
            JobClient jobClient,
            String jobType,
            String workerName,
            Duration timeout,
            List<String> fetchVariables,
            List<String> tenantIds,
            Duration requestTimeout,
            BackoffSupplier backoffSupplier,
            ScheduledExecutorService executor) {
        this.jobClient = jobClient;
        this.jobType = jobType;
        this.workerName = workerName;
        this.timeout = timeout;
        this.fetchVariables = fetchVariables;
        this.tenantIds = tenantIds;
        this.requestTimeout = requestTimeout;
        this.backoffSupplier = backoffSupplier;
        this.executor = executor;
    }

    /**
     * Marks the streamer as closed and cancels the current stream, if any. If the calling thread
     * is interrupted while waiting for the lock, nothing is closed and the interrupt flag is
     * restored.
     */
    @Override
    public void close() {
        try {
            this.streamLock.lockInterruptibly();
            try {
                lockedClose();
            } finally {
                this.streamLock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns whether {@link #close()} has not yet taken effect.
     *
     * @return {@code true} as long as the streamer has not been closed
     */
    @Override
    public boolean isOpen() {
        // isClosed is guarded by streamLock, so read it under the lock to observe the latest
        // value written by close().
        this.streamLock.lock();
        try {
            return !this.isClosed;
        } finally {
            this.streamLock.unlock();
        }
    }

    /**
     * Builds the stream command for the given consumer and opens the stream.
     *
     * @param consumer receives the jobs pushed over the stream
     */
    @Override
    public void openStreamer(Consumer<ActivatedJob> consumer) {
        open(buildCommand(consumer));
    }

    /**
     * Stores the given command and opens a stream with it, unless the streamer is closed. If the
     * calling thread is interrupted while waiting for the lock, nothing is opened and the
     * interrupt flag is restored.
     */
    private void open(FinalCommandStep<StreamJobsResponse> finalCommandStep) {
        try {
            this.streamLock.lockInterruptibly();
            try {
                // The closed check must sit inside the try/finally: returning early from
                // outside of it would leave the lock held by this thread forever.
                if (this.isClosed) {
                    LOGGER.trace("Skip opening stream '{}' for worker '{}' because it's closed", this.jobType, this.workerName);
                    return;
                }
                this.command = finalCommandStep;
                lockedOpen();
            } finally {
                this.streamLock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Lock-acquiring wrapper around {@link #lockedHandleStreamComplete(Throwable)}.
     *
     * @param th the error the stream completed with, or {@code null}
     */
    private void handleStreamComplete(Throwable th) {
        try {
            this.streamLock.lockInterruptibly();
            try {
                lockedHandleStreamComplete(th);
            } finally {
                this.streamLock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Assembles the stream-jobs command from this streamer's configuration. */
    private FinalCommandStep<StreamJobsResponse> buildCommand(Consumer<ActivatedJob> consumer) {
        StreamJobsCommandStep1.StreamJobsCommandStep3 step =
                this.jobClient
                        .newStreamJobsCommand()
                        .jobType(this.jobType)
                        .consumer(consumer)
                        .workerName(this.workerName)
                        .tenantIds(this.tenantIds)
                        .timeout(this.timeout);
        // fetchVariables is optional; only set it on the command when one was provided.
        if (this.fetchVariables != null) {
            step = step.fetchVariables(this.fetchVariables);
        }
        return step.requestTimeout(this.requestTimeout);
    }

    /** Flags the streamer as closed and cancels the current stream, if one exists. */
    @GuardedBy("streamLock")
    private void lockedClose() {
        LOGGER.debug("Closing job stream for type '{}' and worker '{}'", this.jobType, this.workerName);
        this.isClosed = true;
        if (this.streamControl != null) {
            this.streamControl.cancel(true);
        }
    }

    /**
     * Cancels any previous stream, sends the stored command and registers the completion handler
     * to run on the executor.
     */
    @GuardedBy("streamLock")
    private void lockedOpen() {
        if (this.streamControl != null) {
            this.streamControl.cancel(true);
            this.streamControl = null;
        }
        ZeebeFuture<StreamJobsResponse> send = this.command.send();
        send.whenCompleteAsync((streamJobsResponse, th) -> {
            handleStreamComplete(th);
        }, this.executor);
        this.streamControl = send;
        LOGGER.debug("Opened job stream of type '{}' for worker '{}'", this.jobType, this.workerName);
    }

    /**
     * Reacts to stream completion: does nothing when closed or when there was no error; otherwise
     * logs the error and schedules a re-open after the next backoff delay.
     *
     * @param th the error the stream completed with, or {@code null}
     */
    @GuardedBy("streamLock")
    private void lockedHandleStreamComplete(Throwable th) {
        if (this.isClosed) {
            LOGGER.trace("Skip re-opening job stream of type '{}' for worker '{}'", this.jobType, this.workerName);
        } else if (th != null) {
            logStreamError(th);
            this.retryDelay = this.backoffSupplier.supplyRetryDelay(this.retryDelay);
            LOGGER.atDebug().addArgument(this.jobType).addArgument(this.workerName).addArgument(() -> {
                return Duration.ofMillis(this.retryDelay);
            }).setMessage("Recreating closed stream of type '{}' and worker '{}' in {}").log();
            // NOTE(review): this.command is read when the task runs, not captured here, and that
            // read happens outside streamLock — confirm this is intended.
            this.executor.schedule(() -> {
                open(this.command);
            }, this.retryDelay, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Logs a stream failure: at trace level when it is a gRPC {@code RESOURCE_EXHAUSTED} status,
     * at warn level otherwise.
     */
    private void logStreamError(Throwable th) {
        if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode() == Status.RESOURCE_EXHAUSTED.getCode()) {
            LOGGER.trace("Failed to stream jobs of type '{}' to worker '{}'", this.jobType, this.workerName, th);
        } else {
            LOGGER.warn("Failed to stream jobs of type '{}' to worker '{}'", this.jobType, this.workerName, th);
        }
    }
}
