package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker.class */
public class ExternalCommandWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ExternalCommandWorker.class);
    private static final int DEFAULT_SHUTDOWN_GRACE_PERIOD_MS = 5000;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final LinkedBlockingQueue<TerminatorAction> terminatorActionQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<Optional<JsonNode>> stdinQueue = new LinkedBlockingQueue<>();
    private final String id;
    private final ExternalCommandSpec spec;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private ExecutorService executor;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$ExitMonitor.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$ExitMonitor.class */
    class ExitMonitor implements Runnable {
        private final Process process;
        private final Future<?> stdoutFuture;
        private final Future<?> stderrFuture;
        private final Future<?> terminatorFuture;

        ExitMonitor(Process process, Future<?> future, Future<?> future2, Future<?> future3) {
            this.process = process;
            this.stdoutFuture = future;
            this.stderrFuture = future2;
            this.terminatorFuture = future3;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                int waitFor = this.process.waitFor();
                ExternalCommandWorker.log.info("{}: process exited with return code {}", ExternalCommandWorker.this.id, Integer.valueOf(waitFor));
                this.stdoutFuture.get();
                this.stderrFuture.get();
                if (waitFor == 0) {
                    ExternalCommandWorker.this.doneFuture.complete("");
                } else {
                    ExternalCommandWorker.this.doneFuture.complete("exited with return code " + waitFor);
                }
                ExternalCommandWorker.this.stdinQueue.add(Optional.empty());
                ExternalCommandWorker.this.terminatorActionQueue.add(TerminatorAction.CLOSE);
                this.terminatorFuture.get();
                ExternalCommandWorker.this.executor.shutdown();
            } catch (Throwable th) {
                ExternalCommandWorker.log.error("{}: ExitMonitor error", ExternalCommandWorker.this.id, th);
                ExternalCommandWorker.this.doneFuture.complete("ExitMonitor error: " + th.getMessage());
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StderrMonitor.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StderrMonitor.class */
    class StderrMonitor implements Runnable {
        private final Process process;

        StderrMonitor(Process process) {
            this.process = process;
        }

        @Override // java.lang.Runnable
        public void run() {
            ExternalCommandWorker.log.trace("{}: starting stderr monitor.", ExternalCommandWorker.this.id);
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8));
                Throwable th = null;
                while (true) {
                    try {
                        try {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            } else {
                                ExternalCommandWorker.log.error("{}: (stderr):{}", ExternalCommandWorker.this.id, readLine);
                            }
                        } finally {
                        }
                    } catch (IOException e) {
                        ExternalCommandWorker.log.info("{}: can't read any more from stderr: {}", ExternalCommandWorker.this.id, e.getMessage());
                        if (bufferedReader != null) {
                            if (0 == 0) {
                                bufferedReader.close();
                                return;
                            }
                            try {
                                bufferedReader.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                }
                throw new IOException("EOF");
            } catch (Throwable th3) {
                ExternalCommandWorker.log.info("{}: error reading from stderr.", ExternalCommandWorker.this.id, th3);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StdinWriter.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StdinWriter.class */
    class StdinWriter implements Runnable {
        private final Process process;

        StdinWriter(Process process) {
            this.process = process;
        }

        @Override // java.lang.Runnable
        public void run() {
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(this.process.getOutputStream(), StandardCharsets.UTF_8);
            while (true) {
                try {
                    try {
                        try {
                            ExternalCommandWorker.log.info("{}: stdin writer ready.", ExternalCommandWorker.this.id);
                            Optional optional = (Optional) ExternalCommandWorker.this.stdinQueue.take();
                            if (!optional.isPresent()) {
                                ExternalCommandWorker.log.trace("{}: StdinWriter terminating.", ExternalCommandWorker.this.id);
                                try {
                                    outputStreamWriter.close();
                                    return;
                                } catch (IOException e) {
                                    ExternalCommandWorker.log.debug("{}: error closing stdinWriter: {}", ExternalCommandWorker.this.id, e.getMessage());
                                    return;
                                }
                            }
                            String jsonString = JsonUtil.toJsonString(optional.get());
                            ExternalCommandWorker.log.info("{}: writing to stdin: {}", ExternalCommandWorker.this.id, jsonString);
                            outputStreamWriter.write(jsonString + "\n");
                            outputStreamWriter.flush();
                        } catch (Throwable th) {
                            try {
                                outputStreamWriter.close();
                            } catch (IOException e2) {
                                ExternalCommandWorker.log.debug("{}: error closing stdinWriter: {}", ExternalCommandWorker.this.id, e2.getMessage());
                            }
                            throw th;
                        }
                    } catch (Throwable th2) {
                        ExternalCommandWorker.log.info("{}: error writing to stdin.", ExternalCommandWorker.this.id, th2);
                        try {
                            outputStreamWriter.close();
                            return;
                        } catch (IOException e3) {
                            ExternalCommandWorker.log.debug("{}: error closing stdinWriter: {}", ExternalCommandWorker.this.id, e3.getMessage());
                            return;
                        }
                    }
                } catch (IOException e4) {
                    ExternalCommandWorker.log.info("{}: can't write any more to stdin: {}", ExternalCommandWorker.this.id, e4.getMessage());
                    try {
                        outputStreamWriter.close();
                        return;
                    } catch (IOException e5) {
                        ExternalCommandWorker.log.debug("{}: error closing stdinWriter: {}", ExternalCommandWorker.this.id, e5.getMessage());
                        return;
                    }
                }
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StdoutMonitor.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$StdoutMonitor.class */
    class StdoutMonitor implements Runnable {
        private final Process process;

        StdoutMonitor(Process process) {
            this.process = process;
        }

        @Override // java.lang.Runnable
        public void run() {
            ExternalCommandWorker.log.trace("{}: starting stdout monitor.", ExternalCommandWorker.this.id);
            try {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8));
                Throwable th = null;
                while (true) {
                    try {
                        try {
                            String readLine = bufferedReader.readLine();
                            if (readLine == null) {
                                break;
                            }
                            ExternalCommandWorker.log.trace("{}: read line from stdin: {}", ExternalCommandWorker.this.id, readLine);
                            JsonNode readObject = ExternalCommandWorker.readObject(readLine);
                            if (readObject.has("status")) {
                                ExternalCommandWorker.log.info("{}: New status: {}", ExternalCommandWorker.this.id, readObject.get("status").toString());
                                ExternalCommandWorker.this.status.update(readObject.get("status"));
                            }
                            if (readObject.has("log")) {
                                ExternalCommandWorker.log.info("{}: (stdout): {}", ExternalCommandWorker.this.id, readObject.get("log").asText());
                            }
                            if (readObject.has("error")) {
                                String asText = readObject.get("error").asText();
                                ExternalCommandWorker.log.error("{}: error: {}", ExternalCommandWorker.this.id, asText);
                                ExternalCommandWorker.this.doneFuture.complete(asText);
                            }
                        } catch (IOException e) {
                            ExternalCommandWorker.log.info("{}: can't read any more from stdout: {}", ExternalCommandWorker.this.id, e.getMessage());
                            if (bufferedReader != null) {
                                if (0 == 0) {
                                    bufferedReader.close();
                                    return;
                                }
                                try {
                                    bufferedReader.close();
                                    return;
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                    return;
                                }
                            }
                            return;
                        }
                    } finally {
                    }
                }
                throw new IOException("EOF");
            } catch (Throwable th3) {
                ExternalCommandWorker.log.info("{}: error reading from stdout.", ExternalCommandWorker.this.id, th3);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$Terminator.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$Terminator.class */
    class Terminator implements Runnable {
        private final Process process;

        Terminator(Process process) {
            this.process = process;
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:3:0x0014. Please report as an issue. */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    switch ((TerminatorAction) ExternalCommandWorker.this.terminatorActionQueue.take()) {
                        case DESTROY:
                            ExternalCommandWorker.log.info("{}: destroying process", ExternalCommandWorker.this.id);
                            this.process.getInputStream().close();
                            this.process.getErrorStream().close();
                            this.process.destroy();
                        case DESTROY_FORCIBLY:
                            ExternalCommandWorker.log.info("{}: forcibly destroying process", ExternalCommandWorker.this.id);
                            this.process.getInputStream().close();
                            this.process.getErrorStream().close();
                            this.process.destroyForcibly();
                        case CLOSE:
                            ExternalCommandWorker.log.trace("{}: closing Terminator thread.", ExternalCommandWorker.this.id);
                            return;
                    }
                } catch (Throwable th) {
                    ExternalCommandWorker.log.error("{}: Terminator error", ExternalCommandWorker.this.id, th);
                    ExternalCommandWorker.this.doneFuture.complete("Terminator error: " + th.getMessage());
                    return;
                }
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$TerminatorAction.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.3.0.jar:META-INF/bundled-dependencies/kafka-tools-2.3.0.jar:org/apache/kafka/trogdor/workload/ExternalCommandWorker$TerminatorAction.class */
    /**
     * Requests that can be placed on {@code terminatorActionQueue} for the Terminator task.
     */
    enum TerminatorAction {
        // Close the process's stdout/stderr streams and call Process#destroy().
        // Queued by stop() as the first shutdown attempt.
        DESTROY,
        // Close the process's stdout/stderr streams and call Process#destroyForcibly().
        // Queued by stop() when the grace period elapses without the executor terminating.
        DESTROY_FORCIBLY,
        // Make the Terminator task return. Queued by ExitMonitor once the process has exited.
        CLOSE
    }

    /**
     * Creates a worker that is not yet running; call {@code start} to launch the command.
     *
     * @param str                 the worker id, used as a prefix in log messages and sent to
     *                            the external process as the {@code id} field
     * @param externalCommandSpec the specification providing the command, the workload and the
     *                            optional shutdown grace period
     */
    public ExternalCommandWorker(String str, ExternalCommandSpec externalCommandSpec) {
        this.spec = externalCommandSpec;
        this.id = str;
    }

    /**
     * Launches the external command and the helper tasks that service it.
     *
     * <p>Five tasks are submitted to a freshly created cached thread pool: monitors for the
     * process's stdout and stderr, a writer for its stdin, a Terminator that destroys the
     * process on request, and an exit monitor that waits for the process to end. Finally a
     * JSON object containing this worker's {@code id} and the spec's {@code workload} is
     * queued for delivery to the process's stdin.
     *
     * <p>If anything in that sequence fails, the failure is logged, the executor is shut down
     * and {@code kafkaFutureImpl} is completed with an error message; the exception is not
     * propagated.
     *
     * @param platform            not used by this method
     * @param workerStatusTracker receives status updates reported by the process
     * @param kafkaFutureImpl     completed with an empty string on success or an error message
     *                            on failure
     * @throws IllegalStateException if this worker is already running
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("ExternalCommandWorker is already running.");
        }
        log.info("{}: Activating ExternalCommandWorker with {}", this.id, this.spec);
        this.status = workerStatusTracker;
        this.doneFuture = kafkaFutureImpl;
        this.executor = Executors.newCachedThreadPool(ThreadUtils.createThreadFactory("ExternalCommandWorkerThread%d", false));
        try {
            Process process = startProcess();
            Future<?> stdoutFuture = this.executor.submit(new StdoutMonitor(process));
            Future<?> stderrFuture = this.executor.submit(new StderrMonitor(process));
            this.executor.submit(new StdinWriter(process));
            Future<?> terminatorFuture = this.executor.submit(new Terminator(process));
            this.executor.submit(new ExitMonitor(process, stdoutFuture, stderrFuture, terminatorFuture));

            // First message for the process: who it is working for and what it should do.
            ObjectNode startMessage = new ObjectNode(JsonNodeFactory.instance);
            startMessage.set("id", new TextNode(this.id));
            startMessage.set("workload", this.spec.workload());
            this.stdinQueue.add(Optional.of(startMessage));
        } catch (Throwable th) {
            log.error("{}: Unable to start process", this.id, th);
            this.executor.shutdown();
            kafkaFutureImpl.complete("Unable to start process: " + th.getMessage());
        }
    }

    /**
     * Starts the command given by the spec as a new operating-system process.
     *
     * @return the started process
     * @throws RuntimeException if the spec's command is empty
     * @throws Exception        if {@link ProcessBuilder#start()} fails
     */
    private Process startProcess() throws Exception {
        if (!this.spec.command().isEmpty()) {
            ProcessBuilder builder = new ProcessBuilder(this.spec.command());
            return builder.start();
        }
        throw new RuntimeException("No command specified");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Parses one line of text as JSON, never returning {@code null} and never throwing on
     * malformed input.
     *
     * @param str the text to parse
     * @return the parsed tree, or {@link NullNode#instance} when the text cannot be parsed or
     *         the parser yields no node at all
     */
    public static JsonNode readObject(String str) {
        JsonNode parsed;
        try {
            parsed = JsonUtil.JSON_SERDE.readTree(str);
        } catch (IOException ignored) {
            // Deliberate best-effort: an unparseable line is treated as "no data".
            return NullNode.instance;
        }
        // Some Jackson versions return null for empty content; callers invoke has(...) on the
        // result, so hand back the null node instead.
        return parsed == null ? NullNode.instance : parsed;
    }

    /**
     * Stops the external process and waits for all helper tasks to finish.
     *
     * <p>A {@code DESTROY} request is queued first. If the executor has not terminated within
     * the grace period (the spec's value when present, otherwise
     * {@code DEFAULT_SHUTDOWN_GRACE_PERIOD_MS}), a {@code DESTROY_FORCIBLY} request is queued
     * and the method waits again, with a timeout of 1000 days. The per-run state is cleared
     * afterwards.
     *
     * @param platform not used by this method
     * @throws IllegalStateException if this worker is not running
     * @throws Exception             if waiting for the executor fails, e.g. on interruption
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("ExternalCommandWorker is not running.");
        }
        log.info("{}: Deactivating ExternalCommandWorker.", this.id);
        this.terminatorActionQueue.add(TerminatorAction.DESTROY);

        int gracePeriodMs = DEFAULT_SHUTDOWN_GRACE_PERIOD_MS;
        if (this.spec.shutdownGracePeriodMs().isPresent()) {
            gracePeriodMs = this.spec.shutdownGracePeriodMs().get().intValue();
        }

        boolean terminated = this.executor.awaitTermination(gracePeriodMs, TimeUnit.MILLISECONDS);
        if (!terminated) {
            // The graceful attempt did not finish in time; escalate.
            this.terminatorActionQueue.add(TerminatorAction.DESTROY_FORCIBLY);
            this.executor.awaitTermination(1000L, TimeUnit.DAYS);
        }

        // Drop the per-run references set by start().
        this.executor = null;
        this.doneFuture = null;
        this.status = null;
    }
}
