package software.amazon.kinesis.lifecycle;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
import software.amazon.kinesis.retrieval.RetryableRetrievalException;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/amazon-kinesis-client-2.2.8.jar:software/amazon/kinesis/lifecycle/ShardConsumerSubscriber.class */
/**
 * Reactive Streams {@link Subscriber} that hands each {@link RecordsRetrieved} batch published by a
 * {@link RecordsPublisher} to a {@link ShardConsumer}, requesting one batch at a time.
 *
 * <p>Failures are not propagated to the publisher. A failure thrown while dispatching a batch is
 * stored as the "dispatch failure"; an error signalled by the publisher is stored as the
 * "retrieval failure" after the subscription is cancelled. {@link #healthCheck(long)} uses that
 * state (and the time of the last outstanding request) to decide whether the subscription has to
 * be recreated.
 *
 * <p>{@code lastRequestTime}, {@code lastAccepted} and the reset of both failure fields are guarded
 * by {@link #lockObject}.
 */
public class ShardConsumerSubscriber implements Subscriber<RecordsRetrieved> {
    private static final Logger log = LoggerFactory.getLogger(ShardConsumerSubscriber.class);

    private final RecordsPublisher recordsPublisher;
    private final Scheduler scheduler;
    private final int bufferSize;
    private final ShardConsumer shardConsumer;
    private final int readTimeoutsToIgnoreBeforeWarning;
    /** Number of consecutive "ReadTimeout" errors seen since the last successfully received batch. */
    private volatile int readTimeoutSinceLastRead = 0;

    @VisibleForTesting
    final Object lockObject = new Object();
    /** Time of the currently outstanding request, or {@code null} while no request is pending. */
    private Instant lastRequestTime = null;
    /** Last batch that went through {@link #onNext(RecordsRetrieved)}; used to resume the publisher. */
    private RecordsRetrieved lastAccepted = null;
    private Subscription subscription;
    private volatile Instant lastDataArrival;
    private volatile Throwable dispatchFailure;
    private volatile Throwable retrievalFailure;

