package org.apache.asterix.test.dataflow;

import java.io.File;
import java.lang.reflect.Field;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.asterix.app.bootstrap.TestNodeController;
import org.apache.asterix.app.data.gen.RecordTupleGenerator;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.external.util.DataflowUtils;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.test.common.TestHelper;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
import org.apache.hyracks.storage.am.lsm.common.impls.AsynchronousScheduler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

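/**
 * Checks that the primary and secondary LSM indexes of the test dataset hold matching components after the
 * node controller is shut down and re-initialized, for several combinations of succeeding and failing flushes.
 * Recovered from: org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.class
 */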
public class LSMFlushRecoveryTest {
    // ---- Fixed parameters of the test scenario ----

    /** Number of partitions; every per-partition array in this class has this length. */
    private static final int NUM_PARTITIONS = 2;
    /** Array index of the first partition. */
    private static final int PARTITION_0 = 0;
    /** Array index of the second partition. */
    private static final int PARTITION_1 = 1;
    private static final String SECONDARY_INDEX_NAME = "TestIdx";
    public static final Logger LOGGER = LogManager.getLogger();
    private static final DatasetConfig.IndexType SECONDARY_INDEX_TYPE = DatasetConfig.IndexType.BTREE;
    /** The secondary index has a single key: the record type's field at position 1. */
    private static final List<List<String>> SECONDARY_INDEX_FIELD_NAMES =
            Arrays.asList(Arrays.asList(StorageTestUtils.RECORD_TYPE.getFieldNames()[1]));
    /**
     * One indicator per secondary key field.
     * NOTE(review): 0 presumably marks the key as coming from the record itself - confirm against Index.
     */
    private static final List<Integer> SECONDARY_INDEX_FIELD_INDICATORS = Arrays.asList(0);
    private static final List<IAType> SECONDARY_INDEX_FIELD_TYPES = Arrays.asList(BuiltinType.AINT64);

    // ---- Shared fixture state, (re)assigned by the set-up helpers of this class ----

    /** Node controller under test; created once in setUp(). */
    private static TestNodeController nc;
    /** Dataset the indexes belong to; assigned in createIndex(). */
    private static Dataset dataset;
    private static NCAppRuntimeContext ncAppCtx;
    private static IDatasetLifecycleManager dsLifecycleMgr;
    /** Transaction opened in initializeTestCtx() and optionally committed by insertRecords(). */
    private static ITransactionContext txnCtx;
    private static RecordTupleGenerator tupleGenerator;
    private static Index secondaryIndexEntity;

    // Per-partition state: slot i of each array belongs to partition i.
    private static TestNodeController.PrimaryIndexInfo[] primaryIndexInfos;
    private static TestNodeController.SecondaryIndexInfo[] secondaryIndexInfo;
    private static TestLsmBtree[] primaryIndexes;
    private static TestLsmBtree[] secondaryIndexes;
    private static IHyracksTaskContext[] testCtxs;
    private static IIndexDataflowHelper[] primaryIndexDataflowHelpers;
    private static IIndexDataflowHelper[] secondaryIndexDataflowHelpers;
    private static LSMInsertDeleteOperatorNodePushable[] insertOps;

    /**
     * One-time fixture: deletes existing instance files and creates the shared {@link TestNodeController}
     * from the {@code cc.conf} file under {@code src/test/resources} of the working directory.
     *
     * @throws Exception if cleaning up or constructing the node controller fails
     */
    @BeforeClass
    public static void setUp() throws Exception {
        System.out.println("SetUp: ");
        TestHelper.deleteExistingInstanceFiles();
        // <user.dir>/src/test/resources/cc.conf, joined with the platform file separator.
        final String configPath =
                String.join(File.separator, System.getProperty("user.dir"), "src", "test", "resources", "cc.conf");
        nc = new TestNodeController(configPath, false);
    }

    /**
     * Per-test fixture. The order matters: the node controller must be initialized before task contexts
     * exist, the indexes must be created before they can be opened by readIndex(), and the insert
     * pipelines are built last on top of the task contexts and the secondary index entity.
     *
     * @throws Exception if any of the set-up steps fails
     */
    @Before
    public void initializeTest() throws Exception {
        initializeNc(true);
        initializeTestCtx();
        createIndex();
        readIndex();
        createInsertOps();
        // Fresh generator for the tuples fed into the insert pipelines by insertRecords().
        tupleGenerator = StorageTestUtils.getTupleGenerator();
    }

    /**
     * Runs after every test (each test ends with {@code nc.deInit(false)}): re-initializes the node
     * controller with overridden storage options, reopens the indexes and verifies that primary and
     * secondary components line up, both right away and after one more batch of inserts. Finally the
     * indexes are destroyed and the node controller is shut down with its option overrides cleared.
     *
     * @throws Exception if re-initialization, verification or clean-up fails
     */
    @After
    public void testRecovery() throws Exception {
        // Raw list kept as in the original: the element type expected by setOpts is declared outside this file.
        ArrayList storageOverrides = new ArrayList();
        storageOverrides
                .add(Pair.of(StorageProperties.Option.STORAGE_MEMORYCOMPONENT_GLOBALBUDGET, 20L * 1024 * 1024));
        storageOverrides.add(Pair.of(StorageProperties.Option.STORAGE_MEMORYCOMPONENT_PAGESIZE, 1024));
        storageOverrides.add(Pair.of(StorageProperties.Option.STORAGE_MAX_ACTIVE_WRITABLE_DATASETS, 1024));
        nc.setOpts(storageOverrides);

        // Bring the node controller back up and rebuild the per-partition handles.
        initializeNc(false);
        initializeTestCtx();
        readIndex();

        // First check: state right after re-initialization, once pending IO has drained.
        final DatasetInfo dsInfo = dsLifecycleMgr.getDatasetInfo(dataset.getDatasetId());
        dsInfo.waitForIO();
        checkComponentIds();

        // Second check: the indexes must stay aligned after further inserts into partition 0.
        createInsertOps();
        insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT,
                true);
        dsInfo.waitForIO();
        checkComponentIds();

        dropIndex();
        nc.deInit(true);
        nc.clearOpts();
    }

    /**
     * Initializes the node controller and swaps its LSM IO scheduler for an {@link AsynchronousScheduler}
     * whose failure callback only logs, then caches the application context and dataset lifecycle manager.
     *
     * @param ncInitFlag forwarded unchanged to {@code TestNodeController.init(boolean)}
     * @throws Exception if initialization or the reflective field replacement fails
     */
    private void initializeNc(boolean ncInitFlag) throws Exception {
        nc.init(ncInitFlag);
        ncAppCtx = nc.getAppRuntimeContext();

        // The scheduler is held in a field named "lsmIOScheduler" on the context class; it is replaced via
        // reflection.
        final Field schedulerField = ncAppCtx.getClass().getDeclaredField("lsmIOScheduler");
        schedulerField.setAccessible(true);

        // Failures are logged only, so a flush that is made to fail by a test is reported but not acted upon
        // by this callback.
        final IIoOperationFailedCallback logOnlyCallback = new IIoOperationFailedCallback() {
            public void schedulerFailed(ILSMIOOperationScheduler scheduler, Throwable failure) {
                LOGGER.error("Scheduler Failed", failure);
            }

            public void operationFailed(ILSMIOOperation operation, Throwable failure) {
                LOGGER.warn("IO Operation failed", failure);
            }
        };
        final AsynchronousScheduler replacementScheduler =
                new AsynchronousScheduler(ncAppCtx.getServiceContext().getThreadFactory(), logOnlyCallback);
        schedulerField.set(ncAppCtx, replacementScheduler);

        dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
    }

    /**
     * Defines the secondary index entity on the shared test dataset and creates a primary and a secondary
     * index for every partition.
     *
     * @throws Exception if creating any index fails
     */
    private void createIndex() throws Exception {
        dataset = StorageTestUtils.DATASET;
        // The trailing three booleans and the final 0 are positional Index constructor arguments;
        // see the Index class for their meaning.
        secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME,
                SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS,
                SECONDARY_INDEX_FIELD_TYPES, false, false, false, 0);

        primaryIndexInfos = new TestNodeController.PrimaryIndexInfo[NUM_PARTITIONS];
        secondaryIndexInfo = new TestNodeController.SecondaryIndexInfo[NUM_PARTITIONS];
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            primaryIndexInfos[partition] = StorageTestUtils.createPrimaryIndex(nc, partition);
            // The secondary index is created against the primary index info of the same partition.
            secondaryIndexInfo[partition] = nc.createSecondaryIndex(primaryIndexInfos[partition],
                    secondaryIndexEntity, StorageTestUtils.STORAGE_MANAGER, partition);
        }
    }

    /**
     * Creates one task context per partition under a single new job id and begins the entity-level
     * transaction that is shared by all insert pipelines.
     *
     * @throws Exception if a context cannot be created or the transaction cannot be started
     */
    private void initializeTestCtx() throws Exception {
        final JobId jobId = nc.newJobId();

        testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            testCtxs[partition] = nc.createTestContext(jobId, partition, false);
        }

        final ITransactionManager txnManager = nc.getTransactionManager();
        txnCtx = txnManager.beginTransaction(nc.getTxnJobId(jobId),
                new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
    }

    /**
     * Rebuilds the dataflow helpers for the primary and secondary index of every partition and caches
     * the {@link TestLsmBtree} instance behind each helper.
     *
     * @throws HyracksDataException if a helper cannot be created, opened or closed
     */
    private void readIndex() throws HyracksDataException {
        primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
        primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            primaryIndexDataflowHelpers[partition] = new IndexDataflowHelperFactory(nc.getStorageManager(),
                    primaryIndexInfos[partition].getFileSplitProvider())
                            .create(testCtxs[partition].getJobletContext().getServiceContext(), partition);
            primaryIndexes[partition] = openAndCaptureIndex(primaryIndexDataflowHelpers[partition]);
        }

        secondaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
        secondaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            secondaryIndexDataflowHelpers[partition] = new IndexDataflowHelperFactory(nc.getStorageManager(),
                    secondaryIndexInfo[partition].getFileSplitProvider())
                            .create(testCtxs[partition].getJobletContext().getServiceContext(), partition);
            secondaryIndexes[partition] = openAndCaptureIndex(secondaryIndexDataflowHelpers[partition]);
        }
    }

    /**
     * Opens the helper just long enough to fetch its index instance. The helper is closed in a
     * {@code finally} block so that a failing lookup or cast cannot leave it open.
     *
     * @param helper dataflow helper of the index to look up
     * @return the index instance behind the helper
     * @throws HyracksDataException if opening or closing the helper fails
     */
    private static TestLsmBtree openAndCaptureIndex(IIndexDataflowHelper helper) throws HyracksDataException {
        helper.open();
        try {
            return (TestLsmBtree) helper.getIndexInstance();
        } finally {
            helper.close();
        }
    }

    /**
     * Builds one insert pipeline per partition on top of that partition's task context and the
     * secondary index entity.
     */
    private void createInsertOps() throws HyracksDataException, RemoteException, ACIDException, AlgebricksException {
        insertOps = new LSMInsertDeleteOperatorNodePushable[NUM_PARTITIONS];
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            insertOps[partition] = StorageTestUtils.getInsertPipeline(nc, testCtxs[partition], secondaryIndexEntity);
        }
    }

    /**
     * Destroys the primary and then the secondary index of every partition through their dataflow helpers.
     *
     * @throws HyracksDataException if destroying an index fails
     */
    private void dropIndex() throws HyracksDataException {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            primaryIndexDataflowHelpers[partition].destroy();
            secondaryIndexDataflowHelpers[partition].destroy();
        }
    }

    /**
     * Baseline scenario: records are inserted into partition 0 and committed with no flush made to fail,
     * then the node controller is shut down. The actual assertions run in the {@code @After} hook
     * {@link #testRecovery()}.
     */
    @Test
    public void testBothFlushSucceed() throws Exception {
        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
        // Shut down so that testRecovery() can re-initialize the node controller.
        nc.deInit(false);
    }

    /**
     * Makes the flush of the secondary index of partition 0 fail while the primary flush is allowed,
     * and expects the primary index to end up with exactly one more disk component than the secondary.
     */
    @Test
    public void testSecondaryFlushFails() throws Exception {
        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);

        final TestLsmBtree primary = primaryIndexes[PARTITION_0];
        final TestLsmBtree secondary = secondaryIndexes[PARTITION_0];
        primary.clearFlushCallbacks();
        secondary.clearFlushCallbacks();

        // Secondary: throw before the flush starts.
        final ITestOpCallback<Semaphore> failingFlush = new ITestOpCallback<Semaphore>() {
            public void before(Semaphore unused) throws HyracksDataException {
                throw new HyracksDataException("Kill the flush thread");
            }

            public void after(Semaphore unused) throws HyracksDataException {
            }
        };
        secondary.addFlushCallback(failingFlush);

        // Primary: allow the flush and signal once its IO-completed callback has run.
        final Semaphore primaryIoCompleted = new Semaphore(0);
        final ITestOpCallback<Void> signalIoCompleted = new ITestOpCallback<Void>() {
            public void before(Void unused) throws HyracksDataException {
            }

            public void after(Void unused) throws HyracksDataException {
                primaryIoCompleted.release();
            }
        };
        primary.addFlushCallback(AllowTestOpCallback.INSTANCE);
        primary.addIoCompletedCallback(signalIoCompleted);

        StorageTestUtils.flush(dsLifecycleMgr, primary, true);
        primaryIoCompleted.acquire();

        final int primaryDiskComponents = primary.getDiskComponents().size();
        final int secondaryDiskComponents = secondary.getDiskComponents().size();
        Assert.assertEquals(primaryDiskComponents, secondaryDiskComponents + 1);
        nc.deInit(false);
    }

    /**
     * Makes the flush of the primary index of partition 0 fail while the secondary flush is allowed,
     * and expects the secondary index to end up with exactly one more disk component than the primary.
     */
    @Test
    public void testPrimaryFlushFails() throws Exception {
        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);

        final TestLsmBtree primary = primaryIndexes[PARTITION_0];
        final TestLsmBtree secondary = secondaryIndexes[PARTITION_0];
        primary.clearFlushCallbacks();
        secondary.clearFlushCallbacks();

        // Primary: throw before the flush starts.
        final ITestOpCallback<Semaphore> failingFlush = new ITestOpCallback<Semaphore>() {
            public void before(Semaphore unused) throws HyracksDataException {
                throw new HyracksDataException("Kill the flush thread");
            }

            public void after(Semaphore unused) throws HyracksDataException {
            }
        };
        primary.addFlushCallback(failingFlush);

        // Secondary: allow the flush and signal once its IO-completed callback has run.
        final Semaphore secondaryIoCompleted = new Semaphore(0);
        final ITestOpCallback<Void> signalIoCompleted = new ITestOpCallback<Void>() {
            public void before(Void unused) throws HyracksDataException {
            }

            public void after(Void unused) throws HyracksDataException {
                secondaryIoCompleted.release();
            }
        };
        secondary.addFlushCallback(AllowTestOpCallback.INSTANCE);
        secondary.addIoCompletedCallback(signalIoCompleted);

        StorageTestUtils.flush(dsLifecycleMgr, primary, true);
        secondaryIoCompleted.acquire();

        final int secondaryDiskComponents = secondary.getDiskComponents().size();
        final int primaryDiskComponents = primary.getDiskComponents().size();
        Assert.assertEquals(secondaryDiskComponents, primaryDiskComponents + 1);
        nc.deInit(false);
    }

    /**
     * Two-partition scenario: both partitions receive records, then only partition 0 is flushed with its
     * primary flush made to fail. Partition 0 is expected to have one extra secondary disk component,
     * while partition 1 must have equally many primary and secondary disk components.
     */
    @Test
    public void testMultiPartition() throws Exception {
        // The shared transaction is committed only once, after the second partition has been loaded.
        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT,
                false);
        insertRecords(PARTITION_1, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);
        StorageTestUtils.waitForOperations(primaryIndexes[PARTITION_0]);
        StorageTestUtils.waitForOperations(primaryIndexes[PARTITION_1]);

        // Both partitions must hold unflushed data in their current memory component.
        Assert.assertTrue(primaryIndexes[PARTITION_0].getCurrentMemoryComponent().isModified());
        Assert.assertTrue(primaryIndexes[PARTITION_1].getCurrentMemoryComponent().isModified());

        final TestLsmBtree primary0 = primaryIndexes[PARTITION_0];
        final TestLsmBtree secondary0 = secondaryIndexes[PARTITION_0];
        primary0.clearFlushCallbacks();
        secondary0.clearFlushCallbacks();

        // Partition 0, primary: throw before the flush starts.
        final ITestOpCallback<Semaphore> failingFlush = new ITestOpCallback<Semaphore>() {
            public void before(Semaphore unused) throws HyracksDataException {
                throw new HyracksDataException("Kill the flush thread");
            }

            public void after(Semaphore unused) throws HyracksDataException {
            }
        };
        primary0.addFlushCallback(failingFlush);

        // Partition 0, secondary: allow the flush and signal once its IO-completed callback has run.
        final Semaphore secondaryIoCompleted = new Semaphore(0);
        final ITestOpCallback<Void> signalIoCompleted = new ITestOpCallback<Void>() {
            public void before(Void unused) throws HyracksDataException {
            }

            public void after(Void unused) throws HyracksDataException {
                secondaryIoCompleted.release();
            }
        };
        secondary0.addFlushCallback(AllowTestOpCallback.INSTANCE);
        secondary0.addIoCompletedCallback(signalIoCompleted);

        StorageTestUtils.flushPartition(dsLifecycleMgr, primary0, true);
        secondaryIoCompleted.acquire(1);

        Assert.assertEquals(secondary0.getDiskComponents().size(), primary0.getDiskComponents().size() + 1);
        Assert.assertEquals(secondaryIndexes[PARTITION_1].getDiskComponents().size(),
                primaryIndexes[PARTITION_1].getDiskComponents().size());
        nc.deInit(false);
    }

    /**
     * Makes both the primary and the secondary flush of partition 0 fail and expects the two indexes to
     * end up with the same number of disk components.
     */
    @Test
    public void testBothFlushFail() throws Exception {
        insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true);

        final TestLsmBtree primary = primaryIndexes[PARTITION_0];
        final TestLsmBtree secondary = secondaryIndexes[PARTITION_0];
        primary.clearFlushCallbacks();
        secondary.clearFlushCallbacks();

        // Each callback signals that its flush was attempted and then aborts it by throwing.
        final Semaphore primaryFlushAttempted = new Semaphore(0);
        final Semaphore secondaryFlushAttempted = new Semaphore(0);
        primary.addFlushCallback(new ITestOpCallback<Semaphore>() {
            public void before(Semaphore unused) throws HyracksDataException {
                primaryFlushAttempted.release();
                throw new HyracksDataException("Kill the flush thread");
            }

            public void after(Semaphore unused) throws HyracksDataException {
            }
        });
        secondary.addFlushCallback(new ITestOpCallback<Semaphore>() {
            public void before(Semaphore unused) throws HyracksDataException {
                secondaryFlushAttempted.release();
                // Message spelled like the sibling tests' so that log searches find every injected failure.
                throw new HyracksDataException("Kill the flush thread");
            }

            public void after(Semaphore unused) throws HyracksDataException {
            }
        });

        StorageTestUtils.flush(dsLifecycleMgr, primary, true);
        // Wait until both flushes have been attempted before comparing the indexes.
        primaryFlushAttempted.acquire();
        secondaryFlushAttempted.acquire();

        Assert.assertEquals(secondary.getDiskComponents().size(), primary.getDiskComponents().size());
        nc.deInit(false);
    }

    /**
     * Pushes generated tuples through the insert pipeline of one partition, requesting a flush of that
     * partition at component boundaries, and optionally commits the shared transaction afterwards.
     *
     * @param partition           index of the partition whose pipeline and indexes are used
     * @param totalRecords        number of tuples to insert
     * @param recordsPerComponent a flush is requested whenever this many tuples have been added since
     *                            the previous boundary (never before the first or the last tuple)
     * @param commit              when {@code true}, commits {@code txnCtx} after the pipeline is closed
     * @throws Exception if inserting, flushing or committing fails
     */
    private void insertRecords(int partition, int totalRecords, int recordsPerComponent, boolean commit)
            throws Exception {
        StorageTestUtils.allowAllOps(primaryIndexes[partition]);
        StorageTestUtils.allowAllOps(secondaryIndexes[partition]);

        final LSMInsertDeleteOperatorNodePushable insertOp = insertOps[partition];
        insertOp.open();
        final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(testCtxs[partition]));
        for (int recordNo = 0; recordNo < totalRecords; recordNo++) {
            final boolean atComponentBoundary =
                    recordNo % recordsPerComponent == 0 && recordNo != 0 && recordNo + 1 != totalRecords;
            if (atComponentBoundary) {
                // Push buffered tuples into the pipeline before asking for the flush.
                if (appender.getTupleCount() > 0) {
                    appender.write(insertOp, true);
                }
                StorageTestUtils.flushPartition(dsLifecycleMgr, primaryIndexes[partition], false);
            }
            DataflowUtils.addTupleToFrame(appender, tupleGenerator.next(), insertOp);
        }
        // Drain whatever is still buffered in the last frame.
        if (appender.getTupleCount() > 0) {
            appender.write(insertOp, true);
        }
        insertOp.close();

        if (commit) {
            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
        }
    }

    /**
     * Verifies, partition by partition, that primary and secondary components line up.
     *
     * @throws HyracksDataException if component information cannot be read
     */
    private void checkComponentIds() throws HyracksDataException {
        for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
            checkComponentIds(partition);
        }
    }

    /**
     * Asserts that the primary and secondary index of one partition agree on their components: the same
     * current memory component id and index (checked only when the primary has allocated its memory
     * components), the same number of disk components, and pairwise equal disk component ids.
     *
     * @param i index of the partition to check
     * @throws HyracksDataException if component information cannot be read
     */
    private void checkComponentIds(int i) throws HyracksDataException {
        final TestLsmBtree primary = primaryIndexes[i];
        final TestLsmBtree secondary = secondaryIndexes[i];

        if (primary.isMemoryComponentsAllocated()) {
            Assert.assertEquals(primary.getCurrentMemoryComponent().getId(),
                    secondary.getCurrentMemoryComponent().getId());
            Assert.assertEquals(primary.getCurrentMemoryComponentIndex(), secondary.getCurrentMemoryComponentIndex());
        }

        // Typed lists make the per-element casts unnecessary.
        final List<ILSMDiskComponent> primaryDisk = primary.getDiskComponents();
        final List<ILSMDiskComponent> secondaryDisk = secondary.getDiskComponents();
        Assert.assertEquals(primaryDisk.size(), secondaryDisk.size());
        for (int pos = 0; pos < primaryDisk.size(); pos++) {
            Assert.assertEquals(primaryDisk.get(pos).getId(), secondaryDisk.get(pos).getId());
        }
    }
}
