package akka.http.impl.engine.client;

import akka.event.LoggingAdapter;
import akka.http.impl.engine.client.PoolConductor;
import akka.http.impl.engine.client.PoolFlow;
import akka.http.impl.engine.client.PoolSlot;
import akka.http.impl.util.package$;
import akka.http.impl.util.package$RichHttpRequest$;
import akka.http.scaladsl.Http;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.macros.LogHelper;
import akka.stream.Graph;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.GraphStageLogic$EagerTerminateOutput$;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import java.util.ArrayDeque;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;

/* compiled from: PoolSlot.scala */
/* loaded from: input_file:akka/http/impl/engine/client/PoolSlot$SlotProcessor$$anon$1.class */
public final class PoolSlot$SlotProcessor$$anon$1 extends GraphStageLogic implements InHandler, LogHelper {
    private PoolFlow.RequestContext akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest;
    private final ArrayDeque<PoolFlow.RequestContext> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests;
    private GraphStageLogic.SubSourceOutlet<HttpRequest> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource;
    private GraphStageLogic.SubSinkInlet<HttpResponse> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink;
    private Option<Future<Http.OutgoingConnection>> currentConnectionInfo;
    private final OutHandler connectionOutFlowHandler;
    private final InHandler connectionInFlowHandler;
    private final /* synthetic */ PoolSlot.SlotProcessor $outer;

    @Override // akka.stream.stage.InHandler
    public void onUpstreamFinish() throws Exception {
        onUpstreamFinish();
    }

    @Override // akka.stream.stage.InHandler
    public void onUpstreamFailure(Throwable th) throws Exception {
        onUpstreamFailure(th);
    }

    @Override // akka.macros.LogHelper
    public LoggingAdapter log() {
        return this.$outer.log();
    }

    public PoolFlow.RequestContext akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest() {
        return this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest;
    }

    public void akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest_$eq(PoolFlow.RequestContext requestContext) {
        this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest = requestContext;
    }

    public ArrayDeque<PoolFlow.RequestContext> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests() {
        return this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests;
    }

    public GraphStageLogic.SubSourceOutlet<HttpRequest> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource() {
        return this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource;
    }

    private void akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource_$eq(GraphStageLogic.SubSourceOutlet<HttpRequest> subSourceOutlet) {
        this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource = subSourceOutlet;
    }

    public GraphStageLogic.SubSinkInlet<HttpResponse> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink() {
        return this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink;
    }

    private void akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink_$eq(GraphStageLogic.SubSinkInlet<HttpResponse> subSinkInlet) {
        this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink = subSinkInlet;
    }

    private Option<Future<Http.OutgoingConnection>> currentConnectionInfo() {
        return this.currentConnectionInfo;
    }

    private void currentConnectionInfo_$eq(Option<Future<Http.OutgoingConnection>> option) {
        this.currentConnectionInfo = option;
    }

    public boolean akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$isConnected() {
        return currentConnectionInfo().isDefined();
    }

    public void akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$disconnect(Option<Throwable> option) {
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource().complete();
        if (akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$isConnected()) {
            currentConnectionInfo_$eq(None$.MODULE$);
            Tuple2 tuple2 = (Tuple2) option.map(th -> {
                Tuple2 partition = ((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests().iterator()).asScala()).partition(requestContext -> {
                    return BoxesRunTime.boxToBoolean($anonfun$disconnect$2(requestContext));
                });
                if (partition == null) {
                    throw new MatchError(partition);
                }
                Tuple2 tuple22 = new Tuple2((Iterator) partition.mo5664_1(), (Iterator) partition.mo5663_2());
                return new Tuple2(((Iterator) tuple22.mo5664_1()).map(requestContext2 -> {
                    return new PoolSlot.SlotEvent.RetryRequest(requestContext2.copy(requestContext2.copy$default$1(), requestContext2.copy$default$2(), requestContext2.retriesLeft() - 1));
                }).toList(), ((Iterator) tuple22.mo5663_2()).map(requestContext3 -> {
                    return new PoolFlow.ResponseContext(requestContext3, new Failure(th));
                }).toList());
            }).getOrElse(() -> {
                return new Tuple2(((Iterator) JavaConverters$.MODULE$.asScalaIteratorConverter(this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests().iterator()).asScala()).map(requestContext -> {
                    return new PoolSlot.SlotEvent.RetryRequest(requestContext);
                }).toList(), Nil$.MODULE$);
            });
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple22 = new Tuple2((List) tuple2.mo5664_1(), (List) tuple2.mo5663_2());
            List list = (List) tuple22.mo5664_1();
            List list2 = (List) tuple22.mo5663_2();
            akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests().clear();
            emitMultiple(this.$outer.responseOut(), list2);
            emitMultiple(this.$outer.eventOut(), list.$colon$colon(new PoolSlot.SlotEvent.Disconnected(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$slotIx, list.size() + list2.size())), () -> {
                if (!list2.isEmpty() || this.hasBeenPulled(this.$outer.slotCommandIn())) {
                    return;
                }
                this.pull(this.$outer.slotCommandIn());
            });
        }
    }

    public Option<Throwable> akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$disconnect$default$1() {
        return None$.MODULE$;
    }

    private OutHandler connectionOutFlowHandler() {
        return this.connectionOutFlowHandler;
    }

    private InHandler connectionInFlowHandler() {
        return this.connectionInFlowHandler;
    }

