package org.apache.kafka.connect.storage;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.sourceforge.argparse4j.ArgumentParsers;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaConfigBackingStore.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.2.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaConfigBackingStore.class */
public class KafkaConfigBackingStore implements ConfigBackingStore {
    /** Class-wide SLF4J logger; assigned in the static initializer. */
    private static final Logger log;
    /** Key prefix for records holding a connector's target state (see {@link #TARGET_STATE_KEY(String)}). */
    public static final String TARGET_STATE_PREFIX = "target-state-";
    /** Key prefix for records holding a connector's configuration (see {@link #CONNECTOR_KEY(String)}). */
    public static final String CONNECTOR_PREFIX = "connector-";
    /** Key prefix for records holding a single task's configuration (see {@link #TASK_KEY(ConnectorTaskId)}). */
    public static final String TASK_PREFIX = "task-";
    /** Key prefix for the record that commits a connector's task configs (see {@link #COMMIT_TASKS_KEY(String)}). */
    public static final String COMMIT_TASKS_PREFIX = "commit-";
    /** Schema of connector configuration record values; built in the static initializer. */
    public static final Schema CONNECTOR_CONFIGURATION_V0;
    /** Schema of task configuration record values; the static initializer aliases it to the connector schema. */
    public static final Schema TASK_CONFIGURATION_V0;
    /** Schema of the task-commit record value, which carries the task count written by {@code putTaskConfigs}. */
    public static final Schema CONNECTOR_TASKS_COMMIT_V0;
    /** Schema of target-state record values. */
    public static final Schema TARGET_STATE_V0;
    /** Upper bound, in milliseconds, on each wait for the config log to be read to its end. */
    private static final long READ_TO_END_TIMEOUT_MS = 30000;
    /** Serializes the Connect structs into the {@code byte[]} record values sent to the config log. */
    private final Converter converter;
    /** Listener installed through {@code setUpdateListener}; {@code null} until then. */
    private ConfigBackingStore.UpdateListener updateListener;
    /** Name of the config topic, read from the worker configuration in the constructor. */
    private final String topic;
    /** Log over the config topic; used for every send and every read-to-end wait. */
    private final KafkaBasedLog<String, byte[]> configLog;
    /** Passed through unchanged to every {@code ClusterConfigState} produced by {@code snapshot()}. */
    private final WorkerConfigTransformer configTransformer;
    /**
     * Assertion flag normally generated by javac and surfaced here by the decompiler; {@code true} when
     * assertions are disabled for this class. Referenced by the static initializer and {@code taskIds}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Task count per connector name; copied into snapshots under {@link #lock}. */
    private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
    /** Configuration per connector name; also backs {@code contains(String)}. */
    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    /** Configuration per task id; copied into snapshots under {@link #lock}. */
    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
    /** Connector names handed to {@code ClusterConfigState} as the inconsistent set. */
    private final Set<String> inconsistent = new HashSet<>();
    /**
     * Per-connector task configs keyed by task id. Only reachable here through a synthetic accessor;
     * presumably holds task configs that have been read but not yet committed (confirm against the
     * consume callback).
     */
    private final Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
    /** Target state per connector name; copied into snapshots under {@link #lock}. */
    private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
    /** Monitor guarding the in-memory state read by {@code snapshot()} and {@code contains(String)}. */
    private final Object lock = new Object();
    /** Set to {@code true} once {@code start()} has started the config log. */
    private volatile boolean started = false;
    /** Offset reported in snapshots; {@code -1} until updated through the synthetic offset accessor. */
    private volatile long offset = -1;

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaConfigBackingStore$ConsumeCallback.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.4.2.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaConfigBackingStore$ConsumeCallback.class */
    /**
     * Callback type registered with the config log; an instance is created in
     * {@code setupAndCreateKafkaBasedLog} and passed to the {@code KafkaBasedLog} constructor.
     *
     * <p>NOTE(review): the decompiler failed to recover {@code onCompletion}. As written here the method
     * unconditionally throws {@link UnsupportedOperationException}, so this class is not functional until
     * the real body is restored from the original bytecode or upstream sources. The enclosing class's
     * synthetic {@code access$NNN} methods are presumably the hooks the real body uses to reach the
     * store's private state.
     */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
        /** Not instantiable from outside the enclosing class. */
        private ConsumeCallback() {
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.kafka.connect.storage.KafkaConfigBackingStore.access$402(org.apache.kafka.connect.storage.KafkaConfigBackingStore, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.kafka.connect.storage.KafkaConfigBackingStore
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        /**
         * Placeholder for the record-handling logic, which was not decompiled.
         *
         * @param r7 error argument of the callback (unused by this placeholder)
         * @param r8 consumed record (unused by this placeholder)
         * @throws UnsupportedOperationException always, because the original body is missing
         */
        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(java.lang.Throwable r7, org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, byte[]> r8) {
            /*
                Method dump skipped, instructions count: 1285
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.storage.KafkaConfigBackingStore.ConsumeCallback.onCompletion(java.lang.Throwable, org.apache.kafka.clients.consumer.ConsumerRecord):void");
        }
    }

    /**
     * Builds the config-log key under which a connector's target state is stored.
     *
     * @param str connector name
     * @return {@code TARGET_STATE_PREFIX} followed by the connector name
     */
    public static String TARGET_STATE_KEY(String str) {
        final String key = TARGET_STATE_PREFIX + str;
        return key;
    }

    /**
     * Builds the config-log key under which a connector's configuration is stored.
     *
     * @param str connector name
     * @return {@code CONNECTOR_PREFIX} followed by the connector name
     */
    public static String CONNECTOR_KEY(String str) {
        final String key = CONNECTOR_PREFIX + str;
        return key;
    }

    /**
     * Builds the config-log key under which a single task's configuration is stored.
     *
     * <p>The key has the form {@code task-<connector>-<task>}. The dash separator is spelled out as a
     * literal: it is part of this store's key format and must not be tied to an unrelated library's
     * command-line prefix constant.
     *
     * @param connectorTaskId id of the task (connector name plus task index)
     * @return the key for that task's configuration record
     */
    public static String TASK_KEY(ConnectorTaskId connectorTaskId) {
        return TASK_PREFIX + connectorTaskId.connector() + "-" + connectorTaskId.task();
    }

    /**
     * Builds the config-log key of the record that commits a connector's task configurations.
     *
     * @param str connector name
     * @return {@code COMMIT_TASKS_PREFIX} followed by the connector name
     */
    public static String COMMIT_TASKS_KEY(String str) {
        final String key = COMMIT_TASKS_PREFIX + str;
        return key;
    }

    public KafkaConfigBackingStore(Converter converter, WorkerConfig workerConfig, WorkerConfigTransformer workerConfigTransformer) {
        this.converter = converter;
        this.topic = workerConfig.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
        if (this.topic == null || this.topic.trim().length() == 0) {
            throw new ConfigException("Must specify topic for connector configuration.");
        }
        this.configLog = setupAndCreateKafkaBasedLog(this.topic, workerConfig);
        this.configTransformer = workerConfigTransformer;
    }

    /**
     * Installs the listener that is exposed to the consume callback.
     *
     * @param updateListener listener to store; replaces any previously set listener
     */
    @Override
    public void setUpdateListener(ConfigBackingStore.UpdateListener updateListener) {
        this.updateListener = updateListener;
    }

    /** Starts the underlying config log and then marks this store as started. */
    @Override
    public void start() {
        log.info("Starting KafkaConfigBackingStore");
        configLog.start();
        // Flip the flag only after the log has started successfully.
        started = true;
        log.info("Started KafkaConfigBackingStore");
    }

    /** Stops the underlying config log. The {@code started} flag is left untouched. */
    @Override
    public void stop() {
        log.info("Closing KafkaConfigBackingStore");
        configLog.stop();
        log.info("Closed KafkaConfigBackingStore");
    }

    /**
     * Captures the current in-memory configuration state.
     *
     * <p>Each top-level collection is copied while holding the store lock, so later updates to this
     * store's maps do not affect the returned state. The copies are shallow: nested per-connector and
     * per-task maps are shared with the store.
     *
     * @return a new {@code ClusterConfigState} built from the current offset, maps and transformer
     */
    @Override
    public ClusterConfigState snapshot() {
        synchronized (this.lock) {
            return new ClusterConfigState(
                    this.offset,
                    new HashMap<>(this.connectorTaskCounts),
                    new HashMap<>(this.connectorConfigs),
                    new HashMap<>(this.connectorTargetStates),
                    new HashMap<>(this.taskConfigs),
                    new HashSet<>(this.inconsistent),
                    this.configTransformer);
        }
    }

    /**
     * Reports whether a configuration is currently held for the given connector.
     *
     * @param str connector name
     * @return {@code true} if the in-memory connector-config map has an entry for the name
     */
    @Override
    public boolean contains(String str) {
        synchronized (lock) {
            return connectorConfigs.containsKey(str);
        }
    }

    /**
     * Serializes a connector configuration and writes it to the config log, waiting for the write to be
     * read back (see {@code updateConnectorConfig}).
     *
     * @param str connector name
     * @param map connector configuration properties
     */
    @Override
    public void putConnectorConfig(String str, Map<String, String> map) {
        log.debug("Writing connector configuration for connector '{}'", str);

        final Struct connectorConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
        connectorConfig.put(TypedMessageBuilder.CONF_PROPERTIES, map);

        final byte[] serialized = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectorConfig);
        updateConnectorConfig(str, serialized);
    }

    /**
     * Removes a connector by writing null-valued records for both its configuration key and its
     * target-state key, then waits (up to {@code READ_TO_END_TIMEOUT_MS}) for the log to be read to its end.
     *
     * @param str connector name
     * @throws ConnectException if the wait is interrupted, fails, or times out; when interrupted, the
     *         thread's interrupt flag is restored before throwing
     */
    @Override
    public void removeConnectorConfig(String str) {
        log.debug("Removing connector configuration for connector '{}'", str);
        try {
            this.configLog.send(CONNECTOR_KEY(str), null);
            this.configLog.send(TARGET_STATE_KEY(str), null);
            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: let callers further up observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to remove connector configuration from Kafka: ", e);
            throw new ConnectException("Error removing connector configuration from Kafka", e);
        }
    }

    /**
     * Task removal is not implemented by this store.
     *
     * @param str connector name (ignored)
     * @throws UnsupportedOperationException always
     */
    @Override
    public void removeTaskConfigs(String str) {
        throw new UnsupportedOperationException("Removal of tasks is not currently supported");
    }

    /**
     * Writes an already-serialized connector configuration under the connector's key and waits (up to
     * {@code READ_TO_END_TIMEOUT_MS}) for the log to be read to its end.
     *
     * @param str connector name
     * @param bArr serialized configuration value
     * @throws ConnectException if the wait is interrupted, fails, or times out; when interrupted, the
     *         thread's interrupt flag is restored before throwing
     */
    private void updateConnectorConfig(String str, byte[] bArr) {
        try {
            this.configLog.send(CONNECTOR_KEY(str), bArr);
            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: let callers further up observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to write connector configuration to Kafka: ", e);
            throw new ConnectException("Error writing connector configuration to Kafka", e);
        }
    }

    /**
     * Writes the task configurations of a connector followed by a commit record carrying the task count.
     *
     * <p>Sequence: wait for the log to be read to its end; send one record per task config (task ids are
     * the list indices); if any task configs were sent, wait again; send the commit record; wait a final
     * time. Every wait is bounded by {@code READ_TO_END_TIMEOUT_MS}.
     *
     * @param str connector name
     * @param list task configurations, in task-index order
     * @throws ConnectException if any wait is interrupted, fails, or times out; when interrupted, the
     *         thread's interrupt flag is restored before throwing
     */
    @Override
    public void putTaskConfigs(String str, List<Map<String, String>> list) {
        try {
            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);

            final int taskCount = list.size();
            int taskIndex = 0;
            for (Map<String, String> taskConfig : list) {
                Struct taskValue = new Struct(TASK_CONFIGURATION_V0);
                taskValue.put(TypedMessageBuilder.CONF_PROPERTIES, taskConfig);
                byte[] serializedTask = this.converter.fromConnectData(this.topic, TASK_CONFIGURATION_V0, taskValue);
                log.debug("Writing configuration for connector '{}' task {}", str, taskIndex);
                this.configLog.send(TASK_KEY(new ConnectorTaskId(str, taskIndex)), serializedTask);
                taskIndex++;
            }

            // Only wait for the task records when some were actually sent.
            if (taskCount > 0) {
                this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }

            Struct commitValue = new Struct(CONNECTOR_TASKS_COMMIT_V0);
            commitValue.put(ConnectProtocol.TASKS_KEY_NAME, taskCount);
            byte[] serializedCommit = this.converter.fromConnectData(this.topic, CONNECTOR_TASKS_COMMIT_V0, commitValue);
            log.debug("Writing commit for connector '{}' with {} tasks.", str, taskCount);
            this.configLog.send(COMMIT_TASKS_KEY(str), serializedCommit);

            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: let callers further up observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to write root configuration to Kafka: ", e);
            throw new ConnectException("Error writing root configuration to Kafka", e);
        }
    }

    /**
     * Waits until the config log has been read to its end, or the given timeout elapses.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @throws TimeoutException if the log was not read to its end in time
     * @throws ConnectException if the wait is interrupted or the read fails; when interrupted, the
     *         thread's interrupt flag is restored before throwing
     */
    @Override
    public void refresh(long j, TimeUnit timeUnit) throws TimeoutException {
        try {
            this.configLog.readToEnd().get(j, timeUnit);
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Do not swallow the interruption: let callers further up observe it.
                Thread.currentThread().interrupt();
            }
            throw new ConnectException("Error trying to read to end of config log", e);
        }
    }

