/*
 * Decompiled with CFR 0.152.
 */
package org.elder.sourcerer.eventstoredb;

import com.eventstore.dbclient.AppendToStreamOptions;
import com.eventstore.dbclient.ConnectionShutdownException;
import com.eventstore.dbclient.EventDataBuilder;
import com.eventstore.dbclient.EventStoreDBClient;
import com.eventstore.dbclient.ExpectedRevision;
import com.eventstore.dbclient.NotLeaderException;
import com.eventstore.dbclient.ReadResult;
import com.eventstore.dbclient.ReadStreamOptions;
import com.eventstore.dbclient.ResolvedEvent;
import com.eventstore.dbclient.ResourceNotFoundException;
import com.eventstore.dbclient.StreamNotFoundException;
import com.eventstore.dbclient.StreamRevision;
import com.eventstore.dbclient.SubscribeToStreamOptions;
import com.eventstore.dbclient.Subscription;
import com.eventstore.dbclient.SubscriptionListener;
import com.eventstore.dbclient.WriteResult;
import com.eventstore.dbclient.WrongExpectedVersionException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.elder.sourcerer.EventData;
import org.elder.sourcerer.EventNormalizer;
import org.elder.sourcerer.EventReadResult;
import org.elder.sourcerer.EventRecord;
import org.elder.sourcerer.EventRepository;
import org.elder.sourcerer.EventSubscriptionUpdate;
import org.elder.sourcerer.ExpectedVersion;
import org.elder.sourcerer.eventstoredb.EventStoreSubscriptionStoppedException;
import org.elder.sourcerer.exceptions.PermanentEventReadException;
import org.elder.sourcerer.exceptions.PermanentEventWriteException;
import org.elder.sourcerer.exceptions.RetriableEventReadException;
import org.elder.sourcerer.exceptions.RetriableEventWriteException;
import org.elder.sourcerer.exceptions.UnexpectedVersionException;
import org.elder.sourcerer.utils.ElderPreconditions;
import org.elder.sourcerer.utils.ImmutableListCollector;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class EventStoreGrpcEventRepository<T>
implements EventRepository<T> {
    private static final Logger logger = LoggerFactory.getLogger(EventStoreGrpcEventRepository.class);
    private static final int NON_EXISTING_STREAM_VERSION = -1;
    private static final int MAX_MAX_EVENTS_PER_READ = 4095;
    private static final long DEFAULT_TIMEOUT_MILLIS = 30000L;
    private final String streamPrefix;
    private final Class<T> eventClass;
    private final EventStoreDBClient eventStore;
    private final ObjectMapper objectMapper;
    private final EventNormalizer<T> normalizer;
    private final long timeoutMillis;

    public EventStoreGrpcEventRepository(String streamPrefix, EventStoreDBClient eventStore, Class<T> eventClass, ObjectMapper objectMapper, EventNormalizer<T> normalizer) {
        this.streamPrefix = streamPrefix;
        this.eventClass = eventClass;
        this.eventStore = eventStore;
        this.objectMapper = objectMapper;
        this.normalizer = normalizer;
        this.timeoutMillis = 30000L;
    }

    public Class<T> getEventType() {
        return this.eventClass;
    }

    public EventReadResult<T> readAll(int version, int maxEvents) {
        return this.readInternal(this.getCategoryStreamName(), version, maxEvents, true);
    }

    public EventReadResult<T> read(String streamId, int version, int maxEvents) {
        return this.readInternal(this.toEsStreamId(streamId), version, maxEvents, false);
    }

    public EventRecord<T> readFirst(String streamId) {
        return this.readSingleInternal(this.toEsStreamId(streamId), StreamRevision.START, false);
    }

    public EventRecord<T> readLast(String streamId) {
        return this.readSingleInternal(this.toEsStreamId(streamId), StreamRevision.END, false);
    }

    private String getCategoryStreamName() {
        return "$ce-" + this.streamPrefix;
    }

    private EventRecord<T> readSingleInternal(String internalStreamId, StreamRevision streamRevision, boolean resolveLinksTo) {
        logger.debug("Reading event {} from {} (in {})", new Object[]{streamRevision, internalStreamId, this.streamPrefix});
        ReadResult readResult = (ReadResult)this.completeReadFuture(this.eventStore.readStream(internalStreamId, 1L, (ReadStreamOptions)((ReadStreamOptions)ReadStreamOptions.get().fromRevision(streamRevision)).backwards().resolveLinkTos(resolveLinksTo)), ExpectedVersion.any());
        if (readResult == null || readResult.getEvents() == null || readResult.getEvents().isEmpty()) {
            logger.debug("Reading {} (in {}) returned no event", (Object)internalStreamId, (Object)this.streamPrefix);
            return null;
        }
        ResolvedEvent event = (ResolvedEvent)readResult.getEvents().get(0);
        logger.debug("Read event from {} (version {})", (Object)internalStreamId, (Object)event.getOriginalEvent().getStreamRevision().getValueUnsigned());
        return this.fromEsEvent(event);
    }

    private EventReadResult<T> readInternal(String internalStreamId, int version, int maxEvents, boolean resolveLinksTo) {
        int maxEventsPerRead = Integer.min(maxEvents, 4095);
        logger.debug("Reading from {} (in {}) (version {}) - effective max {}", new Object[]{internalStreamId, this.streamPrefix, version, maxEventsPerRead});
        ReadResult readResult = (ReadResult)this.completeReadFuture(this.eventStore.readStream(internalStreamId, (long)maxEventsPerRead, (ReadStreamOptions)((ReadStreamOptions)ReadStreamOptions.get().fromRevision((long)version)).resolveLinkTos(resolveLinksTo)), ExpectedVersion.exactly((int)version));
        if (readResult == null) {
            logger.debug("Reading {} (in {}) returned status not found", (Object)internalStreamId, (Object)this.streamPrefix);
            return null;
        }
        logger.debug("Read {} events from {} (version {})", new Object[]{readResult.getEvents().size(), internalStreamId, version});
        ImmutableList events = (ImmutableList)readResult.getEvents().stream().map(this::fromEsEvent).collect(new ImmutableListCollector());
        int fromEventNumber = version;
        int lastEventNumber = events.isEmpty() ? version - 1 : ((EventRecord)events.get(events.size() - 1)).getStreamVersion();
        int nextEventNumber = lastEventNumber + 1;
        return new EventReadResult(events, fromEventNumber, lastEventNumber, nextEventNumber, events.size() < maxEventsPerRead);
    }

    public int append(String streamId, List<EventData<T>> events, ExpectedVersion version) {
        Preconditions.checkNotNull(events);
        ElderPreconditions.checkNotEmpty(events);
        List esEvents = events.stream().map(this::toEsEventData).collect(Collectors.toList());
        logger.debug("Writing {} events to stream {} (in {}) (expected version {})", new Object[]{esEvents.size(), streamId, this.streamPrefix, version});
        try {
            WriteResult result = (WriteResult)this.completeWriteFuture(this.eventStore.appendToStream(this.toEsStreamId(streamId), (AppendToStreamOptions)AppendToStreamOptions.get().expectedRevision(this.toExpectedRevision(version)), esEvents.iterator()), version);
            int nextExpectedVersion = this.fromStreamRevision(result.getNextExpectedRevision());
            logger.debug("Write successful, next expected version is {}", (Object)nextExpectedVersion);
            return nextExpectedVersion;
        }
        catch (WrongExpectedVersionException ex) {
            logger.warn("Unexpected version when attempting append", (Throwable)ex);
            throw new UnexpectedVersionException(ex.getMessage(), version);
        }
    }

    public int getCurrentVersion() {
        return this.getStreamVersionInternal(this.getCategoryStreamName());
    }

    public int getCurrentVersion(String streamId) {
        return this.getStreamVersionInternal(this.toEsStreamId(streamId));
    }

    private int getStreamVersionInternal(String streamName) {
        ReadResult readResult = (ReadResult)this.completeReadFuture(this.eventStore.readStream(streamName, 1L, (ReadStreamOptions)((ReadStreamOptions)ReadStreamOptions.get().backwards().fromRevision(StreamRevision.END)).notResolveLinkTos()), ExpectedVersion.any());
        boolean hasResult = readResult != null && readResult.getEvents() != null && !readResult.getEvents().isEmpty();
        return hasResult ? this.fromStreamRevision(((ResolvedEvent)readResult.getEvents().get(0)).getOriginalEvent().getStreamRevision()) : -1;
    }

    public Publisher<EventSubscriptionUpdate<T>> getStreamPublisher(String streamId, Integer fromVersion) {
        logger.info("Creating publisher for {} (in {}) (starting with version {})", new Object[]{streamId, this.streamPrefix, fromVersion});
        return Flux.create(emitter -> {
            Subscription subscription = (Subscription)this.completeReadFuture(this.eventStore.subscribeToStream(this.toEsStreamId(streamId), (SubscriptionListener)new EmitterListener(emitter, this.streamPrefix + "-" + streamId), (SubscribeToStreamOptions)((SubscribeToStreamOptions)SubscribeToStreamOptions.get().fromRevision(this.toStreamRevision(fromVersion))).resolveLinkTos()), ExpectedVersion.any());
            emitter.onCancel(() -> {
                logger.info("Closing gRPC subscription (asynchronously)");
                if (subscription != null) {
                    subscription.stop();
                }
            });
        });
    }

    public Publisher<EventSubscriptionUpdate<T>> getPublisher(Integer fromVersion) {
        logger.info("Creating publisher for all events in {} (starting with version {})", (Object)this.streamPrefix, (Object)fromVersion);
        return Flux.create(emitter -> {
            Subscription subscription = (Subscription)this.completeReadFuture(this.eventStore.subscribeToStream(this.getCategoryStreamName(), (SubscriptionListener)new EmitterListener(emitter, this.streamPrefix + "-all"), (SubscribeToStreamOptions)((SubscribeToStreamOptions)SubscribeToStreamOptions.get().fromRevision(this.toStreamRevision(fromVersion))).resolveLinkTos()), ExpectedVersion.any());
            emitter.onCancel(() -> {
                logger.info("Closing gRPC subscription (asynchronously)");
                if (subscription != null) {
                    subscription.stop();
                }
            });
        });
    }

    private String toEsStreamId(String streamId) {
        return this.streamPrefix + "-" + streamId;
    }

    private com.eventstore.dbclient.EventData toEsEventData(EventData<T> eventData) {
        return EventDataBuilder.json((String)eventData.getEventType(), (byte[])this.toEsEvent(eventData.getEvent())).metadataAsBytes(this.toEsMetadata((Map<String, String>)eventData.getMetadata())).eventId(eventData.getEventId()).build();
    }

    private byte[] toEsMetadata(Map<String, String> metadata) {
        return this.jsonObjectToBytes(metadata);
    }

    private byte[] toEsEvent(T event) {
        return this.jsonObjectToBytes(event);
    }

    private byte[] jsonObjectToBytes(Object obj) {
        try {
            return this.objectMapper.writer().writeValueAsBytes(obj);
        }
        catch (IOException ex) {
            throw new RetriableEventWriteException("Internal error writing event", (Throwable)ex);
        }
    }

    private ExpectedRevision toExpectedRevision(ExpectedVersion version) {
        if (version == null) {
            return ExpectedRevision.ANY;
        }
        switch (version.getType()) {
            case ANY: {
                return ExpectedRevision.ANY;
            }
            case EXACTLY: {
                return ExpectedRevision.expectedRevision((long)version.getExpectedVersion());
            }
            case NOT_CREATED: {
                return ExpectedRevision.NO_STREAM;
            }
        }
        throw new IllegalArgumentException("Unrecognized expected version type: " + version);
    }

    private String fromEsStreamId(String streamId) {
        return streamId.substring(streamId.indexOf(45) + 1);
    }

    private EventRecord<T> fromEsEvent(ResolvedEvent event) {
        long streamVersion;
        long aggregateVersion;
        if (event.getLink() != null) {
            aggregateVersion = event.getEvent().getStreamRevision().getValueUnsigned();
            streamVersion = event.getLink().getStreamRevision().getValueUnsigned();
        } else {
            aggregateVersion = event.getEvent().getStreamRevision().getValueUnsigned();
            streamVersion = event.getEvent().getStreamRevision().getValueUnsigned();
        }
        return new EventRecord(this.fromEsStreamId(event.getEvent().getStreamId()), EventStoreGrpcEventRepository.convertTo32Bit(streamVersion), EventStoreGrpcEventRepository.convertTo32Bit(aggregateVersion), event.getEvent().getEventType(), event.getEvent().getEventId(), event.getEvent().getCreated(), this.fromEsMetadata(event.getEvent().getUserMetadata()), this.fromEsData(event.getEvent().getEventData()));
    }

    private T fromEsData(byte[] data) {
        try {
            Object rawEvent = this.objectMapper.readerFor(this.eventClass).readValue(data);
            return (T)this.normalizeEvent(rawEvent);
        }
        catch (IOException ex) {
            throw new RetriableEventReadException("Internal error reading events", (Throwable)ex);
        }
    }

    private T normalizeEvent(T rawEvent) {
        if (this.normalizer != null) {
            return (T)this.normalizer.normalizeEvent(rawEvent);
        }
        return rawEvent;
    }

    private ImmutableMap<String, String> fromEsMetadata(byte[] metadata) {
        if (metadata == null || metadata.length == 0) {
            return ImmutableMap.of();
        }
        try {
            return ImmutableMap.copyOf((Map)((Map)this.objectMapper.readerFor((TypeReference)new TypeReference<Map<String, String>>(){}).readValue(metadata)));
        }
        catch (IOException ex) {
            throw new RetriableEventReadException("Internal error reading events", (Throwable)ex);
        }
    }

    private <U> U completeReadFuture(CompletableFuture<U> future, ExpectedVersion expectedVersion) {
        try {
            return future.get(this.timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RetriableEventReadException("Internal error reading event", (Throwable)ex);
        }
        catch (ExecutionException ex) {
            if (ex.getCause() instanceof StreamNotFoundException) {
                return null;
            }
            if (ex.getCause() instanceof WrongExpectedVersionException) {
                throw new UnexpectedVersionException(ex.getCause(), expectedVersion);
            }
            if (ex.getCause() instanceof ResourceNotFoundException) {
                throw new PermanentEventReadException(ex.getCause());
            }
            if (ex.getCause() instanceof ConnectionShutdownException || ex.getCause() instanceof NotLeaderException) {
                throw new RetriableEventReadException(ex.getCause());
            }
            if (ex.getCause() instanceof RuntimeException) {
                logger.warn("Unrecognized runtime exception reading events", ex.getCause());
                throw new RetriableEventReadException(ex.getCause());
            }
            logger.warn("Unrecognized exception reading events", ex.getCause());
            throw new RetriableEventReadException("Internal error reading events", ex.getCause());
        }
        catch (TimeoutException ex) {
            throw new RetriableEventReadException("Timeout reading events", ex.getCause());
        }
    }

    private <U> U completeWriteFuture(CompletableFuture<U> future, ExpectedVersion expectedVersion) {
        try {
            return future.get(this.timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RetriableEventWriteException("Internal error writing event", (Throwable)ex);
        }
        catch (ExecutionException ex) {
            if (ex.getCause() instanceof WrongExpectedVersionException) {
                throw new UnexpectedVersionException(ex.getCause(), expectedVersion);
            }
            if (ex.getCause() instanceof ResourceNotFoundException || ex.getCause() instanceof StreamNotFoundException) {
                throw new PermanentEventWriteException(ex.getCause());
            }
            if (ex.getCause() instanceof ConnectionShutdownException || ex.getCause() instanceof NotLeaderException) {
                throw new RetriableEventWriteException(ex.getCause());
            }
            if (ex.getCause() instanceof RuntimeException) {
                logger.warn("Unrecognized runtime exception writing events", ex.getCause());
                throw new RetriableEventWriteException(ex.getCause());
            }
            logger.warn("Unrecognized exception writing events", ex.getCause());
            throw new RetriableEventWriteException("Internal error writing events", ex.getCause());
        }
        catch (TimeoutException ex) {
            throw new RetriableEventWriteException("Timeout writing events", ex.getCause());
        }
    }

    private static int convertTo32Bit(long longPosition) {
        if (longPosition > Integer.MAX_VALUE) {
            throw new IllegalStateException("Server returned a 64 bit position greater than what could be converted to a 64 bit number, this suggest that you have a stream (individual or projection) with more than 2 billion events. If you really need this, please fork Sourcerer and change ints to be longs.");
        }
        return (int)longPosition;
    }

    private int fromStreamRevision(StreamRevision revision) {
        return EventStoreGrpcEventRepository.convertTo32Bit(revision.getValueUnsigned());
    }

    private StreamRevision toStreamRevision(Integer version) {
        return version == null ? StreamRevision.START : new StreamRevision((long)version.intValue());
    }

    private class EmitterListener
    extends SubscriptionListener {
        private final FluxSink<EventSubscriptionUpdate<T>> emitter;
        private final String name;

        public EmitterListener(FluxSink<EventSubscriptionUpdate<T>> emitter, String name) {
            this.emitter = emitter;
            this.name = name;
        }

        public void onEvent(Subscription subscription, ResolvedEvent event) {
            logger.debug("Incoming message in {}: {}", (Object)this.name, (Object)event);
            this.emitter.next((Object)EventSubscriptionUpdate.ofEvent((EventRecord)EventStoreGrpcEventRepository.this.fromEsEvent(event)));
        }

        public void onError(Subscription subscription, Throwable exception) {
            if (exception != null) {
                logger.error("Subscription " + this.name + " failed with reason " + exception.getMessage() + "", exception);
            } else {
                logger.error("Subscription {} failed with reason {} and no exception", (Object)this.name, (Object)"unknown");
            }
            this.emitter.error((Throwable)new EventStoreSubscriptionStoppedException(exception));
        }

        public void onCancelled(Subscription subscription) {
            logger.info("Subscription " + this.name + " was cancelled");
        }
    }
}

