package io.zeebe.redis.exporter;

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.XTrimArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.exporter.proto.RecordTransformer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/redis/exporter/RedisExporter.class */
public class RedisExporter implements Exporter {
    private ExporterConfiguration config;
    private Logger logger;
    private Controller controller;
    private RedisClient redisClient;
    private StatefulRedisConnection<String, ?> redisConnection;
    private Function<Record, ?> recordTransformer;
    private Duration trimScheduleDelay;
    private String streamPrefix;
    private Map<String, Boolean> streams = new ConcurrentHashMap();
    private boolean useProtoBuf = false;
    private long ttlInMillis = 0;
    private HashMap<String, Long> positions = new HashMap<>();

    public void configure(Context context) {
        this.logger = context.getLogger();
        this.config = (ExporterConfiguration) context.getConfiguration().instantiate(ExporterConfiguration.class);
        this.logger.debug("Starting exporter with configuration: {}", this.config);
        this.ttlInMillis = this.config.getTimeToLiveInSeconds() * 1000;
        this.trimScheduleDelay = Duration.ofSeconds(Math.min(60L, this.config.getTimeToLiveInSeconds()));
        this.streamPrefix = this.config.getName() + ":";
        context.setFilter(new RecordFilter(this.config));
        configureFormat();
    }

    private void configureFormat() {
        String format = this.config.getFormat();
        if (format.equalsIgnoreCase("protobuf")) {
            this.recordTransformer = this::recordToProtobuf;
            this.useProtoBuf = true;
        } else {
            if (!format.equalsIgnoreCase("json")) {
                throw new IllegalArgumentException(String.format("Expected the parameter 'format' to be one of 'protobuf' or 'json' but was '%s'", format));
            }
            this.recordTransformer = this::recordToJson;
        }
    }

    public void open(Controller controller) {
        this.controller = controller;
        if (this.config.getRemoteAddress().isEmpty()) {
            throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration.");
        }
        this.redisClient = RedisClient.create(this.config.getRemoteAddress().get());
        this.redisConnection = this.useProtoBuf ? this.redisClient.connect(new ProtobufCodec()) : this.redisClient.connect();
        this.logger.info("Successfully connected Redis exporter to {}", this.config.getRemoteAddress().get());
        if (this.config.getTimeToLiveInSeconds() > 0) {
            controller.scheduleCancellableTask(this.trimScheduleDelay, this::timeOutStreamValues);
        }
    }

    public void close() {
        if (this.redisConnection != null) {
            this.redisConnection.close();
            this.redisConnection = null;
        }
        this.redisClient.shutdown();
    }

    public void export(Record record) {
        if (this.redisConnection != null) {
            long currentTimeMillis = System.currentTimeMillis();
            String concat = this.streamPrefix.concat(record.getValueType().name());
            RedisFuture xadd = this.redisConnection.async().xadd(concat, new Object[]{Long.toString(currentTimeMillis), this.recordTransformer.apply(record)});
            long position = record.getPosition();
            xadd.thenRun(() -> {
                this.controller.updateLastExportedRecordPosition(position);
                this.streams.put(concat, Boolean.TRUE);
                try {
                    this.logger.trace("Added a record with key {} to stream {}, messageId: {}", new Object[]{Long.valueOf(currentTimeMillis), concat, xadd.get()});
                } catch (Exception e) {
                }
            });
        }
    }

    private byte[] recordToProtobuf(Record record) {
        return RecordTransformer.toGenericRecord(record).toByteArray();
    }

    private String recordToJson(Record record) {
        return record.toJson();
    }

    private void timeOutStreamValues() {
        if (this.streams.size() > 0) {
            String valueOf = String.valueOf(System.currentTimeMillis() - this.ttlInMillis);
            this.logger.debug("trim streams {} with minId={}", this.streams, valueOf);
            new ArrayList(this.streams.keySet()).forEach(str -> {
                this.redisConnection.async().xtrim(str, new XTrimArgs().minId(valueOf));
            });
        }
        this.controller.scheduleCancellableTask(this.trimScheduleDelay, this::timeOutStreamValues);
    }
}
