package akka.persistence.query.journal.leveldb;

import akka.actor.ActorRef;
import akka.actor.ScalaActorRef;
import akka.actor.package$;
import akka.persistence.JournalProtocol;
import akka.persistence.Persistence;
import akka.persistence.Persistence$;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.leveldb.LeveldbJournal;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.EventEnvelope$;
import akka.persistence.query.Sequence;
import akka.stream.Outlet;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogicWithLogging;
import java.util.LinkedList;
import scala.MatchError;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;

/* compiled from: EventsByPersistenceIdStage.scala */
/* loaded from: input_file:akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage$$anon$1.class */
/**
 * Stage logic created by {@link EventsByPersistenceIdStage}: it asks the journal actor to replay
 * events for one persistence id, buffers the replayed events as {@link EventEnvelope}s and pushes
 * them to the stage's outlet.
 *
 * <p>State kept here:
 * <ul>
 *   <li>{@code nextSequenceNr} / {@code toSequenceNr} - the sequence-number window that is still
 *       to be requested from the journal;</li>
 *   <li>{@code replayInProgress} - a {@code ReplayMessages} request has been sent and its
 *       {@code RecoverySuccess} has not been received yet;</li>
 *   <li>{@code outstandingReplay} - {@link #requestMore()} was called while a replay was already
 *       running.</li>
 * </ul>
 *
 * <p>NOTE(review): this file is decompiler output of a Scala anonymous class. Names such as
 * {@code mo6497_2} and {@code shape2} look like decompiler renames and are kept unchanged.
 */
public final class EventsByPersistenceIdStage$$anon$1 extends TimerGraphStageLogicWithLogging implements OutHandler, Buffer<EventEnvelope> {
    /** Journal actor resolved in the constructor from the {@link Persistence} extension. */
    private final ActorRef journal;
    /** Reference of the stage actor; assigned in {@link #preStart()}. */
    private ActorRef stageActorRef;
    private boolean replayInProgress;
    private boolean outstandingReplay;
    private long nextSequenceNr;
    private long toSequenceNr;
    /** Backing list handed to the {@link Buffer} trait through its synthetic setter. */
    private LinkedList<EventEnvelope> akka$persistence$query$journal$leveldb$Buffer$$buf;
    private final /* synthetic */ EventsByPersistenceIdStage $outer;

    // ---------------------------------------------------------------------------------------
    // Trait forwarders. Each one must invoke the interface's default implementation explicitly;
    // an unqualified call would resolve to this override and recurse until StackOverflowError.
    // ---------------------------------------------------------------------------------------

    /** Forwards to the default {@code buffer(element)} implementation of {@link Buffer}. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public void buffer(EventEnvelope eventEnvelope) {
        Buffer.super.buffer(eventEnvelope);
    }

    /** Forwards to the default {@code buffer(set)} implementation of {@link Buffer}. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public void buffer(Set<EventEnvelope> set) {
        Buffer.super.buffer(set);
    }

    /** Forwards to the default {@code deliverBuf} implementation of {@link Buffer}. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public void deliverBuf(Outlet<EventEnvelope> outlet) {
        Buffer.super.deliverBuf(outlet);
    }

    /** Forwards to the default {@code bufferSize} implementation of {@link Buffer}. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public int bufferSize() {
        return Buffer.super.bufferSize();
    }

    /** Forwards to the default {@code bufferEmpty} implementation of {@link Buffer}. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public boolean bufferEmpty() {
        return Buffer.super.bufferEmpty();
    }

    /** Forwards to the default {@code onDownstreamFinish()} implementation of {@link OutHandler}. */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish() throws Exception {
        OutHandler.super.onDownstreamFinish();
    }

