package org.apache.pulsar.metadata.coordination.impl;

import com.fasterxml.jackson.databind.type.TypeFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.11.0-rc-202204302206.jar:org/apache/pulsar/metadata/coordination/impl/LockManagerImpl.class */
class LockManagerImpl<T> implements LockManager<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LockManagerImpl.class);
    private final Map<String, ResourceLockImpl<T>> locks;
    private final MetadataStoreExtended store;
    private final MetadataCache<T> cache;
    private final MetadataSerde<T> serde;
    private final ExecutorService executor;
    private State state;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.11.0-rc-202204302206.jar:org/apache/pulsar/metadata/coordination/impl/LockManagerImpl$State.class */
    public enum State {
        Ready,
        Closed
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LockManagerImpl(MetadataStoreExtended metadataStoreExtended, Class<T> cls, ExecutorService executorService) {
        this(metadataStoreExtended, new JSONMetadataSerdeSimpleType(TypeFactory.defaultInstance().constructSimpleType(cls, null)), executorService);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LockManagerImpl(MetadataStoreExtended metadataStoreExtended, MetadataSerde<T> metadataSerde, ExecutorService executorService) {
        this.locks = new ConcurrentHashMap();
        this.state = State.Ready;
        this.store = metadataStoreExtended;
        this.cache = metadataStoreExtended.getMetadataCache(metadataSerde);
        this.serde = metadataSerde;
        this.executor = executorService;
        metadataStoreExtended.registerSessionListener(this::handleSessionEvent);
        metadataStoreExtended.registerListener(this::handleDataNotification);
    }

    @Override // org.apache.pulsar.metadata.api.coordination.LockManager
    public CompletableFuture<Optional<T>> readLock(String str) {
        return this.cache.get(str);
    }

    @Override // org.apache.pulsar.metadata.api.coordination.LockManager
    public CompletableFuture<ResourceLock<T>> acquireLock(String str, T t) {
        ResourceLockImpl resourceLockImpl = new ResourceLockImpl(this.store, this.serde, str);
        CompletableFuture<ResourceLock<T>> completableFuture = new CompletableFuture<>();
        resourceLockImpl.acquire(t).thenRun(() -> {
            synchronized (this) {
                if (this.state == State.Ready) {
                    this.locks.put(str, resourceLockImpl);
                    resourceLockImpl.getLockExpiredFuture().thenRun(() -> {
                        log.info("Released resource lock on {}", str);
                        synchronized (this) {
                            this.locks.remove(str, resourceLockImpl);
                        }
                    });
                } else {
                    resourceLockImpl.release();
                }
            }
            completableFuture.complete(resourceLockImpl);
        }).exceptionally(th -> {
            if (th.getCause() instanceof MetadataStoreException.BadVersionException) {
                completableFuture.completeExceptionally(new MetadataStoreException.LockBusyException("Resource at " + str + " is already locked"));
                return null;
            }
            completableFuture.completeExceptionally(th.getCause());
            return null;
        });
        return completableFuture;
    }

    private void handleSessionEvent(SessionEvent sessionEvent) {
        this.executor.execute(SafeRunnable.safeRun(() -> {
            ArrayList arrayList = new ArrayList();
            if (sessionEvent == SessionEvent.SessionReestablished) {
                log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
                for (ResourceLockImpl<T> resourceLockImpl : this.locks.values()) {
                    arrayList.add(resourceLockImpl.revalidate(resourceLockImpl.getValue()));
                }
            } else if (sessionEvent == SessionEvent.Reconnected) {
                log.info("Metadata store connection has been re-established. Revalidating locks that were pending.");
                Iterator<ResourceLockImpl<T>> it = this.locks.values().iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().revalidateIfNeededAfterReconnection());
                }
            }
            try {
                FutureUtil.waitForAll(arrayList).get();
            } catch (InterruptedException | ExecutionException e) {
                log.warn("Failure when processing session event", e);
            }
        }));
    }

    private void handleDataNotification(Notification notification) {
        ResourceLockImpl<T> resourceLockImpl;
        if (notification.getType() != NotificationType.Deleted || (resourceLockImpl = this.locks.get(notification.getPath())) == null) {
            return;
        }
        resourceLockImpl.lockWasInvalidated();
    }

    @Override // org.apache.pulsar.metadata.api.coordination.LockManager
    public CompletableFuture<List<String>> listLocks(String str) {
        return this.cache.getChildren(str);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            asyncClose().join();
        } catch (CompletionException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    @Override // org.apache.pulsar.metadata.api.coordination.LockManager
    public CompletableFuture<Void> asyncClose() {
        synchronized (this) {
            if (this.state != State.Ready) {
                return CompletableFuture.completedFuture(null);
            }
            HashMap hashMap = new HashMap(this.locks);
            this.state = State.Closed;
            return FutureUtils.collect((List) hashMap.values().stream().map((v0) -> {
                return v0.release();
            }).collect(Collectors.toList())).thenApply(list -> {
                return null;
            });
        }
    }
}
