package io.zeebe.hazelcast.connect.java;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import io.zeebe.exporter.proto.Schema;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/zeebe-hazelcast-connector-1.0.0.jar:io/zeebe/hazelcast/connect/java/ZeebeHazelcast.class */
public class ZeebeHazelcast implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ZeebeHazelcast.class);
    private static final List<Class<? extends Message>> RECORD_MESSAGE_TYPES = new ArrayList();
    private final Ringbuffer<byte[]> ringbuffer;
    private final Map<Class<?>, List<Consumer<?>>> listeners;
    private final Consumer<Long> postProcessListener;
    private long sequence;
    private Future<?> future;
    private ExecutorService executorService;
    private volatile boolean isClosed = false;

    /* loaded from: input_file:BOOT-INF/lib/zeebe-hazelcast-connector-1.0.0.jar:io/zeebe/hazelcast/connect/java/ZeebeHazelcast$Builder.class */
    public static class Builder {
        private final HazelcastInstance hazelcastInstance;
        private final Map<Class<?>, List<Consumer<?>>> listeners = new HashMap();
        private String name = "zeebe";
        private long readFromSequence = -1;
        private boolean readFromHead = false;
        private Consumer<Long> postProcessListener = l -> {
        };

        private Builder(HazelcastInstance hazelcastInstance) {
            this.hazelcastInstance = hazelcastInstance;
        }

        public Builder name(String str) {
            this.name = str;
            return this;
        }

        public Builder readFrom(long j) {
            this.readFromSequence = j;
            this.readFromHead = false;
            return this;
        }

        public Builder readFromHead() {
            this.readFromSequence = -1L;
            this.readFromHead = true;
            return this;
        }

        public Builder readFromTail() {
            this.readFromSequence = -1L;
            this.readFromHead = false;
            return this;
        }

        public Builder postProcessListener(Consumer<Long> consumer) {
            this.postProcessListener = consumer;
            return this;
        }

        private <T extends Message> void addListener(Class<T> cls, Consumer<T> consumer) {
            List<Consumer<?>> orDefault = this.listeners.getOrDefault(cls, new ArrayList());
            orDefault.add(consumer);
            this.listeners.put(cls, orDefault);
        }

        public Builder addDeploymentListener(Consumer<Schema.DeploymentRecord> consumer) {
            addListener(Schema.DeploymentRecord.class, consumer);
            return this;
        }

        public Builder addDeploymentDistributionListener(Consumer<Schema.DeploymentDistributionRecord> consumer) {
            addListener(Schema.DeploymentDistributionRecord.class, consumer);
            return this;
        }

        public Builder addProcessListener(Consumer<Schema.ProcessRecord> consumer) {
            addListener(Schema.ProcessRecord.class, consumer);
            return this;
        }

        public Builder addProcessInstanceListener(Consumer<Schema.ProcessInstanceRecord> consumer) {
            addListener(Schema.ProcessInstanceRecord.class, consumer);
            return this;
        }

        public Builder addProcessEventListener(Consumer<Schema.ProcessEventRecord> consumer) {
            addListener(Schema.ProcessEventRecord.class, consumer);
            return this;
        }

        public Builder addVariableListener(Consumer<Schema.VariableRecord> consumer) {
            addListener(Schema.VariableRecord.class, consumer);
            return this;
        }

        public Builder addVariableDocumentListener(Consumer<Schema.VariableDocumentRecord> consumer) {
            addListener(Schema.VariableDocumentRecord.class, consumer);
            return this;
        }

        public Builder addJobListener(Consumer<Schema.JobRecord> consumer) {
            addListener(Schema.JobRecord.class, consumer);
            return this;
        }

        public Builder addJobBatchListener(Consumer<Schema.JobBatchRecord> consumer) {
            addListener(Schema.JobBatchRecord.class, consumer);
            return this;
        }

        public Builder addIncidentListener(Consumer<Schema.IncidentRecord> consumer) {
            addListener(Schema.IncidentRecord.class, consumer);
            return this;
        }

        public Builder addTimerListener(Consumer<Schema.TimerRecord> consumer) {
            addListener(Schema.TimerRecord.class, consumer);
            return this;
        }

        public Builder addMessageListener(Consumer<Schema.MessageRecord> consumer) {
            addListener(Schema.MessageRecord.class, consumer);
            return this;
        }

        public Builder addMessageSubscriptionListener(Consumer<Schema.MessageSubscriptionRecord> consumer) {
            addListener(Schema.MessageSubscriptionRecord.class, consumer);
            return this;
        }

        public Builder addMessageStartEventSubscriptionListener(Consumer<Schema.MessageStartEventSubscriptionRecord> consumer) {
            addListener(Schema.MessageStartEventSubscriptionRecord.class, consumer);
            return this;
        }

        public Builder addProcessMessageSubscriptionListener(Consumer<Schema.ProcessMessageSubscriptionRecord> consumer) {
            addListener(Schema.ProcessMessageSubscriptionRecord.class, consumer);
            return this;
        }

        public Builder addProcessInstanceCreationListener(Consumer<Schema.ProcessInstanceCreationRecord> consumer) {
            addListener(Schema.ProcessInstanceCreationRecord.class, consumer);
            return this;
        }

        public Builder addErrorListener(Consumer<Schema.ErrorRecord> consumer) {
            addListener(Schema.ErrorRecord.class, consumer);
            return this;
        }

        private long getSequence(Ringbuffer<?> ringbuffer) {
            long headSequence = ringbuffer.headSequence();
            long tailSequence = ringbuffer.tailSequence();
            if (this.readFromSequence <= 0) {
                return this.readFromHead ? headSequence : Math.max(headSequence, tailSequence);
            }
            if (this.readFromSequence <= tailSequence + 1) {
                return this.readFromSequence;
            }
            ZeebeHazelcast.LOGGER.info("The given sequence '{}' is greater than the current tail-sequence '{}' of the ringbuffer. Using the head-sequence instead.", Long.valueOf(this.readFromSequence), Long.valueOf(tailSequence));
            return headSequence;
        }

        public ZeebeHazelcast build() {
            ZeebeHazelcast.LOGGER.debug("Read from ringbuffer with name '{}'", this.name);
            Ringbuffer<?> ringbuffer = this.hazelcastInstance.getRingbuffer(this.name);
            if (ringbuffer == null) {
                throw new IllegalArgumentException(String.format("No ring buffer found with name '%s'", this.name));
            }
            ZeebeHazelcast.LOGGER.debug("Ringbuffer status: [head: {}, tail: {}, size: {}, capacity: {}]", Long.valueOf(ringbuffer.headSequence()), Long.valueOf(ringbuffer.tailSequence()), Long.valueOf(ringbuffer.size()), Long.valueOf(ringbuffer.capacity()));
            long sequence = getSequence(ringbuffer);
            ZeebeHazelcast.LOGGER.info("Read from ringbuffer '{}' starting from sequence '{}'", this.name, Long.valueOf(sequence));
            ZeebeHazelcast zeebeHazelcast = new ZeebeHazelcast(ringbuffer, sequence, this.listeners, this.postProcessListener);
            zeebeHazelcast.start();
            return zeebeHazelcast;
        }
    }

    private ZeebeHazelcast(Ringbuffer<byte[]> ringbuffer, long j, Map<Class<?>, List<Consumer<?>>> map, Consumer<Long> consumer) {
        this.ringbuffer = ringbuffer;
        this.sequence = j;
        this.listeners = map;
        this.postProcessListener = consumer;
    }

    public static Builder newBuilder(HazelcastInstance hazelcastInstance) {
        return new Builder(hazelcastInstance);
    }

    private void start() {
        this.executorService = Executors.newSingleThreadExecutor();
        this.future = this.executorService.submit(this::readFromBuffer);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOGGER.info("Closing. Stop reading from ringbuffer. Current sequence: '{}'", Long.valueOf(getSequence()));
        this.isClosed = true;
        if (this.future != null) {
            this.future.cancel(true);
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    public long getSequence() {
        return this.sequence;
    }

    private void readFromBuffer() {
        while (!this.isClosed) {
            readNext();
        }
    }

    private void readNext() {
        LOGGER.trace("Read from ring-buffer with sequence '{}'", Long.valueOf(this.sequence));
        try {
            handleRecord(Schema.Record.parseFrom(this.ringbuffer.readOne(this.sequence)));
            this.sequence++;
            this.postProcessListener.accept(Long.valueOf(this.sequence));
        } catch (InvalidProtocolBufferException e) {
            LOGGER.error("Failed to deserialize Protobuf message at sequence '{}'", Long.valueOf(this.sequence), e);
            this.sequence++;
        } catch (StaleSequenceException e2) {
            long headSeq = e2.getHeadSeq();
            LOGGER.warn("Fail to read from ring-buffer at sequence '{}'. The sequence is reported as stale. Continue with new head sequence at '{}'", Long.valueOf(this.sequence), Long.valueOf(headSeq), e2);
            this.sequence = headSeq;
        } catch (IllegalArgumentException e3) {
            long headSequence = this.ringbuffer.headSequence();
            LOGGER.warn("Fail to read from ring-buffer at sequence '{}'. Continue with head sequence at '{}'", Long.valueOf(this.sequence), Long.valueOf(headSequence), e3);
            this.sequence = headSequence;
        } catch (InterruptedException e4) {
            LOGGER.debug("Interrupted while reading from ring-buffer with sequence '{}'", Long.valueOf(this.sequence));
            throw new RuntimeException("Interrupted while reading from ring-buffer", e4);
        } catch (Exception e5) {
            if (this.isClosed) {
                return;
            }
            LOGGER.error("Fail to read from ring-buffer at sequence '{}'. Will try again.", Long.valueOf(this.sequence), e5);
        }
    }

    private void handleRecord(Schema.Record record) throws InvalidProtocolBufferException {
        Iterator<Class<? extends Message>> it = RECORD_MESSAGE_TYPES.iterator();
        while (it.hasNext() && !handleRecord(record, it.next())) {
        }
    }

    private <T extends Message> boolean handleRecord(Schema.Record record, Class<T> cls) throws InvalidProtocolBufferException {
        if (!record.getRecord().is(cls)) {
            return false;
        }
        Message unpack = record.getRecord().unpack(cls);
        this.listeners.getOrDefault(cls, List.of()).forEach(consumer -> {
            consumer.accept(unpack);
        });
        return true;
    }

    static {
        RECORD_MESSAGE_TYPES.add(Schema.DeploymentRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.DeploymentDistributionRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ErrorRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.IncidentRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.JobRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.JobBatchRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.MessageStartEventSubscriptionRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.MessageSubscriptionRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.MessageRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ProcessRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ProcessEventRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ProcessInstanceRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ProcessInstanceCreationRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.ProcessMessageSubscriptionRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.TimerRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.VariableRecord.class);
        RECORD_MESSAGE_TYPES.add(Schema.VariableDocumentRecord.class);
    }
}
