package io.gridgo.connector.scheduler;

import io.gridgo.bean.BObject;
import io.gridgo.connector.impl.AbstractConsumer;
import io.gridgo.connector.support.ConnectionRef;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.joo.promise4j.impl.AsyncDeferredObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gridgo/connector/scheduler/SchedulerConsumer.class */
/**
 * Consumer that publishes a generated {@link Message} on a schedule.
 *
 * <p>Scheduling runs on a {@link ScheduledExecutorService} that is shared, through a static
 * registry, by every consumer using the same scheduler name. The executor is created by the
 * first consumer that starts under that name (so that consumer's {@code threads} and
 * {@code daemon} settings are the ones applied) and is shut down when the reference count
 * kept for that name drops to zero in {@link #onStop()}.
 *
 * <p>Configuration keys read from the parameter object:
 * <ul>
 *   <li>{@code threads} - pool size used when the executor is created (default 1)</li>
 *   <li>{@code delay} - initial delay in milliseconds (default 1000)</li>
 *   <li>{@code period} - period in milliseconds for the repeating modes (default 1000)</li>
 *   <li>{@code mode} - {@code "fixedRate"} or {@code "fixedDelay"}; any other value,
 *       including none, schedules a single run after {@code delay}</li>
 *   <li>{@code generator} - when present, a {@code MessageGenerator} is looked up in the
 *       registry and used instead of the default message</li>
 *   <li>{@code errorThreshold}, {@code idleThreshold}, {@code backoffMultiplier} - backoff
 *       tuning, each disabled by its default of -1</li>
 *   <li>{@code daemon} - whether pool threads are daemon threads (default true)</li>
 * </ul>
 */
public class SchedulerConsumer extends AbstractConsumer {
    private static final Logger log = LoggerFactory.getLogger(SchedulerConsumer.class);

    /** Executors shared by all consumers, keyed by scheduler name. */
    private static final Map<String, ScheduledExecutorService> executors = new NonBlockingHashMap<>();

    /** Reference counters for the shared executors, keyed by scheduler name. */
    private static final Map<String, ConnectionRef<String>> connRefs = new NonBlockingHashMap<>();

    private final String schedulerName;
    private final int threads;
    private final long delay;
    private final long period;
    private final int errorThreshold;
    private final int idleThreshold;
    private final int backoffMultiplier;
    private final boolean daemon;
    private final String mode;
    private final Supplier<Message> generator;

    /** Consecutive failures since the last success, idle run or backoff recovery. */
    private final AtomicInteger errorCounter = new AtomicInteger();

    /** Consecutive idle runs since the last success, failure or backoff recovery. */
    private final AtomicInteger idleCounter = new AtomicInteger();

    /** Number of runs skipped during the current backoff window. */
    private final AtomicInteger backoffCounter = new AtomicInteger();

    /** Handle of the scheduled task; {@code null} until {@link #onStart()} has scheduled it. */
    private ScheduledFuture<?> future;

    /**
     * Creates a consumer from its connector configuration.
     *
     * @param connectorContext context handed to the superclass and used for the registry lookup
     * @param str              scheduler name; also the key of the shared executor
     * @param bObject          configuration parameters (see the class documentation)
     * @throws IllegalArgumentException if {@code backoffMultiplier} is positive while both
     *                                  {@code errorThreshold} and {@code idleThreshold} are negative
     */
    public SchedulerConsumer(ConnectorContext connectorContext, String str, BObject bObject) {
        super(connectorContext);
        this.schedulerName = str;
        this.threads = bObject.getInteger("threads", 1);
        this.delay = bObject.getLong("delay", 1000);
        this.period = bObject.getLong("period", 1000);
        this.mode = bObject.getString("mode", (String) null);

        Supplier<Message> configuredGenerator = this::createDefaultMessage;
        if (bObject.getString("generator") != null) {
            configuredGenerator = lookupGenerator(connectorContext);
        }
        this.generator = configuredGenerator;

        this.errorThreshold = bObject.getInteger("errorThreshold", -1);
        this.idleThreshold = bObject.getInteger("idleThreshold", -1);
        this.backoffMultiplier = bObject.getInteger("backoffMultiplier", -1);
        this.daemon = bObject.getBoolean("daemon", true).booleanValue();

        // A backoff multiplier is meaningless without at least one threshold to trigger it.
        if (this.backoffMultiplier > 0 && this.errorThreshold < 0 && this.idleThreshold < 0) {
            throw new IllegalArgumentException("errorThreshold and/or idleThreshold must be set when backoffMultiplier is set");
        }
    }

