/*
 * Decompiled with CFR 0.152.
 */
package dev.restate.sdk.http.vertx;

import com.google.protobuf.MessageLite;
import dev.restate.sdk.core.InvocationFlow;
import dev.restate.sdk.core.Util;
import dev.restate.sdk.http.vertx.MessageEncoder;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import java.util.concurrent.Flow;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class HttpResponseFlowAdapter
implements InvocationFlow.InvocationOutputSubscriber {
    private static final Logger LOG = LogManager.getLogger(HttpResponseFlowAdapter.class);
    private final HttpServerResponse httpServerResponse;
    private Flow.Subscription outputSubscription;

    HttpResponseFlowAdapter(HttpServerResponse httpServerResponse) {
        this.httpServerResponse = httpServerResponse;
        this.httpServerResponse.exceptionHandler(this::propagateWireFailure);
    }

    public void onSubscribe(Flow.Subscription subscription) {
        this.outputSubscription = subscription;
        this.outputSubscription.request(Long.MAX_VALUE);
    }

    public void onNext(MessageLite messageLite) {
        this.write(messageLite);
    }

    public void onError(Throwable throwable) {
        this.propagatePublisherFailure(throwable);
    }

    public void onComplete() {
        this.endResponse();
    }

    private void write(MessageLite message) {
        if (this.httpServerResponse.closed()) {
            this.cancelSubscription();
            return;
        }
        LOG.trace("Writing response message " + message);
        Buffer buffer = Buffer.buffer((int)MessageEncoder.encodeLength(message));
        MessageEncoder.encode(buffer, message);
        this.httpServerResponse.write((Object)buffer);
    }

    private void propagateWireFailure(Throwable e) {
        LOG.warn("Error from wire", e);
        this.endResponse();
    }

    private void propagatePublisherFailure(Throwable e) {
        if (!this.httpServerResponse.headWritten()) {
            Util.findProtocolException((Throwable)e).ifPresentOrElse(pe -> this.httpServerResponse.setStatusCode(pe.getCode()), () -> this.httpServerResponse.setStatusCode(500));
        }
        LOG.warn("Error from publisher", e);
        this.endResponse();
    }

    private void endResponse() {
        LOG.trace("Closing response");
        if (!this.httpServerResponse.ended()) {
            this.httpServerResponse.end();
        }
        this.cancelSubscription();
    }

    private void cancelSubscription() {
        LOG.trace("Cancelling subscription");
        if (this.outputSubscription != null) {
            Flow.Subscription outputSubscription = this.outputSubscription;
            this.outputSubscription = null;
            outputSubscription.cancel();
        }
    }
}

