/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.transaction.coordinator.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.TransactionMetadataStoreStats;
import org.apache.pulsar.transaction.coordinator.impl.TxnMetaImpl;
import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLTransactionMetadataStore
extends TransactionMetadataStoreState
implements TransactionMetadataStore {
    private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);

    /** Identifier of the transaction coordinator this store serves. */
    private final TransactionCoordinatorID tcID;

    /** Transaction log that metadata entries are appended to and replayed from. */
    private final MLTransactionLogImpl transactionLog;

    /**
     * In-memory view of the tracked transactions, keyed by the least significant bits of the
     * transaction id. Each value pairs the transaction metadata with the log positions written
     * for that transaction; an entry is dropped once the transaction is COMMITTED or ABORTED.
     */
    @VisibleForTesting
    final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();

    /** Receives every newly created transaction together with its timeout. */
    private final TransactionTimeoutTracker timeoutTracker;

    /** Stats holder that is refreshed and handed out by {@code getMetadataStoreStats()}. */
    private final TransactionMetadataStoreStats transactionMetadataStoreStats;

    /** Number of transactions successfully created through {@code newTransaction}. */
    private final LongAdder createdTransactionCount;

    /** Number of transactions that reached the COMMITTED status. */
    private final LongAdder committedTransactionCount;

    /** Number of transactions that reached the ABORTED status. */
    private final LongAdder abortedTransactionCount;

    /** Number of transitions to ABORTING that were flagged as caused by a timeout. */
    private final LongAdder transactionTimeoutCount;

    /** Number of entries successfully appended to the transaction log. */
    private final LongAdder appendLogCount;

    /** Source of the least significant bits of new transaction ids. */
    private final MLTransactionSequenceIdGenerator sequenceIdGenerator;

    /** Single-threaded executor on which the mutating operations of this store are started. */
    private final ExecutorService internalPinnedExecutor;

    /** Start and end timestamps of the log replay performed by {@code init}. */
    public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();

    /** Upper bound on concurrently tracked transactions; {@code 0} disables the limit. */
    private final long maxActiveTransactionsPerCoordinator;

    /**
     * Creates a store in the {@code None} state. The store only accepts operations after
     * {@code init} has replayed the transaction log and moved it to the ready state.
     *
     * @param tcID                                id of the owning transaction coordinator
     * @param mlTransactionLog                    log used to persist and replay metadata entries
     * @param timeoutTracker                      tracker that is told about every new transaction
     * @param sequenceIdGenerator                 generator for the local part of transaction ids
     * @param maxActiveTransactionsPerCoordinator limit on tracked transactions, {@code 0} for none
     */
    public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog, TransactionTimeoutTracker timeoutTracker, MLTransactionSequenceIdGenerator sequenceIdGenerator, long maxActiveTransactionsPerCoordinator) {
        super(TransactionMetadataStoreState.State.None);

        // Collaborators and configuration handed in by the caller.
        this.tcID = tcID;
        this.transactionLog = mlTransactionLog;
        this.timeoutTracker = timeoutTracker;
        this.sequenceIdGenerator = sequenceIdGenerator;
        this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator;

        // Metrics owned by this store.
        this.transactionMetadataStoreStats = new TransactionMetadataStoreStats();
        this.createdTransactionCount = new LongAdder();
        this.committedTransactionCount = new LongAdder();
        this.abortedTransactionCount = new LongAdder();
        this.transactionTimeoutCount = new LongAdder();
        this.appendLogCount = new LongAdder();

        // One dedicated thread per coordinator runs the store's operations.
        final String threadPoolName = "transaction_coordinator_" + tcID.toString() + "thread_factory";
        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory(threadPoolName));
    }

    /**
     * Rebuilds the in-memory transaction state by replaying the transaction log and then moves
     * the store to the ready state.
     *
     * <p>During replay every entry is applied to {@code txnMetaMap}. Entries that refer to a
     * transaction that is not (or no longer) tracked, and entries that fail with an invalid status
     * transition or an unknown operation, have their log position deleted. When a replayed UPDATE
     * brings a transaction to COMMITTED or ABORTED, all of its positions are deleted and the
     * transaction is dropped from the map once the deletion has completed.
     *
     * @param recoverTracker tracker that is informed about open transactions and status changes
     *                       seen during replay
     * @return a future completed with this store once replay has finished and the state changed to
     *         ready, or completed exceptionally with a {@code CoordinatorNotFoundException} when
     *         either state transition is refused
     */
    public CompletableFuture<TransactionMetadataStore> init(final TransactionRecoverTracker recoverTracker) {
        final CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>();
        if (!this.changeToInitializingState()) {
            log.error("Managed ledger transaction metadata store change state error when init it");
            completableFuture.completeExceptionally(new TransactionCoordinatorClientException.CoordinatorNotFoundException("transaction metadata store with tcId " + this.tcID.toString() + " change state to Initializing error when init it"));
            return completableFuture;
        }

        this.recoverTime.setRecoverStartTime(System.currentTimeMillis());
        this.internalPinnedExecutor.execute(() -> this.transactionLog.replayAsync(new TransactionLogReplayCallback() {

            @Override
            public void replayComplete() {
                recoverTracker.appendOpenTransactionToTimeoutTracker();
                if (!changeToReadyState()) {
                    log.error("Managed ledger transaction metadata store change state error when replay complete");
                    completableFuture.completeExceptionally(new TransactionCoordinatorClientException.CoordinatorNotFoundException("transaction metadata store with tcId " + tcID.toString() + " change state to Ready error when init it"));
                    return;
                }
                // Record the end time before completing the future: dependents run as part of
                // complete() and must already observe a finished recovery in the stats.
                recoverTime.setRecoverEndTime(System.currentTimeMillis());
                completableFuture.complete(MLTransactionMetadataStore.this);
            }

            @Override
            public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
                try {
                    final TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(), transactionMetadataEntry.getTxnidLeastBits());
                    final long transactionId = transactionMetadataEntry.getTxnidLeastBits();
                    // Look the transaction up exactly once. Entries are removed asynchronously
                    // after a terminal UPDATE, so a containsKey()/get() pair could see different
                    // answers and dereference null.
                    final Pair<TxnMeta, List<Position>> existing = txnMetaMap.get(transactionId);
                    switch (transactionMetadataEntry.getMetadataOp()) {
                        case NEW: {
                            if (existing != null) {
                                // Already known: only remember the additional log position.
                                existing.getRight().add(position);
                                break;
                            }
                            final List<Position> positions = new ArrayList<>();
                            positions.add(position);
                            final long openTimestamp = transactionMetadataEntry.getStartTime();
                            final long timeoutMs = transactionMetadataEntry.getTimeoutMs();
                            final String owner = transactionMetadataEntry.hasOwner() ? transactionMetadataEntry.getOwner() : null;
                            final TxnMeta txnMeta = new TxnMetaImpl(txnID, openTimestamp, timeoutMs, owner);
                            txnMetaMap.put(transactionId, MutablePair.of(txnMeta, positions));
                            recoverTracker.handleOpenStatusTransaction(transactionId, timeoutMs + openTimestamp);
                            break;
                        }
                        case ADD_PARTITION: {
                            if (existing == null) {
                                // Entry of a transaction that is not tracked: drop it from the log.
                                transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            existing.getLeft().addProducedPartitions(transactionMetadataEntry.getPartitionsList());
                            existing.getRight().add(position);
                            break;
                        }
                        case ADD_SUBSCRIPTION: {
                            if (existing == null) {
                                transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            existing.getLeft().addAckedPartitions(MLTransactionMetadataStore.subscriptionToTxnSubscription(transactionMetadataEntry.getSubscriptionsList()));
                            existing.getRight().add(position);
                            break;
                        }
                        case UPDATE: {
                            if (existing == null) {
                                transactionLog.deletePosition(Collections.singletonList(position));
                                break;
                            }
                            final TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
                            existing.getLeft().updateTxnStatus(newStatus, transactionMetadataEntry.getExpectedStatus());
                            existing.getRight().add(position);
                            recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
                            if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
                                // Terminal status: purge the log positions, then forget the txn.
                                transactionLog.deletePosition(existing.getRight()).thenAccept(v -> txnMetaMap.remove(transactionId));
                            }
                            break;
                        }
                        default: {
                            throw new CoordinatorException.InvalidTxnStatusException("Transaction `" + txnID + "` load replay metadata operation from transaction log with unknown operation");
                        }
                    }
                }
                catch (CoordinatorException.InvalidTxnStatusException e) {
                    // The entry cannot be applied; remove it so it is not replayed again.
                    transactionLog.deletePosition(Collections.singletonList(position));
                    log.error(e.getMessage(), e);
                }
            }
        }));
        return completableFuture;
    }

    /**
     * Looks up the current status of a transaction.
     *
     * @param txnID id of the transaction; only its least significant bits are used as the key
     * @return a future completed with the transaction's status, or completed exceptionally with a
     *         {@code TransactionNotFoundException} when the transaction is not tracked
     */
    @Override
    public CompletableFuture<TxnStatus> getTxnStatus(TxnID txnID) {
        final Pair<TxnMeta, List<Position>> txnMetaListPair = this.txnMetaMap.get(txnID.getLeastSigBits());
        if (txnMetaListPair == null) {
            // Report a missing transaction through the future, as getTxnMeta does, instead of
            // letting a NullPointerException escape synchronously.
            return FutureUtil.failedFuture(new CoordinatorException.TransactionNotFoundException(txnID));
        }
        return CompletableFuture.completedFuture(txnMetaListPair.getLeft().status());
    }

    /**
     * Looks up the metadata of a transaction.
     *
     * @param txnID id of the transaction; only its least significant bits are used as the key
     * @return a future completed with the transaction's metadata, or completed exceptionally with
     *         a {@code TransactionNotFoundException} when the transaction is not tracked
     */
    @Override
    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
        final Pair<TxnMeta, List<Position>> tracked = this.txnMetaMap.get(txnID.getLeastSigBits());
        final CompletableFuture<TxnMeta> result = new CompletableFuture<>();
        if (tracked != null) {
            result.complete(tracked.getLeft());
            return result;
        }
        result.completeExceptionally(new CoordinatorException.TransactionNotFoundException(txnID));
        return result;
    }

    /**
     * Creates a new transaction, persists it in the transaction log and starts tracking it.
     *
     * @param timeOut timeout of the transaction, stored in the log entry and passed to the
     *                timeout tracker
     * @param owner   optional owner of the transaction; may be {@code null} but not blank
     * @return a future completed with the id of the new transaction. It completes exceptionally
     *         with a {@code ReachMaxActiveTxnException} when the active-transaction limit is hit,
     *         a {@code TransactionMetadataStoreStateException} when the store is not ready, an
     *         {@code IllegalArgumentException} for a blank owner, or the failure of the log append
     */
    @Override
    public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) {
        // A limit of 0 means "unlimited"; otherwise refuse once the limit has been reached.
        if (this.maxActiveTransactionsPerCoordinator != 0L && this.maxActiveTransactionsPerCoordinator <= (long) this.txnMetaMap.size()) {
            return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op reach max active txn! tcId : " + this.getTransactionCoordinatorID().getId()));
        }

        final CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.checkIfReady()) {
                completableFuture.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "new Transaction"));
                return;
            }
            // Validate the owner before a sequence id is generated, so that a rejected request
            // does not consume a transaction id.
            if (owner != null && StringUtils.isBlank(owner)) {
                completableFuture.completeExceptionally(new IllegalArgumentException("Owner can't be blank"));
                return;
            }

            final long mostSigBits = this.tcID.getId();
            final long leastSigBits = this.sequenceIdGenerator.generateSequenceId();
            final TxnID txnID = new TxnID(mostSigBits, leastSigBits);
            final long currentTimeMillis = System.currentTimeMillis();
            final TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                    .setTxnidMostBits(mostSigBits)
                    .setTxnidLeastBits(leastSigBits)
                    .setStartTime(currentTimeMillis)
                    .setTimeoutMs(timeOut)
                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                    .setLastModificationTime(currentTimeMillis)
                    .setMaxLocalTxnId(this.sequenceIdGenerator.getCurrentSequenceId());
            if (owner != null) {
                transactionMetadataEntry.setOwner(owner);
            }

            this.transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
                if (throwable != null) {
                    completableFuture.completeExceptionally(throwable);
                    return;
                }
                this.appendLogCount.increment();
                // Track the transaction only after its NEW entry is durable in the log.
                final TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner);
                final List<Position> positions = new ArrayList<>();
                positions.add(position);
                final Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
                this.txnMetaMap.put(leastSigBits, pair);
                this.timeoutTracker.addTransaction(leastSigBits, timeOut);
                this.createdTransactionCount.increment();
                completableFuture.complete(txnID);
            });
        });
        return completableFuture;
    }

    /**
     * Records additional produced partitions for a transaction, first in the transaction log and
     * then in the in-memory metadata.
     *
     * @param txnID      id of the transaction to update
     * @param partitions partitions to add to the transaction
     * @return a future completed once the change is persisted and applied. It completes
     *         exceptionally when the store is not ready, the transaction is not tracked, the log
     *         append fails, or the transaction's status does not allow the change (in which case
     *         the freshly written log position is deleted again)
     */
    @Override
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.checkIfReady()) {
                promise.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "add produced partition"));
                return;
            }
            this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                final TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                        .setTxnidMostBits(txnID.getMostSigBits())
                        .setTxnidLeastBits(txnID.getLeastSigBits())
                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION)
                        .addAllPartitions(partitions)
                        .setLastModificationTime(System.currentTimeMillis())
                        .setMaxLocalTxnId(this.sequenceIdGenerator.getCurrentSequenceId());
                return this.transactionLog.append(transactionMetadataEntry).thenAccept(position -> {
                    this.appendLogCount.increment();
                    final TxnMeta txnMeta = txnMetaListPair.getLeft();
                    try {
                        synchronized (txnMeta) {
                            txnMeta.addProducedPartitions(partitions);
                            // Use the pair resolved before the append rather than looking the
                            // transaction up again: the map may have been cleared meanwhile,
                            // and a second lookup would then dereference null.
                            txnMetaListPair.getRight().add(position);
                        }
                        promise.complete(null);
                    }
                    catch (CoordinatorException.InvalidTxnStatusException e) {
                        // The entry was written but cannot be applied; remove it from the log.
                        this.transactionLog.deletePosition(Collections.singletonList(position));
                        log.error("TxnID {} add produced partition error with TxnStatus: {}", txnMeta.id().toString(), txnMeta.status().name(), e);
                        promise.completeExceptionally(e);
                    }
                });
            }).exceptionally(ex -> {
                promise.completeExceptionally(ex);
                return null;
            });
        });
        return promise;
    }

    /**
     * Records additional acknowledged subscriptions for a transaction, first in the transaction
     * log and then in the in-memory metadata.
     *
     * @param txnID            id of the transaction to update
     * @param txnSubscriptions subscriptions to add to the transaction
     * @return a future completed once the change is persisted and applied. It completes
     *         exceptionally when the store is not ready, the transaction is not tracked, the log
     *         append fails, or the transaction's status does not allow the change (in which case
     *         the freshly written log position is deleted again)
     */
    @Override
    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID, List<TransactionSubscription> txnSubscriptions) {
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.checkIfReady()) {
                promise.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "add acked partition"));
                return;
            }
            this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                final TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                        .setTxnidMostBits(txnID.getMostSigBits())
                        .setTxnidLeastBits(txnID.getLeastSigBits())
                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION)
                        .addAllSubscriptions(MLTransactionMetadataStore.txnSubscriptionToSubscription(txnSubscriptions))
                        .setLastModificationTime(System.currentTimeMillis())
                        .setMaxLocalTxnId(this.sequenceIdGenerator.getCurrentSequenceId());
                return this.transactionLog.append(transactionMetadataEntry).thenAccept(position -> {
                    this.appendLogCount.increment();
                    final TxnMeta txnMeta = txnMetaListPair.getLeft();
                    try {
                        synchronized (txnMeta) {
                            txnMeta.addAckedPartitions(txnSubscriptions);
                            // Use the pair resolved before the append rather than looking the
                            // transaction up again: the map may have been cleared meanwhile,
                            // and a second lookup would then dereference null.
                            txnMetaListPair.getRight().add(position);
                        }
                        promise.complete(null);
                    }
                    catch (CoordinatorException.InvalidTxnStatusException e) {
                        // The entry was written but cannot be applied; remove it from the log.
                        this.transactionLog.deletePosition(Collections.singletonList(position));
                        log.error("TxnID : " + txnMeta.id().toString() + " add acked subscription error with TxnStatus : " + txnMeta.status().name(), e);
                        promise.completeExceptionally(e);
                    }
                });
            }).exceptionally(ex -> {
                promise.completeExceptionally(ex);
                return null;
            });
        });
        return promise;
    }

    /**
     * Moves a transaction to a new status, first in the transaction log and then in memory.
     *
     * <p>If the transaction already has {@code newStatus}, nothing is written and the returned
     * future completes normally. When the new status is COMMITTED or ABORTED the transaction is
     * removed from the in-memory map and all of its log positions are deleted; a failure of that
     * deletion is only logged.
     *
     * @param txnID          id of the transaction to update
     * @param newStatus      status to move to
     * @param expectedStatus status the transaction is expected to have right now
     * @param isTimeout      whether the change was triggered by a timeout; only used to count
     *                       timeouts when moving to ABORTING
     * @return a future completed once the change is persisted and applied. It completes
     *         exceptionally when the store is not ready, the transaction is not tracked, the log
     *         append fails, or the status transition is rejected (in which case the freshly
     *         written log position is deleted again)
     */
    @Override
    public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus, TxnStatus expectedStatus, boolean isTimeout) {
        final CompletableFuture<Void> promise = new CompletableFuture<>();
        this.internalPinnedExecutor.execute(() -> {
            if (!this.checkIfReady()) {
                promise.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(this.tcID, TransactionMetadataStoreState.State.Ready, this.getState(), "update transaction status"));
                return;
            }
            this.getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                final TxnMeta txnMeta = txnMetaListPair.getLeft();
                if (txnMeta.status() == newStatus) {
                    // Already in the requested status: nothing to persist.
                    promise.complete(null);
                    return promise;
                }
                final TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                        .setTxnidMostBits(txnID.getMostSigBits())
                        .setTxnidLeastBits(txnID.getLeastSigBits())
                        .setExpectedStatus(expectedStatus)
                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE)
                        .setLastModificationTime(System.currentTimeMillis())
                        .setNewStatus(newStatus)
                        .setMaxLocalTxnId(this.sequenceIdGenerator.getCurrentSequenceId());
                return this.transactionLog.append(transactionMetadataEntry).thenAccept(position -> {
                    this.appendLogCount.increment();
                    try {
                        synchronized (txnMeta) {
                            txnMeta.updateTxnStatus(newStatus, expectedStatus);
                            txnMetaListPair.getRight().add(position);
                        }
                        if (newStatus == TxnStatus.ABORTING && isTimeout) {
                            this.transactionTimeoutCount.increment();
                        }
                        if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
                            this.transactionMetadataStoreStats.addTransactionExecutionLatencySample(System.currentTimeMillis() - txnMeta.getOpenTimestamp());
                            if (newStatus == TxnStatus.COMMITTED) {
                                this.committedTransactionCount.increment();
                            } else {
                                this.abortedTransactionCount.increment();
                            }
                            // The transaction is finished: stop tracking it and purge its log.
                            this.txnMetaMap.remove(txnID.getLeastSigBits());
                            this.transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
                                // Best effort only, but keep the cause so the failure can be diagnosed.
                                log.warn("Failed to delete transaction log position at end transaction [{}]", txnID, ex);
                                return null;
                            });
                        }
                        promise.complete(null);
                    }
                    catch (CoordinatorException.InvalidTxnStatusException e) {
                        // The entry was written but cannot be applied; remove it from the log.
                        this.transactionLog.deletePosition(Collections.singletonList(position));
                        log.error("TxnID : " + txnMeta.id().toString() + " add update txn status error with TxnStatus : " + txnMeta.status().name(), e);
                        promise.completeExceptionally(e);
                    }
                });
            }).exceptionally(ex -> {
                promise.completeExceptionally(ex);
                return null;
            });
        });
        return promise;
    }

    /**
     * Returns the low water mark of this store.
     *
     * @return the smallest tracked transaction key minus one, or {@code 0} when no transaction is
     *         tracked
     */
    @Override
    public long getLowWaterMark() {
        // firstEntry() returns null for an empty map, so the common "no transactions" case does
        // not have to go through an exception as it would with firstKey().
        final Map.Entry<Long, Pair<TxnMeta, List<Position>>> lowest = this.txnMetaMap.firstEntry();
        return lowest == null ? 0L : lowest.getKey() - 1L;
    }

    /**
     * @return the id of the transaction coordinator this store was created for
     */
    @Override
    public TransactionCoordinatorID getTransactionCoordinatorID() {
        return tcID;
    }

    /**
     * Builds a fresh snapshot of the coordinator's state.
     *
     * @return a new stats object carrying the low water mark, the state name, the current
     *         sequence id, the number of tracked transactions and the recovery timestamps
     */
    @Override
    public TransactionCoordinatorStats getCoordinatorStats() {
        final TransactionCoordinatorStats snapshot = new TransactionCoordinatorStats();

        // Values exposed through setters.
        snapshot.setLowWaterMark(getLowWaterMark());
        snapshot.setState(getState().name());
        snapshot.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());

        // Values exposed as public fields.
        snapshot.ongoingTxnSize = txnMetaMap.size();
        snapshot.recoverStartTime = recoverTime.getRecoverStartTime();
        snapshot.recoverEndTime = recoverTime.getRecoverEndTime();
        return snapshot;
    }

    /**
     * Resolves the metadata/positions pair tracked for a transaction.
     *
     * @param txnID id of the transaction; only its least significant bits are used as the key
     * @return an already completed future holding the pair, or one completed exceptionally with a
     *         {@code TransactionNotFoundException} when the transaction is not tracked
     */
    private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
        final CompletableFuture<Pair<TxnMeta, List<Position>>> result = new CompletableFuture<>();
        final Pair<TxnMeta, List<Position>> tracked = txnMetaMap.get(txnID.getLeastSigBits());
        if (tracked != null) {
            result.complete(tracked);
            return result;
        }
        result.completeExceptionally(new CoordinatorException.TransactionNotFoundException(txnID));
        return result;
    }

    /**
     * Closes the store: stops accepting new tasks, closes the transaction log, clears the
     * in-memory state, closes the timeout tracker and finally waits for the executor to finish.
     *
     * @return a future completed when closing is done. It is already completed when the store
     *         could not be moved to the closing state, and completes exceptionally with an
     *         {@code IllegalStateException} when the final state change is refused
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        if (!changeToClosingState()) {
            // The transition to closing was refused; nothing is done in that case.
            return CompletableFuture.completedFuture(null);
        }

        // Reject new tasks right away; tasks that are already queued may still run.
        internalPinnedExecutor.shutdown();
        return transactionLog.closeAsync().thenCompose(ignored -> {
            txnMetaMap.clear();
            timeoutTracker.close();
            if (changeToCloseState()) {
                // Give the executor up to five seconds to drain before returning.
                MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor, Duration.ofSeconds(5L));
                return CompletableFuture.completedFuture(null);
            }
            return FutureUtil.failedFuture(new IllegalStateException("Managed ledger transaction metadata store state to close error!"));
        });
    }

    /**
     * Refreshes the store's stats object with the current counter values and returns it.
     *
     * @return the stats instance owned by this store (the same object on every call), updated
     *         with the coordinator id, the number of tracked transactions and all counters
     */
    @Override
    public TransactionMetadataStoreStats getMetadataStoreStats() {
        final TransactionMetadataStoreStats stats = transactionMetadataStoreStats;
        stats.setCoordinatorId(tcID.getId());
        stats.setActives(txnMetaMap.size());
        stats.setCreatedCount(createdTransactionCount.longValue());
        stats.setCommittedCount(committedTransactionCount.longValue());
        stats.setAbortedCount(abortedTransactionCount.longValue());
        stats.setTimeoutCount(transactionTimeoutCount.longValue());
        stats.setAppendLogCount(appendLogCount.longValue());
        return stats;
    }

    /**
     * Collects the tracked transactions whose {@code getTimeoutAt()} value is greater than the
     * given threshold.
     *
     * @param timeout threshold that {@code getTimeoutAt()} is compared against
     * @return a new list with the matching transactions, in ascending key order of the map
     */
    @Override
    public List<TxnMeta> getSlowTransactions(long timeout) {
        final ArrayList<TxnMeta> matches = new ArrayList<>();
        for (Pair<TxnMeta, List<Position>> tracked : txnMetaMap.values()) {
            if (tracked.getLeft().getTimeoutAt() > timeout) {
                matches.add(tracked.getLeft());
            }
        }
        return matches;
    }

    /**
     * Converts coordinator-side transaction subscriptions into their protocol representation.
     *
     * @param tnxSubscriptions subscriptions to convert; must not be {@code null}
     * @return a new list with one {@code Subscription} per input element, in the same order,
     *         carrying the same subscription name and topic
     */
    public static List<Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
        final ArrayList<Subscription> converted = new ArrayList<>(tnxSubscriptions.size());
        tnxSubscriptions.forEach(source -> converted.add(
                new Subscription().setSubscription(source.getSubscription()).setTopic(source.getTopic())));
        return converted;
    }

    /**
     * Converts protocol subscriptions into coordinator-side transaction subscriptions.
     *
     * @param subscriptions subscriptions to convert; must not be {@code null}
     * @return a new list with one {@code TransactionSubscription} per input element, in the same
     *         order, carrying the same subscription name and topic
     */
    public static List<TransactionSubscription> subscriptionToTxnSubscription(List<Subscription> subscriptions) {
        final ArrayList<TransactionSubscription> converted = new ArrayList<>(subscriptions.size());
        for (Subscription source : subscriptions) {
            converted.add(TransactionSubscription.builder()
                    .subscription(source.getSubscription())
                    .topic(source.getTopic())
                    .build());
        }
        return converted;
    }

    /**
     * @return the managed ledger reported by the underlying transaction log
     */
    public ManagedLedger getManagedLedger() {
        return transactionLog.getManagedLedger();
    }
}

