package nstream.adapter.redis;

import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.common.schedule.DeferrableException;
import nstream.adapter.redis.scan.RedisScanner;
import redis.clients.jedis.JedisPooled;
import swim.concurrent.TimerRef;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/redis/RedisIngestingAgent.class */
/**
 * Abstract ingesting agent that wires a Redis connection pool and a
 * {@link RedisScanner} from {@link RedisIngressSettings}, then repeatedly
 * invokes {@link #scanAndIngest()} on a fixed-delay timer.
 *
 * @param <V> second type argument forwarded to {@link IngestorMetricsAgent}
 */
public abstract class RedisIngestingAgent<V> extends IngestorMetricsAgent<RedisIngressSettings, V> {

    /** Connection pool resolved from the provision named in the ingress settings. */
    protected JedisPooled pool;

    /** Scanner built from the pool plus the settings' key, type and match values. */
    protected RedisScanner scanner;

    /** Handle to the recurring poll task; {@code null} until {@link #stageReception()} has run. */
    protected TimerRef pollTimer;

    /**
     * Performs one scan-and-ingest pass. Scheduled by {@link #stageReception()}
     * as the recurring task.
     *
     * @throws DeferrableException declared for implementations; handling is up to the scheduler
     */
    public abstract void scanAndIngest() throws DeferrableException;

    /**
     * Cancels the poll timer if one has been created; does nothing otherwise.
     * The {@link #pollTimer} reference itself is left in place.
     */
    protected void cancel() {
        final TimerRef activeTimer = this.pollTimer;
        if (activeTimer == null) {
            return;
        }
        activeTimer.cancel();
    }

    /**
     * Casts the given structure into {@link RedisIngressSettings}, substituting
     * {@link RedisIngressSettings#defaultSettings()} when the cast yields {@code null}.
     *
     * <p>Decompiler notes on this method state that it was originally named
     * {@code parseIngressSettings} with {@code protected} access and was merged
     * with its bridge method; the decompiled name and modifier are retained here.
     *
     * @param value structure to interpret as ingress settings
     * @return the cast settings, or the default settings if the cast produced {@code null}
     */
    public RedisIngressSettings m4parseIngressSettings(Value value) {
        final RedisIngressSettings parsed = (RedisIngressSettings) RedisIngressSettings.form().cast(value);
        if (parsed != null) {
            return parsed;
        }
        return RedisIngressSettings.defaultSettings();
    }

    /**
     * Loads the {@code redisIngressConf} settings, resolves the pool and scanner
     * from them, and schedules {@link #scanAndIngest()} with the configured
     * first-fetch delay and fetch interval.
     */
    protected void stageReception() {
        loadSettings("redisIngressConf");
        // Settings are read only after loadSettings has run.
        final RedisIngressSettings settings = (RedisIngressSettings) this.ingressSettings;

        this.pool = (JedisPooled) ProvisionLoader.getProvision(settings.poolProvisionName()).value();
        this.scanner = RedisAdapterUtils.buildScanner(
                this.pool,
                settings.key(),
                settings.type(),
                settings.match());

        // The supplier exposes the current pollTimer field to the scheduler;
        // how it is used is defined by the inherited scheduling method.
        this.pollTimer = scheduleWithFixedDelay(
                () -> this.pollTimer,
                settings.firstFetchDelayMillis(),
                settings.fetchIntervalMillis(),
                this::scanAndIngest);
    }
}
