/*
 * Decompiled with CFR 0.152.
 */
package org.elder.sourcerer.eventstore;

import akka.pattern.AskTimeoutException;
import akka.util.ByteString;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import eventstore.Content;
import eventstore.ContentType;
import eventstore.Event;
import eventstore.EventNumber;
import eventstore.ExpectedVersion;
import eventstore.ReadStreamEventsCompleted;
import eventstore.ResolvedEvent;
import eventstore.StreamNotFoundException;
import eventstore.WriteResult;
import eventstore.WrongExpectedVersionException;
import eventstore.j.EsConnection;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.elder.sourcerer.EventData;
import org.elder.sourcerer.EventNormalizer;
import org.elder.sourcerer.EventReadResult;
import org.elder.sourcerer.EventRecord;
import org.elder.sourcerer.EventRepository;
import org.elder.sourcerer.EventSubscriptionUpdate;
import org.elder.sourcerer.ExpectedVersion;
import org.elder.sourcerer.exceptions.RetriableEventReadException;
import org.elder.sourcerer.exceptions.RetriableEventWriteException;
import org.elder.sourcerer.exceptions.UnexpectedVersionException;
import org.elder.sourcerer.utils.ElderPreconditions;
import org.elder.sourcerer.utils.ImmutableListCollector;
import org.joda.time.DateTime;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import scala.Option;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;

