package akka.persistence.query.journal.leveldb;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import akka.event.LoggingAdapter;
import akka.persistence.Persistence;
import akka.persistence.Persistence$;
import akka.persistence.journal.leveldb.LeveldbJournal;
import akka.persistence.query.EventEnvelope;
import akka.stream.actor.ActorPublisher;
import akka.stream.actor.ActorPublisher$Internal$LifecycleState;
import akka.stream.actor.ActorPublisherState;
import org.reactivestreams.Subscriber;
import scala.Option;
import scala.PartialFunction;
import scala.collection.immutable.Vector;
import scala.concurrent.duration.Duration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: EventsByTagPublisher.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005uaAB\r\u001b\u0003\u0003\u0011C\u0005\u0003\u0005A\u0001\t\u0015\r\u0011\"\u0001C\u0011!q\u0005A!A!\u0002\u0013\u0019\u0005\u0002C(\u0001\u0005\u000b\u0007I\u0011\u0001)\t\u0011Q\u0003!\u0011!Q\u0001\nEC\u0001\"\u0016\u0001\u0003\u0006\u0004%\tA\u0016\u0005\t5\u0002\u0011\t\u0011)A\u0005/\"A1\f\u0001BC\u0002\u0013\u0005!\t\u0003\u0005]\u0001\t\u0005\t\u0015!\u0003D\u0011\u0015i\u0006\u0001\"\u0001_\u0011\u001di\u0002A1A\u0005\u0002\u0011Da\u0001\u001b\u0001!\u0002\u0013)\u0007bB5\u0001\u0001\u0004%\t\u0001\u0015\u0005\bU\u0002\u0001\r\u0011\"\u0001l\u0011\u0019\t\b\u0001)Q\u0005#\")!\u000f\u0001D\u0001!\")1\u000f\u0001C\u0001i\")1\u0010\u0001C\u0001i\")A\u0010\u0001D\u0001{\")a\u0010\u0001C\u0001i\")q\u0010\u0001D\u0001{\"9\u0011\u0011\u0001\u0001\u0005\u0002\u0005\r\u0001BBA\u0006\u0001\u0011\u0005Q\u0010C\u0004\u0002\u000e\u0001!\t!a\u0004\t\u000f\u0005U\u0001A\"\u0001\u0002\u0018\ta\u0012IY:ue\u0006\u001cG/\u0012<f]R\u001c()\u001f+bOB+(\r\\5tQ\u0016\u0014(BA\u000e\u001d\u0003\u001daWM^3mI\nT!!\b\u0010\u0002\u000f)|WO\u001d8bY*\u0011q\u0004I\u0001\u0006cV,'/\u001f\u0006\u0003C\t\n1\u0002]3sg&\u001cH/\u001a8dK*\t1%\u0001\u0003bW.\f7#\u0002\u0001&W]Z\u0004C\u0001\u0014*\u001b\u00059#\"\u0001\u0015\u0002\u000bM\u001c\u0017\r\\1\n\u0005):#AB!osJ+g\rE\u0002-cMj\u0011!\f\u0006\u0003]=\nQ!Y2u_JT!\u0001\r\u0012\u0002\rM$(/Z1n\u0013\t\u0011TF\u0001\bBGR|'\u000fU;cY&\u001c\b.\u001a:\u0011\u0005Q*T\"\u0001\u0010\n\u0005Yr\"!D#wK:$XI\u001c<fY>\u0004X\rE\u00029sMj\u0011AG\u0005\u0003ui\u0011a\u0002R3mSZ,'/\u001f\"vM\u001a,'\u000f\u0005\u0002=}5\tQH\u0003\u0002/E%\u0011q(\u0010\u0002\r\u0003\u000e$xN\u001d'pO\u001eLgnZ\u0001\u0004i\u0006<7\u0001A\u000b\u0002\u0007B\u0011Ai\u0013\b\u0003\u000b&\u0003\"AR\u0014\u000e\u0003\u001dS!\u0001S!\u0002\rq\u0012xn\u001c;?\u0013\tQu%\u0001\u0004Qe\u0016$WMZ\u0005\u0003\u00196\u0013aa\u0015;sS:<'B\u0001&(\u0003\u0011!\u0018m\u001a\u0011\u0002\u0015\u0019\u0014x.\\(gMN,G/F\u0001R!\t1#+\u0003\u0002TO\t!Aj\u001c8h\u0003-1'o\\7PM\u001a\u001cX\r\u001e\u0011\u0002\u00155\f\u0007PQ;g'&TX-F\u0001X!\t1\u0003,\u0003\u0002ZO\t\u0019\u0011J\u001c;\u0002\u00175\f\u0007PQ;g'&TX\rI\u0001\u0015oJLG/\u001a&pkJt\u0017\r\u001c)mk\u001eLg.\u00133\u0002+]\u0014\u0018\u000e^3K_V\u0014h.\u00197QYV<\u0017N\\%eA\u00051A(\u001b8jiz\"Ra\u00181bE\u000e\u0004\"\u0001\u000f\u0001\t\u000b\u0001K\u0001\u0019A\"\t\u000b=K\u0001\u0019A)\t\u000bUK\u0001\u0019A,\t\u000bmK\u0001\u0019A\"\u0016\u0003\u0015\u0004\"\u0001\u00104\n\u0005\u001dl$\u0001C!di>\u0014(+\u001a4\u0002\u0011)|WO\u001d8bY\u0002\n!bY;se>3gm]3u\u00039\u0019WO\u001d:PM\u001a\u001cX\r^0%KF$\"\u0001\\8\u0011\u0005\u0019j\u0017B\u00018(\u0005\u0011)f.\u001b;\t\u000fAl\u0011\u0011!a\u0001#\u0006\u0019\u0001\u0010J\u0019\u0002\u0017\r,(O](gMN,G\u000fI\u0001\ti>|eMZ:fi\u00069!/Z2fSZ,W#A;\u0011\u0005Y<X\"\u0001\u0001\n\u0005aL(a\u0002*fG\u0016Lg/Z\u0005\u0003uv\u0012Q!Q2u_J\fA!\u001b8ji\u0006)\"/Z2fSZ,\u0017J\\5uS\u0006d'+Z9vKN$H#\u00017\u0002\t%$G.Z\u0001\u0013e\u0016\u001cW-\u001b<f\u0013\u0012dWMU3rk\u0016\u001cH/A\u0007uS6,gi\u001c:SKBd\u0017-_\u000b\u0003\u0003\u000b\u00012AJA\u0004\u0013\r\tIa\n\u0002\b\u0005>|G.Z1o\u0003\u0019\u0011X\r\u001d7bs\u0006I!/\u001a9mCfLgn\u001a\u000b\u0004k\u0006E\u0001BBA\n/\u0001\u0007q+A\u0003mS6LG/\u0001\fsK\u000e,\u0017N^3SK\u000e|g/\u001a:z'V\u001c7-Z:t)\ra\u0017\u0011\u0004\u0005\u0007\u00037A\u0002\u0019A)\u0002\u0019!Lw\r[3tiN+\u0017O\u0014:")
/* loaded from: input_file:akka/persistence/query/journal/leveldb/AbstractEventsByTagPublisher.class */
public abstract class AbstractEventsByTagPublisher implements ActorPublisher<EventEnvelope>, DeliveryBuffer<EventEnvelope>, ActorLogging {
    private final String tag;
    private final long fromOffset;
    private final int maxBufSize;
    private final String writeJournalPluginId;
    private final ActorRef journal;
    private long currOffset;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    private Vector<EventEnvelope> buf;
    private final ActorPublisherState akka$stream$actor$ActorPublisher$$state;
    private Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber;
    private long akka$stream$actor$ActorPublisher$$demand;
    private ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState;
    private Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    private final ActorContext context;
    private final ActorRef self;

