package io.zeebe.redis.exporter;

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.SetArgs;
import io.lettuce.core.XTrimArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.zeebe.exporter.proto.RecordTransformer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/redis/exporter/RedisExporter.class */
/**
 * Zeebe {@link Exporter} that appends every exported record to a Redis stream and, when
 * configured, periodically trims those streams.
 *
 * <p>Each record is written with {@code XADD} to the stream
 * {@code <configured name>:<record value type>}. The payload is either the record's JSON or its
 * protobuf bytes, depending on the configured {@code format}.
 *
 * <p>Cleanup is guarded by a lock key in Redis ({@link #CLEANUP_LOCK}) plus a timestamp key
 * ({@link #CLEANUP_TIMESTAMP}), so that a run is skipped when the lock could not be obtained or a
 * cleanup was recorded within the last cleanup cycle.
 */
public class RedisExporter implements Exporter {
    /** Redis key used as the cleanup lock; set with NX and an expiry of one cleanup cycle. */
    private static final String CLEANUP_LOCK = "zeebe:cleanup-lock";
    /** Redis key that stores the epoch-millisecond timestamp of the last recorded cleanup. */
    private static final String CLEANUP_TIMESTAMP = "zeebe:cleanup-time";

    private ExporterConfiguration config;
    private Logger logger;
    private Controller controller;
    private RedisClient redisClient;
    /** Value type is {@code byte[]} in protobuf mode and {@code String} in JSON mode. */
    private StatefulRedisConnection<String, ?> redisConnection;
    private Function<Record, ?> recordTransformer;
    private Duration trimScheduleDelay;
    private String streamPrefix;
    /** Streams written so far; filled by the XADD completion callback, read by the cleanup task. */
    private final Map<String, Boolean> streams = new ConcurrentHashMap<>();
    private boolean useProtoBuf = false;
    private long maxTtlInMillisConfig = 0;
    private long minTtlInMillisConfig = 0;
    private boolean deleteAfterAcknowledge = false;

    /**
     * Reads the exporter configuration, derives the TTL/cleanup settings, installs the record
     * filter and selects the serialization format.
     *
     * @param context exporter context supplying the logger and the raw configuration
     * @throws IllegalArgumentException if the configured format is neither {@code protobuf} nor
     *     {@code json}
     */
    public void configure(Context context) {
        this.logger = context.getLogger();
        this.config = (ExporterConfiguration) context.getConfiguration().instantiate(ExporterConfiguration.class);
        this.logger.info("Starting Redis exporter with configuration: {}", this.config);
        // Multiply as long so a large seconds value cannot overflow; negative values mean "disabled".
        this.minTtlInMillisConfig = Math.max(0L, this.config.getMinTimeToLiveInSeconds() * 1000L);
        this.maxTtlInMillisConfig = Math.max(0L, this.config.getMaxTimeToLiveInSeconds() * 1000L);
        this.deleteAfterAcknowledge = this.config.isDeleteAfterAcknowledge();
        this.trimScheduleDelay = Duration.ofSeconds(this.config.getCleanupCycleInSeconds());
        this.streamPrefix = this.config.getName() + ":";
        context.setFilter(new RecordFilter(this.config));
        configureFormat();
    }

    /**
     * Selects the record transformer matching the configured format (case-insensitive).
     *
     * @throws IllegalArgumentException if the format is missing or not one of the supported values
     */
    private void configureFormat() {
        final String format = this.config.getFormat();
        // Literal-first comparison so a missing format yields the descriptive error below, not an NPE.
        if ("protobuf".equalsIgnoreCase(format)) {
            this.recordTransformer = this::recordToProtobuf;
            this.useProtoBuf = true;
        } else if ("json".equalsIgnoreCase(format)) {
            this.recordTransformer = this::recordToJson;
        } else {
            throw new IllegalArgumentException(String.format("Expected the parameter 'format' to be one of 'protobuf' or 'json' but was '%s'", format));
        }
    }

    /**
     * Connects to Redis and, if a cleanup cycle and at least one trim criterion are configured,
     * schedules the first cleanup run.
     *
     * @param controller controller used to acknowledge exported positions and schedule tasks
     * @throws IllegalStateException if no remote address is configured
     */
    public void open(Controller controller) {
        this.controller = controller;
        if (this.config.getRemoteAddress().isEmpty()) {
            throw new IllegalStateException("Missing ZEEBE_REDIS_REMOTE_ADDRESS configuration.");
        }
        this.redisClient = RedisClient.create(this.config.getRemoteAddress().get());
        this.redisConnection = this.useProtoBuf ? this.redisClient.connect(new ProtobufCodec()) : this.redisClient.connect();
        this.logger.info("Successfully connected Redis exporter to {}", this.config.getRemoteAddress().get());

        final boolean cleanupEnabled = this.config.getCleanupCycleInSeconds() > 0;
        final boolean hasTrimCriterion = this.config.isDeleteAfterAcknowledge() || this.config.getMaxTimeToLiveInSeconds() > 0;
        if (cleanupEnabled && hasTrimCriterion) {
            controller.scheduleCancellableTask(this.trimScheduleDelay, this::trimStreamValues);
        }
    }