public class EventStoreEventRepository<T>
implements EventRepository<T> {
    /** Upper bound on the number of events requested per read; {@code read} clamps its {@code maxEvents} argument to this same value (4095). */
    private static final int MAX_MAX_EVENTS_PER_READ = 4095;
    /** Default for {@link #maxSerializedEventSize}; the constructor assigns this same value (262144). */
    private static final int DEFAULT_MAX_SERIALIZED_EVENT_SIZE = 262144;
    private static final Logger logger = LoggerFactory.getLogger(EventStoreEventRepository.class);
    /** Prefix joined to a stream id with {@code "-"} to form the EventStore stream name; also appended to {@code "$ce-"} by {@code getPublisher}. */
    private final String streamPrefix;
    /** Class handed to Jackson when deserializing event payloads. */
    private final Class<T> eventClass;
    /** Optional hook applied to each deserialized event; when {@code null} events are returned as read. */
    private final EventNormalizer<T> normalizer;
    /** EventStore connection used for reads, writes and subscriptions. */
    private final EsConnection connection;
    /** Jackson mapper used for both event payloads and metadata. */
    private final ObjectMapper objectMapper;
    /** Allocator for the scratch buffers used while serializing to JSON. */
    private final PooledByteBufAllocator bufferAllocator;
    /** Used as both the initial and the maximum capacity of each scratch buffer. */
    private final int maxSerializedEventSize;

    /**
     * Creates a repository bound to one stream prefix and one event class.
     *
     * @param streamPrefix prefix prepended (with a {@code "-"} separator) to every stream id
     * @param connection   EventStore connection used for all reads, writes and subscriptions
     * @param eventClass   class that event payloads are deserialized into
     * @param objectMapper Jackson mapper used for payloads and metadata
     * @param normalizer   optional post-deserialization hook; may be {@code null}
     */
    public EventStoreEventRepository(
            String streamPrefix,
            EsConnection connection,
            Class<T> eventClass,
            ObjectMapper objectMapper,
            EventNormalizer<T> normalizer) {
        // Collaborators supplied by the caller.
        this.streamPrefix = streamPrefix;
        this.connection = connection;
        this.eventClass = eventClass;
        this.objectMapper = objectMapper;
        this.normalizer = normalizer;

        // Serialization scratch-buffer settings are fixed defaults.
        this.bufferAllocator = PooledByteBufAllocator.DEFAULT;
        this.maxSerializedEventSize = DEFAULT_MAX_SERIALIZED_EVENT_SIZE;
    }

    /**
     * Reads a batch of events forward from {@code version} in the given stream.
     *
     * @param streamId  stream id without the repository prefix
     * @param version   event number to start reading from
     * @param maxEvents requested batch size; capped at {@code MAX_MAX_EVENTS_PER_READ}
     * @return the events read together with the paging values reported by EventStore, or
     *         {@code null} when the read fails with {@link StreamNotFoundException}
     */
    public EventReadResult<T> read(String streamId, int version, int maxEvents) {
        try {
            final int batchSize = Integer.min(maxEvents, MAX_MAX_EVENTS_PER_READ);
            logger.debug(
                    "Reading from {} (in {}) (version {}) - effective max {}",
                    streamId, streamPrefix, version, batchSize);

            final ReadStreamEventsCompleted completed = completeReadFuture(
                    connection.readStreamEventsForward(
                            toEsStreamId(streamId),
                            new EventNumber.Exact(version),
                            batchSize,
                            false,
                            null));
            logger.debug(
                    "Read {} events from {} (version {})",
                    completed.eventsJava().size(), streamId, version);

            final ImmutableList<EventRecord<T>> records = completed.eventsJava()
                    .stream()
                    .map(this::fromEsEvent)
                    .collect(new ImmutableListCollector<>());

            final int lastVersion = completed.lastEventNumber().value();
            final int nextVersion = ((EventNumber.Exact) completed.nextEventNumber()).value();
            return new EventReadResult<>(
                    records, version, lastVersion, nextVersion, completed.endOfStream());
        } catch (StreamNotFoundException notFound) {
            // A missing stream is reported to the caller as "no result" rather than as an error.
            return null;
        }
    }

    /**
     * Appends events to a stream, optionally guarded by an expected version.
     *
     * @param streamId stream id without the repository prefix
     * @param events   events to write; must be non-null (and, per
     *                 {@code ElderPreconditions.checkNotEmpty}, presumably non-empty)
     * @param version  expected stream version; {@code null} is treated as "any"
     * @return the next expected version reported by EventStore for the stream
     * @throws UnexpectedVersionException   when EventStore rejects the write with a
     *                                      {@link WrongExpectedVersionException}; the original
     *                                      exception is attached as a suppressed exception
     * @throws RetriableEventWriteException when the write completes with a {@code null} result
     */
    public int append(
            String streamId,
            List<EventData<T>> events,
            org.elder.sourcerer.ExpectedVersion version) {
        Preconditions.checkNotNull(events);
        ElderPreconditions.checkNotEmpty(events);

        List<eventstore.EventData> esEvents = events.stream()
                .map(this::toEsEventData)
                .collect(Collectors.toList());
        logger.debug(
                "Writing {} events to stream {} (in {}) (expected version {})",
                esEvents.size(), streamId, this.streamPrefix, version);

        try {
            WriteResult result = completeWriteFuture(
                    this.connection.writeEvents(
                            this.toEsStreamId(streamId),
                            this.toEsVersion(version),
                            esEvents,
                            null));
            if (result == null) {
                logger.warn("Null result - when does this happen!?");
                throw new RetriableEventWriteException("Unknown write status, null result!");
            }
            int nextExpectedVersion = result.nextExpectedVersion().value();
            logger.debug("Write successful, next expected version is {}", nextExpectedVersion);
            return nextExpectedVersion;
        } catch (WrongExpectedVersionException ex) {
            UnexpectedVersionException translated =
                    new UnexpectedVersionException(ex.getMessage(), null, version);
            // The constructor used here takes no cause, so keep the original exception (and its
            // stack trace) reachable for diagnostics instead of discarding it.
            translated.addSuppressed(ex);
            throw translated;
        }
    }

    /**
     * Creates a publisher of subscription updates for a single stream.
     *
     * @param streamId    stream id without the repository prefix
     * @param fromVersion event number handed to EventStore as the starting position, or
     *                    {@code null} to pass no starting position
     * @return a publisher emitting one update per event received from EventStore
     */
    public Publisher<EventSubscriptionUpdate<T>> getStreamPublisher(String streamId, Integer fromVersion) {
        logger.info(
                "Creating publisher for {} (in {}) (starting with version {})",
                streamId, streamPrefix, fromVersion);

        EventNumber startNumber = null;
        if (fromVersion != null) {
            startNumber = new EventNumber.Exact(fromVersion.intValue());
        }

        // NOTE(review): the two boolean flags are positional; presumably resolveLinkTos=false
        // and infinite=true in the EventStore client API - confirm against EsConnection.
        Publisher<Event> esEvents = connection.streamPublisher(
                toEsStreamId(streamId), startNumber, false, null, true);
        return Flux.from(esEvents)
                .map(this::fromEsEvent)
                .map(EventSubscriptionUpdate::ofEvent);
    }

    /**
     * Creates a publisher of subscription updates read from the {@code "$ce-" + streamPrefix}
     * stream, i.e. across all streams of this repository rather than a single one.
     *
     * @param fromVersion event number handed to EventStore as the starting position, or
     *                    {@code null} to pass no starting position
     * @return a publisher emitting one update per event received from EventStore
     */
    public Publisher<EventSubscriptionUpdate<T>> getPublisher(Integer fromVersion) {
        logger.info(
                "Creating publisher for all events in {} (starting with version {})",
                streamPrefix, fromVersion);

        EventNumber startNumber = null;
        if (fromVersion != null) {
            startNumber = new EventNumber.Exact(fromVersion.intValue());
        }

        // NOTE(review): the two boolean flags are positional; presumably resolveLinkTos=true
        // and infinite=true in the EventStore client API - confirm against EsConnection.
        String categoryStream = "$ce-" + streamPrefix;
        Publisher<Event> esEvents = connection.streamPublisher(
                categoryStream, startNumber, true, null, true);
        return Flux.from(esEvents)
                .map(this::fromEsEvent)
                .map(EventSubscriptionUpdate::ofEvent);
    }

    /**
     * Builds the EventStore stream name for a repository-level stream id.
     *
     * @param streamId stream id without the repository prefix
     * @return {@code streamPrefix}, a {@code "-"} separator, then {@code streamId}
     */
    private String toEsStreamId(String streamId) {
        final String prefixWithSeparator = streamPrefix + "-";
        return prefixWithSeparator + streamId;
    }

    /**
     * Converts a repository-level event into the EventStore client's event representation,
     * serializing both payload and metadata to JSON content.
     *
     * @param eventData event to convert
     * @return the equivalent EventStore event data
     */
    private eventstore.EventData toEsEventData(EventData<T> eventData) {
        final Content payload = toEsEvent(eventData.getEvent());
        final Content metadata = toEsMetadata(eventData.getMetadata());
        return new eventstore.EventData(
                eventData.getEventType(),
                eventData.getEventId(),
                payload,
                metadata);
    }

    /**
     * Serializes an event's metadata map to JSON content.
     *
     * @param metadata metadata key/value pairs
     * @return the metadata as EventStore content
     */
    private Content toEsMetadata(Map<String, String> metadata) {
        final Content serialized = jsonObjectToContent(metadata);
        return serialized;
    }

    /**
     * Serializes an event payload to JSON content.
     *
     * @param event event payload
     * @return the payload as EventStore content
     */
    private Content toEsEvent(T event) {
        final Content serialized = jsonObjectToContent(event);
        return serialized;
    }

    /**
     * Serializes an arbitrary object to JSON using the configured mapper and wraps the bytes
     * as EventStore content tagged with the JSON content type.
     *
     * <p>Serialization goes through a scratch buffer obtained from {@code withBuffer}; the
     * resulting {@code ByteString} is created inside the callback, before that buffer is released.
     *
     * @param obj object to serialize
     * @return JSON content for {@code obj}
     * @throws RetriableEventWriteException when Jackson reports an {@link IOException}
     */
    private Content jsonObjectToContent(Object obj) {
        return withBuffer(scratch -> {
            try {
                OutputStream sink = new ByteBufOutputStream(scratch);
                objectMapper.writer().writeValue(sink, obj);

                ByteBuffer serialized = scratch.nioBuffer();
                return new Content(
                        ByteString.fromByteBuffer(serialized),
                        (eventstore.ContentType) eventstore.j.ContentType.json());
            } catch (IOException ex) {
                throw new RetriableEventWriteException("Internal error writing event", ex);
            }
        });
    }

    /**
     * Translates a repository-level expected version into the EventStore client's type.
     *
     * <p>{@code Any} and {@code NoStream} are Scala singleton objects, so the canonical
     * {@code MODULE$} instance is returned rather than constructing a second instance of a type
     * that is meant to have exactly one. Both {@code ExpectedVersion} types are fully qualified
     * because this file imports two classes with that simple name.
     *
     * @param version expected version to translate; {@code null} is treated as "any"
     * @return the corresponding EventStore expected version
     * @throws IllegalArgumentException when the version's type is not one of the handled cases
     */
    private eventstore.ExpectedVersion toEsVersion(org.elder.sourcerer.ExpectedVersion version) {
        if (version == null) {
            return eventstore.ExpectedVersion.Any$.MODULE$;
        }
        switch (version.getType()) {
            case ANY:
                return eventstore.ExpectedVersion.Any$.MODULE$;
            case EXACTLY:
                return new eventstore.ExpectedVersion.Exact(version.getExpectedVersion());
            case NOT_CREATED:
                return eventstore.ExpectedVersion.NoStream$.MODULE$;
            default:
                throw new IllegalArgumentException("Unrecognized expected version type: " + version);
        }
    }

    /**
     * Runs {@code function} against a freshly allocated direct buffer and releases the buffer
     * afterwards, whether the function returns normally or throws.
     *
     * <p>The buffer is allocated with {@code maxSerializedEventSize} as both its initial and its
     * maximum capacity.
     *
     * @param function callback that receives the buffer; it must not keep a reference to the
     *                 buffer past its return, since the buffer is released here
     * @param <U>      result type of the callback
     * @return whatever {@code function} returns
     */
    private <U> U withBuffer(Function<ByteBuf, U> function) {
        final ByteBuf scratch = bufferAllocator.directBuffer(maxSerializedEventSize, maxSerializedEventSize);
        try {
            return function.apply(scratch);
        } finally {
            scratch.release();
        }
    }

    /**
     * Strips everything up to and including the first {@code '-'} from an EventStore stream name.
     * When the name contains no {@code '-'} it is returned unchanged.
     *
     * @param streamId full EventStore stream name
     * @return the part of {@code streamId} after its first {@code '-'}
     */
    private String fromEsStreamId(String streamId) {
        final int separatorIndex = streamId.indexOf('-');
        // indexOf yields -1 when there is no separator, making the start offset 0 (whole string).
        final int idStart = separatorIndex + 1;
        return streamId.substring(idStart);
    }

    /**
     * Converts an event received from EventStore into a repository-level record.
     *
     * <p>For a {@link ResolvedEvent} the aggregate version is the number of the linked (target)
     * event and the stream version is the number of the link event itself; for any other event
     * both versions are the event's own number.
     *
     * @param event event as delivered by the EventStore client
     * @return the record with deserialized payload and metadata
     */
    private EventRecord<T> fromEsEvent(Event event) {
        final int aggregateVersion;
        final int streamVersion;
        if (event instanceof ResolvedEvent) {
            final ResolvedEvent resolved = (ResolvedEvent) event;
            aggregateVersion = resolved.linkedEvent().number().value();
            streamVersion = resolved.linkEvent().number().value();
        } else {
            final int number = event.number().value();
            aggregateVersion = number;
            streamVersion = number;
        }

        final String streamId = fromEsStreamId(event.streamId().streamId());
        final eventstore.EventData esData = event.data();
        final Instant timestamp = fromEsTimestamp(event.created());
        final ImmutableMap<String, String> metadata = fromEsMetadata(esData.metadata());
        final T payload = fromEsDataContent(esData.data());

        return new EventRecord<>(
                streamId,
                streamVersion,
                aggregateVersion,
                esData.eventType(),
                esData.eventId(),
                timestamp,
                metadata,
                payload);
    }

    /**
     * Deserializes an event payload into {@code eventClass} and passes it through the normalizer.
     *
     * @param data raw event content from EventStore
     * @return the deserialized (and, if a normalizer is configured, normalized) event
     * @throws RetriableEventReadException when Jackson reports an {@link IOException}
     */
    private T fromEsDataContent(Content data) {
        final T rawEvent;
        try {
            rawEvent = objectMapper
                    .readerFor(eventClass)
                    .readValue(data.value().iterator().asInputStream());
            return normalizeEvent(rawEvent);
        } catch (IOException ex) {
            throw new RetriableEventReadException("Internal error reading events", ex);
        }
    }

    /**
     * Applies the configured normalizer to an event, if there is one.
     *
     * @param rawEvent event as deserialized
     * @return the normalizer's result, or {@code rawEvent} itself when no normalizer is set
     */
    private T normalizeEvent(T rawEvent) {
        return normalizer == null
                ? rawEvent
                : normalizer.normalizeEvent(rawEvent);
    }

    /**
     * Deserializes event metadata into an immutable string map.
     *
     * <p>Metadata that is absent, not tagged as JSON, or empty yields an empty map.
     *
     * @param metadata raw metadata content from EventStore; may be {@code null}
     * @return the metadata entries, or an empty map when there is nothing to parse
     * @throws RetriableEventReadException when Jackson reports an {@link IOException}
     */
    private ImmutableMap<String, String> fromEsMetadata(Content metadata) {
        if (metadata == null) {
            return ImmutableMap.of();
        }
        // Identity comparison, as in the original: it relies on json() always returning the
        // same content-type instance.
        final boolean isJson = metadata.contentType() == eventstore.j.ContentType.json();
        if (!isJson || metadata.value().size() == 0) {
            return ImmutableMap.of();
        }

        try {
            final Map<String, String> parsed = objectMapper
                    .readerFor(new TypeReference<Map<String, String>>() { })
                    .readValue(metadata.value().iterator().asInputStream());
            return ImmutableMap.copyOf(parsed);
        } catch (IOException ex) {
            throw new RetriableEventReadException("Internal error reading events", ex);
        }
    }

    /**
     * Converts the optional Joda creation time of an event into a {@link Instant}.
     *
     * @param created creation time as reported by EventStore
     * @return the same point in time with millisecond precision
     * @throws IllegalStateException when no creation time is present
     */
    private static Instant fromEsTimestamp(Option<DateTime> created) {
        if (created.isDefined()) {
            final DateTime createdAt = created.get();
            final long epochMillis = createdAt.toInstant().getMillis();
            return Instant.ofEpochMilli(epochMillis);
        }
        throw new IllegalStateException("No time stamp returned from EventStore where expected");
    }

    /**
     * Blocks until a Scala future produced by a read operation completes and returns its value.
     *
     * <p>Failure translation:
     * <ul>
     *   <li>a {@link RuntimeException} cause is rethrown as is;</li>
     *   <li>an {@link AskTimeoutException} cause becomes a {@link RetriableEventReadException}
     *       with a timeout message;</li>
     *   <li>any other cause becomes a generic {@link RetriableEventReadException};</li>
     *   <li>interruption of the waiting thread becomes a {@link RetriableEventReadException},
     *       and the thread's interrupt flag is restored first.</li>
     * </ul>
     *
     * @param future future to wait for
     * @param <T>    result type of the future
     * @return the value the future completed with
     */
    private static <T> T completeReadFuture(Future<T> future) {
        try {
            return FutureConverters.toJava(future).toCompletableFuture().get();
        } catch (InterruptedException ex) {
            // Catching InterruptedException clears the interrupt flag; set it again so code
            // further up the stack can still see that the thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RetriableEventReadException("Internal error reading event", ex);
        } catch (ExecutionException ex) {
            final Throwable cause = ex.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof AskTimeoutException) {
                throw new RetriableEventReadException("Timeout reading events", cause);
            }
            throw new RetriableEventReadException("Internal error reading events", cause);
        }
    }

    /**
     * Blocks until a Scala future produced by a write operation completes and returns its value.
     *
     * <p>Failure translation:
     * <ul>
     *   <li>a {@link RuntimeException} cause is rethrown as is;</li>
     *   <li>an {@link AskTimeoutException} cause becomes a {@link RetriableEventWriteException}
     *       with a timeout message;</li>
     *   <li>any other cause becomes a generic {@link RetriableEventWriteException};</li>
     *   <li>interruption of the waiting thread becomes a {@link RetriableEventWriteException},
     *       and the thread's interrupt flag is restored first.</li>
     * </ul>
     *
     * @param future future to wait for
     * @param <T>    result type of the future
     * @return the value the future completed with
     */
    private static <T> T completeWriteFuture(Future<T> future) {
        try {
            return FutureConverters.toJava(future).toCompletableFuture().get();
        } catch (InterruptedException ex) {
            // Catching InterruptedException clears the interrupt flag; set it again so code
            // further up the stack can still see that the thread was asked to stop.
            Thread.currentThread().interrupt();
            throw new RetriableEventWriteException("Internal error writing event", ex);
        } catch (ExecutionException ex) {
            final Throwable cause = ex.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof AskTimeoutException) {
                throw new RetriableEventWriteException("Timeout writing events", cause);
            }
            throw new RetriableEventWriteException("Internal error writing events", cause);
        }
    }
}