    public void akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$dispatchRequest(PoolFlow.RequestContext requestContext) {
        LoggingAdapter log = log();
        if (log.isDebugEnabled()) {
            log.debug(new StringBuilder(0).append(prefixString()).append(new StringBuilder(31).append("pushing request to connection: ").append(package$RichHttpRequest$.MODULE$.debugString$extension(package$.MODULE$.RichHttpRequest(requestContext.request()))).toString()).toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests().add(requestContext);
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource().push(requestContext.request());
    }

    @Override // akka.stream.stage.InHandler
    public void onPush() {
        PoolFlow.RequestContext rc;
        BoxedUnit boxedUnit;
        PoolConductor.SlotCommand slotCommand = (PoolConductor.SlotCommand) grab(this.$outer.slotCommandIn());
        if (PoolConductor$ConnectEagerlyCommand$.MODULE$.equals(slotCommand)) {
            if (!akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$isConnected()) {
                establishConnectionFlow$1();
            }
            emit(this.$outer.eventOut(), new PoolSlot.SlotEvent.ConnectedEagerly(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$slotIx));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        if (!(slotCommand instanceof PoolConductor.DispatchCommand) || (rc = ((PoolConductor.DispatchCommand) slotCommand).rc()) == null) {
            throw new MatchError(slotCommand);
        }
        if (akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$isConnected()) {
            akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$dispatchRequest(rc);
            boxedUnit = BoxedUnit.UNIT;
        } else {
            akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest_$eq(rc);
            establishConnectionFlow$1();
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        LoggingAdapter log = log();
        if (log.isDebugEnabled()) {
            log.debug(new StringBuilder(16).append(prefixString()).append("Slot was stopped").toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        super.postStop();
    }

    @Override // akka.macros.LogHelper
    public String prefixString() {
        return new StringBuilder(6).append("[").append(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$slotIx).append("] <").append(connectionInfoString$1()).append("> ").toString();
    }

    public String akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequestsInfo() {
        return akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests().isEmpty() ? "while no requests were in flight" : new StringBuilder(14).append("while running ").append(((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests()).asScala()).map(requestContext -> {
            return package$RichHttpRequest$.MODULE$.debugString$extension(package$.MODULE$.RichHttpRequest(requestContext.request()));
        }, Iterable$.MODULE$.canBuildFrom())).mkString(", ")).toString();
    }

    public /* synthetic */ PoolSlot.SlotProcessor akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ boolean $anonfun$disconnect$2(PoolFlow.RequestContext requestContext) {
        return requestContext.retriesLeft() > 0;
    }

    private final void establishConnectionFlow$1() {
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource_$eq(new GraphStageLogic.SubSourceOutlet<>(this, "SlotProcessor.RequestSource"));
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource().setHandler(connectionOutFlowHandler());
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink_$eq(new GraphStageLogic.SubSinkInlet<>(this, "SlotProcessor.ResponseSink"));
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink().setHandler(connectionInFlowHandler());
        LoggingAdapter log = log();
        if (log.isDebugEnabled()) {
            log.debug(new StringBuilder(26).append(prefixString()).append("Establishing connection...").toString());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        currentConnectionInfo_$eq(new Some(Source$.MODULE$.fromGraph(akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource().source()).viaMat((Graph) this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$connectionFlow, Keep$.MODULE$.right()).toMat((Graph) Sink$.MODULE$.fromGraph(akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink().sink()), Keep$.MODULE$.left()).run(subFusingMaterializer())));
        akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink().pull();
    }

    private final String connectionInfoString$1() {
        return (String) currentConnectionInfo().flatMap(future -> {
            return future.value2();
        }).flatMap(r2 -> {
            return r2.toOption();
        }).fold(() -> {
            return "unconnected";
        }, outgoingConnection -> {
            return new StringBuilder(2).append(outgoingConnection.localAddress().toString()).append("->").append(outgoingConnection.remoteAddress().toString()).toString();
        });
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PoolSlot$SlotProcessor$$anon$1(PoolSlot.SlotProcessor slotProcessor) {
        super(slotProcessor.shape2());
        if (slotProcessor == null) {
            throw null;
        }
        this.$outer = slotProcessor;
        InHandler.$init$(this);
        LogHelper.$init$(this);
        this.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$inflightRequests = new ArrayDeque<>();
        this.currentConnectionInfo = None$.MODULE$;
        this.connectionOutFlowHandler = new OutHandler(this) { // from class: akka.http.impl.engine.client.PoolSlot$SlotProcessor$$anon$1$$anon$2
            private final /* synthetic */ PoolSlot$SlotProcessor$$anon$1 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                if (this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest() == null) {
                    this.$outer.pull(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$$outer().slotCommandIn());
                } else {
                    this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$dispatchRequest(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest());
                    this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$firstRequest_$eq(null);
                }
            }

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() {
                this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSource().complete();
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        };
        this.connectionInFlowHandler = new PoolSlot$SlotProcessor$$anon$1$$anon$3(this);
        setHandler(slotProcessor.slotCommandIn(), this);
        setHandler(slotProcessor.responseOut(), new OutHandler(this) { // from class: akka.http.impl.engine.client.PoolSlot$SlotProcessor$$anon$1$$anon$4
            private final /* synthetic */ PoolSlot$SlotProcessor$$anon$1 $outer;

            @Override // akka.stream.stage.OutHandler
            public void onDownstreamFinish() throws Exception {
                onDownstreamFinish();
            }

            @Override // akka.stream.stage.OutHandler
            public void onPull() {
                if (this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$isConnected()) {
                    this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$connectionFlowSink().pull();
                } else {
                    if (this.$outer.hasBeenPulled(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$$outer().slotCommandIn())) {
                        return;
                    }
                    this.$outer.pull(this.$outer.akka$http$impl$engine$client$PoolSlot$SlotProcessor$$anon$$$outer().slotCommandIn());
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
        setHandler(slotProcessor.eventOut(), GraphStageLogic$EagerTerminateOutput$.MODULE$);
    }
}