    /** Forwards to the default {@code onDownstreamFinish(cause)} implementation of {@link OutHandler}. */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.super.onDownstreamFinish(th);
    }

    /** Synthetic getter for the list backing the {@link Buffer} trait. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public LinkedList<EventEnvelope> akka$persistence$query$journal$leveldb$Buffer$$buf() {
        return this.akka$persistence$query$journal$leveldb$Buffer$$buf;
    }

    /** Synthetic setter for the list backing the {@link Buffer} trait; called from the constructor. */
    @Override // akka.persistence.query.journal.leveldb.Buffer
    public final void akka$persistence$query$journal$leveldb$Buffer$_setter_$akka$persistence$query$journal$leveldb$Buffer$$buf_$eq(LinkedList<EventEnvelope> linkedList) {
        this.akka$persistence$query$journal$leveldb$Buffer$$buf = linkedList;
    }

    // ---------------------------------------------------------------------------------------
    // Scala-style private accessors for the mutable state.
    // ---------------------------------------------------------------------------------------

    private ActorRef journal() {
        return this.journal;
    }

    private ActorRef stageActorRef() {
        return this.stageActorRef;
    }

    private void stageActorRef_$eq(ActorRef actorRef) {
        this.stageActorRef = actorRef;
    }

    private boolean replayInProgress() {
        return this.replayInProgress;
    }

    private void replayInProgress_$eq(boolean z) {
        this.replayInProgress = z;
    }

    private boolean outstandingReplay() {
        return this.outstandingReplay;
    }

    private void outstandingReplay_$eq(boolean z) {
        this.outstandingReplay = z;
    }

    private long nextSequenceNr() {
        return this.nextSequenceNr;
    }

    private void nextSequenceNr_$eq(long j) {
        this.nextSequenceNr = j;
    }

    private long toSequenceNr() {
        return this.toSequenceNr;
    }

    private void toSequenceNr_$eq(long j) {
        this.toSequenceNr = j;
    }

    /** Uses the enclosing stage class as the log source. */
    @Override // akka.stream.stage.TimerGraphStageLogicWithLogging, akka.stream.stage.StageLogging
    public Class<?> logSource() {
        return EventsByPersistenceIdStage.class;
    }

    /**
     * Obtains the stage actor (routing its messages to {@link #journalInteraction(Tuple2)}),
     * lets the configured refresh interval - if any - set up the timer and the journal
     * subscription, and issues the first replay request.
     */
    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        stageActorRef_$eq(getStageActor(tuple2 -> {
            this.journalInteraction(tuple2);
            return BoxedUnit.UNIT;
        }).ref());
        this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$refreshInterval.foreach(finiteDuration -> {
            $anonfun$preStart$2(this, finiteDuration);
            return BoxedUnit.UNIT;
        });
        requestMore();
    }

    /**
     * Sends a {@code ReplayMessages} request to the journal for the remaining sequence-number
     * window, limited to the free space in the buffer.
     *
     * <p>If a replay is already running, only {@code outstandingReplay} is set. Nothing is sent
     * when the buffer has no free space or {@code nextSequenceNr} is already past
     * {@code toSequenceNr}.
     */
    private void requestMore() {
        if (replayInProgress()) {
            // Remember the request; it is re-evaluated once the running replay has completed.
            outstandingReplay_$eq(true);
            return;
        }
        int limit = this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$maxBufSize - bufferSize();
        if (limit <= 0 || nextSequenceNr() > toSequenceNr()) {
            return;
        }
        replayInProgress_$eq(true);
        outstandingReplay_$eq(false);
        JournalProtocol.ReplayMessages request = new JournalProtocol.ReplayMessages(nextSequenceNr(), toSequenceNr(), limit, this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$persistenceId, stageActorRef());
        ScalaActorRef journalRef = package$.MODULE$.actorRef2Scala(journal());
        journalRef.$bang(request, journalRef.$bang$default$2(request));
    }

    /**
     * Timer callback: requests more events, delivers what is buffered and completes the stage
     * if everything has been emitted. The timer key is ignored.
     */
    @Override // akka.stream.stage.TimerGraphStageLogic
    public void onTimer(Object obj) {
        requestMore();
        deliverBuf(this.$outer.out());
        maybeCompleteStage();
    }

    /**
     * Handles a message received by the stage actor.
     *
     * <ul>
     *   <li>{@code ReplayedMessage} - buffers the event as an envelope, advances
     *       {@code nextSequenceNr} and delivers the buffer;</li>
     *   <li>{@code RecoverySuccess} - see {@link #onReplayComplete(long)};</li>
     *   <li>{@code ReplayMessagesFailure} - fails the stage with the reported cause;</li>
     *   <li>{@code EventAppended} - requests more events.</li>
     * </ul>
     *
     * @param tuple2 pair whose second element is the received message
     * @throws MatchError if {@code tuple2} is {@code null} or the message is of any other type
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void journalInteraction(Tuple2<ActorRef, Object> tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Object message = tuple2.mo6497_2();
        if (message instanceof JournalProtocol.ReplayedMessage) {
            PersistentRepr persistent = ((JournalProtocol.ReplayedMessage) message).persistent();
            buffer(EventEnvelope$.MODULE$.apply(new Sequence(persistent.sequenceNr()), persistent.persistenceId(), persistent.sequenceNr(), persistent.payload(), persistent.timestamp()));
            nextSequenceNr_$eq(persistent.sequenceNr() + 1);
            deliverBuf(this.$outer.out());
        } else if (message instanceof JournalProtocol.RecoverySuccess) {
            onReplayComplete(((JournalProtocol.RecoverySuccess) message).highestSequenceNr());
        } else if (message instanceof JournalProtocol.ReplayMessagesFailure) {
            failStage(((JournalProtocol.ReplayMessagesFailure) message).cause());
        } else if (message instanceof LeveldbJournal.EventAppended) {
            requestMore();
        } else {
            throw new MatchError(message);
        }
    }

    /**
     * Reacts to the end of a replay: clears {@code replayInProgress}, delivers the buffer, and
     * then either completes the stage, requests the next batch, or does nothing.
     *
     * @param highestSequenceNr highest sequence number reported by the journal
     */
    private void onReplayComplete(long highestSequenceNr) {
        replayInProgress_$eq(false);
        deliverBuf(this.$outer.out());
        // A current (non-refreshing) query never reads past what the journal reported.
        if (highestSequenceNr < toSequenceNr() && isCurrentQuery()) {
            toSequenceNr_$eq(highestSequenceNr);
        }
        log().debug("Replay complete. From sequenceNr {} currentSequenceNr {} toSequenceNr {} buffer size {}", BoxesRunTime.boxToLong(this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$fromSequenceNr), BoxesRunTime.boxToLong(nextSequenceNr()), BoxesRunTime.boxToLong(toSequenceNr()), BoxesRunTime.boxToInteger(bufferSize()));
        if (bufferEmpty() && (nextSequenceNr() > toSequenceNr() || (nextSequenceNr() == this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$fromSequenceNr && isCurrentQuery()))) {
            // Everything was emitted, or a current query replayed nothing at all.
            completeStage();
        } else if (nextSequenceNr() < toSequenceNr()
                && bufferSize() < this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$maxBufSize
                && (isCurrentQuery() || outstandingReplay())) {
            // More events remain and there is room for them; ask the journal again.
            requestMore();
        }
    }

    /** Returns {@code true} when no refresh interval is configured on the enclosing stage. */
    private boolean isCurrentQuery() {
        return this.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$refreshInterval.isEmpty();
    }

    /** Completes the stage once the buffer is empty and {@code nextSequenceNr} has passed {@code toSequenceNr}. */
    private void maybeCompleteStage() {
        if (bufferEmpty() && nextSequenceNr() > toSequenceNr()) {
            completeStage();
        }
    }

    /** Downstream demand: requests more events, delivers what is buffered, completes if done. */
    @Override // akka.stream.stage.OutHandler
    public void onPull() {
        requestMore();
        deliverBuf(this.$outer.out());
        maybeCompleteStage();
    }

    /**
     * Body of the refresh-interval callback used in {@link #preStart()}: schedules the
     * {@code Continue} timer with the interval as both initial delay and delay, and sends a
     * {@code SubscribePersistenceId} message to the journal with the stage actor as sender.
     */
    public static final /* synthetic */ void $anonfun$preStart$2(EventsByPersistenceIdStage$$anon$1 eventsByPersistenceIdStage$$anon$1, FiniteDuration finiteDuration) {
        eventsByPersistenceIdStage$$anon$1.scheduleWithFixedDelay(EventsByPersistenceIdStage$Continue$.MODULE$, finiteDuration, finiteDuration);
        eventsByPersistenceIdStage$$anon$1.journal().tell(new LeveldbJournal.SubscribePersistenceId(eventsByPersistenceIdStage$$anon$1.$outer.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$persistenceId), eventsByPersistenceIdStage$$anon$1.stageActorRef());
    }

    /**
     * Creates the logic for the given stage: initialises the buffer, resolves the journal actor
     * for the stage's write-journal plugin id, seeds the sequence-number window from the stage's
     * {@code fromSequenceNr} / {@code initialToSequenceNr}, and registers this object as the
     * handler of the stage's outlet.
     *
     * @param eventsByPersistenceIdStage the enclosing stage; must not be {@code null}
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public EventsByPersistenceIdStage$$anon$1(EventsByPersistenceIdStage eventsByPersistenceIdStage) {
        super(eventsByPersistenceIdStage.shape2());
        // Outer-instance null check emitted by the compiler; `throw null` raises a
        // NullPointerException. The super call above has already dereferenced the argument.
        if (eventsByPersistenceIdStage == null) {
            throw null;
        }
        this.$outer = eventsByPersistenceIdStage;
        OutHandler.$init$(this);
        akka$persistence$query$journal$leveldb$Buffer$_setter_$akka$persistence$query$journal$leveldb$Buffer$$buf_$eq(new LinkedList<>());
        Persistence persistence = (Persistence) Persistence$.MODULE$.apply(eventsByPersistenceIdStage.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$mat.system());
        this.journal = persistence.journalFor(eventsByPersistenceIdStage.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$writeJournalPluginId, persistence.journalFor$default$2());
        this.stageActorRef = null;
        this.replayInProgress = false;
        this.outstandingReplay = false;
        this.nextSequenceNr = eventsByPersistenceIdStage.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$fromSequenceNr;
        this.toSequenceNr = eventsByPersistenceIdStage.akka$persistence$query$journal$leveldb$EventsByPersistenceIdStage$$initialToSequenceNr;
        setHandler(eventsByPersistenceIdStage.out(), this);
        Statics.releaseFence();
    }
}