    /**
     * Closes the Redis connection and shuts the client down. Safe to call when {@link #open} never
     * ran or failed part-way, and safe to call more than once.
     */
    public void close() {
        try {
            if (this.redisConnection != null) {
                this.redisConnection.close();
                this.redisConnection = null;
            }
        } finally {
            // Shut the client down even if closing the connection failed.
            if (this.redisClient != null) {
                this.redisClient.shutdown();
                this.redisClient = null;
            }
        }
    }

    /**
     * Appends the record to its stream asynchronously. The exported position is acknowledged only
     * once Redis has confirmed the {@code XADD}; failures are logged.
     *
     * <p>Does nothing while there is no open connection.
     *
     * @param record the record to export
     */
    public void export(Record record) {
        if (this.redisConnection == null) {
            return;
        }
        final long now = System.currentTimeMillis();
        final String stream = this.streamPrefix.concat(record.getValueType().name());
        final long position = record.getPosition();
        // The entry has a single field: key = current time in millis, value = serialized record.
        final RedisFuture<String> pendingAdd =
                this.redisConnection.async().xadd(stream, Long.toString(now), this.recordTransformer.apply(record));
        pendingAdd
                .thenAccept(messageId -> {
                    this.controller.updateLastExportedRecordPosition(position);
                    this.streams.put(stream, Boolean.TRUE);
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("Added a record with key {} to stream {}, messageId: {}", now, stream, messageId);
                    }
                })
                .exceptionally(error -> {
                    this.logger.error("Failed to export record at position {} to stream {}", position, stream, error);
                    return null;
                });
    }

    /** Serializes the record to protobuf bytes via {@link RecordTransformer}. */
    private byte[] recordToProtobuf(Record record) {
        return RecordTransformer.toGenericRecord(record).toByteArray();
    }

    /** Serializes the record to its JSON representation. */
    private String recordToJson(Record record) {
        return record.toJson();
    }

    /**
     * Scheduled cleanup task: trims every known stream if the cleanup lock can be acquired, then
     * reschedules itself after one cleanup cycle.
     */
    private void trimStreamValues() {
        if (this.redisConnection == null) {
            // The exporter has been closed; end the cleanup cycle instead of rescheduling forever.
            return;
        }
        if (!this.streams.isEmpty() && acquireCleanupLock()) {
            try {
                // One clock reading so both cut-offs refer to the same instant.
                final long now = System.currentTimeMillis();
                final long maxTtlCutoff = now - this.maxTtlInMillisConfig;
                final long minTtlCutoff = now - this.minTtlInMillisConfig;
                this.logger.debug("trim streams {}", this.streams);
                // Iterate over a snapshot; export callbacks may add streams concurrently.
                for (String stream : new ArrayList<>(this.streams.keySet())) {
                    trimStream(stream, maxTtlCutoff, minTtlCutoff);
                }
            } catch (Exception e) {
                this.logger.error("Error during cleanup", e);
            } finally {
                releaseCleanupLock();
            }
        }
        this.controller.scheduleCancellableTask(this.trimScheduleDelay, this::trimStreamValues);
    }

    /**
     * Trims a single stream with {@code XTRIM MINID}.
     *
     * <p>With delete-after-acknowledge and at least one consumer group, the threshold is the
     * smallest id returned by {@link #findOldestRequiredId}, overridden by the max-TTL cut-off when
     * that is newer, or by the min-TTL cut-off when that is older. Otherwise only the max-TTL
     * cut-off applies, and nothing is trimmed when max TTL is disabled.
     *
     * @param stream name of the stream to trim
     * @param maxTtlCutoff epoch millis before which entries exceed the maximum TTL
     * @param minTtlCutoff epoch millis after which entries are still within the minimum TTL
     */
    private void trimStream(String stream, long maxTtlCutoff, long minTtlCutoff) {
        final Optional<Long> oldestRequired =
                this.deleteAfterAcknowledge ? findOldestRequiredId(stream) : Optional.empty();

        final long threshold;
        if (oldestRequired.isPresent()) {
            final long minDelivered = oldestRequired.get();
            if (this.maxTtlInMillisConfig > 0 && maxTtlCutoff > minDelivered) {
                threshold = maxTtlCutoff;
            } else if (this.minTtlInMillisConfig > 0 && minTtlCutoff < minDelivered) {
                threshold = minTtlCutoff;
            } else {
                threshold = minDelivered;
            }
        } else if (this.maxTtlInMillisConfig > 0) {
            threshold = maxTtlCutoff;
        } else {
            return;
        }

        final long trimmed = this.redisConnection.sync().xtrim(stream, new XTrimArgs().minId(String.valueOf(threshold)));
        if (trimmed > 0) {
            this.logger.debug("{}: {} cleaned records", stream, trimmed);
        }
    }

    /**
     * Determines the smallest last-delivered id across all consumer groups of the stream. For a
     * group with pending entries, the lowest pending message id is first passed to
     * {@code XInfoGroup.considerPendingMessageId}.
     *
     * @param stream name of the stream to inspect
     * @return the minimum over all groups, or empty if the stream has no consumer groups
     */
    private Optional<Long> findOldestRequiredId(String stream) {
        return this.redisConnection.sync().xinfoGroups(stream).stream()
                .map(info -> XInfoGroup.fromXInfo(info, this.useProtoBuf))
                .map(group -> {
                    if (group.getPending() > 0) {
                        group.considerPendingMessageId(
                                this.redisConnection.sync().xpending(stream, group.getName()).getMessageIds().getLower().getValue());
                    }
                    return Long.valueOf(group.getLastDeliveredId());
                })
                .min(Comparator.naturalOrder());
    }

    /**
     * Tries to obtain the right to run a cleanup now.
     *
     * @return {@code true} if this instance holds the lock and no cleanup was recorded within the
     *     last cleanup cycle; {@code false} otherwise, including on any Redis error (which is
     *     logged)
     */
    private boolean acquireCleanupLock() {
        try {
            if (this.useProtoBuf) {
                @SuppressWarnings("unchecked") // protobuf mode reads and writes values as byte[]
                final StatefulRedisConnection<String, byte[]> binaryConnection =
                        (StatefulRedisConnection<String, byte[]>) this.redisConnection;
                return tryAcquireCleanupLock(
                        binaryConnection.sync(),
                        text -> text.getBytes(StandardCharsets.UTF_8),
                        bytes -> new String(bytes, StandardCharsets.UTF_8));
            }
            @SuppressWarnings("unchecked") // JSON mode uses the default String/String connection
            final StatefulRedisConnection<String, String> textConnection =
                    (StatefulRedisConnection<String, String>) this.redisConnection;
            return tryAcquireCleanupLock(textConnection.sync(), Function.identity(), Function.identity());
        } catch (Exception e) {
            this.logger.error("Error acquiring cleanup lock", e);
            return false;
        }
    }

    /**
     * Codec-independent lock protocol: set the lock key with NX and an expiry of one cleanup cycle,
     * read it back to see whether our id won, then check and update the last-cleanup timestamp.
     *
     * <p>When the lock was won but the timestamp is too recent, the lock is not released here; it
     * is left to expire.
     *
     * @param commands synchronous commands of the open connection
     * @param encode converts a string into the connection's value type
     * @param decode converts a non-null stored value back into a string
     * @param <V> value type of the connection
     * @return {@code true} if the cleanup may proceed
     */
    private <V> boolean tryAcquireCleanupLock(
            RedisCommands<String, V> commands, Function<String, V> encode, Function<V, String> decode) {
        final String lockId = UUID.randomUUID().toString();
        final long now = System.currentTimeMillis();

        commands.set(CLEANUP_LOCK, encode.apply(lockId), SetArgs.Builder.nx().px(this.trimScheduleDelay));
        final V lockHolder = commands.get(CLEANUP_LOCK);
        if (lockHolder == null || !lockId.equals(decode.apply(lockHolder))) {
            return false;
        }

        final V storedTimestamp = commands.get(CLEANUP_TIMESTAMP);
        final String lastCleanup = storedTimestamp == null ? "" : decode.apply(storedTimestamp);
        if (!lastCleanup.isEmpty() && Long.parseLong(lastCleanup) >= now - this.trimScheduleDelay.toMillis()) {
            return false;
        }

        commands.set(CLEANUP_TIMESTAMP, encode.apply(Long.toString(now)));
        return true;
    }

    /**
     * Deletes the cleanup lock key; errors are logged and not propagated.
     *
     * <p>NOTE(review): the key is deleted without checking that it still holds this instance's
     * id, so a lock that expired and was re-acquired elsewhere would also be removed.
     */
    private void releaseCleanupLock() {
        try {
            this.redisConnection.sync().del(CLEANUP_LOCK);
        } catch (Exception e) {
            this.logger.error("Error releasing cleanup lock", e);
        }
    }
}
