package org.axonframework.eventstore.fs;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.CountingInputStream;
import org.axonframework.domain.AggregateIdentifier;
import org.axonframework.domain.DomainEvent;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventstore.EventSerializer;
import org.axonframework.eventstore.EventStore;
import org.axonframework.eventstore.EventStoreException;
import org.axonframework.eventstore.EventStreamNotFoundException;
import org.axonframework.eventstore.SnapshotEventStore;
import org.axonframework.eventstore.XStreamEventSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventstore/fs/FileSystemEventStore.class */
/**
 * {@link EventStore} and {@link SnapshotEventStore} implementation that reads and writes serialized events
 * through the streams handed out by an {@link EventFileResolver}.
 * <p>
 * An {@link EventFileResolver} must be configured, using either {@link #setBaseDir(File)} or
 * {@link #setEventFileResolver(EventFileResolver)}, before any events are appended or read. This class does not
 * verify that; the resolver is simply dereferenced when needed.
 */
public class FileSystemEventStore implements EventStore, SnapshotEventStore {
    private static final Logger logger = LoggerFactory.getLogger(FileSystemEventStore.class);

    /** Serializer used to convert events to and from their stored byte representation. */
    private final EventSerializer eventSerializer;

    /** Provides the input and output streams for the event and snapshot files. */
    private EventFileResolver eventFileResolver;

    /**
     * {@link DomainEventStream} that lazily reads event entries from an {@link InputStream}, always keeping one
     * event read ahead so that {@link #hasNext()} and {@link #peek()} can be answered without further I/O.
     * <p>
     * The wrapped stream is closed as soon as no further entry can be read, or when reading fails.
     */
    public static class BufferedReaderDomainEventStream implements DomainEventStream {
        private final InputStream inputStream;
        private final EventSerializer serializer;
        private DomainEvent next;

        /**
         * Wraps the given stream in a {@link BufferedInputStream} and immediately reads the first event.
         *
         * @param inputStream     the stream providing the serialized event entries
         * @param eventSerializer the serializer used to deserialize each entry
         * @throws EventStoreException when the first entry cannot be read due to an {@link IOException}
         */
        public BufferedReaderDomainEventStream(InputStream inputStream, EventSerializer eventSerializer) {
            this.inputStream = new BufferedInputStream(inputStream);
            this.serializer = eventSerializer;
            // The first read must happen after both fields are assigned. A field initializer would run
            // before this constructor body and therefore read from a null stream.
            this.next = doReadNext();
        }

        @Override // org.axonframework.domain.DomainEventStream
        public boolean hasNext() {
            return this.next != null;
        }

        /**
         * Returns the event that was read ahead and reads the following one.
         * <p>
         * Callers are expected to check {@link #hasNext()} first: once the end has been reached the underlying
         * stream is already closed, and this method will still attempt another read from it.
         *
         * @return the next event, or {@code null} if there was none
         * @throws EventStoreException when reading the following entry fails with an {@link IOException}
         */
        @Override // org.axonframework.domain.DomainEventStream
        public DomainEvent next() {
            DomainEvent current = this.next;
            this.next = doReadNext();
            return current;
        }

        @Override // org.axonframework.domain.DomainEventStream
        public DomainEvent peek() {
            return this.next;
        }

        /**
         * Reads and deserializes the next entry from the stream.
         *
         * @return the deserialized event, or {@code null} when no further entry is available, in which case
         *         the stream has been closed
         * @throws EventStoreException when an {@link IOException} occurs while reading; the stream is closed
         *                             before the exception is thrown
         */
        private DomainEvent doReadNext() {
            try {
                EventEntry entry = EventSerializationUtils.readEventEntry(this.inputStream);
                if (entry != null) {
                    return entry.deserialize(this.serializer);
                }
                // No entry left: release the stream right away instead of waiting for the caller.
                IOUtils.closeQuietly(this.inputStream);
                return null;
            } catch (IOException e) {
                IOUtils.closeQuietly(this.inputStream);
                throw new EventStoreException("An error occurred while reading from the underlying source", e);
            } catch (RuntimeException e) {
                IOUtils.closeQuietly(this.inputStream);
                throw e;
            }
        }
    }

    /**
     * Creates an event store that serializes events with a new {@link XStreamEventSerializer}.
     */
    public FileSystemEventStore() {
        this.eventSerializer = new XStreamEventSerializer();
    }

    /**
     * Creates an event store that serializes events with the given serializer.
     *
     * @param eventSerializer the serializer used to convert events to and from bytes
     */
    public FileSystemEventStore(EventSerializer eventSerializer) {
        this.eventSerializer = eventSerializer;
    }

