package org.apache.pulsar.functions.instance.state;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.sundr.codegen.model.Node;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.DeleteOption;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.utils.FunctionCommon;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.7.2.2-rc-202105210450.jar:org/apache/pulsar/functions/instance/state/BKStateStoreImpl.class */
/**
 * {@link DefaultStateStore} implementation that delegates every operation to a
 * {@code Table<ByteBuf, ByteBuf>}.
 *
 * <p>String keys are encoded as UTF-8 before being handed to the table. Each operation is
 * offered in an asynchronous form returning a {@link CompletableFuture} and a blocking form
 * that waits on that future and wraps any failure in a {@link RuntimeException} carrying the
 * original exception as its cause.
 */
public class BKStateStoreImpl implements DefaultStateStore {
    private final String tenant;
    private final String namespace;
    private final String name;
    private final String fqsn;
    private final Table<ByteBuf, ByteBuf> table;

    /**
     * Creates a state store view over the given table.
     *
     * @param str   tenant of the state store
     * @param str2  namespace of the state store
     * @param str3  name of the state store
     * @param table table that all reads and writes are delegated to
     */
    public BKStateStoreImpl(String str, String str2, String str3, Table<ByteBuf, ByteBuf> table) {
        this.tenant = str;
        this.namespace = str2;
        this.name = str3;
        this.table = table;
        this.fqsn = FunctionCommon.getFullyQualifiedName(str, str2, str3);
    }

    /** Encodes a string key as a UTF-8 {@link ByteBuf} suitable for the table. */
    private static ByteBuf keyBuf(String key) {
        return Unpooled.wrappedBuffer(key.getBytes(StandardCharsets.UTF_8));
    }

    /** @return the tenant supplied at construction time */
    @Override
    public String tenant() {
        return this.tenant;
    }

    /** @return the namespace supplied at construction time */
    @Override
    public String namespace() {
        return this.namespace;
    }

    /** @return the store name supplied at construction time */
    @Override
    public String name() {
        return this.name;
    }

    /** @return the fully qualified name computed from tenant, namespace and name */
    @Override
    public String fqsn() {
        return this.fqsn;
    }

    /** No initialization is required; this method intentionally does nothing. */
    @Override
    public void init(StateStoreContext stateStoreContext) {
    }

    /** Closes the underlying table. */
    @Override
    public void close() {
        this.table.close();
    }

    /**
     * Asynchronously increments the counter stored under a key.
     *
     * @param str counter key
     * @param j   amount to add
     * @return future completing when the table has applied the increment
     */
    @Override
    public CompletableFuture<Void> incrCounterAsync(String str, long j) {
        return this.table.increment(keyBuf(str), j);
    }

    /**
     * Blocking variant of {@link #incrCounterAsync(String, long)}.
     *
     * @param str counter key
     * @param j   amount to add
     * @throws RuntimeException if the increment fails; the cause is the underlying exception
     */
    @Override
    public void incrCounter(String str, long j) {
        try {
            FutureUtils.result(incrCounterAsync(str, j));
        } catch (Exception e) {
            throw new RuntimeException("Failed to increment key '" + str + "' by amount '" + j + "'", e);
        }
    }

    /**
     * Asynchronously reads the counter stored under a key.
     *
     * @param str counter key
     * @return future yielding the value reported by the table
     */
    @Override
    public CompletableFuture<Long> getCounterAsync(String str) {
        return this.table.getNumber(keyBuf(str));
    }

    /**
     * Blocking variant of {@link #getCounterAsync(String)}.
     *
     * @param str counter key
     * @return the counter value
     * @throws RuntimeException if the lookup fails (including unboxing a {@code null} result);
     *                          the cause is the underlying exception
     */
    @Override
    public long getCounter(String str) {
        try {
            // Unboxing stays inside the try so a null result is reported like any other failure.
            return FutureUtils.result(getCounterAsync(str));
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve counter from key '" + str + "'", e);
        }
    }

    /**
     * Asynchronously stores a value under a key.
     *
     * <p>A non-null buffer has its position reset to 0 before being wrapped, so the caller's
     * buffer is modified and the bytes from index 0 up to its limit are what get written.
     * A {@code null} buffer is passed through to the table as a {@code null} value.
     *
     * @param str        key to write
     * @param byteBuffer value to store, may be {@code null}
     * @return future completing when the table has applied the write
     */
    @Override
    public CompletableFuture<Void> putAsync(String str, ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            return this.table.put(keyBuf(str), null);
        }
        // Rewind first: a buffer that was just filled has its position at the end, and
        // wrapping it as-is would expose no readable bytes.
        byteBuffer.position(0);
        return this.table.put(keyBuf(str), Unpooled.wrappedBuffer(byteBuffer));
    }

    /**
     * Blocking variant of {@link #putAsync(String, ByteBuffer)}.
     *
     * @param str        key to write
     * @param byteBuffer value to store, may be {@code null}
     * @throws RuntimeException if the write fails; the cause is the underlying exception
     */
    @Override
    public void put(String str, ByteBuffer byteBuffer) {
        try {
            FutureUtils.result(putAsync(str, byteBuffer));
        } catch (Exception e) {
            throw new RuntimeException("Failed to update the state value for key '" + str + "'", e);
        }
    }

    /**
     * Asynchronously deletes the value stored under a key.
     *
     * @param str key to delete
     * @return future completing with {@code null} once the table has processed the delete
     */
    @Override
    public CompletableFuture<Void> deleteAsync(String str) {
        // The delete result is discarded; callers only need to know the operation finished.
        return this.table.delete(keyBuf(str), Options.delete()).thenApply(ignored -> null);
    }

    /**
     * Blocking variant of {@link #deleteAsync(String)}.
     *
     * @param str key to delete
     * @throws RuntimeException if the delete fails; the cause is the underlying exception
     */
    @Override
    public void delete(String str) {
        try {
            FutureUtils.result(deleteAsync(str));
        } catch (Exception e) {
            throw new RuntimeException("Failed to delete the state value for key '" + str + "'", e);
        }
    }

    /**
     * Asynchronously reads the value stored under a key.
     *
     * <p>The table's {@link ByteBuf} is copied into a freshly allocated heap
     * {@link ByteBuffer} positioned at 0, and the {@code ByteBuf} is then released.
     *
     * @param str key to read
     * @return future yielding the copied value, or {@code null} when the table returns
     *         {@code null}
     */
    @Override
    public CompletableFuture<ByteBuffer> getAsync(String str) {
        return this.table.get(keyBuf(str)).thenApply(byteBuf -> {
            if (byteBuf == null) {
                return null;
            }
            try {
                ByteBuffer copy = ByteBuffer.allocate(byteBuf.readableBytes());
                byteBuf.readBytes(copy);
                // readBytes leaves the position at the end; reset so callers can read from the start.
                copy.position(0);
                return copy;
            } finally {
                // Always release the table's buffer, even if the copy fails.
                ReferenceCountUtil.safeRelease(byteBuf);
            }
        });
    }

    /**
     * Blocking variant of {@link #getAsync(String)}.
     *
     * @param str key to read
     * @return the stored value, or {@code null} when the table returns {@code null}
     * @throws RuntimeException if the read fails; the cause is the underlying exception
     */
    @Override
    public ByteBuffer get(String str) {
        try {
            return FutureUtils.result(getAsync(str));
        } catch (Exception e) {
            throw new RuntimeException("Failed to retrieve the state value for key '" + str + "'", e);
        }
    }
}
