package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import com.google.common.reflect.ClassPath;
import io.airlift.concurrent.MoreFutures;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.file.TestingFileHiveMetastore;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingNames;
import io.trino.testing.TestingSession;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/plugin/deltalake/TestDeltaLakeLocalConcurrentWritesTest.class */
public class TestDeltaLakeLocalConcurrentWritesTest extends AbstractTestQueryFramework {
    protected static final String SCHEMA = "test_delta_concurrent_writes_" + TestingNames.randomNameSuffix();
    private Path dataDirectory;
    private HiveMetastore metastore;

    protected QueryRunner createQueryRunner() throws Exception {
        DistributedQueryRunner build = DistributedQueryRunner.builder(TestingSession.testSessionBuilder().setCatalog(DeltaLakeQueryRunner.DELTA_CATALOG).setSchema(SCHEMA).build()).build();
        this.dataDirectory = build.getCoordinator().getBaseDataDir().resolve("delta_lake_data");
        this.metastore = TestingFileHiveMetastore.createTestingFileHiveMetastore(this.dataDirectory.toFile());
        build.installPlugin(new TestingDeltaLakePlugin(this.dataDirectory, Optional.of(new TestingDeltaLakeMetastoreModule(this.metastore))));
        build.createCatalog(DeltaLakeQueryRunner.DELTA_CATALOG, "delta_lake", ImmutableMap.builder().put("delta.unique-table-location", "true").put("delta.register-table-procedure.enabled", "true").buildOrThrow());
        build.execute("CREATE SCHEMA " + SCHEMA);
        return build;
    }

    @AfterAll
    public void tearDown() throws IOException {
        if (this.metastore != null) {
            this.metastore.dropDatabase(SCHEMA, false);
            MoreFiles.deleteRecursively(this.dataDirectory, new RecursiveDeleteOption[]{RecursiveDeleteOption.ALLOW_INSECURE});
        }
    }

    @Test
    public void testConcurrentInsertsReconciliationForBlindInserts() throws Exception {
        testConcurrentInsertsReconciliationForBlindInserts(false);
        testConcurrentInsertsReconciliationForBlindInserts(true);
    }

