package org.apache.jackrabbit.oak.jcr;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFutureTask;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.Node;
import javax.jcr.Repository;
import javax.jcr.Session;
import org.apache.jackrabbit.oak.commons.FixturesHelper;
import org.apache.jackrabbit.oak.commons.PerfLogger;
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.plugins.atomic.AtomicCounterEditor;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/jcr/AtomicCounterClusterIT.class */
public class AtomicCounterClusterIT extends DocumentClusterIT {
    private static final Set<FixturesHelper.Fixture> FIXTURES = FixturesHelper.getFixtures();
    private static final Logger LOG = LoggerFactory.getLogger(AtomicCounterClusterIT.class);
    private static final PerfLogger LOG_PERF = new PerfLogger(LOG);
    private List<CustomScheduledExecutor> executors = Lists.newArrayList();

    /**
     * Scheduled executor that counts the {@link AtomicCounterEditor.ConsolidatorTask}s
     * submitted to it as {@link Callable}s and not yet run, so the test can poll
     * {@link #getTotal()} until the count drops to zero.
     *
     * <p>Only the {@code Callable} flavour of {@code decorateTask} is overridden here;
     * tasks submitted as plain {@code Runnable}s are not tracked by this class.
     */
    public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
        /** Number of tracked tasks that were decorated and have not finished a run yet. */
        private final AtomicInteger total = new AtomicInteger();

        /**
         * Delegating wrapper around the executor-provided task which decrements the
         * enclosing executor's pending counter after every run.
         *
         * @param <V> result type of the wrapped task
         */
        private class CustomTask<V> implements RunnableScheduledFuture<V> {
            private final RunnableScheduledFuture<V> task;

            /**
             * @param callable the submitted callable; not used by the wrapper itself
             * @param runnableScheduledFuture the task created by the executor, to which every call is delegated
             */
            public CustomTask(Callable<V> callable, RunnableScheduledFuture<V> runnableScheduledFuture) {
                this.task = runnableScheduledFuture;
            }

            /**
             * Runs the wrapped task and then marks it as no longer pending.
             */
            @Override
            public void run() {
                try {
                    this.task.run();
                } finally {
                    // Always decrement, even if the delegate throws: otherwise callers
                    // polling getTotal() would never see the counter reach zero.
                    CustomScheduledExecutor.this.total.decrementAndGet();
                }
            }

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return this.task.cancel(mayInterruptIfRunning);
            }

            @Override
            public boolean isCancelled() {
                return this.task.isCancelled();
            }

            @Override
            public boolean isDone() {
                return this.task.isDone();
            }

            @Override
            public V get() throws InterruptedException, ExecutionException {
                return this.task.get();
            }

