package io.codemonastery.dropwizard.kinesis.producer;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import io.codemonastery.dropwizard.kinesis.DynamicRateLimiter;
import java.io.Closeable;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/codemonastery/dropwizard/kinesis/producer/RateLimitedRecordPutter.class */
public class RateLimitedRecordPutter implements RecordPutter {
    private static final Logger LOG = LoggerFactory.getLogger(RateLimitedRecordPutter.class);
    private final AmazonKinesis kinesis;
    private final PutterMetrics metrics;
    private final DynamicRateLimiter recordRateLimiter = DynamicRateLimiter.create(1000.0d, 1.2d, 0.0d);

    public RateLimitedRecordPutter(AmazonKinesis amazonKinesis, PutterMetrics putterMetrics) {
        this.kinesis = amazonKinesis;
        this.metrics = putterMetrics;
    }

    @Override // io.codemonastery.dropwizard.kinesis.producer.RecordPutter
    public int send(PutRecordsRequest putRecordsRequest) throws Exception {
        int size = putRecordsRequest.getRecords().size();
        try {
            Closeable time = this.metrics.time();
            Throwable th = null;
            boolean z = false;
            while (!z) {
                try {
                    try {
                        try {
                            this.recordRateLimiter.acquire(putRecordsRequest.getRecords().size());
                            size = ((Integer) Optional.ofNullable(this.kinesis.putRecords(putRecordsRequest).getFailedRecordCount()).orElse(0)).intValue();
                            this.recordRateLimiter.moveForward();
                            z = true;
                        } finally {
                        }
                    } catch (ProvisionedThroughputExceededException e) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(String.format("Exceeded rate limit for stream \"%s\", backing off", putRecordsRequest.getStreamName()), e);
                        }
                        if (e.getMessage() != null) {
                            this.recordRateLimiter.backOff();
                        }
                    }
                } finally {
                }
            }
            if (time != null) {
                if (0 != 0) {
                    try {
                        time.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    time.close();
                }
            }
            return size;
        } finally {
            this.metrics.sent(putRecordsRequest.getRecords().size() - size, size);
        }
    }
}