    @Override // akka.actor.ActorLogging
    public LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // akka.persistence.query.journal.leveldb.DeliveryBuffer
    public void deliverBuf() {
        deliverBuf();
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundReceive(PartialFunction partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPreRestart(Throwable th, Option option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public /* synthetic */ void akka$stream$actor$ActorPublisher$$super$aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public Duration subscriptionTimeout() {
        Duration subscriptionTimeout;
        subscriptionTimeout = subscriptionTimeout();
        return subscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isActive() {
        boolean isActive;
        isActive = isActive();
        return isActive;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final long totalDemand() {
        long j;
        j = totalDemand();
        return j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCompleted() {
        boolean isCompleted;
        isCompleted = isCompleted();
        return isCompleted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isErrorEmitted() {
        boolean isErrorEmitted;
        isErrorEmitted = isErrorEmitted();
        return isErrorEmitted;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final boolean isCanceled() {
        boolean isCanceled;
        isCanceled = isCanceled();
        return isCanceled;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onNext(EventEnvelope eventEnvelope) {
        onNext(eventEnvelope);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onComplete() {
        onComplete();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onCompleteThenStop() {
        onCompleteThenStop();
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onError(Throwable th) {
        onError(th);
    }

    @Override // akka.stream.actor.ActorPublisher
    public void onErrorThenStop(Throwable th) {
        onErrorThenStop(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        aroundReceive(partialFunction, obj);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreStart() {
        aroundPreStart();
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        aroundPreRestart(th, option);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostRestart(Throwable th) {
        aroundPostRestart(th);
    }

    @Override // akka.stream.actor.ActorPublisher, akka.actor.Actor
    public void aroundPostStop() {
        aroundPostStop();
    }

    @Override // akka.actor.Actor
    public final ActorRef sender() {
        ActorRef sender;
        sender = sender();
        return sender;
    }

    @Override // akka.actor.Actor
    public SupervisorStrategy supervisorStrategy() {
        SupervisorStrategy supervisorStrategy;
        supervisorStrategy = supervisorStrategy();
        return supervisorStrategy;
    }

    @Override // akka.actor.Actor
    public void preStart() throws Exception {
        preStart();
    }

    @Override // akka.actor.Actor
    public void postStop() throws Exception {
        postStop();
    }

    @Override // akka.actor.Actor
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        preRestart(th, option);
    }

    @Override // akka.actor.Actor
    public void postRestart(Throwable th) throws Exception {
        postRestart(th);
    }

    @Override // akka.actor.Actor
    public void unhandled(Object obj) {
        unhandled(obj);
    }

    @Override // akka.actor.ActorLogging
    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    @Override // akka.actor.ActorLogging
    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    @Override // akka.persistence.query.journal.leveldb.DeliveryBuffer
    public Vector<EventEnvelope> buf() {
        return this.buf;
    }

    @Override // akka.persistence.query.journal.leveldb.DeliveryBuffer
    public void buf_$eq(Vector<EventEnvelope> vector) {
        this.buf = vector;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisherState akka$stream$actor$ActorPublisher$$state() {
        return this.akka$stream$actor$ActorPublisher$$state;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Subscriber<Object> akka$stream$actor$ActorPublisher$$subscriber() {
        return this.akka$stream$actor$ActorPublisher$$subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$subscriber_$eq(Subscriber<Object> subscriber) {
        this.akka$stream$actor$ActorPublisher$$subscriber = subscriber;
    }

    @Override // akka.stream.actor.ActorPublisher
    public long akka$stream$actor$ActorPublisher$$demand() {
        return this.akka$stream$actor$ActorPublisher$$demand;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$demand_$eq(long j) {
        this.akka$stream$actor$ActorPublisher$$demand = j;
    }

    @Override // akka.stream.actor.ActorPublisher
    public ActorPublisher$Internal$LifecycleState akka$stream$actor$ActorPublisher$$lifecycleState() {
        return this.akka$stream$actor$ActorPublisher$$lifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$lifecycleState_$eq(ActorPublisher$Internal$LifecycleState actorPublisher$Internal$LifecycleState) {
        this.akka$stream$actor$ActorPublisher$$lifecycleState = actorPublisher$Internal$LifecycleState;
    }

    @Override // akka.stream.actor.ActorPublisher
    public Cancellable akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout() {
        return this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout;
    }

    @Override // akka.stream.actor.ActorPublisher
    public void akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout_$eq(Cancellable cancellable) {
        this.akka$stream$actor$ActorPublisher$$scheduledSubscriptionTimeout = cancellable;
    }

    @Override // akka.stream.actor.ActorPublisher
    public final void akka$stream$actor$ActorPublisher$_setter_$akka$stream$actor$ActorPublisher$$state_$eq(ActorPublisherState actorPublisherState) {
        this.akka$stream$actor$ActorPublisher$$state = actorPublisherState;
    }

    @Override // akka.actor.Actor
    public ActorContext context() {
        return this.context;
    }

    @Override // akka.actor.Actor
    public final ActorRef self() {
        return this.self;
    }

    @Override // akka.actor.Actor
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    @Override // akka.actor.Actor
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public String tag() {
        return this.tag;
    }

    public long fromOffset() {
        return this.fromOffset;
    }

    public int maxBufSize() {
        return this.maxBufSize;
    }

    public String writeJournalPluginId() {
        return this.writeJournalPluginId;
    }

    public ActorRef journal() {
        return this.journal;
    }

    public long currOffset() {
        return this.currOffset;
    }

    public void currOffset_$eq(long j) {
        this.currOffset = j;
    }

    public abstract long toOffset();

    @Override // akka.actor.Actor
    public PartialFunction<Object, BoxedUnit> receive() {
        return init();
    }

    public PartialFunction<Object, BoxedUnit> init() {
        return new AbstractEventsByTagPublisher$$anonfun$init$1(this);
    }

    public abstract void receiveInitialRequest();

    public PartialFunction<Object, BoxedUnit> idle() {
        return new AbstractEventsByTagPublisher$$anonfun$idle$1(this);
    }

    public abstract void receiveIdleRequest();

    public boolean timeForReplay() {
        return (buf().isEmpty() || buf().size() <= maxBufSize() / 2) && currOffset() < toOffset();
    }

    public void replay() {
        int maxBufSize = maxBufSize() - buf().size();
        log().debug("request replay for tag [{}] from [{}] to [{}] limit [{}]", tag(), BoxesRunTime.boxToLong(currOffset()), BoxesRunTime.boxToLong(toOffset()), BoxesRunTime.boxToInteger(maxBufSize));
        package$.MODULE$.actorRef2Scala(journal()).$bang(new LeveldbJournal.ReplayTaggedMessages(currOffset(), toOffset(), maxBufSize, tag(), self()), self());
        context().become(replaying(maxBufSize));
    }

    public PartialFunction<Object, BoxedUnit> replaying(int i) {
        return new AbstractEventsByTagPublisher$$anonfun$replaying$1(this);
    }

    public abstract void receiveRecoverySuccess(long j);

    public AbstractEventsByTagPublisher(String str, long j, int i, String str2) {
        this.tag = str;
        this.fromOffset = j;
        this.maxBufSize = i;
        this.writeJournalPluginId = str2;
        Actor.$init$(this);
        ActorPublisher.$init$((ActorPublisher) this);
        buf_$eq(scala.package$.MODULE$.Vector().empty());
        ActorLogging.$init$(this);
        Persistence persistence = (Persistence) Persistence$.MODULE$.apply(context().system());
        this.journal = persistence.journalFor(str2, persistence.journalFor$default$2());
        this.currOffset = j;
    }
}