            @Override
            public V get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                return this.task.get(timeout, timeUnit);
            }

            @Override
            public long getDelay(TimeUnit timeUnit) {
                return this.task.getDelay(timeUnit);
            }

            @Override
            public int compareTo(Delayed delayed) {
                return this.task.compareTo(delayed);
            }

            @Override
            public boolean isPeriodic() {
                return this.task.isPeriodic();
            }
        }

        /**
         * @param i core pool size handed to {@link ScheduledThreadPoolExecutor}
         */
        public CustomScheduledExecutor(int i) {
            super(i);
        }

        /**
         * Wraps consolidator tasks in a {@link CustomTask} and counts them as pending;
         * every other callable gets the default decoration of the superclass.
         */
        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> runnableScheduledFuture) {
            if (!(callable instanceof AtomicCounterEditor.ConsolidatorTask)) {
                return super.decorateTask(callable, runnableScheduledFuture);
            }
            this.total.incrementAndGet();
            return new CustomTask<V>(callable, runnableScheduledFuture);
        }

        /**
         * @return the current number of tracked tasks that have been decorated but have not completed a run
         */
        public int getTotal() {
            return this.total.get();
        }
    }

    /**
     * Class-level precondition: the tests are skipped unless the
     * {@code DOCUMENT_NS} fixture is enabled and
     * {@code OakMongoNSRepositoryStub} reports MongoDB as available.
     * The MongoDB check is only reached when the fixture check passes.
     */
    @BeforeClass
    public static void assumtions() {
        boolean documentFixtureEnabled = FIXTURES.contains(FixturesHelper.Fixture.DOCUMENT_NS);
        Assume.assumeTrue(documentFixtureEnabled);

        boolean mongoAvailable = OakMongoNSRepositoryStub.isMongoDBAvailable();
        Assume.assumeTrue(mongoAvailable);
    }

    /**
     * Runs the parent set-up and then starts the test with a fresh, empty list
     * of tracked executors.
     */
    @Override // org.apache.jackrabbit.oak.jcr.DocumentClusterIT
    public void before() throws Exception {
        super.before();
        // NOTE(review): if super.before() ends up calling getJcr(...), the executor
        // registered there is dropped by this reset and will not be closed in
        // after() - confirm against DocumentClusterIT.
        executors = new ArrayList<CustomScheduledExecutor>();
    }

    /**
     * Runs the parent tear-down and then shuts down every executor registered
     * by {@code getJcr}, giving each one up to 10 seconds.
     *
     * <p>The executors are closed in a {@code finally} block so that their
     * threads are released even when the parent tear-down fails.
     *
     * @throws Exception whatever the parent tear-down throws
     */
    @Override // org.apache.jackrabbit.oak.jcr.DocumentClusterIT
    public void after() throws Exception {
        try {
            super.after();
        } finally {
            for (CustomScheduledExecutor executor : this.executors) {
                new ExecutorCloser(executor, 10, TimeUnit.SECONDS).close();
            }
        }
    }

    /**
     * Fires concurrent random increments at a single {@code mix:atomicCounter}
     * node from every repository in {@code repos} and verifies that each
     * repository ends up reporting the sum of all increments in {@code oak:counter}.
     *
     * <p>The number of incrementing threads per repository is read from the
     * {@code oak.test.it.atomiccounter.threads} system property (default 100).
     *
     * @throws Exception on any repository failure, or when one of the
     *         incrementing threads recorded an exception
     */
    @Test
    public void increments() throws Exception {
        setUpCluster(getClass(), this.mks, this.repos, Integer.MIN_VALUE);
        Assert.assertEquals("repositories and executors should match", this.repos.size(), this.executors.size());

        final Random random = new Random(14L);
        final AtomicLong expected = new AtomicLong(0L);
        // keyed by thread name; filled by the incrementing threads
        final Map<String, Exception> exceptions =
                Collections.synchronizedMap(new HashMap<String, Exception>());

        // create the counter node through the first repository
        final String counterPath;
        Session setupSession = this.repos.get(0).login(ADMIN);
        try {
            Node counter = setupSession.getRootNode().addNode("counter");
            counter.addMixin("mix:atomicCounter");
            setupSession.save();
            counterPath = counter.getPath();
        } finally {
            setupSession.logout();
        }

        alignCluster(this.mks);

        // every repository must see the untouched counter before we start
        Assert.assertFalse("Path to the counter node should be set", Strings.isNullOrEmpty(counterPath));
        for (Repository repository : this.repos) {
            long initial = readCounterValue(repository, counterPath);
            Assert.assertEquals("Nothing should have touched the `expected`", 0L, expected.get());
            Assert.assertEquals("Wrong initial counter", expected.get(), initial);
        }

        int threadsPerNode = Integer.getInteger("oak.test.it.atomiccounter.threads", 100).intValue();
        LOG.debug("pushing {} increments per each of the {} cluster nodes for a total of {} concurrent updates",
                new Object[] {threadsPerNode, this.repos.size(), threadsPerNode * this.repos.size()});

        long start = LOG_PERF.start("Firing the threads");
        List<ListenableFutureTask<Void>> tasks = Lists.newArrayList();
        for (final Repository repository : this.repos) {
            for (int i = 0; i < threadsPerNode; i++) {
                ListenableFutureTask<Void> task = ListenableFutureTask.create(new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        // a failing login is not recorded in the map; it surfaces through the future
                        Session session = repository.login(DocumentClusterIT.ADMIN);
                        try {
                            try {
                                Node node = session.getNode(counterPath);
                                int increment = random.nextInt(10) + 1;
                                node.setProperty("oak:increment", increment);
                                expected.addAndGet(increment);
                                session.save();
                            } finally {
                                session.logout();
                            }
                        } catch (Exception e) {
                            // remembered here and handed to raiseExceptions(...) below
                            exceptions.put(Thread.currentThread().getName(), e);
                        }
                        return null;
                    }
                });
                new Thread(task).start();
                tasks.add(task);
            }
        }
        LOG_PERF.end(start, -1L, "Firing threads completed", "");
        Futures.allAsList(tasks).get();
        LOG_PERF.end(start, -1L, "Futures completed", "");
        waitForTaskCompletion();
        LOG_PERF.end(start, -1L, "All tasks completed", "");

        // NOTE(review): fixed pause, presumably to let background processing
        // settle before the final check - confirm the intent.
        Thread.sleep(5000L);
        raiseExceptions(exceptions, LOG);

        // every repository must now report the full sum
        for (int i = 0; i < this.repos.size(); i++) {
            int clusterNode = i + 1;
            long actual = readCounterValue(this.repos.get(i), counterPath);
            LOG.debug("Cluster node: {}, actual counter: {}, expected counter: {}",
                    new Object[] {clusterNode, actual, expected.get()});
            Assert.assertEquals("Wrong counter on node " + clusterNode, expected.get(), actual);
        }
    }

    /**
     * Reads the {@code oak:counter} property of the node at {@code counterPath}
     * using a short-lived admin session that is always logged out.
     *
     * @param repository repository to log in to
     * @param counterPath absolute path of the counter node
     * @return the current value of the {@code oak:counter} property
     * @throws Exception if login or any repository access fails
     */
    private long readCounterValue(Repository repository, String counterPath) throws Exception {
        Session session = repository.login(ADMIN);
        try {
            return session.getNode(counterPath).getProperty("oak:counter").getLong();
        } finally {
            session.logout();
        }
    }

    /**
     * Blocks until the pending-task counts of all registered executors sum to
     * zero (or less), re-checking once per second.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between checks
     */
    private void waitForTaskCompletion() throws InterruptedException {
        while (true) {
            int pending = 0;
            for (CustomScheduledExecutor executor : this.executors) {
                pending += executor.getTotal();
            }
            if (pending <= 0) {
                return;
            }
            LOG.debug("there are approximately {} tasks left to complete. Sleeping 1 sec", Integer.valueOf(pending));
            Thread.sleep(1000L);
        }
    }

    /**
     * Builds the {@link Jcr} for the given node store on top of the parent's
     * configuration, adding a new 10-thread {@link CustomScheduledExecutor} and
     * the atomic counter. The executor is registered in {@code executors}
     * before the parent is consulted.
     *
     * @param nodeStore node store the repository is built on
     * @return the parent's {@code Jcr} with the executor and atomic counter applied
     */
    @Override // org.apache.jackrabbit.oak.jcr.DocumentClusterIT
    public Jcr getJcr(NodeStore nodeStore) {
        final int poolSize = 10;
        CustomScheduledExecutor trackingExecutor = new CustomScheduledExecutor(poolSize);
        this.executors.add(trackingExecutor);

        Jcr jcr = super.getJcr(nodeStore);
        return jcr.with(trackingExecutor).withAtomicCounter();
    }
}