    /**
     * Writes all events of the given stream to the event file that the resolver opens for the aggregate
     * identifier of the first event. Does nothing when the stream has no events.
     *
     * @param str               the aggregate type, passed on to the {@link EventFileResolver}
     * @param domainEventStream the events to append
     * @throws EventStoreException when opening or writing the event file fails with an {@link IOException}
     */
    @Override // org.axonframework.eventstore.EventStore
    public void appendEvents(String str, DomainEventStream domainEventStream) {
        if (!domainEventStream.hasNext()) {
            return;
        }
        OutputStream outputStream = null;
        try {
            // The first event is needed up front: its aggregate identifier selects the file to write to.
            DomainEvent event = domainEventStream.next();
            outputStream = this.eventFileResolver.openEventFileForWriting(str, event.getAggregateIdentifier());
            while (event != null) {
                EventSerializationUtils.writeEventEntry(outputStream,
                                                        event.getSequenceNumber().longValue(),
                                                        event.getTimestamp().toString(),
                                                        this.eventSerializer.serialize(event));
                event = domainEventStream.hasNext() ? domainEventStream.next() : null;
            }
        } catch (IOException e) {
            throw new EventStoreException("Unable to store given entity due to an IOException", e);
        } finally {
            IOUtils.closeQuietly(outputStream);
        }
    }

    /**
     * Opens a stream of the events stored for the given aggregate. When a snapshot file exists, the returned
     * stream starts with the last snapshot entry followed by the events located after the snapshot's offset.
     *
     * @param str                 the aggregate type, passed on to the {@link EventFileResolver}
     * @param aggregateIdentifier the identifier of the aggregate to read the events of
     * @return a stream of the stored events
     * @throws EventStreamNotFoundException when the resolver reports that no event file exists
     * @throws EventStoreException          when opening or preparing the event file fails with an
     *                                      {@link IOException}
     */
    @Override // org.axonframework.eventstore.EventStore
    public DomainEventStream readEvents(String str, AggregateIdentifier aggregateIdentifier) {
        try {
            if (!this.eventFileResolver.eventFileExists(str, aggregateIdentifier)) {
                throw new EventStreamNotFoundException(str, aggregateIdentifier);
            }
            InputStream eventFileInputStream = this.eventFileResolver.openEventFileForReading(str, aggregateIdentifier);
            boolean handedOver = false;
            try {
                DomainEventStream events = readEvents(str, aggregateIdentifier, eventFileInputStream);
                handedOver = true;
                return events;
            } finally {
                // On success the returned stream owns (and eventually closes) the file stream. On failure
                // nobody else holds a reference to it, so it has to be released here.
                if (!handedOver) {
                    IOUtils.closeQuietly(eventFileInputStream);
                }
            }
        } catch (IOException e) {
            throw new EventStoreException(String.format("An error occurred while trying to open the event file for aggregate type [%s] with identifier [%s]", str, aggregateIdentifier.toString()), e);
        }
    }

    /**
     * Writes a snapshot entry for the given event to the snapshot file of its aggregate. The entry records the
     * byte offset in the event file directly after the event entry that ends the scan for the snapshot's
     * sequence number (see {@link #calculateOffset(String, AggregateIdentifier, long)}).
     *
     * @param str         the aggregate type, passed on to the {@link EventFileResolver}
     * @param domainEvent the snapshot event to store
     * @throws EventStoreException when calculating the offset or writing the snapshot fails with an
     *                             {@link IOException}
     */
    @Override // org.axonframework.eventstore.SnapshotEventStore
    public void appendSnapshotEvent(String str, DomainEvent domainEvent) {
        AggregateIdentifier aggregateIdentifier = domainEvent.getAggregateIdentifier();
        OutputStream outputStream = null;
        try {
            long sequenceNumber = domainEvent.getSequenceNumber().longValue();
            // Build the complete entry before opening the snapshot file, so that a failure while
            // serializing or calculating the offset never touches that file.
            SnapshotEventEntry snapshotEventEntry = new SnapshotEventEntry(
                    this.eventSerializer.serialize(domainEvent),
                    sequenceNumber,
                    domainEvent.getTimestamp().toString(),
                    calculateOffset(str, aggregateIdentifier, sequenceNumber));
            outputStream = this.eventFileResolver.openSnapshotFileForWriting(str, aggregateIdentifier);
            EventSerializationUtils.writeSnapshotEntry(outputStream, snapshotEventEntry);
        } catch (IOException e) {
            throw new EventStoreException("Error writing a snapshot event due to an IO exception", e);
        } finally {
            IOUtils.closeQuietly(outputStream);
        }
    }

    /**
     * Determines how many bytes of the event file are consumed by reading entries from the start until an
     * entry with a sequence number equal to or greater than the given one has been read.
     *
     * @param type                the aggregate type, passed on to the {@link EventFileResolver}
     * @param aggregateIdentifier the identifier of the aggregate whose event file is scanned
     * @param sequenceNumber      the sequence number to scan for
     * @return the number of bytes consumed from the event file
     * @throws IOException when the event file cannot be read, or when it ends before an entry with the given
     *                     sequence number was found
     */
    private long calculateOffset(String type, AggregateIdentifier aggregateIdentifier, long sequenceNumber) throws IOException {
        CountingInputStream countingInputStream = null;
        try {
            // The counter wraps the buffer (not the other way around) so that only bytes actually consumed
            // by readEventEntry are counted, not the buffer's read-ahead.
            countingInputStream = new CountingInputStream(new BufferedInputStream(this.eventFileResolver.openEventFileForReading(type, aggregateIdentifier)));
            long lastReadSequenceNumber = -1;
            while (lastReadSequenceNumber < sequenceNumber) {
                EventEntry entry = EventSerializationUtils.readEventEntry(countingInputStream);
                if (entry == null) {
                    throw new IOException("Reached the end of the event file before finding an event with sequence number " + sequenceNumber);
                }
                lastReadSequenceNumber = entry.getSequenceNumber();
            }
            return countingInputStream.getByteCount();
        } finally {
            IOUtils.closeQuietly(countingInputStream);
        }
    }

