package com.linkedin.r2.transport.http.client;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.SimpleCallback;
import com.linkedin.common.stats.LongTracker;
import com.linkedin.common.stats.LongTracking;
import com.linkedin.common.util.None;
import com.linkedin.r2.transport.http.client.AsyncPool;
import com.linkedin.r2.util.LinkedDeque;
import com.linkedin.util.ArgumentUtil;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import datahub.shaded.slf4j.Logger;
import datahub.shaded.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/* loaded from: input_file:com/linkedin/r2/transport/http/client/AsyncSharedPoolImpl.class */
public class AsyncSharedPoolImpl<T> implements AsyncPool<T> {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AsyncSharedPoolImpl.class);
    // Named values for a boolean "bad" flag. NOTE(review): neither constant is referenced in the
    // visible code; presumably they label the boolean passed to doDestroy -- confirm.
    private static final boolean BAD = true;
    private static final boolean NOT_BAD = false;
    /** Pool name: the constructor argument followed by "/" and the hex form of hashCode(). */
    private final String _name;
    /** Lifecycle used to create, validate and destroy the pooled item and to supply lifecycle stats. */
    private final AsyncPool.Lifecycle<T> _lifecycle;
    /** Scheduler on which the periodic reaper task is registered by start(). */
    private final ScheduledExecutorService _scheduler;
    /** Rate limiter through which item creation is submitted; its period is adjusted on create failure and on return. */
    private final RateLimiter _rateLimiter;
    /** Timeout in milliseconds used by TimedObject.expired(); the reaper is only scheduled when this is positive. */
    private final long _timeoutMills;
    /** When true, start() triggers creation of the item right away. */
    private final boolean _createImmediately;
    // Upper bound on queued waiters. NOTE(review): only referenced from the undecompiled get();
    // semantics inferred from the JADX fragments -- confirm.
    private final int _maxWaiters;
    /** Handle of the scheduled reaper task; null until start() schedules it and again after shutdown completes. */
    private volatile ScheduledFuture<?> _reaperTaskFuture;
    /** Monitor guarding the mutable pool state below. */
    private final Object _lock;
    /** Holder of the single shared item together with its last-renewal timestamp. */
    private final AsyncSharedPoolImpl<T>.TimedObject<T> _item;
    /** Number of outstanding checkouts of the current item. */
    private int _checkedOut;
    /** Items that were disposed while checked out, mapped to the number of checkouts still to be returned. */
    private final HashMap<T, Integer> _disposedItems;
    /** Collector of pool statistics. */
    private final AsyncPoolStatsTracker _statsTracker;
    /** Callbacks waiting for an item; drained when creation succeeds or fails, or by cancelWaiters(). */
    private final LinkedDeque<Callback<T>> _waiters;
    /** Current lifecycle state of the pool. */
    private State _state;
    /** Callback supplied to shutdown(); invoked once shutdown conditions are met, then cleared. */
    private Callback<None> _shutdownCallback;
    /** True while an item creation has been requested and has not yet completed. */
    private boolean _isCreateInProgress;
    /** Items for which a destroy has been started by doDispose() and has not yet completed. */
    private final HashSet<T> _destroyInProgress;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/r2/transport/http/client/AsyncSharedPoolImpl$State.class */
    /**
     * Lifecycle states of the pool.
     */
    public enum State {
        /** Initial state assigned by the constructor; the only state from which start() succeeds. */
        NOT_YET_STARTED,
        /** Set by start(); the only state from which shutdown() is accepted. */
        RUNNING,
        /** Set by shutdown(); kept until no checkouts and no disposed items remain outstanding. */
        SHUTTING_DOWN,
        /** Set by doAttemptShutdown() once the shutdown conditions are met. */
        STOPPED
    }

    /**
     * Callback decorator that reports to the pool's stats tracker how long it took, from the
     * construction of this wrapper, until the wrapped callback was completed (either way).
     */
    /* loaded from: input_file:com/linkedin/r2/transport/http/client/AsyncSharedPoolImpl$TimeTrackingCallback.class */
    private class TimeTrackingCallback implements Callback<T> {
        private final Callback<T> _callback;
        private final long _startTime = System.currentTimeMillis();

        /**
         * @param callback the callback to notify after the wait time has been recorded
         */
        public TimeTrackingCallback(Callback<T> callback) {
            _callback = callback;
        }

        /** Records the wait time, then forwards the failure to the wrapped callback. */
        @Override
        public void onError(Throwable th) {
            recordWaitTime();
            _callback.onError(th);
        }

        /** Records the wait time, then forwards the item to the wrapped callback. */
        @Override
        public void onSuccess(T t) {
            recordWaitTime();
            _callback.onSuccess(t);
        }

        /** Measures the elapsed time and feeds it to the stats tracker while holding the pool lock. */
        private void recordWaitTime() {
            final long waited = System.currentTimeMillis() - _startTime;
            synchronized (_lock) {
                _statsTracker.trackWaitTime(waited);
                _statsTracker.sampleMaxWaitTime(waited);
            }
        }
    }

    /**
     * Mutable holder pairing a value with the wall-clock time (milliseconds) at which it was last
     * set or renewed. An empty holder has a {@code null} value and a timestamp of zero.
     */
    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/r2/transport/http/client/AsyncSharedPoolImpl$TimedObject.class */
    public final class TimedObject<T> {
        private T _item;
        private long _timestamp;

        /** Creates an empty holder. */
        private TimedObject() {
            reset();
        }

        /** @return the held value, or {@code null} when the holder is empty */
        public final T get() {
            return _item;
        }

        /** @return the time in milliseconds of the last set/renew, or zero when empty */
        public final long timestamp() {
            return _timestamp;
        }

        /** Stores the given value and stamps it with the current time. */
        public void set(T t) {
            _item = t;
            renew();
        }

        /** Refreshes the timestamp to the current time without touching the value. */
        public void renew() {
            _timestamp = System.currentTimeMillis();
        }

        /** Empties the holder: the value becomes {@code null} and the timestamp zero. */
        public final void reset() {
            _item = null;
            _timestamp = 0L;
        }

        /** @return whether the timestamp lies before "now minus the pool timeout" */
        public boolean expired() {
            final long cutoff = System.currentTimeMillis() - _timeoutMills;
            return _timestamp < cutoff;
        }
    }

    /**
     * Convenience constructor. Delegates to the full constructor with immediate creation
     * disabled ({@code false}), {@code SystemClock.instance()} as the clock and a fresh
     * {@code LongTracking} as the tracker.
     *
     * @param str pool name (must not be null)
     * @param lifecycle item lifecycle (must not be null)
     * @param scheduledExecutorService scheduler for the reaper task (must not be null)
     * @param rateLimiter rate limiter for item creation (must not be null)
     * @param j timeout in milliseconds, stored as {@code _timeoutMills}
     * @param i value stored as {@code _maxWaiters}
     */
    public AsyncSharedPoolImpl(String str, AsyncPool.Lifecycle<T> lifecycle, ScheduledExecutorService scheduledExecutorService, RateLimiter rateLimiter, long j, int i) {
        this(str, lifecycle, scheduledExecutorService, rateLimiter, j, false, i, SystemClock.instance(), new LongTracking());
    }

    /**
     * Creates a pool in the {@code NOT_YET_STARTED} state.
     *
     * @param str pool name; the effective name is this value plus "/" and the hex hashCode()
     * @param lifecycle item lifecycle (must not be null)
     * @param scheduledExecutorService scheduler for the reaper task (must not be null)
     * @param rateLimiter rate limiter for item creation (must not be null)
     * @param j timeout in milliseconds, stored as {@code _timeoutMills}
     * @param z whether start() should request item creation immediately
     * @param i value stored as {@code _maxWaiters}
     * @param clock clock handed to the stats tracker
     * @param longTracker tracker handed to the stats tracker
     */
    public AsyncSharedPoolImpl(String str, AsyncPool.Lifecycle<T> lifecycle, ScheduledExecutorService scheduledExecutorService, RateLimiter rateLimiter, long j, boolean z, int i, Clock clock, LongTracker longTracker) {
        ArgumentUtil.notNull(str, "name");
        ArgumentUtil.notNull(lifecycle, "lifecycle");
        ArgumentUtil.notNull(scheduledExecutorService, "scheduler");
        ArgumentUtil.notNull(rateLimiter, "rateLimiter");

        // Collaborators and configuration.
        _name = str + "/" + Integer.toHexString(hashCode());
        _lifecycle = lifecycle;
        _scheduler = scheduledExecutorService;
        _rateLimiter = rateLimiter;
        _timeoutMills = j;
        _createImmediately = z;
        _maxWaiters = i;

        // Mutable pool state, all starting out empty.
        _lock = new Object();
        _state = State.NOT_YET_STARTED;
        _item = new TimedObject<>();
        _checkedOut = 0;
        _isCreateInProgress = false;
        _reaperTaskFuture = null;
        _shutdownCallback = null;
        _disposedItems = new HashMap<>();
        _destroyInProgress = new HashSet<>();
        _waiters = new LinkedDeque<>();

        // The suppliers below read live pool state, so those that touch mutable fields take the lock.
        _statsTracker = new AsyncPoolStatsTracker(
            () -> _lifecycle.getStats(),
            () -> 1,
            () -> _createImmediately ? 1 : 0,
            () -> {
                synchronized (_lock) {
                    return _item.get() == null ? 0 : 1;
                }
            },
            () -> {
                synchronized (_lock) {
                    return _checkedOut;
                }
            },
            () -> {
                synchronized (_lock) {
                    return (_checkedOut > 0 || _item.get() == null) ? 0 : 1;
                }
            },
            clock,
            longTracker);
    }

    /**
     * @return the pool name built by the constructor (supplied name, "/", hex hashCode())
     */
    @Override
    public String getName() {
        return _name;
    }

    /**
     * Moves the pool from {@code NOT_YET_STARTED} to {@code RUNNING}, schedules the periodic
     * reaper when a positive timeout is configured and, if configured to create immediately,
     * requests creation of the shared item.
     *
     * @throws IllegalStateException if the pool is not in the {@code NOT_YET_STARTED} state
     */
    @Override // com.linkedin.r2.transport.http.client.AsyncPool
    public void start() {
        LOG.info("{}: start requested", this._name);
        synchronized (this._lock) {
            if (this._state != State.NOT_YET_STARTED) {
                throw new IllegalStateException(this._name + " is " + this._state);
            }
            this._state = State.RUNNING;
            if (this._timeoutMills > 0) {
                // A tenth of the timeout, capped at one second. Clamp to at least 1 ms: for
                // timeouts below 10 ms the division yields 0, which scheduleAtFixedRate rejects
                // with IllegalArgumentException.
                final long period = Math.max(1L, Math.min(this._timeoutMills / 10, 1000L));
                this._reaperTaskFuture = this._scheduler.scheduleAtFixedRate(this::reap, period, period, TimeUnit.MILLISECONDS);
            }
            if (this._createImmediately) {
                // Claim the creation under the lock so concurrent callers see it as in progress.
                this._isCreateInProgress = true;
            }
        }
        if (this._createImmediately) {
            // Kick off creation outside the lock: it calls into the rate limiter and lifecycle.
            doCreate();
        }
    }

    /**
     * Requests shutdown of the pool. When the pool is {@code RUNNING} it switches to
     * {@code SHUTTING_DOWN}, remembers the callback and attempts to complete the shutdown;
     * the callback succeeds once the shutdown conditions are met. In any other state the
     * callback fails immediately with an {@link IllegalStateException}.
     *
     * @param callback notified of the shutdown outcome; must not be null
     */
    @Override // com.linkedin.r2.transport.http.client.AsyncPool
    public void shutdown(Callback<None> callback) {
        // Plain literal instead of an unrelated constant that merely happened to hold this text.
        ArgumentUtil.notNull(callback, "callback");
        LOG.info("{}: shutdown requested", this._name);
        final State observed;
        synchronized (this._lock) {
            observed = this._state;
            if (observed == State.RUNNING) {
                this._state = State.SHUTTING_DOWN;
                this._shutdownCallback = callback;
            }
        }
        if (observed != State.RUNNING) {
            LOG.error("{}: shutdown requested while pool is not running", this._name);
            // Report the state captured under the lock rather than re-reading the field unguarded.
            callback.onError(new IllegalStateException(this._name + " is " + observed));
            return;
        }
        doAttemptShutdown();
    }

    /**
     * Removes every queued waiter from the pool and hands them to the caller. The callbacks
     * themselves are not invoked here.
     *
     * @return the removed waiter callbacks in queue order; empty when nothing was waiting
     */
    @Override // com.linkedin.r2.transport.http.client.AsyncPool
    public Collection<Callback<T>> cancelWaiters() {
        synchronized (this._lock) {
            final Collection<Callback<T>> cancelled = new ArrayList<>(this._waiters.size());
            Callback<T> waiter;
            // poll() yields null once the queue is drained, which ends the loop.
            while ((waiter = this._waiters.poll()) != null) {
                cancelled.add(waiter);
            }
            return cancelled;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:12:0x0046, code lost:
    
        if (r6._waiters.size() >= r6._maxWaiters) goto L12;
     */
    /* JADX WARN: Code restructure failed: missing block: B:13:0x0049, code lost:
    
        r0 = r6._waiters.addLastNode(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0055, code lost:
    
        r9 = r0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x005a, code lost:
    
        if (r6._isCreateInProgress == false) goto L16;
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x005d, code lost:
    
        com.linkedin.r2.transport.http.client.AsyncSharedPoolImpl.LOG.debug("{}: item creation is in progress", r6._name);
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x0120, code lost:
    
        if (r9 != null) goto L55;
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x0123, code lost:
    
        r0.onError(new com.linkedin.r2.SizeLimitExceededException("AsyncPool " + r6._name + " reached maximum waiter size: " + r6._maxWaiters));
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x0157, code lost:
    
        return () -> { // com.linkedin.r2.util.Cancellable.cancel():boolean
            return lambda$get$10();
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x015a, code lost:
    
        if (r11 == false) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x015d, code lost:
    
        doCreate();
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x0168, code lost:
    
        return () -> { // com.linkedin.r2.util.Cancellable.cancel():boolean
            return r0.lambda$get$11(r1);
        };
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x006f, code lost:
    
        r6._isCreateInProgress = true;
        r11 = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x0054, code lost:
    
        r0 = null;
     */
    /**
     * Checks an item out of the pool.
     *
     * <p><b>WARNING:</b> the decompiler could not reconstruct this method. The body below is a
     * placeholder that unconditionally throws {@link UnsupportedOperationException}; the real
     * logic exists only in the compiled class file.
     *
     * <p>NOTE(review): the recovered fragments listed above suggest -- unconfirmed -- that the
     * original (a) enqueues the callback with {@code _waiters.addLastNode} only while
     * {@code _waiters.size() < _maxWaiters}, (b) sets {@code _isCreateInProgress} and later calls
     * {@code doCreate()} when no creation is already running, (c) fails the callback with
     * {@code SizeLimitExceededException} when the waiter could not be enqueued, and (d) returns a
     * {@code Cancellable} lambda. TODO: restore the implementation from the original source
     * rather than from these fragments.
     *
     * @param r7 callback to receive the item or the failure
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @Override // com.linkedin.r2.transport.http.client.AsyncPool
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public com.linkedin.r2.util.Cancellable get(com.linkedin.common.callback.Callback<T> r7) {
        /*
            Method dump skipped, instructions count: 361
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.linkedin.r2.transport.http.client.AsyncSharedPoolImpl.get(com.linkedin.common.callback.Callback):com.linkedin.r2.util.Cancellable");
    }

    /**
     * Gives a checked-out item back to the pool. An item that is the current pooled instance
     * and passes {@code validatePut} is returned; anything else is disposed. Afterwards the
     * item is destroyed and/or a pending shutdown is attempted, as the bookkeeping dictates.
     *
     * @param t the item being handed back
     */
    @Override
    public void put(T t) {
        LOG.debug("{}: putting back an item {}", _name, t);
        boolean destroy = false;
        boolean lastCheckoutReturned = false;
        synchronized (_lock) {
            final T current = _item.get();
            final boolean isCurrentItem = current != null && current == t;
            if (!isCurrentItem) {
                LOG.debug("{}: given item {} does not reference match current item {}", _name, t, current);
                destroy = doDispose(t);
            } else if (!_lifecycle.validatePut(t)) {
                LOG.debug("{}: disposing an item {} that failed validation", _name, t);
                destroy = doDispose(t);
            } else {
                LOG.debug("{}: returning an item {} that passed validation", _name, t);
                lastCheckoutReturned = doReturn(t);
            }
        }
        // Follow-up work happens after the lock has been released.
        if (destroy) {
            doDestroy(t, BAD, this::doAttemptShutdown);
        }
        if (lastCheckoutReturned) {
            doAttemptShutdown();
        }
    }

    /**
     * Disposes a checked-out item. When the bookkeeping reports that the item should now be
     * destroyed, it is destroyed as a bad item and a pending shutdown is attempted afterwards.
     *
     * @param t the item to dispose
     */
    @Override
    public void dispose(T t) {
        LOG.error("{}: disposing an item {}", _name, t);
        final boolean destroy;
        synchronized (_lock) {
            destroy = doDispose(t);
        }
        if (!destroy) {
            return;
        }
        doDestroy(t, BAD, this::doAttemptShutdown);
    }

    /**
     * @return the statistics produced by the stats tracker, read while holding the pool lock
     */
    @Override
    public PoolStats getStats() {
        synchronized (_lock) {
            return _statsTracker.getStats();
        }
    }

    /**
     * Periodic reaper task. Does nothing when the pool holds no item. While the item has
     * outstanding checkouts its timestamp is renewed. An idle item that has expired is removed
     * from the pool, counted as timed out and destroyed as a non-bad item, after which a
     * pending shutdown is attempted.
     */
    private void reap() {
        final T expired;
        synchronized (this._lock) {
            expired = this._item.get();
            if (expired == null) {
                LOG.debug("{}: nothing to reap", this._name);
                return;
            }
            if (this._checkedOut > 0) {
                LOG.debug("{}: item still has {} outstanding checkouts", this._name, Integer.valueOf(this._checkedOut));
                this._item.renew();
                return;
            }
            if (!this._item.expired()) {
                LOG.debug("{}: item is still valid", this._name);
                return;
            }
            this._statsTracker.incrementTimedOut();
            this._item.reset();
        }
        // Destroy after releasing the lock: the lifecycle and its completion callback are
        // external code and must not run while the pool monitor is held.
        LOG.debug("{}: item timed out, proceed to destroy", this._name);
        doDestroy(expired, NOT_BAD, this::doAttemptShutdown);
    }

    /**
     * Records the return of one checkout of the current item and resets the rate limiter
     * period to zero.
     *
     * @param t the item being returned; must be the current pooled instance
     * @return {@code true} when this was the last outstanding checkout
     * @throws IllegalArgumentException if {@code t} is not the current item, or if there are
     *         no outstanding checkouts
     */
    private boolean doReturn(T t) {
        _rateLimiter.setPeriod(0L);
        synchronized (_lock) {
            final T current = _item.get();
            if (current == null || current != t) {
                LOG.debug("{}: given item {} does not reference match current item {}", _name, t, current);
                throw new IllegalArgumentException("Returning an item that is not the same as the current active item");
            }
            if (_checkedOut == 0) {
                throw new IllegalArgumentException("Decrementing checked out when it's already at 0");
            }
            _checkedOut -= 1;
            return _checkedOut == 0;
        }
    }

    /**
     * Completes a requested shutdown if possible. Nothing happens unless the pool is
     * {@code SHUTTING_DOWN} with no outstanding checkouts and no disposed items still awaited.
     * When the conditions hold, the pool becomes {@code STOPPED}, the reaper task is cancelled
     * and the shutdown callback is notified of success.
     */
    private void doAttemptShutdown() {
        LOG.debug("{}: attempts to shutdown", this._name);
        final Callback<None> shutdownCallback;
        final ScheduledFuture<?> reaperTask;
        synchronized (this._lock) {
            shutdownCallback = this._shutdownCallback;
            reaperTask = this._reaperTaskFuture;
            if (this._state != State.SHUTTING_DOWN) {
                LOG.debug("{}: current state is {}", this._name, this._state);
                return;
            }
            if (this._checkedOut > 0) {
                LOG.info("{}: awaiting {} more outstanding checkouts", this._name, Integer.valueOf(this._checkedOut));
                return;
            }
            if (this._disposedItems.size() > 0) {
                LOG.info("{}: awaiting {} more disposed items {}", this._name, Integer.valueOf(this._disposedItems.keySet().size()), this._disposedItems);
                return;
            }
            LOG.info("{}: shutdown conditions are met", this._name);
            this._state = State.STOPPED;
            this._shutdownCallback = null;
            this._reaperTaskFuture = null;
        }
        // Everything below runs without the lock: cancelling the task and, above all, invoking
        // the caller-supplied callback must not happen while the pool monitor is held.
        if (reaperTask != null) {
            LOG.debug("{}: attempt to cancel reaper task", this._name);
            reaperTask.cancel(false);
        }
        LOG.info("{}: shutdown complete", this._name);
        shutdownCallback.onSuccess(None.none());
    }

    /**
     * Submits creation of the shared item through the rate limiter.
     *
     * <p>On success every callback currently queued is counted as a checkout, the new item is
     * installed, and the callbacks are then completed with it outside the lock. On failure the
     * rate limiter period is incremented, its pending tasks are cancelled, and all queued
     * callbacks are failed with the creation error. In both cases the create-in-progress flag
     * is cleared and the rate limiter task is marked done.
     */
    private void doCreate() {
        LOG.debug("{}: creating a new item", this._name);
        this._rateLimiter.submit(simpleCallback -> {
            this._lifecycle.create(new Callback<T>() {
                @Override // com.linkedin.common.callback.SuccessCallback
                public void onSuccess(T t) {
                    LOG.debug("{}: item creation succeeded", AsyncSharedPoolImpl.this._name);
                    final Collection<Callback<T>> waiters = new ArrayList<>();
                    synchronized (AsyncSharedPoolImpl.this._lock) {
                        AsyncSharedPoolImpl.this._statsTracker.incrementCreated();
                        // Every waiter present right now receives the item, so each one is a checkout.
                        final int size = AsyncSharedPoolImpl.this._waiters.size();
                        AsyncSharedPoolImpl.this._checkedOut += size;
                        AsyncSharedPoolImpl.this._statsTracker.sampleMaxCheckedOut();
                        for (int k = 0; k < size; k++) {
                            waiters.add(AsyncSharedPoolImpl.this._waiters.poll());
                        }
                        AsyncSharedPoolImpl.this._item.set(t);
                        AsyncSharedPoolImpl.this._statsTracker.sampleMaxPoolSize();
                        AsyncSharedPoolImpl.this._isCreateInProgress = false;
                    }
                    // Complete waiters outside the lock; one failing callback must not starve the rest.
                    for (Callback<T> waiter : waiters) {
                        try {
                            waiter.onSuccess(t);
                        } catch (Exception e) {
                            LOG.error("Encountered error while invoking success waiter callback", (Throwable) e);
                        }
                    }
                    simpleCallback.onDone();
                }

                @Override // com.linkedin.common.callback.Callback
                public void onError(Throwable th) {
                    LOG.debug("{}: item creation failed", AsyncSharedPoolImpl.this._name, th);
                    AsyncSharedPoolImpl.this._rateLimiter.incrementPeriod();
                    AsyncSharedPoolImpl.this._rateLimiter.cancelPendingTasks();
                    final Collection<Callback<T>> cancelled;
                    synchronized (AsyncSharedPoolImpl.this._lock) {
                        cancelled = AsyncSharedPoolImpl.this.cancelWaiters();
                        AsyncSharedPoolImpl.this._statsTracker.incrementCreateErrors();
                        AsyncSharedPoolImpl.this._isCreateInProgress = false;
                    }
                    // Fail waiters outside the lock; one failing callback must not starve the rest.
                    for (Callback<T> waiter : cancelled) {
                        try {
                            waiter.onError(th);
                        } catch (Exception e) {
                            LOG.error("Encountered error while invoking error waiter callback", (Throwable) e);
                        }
                    }
                    simpleCallback.onDone();
                }
            });
        });
    }

    /**
     * Bookkeeping for disposing one checkout of an item.
     *
     * <p>If {@code t} is the current pooled item, it is moved to the disposed-items map together
     * with its outstanding checkout count and the pool is emptied. The disposed count for
     * {@code t} is then decremented; when it reaches zero the entry is removed and the item is
     * marked as having a destroy in progress.
     *
     * @param t the item to dispose; {@code null} is logged and ignored
     * @return {@code true} when the caller should now destroy the item; {@code false} when
     *         checkouts are still outstanding, a destroy is already in progress, or {@code t}
     *         is {@code null}
     * @throws IllegalArgumentException if {@code t} is neither the current item nor a tracked
     *         disposed item
     */
    private boolean doDispose(T t) {
        if (t == null) {
            LOG.error("{}: item is null so nothing to dispose", _name);
            return false;
        }
        synchronized (_lock) {
            final T current = _item.get();
            if (current != null && current == t) {
                // Retire the active item, remembering how many checkouts still have to come back.
                _disposedItems.put(current, _checkedOut);
                _item.reset();
                _checkedOut = 0;
            }
            if (!_disposedItems.containsKey(t)) {
                throw new IllegalArgumentException("Disposing a previously destroyed item or an item that was not checked out from the pool");
            }
            final int remaining = _disposedItems.get(t) - 1;
            if (remaining != 0) {
                _disposedItems.put(t, remaining);
                return false;
            }
            _disposedItems.remove(t);
            // add() reports false when the item was already registered as being destroyed.
            if (!_destroyInProgress.add(t)) {
                LOG.debug("{}: item {} destroy is in progress", _name, t);
                return false;
            }
            return true;
        }
    }

    /**
     * Destroys an item through the lifecycle and signals {@code simpleCallback} once the
     * lifecycle reports completion, whether the destroy succeeded or failed.
     *
     * @param t the item to destroy
     * @param z whether the item is bad; bad destroys are counted separately in the stats
     * @param simpleCallback invoked after the destroy outcome has been recorded
     */
    private void doDestroy(final T t, boolean z, final SimpleCallback simpleCallback) {
        LOG.debug("{}: destroying an item {}", _name, t);
        if (z) {
            _statsTracker.incrementBadDestroyed();
        }
        _lifecycle.destroy(t, z, new Callback<T>() {
            @Override
            public void onSuccess(T t2) {
                finishDestroy(t2, true, simpleCallback);
            }

            @Override
            public void onError(Throwable th) {
                LOG.error("{}: failed to destroy an item", _name, th);
                finishDestroy(t, false, simpleCallback);
            }
        });
    }

    /**
     * Records the destroy outcome and clears the in-progress marker under the lock, then
     * always signals completion.
     */
    private void finishDestroy(T destroyed, boolean succeeded, SimpleCallback done) {
        try {
            synchronized (_lock) {
                if (succeeded) {
                    _statsTracker.incrementDestroyed();
                } else {
                    _statsTracker.incrementDestroyErrors();
                }
                _destroyInProgress.remove(destroyed);
            }
        } finally {
            done.onDone();
        }
    }

    /**
     * Compiler-generated style accessor: adds {@code i} to the given pool's checked-out count.
     *
     * @return the checked-out count after the addition
     */
    static /* synthetic */ int access$812(AsyncSharedPoolImpl asyncSharedPoolImpl, int i) {
        asyncSharedPoolImpl._checkedOut += i;
        return asyncSharedPoolImpl._checkedOut;
    }
}