    private void testConcurrentInsertsReconciliationForBlindInserts(boolean z) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        String str = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + str + " (a INT, part INT) " + (z ? " WITH (partitioned_by = ARRAY['part'])" : ""));
        try {
            newFixedThreadPool.invokeAll(ImmutableList.builder().add(() -> {
                cyclicBarrier.await(10L, TimeUnit.SECONDS);
                getQueryRunner().execute("INSERT INTO " + str + " VALUES (1, 10)");
                return null;
            }).add(() -> {
                cyclicBarrier.await(10L, TimeUnit.SECONDS);
                getQueryRunner().execute("INSERT INTO " + str + " VALUES (11, 20)");
                return null;
            }).add(() -> {
                cyclicBarrier.await(10L, TimeUnit.SECONDS);
                getQueryRunner().execute("INSERT INTO " + str + " VALUES (21, 30)");
                return null;
            }).build()).forEach(MoreFutures::getDone);
            ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + str))).matches("VALUES (1, 10), (11, 20), (21, 30)");
            assertQuery("SELECT version, operation, isolation_level, read_version FROM \"" + str + "$history\"", "VALUES\n    (0, 'CREATE TABLE', 'WriteSerializable', 0),\n    (1, 'WRITE', 'WriteSerializable', 0),\n    (2, 'WRITE', 'WriteSerializable', 1),\n    (3, 'WRITE', 'WriteSerializable', 2)\n");
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
        } catch (Throwable th) {
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
            throw th;
        }
    }

    @Test
    public void testConcurrentSerializableBlindInsertsReconciliationFailure() throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        String str = "test_concurrent_serializable_blind_inserts_table_reconciliation" + TestingNames.randomNameSuffix();
        registerTableFromResources(str, "deltalake/serializable_partitioned_table", getQueryRunner());
        ((QueryAssertions.QueryAssert) Assertions.assertThat(query("SELECT * FROM " + str))).matches("VALUES (0, 10), (33, 40)");
        try {
            long count = ((List) IntStream.range(0, 5).mapToObj(i -> {
                return newFixedThreadPool.submit(() -> {
                    cyclicBarrier.await(10L, TimeUnit.SECONDS);
                    try {
                        getQueryRunner().execute("INSERT INTO " + str + " VALUES (1, 10)");
                        return true;
                    } catch (Exception e) {
                        try {
                            Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                            return false;
                        } catch (Throwable th) {
                            if (th != e) {
                                th.addSuppressed(e);
                            }
                            throw th;
                        }
                    }
                });
            }).collect(ImmutableList.toImmutableList())).stream().map(future -> {
                return (Boolean) MoreFutures.tryGetFutureValue(future, 20, TimeUnit.SECONDS).orElseThrow(() -> {
                    return new RuntimeException("Wait timed out");
                });
            }).filter(bool -> {
                return bool.booleanValue();
            }).count();
            Assertions.assertThat(count).isGreaterThanOrEqualTo(1L);
            Assertions.assertThat(count).isLessThan(5);
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
        } catch (Throwable th) {
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
            throw th;
        }
    }

    @Test
    public void testConcurrentInsertsReconciliationFailure() throws Exception {
        testConcurrentInsertsReconciliationFailure(false);
        testConcurrentInsertsReconciliationFailure(true);
    }

    private void testConcurrentInsertsReconciliationFailure(boolean z) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
        String str = "test_concurrent_inserts_table_" + TestingNames.randomNameSuffix();
        assertUpdate("CREATE TABLE " + str + " (a, part)" + (z ? " WITH (partitioned_by = ARRAY['part'])" : "") + " AS VALUES (1, 10)", 1L);
        try {
            long count = ((List) IntStream.range(0, 5).mapToObj(i -> {
                return newFixedThreadPool.submit(() -> {
                    cyclicBarrier.await(10L, TimeUnit.SECONDS);
                    try {
                        getQueryRunner().execute("INSERT INTO " + str + " SELECT * FROM " + str + " WHERE part = 10");
                        return true;
                    } catch (Exception e) {
                        try {
                            Assertions.assertThat(io.trino.testing.QueryAssertions.getTrinoExceptionCause(e)).hasMessage("Failed to write Delta Lake transaction log entry");
                            return false;
                        } catch (Throwable th) {
                            if (th != e) {
                                th.addSuppressed(e);
                            }
                            throw th;
                        }
                    }
                });
            }).collect(ImmutableList.toImmutableList())).stream().map(future -> {
                return (Boolean) MoreFutures.tryGetFutureValue(future, 20, TimeUnit.SECONDS).orElseThrow(() -> {
                    return new RuntimeException("Wait timed out");
                });
            }).filter(bool -> {
                return bool.booleanValue();
            }).count();
            Assertions.assertThat(count).isGreaterThanOrEqualTo(1L);
            Assertions.assertThat(count).isLessThan(5);
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
        } catch (Throwable th) {
            assertUpdate("DROP TABLE " + str);
            newFixedThreadPool.shutdownNow();
            org.junit.jupiter.api.Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
            throw th;
        }
    }

    protected void registerTableFromResources(String str, String str2, QueryRunner queryRunner) throws IOException {
        TrinoFileSystem create = ((TrinoFileSystemFactory) TestingDeltaLakeUtils.getConnectorService(queryRunner, TrinoFileSystemFactory.class)).create(ConnectorIdentity.ofUser("test"));
        String str3 = "local:///" + SCHEMA + "/" + str;
        create.createDirectory(Location.of(str3));
        try {
            for (ClassPath.ResourceInfo resourceInfo : (List) ClassPath.from(getClass().getClassLoader()).getResources().stream().filter(resourceInfo2 -> {
                return resourceInfo2.getResourceName().startsWith(str2 + "/");
            }).collect(ImmutableList.toImmutableList())) {
                create.newOutputFile(Location.of(resourceInfo.getResourceName().replaceFirst("^" + Pattern.quote(str2), Matcher.quoteReplacement(str3)))).createOrOverwrite(resourceInfo.asByteSource().read());
            }
            queryRunner.execute(String.format("CALL system.register_table('%s', '%s', '%s')", SCHEMA, str, str3));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
