package io.engineblock.activityapi.core;

import com.codahale.metrics.Counter;
import io.engineblock.activityapi.core.Activity;
import io.engineblock.activityapi.core.ops.OpContext;
import io.engineblock.activityimpl.ActivityDef;
import io.engineblock.metrics.ActivityMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/engineblock/activityapi/core/BaseAsyncAction.class */
/**
 * Base class for asynchronous actions that bounds the number of operations which may be
 * pending on a single action instance at any time.
 *
 * <p>{@link #enqueue(OpContext)} blocks while no capacity is available, claims a pending-op
 * slot and then hands the op to {@link #startOpCycle(OpContext)}. A slot is released again by
 * {@link #decrementOps()}; nothing in this class calls it, so subclasses are presumably expected
 * to invoke it when an op completes (NOTE(review): confirm against implementations).
 *
 * <p>Thread-safety: the pending-op count and the per-instance limit are guarded by this
 * object's monitor, which is also the monitor used for {@code wait}/{@code notifyAll}.
 * Subclasses must not hold other locks that completion callbacks need while calling into
 * the blocking methods of this class.
 *
 * @param <T> the op context type handled by this action
 * @param <A> the activity type this action belongs to
 */
public abstract class BaseAsyncAction<T extends OpContext, A extends Activity> implements AsyncAction<T>, Stoppable, ActivityDefObserver {
    private static final Logger logger = LoggerFactory.getLogger("BaseAsyncAction");

    /** Upper bound for a single wait in {@link #enqueue(OpContext)} before capacity is re-checked. */
    private static final long ENQUEUE_WAIT_MILLIS = 60000L;

    protected final A activity;
    public Counter pendingOpsCounter;
    protected int slot;

    /** Cleared by {@link #requestStop()}; volatile because it is written and read by different threads. */
    protected volatile boolean running = true;

    // Both counters are guarded by this object's monitor.
    private int pendingOpsQueuedForThread = 0;
    private int maxOpsQueuedForThread = 1;

    /**
     * Creates the action and applies the activity's current definition.
     *
     * <p>Note that this invokes the overridable {@link #onActivityDefUpdate(ActivityDef)} from the
     * constructor, so overrides run before subclass fields are initialized.
     *
     * @param a the owning activity
     * @param i the slot number of this action, used to apportion the pending-op limit
     * @throws RuntimeException if the activity definition has no {@code async} parameter
     */
    public BaseAsyncAction(A a, int i) {
        this.activity = a;
        this.slot = i;
        onActivityDefUpdate(a.getActivityDef());
    }

    /**
     * Re-reads the pending-op limit for this slot and the {@code pending_ops} counter from the
     * given activity definition.
     *
     * @param activityDef the (possibly updated) activity definition
     * @throws RuntimeException if the {@code async} parameter is absent
     */
    @Override // io.engineblock.activityapi.core.ActivityDefObserver
    public void onActivityDefUpdate(ActivityDef activityDef) {
        activityDef.getParams().getOptionalInteger("async").orElseThrow(
                () -> new RuntimeException("the async parameter is required to activate async actions"));
        int updatedLimit = getMaxPendingOps(activityDef);
        synchronized (this) {
            this.maxOpsQueuedForThread = updatedLimit;
            // The limit may have grown; let a blocked enqueue() re-check capacity right away
            // instead of waiting for the next completion or wait timeout.
            notifyAll();
        }
        this.pendingOpsCounter = ActivityMetrics.counter(activityDef, "pending_ops");
    }

    /**
     * Computes this slot's share of the {@code async} value: the value is divided evenly by the
     * thread count, and the remainder is handed out one each to the lowest-numbered slots.
     *
     * @param activityDef the activity definition providing {@code async} and the thread count
     * @return the maximum number of pending ops for this slot
     * @throws ArithmeticException if the activity definition reports zero threads
     */
    protected int getMaxPendingOps(ActivityDef activityDef) {
        int intValue = activityDef.getParams().getOptionalInteger("async").orElse(1).intValue();
        int threads = activityDef.getThreads();
        return (intValue / threads) + (this.slot < intValue % threads ? 1 : 0);
    }

