package org.broadinstitute.hellbender.utils.python;

import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.utils.Utils;
import org.broadinstitute.hellbender.utils.codecs.gencode.GencodeGtfFeature;
import org.broadinstitute.hellbender.utils.python.PythonExecutorBase;
import org.broadinstitute.hellbender.utils.runtime.AsynchronousStreamWriter;
import org.broadinstitute.hellbender.utils.runtime.InputStreamSettings;
import org.broadinstitute.hellbender.utils.runtime.OutputStreamSettings;
import org.broadinstitute.hellbender.utils.runtime.ProcessControllerAckResult;
import org.broadinstitute.hellbender.utils.runtime.ProcessOutput;
import org.broadinstitute.hellbender.utils.runtime.ProcessSettings;
import org.broadinstitute.hellbender.utils.runtime.StreamingProcessController;

/* loaded from: input_file:org/broadinstitute/hellbender/utils/python/StreamingPythonScriptExecutor.class */
public class StreamingPythonScriptExecutor<T> extends PythonExecutorBase {
    private final List<String> curatedCommandLineArgs;
    private StreamingProcessController spController;
    private ProcessSettings processSettings;
    private File dataTransferFIFOFile;
    private FileOutputStream dataTransferFIFOWriter;
    private AsynchronousStreamWriter<T> asyncWriter;
    private File profileResults;
    private boolean isAckRequestOutstanding;
    private static final Logger logger = LogManager.getLogger(StreamingPythonScriptExecutor.class);
    private static final String NL = System.lineSeparator();
    private static final String PYTHON_IMPORT_GATK = "from gatktool import tool" + NL;
    private static final String PYTHON_INITIALIZE_GATK = "tool.initializeGATK('%s')" + NL;
    private static final String PYTHON_START_PROFILING = "tool.startProfiling()" + NL;
    private static final String PYTHON_TERMINATE_GATK = "tool.terminateGATK()" + NL;
    private static final String PYTHON_INITIALIZE_DATA_FIFO = "tool.initializeDataFIFO('%s')" + NL;
    private static final String PYTHON_CLOSE_DATA_FIFO = "tool.closeDataFIFO()" + NL;
    private static final String PYTHON_SEND_ACK_REQUEST = "tool.sendAck()" + NL;
    private static final String PYTHON_END_PROFILING = "tool.endProfiling('%s')" + NL;

    public StreamingPythonScriptExecutor(boolean z) {
        this(PythonExecutorBase.PythonExecutableName.PYTHON, z);
    }

    public StreamingPythonScriptExecutor(PythonExecutorBase.PythonExecutableName pythonExecutableName, boolean z) {
        super(pythonExecutableName, z);
        this.curatedCommandLineArgs = new ArrayList();
        this.isAckRequestOutstanding = false;
    }

    public boolean start(List<String> list) {
        return start(list, false, null);
    }

