package io.scalecube.services.gateway.transport.http;

import io.netty.buffer.ByteBuf;
import io.scalecube.services.api.Qualifier;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.transport.GatewayClient;
import io.scalecube.services.gateway.transport.GatewayClientCodec;
import io.scalecube.services.gateway.transport.GatewayClientSettings;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;

/* loaded from: input_file:io/scalecube/services/gateway/transport/http/HttpGatewayClient.class */
public final class HttpGatewayClient implements GatewayClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpGatewayClient.class);
    private final GatewayClientCodec<ByteBuf> codec;
    private final HttpClient httpClient;
    private final MonoProcessor<Void> close = MonoProcessor.create();
    private final MonoProcessor<Void> onClose = MonoProcessor.create();
    private final LoopResources loopResources = LoopResources.create("http-gateway-client");

    public HttpGatewayClient(GatewayClientSettings gatewayClientSettings, GatewayClientCodec<ByteBuf> gatewayClientCodec) {
        this.codec = gatewayClientCodec;
        this.httpClient = HttpClient.create(ConnectionProvider.elastic("http-gateway-client")).followRedirect(gatewayClientSettings.followRedirect()).tcpConfiguration(tcpClient -> {
            if (gatewayClientSettings.sslProvider() != null) {
                tcpClient = tcpClient.secure(gatewayClientSettings.sslProvider());
            }
            return tcpClient.runOn(this.loopResources).host(gatewayClientSettings.host()).port(gatewayClientSettings.port());
        });
        this.close.then(doClose()).doFinally(signalType -> {
            this.onClose.onComplete();
        }).doOnTerminate(() -> {
            LOGGER.info("Closed HttpGatewayClient resources");
        }).subscribe((Consumer) null, th -> {
            LOGGER.warn("Exception occurred on HttpGatewayClient close: " + th);
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<ServiceMessage> requestResponse(ServiceMessage serviceMessage) {
        return Mono.defer(() -> {
            return this.httpClient.post().uri("/" + serviceMessage.qualifier()).send((httpClientRequest, nettyOutbound) -> {
                LOGGER.debug("Sending request {}", serviceMessage);
                Map headers = serviceMessage.headers();
                Objects.requireNonNull(httpClientRequest);
                headers.forEach((v1, v2) -> {
                    r1.header(v1, v2);
                });
                return nettyOutbound.sendObject(Mono.just(this.codec.encode(serviceMessage))).then();
            }).responseSingle((httpClientResponse, byteBufMono) -> {
                return byteBufMono.map((v0) -> {
                    return v0.retain();
                }).map(byteBuf -> {
                    return toMessage(httpClientResponse, byteBuf);
                });
            });
        });
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestStream(ServiceMessage serviceMessage) {
        return Flux.error(new UnsupportedOperationException("requestStream is not supported by HTTP/1.x"));
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Flux<ServiceMessage> requestChannel(Flux<ServiceMessage> flux) {
        return Flux.error(new UnsupportedOperationException("requestChannel is not supported by HTTP/1.x"));
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public void close() {
        this.close.onComplete();
    }

    @Override // io.scalecube.services.gateway.transport.GatewayClient
    public Mono<Void> onClose() {
        return this.onClose;
    }

    private Mono<Void> doClose() {
        LoopResources loopResources = this.loopResources;
        Objects.requireNonNull(loopResources);
        return Mono.defer(loopResources::disposeLater);
    }

    public GatewayClientCodec<ByteBuf> getCodec() {
        return this.codec;
    }

    private ServiceMessage toMessage(HttpClientResponse httpClientResponse, ByteBuf byteBuf) {
        int code = httpClientResponse.status().code();
        ServiceMessage.Builder data = ServiceMessage.builder().qualifier(isError(code) ? Qualifier.asError(code) : httpClientResponse.uri()).data(byteBuf);
        httpClientResponse.responseHeaders().entries().forEach(entry -> {
            data.header((String) entry.getKey(), (String) entry.getValue());
        });
        ServiceMessage build = data.build();
        LOGGER.debug("Received response {}", build);
        return build;
    }

    private boolean isError(int i) {
        return i >= 400 && i <= 599;
    }
}
