package org.apache.falcon.metadata;

import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.CatalogTable;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/falcon/metadata/MetadataMappingServiceTest.class */
public class MetadataMappingServiceTest {
    // User the tests authenticate as in setUp(); the verify* helpers expect it as the
    // target of each entity's user edge.
    public static final String FALCON_USER = "falcon-user";
    // Not referenced in the visible part of this class; presumably consumed by the
    // workflow-argument builder further down the file — TODO confirm.
    private static final String LOGS_DIR = "jail://global:00/falcon/staging/feed/logs";
    private static final String NOMINAL_TIME = "2014-01-01-01-00";

    // Names of the entities published to the configuration store by the tests.
    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
    public static final String BCP_CLUSTER_ENTITY_NAME = "bcp-cluster";
    public static final String PROCESS_ENTITY_NAME = "sample-process";
    public static final String COLO_NAME = "west-coast";

    // Workflow names/version passed to the workflow execution context in the lineage tests.
    public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
    public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
    private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow";
    public static final String WORKFLOW_VERSION = "1.0.9";

    // Feed names and instance paths. NOTE(review): '#' appears to separate per-feed groups
    // and ',' the instances within a group — confirm against the code that parses them.
    public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
    public static final String INPUT_INSTANCE_PATHS = "jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02#jail://global:00/falcon/clicks-feed/2014-01-01";
    public static final String INPUT_INSTANCE_PATHS_NO_DATE = "jail://global:00/falcon/impression-feed,jail://global:00/falcon/impression-feed#jail://global:00/falcon/clicks-feed";
    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
    public static final String OUTPUT_INSTANCE_PATHS = "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";

    // Feeds used by the replication and retention (eviction) lineage tests.
    private static final String REPLICATED_FEED = "raw-click";
    private static final String EVICTED_FEED = "imp-click-join1";
    private static final String EVICTED_INSTANCE_PATHS = "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
    public static final String OUTPUT_INSTANCE_PATHS_NO_DATE = "jail://global:00/falcon/imp-click-join1,jail://global:00/falcon/imp-click-join2";

    // Counter names/values; testLineageForJobCounter asserts the same three values as
    // properties of the process instance vertex.
    public static final String COUNTERS = "TIMETAKEN:36956,COPY:30,BYTESCOPIED:1000";
    // Not referenced in the visible part of this class.
    public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";

    // Entity store the tests publish to, obtained in setUp().
    private ConfigurationStore configStore;
    // Service under test, created and initialised in setUp().
    private MetadataMappingService service;
    // Entities created by earlier tests and reused by the tests that depend on them.
    private Cluster clusterEntity;
    private Cluster anotherCluster;
    private List<Feed> inputFeeds = new ArrayList();
    private List<Feed> outputFeeds = new ArrayList();
    private Process processEntity;

    /**
     * Class-level fixture: authenticates the test user, registers the workflow
     * job-end notification service, configures the graph storage directory and
     * history preservation, then creates and initialises the service under test.
     */
    @BeforeClass
    public void setUp() throws Exception {
        CurrentUser.authenticate(FALCON_USER);
        configStore = ConfigurationStore.get();
        Services.get().register(new WorkflowJobEndNotificationService());

        // Timestamped directory name under target/ for the graph database.
        String graphDirectory = "target/graphdb-" + System.currentTimeMillis();
        StartupProperties.get().setProperty("falcon.graph.storage.directory", graphDirectory);
        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");

        service = new MetadataMappingService();
        service.init();

        System.out.println("Got vertex property keys: " + service.getVertexIndexedKeys());
        System.out.println("Got edge property keys: " + service.getEdgeIndexedKeys());
    }

    /**
     * Class-level teardown: dumps the final graph to standard output, runs the
     * shared clean-up and switches history preservation back off.
     */
    @AfterClass
    public void tearDown() throws Exception {
        // Dump first: cleanUp() runs afterwards, so this captures the graph as the tests left it.
        GraphUtils.dump(service.getGraph(), System.out);
        cleanUp();
        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
    }

    /** Dumps the current graph after every test method. */
    @AfterMethod
    public void printGraph() throws Exception {
        GraphUtils.dump(service.getGraph());
    }

    /** Returns a new query against the graph held by the service under test. */
    private GraphQuery getQuery() {
        Graph graph = service.getGraph();
        return graph.query();
    }

    /** The service must report the name published in {@code MetadataMappingService.SERVICE_NAME}. */
    @Test
    public void testGetName() throws Exception {
        String actualName = service.getName();
        Assert.assertEquals(actualName, MetadataMappingService.SERVICE_NAME);
    }

    /**
     * Publishing the primary cluster must add the cluster vertex with its user,
     * colo and classification edges: four new vertices and three new edges.
     */
    @Test
    public void testOnAddClusterEntity() throws Exception {
        // Expected totals are derived from the counts taken before the entity is published.
        long expectedVertices = getVerticesCount(service.getGraph()) + 4;
        long expectedEdges = getEdgesCount(service.getGraph()) + 3;

        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");

        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
        verifyClusterEntityEdges();
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Publishes two input feeds and two output feeds on the primary cluster, one
     * after another, and checks the vertex/edge growth of the graph after each.
     * The feeds are remembered in {@code inputFeeds}/{@code outputFeeds} for the
     * tests that depend on this one.
     */
    @Test(dependsOnMethods = {"testOnAddClusterEntity"})
    public void testOnAddFeedEntity() throws Exception {
        // --- input feed 1: impression-feed ---
        long expectedVertices = getVerticesCount(service.getGraph()) + 3;
        long expectedEdges = getEdgesCount(service.getGraph()) + 4;
        Feed impressionFeed = addFeedEntity("impression-feed", clusterEntity, "classified-as=Secure",
                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
        inputFeeds.add(impressionFeed);
        verifyEntityWasAddedToGraph(impressionFeed.getName(), RelationshipType.FEED_ENTITY);
        verifyFeedEntityEdges(impressionFeed.getName(), "Secure", "analytics");
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);

        // --- input feed 2: clicks-feed ---
        expectedVertices = getVerticesCount(service.getGraph()) + 2;
        expectedEdges = getEdgesCount(service.getGraph()) + 5;
        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
                "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
        inputFeeds.add(clicksFeed);
        verifyEntityWasAddedToGraph(clicksFeed.getName(), RelationshipType.FEED_ENTITY);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);

        // --- output feed 1: imp-click-join1 ---
        expectedVertices = getVerticesCount(service.getGraph()) + 3;
        expectedEdges = getEdgesCount(service.getGraph()) + 5;
        Feed joinFeed1 = addFeedEntity(EVICTED_FEED, clusterEntity, "classified-as=Financial",
                "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
        outputFeeds.add(joinFeed1);
        verifyEntityWasAddedToGraph(joinFeed1.getName(), RelationshipType.FEED_ENTITY);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);

        // --- output feed 2: imp-click-join2 ---
        expectedVertices = getVerticesCount(service.getGraph()) + 1;
        expectedEdges = getEdgesCount(service.getGraph()) + 6;
        Feed joinFeed2 = addFeedEntity("imp-click-join2", clusterEntity,
                "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
        outputFeeds.add(joinFeed2);
        verifyEntityWasAddedToGraph(joinFeed2.getName(), RelationshipType.FEED_ENTITY);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Publishes the sample process wired to the previously added input and output
     * feeds and checks its edges plus the resulting graph growth (four vertices,
     * nine edges).
     */
    @Test(dependsOnMethods = {"testOnAddFeedEntity"})
    public void testOnAddProcessEntity() throws Exception {
        long expectedVertices = getVerticesCount(service.getGraph()) + 4;
        long expectedEdges = getEdgesCount(service.getGraph()) + 9;

        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical",
                "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                inputFeeds, outputFeeds);

        verifyEntityWasAddedToGraph(processEntity.getName(), RelationshipType.PROCESS_ENTITY);
        verifyProcessEntityEdges();
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /** Verifies the feed-entity graph built by the preceding add-entity tests. */
    @Test(dependsOnMethods = {"testOnAddProcessEntity"})
    public void testOnAdd() throws Exception {
        RelationshipType feedEntityType = RelationshipType.FEED_ENTITY;
        verifyEntityGraph(feedEntityType, "Secure");
    }

    /**
     * Reports a successful GENERATE workflow to the service and verifies the
     * feed-instance lineage graph, expecting six new vertices and forty new edges.
     */
    @Test
    public void testMapLineage() throws Exception {
        setup();
        long expectedVertices = getVerticesCount(service.getGraph()) + 6;
        long expectedEdges = getEdgesCount(service.getGraph()) + 40;

        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.GENERATE,
                        GENERATE_WORKFLOW_NAME, null, null, null, null),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());
        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName());
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Same as the GENERATE lineage test but with instance paths that carry no
     * date component; the expected feed instances are all named with the
     * {@code 2014-01-01T01:00Z} timestamp.
     */
    @Test
    public void testLineageForNoDateInFeedPath() throws Exception {
        setupForNoDateInFeedPath();
        long expectedVertices = getVerticesCount(service.getGraph()) + 5;
        long expectedEdges = getEdgesCount(service.getGraph()) + 34;

        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.GENERATE,
                        GENERATE_WORKFLOW_NAME, null, OUTPUT_INSTANCE_PATHS_NO_DATE,
                        INPUT_INSTANCE_PATHS_NO_DATE, null),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());

