package org.apache.pulsar.io.kafka.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.9.3.17.jar:org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.class */
/**
 * {@link SinkTaskContext} implementation that backs a Kafka Connect sink task with a Pulsar
 * {@link SinkContext}.
 *
 * <p>It keeps an in-memory map of the last known offset per {@link TopicPartition}, reads
 * initial offsets from and flushes offsets to an {@link OffsetBackingStore}, and invokes the
 * supplied partition-change callback whenever a previously unseen partition is recorded.
 */
public class PulsarKafkaSinkTaskContext implements SinkTaskContext {
    private static final Logger log = LoggerFactory.getLogger(PulsarKafkaSinkTaskContext.class);

    /** Connector configuration, returned as-is by {@link #configs()}. */
    private final Map<String, String> config;

    /** Pulsar sink context used for seek / pause / resume. */
    private final SinkContext ctx;

    /** Store that offsets are read from and flushed to. */
    private final OffsetBackingStore offsetStore;

    /** Prefix used when building offset-store keys (see {@link #topicPartitionAsKey}). */
    private final String topicNamespace;

    /** Callback invoked with the known partitions after a new partition has been recorded. */
    private final Consumer<Collection<TopicPartition>> onPartitionChange;

    /** Set when a new partition was recorded and the callback has not been fired yet. */
    private final AtomicBoolean runRepartition = new AtomicBoolean(false);

    /** Last known offset per partition; its key set doubles as the task's assignment. */
    private final ConcurrentHashMap<TopicPartition, Long> currentOffsets = new ConcurrentHashMap<>();

    /**
     * Creates the context and configures and starts the offset store.
     *
     * @param config            connector configuration; also used to build the worker config
     * @param sinkContext       Pulsar sink context providing the Pulsar client
     * @param onPartitionChange callback receiving the known partitions when a new one appears
     */
    public PulsarKafkaSinkTaskContext(Map<String, String> config,
                                      SinkContext sinkContext,
                                      Consumer<Collection<TopicPartition>> onPartitionChange) {
        this.config = config;
        this.ctx = sinkContext;
        this.offsetStore = new PulsarOffsetBackingStore(sinkContext.getPulsarClient());
        PulsarKafkaWorkerConfig workerConfig = new PulsarKafkaWorkerConfig(config);
        this.offsetStore.configure(workerConfig);
        this.offsetStore.start();
        this.onPartitionChange = onPartitionChange;
        this.topicNamespace = workerConfig.getString(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG);
    }

    /** Stops the offset store. */
    public void close() {
        offsetStore.stop();
    }

    /** Returns the configuration map this context was constructed with. */
    @Override
    public Map<String, String> configs() {
        return config;
    }

    /**
     * Returns the current offset for the given topic and partition, loading it from the offset
     * store on first access.
     *
     * <p>NOTE: public only because the decompiler widened the modifier (reported as originally
     * {@code protected}); intended for tests.
     *
     * @param topic     topic name
     * @param partition partition index
     * @return the cached or freshly loaded offset, or {@code -1} if the store has none
     */
    @VisibleForTesting
    public Long currentOffset(String topic, int partition) {
        return currentOffset(new TopicPartition(topic, partition));
    }

    /** Returns the cached offset for the partition, loading and caching it when absent. */
    private Long currentOffset(TopicPartition topicPartition) {
        return currentOffsets.computeIfAbsent(topicPartition, this::loadInitialOffset);
    }

