package io.zeebe.redis.exporter;

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.resource.ClientResources;
import io.zeebe.exporter.proto.RecordTransformer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/redis/exporter/RedisExporter.class */
public class RedisExporter implements Exporter {
    private ExporterConfiguration config;
    private Logger logger;
    private UniversalRedisClient redisClient;
    private UniversalRedisConnection<String, ?> cleanupConnection;
    private UniversalRedisConnection<String, ?> senderConnection;
    private Function<Record, ?> recordTransformer;
    private String streamPrefix;
    private RedisCleaner redisCleaner;
    private RedisSender redisSender;
    private Controller controller;
    private ScheduledExecutorService startupThread;
    private boolean useProtoBuf = false;
    private EventQueue eventQueue = new EventQueue();
    private ScheduledExecutorService senderThread = Executors.newSingleThreadScheduledExecutor();
    private ScheduledExecutorService cleanerThread = Executors.newSingleThreadScheduledExecutor();
    private boolean fullyLoggedStartupException = false;
    private List<Integer> reconnectIntervals = new ArrayList(List.of(2, 3, 3, 4, 4, 4, 5));

    public void configure(Context context) {
        this.logger = context.getLogger();
        this.config = (ExporterConfiguration) context.getConfiguration().instantiate(ExporterConfiguration.class);
        this.logger.info("Starting Redis exporter with configuration: {}", this.config);
        this.streamPrefix = this.config.getName() + ":";
        context.setFilter(new RecordFilter(this.config));
        configureFormat();
    }

    private void configureFormat() {
        String format = this.config.getFormat();
        if (format.equalsIgnoreCase("protobuf")) {
            this.recordTransformer = this::recordToProtobuf;
            this.useProtoBuf = true;
        } else {
            if (!format.equalsIgnoreCase("json")) {
                throw new IllegalArgumentException(String.format("Expected the parameter 'format' to be one of 'protobuf' or 'json' but was '%s'", format));
            }
            this.recordTransformer = this::recordToJson;
        }
    }

    public void open(Controller controller) {
        if (this.config.getRemoteAddress().isEmpty()) {
            throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration.");
        }
        this.controller = controller;
        if (this.config.isUseClusterClient()) {
            RedisClusterClient create = RedisClusterClient.create(ClusterClientSettings.createResourcesFromConfig(this.config), this.config.getRemoteAddress().get());
            create.setOptions(ClusterClientSettings.createStandardOptions());
            this.redisClient = new UniversalRedisClient(create);
        } else {
            this.redisClient = new UniversalRedisClient(RedisClient.create(ClientResources.builder().ioThreadPoolSize(this.config.getIoThreadPoolSize()).build(), this.config.getRemoteAddress().get()));
        }
        connectToRedis();
    }

    private void connectToRedis() {
        boolean z = false;
        try {
            this.senderConnection = this.useProtoBuf ? this.redisClient.connect(new ProtobufCodec()) : this.redisClient.connect();
            this.cleanupConnection = this.useProtoBuf ? this.redisClient.connect(new ProtobufCodec()) : this.redisClient.connect();
            this.logger.info("Successfully connected Redis exporter to {}", this.config.getRemoteAddress().get());
        } catch (RedisConnectionException e) {
            if (this.fullyLoggedStartupException) {
                this.logger.warn("Failure connecting Redis exporter to {}: {}", this.config.getRemoteAddress().get(), e.getMessage());
            } else {
                this.logger.error("Failure connecting Redis exporter to " + String.valueOf(this.config.getRemoteAddress().get()), e);
                this.fullyLoggedStartupException = true;
            }
            z = true;
        }
        if (this.redisSender == null && this.senderConnection != null) {
            this.redisSender = new RedisSender(this.config, this.controller, this.senderConnection, this.logger);
            this.senderThread.schedule(this::sendBatches, this.config.getBatchCycleMillis(), TimeUnit.MILLISECONDS);
        }
        if (this.redisCleaner == null) {
            this.redisCleaner = new RedisCleaner(this.cleanupConnection, this.useProtoBuf, this.config, this.logger);
            if (this.config.getCleanupCycleInSeconds() > 0 && (this.config.isDeleteAfterAcknowledge() || this.config.getMaxTimeToLiveInSeconds() > 0)) {
                this.cleanerThread.schedule(this::trimStreamValues, this.config.getCleanupCycleInSeconds(), TimeUnit.SECONDS);
            }
        } else if (this.cleanupConnection != null) {
            this.redisCleaner.setRedisConnection(this.cleanupConnection);
        }
        if (z) {
            if (this.startupThread == null) {
                this.startupThread = Executors.newSingleThreadScheduledExecutor();
            }
            this.startupThread.schedule(this::connectToRedis, (this.reconnectIntervals.size() > 1 ? this.reconnectIntervals.remove(0) : this.reconnectIntervals.get(0)).intValue(), TimeUnit.SECONDS);
        } else if (this.startupThread != null) {
            this.startupThread.shutdown();
            this.startupThread = null;
        }
    }

    public void close() {
        this.senderThread.shutdown();
        if (this.senderConnection != null) {
            this.senderConnection.close();
            this.senderConnection = null;
        }
        this.cleanerThread.shutdown();
        if (this.cleanupConnection != null) {
            this.cleanupConnection.close();
            this.cleanupConnection = null;
        }
        this.redisClient.shutdown();
    }

    public void export(Record record) {
        String concat = this.streamPrefix.concat(record.getValueType().name());
        this.eventQueue.addEvent(new ImmutablePair<>(Long.valueOf(record.getPosition()), new RedisEvent(concat, System.currentTimeMillis(), this.recordTransformer.apply(record))));
        this.redisCleaner.considerStream(concat);
    }

    private void sendBatches() {
        this.redisSender.sendFrom(this.eventQueue);
        this.senderThread.schedule(this::sendBatches, this.config.getBatchCycleMillis(), TimeUnit.MILLISECONDS);
    }

    private byte[] recordToProtobuf(Record record) {
        return RecordTransformer.toGenericRecord(record).toByteArray();
    }

    private String recordToJson(Record record) {
        return record.toJson();
    }

    private void trimStreamValues() {
        this.redisCleaner.trimStreamValues();
        this.cleanerThread.schedule(this::trimStreamValues, this.config.getCleanupCycleInSeconds(), TimeUnit.SECONDS);
    }
}
