/*
 * Decompiled with CFR 0.152.
 */
package dev.vality.testcontainers.annotations.kafka;

import dev.vality.testcontainers.annotations.exception.KafkaStartingException;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainer;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerFactory;
import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerSingleton;
import dev.vality.testcontainers.annotations.util.GenericContainerUtil;
import dev.vality.testcontainers.annotations.util.SpringApplicationPropertiesLoader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.platform.commons.support.AnnotationSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ContextConfigurationAttributes;
import org.springframework.test.context.ContextCustomizer;
import org.springframework.test.context.ContextCustomizerFactory;
import org.testcontainers.containers.KafkaContainer;

public class KafkaTestcontainerExtension
implements BeforeAllCallback,
AfterAllCallback {
    private static final Logger log = LoggerFactory.getLogger(KafkaTestcontainerExtension.class);
    private static final ThreadLocal<KafkaContainer> THREAD_CONTAINER = new ThreadLocal();

    public void beforeAll(ExtensionContext context) {
        if (KafkaTestcontainerExtension.findPrototypeAnnotation(context).isPresent()) {
            KafkaContainer container = KafkaTestcontainerFactory.container();
            GenericContainerUtil.startContainer(container);
            List<String> topics = this.loadTopics(KafkaTestcontainerExtension.findPrototypeAnnotation(context).get().topicsKeys());
            this.createTopics(container, topics);
            THREAD_CONTAINER.set(container);
        } else if (KafkaTestcontainerExtension.findSingletonAnnotation(context).isPresent()) {
            KafkaContainer container = KafkaTestcontainerFactory.singletonContainer();
            List<String> topics = this.loadTopics(KafkaTestcontainerExtension.findSingletonAnnotation(context).get().topicsKeys());
            if (!container.isRunning()) {
                GenericContainerUtil.startContainer(container);
            } else {
                this.deleteTopics(container, topics);
            }
            this.createTopics(container, topics);
            THREAD_CONTAINER.set(container);
        }
    }

    public void afterAll(ExtensionContext context) {
        if (KafkaTestcontainerExtension.findPrototypeAnnotation(context).isPresent()) {
            KafkaContainer container = THREAD_CONTAINER.get();
            if (container != null && container.isRunning()) {
                container.stop();
            }
            THREAD_CONTAINER.remove();
        } else if (KafkaTestcontainerExtension.findSingletonAnnotation(context).isPresent()) {
            THREAD_CONTAINER.remove();
        }
    }

    private static Optional<KafkaTestcontainer> findPrototypeAnnotation(ExtensionContext context) {
        return AnnotationSupport.findAnnotation((Optional)context.getElement(), KafkaTestcontainer.class);
    }

    private static Optional<KafkaTestcontainer> findPrototypeAnnotation(Class<?> testClass) {
        return AnnotationSupport.findAnnotation(testClass, KafkaTestcontainer.class);
    }

    private static Optional<KafkaTestcontainerSingleton> findSingletonAnnotation(ExtensionContext context) {
        return AnnotationSupport.findAnnotation((Optional)context.getElement(), KafkaTestcontainerSingleton.class);
    }

    private static Optional<KafkaTestcontainerSingleton> findSingletonAnnotation(Class<?> testClass) {
        return AnnotationSupport.findAnnotation(testClass, KafkaTestcontainerSingleton.class);
    }

    private List<String> loadTopics(String[] topicsKeys) {
        return SpringApplicationPropertiesLoader.loadFromSpringApplicationPropertiesFile(Arrays.asList(topicsKeys)).values().stream().map(String::valueOf).collect(Collectors.toList());
    }

    private void createTopics(KafkaContainer container, List<String> topics) {
        try (AdminClient admin = this.createAdminClient(container);){
            List newTopics = topics.stream().map(topic -> new NewTopic(topic, 1, 1)).peek(newTopic -> log.info(newTopic.toString())).collect(Collectors.toList());
            CreateTopicsResult topicsResult = admin.createTopics(newTopics);
            topicsResult.all().get(30L, TimeUnit.SECONDS);
            Set adminClientTopics = (Set)admin.listTopics().names().get(30L, TimeUnit.SECONDS);
            log.info("Topics list from 'AdminClient' after [TOPICS CREATED]: " + adminClientTopics);
            Assertions.assertThat((int)adminClientTopics.size()).isEqualTo(topics.size());
            Assertions.assertThat((String)this.execInContainerKafkaTopicsListCommand(container)).contains(topics);
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new KafkaStartingException("Error when topic creating, ", ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when topic creating, ", ex);
        }
    }

    private void deleteTopics(KafkaContainer container, List<String> topics) {
        try (AdminClient admin = this.createAdminClient(container);){
            DeleteTopicsResult topicsResult = admin.deleteTopics(topics);
            topicsResult.all().get(30L, TimeUnit.SECONDS);
            Set adminClientTopics = (Set)admin.listTopics().names().get(30L, TimeUnit.SECONDS);
            log.info("Topics list from 'AdminClient' after [TOPICS DELETED]: " + adminClientTopics + " (should be empty)");
            Assertions.assertThat((Collection)adminClientTopics).isEmpty();
            this.execInContainerKafkaTopicsListCommand(container);
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new KafkaStartingException("Error when topic deleting, ", ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when topic deleting, ", ex);
        }
    }

    private AdminClient createAdminClient(KafkaContainer container) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", container.getBootstrapServers());
        return AdminClient.create((Properties)properties);
    }

    private String execInContainerKafkaTopicsListCommand(KafkaContainer container) {
        String kafkaTopicsListCommand = "/usr/bin/kafka-topics --bootstrap-server localhost:9092 --list";
        try {
            String stdout = container.execInContainer(new String[]{"/bin/sh", "-c", kafkaTopicsListCommand}).getStdout();
            log.info("Topics list from '/usr/bin/kafka-topics': [" + stdout.replace("\n", ",") + "]");
            return stdout;
        }
        catch (IOException ex) {
            throw new KafkaStartingException("Error when " + kafkaTopicsListCommand + ", ", ex);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new KafkaStartingException("Error when " + kafkaTopicsListCommand + ", ", ex);
        }
    }

    public static class KafkaTestcontainerContextCustomizerFactory
    implements ContextCustomizerFactory {
        public ContextCustomizer createContextCustomizer(Class<?> testClass, List<ContextConfigurationAttributes> configAttributes) {
            return (context, mergedConfig) -> {
                if (KafkaTestcontainerExtension.findPrototypeAnnotation(testClass).isPresent()) {
                    this.init(context, KafkaTestcontainerExtension.findPrototypeAnnotation(testClass).get().properties());
                } else if (KafkaTestcontainerExtension.findSingletonAnnotation(testClass).isPresent()) {
                    this.init(context, KafkaTestcontainerExtension.findSingletonAnnotation(testClass).get().properties());
                }
            };
        }

        private void init(ConfigurableApplicationContext context, String[] properties) {
            KafkaContainer container = THREAD_CONTAINER.get();
            TestPropertyValues.of((String[])new String[]{"kafka.bootstrap-servers=" + container.getBootstrapServers(), "spring.kafka.bootstrap-servers=" + container.getBootstrapServers(), "kafka.ssl.enabled=false"}).and(properties).applyTo(context);
        }
    }
}

