/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.openshift.tools.kafka;

import io.debezium.testing.openshift.tools.HttpUtils;
import io.debezium.testing.openshift.tools.OpenShiftUtils;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPort;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPortBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.openshift.api.model.Route;
import io.fabric8.openshift.client.OpenShiftClient;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.DoneableKafkaConnector;
import io.strimzi.api.kafka.model.KafkaConnect;
import io.strimzi.api.kafka.model.KafkaConnector;
import io.strimzi.api.kafka.model.status.KafkaConnectorStatus;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Controls a single Strimzi {@code KafkaConnect} deployment running in an OpenShift project.
 *
 * <p>The controller can open network access to the deployment, expose its REST API and metrics
 * endpoints through routes, deploy and undeploy connectors (either through the Kafka Connect REST
 * API or through {@code KafkaConnector} custom resources, depending on
 * {@code useConnectorResources}) and wait for connector snapshots by scraping the metrics endpoint.
 */
public class KafkaConnectController {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectController.class);

    /** Longest time, in minutes, any wait in this class blocks before giving up. */
    private static final long WAIT_TIMEOUT_MINUTES = 5L;
    /** Delay, in seconds, between two metric scrapes while waiting for a snapshot. */
    private static final long METRICS_POLL_INTERVAL_SECONDS = 10L;

    private final OpenShiftClient ocp;
    private final KafkaConnect kafkaConnect;
    private final OkHttpClient http;
    private final String project;
    private final OpenShiftUtils ocpUtils;
    private final HttpUtils httpUtils;
    private final boolean useConnectorResources;
    private Route apiRoute;
    private Route metricsRoute;

    /**
     * @param kafkaConnect the KafkaConnect resource being controlled; its namespace is used as the project
     * @param ocp OpenShift client used for all cluster operations
     * @param http HTTP client used for all calls to the exposed routes
     * @param useConnectorResources {@code true} to manage connectors as {@code KafkaConnector} custom
     *        resources, {@code false} to manage them through the Kafka Connect REST API
     */
    public KafkaConnectController(KafkaConnect kafkaConnect, OpenShiftClient ocp, OkHttpClient http, boolean useConnectorResources) {
        this.kafkaConnect = kafkaConnect;
        this.ocp = ocp;
        this.http = http;
        this.useConnectorResources = useConnectorResources;
        this.project = kafkaConnect.getMetadata().getNamespace();
        this.ocpUtils = new OpenShiftUtils(ocp);
        this.httpUtils = new HttpUtils(http);
    }

    /**
     * Creates a NetworkPolicy named {@code <cluster>-allowed} that opens TCP ports 8083 and 8404
     * on the pods labelled as belonging to this KafkaConnect cluster.
     *
     * @return the created network policy
     */
    public NetworkPolicy allowServiceAccess() {
        String clusterName = this.kafkaConnect.getMetadata().getName();
        LOGGER.info("Creating NetworkPolicy allowing public access to {}'s services", clusterName);

        HashMap<String, String> labels = new HashMap<>();
        labels.put("strimzi.io/cluster", clusterName);
        labels.put("strimzi.io/kind", "KafkaConnect");
        labels.put("strimzi.io/name", clusterName + "-connect");

        List<NetworkPolicyPort> ports = Stream.of(8083, 8404)
                .map(IntOrString::new)
                .map(port -> new NetworkPolicyPortBuilder().withProtocol("TCP").withPort(port).build())
                .collect(Collectors.toList());

        return this.ocpUtils.createNetworkPolicy(this.project, clusterName + "-allowed", labels, ports);
    }

    /**
     * Creates a route to the {@code rest-api} port of the {@code <cluster>-connect-api} service
     * and blocks until {@link HttpUtils#awaitApi} returns for the resulting URL.
     *
     * @return the created route
     * @throws IllegalStateException if the {@code <cluster>-connect-api} service does not exist
     */
    public Route exposeApi() {
        LOGGER.info("Exposing KafkaConnect API");
        String name = this.kafkaConnect.getMetadata().getName() + "-connect-api";
        Service service = this.getService(name);
        this.apiRoute = this.ocpUtils.createRoute(this.project, name, name, "rest-api", service.getMetadata().getLabels());
        this.httpUtils.awaitApi(this.getApiURL());
        return this.apiRoute;
    }

    /**
     * Creates a route named {@code <cluster>-connect-metrics} to the {@code prometheus} port of the
     * {@code <cluster>-connect-api} service and blocks until {@link HttpUtils#awaitApi} returns
     * for the resulting URL.
     *
     * @return the created route
     * @throws IllegalStateException if the {@code <cluster>-connect-api} service does not exist
     */
    public Route exposeMetrics() {
        LOGGER.info("Exposing KafkaConnect metrics");
        String clusterName = this.kafkaConnect.getMetadata().getName();
        String name = clusterName + "-connect-metrics";
        // Metrics are served by the same service as the REST API, only on a different port.
        String nameSvc = clusterName + "-connect-api";
        Service service = this.getService(nameSvc);
        this.metricsRoute = this.ocpUtils.createRoute(this.project, name, nameSvc, "prometheus", service.getMetadata().getLabels());
        this.httpUtils.awaitApi(this.getMetricsURL());
        return this.metricsRoute;
    }

    /**
     * Looks up a service in this controller's project.
     *
     * @throws IllegalStateException if no such service exists (instead of a later NullPointerException)
     */
    private Service getService(String name) {
        Service service = this.ocp.services().inNamespace(this.project).withName(name).get();
        if (service == null) {
            throw new IllegalStateException("Service '" + name + "' not found in project '" + this.project + "'");
        }
        return service;
    }

    /**
     * Deploys a connector, either as a custom resource or through the REST API depending on how
     * this controller was configured.
     *
     * @param name connector name
     * @param config connector configuration
     * @throws IOException if the REST request fails
     * @throws InterruptedException if interrupted while waiting for the custom resource to become ready
     * @throws IllegalStateException if the REST API is used but was not exposed
     */
    public void deployConnector(String name, ConnectorConfigBuilder config) throws IOException, InterruptedException {
        if (this.useConnectorResources) {
            this.deployConnectorCr(name, config);
        } else {
            this.deployConnectorJson(name, config);
        }
    }

    /** Registers the connector by PUT-ing its JSON configuration to {@code /connectors/<name>/config}. */
    private void deployConnectorJson(String name, ConnectorConfigBuilder config) throws IOException {
        // getApiURL() throws IllegalStateException when the API route was not exposed.
        HttpUrl url = this.getApiURL().resolve("/connectors/" + name + "/config");
        RequestBody body = RequestBody.create(config.getJsonString(), MediaType.parse("application/json"));
        Request request = new Request.Builder().url(url).put(body).build();
        try (Response res = this.http.newCall(request).execute()) {
            if (!res.isSuccessful()) {
                LOGGER.error(res.request().url().toString());
                throw new RuntimeException("Connector registration request returned status code '" + res.code() + "'");
            }
            LOGGER.info("Registered kafka connector '{}'", name);
        }
    }

    /** Creates or replaces the KafkaConnector custom resource and waits until it reports Ready. */
    private void deployConnectorCr(String name, ConnectorConfigBuilder config) throws InterruptedException {
        LOGGER.info("Deploying connector CR");
        KafkaConnector connector = config.getCustomResource();
        connector.getMetadata().setName(name);
        connector.getMetadata().getLabels().put("strimzi.io/cluster", this.kafkaConnect.getMetadata().getName());
        this.kafkaConnectorOperation().createOrReplace(connector);
        this.waitForKafkaConnector(connector.getMetadata().getName());
    }

    /**
     * Waits up to 5 minutes for the named KafkaConnector custom resource to report a
     * {@code Ready=True} condition.
     *
     * @param name connector name
     * @return the connector resource as returned by the client's wait operation
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalStateException if this controller does not manage connectors as custom resources
     */
    public KafkaConnector waitForKafkaConnector(String name) throws InterruptedException {
        if (!this.useConnectorResources) {
            throw new IllegalStateException("Unable to wait for connector, deployment doesn't use custom resources.");
        }
        return this.kafkaConnectorOperation()
                .withName(name)
                .waitUntilCondition(this::waitForReadyStatus, WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
    }

    /** Returns {@code true} when the connector has a status containing a {@code Ready=True} condition. */
    private boolean waitForReadyStatus(KafkaConnector connector) {
        KafkaConnectorStatus status = connector.getStatus();
        if (status == null) {
            return false;
        }
        return status.getConditions().stream()
                .anyMatch(c -> c.getType().equalsIgnoreCase("Ready") && c.getStatus().equalsIgnoreCase("True"));
    }

    /** KafkaConnector custom resource operations scoped to this controller's project. */
    private NonNamespaceOperation<KafkaConnector, KafkaConnectorList, DoneableKafkaConnector, Resource<KafkaConnector, DoneableKafkaConnector>> kafkaConnectorOperation() {
        return Crds.kafkaConnectorOperation(this.ocp).inNamespace(this.project);
    }

    /**
     * Removes a connector, either by deleting its custom resource or through the REST API
     * depending on how this controller was configured.
     *
     * @param name connector name
     * @throws IOException if the REST request fails
     * @throws IllegalStateException if the REST API is used but was not exposed
     */
    public void undeployConnector(String name) throws IOException {
        if (this.useConnectorResources) {
            this.undeployConnectorCr(name);
        } else {
            this.undeployConnectorJson(name);
        }
    }

    /** Deletes the connector by sending DELETE to {@code /connectors/<name>}. */
    private void undeployConnectorJson(String name) throws IOException {
        // getApiURL() throws IllegalStateException when the API route was not exposed.
        HttpUrl url = this.getApiURL().resolve("/connectors/" + name);
        Request request = new Request.Builder().url(url).delete().build();
        try (Response res = this.http.newCall(request).execute()) {
            if (!res.isSuccessful()) {
                LOGGER.error(res.request().url().toString());
                throw new RuntimeException("Connector deletion request returned status code '" + res.code() + "'");
            }
            LOGGER.info("Deleted kafka connector '{}'", name);
        }
    }

    /** Deletes the KafkaConnector custom resource with the given name. */
    private void undeployConnectorCr(String name) {
        this.kafkaConnectorOperation().withName(name).delete();
    }

    /**
     * Fetches the metrics endpoint and splits the response body into lines.
     *
     * @return one entry per line of the metrics response
     * @throws IOException if the request fails
     * @throws IllegalStateException if the metrics route was not exposed
     */
    public List<String> getConnectMetrics() throws IOException {
        Request request = new Request.Builder().url(this.getMetricsURL()).get().build();
        // Reuse the injected client: every new OkHttpClient owns its own connection pool and
        // dispatcher, so building one per scrape piles up idle resources.
        try (Response res = this.http.newCall(request).execute()) {
            String metrics = res.body().string();
            return Stream.of(metrics.split("\\r?\\n")).collect(Collectors.toList());
        }
    }

    /**
     * Polls the metrics endpoint every 10 seconds, for up to 5 minutes, until some metrics line
     * contains both {@code metricName} and {@code connectorName}.
     *
     * <p>Metrics are re-fetched on every poll; a failure while fetching aborts the wait.
     *
     * @param connectorName connector name expected in the metric line
     * @param metricName metric name expected in the metric line
     * @throws IOException if fetching the metrics fails
     * @throws IllegalStateException if the metrics route was not exposed
     */
    public void waitForSnapshot(String connectorName, String metricName) throws IOException {
        Awaitility.await()
                .atMost(WAIT_TIMEOUT_MINUTES, TimeUnit.MINUTES)
                .pollInterval(METRICS_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS)
                .until(() -> this.getConnectMetrics().stream()
                        .anyMatch(line -> line.contains(metricName) && line.contains(connectorName)));
    }

    /**
     * Waits for the MySQL connector's {@code snapshotcompleted} metric to appear.
     *
     * @see #waitForSnapshot(String, String)
     */
    public void waitForMySqlSnapshot(String connectorName) throws IOException {
        this.waitForSnapshot(connectorName, "debezium_mysql_connector_metrics_snapshotcompleted");
    }

    /**
     * Waits for the PostgreSQL connector's {@code snapshotcompleted} metric to appear.
     *
     * @see #waitForSnapshot(String, String)
     */
    public void waitForPostgreSqlSnapshot(String connectorName) throws IOException {
        this.waitForSnapshot(connectorName, "debezium_postgres_connector_metrics_snapshotcompleted");
    }

    /**
     * @return plain-HTTP URL of the exposed Kafka Connect REST API
     * @throws IllegalStateException if {@link #exposeApi()} was not called yet
     */
    public HttpUrl getApiURL() {
        if (this.apiRoute == null) {
            throw new IllegalStateException("KafkaConnect API was not exposed");
        }
        return new HttpUrl.Builder().scheme("http").host(this.apiRoute.getSpec().getHost()).build();
    }

    /**
     * @return plain-HTTP URL of the exposed metrics endpoint
     * @throws IllegalStateException if {@link #exposeMetrics()} was not called yet
     */
    public HttpUrl getMetricsURL() {
        if (this.metricsRoute == null) {
            throw new IllegalStateException("KafkaConnect metrics were not exposed");
        }
        return new HttpUrl.Builder().scheme("http").host(this.metricsRoute.getSpec().getHost()).build();
    }

    /**
     * Deletes the controlled KafkaConnect resource.
     *
     * @return the result reported by the client's delete operation
     */
    public boolean undeployCluster() {
        return Crds.kafkaConnectOperation(this.ocp).delete(this.kafkaConnect);
    }
}

