package org.apache.jackrabbit.oak.spi.commit;

import com.google.common.collect.Lists;
import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
import org.apache.jackrabbit.oak.plugins.observation.Filter;
import org.apache.jackrabbit.oak.plugins.observation.FilteringAwareObserver;
import org.apache.jackrabbit.oak.plugins.observation.FilteringObserver;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest.class */
public class BackgroundObserverTest {
    /** Commit info attached to every change dispatched by the static {@code contentChanged} and {@code done} helpers. */
    private static final CommitInfo COMMIT_INFO = new CommitInfo("no-session", (String) null);
    /**
     * NOTE(review): presumably the number of changes dispatched by {@code concurrentObservers}
     * (which loops 1024 times) -- the literal there looks like this constant inlined; confirm.
     */
    public static final int CHANGE_COUNT = 1024;
    /** Latch created by {@code createCompositeObserver} and counted down once per {@code done(List)} call. */
    private CountDownLatch doneCounter;
    /** Deferred checks handed over via {@code done(List)}; {@code concurrentObservers} runs them on the test thread. */
    private final List<Runnable> assertions = Lists.newArrayList();
    /** Resources registered by the tests; closed in {@code shutDown()} after each test. */
    private final List<Closeable> closeables = Lists.newArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest$MyFilter.class */
    /**
     * Test filter with a one-shot exclusion flag: the flag armed through
     * {@link #excludeNext(boolean)} is reported by the next
     * {@link #excludes(NodeState, CommitInfo)} call and reset by that same call.
     */
    public class MyFilter implements Filter {
        /** Answer to hand out on the next {@code excludes} call. */
        private boolean excludeNext;

        MyFilter() {
        }

        /**
         * Arms or disarms the exclusion flag.
         *
         * @param exclude the value the next {@code excludes} call returns
         */
        void excludeNext(boolean exclude) {
            excludeNext = exclude;
        }

