package io.zeebe.redis.connect.java;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import io.camunda.zeebe.protocol.record.ValueType;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisConnectionStateAdapter;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.zeebe.exporter.proto.Schema;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/zeebe/redis/connect/java/ZeebeRedis.class */
public class ZeebeRedis implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZeebeRedis.class);

    /**
     * Maps a Zeebe value type name (the stream name without its prefix) to the protobuf message
     * class the record payload of that stream is unpacked into.
     */
    private static final Map<String, Class<? extends Message>> RECORD_MESSAGE_TYPES = Map.ofEntries(
            typeEntry(ValueType.DEPLOYMENT.name(), Schema.DeploymentRecord.class),
            typeEntry(ValueType.DEPLOYMENT_DISTRIBUTION.name(), Schema.DeploymentDistributionRecord.class),
            typeEntry(ValueType.ERROR.name(), Schema.ErrorRecord.class),
            typeEntry(ValueType.INCIDENT.name(), Schema.IncidentRecord.class),
            typeEntry(ValueType.JOB.name(), Schema.JobRecord.class),
            typeEntry(ValueType.JOB_BATCH.name(), Schema.JobBatchRecord.class),
            typeEntry(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION.name(), Schema.MessageStartEventSubscriptionRecord.class),
            typeEntry(ValueType.MESSAGE_SUBSCRIPTION.name(), Schema.MessageSubscriptionRecord.class),
            typeEntry(ValueType.MESSAGE.name(), Schema.MessageRecord.class),
            typeEntry(ValueType.PROCESS.name(), Schema.ProcessRecord.class),
            typeEntry(ValueType.PROCESS_EVENT.name(), Schema.ProcessEventRecord.class),
            typeEntry(ValueType.PROCESS_INSTANCE.name(), Schema.ProcessInstanceRecord.class),
            typeEntry(ValueType.PROCESS_INSTANCE_CREATION.name(), Schema.ProcessInstanceCreationRecord.class),
            typeEntry(ValueType.PROCESS_MESSAGE_SUBSCRIPTION.name(), Schema.ProcessMessageSubscriptionRecord.class),
            typeEntry(ValueType.TIMER.name(), Schema.TimerRecord.class),
            typeEntry(ValueType.VARIABLE.name(), Schema.VariableRecord.class),
            typeEntry(ValueType.VARIABLE_DOCUMENT.name(), Schema.VariableDocumentRecord.class));

    /** Client used to open a fresh connection when reconnecting. */
    private RedisClient redisClient;
    /** Connection currently used for reading, acknowledging and deleting stream messages. */
    private StatefulRedisConnection<String, byte[]> redisConnection;
    private String consumerGroup;
    private String consumerId;
    /** Stream name prefix; stripped from a stream name to obtain the value type name. */
    private String prefix;
    /**
     * Streams to read from. Typed (not raw) so that passing it to {@code xreadgroup} needs no
     * unchecked conversion, which would otherwise erase the call's return type to a raw list.
     */
    private XReadArgs.StreamOffset<String>[] offsets;
    /** Listeners keyed by full (prefixed) stream name. */
    private final Map<String, List<Consumer<?>>> listeners;
    /** When set, messages are deleted from the stream after they were handled successfully. */
    private boolean deleteMessages;
    /** Handle of the running stream reader task. */
    private Future<?> future;
    private ExecutorService executorService;
    /** When set, a lost connection is replaced by a newly opened one. */
    private boolean reconnectUsesNewConnection;
    /** Pause before every reconnect attempt, in milliseconds. */
    private long reconnectIntervalMillis;
    /** Handle of the running reconnect task, if any. */
    private Future<?> reconnectFuture;
    private ExecutorService reconnectExecutorService;
    /** Set while the consumer is stopped; ends the reader loop. */
    private volatile boolean isClosed = false;
    /** Set by {@code close()} so that the resulting disconnect does not trigger a reconnect. */
    private volatile boolean externalClose = false;

    /* loaded from: input_file:io/zeebe/redis/connect/java/ZeebeRedis$Builder.class */
    /**
     * Fluent builder that collects connection settings and record listeners for a
     * {@link ZeebeRedis} consumer and starts it with {@link #build()}.
     *
     * <p>Listeners are stored under their plain value type name; the stream prefix is applied once
     * in {@link #build()}, so {@link #prefix(String)} may be called before or after listeners are
     * registered and several listeners may be registered for the same value type.
     */
    public static class Builder {
        private final RedisClient redisClient;
        private boolean reconnectUsesNewConnection = false;
        private Duration reconnectInterval = Duration.ofSeconds(1);
        /** Registered listeners keyed by value type name (without stream prefix). */
        private final Map<String, List<Consumer<?>>> listeners = new HashMap<>();
        private String consumerGroup = UUID.randomUUID().toString();
        private String consumerId = UUID.randomUUID().toString();
        private String prefix = "zeebe:";
        private String offset = "0-0";
        private boolean deleteMessages = false;

        private Builder(RedisClient redisClient) {
            this.redisClient = redisClient;
        }

        /** Replaces a lost connection with a newly opened one instead of keeping the old one. */
        public Builder withReconnectUsingNewConnection() {
            this.reconnectUsesNewConnection = true;
            return this;
        }

        /** Sets the pause before each reconnect attempt (default: one second). */
        public Builder reconnectInterval(Duration duration) {
            this.reconnectInterval = duration;
            return this;
        }

        /** Sets the consumer group name (default: a random UUID). */
        public Builder consumerGroup(String str) {
            this.consumerGroup = str;
            return this;
        }

        /** Sets the consumer id within the group (default: a random UUID). */
        public Builder consumerId(String str) {
            this.consumerId = str;
            return this;
        }

        /** Sets the stream name prefix; a trailing {@code ":"} is appended (default: {@code "zeebe:"}). */
        public Builder prefix(String str) {
            this.prefix = str + ":";
            return this;
        }

        /** Sets the stream offset used when a consumer group is created (default: {@code "0-0"}). */
        public Builder offset(String str) {
            this.offset = str;
            return this;
        }

        /** Controls whether messages are deleted from the stream after successful handling. */
        public Builder deleteMessagesAfterSuccessfulHandling(boolean z) {
            this.deleteMessages = z;
            return this;
        }

        /**
         * Registers a listener for the given value type. Lookup and storage use the same key, so
         * an additional listener for a value type is appended rather than replacing earlier ones.
         */
        private <T extends Message> void addListener(String str, Consumer<T> consumer) {
            this.listeners.computeIfAbsent(str, key -> new ArrayList<>()).add(consumer);
        }

        /** Registers a listener for deployment records. */
        public Builder addDeploymentListener(Consumer<Schema.DeploymentRecord> consumer) {
            addListener(ValueType.DEPLOYMENT.name(), consumer);
            return this;
        }

        /** Registers a listener for deployment distribution records. */
        public Builder addDeploymentDistributionListener(Consumer<Schema.DeploymentDistributionRecord> consumer) {
            addListener(ValueType.DEPLOYMENT_DISTRIBUTION.name(), consumer);
            return this;
        }

        /** Registers a listener for process records. */
        public Builder addProcessListener(Consumer<Schema.ProcessRecord> consumer) {
            addListener(ValueType.PROCESS.name(), consumer);
            return this;
        }

        /** Registers a listener for process instance records. */
        public Builder addProcessInstanceListener(Consumer<Schema.ProcessInstanceRecord> consumer) {
            addListener(ValueType.PROCESS_INSTANCE.name(), consumer);
            return this;
        }

        /** Registers a listener for process event records. */
        public Builder addProcessEventListener(Consumer<Schema.ProcessEventRecord> consumer) {
            addListener(ValueType.PROCESS_EVENT.name(), consumer);
            return this;
        }

        /** Registers a listener for variable records. */
        public Builder addVariableListener(Consumer<Schema.VariableRecord> consumer) {
            addListener(ValueType.VARIABLE.name(), consumer);
            return this;
        }

        /** Registers a listener for variable document records. */
        public Builder addVariableDocumentListener(Consumer<Schema.VariableDocumentRecord> consumer) {
            addListener(ValueType.VARIABLE_DOCUMENT.name(), consumer);
            return this;
        }

        /** Registers a listener for job records. */
        public Builder addJobListener(Consumer<Schema.JobRecord> consumer) {
            addListener(ValueType.JOB.name(), consumer);
            return this;
        }

        /** Registers a listener for job batch records. */
        public Builder addJobBatchListener(Consumer<Schema.JobBatchRecord> consumer) {
            addListener(ValueType.JOB_BATCH.name(), consumer);
            return this;
        }

        /** Registers a listener for incident records. */
        public Builder addIncidentListener(Consumer<Schema.IncidentRecord> consumer) {
            addListener(ValueType.INCIDENT.name(), consumer);
            return this;
        }

        /** Registers a listener for timer records. */
        public Builder addTimerListener(Consumer<Schema.TimerRecord> consumer) {
            addListener(ValueType.TIMER.name(), consumer);
            return this;
        }

        /** Registers a listener for message records. */
        public Builder addMessageListener(Consumer<Schema.MessageRecord> consumer) {
            addListener(ValueType.MESSAGE.name(), consumer);
            return this;
        }

        /** Registers a listener for message subscription records. */
        public Builder addMessageSubscriptionListener(Consumer<Schema.MessageSubscriptionRecord> consumer) {
            addListener(ValueType.MESSAGE_SUBSCRIPTION.name(), consumer);
            return this;
        }

        /** Registers a listener for message start event subscription records. */
        public Builder addMessageStartEventSubscriptionListener(Consumer<Schema.MessageStartEventSubscriptionRecord> consumer) {
            addListener(ValueType.MESSAGE_START_EVENT_SUBSCRIPTION.name(), consumer);
            return this;
        }

        /** Registers a listener for process message subscription records. */
        public Builder addProcessMessageSubscriptionListener(Consumer<Schema.ProcessMessageSubscriptionRecord> consumer) {
            addListener(ValueType.PROCESS_MESSAGE_SUBSCRIPTION.name(), consumer);
            return this;
        }

        /** Registers a listener for process instance creation records. */
        public Builder addProcessInstanceCreationListener(Consumer<Schema.ProcessInstanceCreationRecord> consumer) {
            addListener(ValueType.PROCESS_INSTANCE_CREATION.name(), consumer);
            return this;
        }

        /** Registers a listener for error records. */
        public Builder addErrorListener(Consumer<Schema.ErrorRecord> consumer) {
            addListener(ValueType.ERROR.name(), consumer);
            return this;
        }

        /**
         * Opens a connection, makes sure the consumer group exists on every stream that has a
         * listener, and returns a started {@link ZeebeRedis} consumer.
         *
         * @return the started consumer
         * @throws IllegalArgumentException if no listener has been registered
         */
        public ZeebeRedis build() {
            if (this.listeners.isEmpty()) {
                throw new IllegalArgumentException("Must register a least one listener, but none found.");
            }

            // Apply the final prefix here so that all streams share it regardless of call order,
            // and hand the consumer its own copy of the listener lists.
            final Map<String, List<Consumer<?>>> listenersByStream = new HashMap<>();
            for (Map.Entry<String, List<Consumer<?>>> registration : this.listeners.entrySet()) {
                listenersByStream.put(this.prefix + registration.getKey(), new ArrayList<>(registration.getValue()));
            }

            final StatefulRedisConnection<String, byte[]> connection = this.redisClient.connect(new ProtobufCodec());
            ZeebeRedis.LOGGER.info("Read from streams '{}*' with offset '{}'", this.prefix, this.offset);

            final List<XReadArgs.StreamOffset<String>> streamOffsets = new ArrayList<>();
            try {
                for (String stream : listenersByStream.keySet()) {
                    streamOffsets.add(XReadArgs.StreamOffset.lastConsumed(stream));
                    try {
                        connection.sync().xgroupCreate(XReadArgs.StreamOffset.from(stream, this.offset), this.consumerGroup, XGroupCreateArgs.Builder.mkstream());
                    } catch (RedisBusyException ignored) {
                        // Best effort: a BUSY reply here is expected when the group already exists.
                        ZeebeRedis.LOGGER.trace("Skipped creating consumer group '{}' on stream '{}': {}", this.consumerGroup, stream, ignored.getMessage());
                    }
                }
            } catch (RuntimeException e) {
                // Do not leak the connection when group setup fails for any other reason.
                connection.close();
                throw e;
            }

            @SuppressWarnings("unchecked") // generic array creation is not possible; elements are StreamOffset<String>
            final XReadArgs.StreamOffset<String>[] offsetArray = streamOffsets.toArray(new XReadArgs.StreamOffset[0]);

            final ZeebeRedis zeebeRedis = new ZeebeRedis(this.redisClient, connection, this.reconnectUsesNewConnection, this.reconnectInterval, this.consumerGroup, this.consumerId, this.prefix, offsetArray, listenersByStream, this.deleteMessages);
            zeebeRedis.start();
            return zeebeRedis;
        }
    }

    /**
     * Pairs a value type name with the protobuf class its records are unpacked into.
     *
     * @param str value type name used as the entry key
     * @param cls protobuf message class used as the entry value
     * @return a new entry holding both
     */
    private static AbstractMap.SimpleEntry<String, Class<? extends Message>> typeEntry(String str, Class<? extends Message> cls) {
        final AbstractMap.SimpleEntry<String, Class<? extends Message>> entry = new AbstractMap.SimpleEntry<>(str, cls);
        return entry;
    }

    /**
     * Creates a consumer on an already opened connection; reading begins once {@code start()} runs.
     *
     * @param redisClient client used to open new connections when reconnecting
     * @param statefulRedisConnection connection to read from
     * @param z whether a lost connection is replaced by a newly opened one
     * @param duration pause before each reconnect attempt
     * @param str consumer group name
     * @param str2 consumer id within the group
     * @param str3 stream name prefix
     * @param streamOffsetArr streams to read from
     * @param map listeners keyed by full stream name
     * @param z2 whether successfully handled messages are deleted from the stream
     */
    private ZeebeRedis(RedisClient redisClient, StatefulRedisConnection<String, byte[]> statefulRedisConnection, boolean z, Duration duration, String str, String str2, String str3, XReadArgs.StreamOffset<String>[] streamOffsetArr, Map<String, List<Consumer<?>>> map, boolean z2) {
        // Connection handling
        this.redisClient = redisClient;
        this.redisConnection = statefulRedisConnection;
        this.reconnectUsesNewConnection = z;
        this.reconnectIntervalMillis = duration.toMillis();

        // Consumer identity and streams
        this.consumerGroup = str;
        this.consumerId = str2;
        this.prefix = str3;
        this.offsets = streamOffsetArr;

        // Record handling
        this.listeners = map;
        this.deleteMessages = z2;
    }

    /**
     * Entry point for configuring a consumer.
     *
     * @param redisClient client the consumer uses to connect to Redis
     * @return a builder with default settings
     */
    public static Builder newBuilder(RedisClient redisClient) {
        final Builder builder = new Builder(redisClient);
        return builder;
    }

    /**
     * Registers a connection state listener on the current connection, clears the closed flags
     * and submits the stream reader to a new single-thread executor.
     */
    private void start() {
        final RedisConnectionStateAdapter connectionStateListener = new RedisConnectionStateAdapter() {
            @Override
            public void onRedisConnected(RedisChannelHandler<?, ?> redisChannelHandler, SocketAddress socketAddress) {
                ZeebeRedis.LOGGER.info("Redis reconnected.");
            }

            @Override
            public void onRedisDisconnected(RedisChannelHandler<?, ?> redisChannelHandler) {
                // A disconnect caused by close() is expected and must not trigger a reconnect.
                if (ZeebeRedis.this.externalClose) {
                    return;
                }
                ZeebeRedis.LOGGER.warn("Redis connection lost.");
                if (!ZeebeRedis.this.reconnectUsesNewConnection) {
                    return;
                }
                // Stop the reader on the lost connection, then reconnect in the background.
                ZeebeRedis.this.doClose();
                final ExecutorService reconnector = Executors.newSingleThreadExecutor();
                ZeebeRedis.this.reconnectExecutorService = reconnector;
                ZeebeRedis.this.reconnectFuture = reconnector.submit(ZeebeRedis.this::reconnect);
            }
        };
        this.redisConnection.addListener(connectionStateListener);

        this.externalClose = false;
        this.isClosed = false;

        final ExecutorService reader = Executors.newSingleThreadExecutor();
        this.executorService = reader;
        this.future = reader.submit(this::readFromStream);
    }

    /**
     * Repeatedly tries to open a new connection, waiting the configured reconnect interval before
     * every attempt. After a successful connect the consumer group is (re)created on every
     * listened stream and reading is started again.
     *
     * <p>Returns without reconnecting when the waiting thread is interrupted; the interrupt flag
     * is restored in that case. A connection opened by a failed attempt is closed before retrying.
     */
    public void reconnect() {
        final ProtobufCodec protobufCodec = new ProtobufCodec();
        while (true) {
            StatefulRedisConnection<String, byte[]> connection = null;
            try {
                Thread.sleep(this.reconnectIntervalMillis);
                connection = this.redisClient.connect(protobufCodec);
                this.redisConnection = connection;
                LOGGER.info("Redis reconnected.");
                for (String stream : this.listeners.keySet()) {
                    try {
                        connection.sync().xgroupCreate(XReadArgs.StreamOffset.from(stream, "0-0"), this.consumerGroup, XGroupCreateArgs.Builder.mkstream());
                    } catch (RedisBusyException ignored) {
                        // Best effort: a BUSY reply here is expected when the group already exists.
                        LOGGER.trace("Skipped creating consumer group '{}' on stream '{}': {}", this.consumerGroup, stream, ignored.getMessage());
                    }
                }
                start();
                return;
            } catch (InterruptedException e) {
                // Only Thread.sleep throws this, so no connection has been opened in this attempt.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e2) {
                LOGGER.trace("Redis reconnect failure: {}", e2.getMessage());
                if (connection != null) {
                    // The next attempt opens a new connection; release the one from this attempt.
                    try {
                        connection.close();
                    } catch (Exception closeFailure) {
                        LOGGER.debug("Failure while closing the client", closeFailure);
                    }
                }
            }
        }
    }

    /**
     * Reports whether the consumer is currently stopped.
     *
     * @return {@code true} once {@code doClose()} has run and reading has not been started again
     */
    public boolean isClosed() {
        return isClosed;
    }

    /**
     * Stops the consumer. The close is marked as externally requested first, so that the
     * connection state listener ignores the resulting disconnect instead of reconnecting.
     */
    @Override
    public void close() {
        externalClose = true;
        doClose();
    }

    /**
     * Marks the consumer as closed, cancels the reader and reconnect tasks, shuts down their
     * executors and closes the Redis connection.
     */
    public void doClose() {
        LOGGER.info("Closing Consumer[group={}, id={}]. Stop reading from streams '{}*'.", this.consumerGroup, this.consumerId, this.prefix);
        this.isClosed = true;

        // Reader task and its executor
        final Future<?> readerTask = this.future;
        if (readerTask != null) {
            readerTask.cancel(true);
            this.future = null;
        }
        final ExecutorService readerExecutor = this.executorService;
        if (readerExecutor != null) {
            readerExecutor.shutdown();
            this.executorService = null;
        }

        // Reconnect task and its executor
        final Future<?> reconnectTask = this.reconnectFuture;
        if (reconnectTask != null) {
            reconnectTask.cancel(true);
            this.reconnectFuture = null;
        }
        final ExecutorService reconnectExecutor = this.reconnectExecutorService;
        if (reconnectExecutor != null) {
            reconnectExecutor.shutdown();
            this.reconnectExecutorService = null;
        }

        this.redisConnection.close();
    }

    /**
     * Reader task: releases any reconnect task and executor left over from a reconnect, then
     * keeps reading from the streams until the consumer is closed.
     */
    private void readFromStream() {
        final Future<?> pendingReconnect = this.reconnectFuture;
        if (pendingReconnect != null) {
            pendingReconnect.cancel(true);
            this.reconnectFuture = null;
        }
        final ExecutorService reconnectExecutor = this.reconnectExecutorService;
        if (reconnectExecutor != null) {
            reconnectExecutor.shutdown();
            this.reconnectExecutorService = null;
        }

        for (;;) {
            if (this.isClosed) {
                return;
            }
            readNext();
        }
    }

    /**
     * Performs one blocking group read and dispatches every received message to its listeners.
     *
     * <p>Each message is acknowledged after handling, whether or not handling succeeded; it is
     * additionally deleted from the stream when deletion is enabled and handling succeeded.
     *
     * <p>Failures are handled from most to least specific (the generic handler must come last,
     * otherwise the specific ones are unreachable):
     * <ul>
     *   <li>illegal arguments: the consumer is closed;</li>
     *   <li>command timeout: logged at debug level, the caller simply reads again;</li>
     *   <li>command execution error: the consumer is closed and a reconnect is started;</li>
     *   <li>anything else: logged as an error.</li>
     * </ul>
     * Apart from illegal arguments, failures that occur after the consumer was closed are ignored.
     */
    private void readNext() {
        LOGGER.trace("Consumer[id={}] reads from streams '{}*'", this.consumerId, this.prefix);
        try {
            final List<StreamMessage<String, byte[]>> messages = this.redisConnection.sync().xreadgroup(
                    io.lettuce.core.Consumer.from(this.consumerGroup, this.consumerId),
                    XReadArgs.Builder.block(Long.MAX_VALUE),
                    this.offsets);
            for (StreamMessage<String, byte[]> streamMessage : messages) {
                LOGGER.trace("Consumer[id={}] received message {} from {}", this.consumerId, streamMessage.getId(), streamMessage.getStream());
                final boolean handled = handleRecord(streamMessage);
                this.redisConnection.async().xack(streamMessage.getStream(), this.consumerGroup, streamMessage.getId());
                if (this.deleteMessages && handled) {
                    this.redisConnection.async().xdel(streamMessage.getStream(), streamMessage.getId());
                }
            }
        } catch (IllegalArgumentException e) {
            LOGGER.error("Illegal arguments for xreadgroup: {}. Closing Redis client.", e.getMessage());
            try {
                close();
            } catch (Exception closeFailure) {
                LOGGER.debug("Failure while closing the client", closeFailure);
            }
        } catch (RedisCommandTimeoutException e) {
            if (this.isClosed) {
                return;
            }
            LOGGER.debug("Consumer[group={}, id={}] timed out reading from streams '{}*'", this.consumerGroup, this.consumerId, this.prefix);
        } catch (RedisCommandExecutionException e) {
            if (this.isClosed) {
                return;
            }
            LOGGER.error("Consumer[group={}, id={}] failed to read from streams '{}*': {}. Initiating reconnect.", this.consumerGroup, this.consumerId, this.prefix, e.getMessage());
            try {
                close();
            } catch (Exception closeFailure) {
                LOGGER.debug("Failure while closing the client", closeFailure);
            }
            this.reconnectExecutorService = Executors.newSingleThreadExecutor();
            this.reconnectFuture = this.reconnectExecutorService.submit(this::reconnect);
        } catch (Exception e) {
            if (this.isClosed) {
                return;
            }
            // The trailing exception argument makes the logger print the stack trace.
            LOGGER.error("Consumer[group={}, id={}] failed to read from streams '{}*'", this.consumerGroup, this.consumerId, this.prefix, e);
        }
    }

    /**
     * Parses the first body value of a stream message as a generic record and passes it to the
     * listeners of the message's stream.
     *
     * <p>The protobuf-specific handler is placed before the generic one; in the opposite order it
     * would be unreachable.
     *
     * @param streamMessage message read from a stream
     * @return {@code true} when the message was handled or could not be deserialized as protobuf,
     *     {@code false} when any other failure occurred while handling it
     * @throws InvalidProtocolBufferException declared for compatibility; deserialization failures
     *     are logged here and not propagated
     */
    private boolean handleRecord(StreamMessage<String, byte[]> streamMessage) throws InvalidProtocolBufferException {
        try {
            final String stream = streamMessage.getStream();
            final byte[] payload = streamMessage.getBody().values().iterator().next();
            final Schema.Record parsed = Schema.Record.parseFrom(payload);
            // The value type name is the stream name without its prefix.
            final Class<? extends Message> recordType = RECORD_MESSAGE_TYPES.get(stream.substring(this.prefix.length()));
            handleRecord(stream, parsed, recordType);
            return true;
        } catch (InvalidProtocolBufferException e) {
            // The trailing exception argument makes the logger print the stack trace.
            LOGGER.error("Failed to deserialize Protobuf message {} from {}", streamMessage.getId(), streamMessage.getStream(), e);
            return true;
        } catch (Exception e) {
            LOGGER.error("Error handling message {} from {}: {}", streamMessage.getId(), streamMessage.getStream(), e.getMessage());
            return false;
        }
    }

    /**
     * Unpacks the payload of a generic record into the given message type and passes it to every
     * listener registered for the stream, in registration order.
     *
     * @param str full stream name the record was read from; used to look up the listeners
     * @param record generic record whose wrapped payload is unpacked
     * @param cls protobuf message class to unpack the payload into
     * @throws InvalidProtocolBufferException if the payload cannot be unpacked into {@code cls}
     */
    private <T extends Message> void handleRecord(String str, Schema.Record record, Class<T> cls) throws InvalidProtocolBufferException {
        final T payload = record.getRecord().unpack(cls);
        LOGGER.trace("Consumer[id={}] handling record {}", this.consumerId, payload);
        for (Consumer<?> listener : this.listeners.getOrDefault(str, List.of())) {
            // Listeners are stored as Consumer<?>; the builder registers each one under the value
            // type whose message class is looked up for this stream, so the cast is expected to hold.
            @SuppressWarnings("unchecked")
            final Consumer<T> typedListener = (Consumer<T>) listener;
            typedListener.accept(payload);
        }
    }
}