    /**
     * Reads the stored offset for a partition from the offset store.
     *
     * @return the stored offset, or {@code -1} when the store returns nothing for the key
     * @throws RuntimeException if the lookup is interrupted or fails
     */
    private Long loadInitialOffset(TopicPartition topicPartition) {
        ByteBuffer key = topicPartitionAsKey(topicPartition);
        Collection<ByteBuffer> keys = Lists.newLinkedList();
        keys.add(key);
        try {
            Map<ByteBuffer, ByteBuffer> stored = offsetStore.get(keys).get();
            if (stored != null && !stored.isEmpty()) {
                Optional<ByteBuffer> value = stored.entrySet().stream()
                        .filter(entry -> entry.getKey().equals(key))
                        .findFirst()
                        .map(Map.Entry::getValue);
                if (value.isPresent()) {
                    // Read through a duplicate so the position of the buffer handed back by the
                    // store is not advanced (the store may return a shared instance).
                    long offset = value.get().duplicate().getLong();
                    if (log.isDebugEnabled()) {
                        log.debug("read initial offset for {} == {}", topicPartition, offset);
                    }
                    return offset;
                }
            }
            return -1L;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            log.error("error getting initial state of {}", topicPartition, e);
            throw new RuntimeException("error getting initial state of " + topicPartition, e);
        } catch (ExecutionException e) {
            log.error("error getting initial state of {}", topicPartition, e);
            throw new RuntimeException("error getting initial state of " + topicPartition, e);
        }
    }

