package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage;

import com.ververica.cdc.connectors.shaded.org.apache.commons.lang3.BooleanUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.NewTopic;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.TopicConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.RetriableException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.ByteArraySerializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringSerializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Time;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.Utils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaAndValue;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.AbstractStatus;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.ConnectorStatus;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.TaskStatus;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.TopicStatus;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.WorkerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.Callback;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.ConnectUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.ConnectorTaskId;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.KafkaBasedLog;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.SharedTopicAdmin;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.Table;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.TopicAdmin;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/connect/storage/KafkaStatusBackingStore.class */
public class KafkaStatusBackingStore implements StatusBackingStore {
    public static final String TASK_STATUS_PREFIX = "status-task-";
    public static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
    public static final String TOPIC_STATUS_PREFIX = "status-topic-";
    public static final String TOPIC_STATUS_SEPARATOR = ":connector-";
    public static final String TOPIC_STATE_KEY = "topic";
    public static final String TOPIC_NAME_KEY = "name";
    public static final String TOPIC_CONNECTOR_KEY = "connector";
    public static final String TOPIC_TASK_KEY = "task";
    private final Time time;
    private final Converter converter;
    protected final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
    protected final Map<String, CacheEntry<ConnectorStatus>> connectors;
    protected final ConcurrentMap<String, ConcurrentMap<String, TopicStatus>> topics;
    private final Supplier<TopicAdmin> topicAdminSupplier;
    private String statusTopic;
    private KafkaBasedLog<String, byte[]> kafkaLog;
    private int generation;
    private SharedTopicAdmin ownTopicAdmin;
    private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
    public static final String STATE_KEY_NAME = "state";
    public static final String TRACE_KEY_NAME = "trace";
    public static final String WORKER_ID_KEY_NAME = "worker_id";
    public static final String GENERATION_KEY_NAME = "generation";
    private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct().field(STATE_KEY_NAME, Schema.STRING_SCHEMA).field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build()).field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA).field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA).build();
    public static final String TOPIC_DISCOVER_TIMESTAMP_KEY = "discoverTimestamp";
    private static final Schema TOPIC_STATUS_VALUE_SCHEMA_V0 = SchemaBuilder.struct().field("name", Schema.STRING_SCHEMA).field("connector", Schema.STRING_SCHEMA).field("task", Schema.INT32_SCHEMA).field(TOPIC_DISCOVER_TIMESTAMP_KEY, Schema.INT64_SCHEMA).build();
    private static final Schema TOPIC_STATUS_SCHEMA_V0 = SchemaBuilder.map(Schema.STRING_SCHEMA, TOPIC_STATUS_VALUE_SCHEMA_V0).build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/ververica/cdc/connectors/shaded/org/apache/kafka/connect/storage/KafkaStatusBackingStore$CacheEntry.class */
    public static class CacheEntry<T extends AbstractStatus<?>> {
        private T value;
        private int sequence;
        private boolean deleted;

        private CacheEntry() {
            this.value = null;
            this.sequence = 0;
            this.deleted = false;
        }

        public int increment() {
            int i = this.sequence + 1;
            this.sequence = i;
            return i;
        }

        public void put(T t) {
            this.value = t;
        }

        public T get() {
            return this.value;
        }

        public void delete() {
            this.deleted = true;
        }

        public boolean isDeleted() {
            return this.deleted;
        }

        public boolean canWriteSafely(T t) {
            return this.value == null || this.value.workerId().equals(t.workerId()) || this.value.generation() <= t.generation();
        }

        public boolean canWriteSafely(T t, int i) {
            return canWriteSafely(t) && this.sequence == i;
        }
    }

    @Deprecated
    public KafkaStatusBackingStore(Time time, Converter converter) {
        this(time, converter, null);
    }

    public KafkaStatusBackingStore(Time time, Converter converter, Supplier<TopicAdmin> supplier) {
        this.time = time;
        this.converter = converter;
        this.tasks = new Table<>();
        this.connectors = new HashMap();
        this.topics = new ConcurrentHashMap();
        this.topicAdminSupplier = supplier;
    }

    KafkaStatusBackingStore(Time time, Converter converter, String str, KafkaBasedLog<String, byte[]> kafkaBasedLog) {
        this(time, converter);
        this.kafkaLog = kafkaBasedLog;
        this.statusTopic = str;
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void configure(WorkerConfig workerConfig) {
        Supplier<TopicAdmin> supplier;
        this.statusTopic = workerConfig.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
        if (this.statusTopic == null || this.statusTopic.trim().length() == 0) {
            throw new ConfigException("Must specify topic for connector status.");
        }
        String lookupKafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig);
        Map<String, Object> originals = workerConfig.originals();
        HashMap hashMap = new HashMap(originals);
        hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        hashMap.put("retries", 0);
        hashMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, BooleanUtils.FALSE);
        ConnectUtils.addMetricsContextProperties(hashMap, workerConfig, lookupKafkaClusterId);
        HashMap hashMap2 = new HashMap(originals);
        hashMap2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        hashMap2.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        ConnectUtils.addMetricsContextProperties(hashMap2, workerConfig, lookupKafkaClusterId);
        HashMap hashMap3 = new HashMap(originals);
        ConnectUtils.addMetricsContextProperties(hashMap3, workerConfig, lookupKafkaClusterId);
        if (this.topicAdminSupplier != null) {
            supplier = this.topicAdminSupplier;
        } else {
            this.ownTopicAdmin = new SharedTopicAdmin(hashMap3);
            supplier = this.ownTopicAdmin;
        }
        this.kafkaLog = createKafkaBasedLog(this.statusTopic, hashMap, hashMap2, (th, consumerRecord) -> {
            read(consumerRecord);
        }, TopicAdmin.defineTopic(this.statusTopic).config(workerConfig instanceof DistributedConfig ? ((DistributedConfig) workerConfig).statusStorageTopicSettings() : Collections.emptyMap()).compacted().partitions(workerConfig.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG).intValue()).replicationFactor(workerConfig.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG).shortValue()).build(), supplier);
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<String, byte[]>> callback, NewTopic newTopic, Supplier<TopicAdmin> supplier) {
        return new KafkaBasedLog<>(str, map, map2, supplier, callback, this.time, topicAdmin -> {
            log.debug("Creating admin client to manage Connect internal status topic");
            if (topicAdmin.createTopics(newTopic).contains(str)) {
                return;
            }
            log.debug("Using admin client to check cleanup policy of '{}' topic is '{}'", str, TopicConfig.CLEANUP_POLICY_COMPACT);
            topicAdmin.verifyTopicCleanupPolicyOnlyCompact(str, DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connector and task statuses");
        });
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void start() {
        this.kafkaLog.start();
        this.kafkaLog.readToEnd();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void stop() {
        try {
            this.kafkaLog.stop();
        } finally {
            if (this.ownTopicAdmin != null) {
                this.ownTopicAdmin.close();
            }
        }
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void put(ConnectorStatus connectorStatus) {
        sendConnectorStatus(connectorStatus, false);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void putSafe(ConnectorStatus connectorStatus) {
        sendConnectorStatus(connectorStatus, true);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void put(TaskStatus taskStatus) {
        sendTaskStatus(taskStatus, false);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void putSafe(TaskStatus taskStatus) {
        sendTaskStatus(taskStatus, true);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void put(TopicStatus topicStatus) {
        sendTopicStatus(topicStatus.connector(), topicStatus.topic(), topicStatus);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void flush() {
        this.kafkaLog.flush();
    }

    private void sendConnectorStatus(ConnectorStatus connectorStatus, boolean z) {
        String id = connectorStatus.id();
        send(CONNECTOR_STATUS_PREFIX + id, connectorStatus, getOrAdd(id), z);
    }

    private void sendTaskStatus(TaskStatus taskStatus, boolean z) {
        ConnectorTaskId id = taskStatus.id();
        send(TASK_STATUS_PREFIX + id.connector() + "-" + id.task(), taskStatus, getOrAdd(id), z);
    }

    private void sendTopicStatus(String str, String str2, TopicStatus topicStatus) {
        final String str3 = TOPIC_STATUS_PREFIX + str2 + TOPIC_STATUS_SEPARATOR + str;
        final byte[] serializeTopicStatus = serializeTopicStatus(topicStatus);
        this.kafkaLog.send(str3, serializeTopicStatus, new com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback() { // from class: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.KafkaStatusBackingStore.1
            @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    return;
                }
                if (exc instanceof RetriableException) {
                    KafkaStatusBackingStore.this.kafkaLog.send(str3, serializeTopicStatus, this);
                } else {
                    KafkaStatusBackingStore.log.error("Failed to write status update", exc);
                }
            }
        });
    }

    private <V extends AbstractStatus<?>> void send(final String str, final V v, final CacheEntry<V> cacheEntry, final boolean z) {
        synchronized (this) {
            this.generation = v.generation();
            if (!z || cacheEntry.canWriteSafely(v)) {
                final int increment = cacheEntry.increment();
                final byte[] serialize = v.state() == AbstractStatus.State.DESTROYED ? null : serialize(v);
                this.kafkaLog.send(str, serialize, new com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback() { // from class: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.KafkaStatusBackingStore.2
                    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.Callback
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc == null) {
                            return;
                        }
                        if (!(exc instanceof RetriableException)) {
                            KafkaStatusBackingStore.log.error("Failed to write status update", exc);
                            return;
                        }
                        synchronized (KafkaStatusBackingStore.this) {
                            if (cacheEntry.isDeleted() || v.generation() != KafkaStatusBackingStore.this.generation || (z && !cacheEntry.canWriteSafely(v, increment))) {
                                return;
                            }
                            KafkaStatusBackingStore.this.kafkaLog.send(str, serialize, this);
                        }
                    }
                });
            }
        }
    }

    private synchronized CacheEntry<ConnectorStatus> getOrAdd(String str) {
        CacheEntry<ConnectorStatus> cacheEntry = this.connectors.get(str);
        if (cacheEntry == null) {
            cacheEntry = new CacheEntry<>();
            this.connectors.put(str, cacheEntry);
        }
        return cacheEntry;
    }

    private synchronized void remove(String str) {
        CacheEntry<ConnectorStatus> remove = this.connectors.remove(str);
        if (remove != null) {
            remove.delete();
        }
        Map<Integer, CacheEntry<TaskStatus>> remove2 = this.tasks.remove(str);
        if (remove2 != null) {
            Iterator<CacheEntry<TaskStatus>> it = remove2.values().iterator();
            while (it.hasNext()) {
                it.next().delete();
            }
        }
    }

    private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId connectorTaskId) {
        CacheEntry<TaskStatus> cacheEntry = this.tasks.get(connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
        if (cacheEntry == null) {
            cacheEntry = new CacheEntry<>();
            this.tasks.put(connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()), cacheEntry);
        }
        return cacheEntry;
    }

    private synchronized void remove(ConnectorTaskId connectorTaskId) {
        CacheEntry<TaskStatus> remove = this.tasks.remove(connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
        if (remove != null) {
            remove.delete();
        }
    }

    private void removeTopic(String str, String str2) {
        ConcurrentMap<String, TopicStatus> concurrentMap = this.topics.get(str2);
        if (concurrentMap == null) {
            return;
        }
        concurrentMap.remove(str);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public synchronized TaskStatus get(ConnectorTaskId connectorTaskId) {
        CacheEntry<TaskStatus> cacheEntry = this.tasks.get(connectorTaskId.connector(), Integer.valueOf(connectorTaskId.task()));
        if (cacheEntry == null) {
            return null;
        }
        return cacheEntry.get();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public synchronized ConnectorStatus get(String str) {
        CacheEntry<ConnectorStatus> cacheEntry = this.connectors.get(str);
        if (cacheEntry == null) {
            return null;
        }
        return cacheEntry.get();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public synchronized Collection<TaskStatus> getAll(String str) {
        ArrayList arrayList = new ArrayList();
        Iterator<CacheEntry<TaskStatus>> it = this.tasks.row(str).values().iterator();
        while (it.hasNext()) {
            TaskStatus taskStatus = it.next().get();
            if (taskStatus != null) {
                arrayList.add(taskStatus);
            }
        }
        return arrayList;
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public TopicStatus getTopic(String str, String str2) {
        ConcurrentMap<String, TopicStatus> concurrentMap = this.topics.get(Objects.requireNonNull(str));
        if (concurrentMap != null) {
            return concurrentMap.get(Objects.requireNonNull(str2));
        }
        return null;
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public Collection<TopicStatus> getAllTopics(String str) {
        ConcurrentMap<String, TopicStatus> concurrentMap = this.topics.get(Objects.requireNonNull(str));
        return concurrentMap != null ? Collections.unmodifiableCollection((Collection) Objects.requireNonNull(concurrentMap.values())) : Collections.emptySet();
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public void deleteTopic(String str, String str2) {
        sendTopicStatus((String) Objects.requireNonNull(str), (String) Objects.requireNonNull(str2), null);
    }

    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.StatusBackingStore
    public synchronized Set<String> connectors() {
        return new HashSet(this.connectors.keySet());
    }

    private ConnectorStatus parseConnectorStatus(String str, byte[] bArr) {
        try {
            SchemaAndValue connectData = this.converter.toConnectData(this.statusTopic, bArr);
            if (connectData.value() instanceof Map) {
                Map map = (Map) connectData.value();
                return new ConnectorStatus(str, AbstractStatus.State.valueOf((String) map.get(STATE_KEY_NAME)), (String) map.get(TRACE_KEY_NAME), (String) map.get(WORKER_ID_KEY_NAME), ((Long) map.get(GENERATION_KEY_NAME)).intValue());
            }
            log.error("Invalid connector status type {}", connectData.value().getClass());
            return null;
        } catch (Exception e) {
            log.error("Failed to deserialize connector status", e);
            return null;
        }
    }

    private TaskStatus parseTaskStatus(ConnectorTaskId connectorTaskId, byte[] bArr) {
        try {
            SchemaAndValue connectData = this.converter.toConnectData(this.statusTopic, bArr);
            if (!(connectData.value() instanceof Map)) {
                log.error("Invalid task status type {}", connectData.value().getClass());
                return null;
            }
            Map map = (Map) connectData.value();
            return new TaskStatus(connectorTaskId, AbstractStatus.State.valueOf((String) map.get(STATE_KEY_NAME)), (String) map.get(WORKER_ID_KEY_NAME), ((Long) map.get(GENERATION_KEY_NAME)).intValue(), (String) map.get(TRACE_KEY_NAME));
        } catch (Exception e) {
            log.error("Failed to deserialize task status", e);
            return null;
        }
    }

    protected TopicStatus parseTopicStatus(byte[] bArr) {
        try {
            SchemaAndValue connectData = this.converter.toConnectData(this.statusTopic, bArr);
            if (!(connectData.value() instanceof Map)) {
                log.error("Invalid topic status value {}", connectData.value());
                return null;
            }
            Object obj = ((Map) connectData.value()).get("topic");
            if (obj instanceof Map) {
                Map map = (Map) obj;
                return new TopicStatus((String) map.get("name"), (String) map.get("connector"), ((Long) map.get("task")).intValue(), ((Long) map.get(TOPIC_DISCOVER_TIMESTAMP_KEY)).longValue());
            }
            log.error("Invalid topic status value {} for field {}", obj, "topic");
            return null;
        } catch (Exception e) {
            log.error("Failed to deserialize topic status", e);
            return null;
        }
    }

    private byte[] serialize(AbstractStatus<?> abstractStatus) {
        Struct struct = new Struct(STATUS_SCHEMA_V0);
        struct.put(STATE_KEY_NAME, abstractStatus.state().name());
        if (abstractStatus.trace() != null) {
            struct.put(TRACE_KEY_NAME, abstractStatus.trace());
        }
        struct.put(WORKER_ID_KEY_NAME, abstractStatus.workerId());
        struct.put(GENERATION_KEY_NAME, Integer.valueOf(abstractStatus.generation()));
        return this.converter.fromConnectData(this.statusTopic, STATUS_SCHEMA_V0, struct);
    }

    protected byte[] serializeTopicStatus(TopicStatus topicStatus) {
        if (topicStatus == null) {
            return null;
        }
        Struct struct = new Struct(TOPIC_STATUS_VALUE_SCHEMA_V0);
        struct.put("name", topicStatus.topic());
        struct.put("connector", topicStatus.connector());
        struct.put("task", Integer.valueOf(topicStatus.task()));
        struct.put(TOPIC_DISCOVER_TIMESTAMP_KEY, Long.valueOf(topicStatus.discoverTimestamp()));
        return this.converter.fromConnectData(this.statusTopic, TOPIC_STATUS_SCHEMA_V0, Collections.singletonMap("topic", struct));
    }

    private String parseConnectorStatusKey(String str) {
        return str.substring(CONNECTOR_STATUS_PREFIX.length());
    }

    private ConnectorTaskId parseConnectorTaskId(String str) {
        String[] split = str.split("-");
        if (split.length < 4) {
            return null;
        }
        try {
            return new ConnectorTaskId(Utils.join(Arrays.copyOfRange(split, 2, split.length - 1), "-"), Integer.parseInt(split[split.length - 1]));
        } catch (NumberFormatException e) {
            log.warn("Invalid task status key {}", str);
            return null;
        }
    }

    private void readConnectorStatus(String str, byte[] bArr) {
        String parseConnectorStatusKey = parseConnectorStatusKey(str);
        if (parseConnectorStatusKey.isEmpty()) {
            log.warn("Discarding record with invalid connector status key {}", str);
            return;
        }
        if (bArr == null) {
            log.trace("Removing status for connector {}", parseConnectorStatusKey);
            remove(parseConnectorStatusKey);
            return;
        }
        ConnectorStatus parseConnectorStatus = parseConnectorStatus(parseConnectorStatusKey, bArr);
        if (parseConnectorStatus == null) {
            return;
        }
        synchronized (this) {
            log.trace("Received connector {} status update {}", parseConnectorStatusKey, parseConnectorStatus);
            getOrAdd(parseConnectorStatusKey).put(parseConnectorStatus);
        }
    }

    private void readTaskStatus(String str, byte[] bArr) {
        ConnectorTaskId parseConnectorTaskId = parseConnectorTaskId(str);
        if (parseConnectorTaskId == null) {
            log.warn("Discarding record with invalid task status key {}", str);
            return;
        }
        if (bArr == null) {
            log.trace("Removing task status for {}", parseConnectorTaskId);
            remove(parseConnectorTaskId);
            return;
        }
        TaskStatus parseTaskStatus = parseTaskStatus(parseConnectorTaskId, bArr);
        if (parseTaskStatus == null) {
            log.warn("Failed to parse task status with key {}", str);
            return;
        }
        synchronized (this) {
            log.trace("Received task {} status update {}", parseConnectorTaskId, parseTaskStatus);
            getOrAdd(parseConnectorTaskId).put(parseTaskStatus);
        }
    }

    private void readTopicStatus(String str, byte[] bArr) {
        int indexOf = str.indexOf(58);
        int length = TOPIC_STATUS_PREFIX.length();
        if (length > indexOf) {
            log.warn("Discarding record with invalid topic status key {}", str);
            return;
        }
        String substring = str.substring(length, indexOf);
        if (substring.isEmpty()) {
            log.warn("Discarding record with invalid topic status key containing empty topic {}", str);
            return;
        }
        int length2 = indexOf + TOPIC_STATUS_SEPARATOR.length();
        if (length2 > str.length()) {
            log.warn("Discarding record with invalid topic status key {}", str);
            return;
        }
        String substring2 = str.substring(length2);
        if (substring2.isEmpty()) {
            log.warn("Discarding record with invalid topic status key containing empty connector {}", str);
            return;
        }
        if (bArr == null) {
            log.trace("Removing status for topic {} and connector {}", substring, substring2);
            removeTopic(substring, substring2);
            return;
        }
        TopicStatus parseTopicStatus = parseTopicStatus(bArr);
        if (parseTopicStatus == null) {
            log.warn("Failed to parse topic status with key {}", str);
        } else {
            log.trace("Received topic status update {}", parseTopicStatus);
            this.topics.computeIfAbsent(substring2, str2 -> {
                return new ConcurrentHashMap();
            }).put(substring, parseTopicStatus);
        }
    }

    void read(ConsumerRecord<String, byte[]> consumerRecord) {
        String key = consumerRecord.key();
        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
            readConnectorStatus(key, consumerRecord.value());
            return;
        }
        if (key.startsWith(TASK_STATUS_PREFIX)) {
            readTaskStatus(key, consumerRecord.value());
        } else if (key.startsWith(TOPIC_STATUS_PREFIX)) {
            readTopicStatus(key, consumerRecord.value());
        } else {
            log.warn("Discarding record with invalid key {}", key);
        }
    }
}
