package shaded.software.amazon.awssdk.core.internal.async;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import shaded.org.reactivestreams.Subscriber;
import shaded.software.amazon.awssdk.annotations.SdkInternalApi;
import shaded.software.amazon.awssdk.annotations.SdkTestInternalApi;
import shaded.software.amazon.awssdk.core.async.AsyncRequestBody;
import shaded.software.amazon.awssdk.core.async.AsyncRequestBodyFromInputStreamConfiguration;
import shaded.software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import shaded.software.amazon.awssdk.core.exception.NonRetryableException;
import shaded.software.amazon.awssdk.core.internal.util.NoopSubscription;
import shaded.software.amazon.awssdk.utils.IoUtils;
import shaded.software.amazon.awssdk.utils.Logger;

/**
 * An {@link AsyncRequestBody} backed by a blocking {@link InputStream}.
 *
 * <p>Every call to {@link #subscribe(Subscriber)} creates a fresh {@link BlockingInputStreamAsyncRequestBody},
 * submits a task to the configured {@link ExecutorService} that hands the input stream to it via
 * {@code writeInputStream}, and then subscribes the caller's subscriber to it. If an earlier write task exists,
 * it is cancelled and the input stream is {@linkplain InputStream#reset() reset} before the new task is submitted.
 *
 * <p>{@link #writeFuture} is only read or written while holding {@link #subscribeLock}.
 */
@SdkInternalApi
/* loaded from: input_file:shaded/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.class */
public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody {
    private static final Logger log = Logger.loggerFor(InputStreamWithExecutorAsyncRequestBody.class);

    /** Upper bound, in seconds, on how long a new subscription waits for the superseded write task. */
    private static final long CANCELLATION_TIMEOUT_SECONDS = 10L;

    /** Serializes subscriptions and guards {@link #writeFuture}. */
    private final Object subscribeLock = new Object();
    private final InputStream inputStream;
    /** Content length taken from the configuration; may be {@code null}. */
    private final Long contentLength;
    private final ExecutorService executor;
    /** Handle of the most recently submitted write task, or {@code null} before the first subscription. */
    private Future<?> writeFuture;

    /**
     * Captures the stream, content length and executor from the configuration and passes the stream, together
     * with the configured max read limit, to {@code IoUtils.markStreamWithMaxReadLimit}.
     *
     * @param asyncRequestBodyFromInputStreamConfiguration source of the input stream, content length, executor
     *        and max read limit
     */
    public InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration asyncRequestBodyFromInputStreamConfiguration) {
        this.inputStream = asyncRequestBodyFromInputStreamConfiguration.inputStream();
        this.contentLength = asyncRequestBodyFromInputStreamConfiguration.contentLength();
        this.executor = asyncRequestBodyFromInputStreamConfiguration.executor();
        IoUtils.markStreamWithMaxReadLimit(this.inputStream, asyncRequestBodyFromInputStreamConfiguration.maxReadLimit());
    }

    /**
     * @return the configured content length, or an empty {@link Optional} when none was configured
     */
    @Override // shaded.software.amazon.awssdk.core.async.AsyncRequestBody
    public Optional<Long> contentLength() {
        return Optional.ofNullable(this.contentLength);
    }

    /**
     * Starts a new write of the input stream for the given subscriber, superseding any earlier write.
     *
     * <p>Any failure while cancelling the previous write, resetting the stream, submitting the new task or
     * subscribing is delivered to the subscriber through {@code onSubscribe} (with a {@link NoopSubscription})
     * followed by {@code onError}.
     *
     * @param subscriber the subscriber that will receive the stream content; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // shaded.org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
        // Reject null before touching any state: otherwise the in-flight write would be cancelled, the stream
        // reset and an orphan write task submitted before the failure surfaced from the error handler below.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        synchronized (this.subscribeLock) {
            try {
                if (this.writeFuture != null) {
                    // Each subscription supersedes the previous one: stop the old write, then rewind the stream.
                    this.writeFuture.cancel(true);
                    waitForCancellation(this.writeFuture);
                    tryReset(this.inputStream);
                }
                BlockingInputStreamAsyncRequestBody delegate = AsyncRequestBody.forBlockingInputStream(this.contentLength);
                this.writeFuture = this.executor.submit(() -> doBlockingWrite(delegate));
                delegate.subscribe(subscriber);
            } catch (Throwable th) {
                subscriber.onSubscribe(new NoopSubscription(subscriber));
                subscriber.onError(th);
            }
        }
    }

    /**
     * Resets the given stream.
     *
     * @throws NonRetryableException wrapping the {@link IOException} if the stream cannot be reset
     */
    private void tryReset(InputStream stream) {
        try {
            stream.reset();
        } catch (IOException e) {
            throw NonRetryableException.create("Request cannot be retried, because the request stream could not be reset.", e);
        }
    }

    /**
     * Test hook exposing the current write task.
     *
     * @return the {@link Future} of the most recently submitted write, or {@code null} if no write has been
     *         submitted yet
     */
    @SdkTestInternalApi
    public Future<?> activeWriteFuture() {
        synchronized (this.subscribeLock) {
            return this.writeFuture;
        }
    }

    /**
     * Task body run on the executor: writes the input stream into the given request body. Any throwable is
     * logged at debug level and rethrown unchanged.
     */
    private void doBlockingWrite(BlockingInputStreamAsyncRequestBody target) {
        try {
            target.writeInputStream(this.inputStream);
        } catch (Throwable th) {
            log.debug(() -> "Encountered error while writing input stream to service.", th);
            throw th;
        }
    }

    /**
     * Waits up to {@value #CANCELLATION_TIMEOUT_SECONDS} seconds on a write task that the caller has just
     * cancelled.
     *
     * <p>NOTE(review): {@link Future#get(long, TimeUnit)} is specified to throw {@link CancellationException}
     * for a cancelled task; with {@code FutureTask}-backed executors this presumably happens without waiting
     * for the worker thread to leave {@code doBlockingWrite}. Confirm the superseded writer cannot still be
     * reading the stream when it is reset.
     *
     * @throws RuntimeException if the calling thread is interrupted (the interrupt flag is restored first)
     * @throws IllegalStateException if the task has not completed within the timeout
     */
    private void waitForCancellation(Future<?> pendingWrite) {
        try {
            pendingWrite.get(CANCELLATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (CancellationException | ExecutionException ignored) {
            // Expected: the caller cancelled this task right before waiting, and the outcome of a superseded
            // write (cancelled or failed) does not matter to the new subscription.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out waiting to reset the input stream.", e);
        }
    }
}
