package org.pragmaticminds.crunch.execution;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import org.pragmaticminds.crunch.api.pipe.EvaluationPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/pragmaticminds/crunch/execution/CrunchExecutor.class */
public class CrunchExecutor {
    private static final Logger logger = LoggerFactory.getLogger(CrunchExecutor.class);
    private static final ActorSystem SYSTEM = ActorSystem.create("CrunchExecutor");
    private static final GraphFactory GRAPH_FACTORY = new GraphFactory();
    private MRecordSource source;
    private EvaluationPipeline evaluationPipeline;
    private EventSink sink;

    public CrunchExecutor(MRecordSource mRecordSource, EvaluationPipeline evaluationPipeline, EventSink eventSink) {
        this.source = mRecordSource;
        this.evaluationPipeline = evaluationPipeline;
        this.sink = eventSink;
    }

    public CrunchExecutor(MRecordSource mRecordSource, EvaluationPipeline evaluationPipeline) {
        this(mRecordSource, evaluationPipeline, null);
    }

    public void run() {
        runWithSink(this.sink);
    }

    public void run(EventSink eventSink) {
        runWithSink(eventSink);
    }

    private void runWithSink(EventSink eventSink) {
        Preconditions.checkNotNull(eventSink, "Please provide a Sink!");
        try {
            ((CompletionStage) GRAPH_FACTORY.create(this.source, this.evaluationPipeline, eventSink, 50L).run(ActorMaterializer.create(SYSTEM))).toCompletableFuture().get();
        } catch (InterruptedException e) {
            logger.warn("Unable to wait for execution of pipeline.", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException e2) {
            logger.warn("Unable to wait for execution of pipeline.", e2);
        }
    }
}