        /**
         * Returns the currently armed flag and resets it to {@code false}.
         * Both arguments are ignored.
         *
         * @return the flag value as it was before this call
         */
        public boolean excludes(NodeState nodeState, CommitInfo commitInfo) {
            try {
                return excludeNext;
            } finally {
                // the flag is consumed by exactly one call
                excludeNext = false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest$NodeStateGenerator.class */
    /**
     * Produces a chain of node states, each built on top of the previous one and
     * differing in the value of property {@code "p"}. The random source uses a
     * fixed seed, so the sequence of values is reproducible.
     */
    public class NodeStateGenerator {
        Random r = new Random(1232131);
        NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder();

        NodeStateGenerator() {
        }

        /**
         * Sets {@code "p"} to the next random int, snapshots the builder and
         * continues building from that snapshot.
         *
         * @return the freshly created node state
         */
        NodeState next() {
            int value = r.nextInt();
            builder.setProperty("p", Integer.valueOf(value));
            NodeState snapshot = builder.getNodeState();
            // the following state is derived from the one just handed out
            builder = snapshot.builder();
            return snapshot;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest$Pair.class */
    /** Immutable holder for the (before, after) node states of one reported change. */
    public class Pair {
        private final NodeState before;
        private final NodeState after;

        /**
         * @param nodeState  the state before the change
         * @param nodeState2 the state after the change
         */
        Pair(NodeState nodeState, NodeState nodeState2) {
            before = nodeState;
            after = nodeState2;
        }

        /** Renders both states for use in assertion messages. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Pair(before=");
            text.append(before);
            text.append(", after=");
            text.append(after);
            text.append(")");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/spi/commit/BackgroundObserverTest$Recorder.class */
    /**
     * Observer that records every change it receives and can be paused from the
     * test thread: while paused, the thread delivering a change blocks inside
     * {@link #contentChanged} (after the change has been recorded) until
     * {@link #unpause()} is called.
     *
     * <p>{@code pause} and {@code pausing} are only accessed while holding this
     * object's monitor.
     */
    public class Recorder implements FilteringAwareObserver {
        /** Every (before, after) pair passed to {@code contentChanged}, in call order. */
        final List<Pair> includedChanges = Collections.synchronizedList(new LinkedList<Pair>());
        /** Requested state: {@code true} while delivering threads should block. */
        private boolean pause;
        /** Actual state: {@code true} while a delivering thread is blocked in {@link #maybePause()}. */
        private boolean pausing;

        public Recorder() {
        }

        /**
         * Records the change and then blocks for as long as the recorder is paused.
         * The commit info is ignored.
         */
        public void contentChanged(NodeState nodeState, NodeState nodeState2, CommitInfo commitInfo) {
            includedChanges.add(new Pair(nodeState, nodeState2));
            maybePause();
        }

        /**
         * Blocks the calling thread while a pause is requested. Waiters on this
         * monitor are notified when the thread starts pausing and again when it
         * stops. An interrupt does not end the pause, but the thread's interrupt
         * status is restored before returning so callers can still observe it.
         */
        public void maybePause() {
            boolean interrupted = false;
            synchronized (this) {
                try {
                    while (pause) {
                        pausing = true;
                        notifyAll();
                        try {
                            wait();
                        } catch (InterruptedException e) {
                            // Keep honouring the pause; re-interrupting here would make
                            // the next wait() throw immediately, so only remember it.
                            interrupted = true;
                        }
                    }
                } finally {
                    pausing = false;
                    notifyAll();
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** Requests that subsequent (and currently running) deliveries block. */
        public synchronized void pause() {
            pause = true;
        }

        /** Lifts the pause request and wakes up any blocked delivering thread. */
        public synchronized void unpause() {
            pause = false;
            notifyAll();
        }

        /**
         * Waits until a delivering thread is blocked in {@link #maybePause()} or the timeout elapses.
         *
         * @param timeout  maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @return {@code true} if a thread is pausing when this method returns
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean waitForPausing(int timeout, TimeUnit timeUnit) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            synchronized (this) {
                // poll in short slices so the deadline is honoured even without a notification
                while (!pausing && deadline > System.currentTimeMillis()) {
                    wait(100L);
                }
                return pausing;
            }
        }

        /**
         * Waits until no delivering thread is blocked in {@link #maybePause()} or the timeout elapses.
         *
         * @param timeout  maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @return {@code true} if no thread is pausing when this method returns
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean waitForUnpausing(int timeout, TimeUnit timeUnit) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
            synchronized (this) {
                while (pausing && deadline > System.currentTimeMillis()) {
                    wait(100L);
                }
                return !pausing;
            }
        }
    }

    /**
     * Feeds {@code CHANGE_COUNT} numbered changes followed by a "done" marker
     * through 128 background observers sharing a 16-thread pool, waits up to five
     * seconds for all of them to report completion and then runs the checks they
     * collected.
     */
    @Test
    public void concurrentObservers() throws InterruptedException {
        // NOTE(review): this pool is never shut down; confirm whether it is safe to
        // shut it down once the latch has opened before adding a shutdown here.
        ExecutorService pool = Executors.newFixedThreadPool(16);
        Observer observers = createCompositeObserver(pool, 128);

        int change = 0;
        while (change < CHANGE_COUNT) {
            contentChanged(observers, change);
            change++;
        }
        done(observers);

        boolean allObserversDone = this.doneCounter.await(5L, TimeUnit.SECONDS);
        Assert.assertTrue(allObserversDone);

        // the deferred checks run on the test thread so failures are reported by JUnit
        for (Runnable assertion : this.assertions) {
            assertion.run();
        }
    }

    /**
     * Dispatches a node state whose property {@code "p"} carries the given
     * number to the observer, using {@code COMMIT_INFO}.
     *
     * @param observer the observer to notify
     * @param j        the value stored in property {@code "p"}
     */
    private static void contentChanged(Observer observer, long j) {
        NodeBuilder numbered = EmptyNodeState.EMPTY_NODE.builder().setProperty("p", Long.valueOf(j));
        NodeState root = numbered.getNodeState();
        observer.contentChanged(root, COMMIT_INFO);
    }

    /**
     * Dispatches the end marker -- a node state carrying a {@code "done"}
     * property -- to the observer, using {@code COMMIT_INFO}.
     *
     * @param observer the observer to notify
     */
    private static void done(Observer observer) {
        NodeBuilder marker = EmptyNodeState.EMPTY_NODE.builder().setProperty("done", true);
        NodeState root = marker.getNodeState();
        observer.contentChanged(root, COMMIT_INFO);
    }

    /**
     * Builds a composite of {@code i} background observers that all run on the
     * given executor, and (re)creates {@code doneCounter} with the same count.
     *
     * @param executorService executor shared by all created background observers
     * @param i               number of background observers to create
     * @return the composite holding all created observers
     */
    private CompositeObserver createCompositeObserver(ExecutorService executorService, int i) {
        CompositeObserver composite = new CompositeObserver();
        for (int remaining = i; remaining > 0; remaining--) {
            Observer background = createBackgroundObserver(executorService);
            composite.addObserver(background);
        }
        // one count per observer: each reports completion exactly once via done(List)
        this.doneCounter = new CountDownLatch(i);
        return composite;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Called by a background observer once it has seen the "done" marker:
     * takes over the checks that observer collected and counts the latch down.
     *
     * @param observerAssertions the deferred checks gathered by the reporting observer
     */
    public synchronized void done(List<Runnable> observerAssertions) {
        List<Runnable> collected = this.assertions;
        collected.addAll(observerAssertions);
        CountDownLatch latch = this.doneCounter;
        latch.countDown();
    }

    /**
     * Creates a background observer (queue length {@code CHANGE_COUNT + 1}) around
     * an observer that checks ordering: for each pair of consecutive states it
     * queues a deferred check that the later {@code "p"} value is exactly one
     * greater than the earlier one. On the "done" marker the queued checks are
     * handed to {@link #done(List)}.
     *
     * @param executorService executor the background observer runs on
     * @return the wrapping background observer
     */
    private Observer createBackgroundObserver(ExecutorService executorService) {
        Observer sequenceChecker = new Observer() {
            final List<Runnable> assertions = Collections.synchronizedList(Lists.newArrayList());
            volatile NodeState previous;

            public void contentChanged(@NotNull final NodeState nodeState, @NotNull CommitInfo commitInfo) {
                boolean isDoneMarker = nodeState.hasProperty("done");
                if (isDoneMarker) {
                    BackgroundObserverTest.this.done(assertions);
                } else {
                    final NodeState predecessor = previous;
                    if (predecessor != null) {
                        // evaluated later on the test thread, not here on the observer thread
                        assertions.add(new Runnable() {
                            @Override
                            public void run() {
                                long expected = getP(predecessor).longValue() + 1;
                                long actual = getP(nodeState).longValue();
                                Assert.assertEquals(expected, actual);
                            }
                        });
                    }
                }
                previous = nodeState;
            }

            /** Reads property {@code "p"} of the given state as a long. */
            public Long getP(NodeState nodeState) {
                return (Long) nodeState.getProperty("p").getValue(Type.LONG);
            }
        };
        return new BackgroundObserver(sequenceChecker, executorService, CHANGE_COUNT + 1);
    }

    /**
     * Asserts that both lists have the same size and that, position by position,
     * their pairs reference the very same {@code before} and {@code after} instances.
     *
     * @param str   label included in every failure message
     * @param list  the expected pairs
     * @param list2 the pairs actually recorded
     */
    private void assertMatches(String str, List<Pair> list, List<Pair> list2) {
        Assert.assertEquals("size mismatch. msg=" + str, list.size(), list2.size());
        int pos = 0;
        for (Pair expectedPair : list) {
            Pair actualPair = list2.get(pos);
            Assert.assertSame("mismatch of before at pos=" + pos + ", msg=" + str, expectedPair.before, actualPair.before);
            Assert.assertSame("mismatch of after at pos=" + pos + ", msg=" + str, expectedPair.after, actualPair.after);
            pos++;
        }
    }

    /**
     * Closes every resource registered in {@code closeables}. All resources are
     * attempted even if some fail to close; the first failure is then reported as
     * an {@link AssertionFailedError} that keeps the original exception as its
     * cause, with any later failures attached as suppressed exceptions.
     *
     * @throws Exception declared for compatibility; failures surface as {@link AssertionFailedError}
     */
    @After
    public void shutDown() throws Exception {
        Exception firstFailure = null;
        for (Closeable closeable : this.closeables) {
            try {
                closeable.close();
            } catch (Exception e) {
                // keep going so one failing resource does not leak all the others
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            AssertionFailedError error = new AssertionFailedError(firstFailure.getMessage());
            error.initCause(firstFailure);
            throw error;
        }
    }

    /**
     * Sends one initial change followed by 100000 changes that the filter is
     * told to exclude, through a filtering observer with queue length 5, and
     * expects the recorder to have received nothing.
     */
    @Test
    public void testExcludedAllCommits() throws Exception {
        MyFilter filter = new MyFilter();
        Recorder recorder = new Recorder();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // declared with its concrete type: contentChanged/getBackgroundObserver are not on Closeable
            FilteringObserver observer = new FilteringObserver(executor, 5, filter, recorder);
            this.closeables.add(observer);
            List<Pair> expected = new LinkedList<Pair>();
            NodeStateGenerator generator = new NodeStateGenerator();

            observer.contentChanged(generator.next(), CommitInfo.EMPTY);
            for (int i = 0; i < 100000; i++) {
                // the filter flag is one-shot, so it is re-armed before every change
                filter.excludeNext(true);
                observer.contentChanged(generator.next(), CommitInfo.EMPTY);
            }

            Assert.assertTrue("testExcludedAllCommits", observer.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
            assertMatches("testExcludedAllCommits", expected, recorder.includedChanges);
        } finally {
            // do not leak the worker thread, whether the test passed or failed
            executor.shutdownNow();
        }
    }

    /**
     * Sends one initial change followed by 10000 non-excluded changes through a
     * filtering observer with queue length 10002 and expects the recorder to have
     * received one (previous, current) pair for each of the 10000 changes.
     */
    @Test
    public void testNoExcludedCommits() throws Exception {
        MyFilter filter = new MyFilter();
        Recorder recorder = new Recorder();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // declared with its concrete type: contentChanged/getBackgroundObserver are not on Closeable
            FilteringObserver observer = new FilteringObserver(executor, 10002, filter, recorder);
            this.closeables.add(observer);
            List<Pair> expected = new LinkedList<Pair>();
            NodeStateGenerator generator = new NodeStateGenerator();

            // the first state is not expected as a recorded change; it is the 'before' of the first pair
            NodeState previous = generator.next();
            observer.contentChanged(previous, CommitInfo.EMPTY);
            for (int i = 0; i < 10000; i++) {
                filter.excludeNext(false);
                NodeState current = generator.next();
                expected.add(new Pair(previous, current));
                previous = current;
                observer.contentChanged(current, CommitInfo.EMPTY);
            }

            Assert.assertTrue("testNoExcludedCommits", observer.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
            assertMatches("testNoExcludedCommits", expected, recorder.includedChanges);
        } finally {
            // do not leak the worker thread, whether the test passed or failed
            executor.shutdownNow();
        }
    }

    /**
     * Exercises a filtering observer with queue length 2 while the recorder is
     * paused, so that further changes arrive while earlier ones are still
     * undelivered, and then again after the recorder has been released.
     * Only the pairs added to {@code expected} below must show up in the recorder.
     */
    @Test
    public void testExcludeCommitsWithFullQueue() throws Exception {
        MyFilter filter = new MyFilter();
        Recorder recorder = new Recorder();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // declared with its concrete type: contentChanged/getBackgroundObserver are not on Closeable
            FilteringObserver observer = new FilteringObserver(executor, 2, filter, recorder);
            this.closeables.add(observer);
            List<Pair> expected = new LinkedList<Pair>();
            NodeStateGenerator generator = new NodeStateGenerator();

            recorder.pause();

            // the first state is not expected as a recorded change; it is the 'before' of the first pair
            NodeState first = generator.next();
            observer.contentChanged(first, CommitInfo.EMPTY);
            NodeState second = generator.next();
            expected.add(new Pair(first, second));
            observer.contentChanged(second, CommitInfo.EMPTY);
            Assert.assertTrue("observer did not get called (yet?)", recorder.waitForPausing(5, TimeUnit.SECONDS));

            // from here on the recorder is blocked inside its first delivery
            NodeState third = generator.next();
            expected.add(new Pair(second, third));
            observer.contentChanged(third, CommitInfo.EMPTY);
            // NOTE(review): the next three states are not expected individually -- presumably
            // they are merged into the following delivery because the queue is full; confirm.
            observer.contentChanged(generator.next(), CommitInfo.EMPTY);
            observer.contentChanged(generator.next(), CommitInfo.EMPTY);
            filter.excludeNext(true);
            observer.contentChanged(generator.next(), CommitInfo.EMPTY);
            filter.excludeNext(false);
            NodeState seventh = generator.next();
            observer.contentChanged(seventh, CommitInfo.EMPTY);
            expected.add(new Pair(third, seventh));

            recorder.unpause();
            Assert.assertTrue("observer did not resume after unpause", recorder.waitForUnpausing(5, TimeUnit.SECONDS));
            // NOTE(review): fixed delay, presumably to let the observer drain its queue; confirm.
            Thread.sleep(1000L);

            filter.excludeNext(true);
            NodeState excluded = generator.next();
            observer.contentChanged(excluded, CommitInfo.EMPTY);
            filter.excludeNext(false);
            NodeState last = generator.next();
            expected.add(new Pair(excluded, last));
            observer.contentChanged(last, CommitInfo.EMPTY);

            Assert.assertTrue("testExcludeCommitsWithFullQueue", observer.getBackgroundObserver().waitUntilStopped(10, TimeUnit.SECONDS));
            assertMatches("testExcludeCommitsWithFullQueue", expected, recorder.includedChanges);
        } finally {
            // release a possibly still-blocked delivery thread before stopping the executor
            recorder.unpause();
            executor.shutdownNow();
        }
    }

    /**
     * Runs {@code doTestExcludeSomeCommits} on one shared single-thread executor
     * for every change count from 0 to 99 and then for every 50th count from
     * 100 up to 9950. The executor is shut down even when a run fails.
     */
    @Test
    public void testExcludeSomeCommits() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            for (int cnt = 0; cnt < 100; cnt++) {
                doTestExcludeSomeCommits(cnt, executor);
            }
            for (int cnt = 100; cnt < 10000; cnt += 50) {
                doTestExcludeSomeCommits(cnt, executor);
            }
        } finally {
            // previously skipped when an assertion failed, leaking the worker thread
            executor.shutdownNow();
        }
    }

    /**
     * Sends one initial change followed by {@code i} changes, roughly 90% of which
     * the filter is told to exclude (fixed random seed), through a filtering
     * observer with queue length {@code i + 2}. Expects the recorder to have
     * received exactly the non-excluded changes, each paired with the state
     * sent immediately before it.
     *
     * @param i        number of changes sent after the initial one
     * @param executor executor the filtering observer runs on
     */
    private void doTestExcludeSomeCommits(int i, Executor executor) throws Exception {
        MyFilter filter = new MyFilter();
        Recorder recorder = new Recorder();
        // declared with its concrete type: contentChanged/getBackgroundObserver are not on Closeable
        FilteringObserver observer = new FilteringObserver(executor, i + 2, filter, recorder);
        this.closeables.add(observer);
        List<Pair> expected = new LinkedList<Pair>();
        Random random = new Random(2343242L);
        NodeStateGenerator generator = new NodeStateGenerator();

        // the first state is not expected as a recorded change; it is the 'before' of the first pair
        NodeState previous = generator.next();
        observer.contentChanged(previous, CommitInfo.EMPTY);
        for (int k = 0; k < i; k++) {
            boolean exclude = random.nextInt(100) < 90;
            filter.excludeNext(exclude);
            NodeState current = generator.next();
            if (!exclude) {
                expected.add(new Pair(previous, current));
            }
            previous = current;
            observer.contentChanged(current, CommitInfo.EMPTY);
        }

        Assert.assertTrue("cnt=" + i, observer.getBackgroundObserver().waitUntilStopped(5, TimeUnit.SECONDS));
        assertMatches("cnt=" + i, expected, recorder.includedChanges);
    }
}
