/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarOffsetBackingStore
implements OffsetBackingStore {
    private static final Logger log = LoggerFactory.getLogger(PulsarOffsetBackingStore.class);
    private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<ByteBuffer, ByteBuffer>();
    private PulsarClient client;
    private String topic;
    private Map<String, Object> readerConfigMap = new HashMap<String, Object>();
    private Producer<byte[]> producer;
    private Reader<byte[]> reader;
    private volatile CompletableFuture<Void> outstandingReadToEnd = null;

    public PulsarOffsetBackingStore(PulsarClient client) {
        Preconditions.checkArgument((client != null ? 1 : 0) != 0, (Object)"Pulsar Client must be provided");
        this.client = client;
    }

    public void configure(WorkerConfig workerConfig) {
        this.topic = workerConfig.getString("offset.storage.topic");
        Preconditions.checkArgument((!StringUtils.isBlank((String)this.topic) ? 1 : 0) != 0, (Object)"Offset storage topic must be specified");
        try {
            this.readerConfigMap = IOConfigUtils.loadConfigFromJsonString((String)workerConfig.getString("offset.storage.reader.config"));
        }
        catch (JsonProcessingException exception) {
            log.warn("The provided reader configs are invalid, will not passing any extra config to the reader builder.", (Throwable)exception);
        }
        log.info("Configure offset backing store on pulsar topic {}", (Object)this.topic);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void readToEnd(CompletableFuture<Void> future) {
        PulsarOffsetBackingStore pulsarOffsetBackingStore = this;
        synchronized (pulsarOffsetBackingStore) {
            if (this.outstandingReadToEnd != null) {
                this.outstandingReadToEnd.whenComplete((result, cause) -> {
                    if (null != cause) {
                        future.completeExceptionally((Throwable)cause);
                    } else {
                        future.complete((Void)result);
                    }
                });
                return;
            }
            this.outstandingReadToEnd = future;
            future.whenComplete((result, cause) -> {
                PulsarOffsetBackingStore pulsarOffsetBackingStore = this;
                synchronized (pulsarOffsetBackingStore) {
                    this.outstandingReadToEnd = null;
                }
            });
        }
        this.producer.flushAsync().whenComplete((ignored, cause) -> {
            if (null != cause) {
                future.completeExceptionally((Throwable)cause);
            } else {
                this.checkAndReadNext(future);
            }
        });
    }

    private void checkAndReadNext(CompletableFuture<Void> endFuture) {
        this.reader.hasMessageAvailableAsync().whenComplete((hasMessageAvailable, cause) -> {
            if (null != cause) {
                endFuture.completeExceptionally((Throwable)cause);
            } else if (hasMessageAvailable.booleanValue()) {
                this.readNext(endFuture);
            } else {
                endFuture.complete(null);
            }
        });
    }

    private void readNext(CompletableFuture<Void> endFuture) {
        this.reader.readNextAsync().whenComplete((message, cause) -> {
            if (null != cause) {
                endFuture.completeExceptionally((Throwable)cause);
            } else {
                this.processMessage((Message<byte[]>)message);
                this.checkAndReadNext(endFuture);
            }
        });
    }

    void processMessage(Message<byte[]> message) {
        if (message.getKey() != null) {
            this.data.put(ByteBuffer.wrap(message.getKey().getBytes(StandardCharsets.UTF_8)), ByteBuffer.wrap((byte[])message.getValue()));
        } else {
            log.debug("Got message without key from the offset storage topic, skip it. message value: {}", message.getValue());
        }
    }

    public void start() {
        try {
            this.producer = this.client.newProducer(Schema.BYTES).topic(this.topic).create();
            log.info("Successfully created producer to produce updates to topic {}", (Object)this.topic);
            this.reader = this.client.newReader(Schema.BYTES).topic(this.topic).startMessageId(MessageId.earliest).loadConf(this.readerConfigMap).create();
            log.info("Successfully created reader to replay updates from topic {}", (Object)this.topic);
            CompletableFuture<Void> endFuture = new CompletableFuture<Void>();
            this.readToEnd(endFuture);
            endFuture.get();
        }
        catch (PulsarClientException e) {
            log.error("Failed to setup pulsar producer/reader to cluster", (Throwable)e);
            throw new RuntimeException("Failed to setup pulsar producer/reader to cluster ", e);
        }
        catch (InterruptedException | ExecutionException e) {
            log.error("Failed to start PulsarOffsetBackingStore", (Throwable)e);
            throw new RuntimeException("Failed to start PulsarOffsetBackingStore", e);
        }
    }

    public void stop() {
        log.info("Stopping PulsarOffsetBackingStore");
        if (null != this.producer) {
            try {
                this.producer.flush();
            }
            catch (PulsarClientException pce) {
                log.warn("Failed to flush the producer", (Throwable)pce);
            }
            try {
                this.producer.close();
            }
            catch (PulsarClientException e) {
                log.warn("Failed to close producer", (Throwable)e);
            }
            this.producer = null;
        }
        if (null != this.reader) {
            try {
                this.reader.close();
            }
            catch (IOException e) {
                log.warn("Failed to close reader", (Throwable)e);
            }
            this.reader = null;
        }
        this.data.clear();
    }

    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
        CompletableFuture<Void> endFuture = new CompletableFuture<Void>();
        this.readToEnd(endFuture);
        return endFuture.thenApply(ignored -> {
            HashMap<ByteBuffer, ByteBuffer> values = new HashMap<ByteBuffer, ByteBuffer>();
            for (ByteBuffer key : keys) {
                ByteBuffer value = this.data.get(key);
                if (null == value) continue;
                values.put(key, value);
            }
            return values;
        });
    }

    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        values.forEach((key, value) -> {
            ByteBuf bb = Unpooled.wrappedBuffer((ByteBuffer)key);
            byte[] keyBytes = ByteBufUtil.getBytes((ByteBuf)bb);
            byte[] valBytes = null;
            if (value != null) {
                bb = Unpooled.wrappedBuffer((ByteBuffer)value);
                valBytes = ByteBufUtil.getBytes((ByteBuf)bb);
            } else {
                valBytes = MessageId.earliest.toByteArray();
            }
            this.producer.newMessage().key(new String(keyBytes, StandardCharsets.UTF_8)).value((Object)valBytes).sendAsync();
        });
        return this.producer.flushAsync().whenComplete((ignored, cause) -> {
            if (null != callback) {
                callback.onCompletion(cause, ignored);
            }
            if (null == cause) {
                this.readToEnd(new CompletableFuture<Void>());
            }
        });
    }
}