    public boolean start(List<String> list, boolean z, File file) {
        PythonScriptExecutor.checkPythonEnvironmentForPackage("gatktool");
        this.profileResults = file;
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.externalScriptExecutableName);
        arrayList.add("-u");
        arrayList.add("-i");
        if (list != null) {
            arrayList.addAll(list);
        }
        this.curatedCommandLineArgs.addAll(arrayList);
        InputStreamSettings inputStreamSettings = new InputStreamSettings();
        OutputStreamSettings outputStreamSettings = new OutputStreamSettings();
        outputStreamSettings.setBufferSize(-1);
        OutputStreamSettings outputStreamSettings2 = new OutputStreamSettings();
        outputStreamSettings2.setBufferSize(-1);
        this.processSettings = new ProcessSettings((String[]) arrayList.toArray(new String[arrayList.size()]), false, null, null, inputStreamSettings, outputStreamSettings, outputStreamSettings2);
        this.spController = new StreamingProcessController(this.processSettings, z);
        File start = this.spController.start();
        if (start == null) {
            return false;
        }
        initializeTool(start);
        return true;
    }

    public ProcessOutput sendSynchronousCommand(String str) {
        if (!str.endsWith(NL)) {
            throw new IllegalArgumentException("Python commands must be newline-terminated in order to be executed. Indented Python code blocks must be terminated with additional newlines");
        }
        this.spController.writeProcessInput(str);
        sendAckRequest();
        return waitForAck();
    }

    public void sendAsynchronousCommand(String str) {
        if (!str.endsWith(NL)) {
            throw new IllegalArgumentException("Python commands must be newline-terminated");
        }
        this.spController.writeProcessInput(str);
        sendAckRequest();
    }

    public ProcessOutput waitForAck() {
        if (!this.isAckRequestOutstanding) {
            throw new GATKException("No ack request is outstanding. An ack request must be issued first");
        }
        ProcessControllerAckResult waitForAck = this.spController.waitForAck();
        this.isAckRequestOutstanding = false;
        ProcessOutput accumulatedOutput = getAccumulatedOutput();
        if (waitForAck.isPositiveAck()) {
            return accumulatedOutput;
        }
        throw new PythonScriptExecutorException(String.format("A nack was received from the Python process (most likely caused by a raised exception caused by): %s", waitForAck.getDisplayMessage()));
    }

    @Override // org.broadinstitute.hellbender.utils.python.PythonExecutorBase, org.broadinstitute.hellbender.utils.runtime.ScriptExecutor
    public String getApproximateCommandLine() {
        return (String) this.curatedCommandLineArgs.stream().collect(Collectors.joining(GencodeGtfFeature.EXTRA_FIELD_KEY_VALUE_SPLITTER));
    }

    public void initStreamWriter(Function<T, ByteArrayOutputStream> function) {
        Utils.nonNull(function, "An item serializer must be provided for the async writer service");
        this.dataTransferFIFOFile = this.spController.createDataFIFO();
        sendAsynchronousCommand(String.format(PYTHON_INITIALIZE_DATA_FIFO, this.dataTransferFIFOFile.getAbsolutePath()));
        try {
            this.dataTransferFIFOWriter = new FileOutputStream(this.dataTransferFIFOFile);
            this.asyncWriter = this.spController.getAsynchronousStreamWriter(this.dataTransferFIFOWriter, function);
            waitForAck();
        } catch (IOException e) {
            throw new GATKException("Failure opening FIFO for writing", e);
        }
    }

    public void startBatchWrite(String str, List<T> list) {
        Utils.nonNull(str);
        Utils.nonNull(list);
        Utils.nonEmpty(list);
        sendAsynchronousCommand(str);
        this.asyncWriter.startBatchWrite(list);
    }

    public Future<Integer> waitForPreviousBatchCompletion() {
        Future<Integer> waitForPreviousBatchCompletion = this.asyncWriter.waitForPreviousBatchCompletion();
        if (waitForPreviousBatchCompletion != null) {
            waitForAck();
        }
        return waitForPreviousBatchCompletion;
    }

    @VisibleForTesting
    protected Process getProcess() {
        return this.spController.getProcess();
    }

    public void terminate() {
        if (this.profileResults != null) {
            this.spController.writeProcessInput(String.format(PYTHON_END_PROFILING, this.profileResults.getAbsolutePath()));
            sendAckRequest();
            waitForAck();
        }
        if (this.dataTransferFIFOWriter != null) {
            if (this.asyncWriter != null && !this.asyncWriter.terminate()) {
                throw new GATKException("failed to close asyncWriter");
            }
            this.spController.writeProcessInput(PYTHON_CLOSE_DATA_FIFO);
            sendAckRequest();
            waitForAck();
            try {
                this.dataTransferFIFOWriter.close();
                this.dataTransferFIFOWriter = null;
                this.dataTransferFIFOFile = null;
            } catch (IOException e) {
                throw new GATKException("IOException closing fifo", e);
            }
        }
        this.spController.writeProcessInput(PYTHON_TERMINATE_GATK);
        this.spController.terminate();
    }

    public ProcessOutput getAccumulatedOutput() {
        return this.spController.getProcessOutput();
    }

    private void initializeTool(File file) {
        this.spController.writeProcessInput(PYTHON_IMPORT_GATK);
        this.spController.writeProcessInput(String.format(PYTHON_INITIALIZE_GATK, file.getAbsolutePath()));
        sendAckRequest();
        this.spController.openAckFIFOForRead();
        waitForAck();
        if (this.profileResults != null) {
            this.spController.writeProcessInput(PYTHON_START_PROFILING);
            sendAckRequest();
            waitForAck();
        }
    }

    private void sendAckRequest() {
        if (this.isAckRequestOutstanding) {
            throw new GATKException("An ack request is already outstanding. The previous ack request must be retrieved before a new ack request can be issued");
        }
        this.spController.writeProcessInput(PYTHON_SEND_ACK_REQUEST);
        this.isAckRequestOutstanding = true;
    }
}
