/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.TransientException;
import org.neo4j.function.Predicates;
import org.neo4j.graphdb.DependencyResolver;
import org.neo4j.graphdb.Result;
import org.neo4j.harness.junit.EnterpriseNeo4jRule;
import org.neo4j.harness.junit.Neo4jRule;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.IOUtils;
import org.neo4j.kernel.impl.api.KernelTransactions;
import org.neo4j.kernel.impl.enterprise.configuration.OnlineBackupSettings;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.test.ThreadTestUtils;
import org.neo4j.test.rule.VerboseTimeout;

public class SessionResetIT {
    private static final String SHORT_QUERY_1 = "CREATE (n:Node {name: 'foo', occupation: 'bar'})";
    private static final String SHORT_QUERY_2 = "MATCH (n:Node {name: 'foo'}) RETURN count(n)";
    private static final String LONG_QUERY = "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n";
    private static final String LONG_PERIODIC_COMMIT_QUERY = "USING PERIODIC COMMIT 1 LOAD CSV FROM '" + SessionResetIT.createTmpCsvFile() + "' AS l UNWIND range(0, 10) AS i CREATE (n:Node {name: l[0], occupation: l[1], idx: i}) DELETE n";
    private static final int STRESS_IT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() * 2;
    private static final long STRESS_IT_DURATION_MS = TimeUnit.SECONDS.toMillis(5L);
    private static final String[] STRESS_IT_QUERIES = new String[]{"CREATE (n:Node {name: 'foo', occupation: 'bar'})", "MATCH (n:Node {name: 'foo'}) RETURN count(n)", "UNWIND range(0, 10000000) AS i CREATE (n:Node {idx: i}) DELETE n"};
    private final VerboseTimeout timeout = VerboseTimeout.builder().withTimeout(3L, TimeUnit.MINUTES).build();
    private final Neo4jRule db = new EnterpriseNeo4jRule().withConfig(OnlineBackupSettings.online_backup_enabled, "false");
    @Rule
    public final RuleChain ruleChain = RuleChain.outerRule((TestRule)this.timeout).around((TestRule)this.db);
    private Driver driver;

    @Before
    public void setUp() {
        this.driver = GraphDatabase.driver((URI)this.db.boltURI(), (Config)Config.build().withEncryptionLevel(Config.EncryptionLevel.NONE).toConfig());
    }

    @After
    public void tearDown() {
        IOUtils.closeAllSilently((AutoCloseable[])new Driver[]{this.driver});
    }

    @Test
    public void shouldTerminateAutoCommitQuery() throws Exception {
        this.testQueryTermination(LONG_QUERY, true);
    }

    @Test
    public void shouldTerminateQueryInExplicitTransaction() throws Exception {
        this.testQueryTermination(LONG_QUERY, false);
    }

    @Test
    public void shouldNotTerminatePeriodicCommitQueries() throws Exception {
        Future<Void> queryResult = this.runQueryInDifferentThreadAndResetSession(LONG_PERIODIC_COMMIT_QUERY, true);
        try {
            Assert.assertNull((Object)queryResult.get(1L, TimeUnit.MINUTES));
        }
        catch (TimeoutException e) {
            System.err.println("Unable to get query result, dumping all stacktraces:");
            ThreadTestUtils.dumpAllStackTraces();
            throw e;
        }
        this.assertDatabaseIsIdle();
        Assert.assertEquals((long)0L, (long)this.countNodes());
    }

    @Test
    public void shouldTerminateAutoCommitQueriesRandomly() throws Exception {
        this.testRandomQueryTermination(true);
    }

    @Test
    public void shouldTerminateQueriesInExplicitTransactionsRandomly() throws Exception {
        this.testRandomQueryTermination(false);
    }

    private void testRandomQueryTermination(boolean autoCommit) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(STRESS_IT_THREAD_COUNT, (ThreadFactory)NamedThreadFactory.daemon((String)"test-worker"));
        Set<Session> runningSessions = Collections.newSetFromMap(new ConcurrentHashMap());
        AtomicBoolean stop = new AtomicBoolean();
        ArrayList futures = new ArrayList();
        for (int i = 0; i < STRESS_IT_THREAD_COUNT; ++i) {
            futures.add(executor.submit(() -> {
                ThreadLocalRandom random = ThreadLocalRandom.current();
                while (!stop.get()) {
                    this.runRandomQuery(autoCommit, random, runningSessions, stop);
                }
            }));
        }
        long deadline = System.currentTimeMillis() + STRESS_IT_DURATION_MS;
        while (!stop.get()) {
            if (System.currentTimeMillis() > deadline) {
                stop.set(true);
            }
            SessionResetIT.resetAny(runningSessions);
            TimeUnit.MILLISECONDS.sleep(30L);
        }
        this.driver.close();
        SessionResetIT.awaitAll(futures);
        this.assertDatabaseIsIdle();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runRandomQuery(boolean autoCommit, Random random, Set<Session> runningSessions, AtomicBoolean stop) {
        block5: {
            try {
                Session session = this.driver.session();
                runningSessions.add(session);
                try {
                    String query = STRESS_IT_QUERIES[random.nextInt(STRESS_IT_QUERIES.length - 1)];
                    SessionResetIT.runQuery(session, query, autoCommit);
                }
                finally {
                    runningSessions.remove(session);
                    session.close();
                }
            }
            catch (Throwable error) {
                if (stop.get() || SessionResetIT.isAcceptable(error)) break block5;
                stop.set(true);
                throw error;
            }
        }
    }

