package org.apache.pulsar.io.debezium;

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import java.util.Map;
import net.sourceforge.argparse4j.ArgumentParsers;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.connect.KafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-debezium-core-2.7.2.4-rc-202106030805.jar:org/apache/pulsar/io/debezium/DebeziumSource.class */
/**
 * Base class for Debezium-backed sources running on top of {@link KafkaConnectSource}.
 *
 * <p>{@link #open(Map, SourceContext)} fills in default converter, database-history and
 * topic settings for any configuration key the user left unset, and then delegates to the
 * parent implementation. Subclasses supply connector-specific configuration through
 * {@link #setDbConnectorTask(Map)}.
 */
public abstract class DebeziumSource extends KafkaConnectSource {
    private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
    private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
    private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
    private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";

    /**
     * Separator between the source name and the default topic suffix. Kept as a local
     * literal so topic naming does not depend on a constant from an unrelated library.
     */
    private static final String TOPIC_NAME_SEPARATOR = "-";

    /**
     * Ensures that the configuration entry {@code str} holds the value {@code str2}.
     *
     * <p>When the key is missing (or mapped to {@code null}) the expected value is stored;
     * when it is present with a different value the configuration is rejected.
     *
     * @param map  mutable configuration map; may be modified
     * @param str  configuration key to check
     * @param str2 value the key is required to have
     * @throws IllegalArgumentException if the key is already set to a different value
     */
    public static void throwExceptionIfConfigNotMatch(Map<String, Object> map, String str, String str2) throws IllegalArgumentException {
        // putIfAbsent stores the value for a missing/null entry and otherwise returns the current one.
        Object existing = map.putIfAbsent(str, str2);
        if (existing != null && !existing.equals(str2)) {
            throw new IllegalArgumentException("Expected " + str2 + " but has " + existing);
        }
    }

    /**
     * Stores {@code str2} under key {@code str} unless the key already has a non-null value.
     *
     * @param map  mutable configuration map; may be modified
     * @param str  configuration key
     * @param str2 default value to apply when the key is unset
     */
    public static void setConfigIfNull(Map<String, Object> map, String str, String str2) {
        map.putIfAbsent(str, str2);
    }

    /**
     * Builds the {@code tenant/namespace} string for the given context, substituting
     * {@code "public"} for an empty tenant and {@code "default"} for an empty namespace.
     *
     * @param sourceContext context supplying the tenant and namespace
     * @return the namespace in {@code tenant/namespace} form
     */
    public static String topicNamespace(SourceContext sourceContext) {
        String tenant = sourceContext.getTenant();
        String namespace = sourceContext.getNamespace();
        String effectiveTenant = StringUtils.isEmpty(tenant) ? "public" : tenant;
        String effectiveNamespace = StringUtils.isEmpty(namespace) ? "default" : namespace;
        return effectiveTenant + "/" + effectiveNamespace;
    }

    /**
     * Hook invoked at the very start of {@link #open(Map, SourceContext)}, before any
     * defaults are applied.
     *
     * <p>NOTE(review): presumably implementations register the concrete Debezium connector
     * task class in the configuration - confirm against the subclasses.
     *
     * @param map mutable configuration map
     * @throws Exception if the connector configuration cannot be prepared
     */
    public abstract void setDbConnectorTask(Map<String, Object> map) throws Exception;

    /**
     * Applies Debezium defaults to the configuration and opens the underlying source.
     *
     * <p>Defaults are only applied to keys that are not already set: key/value converters,
     * the database history implementation, the history service URL, the topic namespace,
     * and the history and offset topics (derived from the topic namespace and source name).
     *
     * @param map           mutable source configuration; default entries are added to it
     * @param sourceContext context providing tenant, namespace and source name
     * @throws IllegalArgumentException if the Pulsar service URL is missing or not a string
     * @throws Exception                if the subclass hook or the parent {@code open} fails
     */
    @Override // org.apache.pulsar.io.kafka.connect.KafkaConnectSource, org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        setDbConnectorTask(map);

        setConfigIfNull(map, "key.converter", DEFAULT_CONVERTER);
        setConfigIfNull(map, "value.converter", DEFAULT_CONVERTER);
        setConfigIfNull(map, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);

        Object serviceUrlValue = map.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
        if (serviceUrlValue == null) {
            throw new IllegalArgumentException("Pulsar service URL not provided.");
        }
        // Fail with a descriptive error instead of a bare ClassCastException.
        if (!(serviceUrlValue instanceof String)) {
            throw new IllegalArgumentException(
                    "Pulsar service URL must be a String but was " + serviceUrlValue.getClass().getName());
        }
        String serviceUrl = (String) serviceUrlValue;
        setConfigIfNull(map, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl);

        String namespace = topicNamespace(sourceContext);
        setConfigIfNull(map, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, namespace);

        String sourceName = sourceContext.getSourceName();
        setConfigIfNull(map, PulsarDatabaseHistory.TOPIC.name(),
                defaultTopic(namespace, sourceName, DEFAULT_HISTORY_TOPIC));
        setConfigIfNull(map, "offset.storage.topic",
                defaultTopic(namespace, sourceName, DEFAULT_OFFSET_TOPIC));

        super.open(map, sourceContext);
    }

    /** Returns {@code <namespace>/<sourceName>-<suffix>}, the default per-source topic name. */
    private static String defaultTopic(String namespace, String sourceName, String suffix) {
        return namespace + "/" + sourceName + TOPIC_NAME_SEPARATOR + suffix;
    }
}