    /**
     * Builds the event stream for an already opened event file. If a snapshot entry is available, the event
     * file stream is advanced to the snapshot's offset and the snapshot entry is placed in front of it.
     *
     * @param type                 the aggregate type, passed on to the {@link EventFileResolver}
     * @param aggregateIdentifier  the identifier of the aggregate being read
     * @param eventFileInputStream the opened event file
     * @return a stream providing the snapshot event (if any) followed by the remaining events
     * @throws IOException when reading the snapshot or skipping in the event file fails
     */
    private DomainEventStream readEvents(String type, AggregateIdentifier aggregateIdentifier, InputStream eventFileInputStream) throws IOException {
        SnapshotEventEntry snapshotEntry = readSnapshotEvent(type, aggregateIdentifier, eventFileInputStream);
        InputStream source = eventFileInputStream;
        if (snapshotEntry != null) {
            // Re-encode the snapshot as a regular event entry so the stream reader can treat it like any
            // other event, then continue with what is left of the event file.
            ByteArrayOutputStream serializedSnapshot = new ByteArrayOutputStream();
            EventSerializationUtils.writeEventEntry(serializedSnapshot,
                                                    snapshotEntry.getSequenceNumber(),
                                                    snapshotEntry.getTimeStamp(),
                                                    snapshotEntry.getBytes());
            source = new SequenceInputStream(new ByteArrayInputStream(serializedSnapshot.toByteArray()), eventFileInputStream);
        }
        return new BufferedReaderDomainEventStream(source, this.eventSerializer);
    }

    /**
     * Reads the last snapshot entry of the aggregate, if a snapshot file exists, and advances the given event
     * file stream by the offset recorded in that entry. A warning is logged when the event file stream could
     * not be advanced by the full offset.
     *
     * @param type                 the aggregate type, passed on to the {@link EventFileResolver}
     * @param aggregateIdentifier  the identifier of the aggregate being read
     * @param eventFileInputStream the opened event file, positioned at its start
     * @return the last snapshot entry, or {@code null} if there is none
     * @throws IOException when reading the snapshot file or skipping in the event file fails
     */
    private SnapshotEventEntry readSnapshotEvent(String type, AggregateIdentifier aggregateIdentifier, InputStream eventFileInputStream) throws IOException {
        if (!this.eventFileResolver.snapshotFileExists(type, aggregateIdentifier)) {
            return null;
        }
        InputStream snapshotFileInputStream = this.eventFileResolver.openSnapshotFileForReading(type, aggregateIdentifier);
        try {
            SnapshotEventEntry snapshotEntry = EventSerializationUtils.readLastSnapshotEntry(snapshotFileInputStream);
            // Guard against a snapshot file without a readable entry; in that case all events are replayed.
            if (snapshotEntry != null && skipFully(eventFileInputStream, snapshotEntry.getOffset()) != 0) {
                logger.warn("The skip operation did not actually skip the expected amount of bytes. The event log of aggregate of type {} and identifier {} might be corrupt.", type, aggregateIdentifier.toString());
            }
            return snapshotEntry;
        } finally {
            IOUtils.closeQuietly(snapshotFileInputStream);
        }
    }

    /**
     * Skips the given number of bytes, calling {@link InputStream#skip(long)} repeatedly because a single call
     * is allowed to skip fewer bytes than requested even when more data is available.
     *
     * @param inputStream the stream to advance
     * @param byteCount   the number of bytes to skip
     * @return the number of bytes that were not skipped; {@code 0} when the full amount was skipped
     * @throws IOException when the stream reports an error while skipping or reading
     */
    private static long skipFully(InputStream inputStream, long byteCount) throws IOException {
        long remaining = byteCount;
        while (remaining > 0) {
            long skipped = inputStream.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (inputStream.read() == -1) {
                // skip() made no progress and a read confirms the end of the stream has been reached.
                break;
            } else {
                // skip() made no progress, but one byte could still be consumed by reading it.
                remaining--;
            }
        }
        return remaining;
    }

    /**
     * Configures this event store to resolve its files with a {@link SimpleEventFileResolver} created for the
     * given directory. Replaces any previously configured resolver.
     *
     * @param file the base directory handed to the {@link SimpleEventFileResolver}
     */
    public void setBaseDir(File file) {
        this.eventFileResolver = new SimpleEventFileResolver(file);
    }

    /**
     * Sets the resolver that provides the streams for event and snapshot files. Replaces any previously
     * configured resolver.
     *
     * @param eventFileResolver the resolver to use
     */
    public void setEventFileResolver(EventFileResolver eventFileResolver) {
        this.eventFileResolver = eventFileResolver;
    }
}