    /**
     * Creates a subscriber that warns on every read timeout (ignore threshold of zero).
     *
     * @deprecated use
     *     {@link #ShardConsumerSubscriber(RecordsPublisher, ExecutorService, int, ShardConsumer, int)}
     *     and pass the number of read timeouts to ignore explicitly.
     */
    @Deprecated
    ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
            ShardConsumer shardConsumer) {
        this(recordsPublisher, executorService, bufferSize, shardConsumer, 0);
    }

    /**
     * @param recordsPublisher source of record batches
     * @param executorService executor wrapped into the RxJava scheduler used for subscribing and observing
     * @param bufferSize buffer size handed to {@code observeOn}
     * @param shardConsumer consumer that receives every batch
     * @param readTimeoutsToIgnoreBeforeWarning number of consecutive read timeouts that are tolerated
     *     before the read-timeout warning is logged
     */
    public ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService,
            int bufferSize, ShardConsumer shardConsumer, int readTimeoutsToIgnoreBeforeWarning) {
        this.recordsPublisher = recordsPublisher;
        this.scheduler = Schedulers.from(executorService);
        this.bufferSize = bufferSize;
        this.shardConsumer = shardConsumer;
        this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
    }

    /**
     * Subscribes to the publisher on the configured scheduler.
     *
     * <p>If a batch was accepted previously, the publisher is first asked to restart from that batch.
     * The current time is recorded as the time of the outstanding request.
     */
    public void startSubscriptions() {
        synchronized (lockObject) {
            lastRequestTime = Instant.now();
            if (lastAccepted != null) {
                recordsPublisher.restartFrom(lastAccepted);
            }
            Flowable.fromPublisher(recordsPublisher)
                    .subscribeOn(scheduler)
                    .observeOn(scheduler, true, bufferSize)
                    .subscribe(new ShardConsumerNotifyingSubscriber(this, recordsPublisher));
        }
    }

    /**
     * Restarts the subscription if it failed, or if the outstanding request is older than allowed.
     *
     * @param maxTimeBetweenRequests maximum age, in milliseconds, of an outstanding request before the
     *     subscription is cancelled and recreated
     * @return the retrieval failure that caused a restart, or {@code null} if there was none
     */
    public Throwable healthCheck(long maxTimeBetweenRequests) {
        Throwable failure = restartIfFailed();
        if (failure == null) {
            restartIfRequestTimerExpired(maxTimeBetweenRequests);
        }
        return failure;
    }

    /**
     * Returns the stored dispatch failure and clears it.
     *
     * @return the failure recorded by {@link #onNext(RecordsRetrieved)}, or {@code null} if none is stored
     */
    public Throwable getAndResetDispatchFailure() {
        synchronized (lockObject) {
            Throwable failure = dispatchFailure;
            dispatchFailure = null;
            return failure;
        }
    }

    /**
     * If a retrieval failure is stored, clears it, logs it and starts a new subscription.
     *
     * @return the failure that triggered the restart, or {@code null} if nothing was restarted
     */
    private Throwable restartIfFailed() {
        // Cheap volatile read so the common "no failure" case does not take the lock.
        if (retrievalFailure == null) {
            return null;
        }

        Throwable failure;
        synchronized (lockObject) {
            // Take one consistent snapshot; the field may have been cleared since the check above.
            failure = retrievalFailure;
            retrievalFailure = null;
        }
        if (failure == null) {
            // Someone else already consumed the failure; do not start a second subscription.
            return null;
        }

        String message = String.format("%s: Failure occurred in retrieval.  Restarting data requests",
                shardConsumer.shardInfo().shardId());
        if (failure instanceof RetryableRetrievalException) {
            log.debug(message, failure.getCause());
        } else {
            log.warn(message, failure);
        }
        startSubscriptions();
        return failure;
    }

    /**
     * Cancels and recreates the subscription when a request has been outstanding for longer than
     * {@code maxTimeBetweenRequests} milliseconds. Does nothing while no request is pending.
     */
    private void restartIfRequestTimerExpired(long maxTimeBetweenRequests) {
        synchronized (lockObject) {
            if (lastRequestTime == null) {
                return;
            }
            Instant now = Instant.now();
            Duration timeSinceLastRequest = Duration.between(lastRequestTime, now);
            if (timeSinceLastRequest.toMillis() > maxTimeBetweenRequests) {
                log.error("{}: Last request was dispatched at {}, but no response as of {} ({}).  Cancelling subscription, and restarting.",
                        shardConsumer.shardInfo().shardId(), lastRequestTime, now, timeSinceLastRequest);
                cancel();
                startSubscriptions();
            }
        }
    }

    /** Stores the subscription and requests the first batch. */
    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1L);
    }

    /**
     * Dispatches one batch to the shard consumer and requests the next one.
     *
     * <p>Anything thrown while dispatching is logged and stored as the dispatch failure rather than
     * rethrown. Whether or not dispatching succeeded, the next batch is requested exactly once and
     * the batch is remembered as the last accepted one.
     */
    @Override // org.reactivestreams.Subscriber
    public void onNext(RecordsRetrieved recordsRetrieved) {
        try {
            synchronized (lockObject) {
                // The outstanding request has been answered; stop the request timer.
                lastRequestTime = null;
            }
            lastDataArrival = Instant.now();
            shardConsumer.handleInput(
                    recordsRetrieved.processRecordsInput().toBuilder().cacheExitTime(Instant.now()).build(),
                    subscription);
        } catch (Throwable t) {
            log.warn("{}: Caught exception from handleInput", shardConsumer.shardInfo().shardId(), t);
            synchronized (lockObject) {
                dispatchFailure = t;
            }
        } finally {
            // Runs once on every path, and outside the lock so the publisher is never called while
            // lockObject is held.
            subscription.request(1L);
            synchronized (lockObject) {
                lastAccepted = recordsRetrieved;
                lastRequestTime = Instant.now();
            }
        }
        readTimeoutSinceLastRead = 0;
    }

    /**
     * Logs the error, cancels the subscription and stores the error as the retrieval failure.
     *
     * <p>Read timeouts are only logged once more than {@code readTimeoutsToIgnoreBeforeWarning} of them
     * have occurred since the last received batch.
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        synchronized (lockObject) {
            if (isReadTimeout(th)) {
                readTimeoutSinceLastRead++;
                if (readTimeoutSinceLastRead > readTimeoutsToIgnoreBeforeWarning) {
                    logOnErrorReadTimeoutWarning(th);
                }
            } else {
                logOnErrorWarning(th);
            }
            // Guarded like cancel() so the failure below is recorded even without a subscription.
            if (subscription != null) {
                subscription.cancel();
            }
            retrievalFailure = th;
        }
    }

    /** Returns whether {@code th} is a retryable retrieval error whose message mentions "ReadTimeout". */
    private static boolean isReadTimeout(Throwable th) {
        if (!(th instanceof RetryableRetrievalException)) {
            return false;
        }
        String message = th.getMessage();
        return message != null && message.contains("ReadTimeout");
    }

    /** Logs the generic warning for an error signalled by the publisher. */
    protected void logOnErrorWarning(Throwable th) {
        log.warn("{}: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the subscription as neccessary to continue processing.",
                shardConsumer.shardInfo().shardId(), th);
    }

    /** Logs the warning for repeated read timeouts, including hints on how to tune or silence it. */
    protected void logOnErrorReadTimeoutWarning(Throwable th) {
        log.warn("{}: onError().  Cancelling subscription, and marking self as failed. KCL will recreate the "
                + "subscription as neccessary to continue processing. If you are seeing this warning "
                + "frequently consider increasing the SDK timeouts by providing an OverrideConfiguration "
                + "to the kinesis client. Alternatively you can configure "
                + "LifecycleConfig.readTimeoutsToIgnoreBeforeWarning to suppress intermittant ReadTimeout "
                + "warnings.",
                shardConsumer.shardInfo().shardId(), th);
    }

    /** Only logs the completion; no new subscription is started here. */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        log.debug("{}: onComplete(): Received onComplete.  Activity should be triggered externally",
                shardConsumer.shardInfo().shardId());
    }

    /** Cancels the current subscription, if there is one. */
    public void cancel() {
        if (subscription != null) {
            subscription.cancel();
        }
    }

    /** @return the time the most recent batch arrived, or {@code null} if none has arrived yet */
    public Instant lastDataArrival() {
        return lastDataArrival;
    }

    /** @return the stored dispatch failure without clearing it, or {@code null} */
    public Throwable dispatchFailure() {
        return dispatchFailure;
    }

    /** @return the stored retrieval failure without clearing it, or {@code null} */
    Throwable retrievalFailure() {
        return retrievalFailure;
    }
}