    private void testQueryTermination(String query, boolean autoCommit) throws Exception {
        Future<Void> queryResult = this.runQueryInDifferentThreadAndResetSession(query, autoCommit);
        try {
            queryResult.get(10L, TimeUnit.SECONDS);
            Assert.fail((String)"Exception expected");
        }
        catch (Exception e) {
            Assert.assertThat((Object)e, (Matcher)Matchers.instanceOf(ExecutionException.class));
            Assert.assertTrue((boolean)SessionResetIT.isTransactionTerminatedException(e.getCause()));
        }
        this.assertDatabaseIsIdle();
    }

    private Future<Void> runQueryInDifferentThreadAndResetSession(String query, boolean autoCommit) throws Exception {
        AtomicReference sessionRef = new AtomicReference();
        CompletableFuture<Void> queryResult = CompletableFuture.runAsync(() -> {
            try (Session session = this.driver.session();){
                sessionRef.set(session);
                SessionResetIT.runQuery(session, query, autoCommit);
            }
        });
        Predicates.await(() -> this.activeQueriesCount() == 1L, (long)10L, (TimeUnit)TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(1L);
        Session session = (Session)sessionRef.get();
        Assert.assertNotNull((Object)session);
        session.reset();
        return queryResult;
    }

    private static void runQuery(Session session, String query, boolean autoCommit) {
        if (autoCommit) {
            session.run(query).consume();
        } else {
            try (Transaction tx = session.beginTransaction();){
                tx.run(query);
                tx.success();
            }
        }
    }

    private void assertDatabaseIsIdle() throws InterruptedException {
        org.neo4j.test.assertion.Assert.assertEventually((String)"Wrong number of active queries", this::activeQueriesCount, (Matcher)Matchers.is((Object)0L), (long)10L, (TimeUnit)TimeUnit.SECONDS);
        org.neo4j.test.assertion.Assert.assertEventually((String)"Wrong number of active transactions", this::activeTransactionsCount, (Matcher)Matchers.is((Object)0L), (long)10L, (TimeUnit)TimeUnit.SECONDS);
    }

    private long activeQueriesCount() {
        try (Result result = this.db().execute("CALL dbms.listQueries() YIELD queryId RETURN count(queryId) AS result");){
            long l = (Long)((Map)Iterators.single((Iterator)result)).get("result") - 1L;
            return l;
        }
    }

    private long activeTransactionsCount() {
        DependencyResolver resolver = this.db().getDependencyResolver();
        KernelTransactions kernelTransactions = (KernelTransactions)resolver.resolveDependency(KernelTransactions.class);
        return kernelTransactions.activeTransactions().size();
    }

    private long countNodes() {
        try (Result result = this.db().execute("MATCH (n) RETURN count(n) AS result");){
            long l = (Long)((Map)Iterators.single((Iterator)result)).get("result");
            return l;
        }
    }

    private GraphDatabaseAPI db() {
        return (GraphDatabaseAPI)this.db.getGraphDatabaseService();
    }

    private static void resetAny(Set<Session> sessions) {
        sessions.stream().findAny().ifPresent(session -> {
            if (sessions.remove(session)) {
                SessionResetIT.resetSafely(session);
            }
        });
    }

    private static void resetSafely(Session session) {
        block3: {
            try {
                if (session.isOpen()) {
                    session.reset();
                }
            }
            catch (ClientException e) {
                if (!session.isOpen()) break block3;
                throw e;
            }
        }
    }

    private static boolean isAcceptable(Throwable error) {
        Throwable cause = Exceptions.rootCause((Throwable)error);
        return SessionResetIT.isTransactionTerminatedException(cause) || cause instanceof ServiceUnavailableException || cause instanceof ClientException || cause instanceof ClosedChannelException;
    }

    private static boolean isTransactionTerminatedException(Throwable error) {
        return error instanceof TransientException && error.getMessage().startsWith("The transaction has been terminated");
    }

    private static URI createTmpCsvFile() {
        try {
            Path csvFile = Files.createTempFile("test", ".csv", new FileAttribute[0]);
            List lines = IntStream.range(0, 50000).mapToObj(i -> "Foo-" + i + ", Bar-" + i).collect(Collectors.toList());
            return Files.write(csvFile, lines, new OpenOption[0]).toAbsolutePath().toUri();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void awaitAll(List<Future<?>> futures) throws Exception {
        for (Future<?> future : futures) {
            Assert.assertNull(future.get(1L, TimeUnit.MINUTES));
        }
    }
}

