/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.config.Configuration;
import io.debezium.connector.spanner.Module;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.SpannerConnectorTask;
import io.debezium.connector.spanner.config.validation.ConfigurationValidator;
import io.debezium.connector.spanner.kafka.KafkaAdminClientFactory;
import io.debezium.connector.spanner.kafka.internal.KafkaRebalanceTopicAdminService;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.scaler.TaskScalerMonitor;
import io.debezium.connector.spanner.task.scaler.TaskScalerMonitorFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpannerConnector
extends SourceConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerConnector.class);
    private Map<String, String> props = Map.of();
    private volatile TaskScalerMonitor taskScalerMonitor;

    public String version() {
        return Module.version();
    }

    public Class<? extends Task> taskClass() {
        return SpannerConnectorTask.class;
    }

    public void start(Map<String, String> props) {
        this.props = props;
        LOGGER.info("Starting connector: props {}", props);
        SpannerConnectorConfig config = new SpannerConnectorConfig(Configuration.from(props));
        this.createRebalanceTopic(config);
        TaskScalerMonitorFactory scalerFactory = new TaskScalerMonitorFactory(config, this.context, this::onError);
        this.taskScalerMonitor = scalerFactory.createMonitor();
        try {
            this.taskScalerMonitor.start();
        }
        catch (Exception ex) {
            this.taskScalerMonitor.shutdown();
            throw new RuntimeException(ex);
        }
        if (config.isLoggingJsonEnabled()) {
            LoggerUtils.enableJsonLog();
        }
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        int tasksCount = this.taskScalerMonitor.getRequiredTasksCount();
        LOGGER.info("taskConfigs: tasksCount: {}", (Object)tasksCount);
        return IntStream.range(0, tasksCount).mapToObj(this::getProps).collect(Collectors.toList());
    }

    public Config validate(Map<String, String> connectorConfigs) {
        return ConfigurationValidator.validate(connectorConfigs);
    }

    public void stop() {
        LOGGER.info("stopping connector");
        this.props = null;
        this.taskScalerMonitor.shutdown();
    }

    public ConfigDef config() {
        return SpannerConnectorConfig.configDef();
    }

    private void onError(RuntimeException exception) {
        this.taskScalerMonitor.shutdown();
        throw exception;
    }

    @VisibleForTesting
    Map<String, String> getProps(int taskId) {
        HashMap<String, String> taskProps = new HashMap<String, String>(this.props);
        taskProps.put("task.id", String.valueOf(taskId));
        return taskProps;
    }

    @VisibleForTesting
    void createRebalanceTopic(SpannerConnectorConfig config) {
        KafkaAdminClientFactory adminClientFactory = new KafkaAdminClientFactory(config);
        KafkaRebalanceTopicAdminService rebalanceTopicAdminService = new KafkaRebalanceTopicAdminService(adminClientFactory.getAdminClient(), config);
        rebalanceTopicAdminService.createAdjustRebalanceTopic();
        adminClientFactory.close();
    }
}

