package io.aeron.archive;

import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.EnumSet;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/aeron/archive/ReplaySession.class */
public class ReplaySession implements Session, AutoCloseable {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ);
    private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
    private final long connectDeadlineMs;
    private final long correlationId;
    private final long sessionId;
    private final long recordingId;
    private final long startPosition;
    private long replayPosition;
    private long stopPosition;
    private long replayLimit;
    private long segmentFilePosition;
    private int termOffset;
    private int termBaseSegmentOffset;
    private final int streamId;
    private final int termLength;
    private final int segmentLength;
    private final ExclusivePublication publication;
    private final ControlSession controlSession;
    private final EpochClock epochClock;
    private final File archiveDir;
    private final Catalog catalog;
    private final Counter limitPosition;
    private final UnsafeBuffer replayBuffer;
    private FileChannel fileChannel;
    private File segmentFile;
    private volatile boolean isAborted;
    private final BufferClaim bufferClaim = new BufferClaim();
    private State state = State.INIT;
    private String errorMessage = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/aeron/archive/ReplaySession$State.class */
    public enum State {
        INIT,
        REPLAY,
        INACTIVE,
        DONE
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplaySession(long j, long j2, long j3, long j4, long j5, ControlSession controlSession, ControlResponseProxy controlResponseProxy, UnsafeBuffer unsafeBuffer, Catalog catalog, File file, File file2, EpochClock epochClock, ExclusivePublication exclusivePublication, RecordingSummary recordingSummary, Counter counter) {
        this.controlSession = controlSession;
        this.sessionId = j3;
        this.correlationId = j5;
        this.recordingId = recordingSummary.recordingId;
        this.segmentLength = recordingSummary.segmentFileLength;
        this.termLength = recordingSummary.termBufferLength;
        this.streamId = recordingSummary.streamId;
        this.epochClock = epochClock;
        this.archiveDir = file;
        this.segmentFile = file2;
        this.publication = exclusivePublication;
        this.limitPosition = counter;
        this.replayBuffer = unsafeBuffer;
        this.catalog = catalog;
        this.startPosition = recordingSummary.startPosition;
        this.stopPosition = null == this.limitPosition ? recordingSummary.stopPosition : this.limitPosition.get();
        long j6 = j == -1 ? this.startPosition : j;
        long j7 = null == this.limitPosition ? this.stopPosition - j6 : Long.MAX_VALUE - j6;
        long min = j2 == -1 ? j7 : Math.min(j2, j7);
        if (min < 0) {
            close();
            String str = "replay recording " + this.recordingId + " - length must be positive: " + min;
            controlSession.attemptErrorResponse(j5, str, controlResponseProxy);
            throw new ArchiveException(str);
        }
        if (null != this.limitPosition) {
            long j8 = this.limitPosition.get();
            if (j8 < j6) {
                close();
                String str2 = "replay recording " + this.recordingId + " - " + j6 + " after current position of " + j8;
                controlSession.attemptErrorResponse(j5, str2, controlResponseProxy);
                throw new ArchiveException(str2);
            }
        }
        this.segmentFilePosition = Archive.segmentFilePosition(j6, this.segmentLength);
        this.replayPosition = j6;
        this.replayLimit = j6 + min;
        controlSession.sendOkResponse(j5, j3, controlResponseProxy);
        this.connectDeadlineMs = epochClock.time() + j4;
    }

    @Override // io.aeron.archive.Session
    public void close() {
        closeRecordingSegment();
        CloseHelper.close(this.publication);
    }

    @Override // io.aeron.archive.Session
    public long sessionId() {
        return this.sessionId;
    }

    @Override // io.aeron.archive.Session
    public int doWork() {
        int i = 0;
        if (this.isAborted) {
            this.state = State.INACTIVE;
        }
        try {
            if (State.INIT == this.state) {
                i = 0 + init();
            }
            if (State.REPLAY == this.state) {
                i += replay();
            }
        } catch (IOException e) {
            onError("IOException - " + e.getMessage() + " - " + this.segmentFile.getName());
            LangUtil.rethrowUnchecked(e);
        }
        if (State.INACTIVE == this.state) {
            closeRecordingSegment();
            this.state = State.DONE;
        }
        return i;
    }

    @Override // io.aeron.archive.Session
    public void abort() {
        this.isAborted = true;
    }

    @Override // io.aeron.archive.Session
    public boolean isDone() {
        return this.state == State.DONE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long recordingId() {
        return this.recordingId;
    }

    State state() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Counter limitPosition() {
        return this.limitPosition;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendPendingError(ControlResponseProxy controlResponseProxy) {
        if (null == this.errorMessage || this.controlSession.isDone()) {
            return;
        }
        this.controlSession.attemptErrorResponse(this.correlationId, this.errorMessage, controlResponseProxy);
    }

    private int init() throws IOException {
        if (null == this.fileChannel) {
            int positionBitsToShift = this.publication.positionBitsToShift();
            int i = ((int) (this.replayPosition - (this.startPosition - (this.startPosition & (this.termLength - 1))))) & (this.segmentLength - 1);
            int initialTermId = ((int) (this.replayPosition >> positionBitsToShift)) + this.publication.initialTermId();
            openRecordingSegment();
            this.termOffset = (int) (this.replayPosition & (this.termLength - 1));
            this.termBaseSegmentOffset = i - this.termOffset;
            if (this.replayPosition > this.startPosition && this.replayPosition != this.stopPosition && notHeaderAligned(this.fileChannel, this.replayBuffer, i, this.termOffset, initialTermId, this.streamId)) {
                onError(this.replayPosition + " position not aligned to data header");
                return 0;
            }
        }
        if (this.publication.isConnected()) {
            this.state = State.REPLAY;
            return 1;
        }
        if (this.epochClock.time() <= this.connectDeadlineMs) {
            return 0;
        }
        onError("no connection established for replay");
        return 0;
    }

    private int replay() throws IOException {
        int i = 0;
        if (!this.publication.isConnected()) {
            this.state = State.INACTIVE;
            return 0;
        }
        if (this.limitPosition != null && this.replayPosition >= this.stopPosition && noNewData(this.replayPosition, this.stopPosition)) {
            return 0;
        }
        if (this.termOffset == this.termLength) {
            nextTerm();
        }
        int i2 = 0;
        int readRecording = readRecording(this.stopPosition - this.replayPosition);
        while (true) {
            if (i2 >= readRecording) {
                break;
            }
            int frameLength = FrameDescriptor.frameLength(this.replayBuffer, i2);
            int frameType = FrameDescriptor.frameType(this.replayBuffer, i2);
            int align = BitUtil.align(frameLength, 32);
            int i3 = i2 + 32;
            int i4 = frameLength - 32;
            long j = 0;
            if (0 >= frameLength) {
                throw new IllegalStateException("unexpected end of recording reached");
            }
            if (frameType == 1) {
                if (i2 + align > readRecording) {
                    break;
                }
                j = this.publication.tryClaim(i4, this.bufferClaim);
                if (j > 0) {
                    this.bufferClaim.flags(FrameDescriptor.frameFlags(this.replayBuffer, i2)).reservedValue(this.replayBuffer.getLong(i2 + 24, ByteOrder.LITTLE_ENDIAN)).putBytes(this.replayBuffer, i3, i4).commit();
                }
            } else if (frameType == 0) {
                j = this.publication.appendPadding(i4);
            }
            if (j > 0) {
                i++;
                i2 += align;
                this.termOffset += align;
                this.replayPosition += align;
                if (this.replayPosition >= this.replayLimit) {
                    this.state = State.INACTIVE;
                    break;
                }
            } else if (j == -4 || j == -1) {
                onError("stream closed before replay is complete");
            }
        }
        return i;
    }

    private int readRecording(long j) throws IOException {
        if (this.publication.availableWindow() <= 0) {
            return 0;
        }
        int min = Math.min((int) Math.min(j, 2097152L), this.termLength - this.termOffset);
        ByteBuffer byteBuffer = this.replayBuffer.byteBuffer();
        byteBuffer.clear().limit(min);
        int i = this.termBaseSegmentOffset + this.termOffset;
        do {
            i += this.fileChannel.read(byteBuffer, i);
        } while (byteBuffer.remaining() > 0);
        return byteBuffer.limit();
    }

    private void onError(String str) {
        this.state = State.INACTIVE;
        this.errorMessage = str;
    }

    private boolean noNewData(long j, long j2) {
        long j3 = this.limitPosition.get();
        boolean isClosed = this.limitPosition.isClosed();
        long stopPosition = isClosed ? this.catalog.stopPosition(this.recordingId) : j3;
        if (isClosed) {
            if (stopPosition < this.replayLimit) {
                this.replayLimit = stopPosition;
            } else if (-1 == stopPosition) {
                this.replayLimit = j2;
            }
        }
        if (j >= this.replayLimit) {
            this.state = State.INACTIVE;
            return true;
        }
        if (stopPosition <= j2) {
            return true;
        }
        this.stopPosition = stopPosition;
        return false;
    }

    private void nextTerm() throws IOException {
        this.termOffset = 0;
        this.termBaseSegmentOffset += this.termLength;
        if (this.termBaseSegmentOffset == this.segmentLength) {
            closeRecordingSegment();
            this.segmentFilePosition += this.segmentLength;
            openRecordingSegment();
            this.termBaseSegmentOffset = 0;
        }
    }

    private void closeRecordingSegment() {
        CloseHelper.close(this.fileChannel);
        this.fileChannel = null;
        this.segmentFile = null;
    }

    private void openRecordingSegment() throws IOException {
        if (null == this.segmentFile) {
            String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFilePosition);
            this.segmentFile = new File(this.archiveDir, segmentFileName);
            if (!this.segmentFile.exists()) {
                String str = "recording segment not found " + segmentFileName;
                onError(str);
                throw new ArchiveException(str);
            }
        }
        this.fileChannel = FileChannel.open(this.segmentFile.toPath(), FILE_OPTIONS, NO_ATTRIBUTES);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean notHeaderAligned(FileChannel fileChannel, UnsafeBuffer unsafeBuffer, int i, int i2, int i3, int i4) throws IOException {
        ByteBuffer byteBuffer = unsafeBuffer.byteBuffer();
        byteBuffer.clear().limit(32);
        if (32 != fileChannel.read(byteBuffer, i)) {
            throw new ArchiveException("failed to read fragment header");
        }
        return isInvalidHeader(unsafeBuffer, i4, i3, i2);
    }

    static boolean isInvalidHeader(UnsafeBuffer unsafeBuffer, int i, int i2, int i3) {
        return (DataHeaderFlyweight.termOffset(unsafeBuffer, 0) == i3 && DataHeaderFlyweight.termId(unsafeBuffer, 0) == i2 && DataHeaderFlyweight.streamId(unsafeBuffer, 0) == i) ? false : true;
    }
}