    /**
     * Blocks until a pending-op slot is free, claims it and starts the op cycle for {@code t}.
     *
     * <p>If the calling thread is interrupted while blocked, waiting continues (the op is still
     * enqueued) and the thread's interrupt status is restored before the op cycle is started.
     *
     * @param t the op context to start
     * @return {@code true} if this action is still running and has capacity for another op
     */
    @Override // io.engineblock.activityapi.core.AsyncAction
    public boolean enqueue(T t) {
        boolean interrupted = false;
        try {
            synchronized (this) {
                // "<= 0" rather than "== 0": the limit can be lowered while ops are in flight,
                // which makes available() negative; that must block as well.
                while (available() <= 0) {
                    try {
                        logger.trace("Blocking for enqueue with ({}/{}) queued ops",
                                this.pendingOpsQueuedForThread, this.maxOpsQueuedForThread);
                        wait(ENQUEUE_WAIT_MILLIS);
                    } catch (InterruptedException e) {
                        // Re-interrupting here would make the next wait() throw immediately,
                        // so remember the interrupt and restore it once the slot is claimed.
                        interrupted = true;
                    }
                }
                // Claim the slot under the same monitor that guarded the capacity check.
                incrementOps();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        startOpCycle(t);
        return this.running && available() > 0;
    }

    /**
     * Waits until no ops are pending, this action is stopped, or the timeout elapses.
     *
     * <p>If the calling thread is interrupted while waiting, waiting continues until one of the
     * conditions above holds and the interrupt status is restored before returning.
     *
     * @param j the maximum time to wait, in milliseconds
     * @return {@code true} if no ops are pending when the method returns
     */
    @Override // io.engineblock.activityapi.core.AsyncAction
    public synchronized boolean awaitCompletion(long j) {
        long startedAt = System.currentTimeMillis();
        long deadline = startedAt + j;
        if (j > 0 && deadline < startedAt) {
            // The addition overflowed; treat a huge timeout as "practically forever".
            deadline = Long.MAX_VALUE;
        }
        boolean interrupted = false;
        try {
            while (this.running && pending() > 0) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // Never call wait(0): a zero timeout means "wait until notified".
                    break;
                }
                try {
                    wait(remaining);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return pending() == 0;
    }

    /**
     * @return the number of ops that can still be started before the limit is reached; negative
     *         if more ops are pending than the current limit allows
     */
    protected synchronized int available() {
        return this.maxOpsQueuedForThread - this.pendingOpsQueuedForThread;
    }

    /** @return the number of ops currently pending on this action */
    protected synchronized int pending() {
        return this.pendingOpsQueuedForThread;
    }

    /** Records one more pending op, both in the metrics counter and in the local count. */
    protected synchronized void incrementOps() {
        this.pendingOpsCounter.inc();
        this.pendingOpsQueuedForThread++;
    }

    /**
     * Records the completion of one pending op and wakes waiting threads when that frees the
     * first slot below the limit or when the last pending op has completed.
     */
    protected synchronized void decrementOps() {
        this.pendingOpsCounter.dec();
        this.pendingOpsQueuedForThread--;
        if (this.pendingOpsQueuedForThread == 0 || this.pendingOpsQueuedForThread == this.maxOpsQueuedForThread - 1) {
            notifyAll();
        }
    }

    /**
     * Starts the op cycle for the given op context. Called by {@link #enqueue(OpContext)} after a
     * pending-op slot has been claimed, outside of this object's monitor.
     *
     * @param t the op context to start
     * @return an op context; the return value is not used by this class
     */
    protected abstract T startOpCycle(T t);

    /**
     * Marks this action as no longer running and wakes threads blocked in
     * {@link #awaitCompletion(long)} so they observe the stop without waiting for their timeout.
     */
    @Override // io.engineblock.activityapi.core.Stoppable
    public void requestStop() {
        logger.info("{} requested to stop.", this);
        this.running = false;
        synchronized (this) {
            notifyAll();
        }
    }

    /** @return a new op context for use with this action */
    @Override // io.engineblock.activityapi.core.AsyncAction
    public abstract T newOpContext();
}