    /**
     * Looks up the message generator in the registry.
     *
     * <p>NOTE(review): the lookup uses the literal bean name {@code "generator"}; the value
     * configured under the {@code generator} key only switches the lookup on and is otherwise
     * ignored. That looks unintended - confirm whether the configured value should be the
     * bean name before changing it, since existing deployments may rely on the literal name.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Supplier<Message> lookupGenerator(ConnectorContext connectorContext) {
        return (Supplier) connectorContext.getRegistry().lookupMandatory("generator", MessageGenerator.class);
    }

    /**
     * Obtains (or creates) the executor shared under this scheduler name, registers a
     * reference to it and schedules {@link #poll()} according to the configured mode.
     */
    protected void onStart() {
        ScheduledExecutorService executor = executors.computeIfAbsent(this.schedulerName,
                key -> Executors.newScheduledThreadPool(this.threads, this::spawnThread));
        connRefs.computeIfAbsent(this.schedulerName,
                key -> new ConnectionRef<String>(this.schedulerName)).ref();

        if ("fixedRate".equals(this.mode)) {
            this.future = executor.scheduleAtFixedRate(this::poll, this.delay, this.period, TimeUnit.MILLISECONDS);
        } else if ("fixedDelay".equals(this.mode)) {
            this.future = executor.scheduleWithFixedDelay(this::poll, this.delay, this.period, TimeUnit.MILLISECONDS);
        } else {
            // No (or unrecognised) mode: run once after the initial delay.
            this.future = executor.schedule(this::poll, this.delay, TimeUnit.MILLISECONDS);
        }
    }

    /** Thread factory for the shared executor: names the thread and applies the daemon flag. */
    private Thread spawnThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.setName("[Scheduler] " + this.schedulerName);
        thread.setDaemon(this.daemon);
        return thread;
    }

    /**
     * Body of the scheduled task: unless backing off, generates a message and publishes it,
     * then updates the error/idle counters from the outcome.
     */
    private void poll() {
        try {
            if (shouldBackoff()) {
                return;
            }
            AsyncDeferredObject<Message, Exception> deferred = new AsyncDeferredObject<>();
            if (publish(this.generator.get(), deferred)) {
                deferred.done(this::handleResult).fail(this::handleException);
            } else {
                // publish reported that nothing was dispatched; count the run as idle.
                handleIdle();
            }
        } catch (Exception ex) {
            // An exception escaping a periodic ScheduledExecutorService task suppresses all
            // of its subsequent executions without any log output. Treat a throwing
            // generator or publish call as an ordinary failure so the schedule keeps going.
            handleException(ex);
        }
    }

    /**
     * Decides whether the current run must be skipped.
     *
     * <p>Backoff is disabled while {@code backoffMultiplier} is negative. Otherwise, once the
     * error or idle counter exceeds its (non-negative) threshold, up to
     * {@code backoffMultiplier} runs are skipped; the run after that resets all counters
     * and proceeds normally.
     *
     * @return {@code true} if this run should be skipped
     */
    private boolean shouldBackoff() {
        if (this.backoffMultiplier < 0) {
            return false;
        }
        boolean tooManyErrors = this.errorThreshold >= 0 && this.errorCounter.get() > this.errorThreshold;
        boolean tooManyIdles = this.idleThreshold >= 0 && this.idleCounter.get() > this.idleThreshold;
        if (!tooManyErrors && !tooManyIdles) {
            return false;
        }
        if (this.backoffCounter.incrementAndGet() <= this.backoffMultiplier) {
            log.debug("Number of errors or idles exceeds maximum allowed, trying to backoff");
            return true;
        }
        log.trace("Recovering from backoff");
        this.errorCounter.set(0);
        this.idleCounter.set(0);
        this.backoffCounter.set(0);
        return false;
    }

    /** Records an idle run: clears the error streak and extends the idle streak. */
    private void handleIdle() {
        this.errorCounter.set(0);
        this.idleCounter.incrementAndGet();
    }

    /** Records a successful run: clears both streaks. The result itself is not used. */
    private void handleResult(Object obj) {
        this.errorCounter.set(0);
        this.idleCounter.set(0);
    }

    /** Logs a failed run, extends the error streak and clears the idle streak. */
    private void handleException(Exception exc) {
        log.error("Exception caught while running scheduler", exc);
        this.errorCounter.incrementAndGet();
        this.idleCounter.set(0);
    }

    /** Default payload: an object carrying the scheduler name and the current time in millis. */
    private Message createDefaultMessage() {
        return Message.ofAny(BObject.of("schedulerName", this.schedulerName).setAny("timestamp", Long.valueOf(System.currentTimeMillis())), (Object) null);
    }

    /**
     * Cancels this consumer's scheduled task (without interrupting a run in progress),
     * releases its reference to the shared executor, and shuts the executor down when the
     * reference count reaches zero.
     */
    protected void onStop() {
        // future is still null if onStart never got as far as scheduling the task.
        ScheduledFuture<?> scheduled = this.future;
        if (scheduled != null) {
            scheduled.cancel(false);
        }
        ConnectionRef<String> connectionRef = connRefs.get(this.schedulerName);
        if (connectionRef == null || connectionRef.deref() != 0) {
            return;
        }
        connRefs.remove(this.schedulerName);
        ScheduledExecutorService executor = executors.remove(this.schedulerName);
        if (executor != null) {
            executor.shutdown();
        }
    }

    /** @return the component name, {@code "consumer.scheduler."} followed by the scheduler name */
    protected String generateName() {
        return "consumer.scheduler." + this.schedulerName;
    }
}
