package org.apache.kafka.connect.storage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import net.sourceforge.argparse4j.ArgumentParsers;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.Table;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaStatusBackingStore.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaStatusBackingStore.class */
public class KafkaStatusBackingStore implements StatusBackingStore {
    // Key prefix for task status records; sendTaskStatus appends the connector name and task number.
    private static final String TASK_STATUS_PREFIX = "status-task-";
    // Key prefix for connector status records; sendConnectorStatus appends the connector name.
    private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
    // Clock handed to the KafkaBasedLog built in createKafkaBasedLog.
    private final Time time;
    // Converter used to serialize status values and to parse them back when records are read.
    private final Converter converter;
    // Cached task statuses, indexed by connector name and then by task number.
    private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
    // Cached connector statuses, indexed by connector name.
    private final Map<String, CacheEntry<ConnectorStatus>> connectors;
    // Name of the status topic; assigned in configure() or by the package-private constructor.
    private String topic;
    // Log used both to write status records and to feed consumed records into read().
    private KafkaBasedLog<String, byte[]> kafkaLog;
    // Generation of the status most recently handed to send(); checked before a failed write is retried.
    private int generation;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaStatusBackingStore.class);
    // Field names of the serialized status value.
    public static final String STATE_KEY_NAME = "state";
    public static final String TRACE_KEY_NAME = "trace";
    public static final String WORKER_ID_KEY_NAME = "worker_id";
    public static final String GENERATION_KEY_NAME = "generation";
    // Status value schema: required string state, optional string trace, required string worker id, required int32 generation.
    private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct().field(STATE_KEY_NAME, Schema.STRING_SCHEMA).field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build()).field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA).field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.3.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaStatusBackingStore$CacheEntry.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/storage/KafkaStatusBackingStore$CacheEntry.class */
    /**
     * Mutable cache slot for the status of a single connector or task.
     *
     * <p>Besides the status value itself, the entry carries a sequence counter that is bumped
     * through {@link #increment()} and a flag recording that the entry has been deleted.
     * The class does no locking of its own.
     *
     * @param <T> the concrete status type stored in this entry
     */
    public static class CacheEntry<T extends AbstractStatus> {
        private T value = null;
        private int sequence = 0;
        private boolean deleted = false;

        private CacheEntry() {
        }

        /**
         * Advances the sequence counter by one.
         *
         * @return the counter value after the increment
         */
        public int increment() {
            return ++this.sequence;
        }

        /** Replaces the stored status with {@code t}. */
        public void put(T t) {
            this.value = t;
        }

        /**
         * @return the stored status, or {@code null} if none has been stored
         */
        public T get() {
            return this.value;
        }

        /** Marks this entry as deleted; the stored value is left untouched. */
        public void delete() {
            this.deleted = true;
        }

        /**
         * @return {@code true} once {@link #delete()} has been called
         */
        public boolean isDeleted() {
            return this.deleted;
        }

        /**
         * Decides whether {@code t} may overwrite the stored status.
         *
         * @return {@code true} when nothing is stored, when the stored status has the same
         *         worker id as {@code t}, or when the stored generation is not newer than
         *         the generation of {@code t}
         */
        public boolean canWriteSafely(T t) {
            if (this.value == null) {
                return true;
            }
            if (this.value.workerId().equals(t.workerId())) {
                return true;
            }
            return this.value.generation() <= t.generation();
        }

        /**
         * Same check as {@link #canWriteSafely(AbstractStatus)}, additionally requiring that
         * the sequence counter still equals {@code i}.
         */
        public boolean canWriteSafely(T t, int i) {
            if (!canWriteSafely(t)) {
                return false;
            }
            return this.sequence == i;
        }
    }

    /**
     * Creates a store with empty connector and task caches. The topic and the backing log
     * are not set here; {@link #configure(WorkerConfig)} assigns them.
     *
     * @param time      clock passed on to the backing log when it is created
     * @param converter converter used for status values
     */
    public KafkaStatusBackingStore(Time time, Converter converter) {
        this.connectors = new HashMap<>();
        this.tasks = new Table<>();
        this.converter = converter;
        this.time = time;
    }

    /**
     * Package-private constructor that accepts a ready-made log and topic name instead of
     * deriving them from a worker configuration.
     *
     * @param time          clock for the store
     * @param converter     converter used for status values
     * @param str           name of the status topic
     * @param kafkaBasedLog log to read status records from and write them to
     */
    KafkaStatusBackingStore(Time time, Converter converter, String str, KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        this(time, converter);
        this.topic = str;
        this.kafkaLog = kafkaBasedLog;
    }

    /**
     * Reads the status topic settings from the worker configuration and builds the backing log.
     *
     * <p>Producer, consumer and admin properties each start from a copy of the worker's
     * original properties. The producer gets String/byte[] serializers and has client-side
     * retries set to 0 (the callback in {@code send} re-sends retriable failures itself);
     * the consumer gets the matching deserializers.
     *
     * @param workerConfig worker configuration providing the topic name, partition count
     *                     and replication factor
     * @throws ConfigException if the status topic name is missing or blank
     */
    @Override
    public void configure(WorkerConfig workerConfig) {
        this.topic = workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
        if (this.topic == null || this.topic.trim().isEmpty()) {
            throw new ConfigException("Must specify topic for connector status.");
        }

        Map<String, Object> originals = workerConfig.originals();

        Map<String, Object> producerProps = new HashMap<>(originals);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0);

        Map<String, Object> consumerProps = new HashMap<>(originals);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        Map<String, Object> adminProps = new HashMap<>(originals);

        NewTopic topicDescription = TopicAdmin.defineTopic(this.topic)
                .compacted()
                .partitions(workerConfig.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG).intValue())
                .replicationFactor(workerConfig.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG).shortValue())
                .build();

        Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
            @Override
            public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
                // A failed callback carries no usable record; report it instead of letting
                // read() dereference a null record.
                if (error != null) {
                    KafkaStatusBackingStore.log.error("Unexpected error in consumer callback for KafkaStatusBackingStore: ", error);
                    return;
                }
                if (record == null) {
                    return;
                }
                KafkaStatusBackingStore.this.read(record);
            }
        };

        this.kafkaLog = createKafkaBasedLog(this.topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
    }

    /**
     * Builds the {@link KafkaBasedLog} for the status topic.
     *
     * <p>The log is given a {@link Runnable} that opens a {@link TopicAdmin} with
     * {@code map3}, asks it to create {@code newTopic}, and closes the admin client again.
     * The client is managed by try-with-resources, so it is closed on both the success and
     * the failure path, and an exception thrown by {@code close()} while another exception
     * is propagating is attached as suppressed instead of replacing it.
     *
     * @param str      topic name
     * @param map      producer properties
     * @param map2     consumer properties
     * @param callback callback invoked with consumed records
     * @param newTopic description of the topic to create
     * @param map3     admin client properties
     * @return the newly constructed log (not started)
     */
    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<String, byte[]>> callback, final NewTopic newTopic, final Map<String, Object> map3) {
        Runnable createTopics = new Runnable() {
            @Override
            public void run() {
                KafkaStatusBackingStore.log.debug("Creating admin client to manage Connect internal status topic");
                try (TopicAdmin admin = new TopicAdmin(map3)) {
                    admin.createTopics(newTopic);
                }
            }
        };
        return new KafkaBasedLog<>(str, map, map2, callback, this.time, createTopics);
    }

    /**
     * Starts the backing log and then calls {@code readToEnd()} on it, in that order.
     */
    @Override
    public void start() {
        final KafkaBasedLog<String, byte[]> statusLog = this.kafkaLog;
        statusLog.start();
        statusLog.readToEnd();
    }

    /**
     * Stops the backing log.
     */
    @Override
    public void stop() {
        kafkaLog.stop();
    }

    /**
     * Writes a connector status without the safe-write check.
     *
     * @param connectorStatus status to write
     */
    @Override
    public void put(ConnectorStatus connectorStatus) {
        final boolean safeWrite = false;
        sendConnectorStatus(connectorStatus, safeWrite);
    }

    /**
     * Writes a connector status only if the cached entry says it is safe to overwrite.
     *
     * @param connectorStatus status to write
     */
    @Override
    public void putSafe(ConnectorStatus connectorStatus) {
        final boolean safeWrite = true;
        sendConnectorStatus(connectorStatus, safeWrite);
    }

    /**
     * Writes a task status without the safe-write check.
     *
     * @param taskStatus status to write
     */
    @Override
    public void put(TaskStatus taskStatus) {
        final boolean safeWrite = false;
        sendTaskStatus(taskStatus, safeWrite);
    }

    /**
     * Writes a task status only if the cached entry says it is safe to overwrite.
     *
     * @param taskStatus status to write
     */
    @Override
    public void putSafe(TaskStatus taskStatus) {
        final boolean safeWrite = true;
        sendTaskStatus(taskStatus, safeWrite);
    }

    /**
     * Flushes the backing log.
     */
    @Override
    public void flush() {
        kafkaLog.flush();
    }

    /**
     * Sends a connector status under the key {@code "status-connector-" + <connector name>},
     * using (and if necessary creating) the cache entry for that connector.
     *
     * @param connectorStatus status to send
     * @param z               whether to apply the safe-write check
     */
    private void sendConnectorStatus(ConnectorStatus connectorStatus, boolean z) {
        final String connectorName = connectorStatus.id();
        final String recordKey = CONNECTOR_STATUS_PREFIX + connectorName;
        final CacheEntry<ConnectorStatus> entry = getOrAdd(connectorName);
        send(recordKey, connectorStatus, entry, z);
    }

    /**
     * Sends a task status under the key
     * {@code "status-task-" + <connector name> + "-" + <task number>}, using (and if
     * necessary creating) the cache entry for that task.
     *
     * <p>The separator is written as a plain {@code "-"} literal rather than borrowed from
     * an unrelated command-line parsing constant, so the key format is visible here.
     *
     * @param taskStatus status to send
     * @param z          whether to apply the safe-write check
     */
    private void sendTaskStatus(TaskStatus taskStatus, boolean z) {
        final ConnectorTaskId taskId = taskStatus.id();
        final String recordKey = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
        send(recordKey, taskStatus, getOrAdd(taskId), z);
    }

    /**
     * Writes one status record to the log and re-sends it when the write fails with a
     * retriable error.
     *
     * <p>Only the bookkeeping runs under the store monitor: recording the status generation,
     * the optional safe-write check, and bumping the entry's sequence number. Serialization
     * and the calls into the log happen outside the monitor so that a slow or blocking send
     * does not stall the synchronized accessors of this store.
     *
     * <p>A status in state {@code DESTROYED} is written with a {@code null} value.
     *
     * <p>On a {@link RetriableException} the record is sent again with the same callback,
     * unless the entry has been deleted, a status with a different generation has been sent
     * in the meantime, or (for safe writes) the entry no longer permits the write at the
     * captured sequence number. Any other failure is logged and not retried.
     *
     * @param str        record key
     * @param v          status to write
     * @param cacheEntry cache entry belonging to the key
     * @param z          {@code true} to skip the write when the entry says it is not safe
     */
    private <V extends AbstractStatus> void send(final String str, final V v, final CacheEntry<V> cacheEntry, final boolean z) {
        final int sequence;
        synchronized (this) {
            this.generation = v.generation();
            if (z && !cacheEntry.canWriteSafely(v)) {
                return;
            }
            sequence = cacheEntry.increment();
        }

        final byte[] value = v.state() == AbstractStatus.State.DESTROYED ? null : serialize(v);

        this.kafkaLog.send(str, value, new org.apache.kafka.clients.producer.Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    return;
                }
                if (!(exc instanceof RetriableException)) {
                    KafkaStatusBackingStore.log.error("Failed to write status update", exc);
                    return;
                }
                synchronized (KafkaStatusBackingStore.this) {
                    if (cacheEntry.isDeleted()
                            || v.generation() != KafkaStatusBackingStore.this.generation
                            || (z && !cacheEntry.canWriteSafely(v, sequence))) {
                        return;
                    }
                }
                // Retry outside the monitor, reusing this callback for the next outcome.
                KafkaStatusBackingStore.this.kafkaLog.send(str, value, this);
            }
        });
    }

    /**
     * Returns the cache entry for the given connector, registering a fresh empty entry
     * first if none exists.
     *
     * @param str connector name
     * @return the existing or newly created entry
     */
    private synchronized CacheEntry<ConnectorStatus> getOrAdd(String str) {
        CacheEntry<ConnectorStatus> existing = this.connectors.get(str);
        if (existing != null) {
            return existing;
        }
        CacheEntry<ConnectorStatus> created = new CacheEntry<>();
        this.connectors.put(str, created);
        return created;
    }

    /**
     * Drops the connector's entry and all of its task entries from the caches, marking
     * each removed entry as deleted.
     *
     * @param str connector name
     */
    private synchronized void remove(String str) {
        CacheEntry<ConnectorStatus> connectorEntry = this.connectors.remove(str);
        if (connectorEntry != null) {
            connectorEntry.delete();
        }

        Map<Integer, CacheEntry<TaskStatus>> taskEntries = this.tasks.remove(str);
        if (taskEntries == null) {
            return;
        }
        for (CacheEntry<TaskStatus> taskEntry : taskEntries.values()) {
            taskEntry.delete();
        }
    }

    /**
     * Returns the cache entry for the given task, registering a fresh empty entry first
     * if none exists.
     *
     * @param connectorTaskId task identifier (connector name plus task number)
     * @return the existing or newly created entry
     */
    private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId connectorTaskId) {
        final String connectorName = connectorTaskId.connector();
        final Integer taskNumber = Integer.valueOf(connectorTaskId.task());
        CacheEntry<TaskStatus> existing = this.tasks.get(connectorName, taskNumber);
        if (existing != null) {
            return existing;
        }
        CacheEntry<TaskStatus> created = new CacheEntry<>();
        this.tasks.put(connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()), created);
        return created;
    }

    /**
     * Drops the entry for a single task from the cache and marks it as deleted.
     *
     * @param connectorTaskId task identifier
     */
    private synchronized void remove(ConnectorTaskId connectorTaskId) {
        final String connectorName = connectorTaskId.connector();
        final Integer taskNumber = Integer.valueOf(connectorTaskId.task());
        CacheEntry<TaskStatus> removedEntry = this.tasks.remove(connectorName, taskNumber);
        if (removedEntry == null) {
            return;
        }
        removedEntry.delete();
    }

    /**
     * Looks up the cached status of a task.
     *
     * @param connectorTaskId task identifier
     * @return the cached status, or {@code null} if there is no entry for the task or the
     *         entry holds no value
     */
    @Override
    public synchronized TaskStatus get(ConnectorTaskId connectorTaskId) {
        final String connectorName = connectorTaskId.connector();
        final Integer taskNumber = Integer.valueOf(connectorTaskId.task());
        CacheEntry<TaskStatus> entry = this.tasks.get(connectorName, taskNumber);
        return entry != null ? entry.get() : null;
    }

    /**
     * Looks up the cached status of a connector.
     *
     * @param str connector name
     * @return the cached status, or {@code null} if there is no entry for the connector or
     *         the entry holds no value
     */
    @Override
    public synchronized ConnectorStatus get(String str) {
        CacheEntry<ConnectorStatus> entry = this.connectors.get(str);
        return entry != null ? entry.get() : null;
    }

    /**
     * Collects the cached statuses of all tasks of one connector. Entries that hold no
     * value are skipped.
     *
     * @param str connector name
     * @return a new collection with the non-null task statuses
     */
    @Override
    public synchronized Collection<TaskStatus> getAll(String str) {
        Collection<TaskStatus> statuses = new ArrayList<>();
        for (CacheEntry<TaskStatus> entry : this.tasks.row(str).values()) {
            TaskStatus status = entry.get();
            if (status == null) {
                continue;
            }
            statuses.add(status);
        }
        return statuses;
    }

    /**
     * @return a new set containing the names of all connectors that currently have a cache
     *         entry; later changes to the cache do not affect the returned set
     */
    @Override
    public synchronized Set<String> connectors() {
        Set<String> snapshot = new HashSet<>(this.connectors.keySet());
        return snapshot;
    }

    /**
     * Converts a serialized record value into a {@link ConnectorStatus}.
     *
     * <p>The converted value must be a {@link Map} holding the state, trace, worker id and
     * generation fields; the generation is read as a {@link Long}. Every failure, whether a
     * wrong value type or an exception during conversion or field extraction, is logged and
     * reported as {@code null}.
     *
     * @param str  connector name to attach to the status
     * @param bArr serialized status value
     * @return the parsed status, or {@code null} if the value could not be parsed
     */
    private ConnectorStatus parseConnectorStatus(String str, byte[] bArr) {
        try {
            SchemaAndValue schemaAndValue = this.converter.toConnectData(this.topic, bArr);
            Object payload = schemaAndValue.value();
            if (!(payload instanceof Map)) {
                log.error("Invalid connector status type {}", payload.getClass());
                return null;
            }

            Map<?, ?> statusMap = (Map<?, ?>) payload;
            AbstractStatus.State state = AbstractStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
            String trace = (String) statusMap.get(TRACE_KEY_NAME);
            String workerId = (String) statusMap.get(WORKER_ID_KEY_NAME);
            int statusGeneration = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
            return new ConnectorStatus(str, state, trace, workerId, statusGeneration);
        } catch (Exception e) {
            log.error("Failed to deserialize connector status", e);
            return null;
        }
    }

    /**
     * Converts a serialized record value into a {@link TaskStatus}.
     *
     * <p>The converted value must be a {@link Map} holding the state, trace, worker id and
     * generation fields; the generation is read as a {@link Long}. Every failure, whether a
     * wrong value type or an exception during conversion or field extraction, is logged and
     * reported as {@code null}.
     *
     * @param connectorTaskId task identifier to attach to the status
     * @param bArr            serialized status value
     * @return the parsed status, or {@code null} if the value could not be parsed
     */
    private TaskStatus parseTaskStatus(ConnectorTaskId connectorTaskId, byte[] bArr) {
        try {
            SchemaAndValue schemaAndValue = this.converter.toConnectData(this.topic, bArr);
            Object payload = schemaAndValue.value();
            if (payload instanceof Map) {
                Map<?, ?> statusMap = (Map<?, ?>) payload;
                AbstractStatus.State state = AbstractStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
                String workerId = (String) statusMap.get(WORKER_ID_KEY_NAME);
                int statusGeneration = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
                String trace = (String) statusMap.get(TRACE_KEY_NAME);
                return new TaskStatus(connectorTaskId, state, workerId, statusGeneration, trace);
            }
            log.error("Invalid task status type {}", payload.getClass());
            return null;
        } catch (Exception e) {
            log.error("Failed to deserialize task status", e);
            return null;
        }
    }

    /**
     * Serializes a status into the record value format: a struct following
     * {@code STATUS_SCHEMA_V0}, run through the configured converter. The trace field is
     * only set when the status has a trace.
     *
     * @param abstractStatus status to serialize
     * @return the converter's output for the populated struct
     */
    private byte[] serialize(AbstractStatus abstractStatus) {
        Struct payload = new Struct(STATUS_SCHEMA_V0);
        payload.put(STATE_KEY_NAME, abstractStatus.state().name());
        String trace = abstractStatus.trace();
        if (trace != null) {
            payload.put(TRACE_KEY_NAME, trace);
        }
        payload.put(WORKER_ID_KEY_NAME, abstractStatus.workerId());
        payload.put(GENERATION_KEY_NAME, Integer.valueOf(abstractStatus.generation()));
        byte[] encoded = this.converter.fromConnectData(this.topic, STATUS_SCHEMA_V0, payload);
        return encoded;
    }

    /**
     * Extracts the connector name from a connector status key by cutting off as many
     * leading characters as the connector status prefix is long. The key is not checked
     * for actually starting with that prefix.
     *
     * @param str full record key
     * @return the remainder of the key after the prefix length
     */
    private String parseConnectorStatusKey(String str) {
        final int prefixLength = CONNECTOR_STATUS_PREFIX.length();
        return str.substring(prefixLength);
    }

    /**
     * Parses a task status key of the form
     * {@code status-task-<connector name>-<task number>} into a {@link ConnectorTaskId}.
     *
     * <p>The key is split on {@code "-"}. The first two parts are the prefix, the last part
     * is the task number, and everything in between is re-joined with {@code "-"} so that
     * connector names containing dashes survive. The delimiter is written as a plain
     * literal, matching the separator used when the key is built, instead of being borrowed
     * from an unrelated command-line parsing constant.
     *
     * @param str full record key
     * @return the parsed id, or {@code null} if the key has fewer than four parts or the
     *         last part is not an integer
     */
    private ConnectorTaskId parseConnectorTaskId(String str) {
        String[] parts = str.split("-");
        if (parts.length < 4) {
            return null;
        }
        try {
            int taskNumber = Integer.parseInt(parts[parts.length - 1]);
            String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
            return new ConnectorTaskId(connectorName, taskNumber);
        } catch (NumberFormatException e) {
            log.warn("Invalid task status key {}", str);
            return null;
        }
    }

    /**
     * Applies one consumed connector status record to the cache.
     *
     * <p>A record whose key yields no connector name is discarded with a warning. A
     * {@code null} value removes the connector (and its tasks) from the cache. Otherwise
     * the value is parsed and, if parsing succeeds, stored in the connector's cache entry.
     *
     * @param str  full record key
     * @param bArr record value, or {@code null} for a removal
     */
    private void readConnectorStatus(String str, byte[] bArr) {
        final String connectorName = parseConnectorStatusKey(str);
        if (connectorName == null || connectorName.isEmpty()) {
            log.warn("Discarding record with invalid connector status key {}", str);
            return;
        }

        if (bArr == null) {
            log.trace("Removing status for connector {}", connectorName);
            remove(connectorName);
            return;
        }

        final ConnectorStatus status = parseConnectorStatus(connectorName, bArr);
        if (status != null) {
            synchronized (this) {
                log.trace("Received connector {} status update {}", connectorName, status);
                CacheEntry<ConnectorStatus> entry = getOrAdd(connectorName);
                entry.put(status);
            }
        }
    }

    /**
     * Applies one consumed task status record to the cache.
     *
     * <p>A record whose key cannot be parsed into a task id is discarded with a warning.
     * A {@code null} value removes the task from the cache. Otherwise the value is parsed;
     * a parse failure is logged, a successful parse is stored in the task's cache entry.
     *
     * @param str  full record key
     * @param bArr record value, or {@code null} for a removal
     */
    private void readTaskStatus(String str, byte[] bArr) {
        final ConnectorTaskId taskId = parseConnectorTaskId(str);
        if (taskId == null) {
            log.warn("Discarding record with invalid task status key {}", str);
            return;
        }

        if (bArr == null) {
            log.trace("Removing task status for {}", taskId);
            remove(taskId);
            return;
        }

        final TaskStatus status = parseTaskStatus(taskId, bArr);
        if (status != null) {
            synchronized (this) {
                log.trace("Received task {} status update {}", taskId, status);
                CacheEntry<TaskStatus> entry = getOrAdd(taskId);
                entry.put(status);
            }
        } else {
            log.warn("Failed to parse task status with key {}", str);
        }
    }

    /**
     * Dispatches a consumed record to the connector or task handler based on its key prefix.
     *
     * <p>Records without a key, or with a key matching neither prefix, are discarded with a
     * warning. The explicit null-key check keeps a keyless record in the topic from raising
     * a {@link NullPointerException} in the calling thread.
     *
     * @param consumerRecord record consumed from the status topic
     */
    void read(ConsumerRecord<String, byte[]> consumerRecord) {
        String key = consumerRecord.key();
        if (key == null) {
            log.warn("Discarding record with null key");
            return;
        }
        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
            readConnectorStatus(key, consumerRecord.value());
        } else if (key.startsWith(TASK_STATUS_PREFIX)) {
            readTaskStatus(key, consumerRecord.value());
        } else {
            log.warn("Discarding record with invalid key {}", key);
        }
    }
}