    /**
     * Returns a snapshot of the tracked offsets as Kafka {@link OffsetAndMetadata}.
     *
     * <p>Only strictly positive offsets are included; partitions whose offset is {@code 0} or
     * negative (e.g. the {@code -1} "nothing stored" marker) are omitted.
     */
    public Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
        Map<TopicPartition, OffsetAndMetadata> snapshot =
                Maps.newHashMapWithExpectedSize(currentOffsets.size());
        currentOffsets.forEach((topicPartition, offset) -> {
            if (offset > 0) {
                snapshot.put(topicPartition, new OffsetAndMetadata(offset, Optional.empty(), null));
            }
        });
        return snapshot;
    }

    /** Builds the offset-store key: UTF-8 bytes of {@code <topicNamespace>/<topicPartition>}. */
    private ByteBuffer topicPartitionAsKey(TopicPartition topicPartition) {
        String key = topicNamespace + "/" + topicPartition.toString();
        return ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    }

    /** Adds one key/value entry for the partition, encoding the offset as an 8-byte long. */
    private void fillOffsetMap(Map<ByteBuffer, ByteBuffer> offsetMap, TopicPartition topicPartition, long offset) {
        ByteBuffer value = ByteBuffer.allocate(Long.BYTES);
        value.putLong(offset);
        value.flip();
        offsetMap.put(topicPartitionAsKey(topicPartition), value);
    }

    /**
     * Stores the offset for the partition and flags a pending repartition when the partition
     * was not tracked before.
     *
     * <p>The return value of {@code put} is used so that the "is it new?" check and the update
     * are a single map operation rather than a separate {@code containsKey} + {@code put}.
     */
    private void recordOffset(TopicPartition topicPartition, long offset) {
        if (currentOffsets.put(topicPartition, offset) == null) {
            runRepartition.set(true);
        }
    }

    /** Fires the partition-change callback once if a repartition is pending. */
    private void notifyIfRepartitionPending() {
        if (runRepartition.compareAndSet(true, false)) {
            onPartitionChange.accept(currentOffsets.keySet());
        }
    }

    /**
     * Seeks the Pulsar consumer to the given offset and, on success, records the offset.
     *
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the seek fails
     */
    private void seekAndUpdateOffset(TopicPartition topicPartition, long offset) {
        try {
            ctx.seek(topicPartition.topic(), topicPartition.partition(), MessageIdUtils.getMessageId(offset));
            recordOffset(topicPartition, offset);
        } catch (PulsarClientException e) {
            log.error("Failed to seek topic {} partition {} offset {}",
                    topicPartition.topic(), topicPartition.partition(), offset, e);
            throw new RuntimeException("Failed to seek topic " + topicPartition.topic()
                    + " partition " + topicPartition.partition() + " offset " + offset, e);
        }
    }

    /**
     * Records the latest offset for a partition without seeking, and notifies the
     * partition-change callback if the partition is new.
     *
     * @param topicPartition partition whose offset changed
     * @param offset         latest offset for that partition
     */
    public void updateLastOffset(TopicPartition topicPartition, long offset) {
        recordOffset(topicPartition, offset);
        notifyIfRepartitionPending();
    }

    /**
     * Seeks every given partition to its offset, then notifies the partition-change callback
     * if any of them was new.
     *
     * @throws RuntimeException if any seek fails; partitions processed before the failure
     *                          remain updated
     */
    @Override
    public void offset(Map<TopicPartition, Long> offsets) {
        offsets.forEach((topicPartition, offset) -> seekAndUpdateOffset(topicPartition, offset));
        notifyIfRepartitionPending();
    }

    /**
     * Seeks a single partition to the given offset, then notifies the partition-change callback
     * if the partition was new.
     *
     * @throws RuntimeException if the seek fails
     */
    @Override
    public void offset(TopicPartition topicPartition, long offset) {
        seekAndUpdateOffset(topicPartition, offset);
        notifyIfRepartitionPending();
    }

    /** Not supported; only logs a warning. */
    @Override
    public void timeout(long timeoutMs) {
        log.warn("timeout() is called but is not supported currently.");
    }

    /** Returns the live key-set view of the tracked partitions (not a copy). */
    @Override
    public Set<TopicPartition> assignment() {
        return currentOffsets.keySet();
    }

    /**
     * Pauses consumption of the given partitions, in order.
     *
     * @throws RuntimeException on the first partition that fails to pause; later ones are
     *                          not attempted
     */
    @Override
    public void pause(TopicPartition... topicPartitions) {
        for (TopicPartition topicPartition : topicPartitions) {
            try {
                ctx.pause(topicPartition.topic(), topicPartition.partition());
            } catch (PulsarClientException e) {
                log.error("Failed to pause topic {} partition {}",
                        topicPartition.topic(), topicPartition.partition(), e);
                throw new RuntimeException("Failed to pause topic " + topicPartition.topic()
                        + " partition " + topicPartition.partition(), e);
            }
        }
    }

    /**
     * Resumes consumption of the given partitions, in order.
     *
     * @throws RuntimeException on the first partition that fails to resume; later ones are
     *                          not attempted
     */
    @Override
    public void resume(TopicPartition... topicPartitions) {
        for (TopicPartition topicPartition : topicPartitions) {
            try {
                ctx.resume(topicPartition.topic(), topicPartition.partition());
            } catch (PulsarClientException e) {
                log.error("Failed to resume topic {} partition {}",
                        topicPartition.topic(), topicPartition.partition(), e);
                throw new RuntimeException("Failed to resume topic " + topicPartition.topic()
                        + " partition " + topicPartition.partition(), e);
            }
        }
    }

    /** Not supported; only logs a warning. */
    @Override
    public void requestCommit() {
        log.warn("requestCommit() is called but is not supported currently.");
    }

    /**
     * Writes the given offsets to the offset store and blocks until the store reports
     * completion.
     *
     * @param offsets offsets to persist, keyed by partition
     * @throws Exception if the store reports an error (surfaced through the future) or the
     *                   wait is interrupted
     */
    public void flushOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws Exception {
        Map<ByteBuffer, ByteBuffer> offsetMap = Maps.newHashMapWithExpectedSize(offsets.size());
        offsets.forEach((topicPartition, offsetAndMetadata) ->
                fillOffsetMap(offsetMap, topicPartition, offsetAndMetadata.offset()));

        CompletableFuture<Void> result = new CompletableFuture<>();
        offsetStore.set(offsetMap, (error, ignored) -> {
            if (error == null) {
                result.complete(null);
            } else {
                log.error("error flushing offsets for {}", offsets, error);
                result.completeExceptionally(error);
            }
        });
        result.get();
    }
}