        List<String> expectedInstances = Arrays.asList(
                "impression-feed/2014-01-01T01:00Z",
                "clicks-feed/2014-01-01T01:00Z",
                "imp-click-join1/2014-01-01T01:00Z",
                "imp-click-join2/2014-01-01T01:00Z");
        Assert.assertTrue(getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName())
                .containsAll(expectedInstances));
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Reports a successful REPLICATE workflow for the replicated feed and checks
     * that only a single replicated-to edge is added (no new vertices).
     */
    @Test
    public void testLineageForReplication() throws Exception {
        setupForLineageReplication();
        long expectedVertices = getVerticesCount(service.getGraph()) + 0;
        long expectedEdges = getEdgesCount(service.getGraph()) + 1;

        String targetInstancePath = "jail://global:00/falcon/raw-click/bcp/20140101";
        String sourceInstancePath = "jail://global:00/falcon/raw-click/primary/20140101";
        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.REPLICATE,
                        REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, targetInstancePath,
                        sourceInstancePath, REPLICATED_FEED),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());
        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, targetInstancePath, context,
                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Replication lineage on a freshly re-initialised graph where the replicated
     * instance was not produced by an earlier GENERATE run: one new vertex and
     * six new edges are expected.
     */
    @Test
    public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
        // Start from a clean graph containing only the replication cluster/feed entities.
        cleanUp();
        service.init();
        addClusterAndFeedForReplication(inputFeeds);

        long expectedVertices = getVerticesCount(service.getGraph()) + 1;
        long expectedEdges = getEdgesCount(service.getGraph()) + 6;

        String targetInstancePath = "jail://global:00/falcon/raw-click/bcp/20140101";
        String sourceInstancePath = "jail://global:00/falcon/raw-click/primary/20140101";
        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.REPLICATE,
                        REPLICATION_WORKFLOW_NAME, REPLICATED_FEED, targetInstancePath,
                        sourceInstancePath, REPLICATED_FEED),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());
        verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED, targetInstancePath, context,
                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Reports a successful DELETE (eviction) workflow covering two instances of
     * the evicted feed and checks the lineage graph plus one evicted edge per
     * instance path (two new edges, no new vertices).
     */
    @Test
    public void testLineageForRetention() throws Exception {
        setupForLineageEviction();
        long expectedVertices = getVerticesCount(service.getGraph()) + 0;
        long expectedEdges = getEdgesCount(service.getGraph()) + 2;

        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.DELETE,
                        EVICTION_WORKFLOW_NAME, EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE",
                        EVICTED_FEED),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());
        verifyLineageGraph(
                RelationshipType.FEED_INSTANCE.getName(),
                Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z"),
                Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z"),
                Arrays.asList("clicks-feed/2014-01-01T00:00Z", "imp-click-join1/2014-01-01T00:00Z",
                        "imp-click-join1/2014-01-02T00:00Z"));

        // Every evicted instance path must have its own evicted edge.
        String[] evictedPaths = EVICTED_INSTANCE_PATHS.split(",");
        for (String evictedPath : evictedPaths) {
            verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, evictedPath, context,
                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
        }
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * A DELETE workflow whose instance paths are "IGNORE" must leave the graph
     * untouched: vertex and edge counts stay exactly as they were.
     */
    @Test
    public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
        cleanUp();
        service.init();
        long expectedVertices = getVerticesCount(service.getGraph());
        long expectedEdges = getEdgesCount(service.getGraph());

        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.DELETE,
                        EVICTION_WORKFLOW_NAME, EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Restarts the service (destroy + init) and then publishes a second cluster,
     * expecting three new vertices and three new edges. The cluster is kept in
     * {@code anotherCluster} for the entity-change tests.
     */
    @Test(dependsOnMethods = {"testOnAdd"})
    public void testOnChange() throws Exception {
        service.destroy();
        service.init();

        long expectedVertices = getVerticesCount(service.getGraph()) + 3;
        long expectedEdges = getEdgesCount(service.getGraph()) + 3;

        anotherCluster = addClusterEntity("another-cluster", "east-coast", "classification=another");

        verifyEntityWasAddedToGraph("another-cluster", RelationshipType.CLUSTER_ENTITY);
        Assert.assertEquals(getVerticesCount(service.getGraph()), expectedVertices);
        Assert.assertEquals(getEdgesCount(service.getGraph()), expectedEdges);
    }

    /**
     * Updates the first input feed with new tags, a new group, a new location and
     * an additional cluster, then verifies the feed's edges and the graph growth
     * (two vertices, two edges).
     *
     * <p>The update is bracketed by {@code initiateUpdate}/{@code cleanupUpdateInit};
     * the clean-up runs in a {@code finally} block so it executes exactly once on
     * every path, including when a later assertion fails.
     */
    @Test(dependsOnMethods = {"testOnChange"})
    public void testOnFeedEntityChange() throws Exception {
        Feed updatedFeed = EntityBuilderTestUtil.buildFeed(inputFeeds.get(0).getName(), clusterEntity,
                "classified-as=Secured,source=data-warehouse", "reporting");
        addStorage(updatedFeed, Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");

        long verticesBefore;
        long edgesBefore;
        try {
            configStore.initiateUpdate(updatedFeed);

            // Baseline counts are taken after the update has been initiated, as before.
            verticesBefore = getVerticesCount(service.getGraph());
            edgesBefore = getEdgesCount(service.getGraph());

            // Attach the second cluster to the feed before pushing the update.
            org.apache.falcon.entity.v0.feed.Cluster extraCluster = new org.apache.falcon.entity.v0.feed.Cluster();
            extraCluster.setName(anotherCluster.getName());
            updatedFeed.getClusters().getClusters().add(extraCluster);

            configStore.update(EntityType.FEED, updatedFeed);
        } finally {
            configStore.cleanupUpdateInit();
        }

        verifyUpdatedEdges(updatedFeed);
        Assert.assertEquals(getVerticesCount(service.getGraph()), verticesBefore + 2);
        Assert.assertEquals(getEdgesCount(service.getGraph()), edgesBefore + 2);
    }

    /**
     * Publishing a process that references a feed which was only built, never
     * published, is expected to fail with a {@link FalconException}; the graph
     * must then still contain only the cluster's four vertices and three edges.
     */
    @Test
    public void testLineageForTransactionFailure() throws Exception {
        cleanUp();
        service.init();

        // Graph now holds the primary cluster only.
        clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);
        verifyClusterEntityEdges();
        Assert.assertEquals(getVerticesCount(service.getGraph()), 4L);
        Assert.assertEquals(getEdgesCount(service.getGraph()), 3L);

        // This feed is built but not published through the configuration store here.
        Feed unpublishedFeed = EntityBuilderTestUtil.buildFeed("feed-name", new Cluster[]{clusterEntity},
                (String) null, (String) null);
        inputFeeds.add(unpublishedFeed);
        outputFeeds.add(unpublishedFeed);

        try {
            processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical",
                    "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                    inputFeeds, outputFeeds);
            Assert.fail();
        } catch (FalconException e) {
            // The failed publish must not have left anything behind in the graph.
            Assert.assertEquals(getVerticesCount(service.getGraph()), 4L);
            Assert.assertEquals(getEdgesCount(service.getGraph()), 3L);
        }
    }

    /**
     * Checks the outgoing edges of an updated feed: group "reporting", tag
     * classified-as "Secured", tag source "data-warehouse", and cluster edges
     * reaching both the primary cluster and "another-cluster".
     *
     * @param feed the feed whose entity vertex is inspected
     */
    private void verifyUpdatedEdges(Feed feed) {
        Vertex feedVertex = getEntityVertex(feed.getName(), RelationshipType.FEED_ENTITY);

        Edge groupEdge = feedVertex.getEdges(Direction.OUT, RelationshipLabel.GROUPS.getName()).iterator().next();
        Assert.assertEquals(groupEdge.getVertex(Direction.IN).getProperty("name"), "reporting");

        Edge classificationEdge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next();
        Assert.assertEquals(classificationEdge.getVertex(Direction.IN).getProperty("name"), "Secured");

        Edge sourceEdge = feedVertex.getEdges(Direction.OUT, "source").iterator().next();
        Assert.assertEquals(sourceEdge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");

        // Collect the names of all clusters the feed is attached to.
        List<Object> clusterNames = new ArrayList<>();
        for (Edge clusterEdge : feedVertex.getEdges(Direction.OUT, RelationshipLabel.FEED_CLUSTER_EDGE.getName())) {
            clusterNames.add(clusterEdge.getVertex(Direction.IN).getProperty("name"));
        }
        Assert.assertTrue(clusterNames.containsAll(Arrays.asList(CLUSTER_ENTITY_NAME, "another-cluster")),
                "Actual does not contain expected: " + clusterNames);
    }

    /**
     * Replaces the sample process with a version that runs on the second cluster,
     * has no tags or pipelines, uses workflow version 2.0.0 and keeps only the
     * first input feed. Expects the vertex count to stay the same and six edges
     * to disappear.
     *
     * <p>The update is bracketed by {@code initiateUpdate}/{@code cleanupUpdateInit};
     * the clean-up runs in a {@code finally} block so it executes exactly once on
     * every path, including when a later assertion fails.
     */
    @Test(dependsOnMethods = {"testOnFeedEntityChange"})
    public void testOnProcessEntityChange() throws Exception {
        long verticesBefore = getVerticesCount(service.getGraph());
        long edgesBefore = getEdgesCount(service.getGraph());

        Process updatedProcess = EntityBuilderTestUtil.buildProcess(processEntity.getName(), anotherCluster,
                (String) null, (String) null);
        EntityBuilderTestUtil.addProcessWorkflow(updatedProcess, GENERATE_WORKFLOW_NAME, "2.0.0");
        EntityBuilderTestUtil.addInput(updatedProcess, inputFeeds.get(0));

        try {
            configStore.initiateUpdate(updatedProcess);
            configStore.update(EntityType.PROCESS, updatedProcess);
        } finally {
            configStore.cleanupUpdateInit();
        }

        verifyUpdatedEdges(updatedProcess);
        Assert.assertEquals(getVerticesCount(service.getGraph()), verticesBefore);
        Assert.assertEquals(getEdgesCount(service.getGraph()), edgesBefore - 6);
    }

    /**
     * Exercises {@code EntityRelationshipGraphBuilder.areSame} for inputs and
     * outputs: two empty containers are the same, they differ once only one side
     * holds an element, and they are the same again once both sides hold that
     * element.
     */
    @Test(dependsOnMethods = {"testOnProcessEntityChange"})
    public void testAreSame() throws Exception {
        Inputs firstInputs = new Inputs();
        Inputs secondInputs = new Inputs();
        Outputs firstOutputs = new Outputs();
        Outputs secondOutputs = new Outputs();

        // Both sides empty.
        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(firstInputs, secondInputs));
        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(firstOutputs, secondOutputs));

        Input input = new Input();
        input.setName("input1");
        Output output = new Output();
        output.setName("output1");

        // Element present on one side only.
        firstInputs.getInputs().add(input);
        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(firstInputs, secondInputs));
        firstOutputs.getOutputs().add(output);
        Assert.assertFalse(EntityRelationshipGraphBuilder.areSame(firstOutputs, secondOutputs));

        // Same element present on both sides.
        secondInputs.getInputs().add(input);
        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(firstInputs, secondInputs));
        secondOutputs.getOutputs().add(output);
        Assert.assertTrue(EntityRelationshipGraphBuilder.areSame(firstOutputs, secondOutputs));
    }

    /**
     * Reports a successful GENERATE workflow and checks that the process instance
     * vertex carries the TIMETAKEN, COPY and BYTESCOPIED counters, that the graph
     * holds nine vertices and fourteen edges, and that the job-counter lineage
     * verifies.
     */
    @Test
    public void testLineageForJobCounter() throws Exception {
        setupForJobCounters();

        WorkflowExecutionContext context = WorkflowExecutionContext.create(
                getTestMessageArgs(WorkflowExecutionContext.EntityOperations.GENERATE,
                        GENERATE_WORKFLOW_NAME, "IGNORE", "IGNORE", "IGNORE", "NONE"),
                WorkflowExecutionContext.Type.POST_PROCESSING);
        service.onSuccess(context);

        debug(service.getGraph());
        GraphUtils.dump(service.getGraph());

        // Look up the process instance vertex by name and check its counter properties.
        Vertex processInstance = service.getGraph()
                .getVertices("name", "sample-process/2014-01-01T01:00Z")
                .iterator()
                .next();
        Assert.assertEquals(processInstance.getProperty("TIMETAKEN"), 36956L);
        Assert.assertEquals(processInstance.getProperty("COPY"), 30L);
        Assert.assertEquals(processInstance.getProperty("BYTESCOPIED"), 1000L);

        Assert.assertEquals(getVerticesCount(service.getGraph()), 9L);
        Assert.assertEquals(getEdgesCount(service.getGraph()), 14L);
        verifyLineageGraphForJobCounters(context);
    }

    /**
     * Checks the edges of an updated process: its cluster edge must point to
     * {@code anotherCluster}, its incoming feed edge must come from the process's
     * first input feed, and it must have no outgoing edges to output feeds.
     *
     * @param process the process whose entity vertex is inspected
     */
    private void verifyUpdatedEdges(Process process) {
        Vertex processVertex = getEntityVertex(process.getName(), RelationshipType.PROCESS_ENTITY);

        Edge clusterEdge = processVertex
                .getEdges(Direction.OUT, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName())
                .iterator()
                .next();
        Assert.assertEquals(clusterEdge.getVertex(Direction.IN).getProperty("name"), anotherCluster.getName());

        Edge inputEdge = processVertex
                .getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName())
                .iterator()
                .next();
        Assert.assertEquals(inputEdge.getVertex(Direction.OUT).getProperty("name"),
                process.getInputs().getInputs().get(0).getFeed());

        // Any output edge at all is a failure; the first one found is reported.
        for (Edge outputEdge : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
            Assert.fail("there should not be any edges to output feeds" + outputEdge);
        }
    }

    /**
     * Prints every vertex and then every edge of the graph to standard output,
     * each section preceded by a header line.
     *
     * @param graph the graph to print
     */
    public static void debug(Graph graph) {
        System.out.println("*****Vertices of " + graph);
        for (Vertex vertex : graph.getVertices()) {
            System.out.println(GraphUtils.vertexString(vertex));
        }

        System.out.println("*****Edges of " + graph);
        for (Edge edge : graph.getEdges()) {
            System.out.println(GraphUtils.edgeString(edge));
        }
    }

    /**
     * Builds a cluster entity and publishes it to the configuration store.
     *
     * @param str  cluster name
     * @param str2 colo name
     * @param str3 tags, e.g. {@code "classification=production"}
     * @return the published cluster
     */
    private Cluster addClusterEntity(String str, String str2, String str3) throws Exception {
        Cluster cluster = EntityBuilderTestUtil.buildCluster(str, str2, str3);
        configStore.publish(EntityType.CLUSTER, cluster);
        return cluster;
    }

    /**
     * Single-cluster convenience overload: wraps the cluster in an array and
     * delegates to the array-based {@code addFeedEntity}.
     *
     * @param str     feed name
     * @param cluster the only cluster the feed belongs to
     * @param str2    tags
     * @param str3    groups
     * @param type    storage type
     * @param str4    storage path or URI template
     * @return the published feed
     */
    private Feed addFeedEntity(String str, Cluster cluster, String str2, String str3, Storage.TYPE type, String str4) throws Exception {
        Cluster[] singleCluster = {cluster};
        return addFeedEntity(str, singleCluster, str2, str3, type, str4);
    }

    /**
     * Builds a feed on the given clusters, attaches its storage, marks the
     * feed-cluster named {@code BCP_CLUSTER_ENTITY_NAME} (if present) as a TARGET
     * cluster, and publishes the feed to the configuration store.
     *
     * @param str        feed name
     * @param clusterArr clusters the feed belongs to
     * @param str2       tags
     * @param str3       groups
     * @param type       storage type
     * @param str4       storage path or URI template
     * @return the published feed
     */
    private Feed addFeedEntity(String str, Cluster[] clusterArr, String str2, String str3, Storage.TYPE type, String str4) throws Exception {
        Feed feed = EntityBuilderTestUtil.buildFeed(str, clusterArr, str2, str3);
        addStorage(feed, type, str4);

        List<org.apache.falcon.entity.v0.feed.Cluster> feedClusters = feed.getClusters().getClusters();
        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feedClusters) {
            boolean isBackupCluster = feedCluster.getName().equals(BCP_CLUSTER_ENTITY_NAME);
            if (isBackupCluster) {
                feedCluster.setType(ClusterType.TARGET);
            }
        }

        configStore.publish(EntityType.FEED, feed);
        return feed;
    }

    /**
     * Builds a process with the given workflow, inputs and outputs and publishes
     * it to the configuration store.
     *
     * @param str     process name
     * @param cluster cluster the process runs on
     * @param str2    tags
     * @param str3    pipelines
     * @param str4    workflow name
     * @param str5    workflow version
     * @param list    feeds added as process inputs, in order
     * @param list2   feeds added as process outputs, in order
     * @return the published process
     */
    public Process addProcessEntity(String str, Cluster cluster, String str2, String str3, String str4, String str5, List<Feed> list, List<Feed> list2) throws Exception {
        Process process = EntityBuilderTestUtil.buildProcess(str, cluster, str2, str3);
        EntityBuilderTestUtil.addProcessWorkflow(process, str4, str5);

        for (Feed inputFeed : list) {
            EntityBuilderTestUtil.addInput(process, inputFeed);
        }
        for (Feed outputFeed : list2) {
            EntityBuilderTestUtil.addOutput(process, outputFeed);
        }

        configStore.publish(EntityType.PROCESS, process);
        return process;
    }

    /**
     * Attaches storage to a feed. FILESYSTEM storage becomes a single DATA
     * location in a fresh {@code Locations}; any other type becomes a catalog
     * table with the given URI.
     *
     * @param feed the feed to modify
     * @param type storage type
     * @param str  location path (FILESYSTEM) or table URI (otherwise)
     */
    private static void addStorage(Feed feed, Storage.TYPE type, String str) {
        if (type == Storage.TYPE.FILESYSTEM) {
            feed.setLocations(new Locations());

            Location dataLocation = new Location();
            dataLocation.setType(LocationType.DATA);
            dataLocation.setPath(str);
            feed.getLocations().getLocations().add(dataLocation);
            return;
        }

        CatalogTable table = new CatalogTable();
        table.setUri(str);
        feed.setTable(table);
    }

    /**
     * Attaches storage to a feed-cluster. FILESYSTEM storage becomes a single
     * DATA location in a fresh {@code Locations} on the cluster; any other type
     * becomes a catalog table on the cluster.
     *
     * <p>In the FILESYSTEM case the feed's own locations are also replaced by an
     * empty {@code Locations}. NOTE(review): presumably intentional so that only
     * the cluster-level location applies — confirm.
     *
     * @param cluster the feed-cluster to modify
     * @param feed    the owning feed
     * @param type    storage type
     * @param str     location path (FILESYSTEM) or table URI (otherwise)
     */
    private static void addStorage(org.apache.falcon.entity.v0.feed.Cluster cluster, Feed feed, Storage.TYPE type, String str) {
        if (type == Storage.TYPE.FILESYSTEM) {
            feed.setLocations(new Locations());

            Location dataLocation = new Location();
            dataLocation.setType(LocationType.DATA);
            dataLocation.setPath(str);

            cluster.setLocations(new Locations());
            cluster.getLocations().getLocations().add(dataLocation);
            return;
        }

        CatalogTable table = new CatalogTable();
        table.setUri(str);
        cluster.setTable(table);
    }

    /**
     * Asserts that a vertex for the named entity of the given type exists and
     * carries the expected name, type and timestamp properties.
     *
     * @param str              entity name
     * @param relationshipType expected vertex type
     */
    private void verifyEntityWasAddedToGraph(String str, RelationshipType relationshipType) {
        Vertex found = getEntityVertex(str, relationshipType);
        Assert.assertNotNull(found);
        verifyEntityProperties(found, str, relationshipType);
    }

    /**
     * Asserts that the vertex has the given name and type and a non-null
     * timestamp property.
     *
     * @param vertex           vertex to inspect
     * @param str              expected value of the name property
     * @param relationshipType type whose name is the expected value of the type property
     */
    private void verifyEntityProperties(Vertex vertex, String str, RelationshipType relationshipType) {
        // TestNG's assertEquals takes (actual, expected); keep that order so a
        // failure message reports the two values the right way round.
        Assert.assertEquals(vertex.getProperty(RelationshipProperty.NAME.getName()), str);
        Assert.assertEquals(vertex.getProperty(RelationshipProperty.TYPE.getName()), relationshipType.getName());
        Assert.assertNotNull(vertex.getProperty(RelationshipProperty.TIMESTAMP.getName()));
    }

    /**
     * Checks the primary cluster's outgoing edges: user, colo and the
     * "classification" tag pointing at "production".
     */
    private void verifyClusterEntityEdges() {
        Vertex clusterVertex = getEntityVertex(CLUSTER_ENTITY_NAME, RelationshipType.CLUSTER_ENTITY);

        verifyVertexForEdge(clusterVertex, Direction.OUT,
                RelationshipLabel.USER.getName(), FALCON_USER, RelationshipType.USER.getName());
        verifyVertexForEdge(clusterVertex, Direction.OUT,
                RelationshipLabel.CLUSTER_COLO.getName(), COLO_NAME, RelationshipType.COLO.getName());
        verifyVertexForEdge(clusterVertex, Direction.OUT,
                "classification", "production", RelationshipType.TAGS.getName());
    }

    /**
     * Checks a feed's outgoing edges: primary cluster, user, the "classified-as"
     * tag and the group.
     *
     * @param str  feed name
     * @param str2 expected "classified-as" tag value
     * @param str3 expected group name
     */
    private void verifyFeedEntityEdges(String str, String str2, String str3) {
        Vertex feedVertex = getEntityVertex(str, RelationshipType.FEED_ENTITY);

        verifyVertexForEdge(feedVertex, Direction.OUT,
                RelationshipLabel.FEED_CLUSTER_EDGE.getName(), CLUSTER_ENTITY_NAME,
                RelationshipType.CLUSTER_ENTITY.getName());
        verifyVertexForEdge(feedVertex, Direction.OUT,
                RelationshipLabel.USER.getName(), FALCON_USER, RelationshipType.USER.getName());
        verifyVertexForEdge(feedVertex, Direction.OUT,
                "classified-as", str2, RelationshipType.TAGS.getName());
        verifyVertexForEdge(feedVertex, Direction.OUT,
                RelationshipLabel.GROUPS.getName(), str3, RelationshipType.GROUPS.getName());
    }

    /**
     * Checks the sample process's edges: cluster, user and "classified-as" tag
     * going out; incoming feed edges from both input feeds; outgoing feed edges
     * to both output feeds. Every feed endpoint must be a feed-entity vertex.
     */
    private void verifyProcessEntityEdges() {
        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME, RelationshipType.PROCESS_ENTITY);

        verifyVertexForEdge(processVertex, Direction.OUT,
                RelationshipLabel.PROCESS_CLUSTER_EDGE.getName(), CLUSTER_ENTITY_NAME,
                RelationshipType.CLUSTER_ENTITY.getName());
        verifyVertexForEdge(processVertex, Direction.OUT,
                RelationshipLabel.USER.getName(), FALCON_USER, RelationshipType.USER.getName());
        verifyVertexForEdge(processVertex, Direction.OUT,
                "classified-as", "Critical", RelationshipType.TAGS.getName());

        // Input feeds: edges arrive at the process, so the feed is the OUT vertex.
        List<Object> inputFeedNames = new ArrayList<>();
        for (Edge inputEdge : processVertex.getEdges(Direction.IN, RelationshipLabel.FEED_PROCESS_EDGE.getName())) {
            Vertex feedVertex = inputEdge.getVertex(Direction.OUT);
            // TestNG order is (actual, expected).
            Assert.assertEquals(feedVertex.getProperty(RelationshipProperty.TYPE.getName()),
                    RelationshipType.FEED_ENTITY.getName());
            inputFeedNames.add(feedVertex.getProperty(RelationshipProperty.NAME.getName()));
        }
        Assert.assertTrue(inputFeedNames.containsAll(Arrays.asList("impression-feed", "clicks-feed")),
                "Actual does not contain expected: " + inputFeedNames);

        // Output feeds: edges leave the process, so the feed is the IN vertex.
        List<Object> outputFeedNames = new ArrayList<>();
        for (Edge outputEdge : processVertex.getEdges(Direction.OUT, RelationshipLabel.PROCESS_FEED_EDGE.getName())) {
            Vertex feedVertex = outputEdge.getVertex(Direction.IN);
            Assert.assertEquals(feedVertex.getProperty(RelationshipProperty.TYPE.getName()),
                    RelationshipType.FEED_ENTITY.getName());
            outputFeedNames.add(feedVertex.getProperty(RelationshipProperty.NAME.getName()));
        }
        Assert.assertTrue(outputFeedNames.containsAll(Arrays.asList(EVICTED_FEED, "imp-click-join2")),
                "Actual does not contain expected: " + outputFeedNames);
    }

    /**
     * Looks up the first vertex whose name and type properties match the given values.
     *
     * <p>Fails the test if the query yields no vertex, or if the first vertex is {@code null}.
     *
     * @param str              value the vertex name property must have
     * @param relationshipType relationship type whose name the vertex type property must have
     * @return the first matching vertex
     */
    private Vertex getEntityVertex(String str, RelationshipType relationshipType) {
        GraphQuery byNameAndType = getQuery()
                .has(RelationshipProperty.NAME.getName(), str)
                .has(RelationshipProperty.TYPE.getName(), relationshipType.getName());

        Iterator<Vertex> matches = byNameAndType.vertices().iterator();
        Assert.assertTrue(matches.hasNext());

        Vertex firstMatch = matches.next();
        Assert.assertNotNull(firstMatch);
        return firstMatch;
    }

    /**
     * Verifies that the given vertex has at least one edge with the given label and that
     * every such edge ends at a vertex with the expected name and type.
     *
     * @param vertex    vertex whose edges are inspected
     * @param direction direction of the edges to fetch from {@code vertex}
     * @param str       edge label to look for
     * @param str2      expected name property of the vertex at the IN end of each edge
     * @param str3      expected type property of the vertex at the IN end of each edge
     */
    private void verifyVertexForEdge(Vertex vertex, Direction direction, String str, String str2, String str3) {
        boolean edgeFound = false;
        for (Edge edge : vertex.getEdges(direction, new String[]{str})) {
            edgeFound = true;
            // NOTE(review): the far vertex is always taken from the IN end, whatever
            // `direction` is. That matches the Direction.OUT callers visible here;
            // confirm before calling this with another direction.
            Vertex target = edge.getVertex(Direction.IN);
            Assert.assertEquals(target.getProperty(RelationshipProperty.NAME.getName()), str2);
            Assert.assertEquals(target.getProperty(RelationshipProperty.TYPE.getName()), str3);
        }
        // Assert the positive condition directly instead of assertFalse(!found), so a
        // failure reads "expected [true] but found [false]".
        Assert.assertTrue(edgeFound, "Edge not found");
    }

    /**
     * Verifies the entity-level graph for the given relationship type.
     *
     * <p>The entities owned by the user must be exactly the four expected feeds, in order.
     * The three expected feeds must also be found both by the "Secure" classification check
     * and by the owner-plus-classification check for {@code str}.
     *
     * @param relationshipType relationship type whose name is used as the vertex type filter
     * @param str              classification name passed to the owner-plus-classification check
     */
    private void verifyEntityGraph(RelationshipType relationshipType, String str) {
        final String typeName = relationshipType.getName();

        List<String> ownedByUser = getFeedsOwnedByAUser(typeName);
        List<String> expectedOwned = Arrays.asList("impression-feed", "clicks-feed", EVICTED_FEED, "imp-click-join2");
        Assert.assertEquals(ownedByUser, expectedOwned);

        List<String> expectedClassified = Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2");
        verifyFeedsClassifiedAsSecure(typeName, expectedClassified);
        verifyFeedsOwnedByUserAndClassification(typeName, str, expectedClassified);
    }

    /**
     * Collects the names of all vertices of the given type that have a user edge pointing
     * at a {@code FALCON_USER} user vertex.
     *
     * <p>Each match is also printed to standard output.
     *
     * @param str value the type property of an owned vertex must equal
     * @return names of the matching vertices, in traversal order
     */
    private List<String> getFeedsOwnedByAUser(String str) {
        GraphQuery userQuery = getQuery()
                .has(RelationshipProperty.NAME.getName(), FALCON_USER)
                .has(RelationshipProperty.TYPE.getName(), RelationshipType.USER.getName());

        List<String> ownedNames = new ArrayList<String>();
        for (Vertex userVertex : userQuery.vertices()) {
            // Owned entities point at the user, so walk the user edges backwards.
            Iterable<Vertex> owned = userVertex.getVertices(Direction.IN, new String[]{RelationshipLabel.USER.getName()});
            for (Vertex candidate : owned) {
                boolean typeMatches = candidate.getProperty(RelationshipProperty.TYPE.getName()).equals(str);
                if (!typeMatches) {
                    continue;
                }
                System.out.println("falcon-user owns -> " + GraphUtils.vertexString(candidate));
                ownedNames.add(candidate.getProperty(RelationshipProperty.NAME.getName()));
            }
        }
        return ownedNames;
    }

    /**
     * Verifies that every expected name belongs to a vertex of the given type that is
     * connected to a "Secure" tag vertex through a {@code classified-as} edge.
     *
     * <p>Each match is also printed to standard output.
     *
     * @param str  value the type property of a classified vertex must equal
     * @param list names that must all be present among the matches
     */
    private void verifyFeedsClassifiedAsSecure(String str, List<String> list) {
        GraphQuery secureTagQuery = getQuery()
                .has(RelationshipProperty.NAME.getName(), "Secure")
                .has(RelationshipProperty.TYPE.getName(), RelationshipType.TAGS.getName());

        List<String> classifiedNames = new ArrayList<String>();
        for (Vertex tagVertex : secureTagQuery.vertices()) {
            Iterable<Vertex> neighbours = tagVertex.getVertices(Direction.BOTH, new String[]{"classified-as"});
            for (Vertex neighbour : neighbours) {
                boolean typeMatches = neighbour.getProperty(RelationshipProperty.TYPE.getName()).equals(str);
                if (!typeMatches) {
                    continue;
                }
                System.out.println(" Secure classification -> " + GraphUtils.vertexString(neighbour));
                classifiedNames.add(neighbour.getProperty(RelationshipProperty.NAME.getName()));
            }
        }
        Assert.assertTrue(classifiedNames.containsAll(list), "Actual does not contain expected: " + classifiedNames);
    }

    /**
     * Verifies that every expected name belongs to a vertex of the given type that is owned
     * by {@code FALCON_USER} and has an outgoing {@code classified-as} edge to a vertex
     * named {@code str2}.
     *
     * <p>Each match is also printed to standard output.
     *
     * @param str  value the type property of an owned vertex must equal
     * @param str2 classification name to match on the far side of the {@code classified-as} edge
     * @param list names that must all be present among the matches
     */
    private void verifyFeedsOwnedByUserAndClassification(String str, String str2, List<String> list) {
        Vertex userVertex = getEntityVertex(FALCON_USER, RelationshipType.USER);
        Iterable<Vertex> ownedVertices = userVertex.getVertices(Direction.IN, new String[]{RelationshipLabel.USER.getName()});

        List<String> matchedNames = new ArrayList<String>();
        for (Vertex owned : ownedVertices) {
            if (!owned.getProperty(RelationshipProperty.TYPE.getName()).equals(str)) {
                continue;
            }
            for (Vertex classification : owned.getVertices(Direction.OUT, new String[]{"classified-as"})) {
                boolean nameMatches = classification.getProperty(RelationshipProperty.NAME.getName()).equals(str2);
                if (nameMatches) {
                    matchedNames.add(owned.getProperty(RelationshipProperty.NAME.getName()));
                    System.out.println(str2 + " feed owned by falcon-user -> " + GraphUtils.vertexString(owned));
                }
            }
        }
        Assert.assertTrue(matchedNames.containsAll(list), "Actual does not contain expected: " + matchedNames);
    }

    /**
     * Counts the vertices of a graph by iterating over all of them.
     *
     * @param graph graph whose vertices are counted
     * @return number of vertices returned by {@code graph.getVertices()}
     */
    public long getVerticesCount(Graph graph) {
        long total = 0L;
        Iterator<Vertex> cursor = graph.getVertices().iterator();
        while (cursor.hasNext()) {
            cursor.next();
            total++;
        }
        return total;
    }

    /**
     * Counts the edges of a graph by iterating over all of them.
     *
     * @param graph graph whose edges are counted
     * @return number of edges returned by {@code graph.getEdges()}
     */
    public long getEdgesCount(Graph graph) {
        long total = 0L;
        Iterator<Edge> cursor = graph.getEdges().iterator();
        while (cursor.hasNext()) {
            cursor.next();
            total++;
        }
        return total;
    }

    /**
     * Verifies the lineage graph against the default set of expected feed instance names
     * for nominal time {@code 2014-01-01T00:00Z}.
     *
     * @param str vertex type name forwarded to the four-argument overload
     */
    private void verifyLineageGraph(String str) {
        // Instances expected to be owned by the user.
        List<String> expectedOwned = Arrays.asList(
                "impression-feed/2014-01-01T00:00Z",
                "clicks-feed/2014-01-01T00:00Z",
                "imp-click-join1/2014-01-01T00:00Z",
                "imp-click-join2/2014-01-01T00:00Z");
        // Instances expected to be classified as "Secure".
        List<String> expectedSecure = Arrays.asList(
                "impression-feed/2014-01-01T00:00Z",
                "clicks-feed/2014-01-01T00:00Z",
                "imp-click-join2/2014-01-01T00:00Z");
        // Instances expected to be owned by the user and classified as "Financial".
        List<String> expectedFinancial = Arrays.asList(
                "clicks-feed/2014-01-01T00:00Z",
                "imp-click-join1/2014-01-01T00:00Z",
                "imp-click-join2/2014-01-01T00:00Z");

        verifyLineageGraph(str, expectedOwned, expectedSecure, expectedFinancial);
    }

    /**
     * Verifies the lineage graph against the supplied expectations.
     *
     * <p>Also checks that a vertex named {@code impression-feed/2014-01-01T00:00Z} exists,
     * is a feed instance, and can be fetched again from the graph by its id.
     *
     * @param str   vertex type name used as the filter in all three checks
     * @param list  names that must all be owned by the user
     * @param list2 names that must all be classified as "Secure"
     * @param list3 names that must all be owned by the user and classified as "Financial"
     */
    private void verifyLineageGraph(String str, List<String> list, List<String> list2, List<String> list3) {
        List<String> ownedByUser = getFeedsOwnedByAUser(str);
        Assert.assertTrue(ownedByUser.containsAll(list));

        final Graph lineageGraph = this.service.getGraph();
        Iterator<Vertex> impressionInstances = lineageGraph.getVertices("name", "impression-feed/2014-01-01T00:00Z").iterator();
        Assert.assertTrue(impressionInstances.hasNext());

        Vertex impressionInstance = impressionInstances.next();
        Assert.assertEquals(impressionInstance.getProperty(RelationshipProperty.TYPE.getName()), RelationshipType.FEED_INSTANCE.getName());
        // Looking the vertex up by id must return an equal vertex.
        Assert.assertEquals(lineageGraph.getVertex(impressionInstance.getId()), impressionInstance);

        verifyFeedsClassifiedAsSecure(str, list2);
        verifyFeedsOwnedByUserAndClassification(str, "Financial", list3);
    }

    /**
     * Verifies the replication or eviction edge of a feed instance vertex.
     *
     * <p>The feed instance vertex must have an outgoing edge with the given label. The
     * first such edge must carry the context's timestamp and end at a vertex named after
     * the context's cluster.
     *
     * @param str                      feed name used to build the feed instance name
     * @param str2                     instance path used to build the feed instance name
     * @param workflowExecutionContext context supplying cluster name, nominal time and timestamp
     * @param relationshipLabel        label of the edge to verify
     * @throws Exception if building the feed instance name or reading the context fails
     */
    private void verifyLineageGraphForReplicationOrEviction(String str, String str2, WorkflowExecutionContext workflowExecutionContext, RelationshipLabel relationshipLabel) throws Exception {
        String feedInstanceName = InstanceRelationshipGraphBuilder.getFeedInstanceName(str, workflowExecutionContext.getClusterName(), str2, workflowExecutionContext.getNominalTimeAsISO8601());
        Vertex feedInstanceVertex = getEntityVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);

        Iterator<Edge> edges = feedInstanceVertex.getEdges(Direction.OUT, new String[]{relationshipLabel.getName()}).iterator();
        // Fail with a descriptive assertion instead of a bare NoSuchElementException
        // when the expected edge is missing.
        Assert.assertTrue(edges.hasNext(), "No '" + relationshipLabel.getName() + "' edge found for feed instance " + feedInstanceName);

        Edge edge = edges.next();
        Assert.assertNotNull(edge);
        Assert.assertEquals(edge.getProperty(RelationshipProperty.TIMESTAMP.getName()), workflowExecutionContext.getTimeStampAsISO8601());
        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty(RelationshipProperty.NAME.getName()), workflowExecutionContext.getClusterName());
    }

    /**
     * Verifies that the {@code sample-process} process entity vertex exists under that name
     * and that the given context reports a non-empty counters value.
     *
     * @param workflowExecutionContext context whose counters are checked
     * @throws Exception declared for the benefit of callers; not raised explicitly here
     */
    private void verifyLineageGraphForJobCounters(WorkflowExecutionContext workflowExecutionContext) throws Exception {
        Vertex processVertex = getEntityVertex("sample-process", RelationshipType.PROCESS_ENTITY);
        Assert.assertEquals(processVertex.getProperty("name"), "sample-process");

        int countersLength = workflowExecutionContext.getCounters().length();
        Assert.assertTrue(countersLength > 0);
    }

    /**
     * Builds the 50-element argument array (25 {@code -name value} pairs) for a test
     * workflow execution message.
     *
     * <p>The cluster value is {@code "bcp-cluster,primary-cluster"} for a REPLICATE
     * operation and {@code "primary-cluster"} otherwise. A {@code null} feed name or path
     * argument falls back to the corresponding class-level default.
     *
     * @param entityOperations operation written as the operation argument
     * @param str              user workflow name
     * @param str2             output feed names, or {@code null} for {@code OUTPUT_FEED_NAMES}
     * @param str3             output feed paths, or {@code null} for {@code OUTPUT_INSTANCE_PATHS}
     * @param str4             input feed paths, or {@code null} for {@code INPUT_INSTANCE_PATHS}
     * @param str5             input feed names, or {@code null} for {@code INPUT_FEED_NAMES}
     * @return the argument array, flags at even indexes and values at odd indexes
     */
    private static String[] getTestMessageArgs(WorkflowExecutionContext.EntityOperations entityOperations, String str, String str2, String str3, String str4, String str5) {
        final String clusterValue;
        if (WorkflowExecutionContext.EntityOperations.REPLICATE == entityOperations) {
            clusterValue = "bcp-cluster,primary-cluster";
        } else {
            clusterValue = "primary-cluster";
        }

        // Resolve the optional arguments against their defaults up front.
        final String inputNames = (str5 == null) ? INPUT_FEED_NAMES : str5;
        final String inputPaths = (str4 == null) ? INPUT_INSTANCE_PATHS : str4;
        final String outputNames = (str2 == null) ? OUTPUT_FEED_NAMES : str2;
        final String outputPaths = (str3 == null) ? OUTPUT_INSTANCE_PATHS : str3;

        return new String[] {
            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterValue,
            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "sample-process",
            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
            "-" + WorkflowExecutionArgs.OPERATION.getName(), entityOperations.toString(),
            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), inputNames,
            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), inputPaths,
            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), outputNames,
            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), outputPaths,
            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), str,
            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
        };
    }

    /**
     * Prepares the fixture for the job counters test.
     *
     * <p>Resets the stores, re-initialises the service, registers the
     * {@code primary-cluster} cluster, writes the counters file and registers
     * {@code sample-process} with no input and no output feeds.
     *
     * @throws Exception if any of the setup steps fails
     */
    private void setupForJobCounters() throws Exception {
        cleanUp();
        this.service.init();

        this.clusterEntity = addClusterEntity("primary-cluster", COLO_NAME, "classification=production");
        createJobCountersFileForTest();

        // The process is registered without any feeds attached.
        final List<Feed> noInputFeeds = new ArrayList<Feed>();
        final List<Feed> noOutputFeeds = new ArrayList<Feed>();
        this.processEntity = addProcessEntity("sample-process", this.clusterEntity, "classified-as=Critical",
                "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                noInputFeeds, noOutputFeeds);
    }

    /**
     * Writes the {@code COUNTERS} text to {@code counter.txt} under {@code LOGS_DIR}.
     *
     * <p>The stream is closed exactly once, whether or not the write succeeds. If the file
     * cannot be created there is no stream to close, and the original failure propagates
     * unchanged.
     *
     * @throws Exception if the file system cannot be obtained or the file cannot be written
     */
    private void createJobCountersFileForTest() throws Exception {
        OutputStream outputStream = null;
        try {
            outputStream = HadoopClientFactory.get().createProxiedFileSystem(new Path(LOGS_DIR).toUri()).create(new Path(LOGS_DIR, "counter.txt"));
            outputStream.write(COUNTERS.getBytes());
            outputStream.flush();
        } finally {
            // Guard against a failed create(): closing a null stream would raise an
            // NPE that hides the real cause.
            if (outputStream != null) {
                outputStream.close();
            }
        }
    }

    /**
     * Prepares the default fixture: resets the stores, re-initialises the service,
     * registers the {@code primary-cluster} cluster and adds the feeds and process on it.
     *
     * @throws Exception if any of the setup steps fails
     */
    private void setup() throws Exception {
        cleanUp();
        this.service.init();

        final Cluster primaryCluster = addClusterEntity("primary-cluster", COLO_NAME, "classification=production");
        this.clusterEntity = primaryCluster;
        addFeedsAndProcess(primaryCluster);
    }

    /**
     * Registers two input feeds, two output feeds and the {@code sample-process} process
     * that connects them.
     *
     * @param cluster cluster the four feeds are registered on
     * @throws Exception if adding a feed or the process fails
     */
    private void addFeedsAndProcess(Cluster cluster) throws Exception {
        // Input feeds.
        Feed impressionFeed = addFeedEntity("impression-feed", cluster, "classified-as=Secure", "analytics",
                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
        Feed clicksFeed = addFeedEntity("clicks-feed", cluster, "classified-as=Secure,classified-as=Financial", "analytics",
                Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");

        // Output feeds.
        Feed firstJoinFeed = addFeedEntity(EVICTED_FEED, cluster, "classified-as=Financial", "reporting,bi",
                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
        Feed secondJoinFeed = addFeedEntity("imp-click-join2", cluster, "classified-as=Secure,classified-as=Financial", "reporting,bi",
                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");

        List<Feed> inputFeeds = new ArrayList<Feed>();
        inputFeeds.add(impressionFeed);
        inputFeeds.add(clicksFeed);

        List<Feed> outputFeeds = new ArrayList<Feed>();
        outputFeeds.add(firstJoinFeed);
        outputFeeds.add(secondJoinFeed);

        // NOTE(review): the process is attached to this.clusterEntity, not to the
        // `cluster` parameter; the two are the same object when called from setup().
        this.processEntity = addProcessEntity("sample-process", this.clusterEntity, "classified-as=Critical",
                "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                inputFeeds, outputFeeds);
    }

    /**
     * Prepares the fixture for the replication lineage test.
     *
     * <p>Resets the stores, re-initialises the service, registers the clusters and the
     * replicated input feed, registers one output feed and the process, then reports a
     * successful GENERATE run to the service.
     *
     * @throws Exception if any of the setup steps fails
     */
    private void setupForLineageReplication() throws Exception {
        cleanUp();
        this.service.init();

        // Populates this.clusterEntity and adds the replicated feed as the only input.
        List<Feed> inputFeeds = new ArrayList<Feed>();
        addClusterAndFeedForReplication(inputFeeds);

        List<Feed> outputFeeds = new ArrayList<Feed>();
        Feed joinFeed = addFeedEntity(EVICTED_FEED, this.clusterEntity, "classified-as=Financial", "reporting,bi",
                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
        outputFeeds.add(joinFeed);

        this.processEntity = addProcessEntity("sample-process", this.clusterEntity, "classified-as=Critical",
                "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                inputFeeds, outputFeeds);

        String[] generateArgs = getTestMessageArgs(WorkflowExecutionContext.EntityOperations.GENERATE,
                GENERATE_WORKFLOW_NAME, EVICTED_FEED,
                "jail://global:00/falcon/imp-click-join1/20140101",
                "jail://global:00/falcon/raw-click/primary/20140101",
                REPLICATED_FEED);
        WorkflowExecutionContext generateContext =
                WorkflowExecutionContext.create(generateArgs, WorkflowExecutionContext.Type.POST_PROCESSING);
        this.service.onSuccess(generateContext);
    }

    /**
     * Registers the primary and BCP clusters and a replicated feed that spans both, gives
     * each feed cluster its own storage path, and pushes the updated feed into the
     * configuration store.
     *
     * <p>{@code cleanupUpdateInit()} runs exactly once after the update attempt, whether
     * or not the update succeeds. The feed is added to {@code list} only after a
     * successful update.
     *
     * @param list list the replicated feed is appended to
     * @throws Exception if registering an entity or updating the store fails
     */
    private void addClusterAndFeedForReplication(List<Feed> list) throws Exception {
        this.clusterEntity = addClusterEntity("primary-cluster", COLO_NAME, "classification=production");
        Cluster bcpCluster = addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
        Feed replicatedFeed = addFeedEntity(REPLICATED_FEED, new Cluster[]{this.clusterEntity, bcpCluster}, "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM, "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");

        // The primary cluster gets the "primary" path; every other cluster gets "bcp".
        for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : replicatedFeed.getClusters().getClusters()) {
            if (feedCluster.getName().equals("primary-cluster")) {
                addStorage(feedCluster, replicatedFeed, Storage.TYPE.FILESYSTEM, "/falcon/raw-click/primary/${YEAR}/${MONTH}/${DAY}");
            } else {
                addStorage(feedCluster, replicatedFeed, Storage.TYPE.FILESYSTEM, "/falcon/raw-click/bcp/${YEAR}/${MONTH}/${DAY}");
            }
        }

        try {
            this.configStore.initiateUpdate(replicatedFeed);
            this.configStore.update(EntityType.FEED, replicatedFeed);
        } finally {
            // Single cleanup point for both the success and the failure path.
            this.configStore.cleanupUpdateInit();
        }
        list.add(replicatedFeed);
    }

    /**
     * Prepares the fixture for the eviction lineage test: runs the default setup, then
     * reports a successful GENERATE run for the {@code imp-click-join1} output to the service.
     *
     * @throws Exception if the setup or the notification fails
     */
    private void setupForLineageEviction() throws Exception {
        setup();

        // Input feed names and paths are left null so the defaults apply.
        String[] generateArgs = getTestMessageArgs(WorkflowExecutionContext.EntityOperations.GENERATE,
                GENERATE_WORKFLOW_NAME, "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null);
        WorkflowExecutionContext generateContext =
                WorkflowExecutionContext.create(generateArgs, WorkflowExecutionContext.Type.POST_PROCESSING);
        this.service.onSuccess(generateContext);
    }

    /**
     * Prepares a fixture whose feed storage paths contain no date variables.
     *
     * <p>Resets the stores, re-initialises the service, registers the
     * {@code primary-cluster} cluster, two input feeds, two output feeds and the
     * {@code sample-process} process.
     *
     * @throws Exception if any of the setup steps fails
     */
    private void setupForNoDateInFeedPath() throws Exception {
        cleanUp();
        this.service.init();
        this.clusterEntity = addClusterEntity("primary-cluster", COLO_NAME, "classification=production");

        // Input feeds.
        List<Feed> inputFeeds = new ArrayList<Feed>();
        Feed impressionFeed = addFeedEntity("impression-feed", this.clusterEntity, "classified-as=Secure", "analytics",
                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed");
        inputFeeds.add(impressionFeed);
        Feed clicksFeed = addFeedEntity("clicks-feed", this.clusterEntity, "classified-as=Secure,classified-as=Financial", "analytics",
                Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed");
        inputFeeds.add(clicksFeed);

        // Output feeds.
        List<Feed> outputFeeds = new ArrayList<Feed>();
        Feed firstJoinFeed = addFeedEntity(EVICTED_FEED, this.clusterEntity, "classified-as=Financial", "reporting,bi",
                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1");
        outputFeeds.add(firstJoinFeed);
        Feed secondJoinFeed = addFeedEntity("imp-click-join2", this.clusterEntity, "classified-as=Secure,classified-as=Financial", "reporting,bi",
                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2");
        outputFeeds.add(secondJoinFeed);

        this.processEntity = addProcessEntity("sample-process", this.clusterEntity, "classified-as=Critical",
                "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME, WORKFLOW_VERSION,
                inputFeeds, outputFeeds);
    }

    /**
     * Resets the test state: empties and shuts down the service's graph, removes every
     * entity from the configuration store, then destroys the service.
     *
     * @throws Exception if clearing the configuration store or destroying the service fails
     */
    private void cleanUp() throws Exception {
        final Graph serviceGraph = this.service.getGraph();
        cleanupGraphStore(serviceGraph);

        cleanupConfigurationStore(this.configStore);

        // The service is destroyed last, after its graph has been emptied.
        this.service.destroy();
    }

    /**
     * Removes every edge and then every vertex from the graph, and shuts the graph down.
     *
     * @param graph graph to empty and shut down
     */
    private void cleanupGraphStore(Graph graph) {
        // Edges go first, then the vertices they connected.
        for (Edge edge : graph.getEdges()) {
            graph.removeEdge(edge);
        }
        for (Vertex vertex : graph.getVertices()) {
            graph.removeVertex(vertex);
        }
        graph.shutdown();
    }

    /**
     * Removes every entity of every entity type from the configuration store.
     *
     * @param configurationStore store to empty
     * @throws Exception if listing or removing an entity fails
     */
    private static void cleanupConfigurationStore(ConfigurationStore configurationStore) throws Exception {
        final EntityType[] allTypes = EntityType.values();
        for (int i = 0; i < allTypes.length; i++) {
            final EntityType currentType = allTypes[i];
            for (String entityName : configurationStore.getEntities(currentType)) {
                configurationStore.remove(currentType, entityName);
            }
        }
    }
}