    /**
     * Serializes a connector's target state and sends it to the config log. Unlike the other write
     * methods, this one does not wait for the log to be read to its end.
     *
     * @param str connector name
     * @param targetState state to record; its {@code name()} is what gets stored
     */
    @Override
    public void putTargetState(String str, TargetState targetState) {
        final Struct stateValue = new Struct(TARGET_STATE_V0);
        stateValue.put(KafkaStatusBackingStore.STATE_KEY_NAME, targetState.name());

        final byte[] serialized = converter.fromConnectData(topic, TARGET_STATE_V0, stateValue);
        log.debug("Writing target state {} for connector {}", targetState, str);

        final String key = TARGET_STATE_KEY(str);
        configLog.send(key, serialized);
    }

    /**
     * Derives producer, consumer and admin settings from the worker's original properties and creates
     * the log over the config topic.
     *
     * <p>The producer copy gets String/byte[] serializers and a delivery timeout of
     * {@code Integer.MAX_VALUE}; the consumer copy gets the matching deserializers; the admin copy is
     * used unchanged. The topic is described as compacted, single-partition, with the replication
     * factor taken from the worker configuration.
     *
     * @param str name of the config topic
     * @param workerConfig worker configuration supplying the base properties and replication factor
     * @return the log wired to a new {@code ConsumeCallback}
     */
    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String str, WorkerConfig workerConfig) {
        Map<String, Object> originals = workerConfig.originals();

        Map<String, Object> producerProps = new HashMap<>(originals);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);

        Map<String, Object> consumerProps = new HashMap<>(originals);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        Map<String, Object> adminProps = new HashMap<>(originals);

        NewTopic topicDescription = TopicAdmin.defineTopic(str)
                .compacted()
                .partitions(1)
                .replicationFactor(workerConfig.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG).shortValue())
                .build();

        return createKafkaBasedLog(str, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
    }

    /**
     * Creates the {@code KafkaBasedLog} for the config topic, together with an initializer that creates
     * the topic through a short-lived {@code TopicAdmin}.
     *
     * @param str topic name
     * @param map producer properties
     * @param map2 consumer properties
     * @param callback callback invoked for consumed records
     * @param newTopic description of the topic to create
     * @param map3 admin client properties
     * @return the configured log
     */
    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<String, byte[]>> callback, final NewTopic newTopic, final Map<String, Object> map3) {
        Runnable createConfigTopic = new Runnable() {
            @Override
            public void run() {
                log.debug("Creating admin client to manage Connect internal config topic");
                // try-with-resources always closes the admin client and, if both createTopics and
                // close() fail, keeps the createTopics failure as the primary exception.
                try (TopicAdmin admin = new TopicAdmin(map3)) {
                    admin.createTopics(newTopic);
                }
            }
        };
        return new KafkaBasedLog<>(str, map, map2, callback, Time.SYSTEM, createConfigTopic);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses a task key of the form {@code task-<connector>-<task>} back into a task id.
     *
     * <p>The first dash-separated segment (the prefix) is dropped, the last segment is the task index,
     * and everything in between is re-joined with dashes so connector names containing dashes survive.
     * The dash is written as a literal because it belongs to this store's key format.
     *
     * @param str record key to parse
     * @return the parsed id, or {@code null} if the key has fewer than three segments or the last
     *         segment is not an integer
     */
    public ConnectorTaskId parseTaskId(String str) {
        String[] segments = str.split("-");
        if (segments.length < 3) {
            return null;
        }
        try {
            String connector = Utils.join(Arrays.copyOfRange(segments, 1, segments.length - 1), "-");
            int task = Integer.parseInt(segments[segments.length - 1]);
            return new ConnectorTaskId(connector, task);
        } catch (NumberFormatException e) {
            // A non-numeric task segment means this is not a well-formed task key.
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the task indices present in a map of task configurations.
     *
     * @param str connector name every key is expected to belong to (checked only when assertions are on)
     * @param map task configurations keyed by task id; may be {@code null}
     * @return a sorted set of task indices; empty when {@code map} is {@code null}
     */
    public Set<Integer> taskIds(String str, Map<ConnectorTaskId, Map<String, String>> map) {
        Set<Integer> ids = new TreeSet<>();
        if (map == null) {
            return ids;
        }
        for (ConnectorTaskId taskId : map.keySet()) {
            // Kept as an explicit flag check rather than an `assert` statement: this class declares
            // the $assertionsDisabled field itself.
            if (!$assertionsDisabled && !taskId.connector().equals(str)) {
                throw new AssertionError();
            }
            ids.add(taskId.task());
        }
        return ids;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks that a set of task indices covers every index from 0 up to (but excluding) the expected count.
     *
     * @param set task indices that are present
     * @param i expected number of tasks
     * @return {@code true} if the set has at least {@code i} elements and contains each of {@code 0..i-1}
     */
    public boolean completeTaskIdSet(Set<Integer> set, int i) {
        boolean complete = set.size() >= i;
        int expectedId = 0;
        // Stop at the first missing index; skipped entirely when the size check already failed.
        while (complete && expectedId < i) {
            complete = set.contains(expectedId);
            expectedId++;
        }
        return complete;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts a deserialized integer value to {@code int}.
     *
     * @param obj value expected to be an {@code Integer} or a {@code Long}
     * @return the value as {@code int}; a {@code Long} is narrowed, so out-of-range values are truncated
     * @throws ConnectException if {@code obj} is neither an {@code Integer} nor a {@code Long}
     *         (including {@code null})
     */
    public static int intValue(Object obj) {
        if (obj instanceof Long) {
            return ((Long) obj).intValue();
        }
        if (obj instanceof Integer) {
            return (Integer) obj;
        }
        throw new ConnectException("Expected integer value to be either Integer or Long");
    }

    /**
     * Compiler-generated accessor that assigns the private {@code offset} field on behalf of a nested
     * class.
     *
     * <p>The decompiler could not decode this method and left a stub that always threw
     * {@link UnsupportedOperationException}. The body is reconstructed from its partial dump
     * ({@code r0.offset = r1}) and the usual shape of an assignment accessor, which returns the
     * assigned value. NOTE(review): confirm the return value against the original bytecode.
     *
     * @param r6 store whose offset is updated
     * @param r7 new offset
     * @return the value that was assigned
     */
    static /* synthetic */ long access$402(org.apache.kafka.connect.storage.KafkaConfigBackingStore r6, long r7) {
        r6.offset = r7;
        return r7;
    }

    /** Compiler-generated accessor returning the store's private {@code lock} monitor object. */
    static /* synthetic */ Object access$500(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.lock;
    }

    /** Compiler-generated accessor returning the store's private {@code connectorTargetStates} map (the live map, not a copy). */
    static /* synthetic */ Map access$600(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.connectorTargetStates;
    }

    /** Compiler-generated accessor returning the store's private {@code connectorConfigs} map (the live map, not a copy). */
    static /* synthetic */ Map access$700(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.connectorConfigs;
    }

    /** Compiler-generated accessor returning the store's private {@code started} flag. */
    static /* synthetic */ boolean access$800(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.started;
    }

    /** Compiler-generated accessor returning the store's private {@code updateListener}; {@code null} if none has been set. */
    static /* synthetic */ ConfigBackingStore.UpdateListener access$900(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.updateListener;
    }

    /** Compiler-generated bridge that forwards to {@code parseTaskId(String)} on the given store. */
    static /* synthetic */ ConnectorTaskId access$1000(KafkaConfigBackingStore kafkaConfigBackingStore, String str) {
        return kafkaConfigBackingStore.parseTaskId(str);
    }

    /** Compiler-generated accessor returning the store's private {@code deferredTaskUpdates} map (the live map, not a copy). */
    static /* synthetic */ Map access$1100(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.deferredTaskUpdates;
    }

    /** Compiler-generated bridge that forwards to the static {@code intValue(Object)} helper. */
    static /* synthetic */ int access$1200(Object obj) {
        return intValue(obj);
    }

    /** Compiler-generated bridge that forwards to {@code taskIds(String, Map)} on the given store. */
    static /* synthetic */ Set access$1300(KafkaConfigBackingStore kafkaConfigBackingStore, String str, Map map) {
        return kafkaConfigBackingStore.taskIds(str, map);
    }

    /** Compiler-generated bridge that forwards to {@code completeTaskIdSet(Set, int)} on the given store. */
    static /* synthetic */ boolean access$1400(KafkaConfigBackingStore kafkaConfigBackingStore, Set set, int i) {
        return kafkaConfigBackingStore.completeTaskIdSet(set, i);
    }

    /** Compiler-generated accessor returning the store's private {@code inconsistent} set (the live set, not a copy). */
    static /* synthetic */ Set access$1500(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.inconsistent;
    }

    /** Compiler-generated accessor returning the store's private {@code taskConfigs} map (the live map, not a copy). */
    static /* synthetic */ Map access$1600(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.taskConfigs;
    }

    /** Compiler-generated accessor returning the store's private {@code connectorTaskCounts} map (the live map, not a copy). */
    static /* synthetic */ Map access$1700(KafkaConfigBackingStore kafkaConfigBackingStore) {
        return kafkaConfigBackingStore.connectorTaskCounts;
    }

    static {
        $assertionsDisabled = !KafkaConfigBackingStore.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger((Class<?>) KafkaConfigBackingStore.class);
        CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct().field(TypedMessageBuilder.CONF_PROPERTIES, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA).build()).build();
        TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
        CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct().field(ConnectProtocol.TASKS_KEY_NAME, Schema.INT32_SCHEMA).build();
        TARGET_STATE_V0 = SchemaBuilder.struct().field(KafkaStatusBackingStore.STATE_KEY_NAME, Schema.STRING_SCHEMA).build();
    }
}
