package org.apache.kafka.connect.runtime;

import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Frequencies;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerTask.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.0-rc-202105080004.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerTask.class */
public abstract class WorkerTask implements Runnable {
    // Class-wide logger used for task failure and shutdown error reporting.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerTask.class);
    // Prefix of the thread name installed by run() while the task is executing.
    private static final String THREAD_NAME_PREFIX = "task-thread-";
    // Identifier of the task this worker runs.
    protected final ConnectorTaskId id;
    // Receiver of lifecycle callbacks; the constructor assigns the TaskMetricsGroup here,
    // which wraps the listener supplied by the caller.
    private final TaskStatus.Listener statusListener;
    // Class loader swapped in via Plugins.compareAndSwapLoaders for the duration of run().
    protected final ClassLoader loader;
    // Metrics for this task; closed at the end of run().
    private final TaskMetricsGroup taskMetricsGroup;
    // Desired state; volatile because shouldPause() reads it without holding the monitor.
    private volatile TargetState targetState;
    // Stored for subclasses; not used directly by the code visible in this class.
    protected final RetryWithToleranceOperator retryWithToleranceOperator;
    // Counted down once at the end of run(); awaitStop() waits on it.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // Set by triggerStop(); once true, transitionTo() ignores further target-state changes.
    private volatile boolean stopping = false;
    // Set by cancel(); suppresses the onShutdown/onFailure status callbacks.
    private volatile boolean cancelled = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerTask$TaskMetricsGroup.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.0-rc-202105080004.jar:META-INF/bundled-dependencies/connect-runtime-2.3.0.jar:org/apache/kafka/connect/runtime/WorkerTask$TaskMetricsGroup.class */
    /**
     * Metrics holder for a single task that also acts as a {@link TaskStatus.Listener}.
     *
     * <p>Every listener callback first records the corresponding state in the internal
     * {@link StateTracker} and then forwards the call to the delegate listener passed to the
     * constructor. The group additionally owns three sensors: commit time, batch size and
     * offset-commit completion.
     */
    public static class TaskMetricsGroup implements TaskStatus.Listener {
        private final TaskStatus.Listener delegateListener;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Time time;
        private final StateTracker taskStateTimer = new StateTracker();
        private final Sensor commitTime;
        private final Sensor batchSize;
        private final Sensor commitAttempts;

        /**
         * Creates the metric group for the given task and registers its metrics and sensors.
         *
         * @param connectorTaskId id of the task; its connector name and task number become the group's tags
         * @param connectMetrics  source of the clock, the metrics registry and the metric group
         * @param listener        delegate that receives every status callback after the state is recorded
         */
        public TaskMetricsGroup(ConnectorTaskId connectorTaskId, ConnectMetrics connectMetrics, TaskStatus.Listener listener) {
            this.delegateListener = listener;
            this.time = connectMetrics.time();
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.taskGroupName(), registry.connectorTagName(), connectorTaskId.connector(), registry.taskTagName(), Integer.toString(connectorTaskId.task()));
            // The group is closed before anything is registered on it.
            // NOTE(review): presumably this drops metrics left behind by an earlier task with the
            // same id so the registrations below cannot collide - confirm against MetricGroup.close().
            this.metricGroup.close();
            this.metricGroup.addValueMetric(registry.taskStatus, new ConnectMetrics.LiteralSupplier<String>() {
                @Override // org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier
                public String metricValue(long j) {
                    // Locale.ROOT keeps the reported status independent of the JVM's default
                    // locale (e.g. a Turkish locale would turn "RUNNING" into "runn\u0131ng").
                    return TaskMetricsGroup.this.taskStateTimer.currentState().toString().toLowerCase(Locale.ROOT);
                }
            });
            addRatioMetric(AbstractStatus.State.RUNNING, registry.taskRunningRatio);
            addRatioMetric(AbstractStatus.State.PAUSED, registry.taskPauseRatio);
            this.commitTime = this.metricGroup.sensor("commit-time");
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeMax), new Max());
            this.commitTime.add(this.metricGroup.metricName(registry.taskCommitTimeAvg), new Avg());
            this.batchSize = this.metricGroup.sensor("batch-size");
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeMax), new Max());
            this.batchSize.add(this.metricGroup.metricName(registry.taskBatchSizeAvg), new Avg());
            Frequencies forBooleanValues = Frequencies.forBooleanValues(this.metricGroup.metricName(registry.taskCommitFailurePercentage), this.metricGroup.metricName(registry.taskCommitSuccessPercentage));
            this.commitAttempts = this.metricGroup.sensor("offset-commit-completion");
            this.commitAttempts.add(forBooleanValues);
        }

        /**
         * Registers a metric reporting the ratio of time spent in {@code state}, unless a metric
         * with the same name is already registered.
         *
         * @param state              state whose duration ratio is reported
         * @param metricNameTemplate template from which the metric name is derived
         */
        private void addRatioMetric(final AbstractStatus.State state, MetricNameTemplate metricNameTemplate) {
            MetricName metricName = this.metricGroup.metricName(metricNameTemplate);
            if (this.metricGroup.metrics().metric(metricName) == null) {
                this.metricGroup.metrics().addMetric(metricName, new Measurable() {
                    @Override // org.apache.kafka.common.metrics.Measurable
                    public double measure(MetricConfig metricConfig, long j) {
                        return TaskMetricsGroup.this.taskStateTimer.durationRatio(state, j);
                    }
                });
            }
        }

        /** Closes the underlying metric group. */
        void close() {
            this.metricGroup.close();
        }

        /**
         * Records the outcome of an offset commit attempt.
         *
         * @param j  duration of the commit; only recorded when the commit succeeded
         * @param z  {@code true} if the commit succeeded, {@code false} otherwise
         * @param th error associated with a failed commit; currently not used
         */
        void recordCommit(long j, boolean z, Throwable th) {
            if (z) {
                this.commitTime.record(j);
                this.commitAttempts.record(1.0d);
            } else {
                this.commitAttempts.record(0.0d);
            }
        }

        /**
         * Records the size of a processed batch.
         *
         * @param i number of records in the batch
         */
        void recordBatch(int i) {
            this.batchSize.record(i);
        }

        /** Marks the task RUNNING, then forwards the startup event to the delegate. */
        @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
        public void onStartup(ConnectorTaskId connectorTaskId) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onStartup(connectorTaskId);
        }

        /** Marks the task FAILED, then forwards the failure and its cause to the delegate. */
        @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
        public void onFailure(ConnectorTaskId connectorTaskId, Throwable th) {
            this.taskStateTimer.changeState(AbstractStatus.State.FAILED, this.time.milliseconds());
            this.delegateListener.onFailure(connectorTaskId, th);
        }

        /** Marks the task PAUSED, then forwards the pause event to the delegate. */
        @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
        public void onPause(ConnectorTaskId connectorTaskId) {
            this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
            this.delegateListener.onPause(connectorTaskId);
        }

        /** Marks the task RUNNING again, then forwards the resume event to the delegate. */
        @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
        public void onResume(ConnectorTaskId connectorTaskId) {
            this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
            this.delegateListener.onResume(connectorTaskId);
        }

        /** Marks the task UNASSIGNED, then forwards the shutdown event to the delegate. */
        @Override // org.apache.kafka.connect.runtime.TaskStatus.Listener
        public void onShutdown(ConnectorTaskId connectorTaskId) {
            this.taskStateTimer.changeState(AbstractStatus.State.UNASSIGNED, this.time.milliseconds());
            this.delegateListener.onShutdown(connectorTaskId);
        }

        /**
         * Records the state implied by a target state without notifying the delegate listener.
         * {@code STARTED} maps to RUNNING and {@code PAUSED} to PAUSED; other values are ignored.
         *
         * @param targetState target state to translate into a tracked state
         */
        public void recordState(TargetState targetState) {
            switch (targetState) {
                case STARTED:
                    this.taskStateTimer.changeState(AbstractStatus.State.RUNNING, this.time.milliseconds());
                    return;
                case PAUSED:
                    this.taskStateTimer.changeState(AbstractStatus.State.PAUSED, this.time.milliseconds());
                    return;
                default:
                    return;
            }
        }

        /**
         * @return the state currently recorded by the state tracker
         */
        public AbstractStatus.State state() {
            return this.taskStateTimer.currentState();
        }

        /**
         * @return the metric group backing this object
         */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    public WorkerTask(ConnectorTaskId connectorTaskId, TaskStatus.Listener listener, TargetState targetState, ClassLoader classLoader, ConnectMetrics connectMetrics, RetryWithToleranceOperator retryWithToleranceOperator) {
        this.id = connectorTaskId;
        this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, listener);
        this.statusListener = this.taskMetricsGroup;
        this.loader = classLoader;
        this.targetState = targetState;
        this.taskMetricsGroup.recordState(this.targetState);
        this.retryWithToleranceOperator = retryWithToleranceOperator;
    }

    /**
     * @return the id of this task
     */
    public ConnectorTaskId id() {
        return id;
    }

    /**
     * @return the class loader given to this task at construction
     */
    public ClassLoader loader() {
        return loader;
    }

    /**
     * Initializes the task from the given configuration. Implemented by subclasses; nothing in
     * this class invokes it.
     *
     * @param taskConfig configuration for the task
     */
    public abstract void initialize(TaskConfig taskConfig);

    /**
     * Sets the stopping flag and wakes every thread waiting on this task's monitor, so a
     * thread blocked in {@link #awaitUnpause()} can observe the flag.
     */
    private synchronized void triggerStop() {
        stopping = true;
        notifyAll();
    }

    /**
     * Requests that the task stop: marks it as stopping and wakes any thread waiting on this
     * task's monitor. Does not wait for the task to finish; see {@link #awaitStop(long)}.
     */
    public void stop() {
        triggerStop();
    }

    /**
     * Marks the task as cancelled. A cancelled task no longer reports shutdown or failure to
     * its status listener.
     */
    public void cancel() {
        cancelled = true;
    }

    /**
     * Waits for the task's {@link #run()} method to reach its cleanup phase.
     *
     * @param j maximum time to wait, in milliseconds
     * @return {@code true} if the shutdown latch was released within the timeout; {@code false}
     *         if the timeout elapsed or the waiting thread was interrupted
     */
    public boolean awaitStop(long j) {
        try {
            return this.shutdownLatch.await(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt status so the caller can still observe the interruption
            // instead of having it silently discarded here.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Runs the task's main work. Invoked by {@code doRun()} after the startup notification has
     * been sent to the status listener.
     */
    protected abstract void execute();

    /**
     * Closes the task. Invoked through {@code doClose()}, which logs and rethrows anything
     * this method throws.
     */
    protected abstract void close();

    /**
     * Releases resources held by the task. Invoked from the cleanup phase of {@link #run()},
     * after the shutdown latch has been counted down.
     */
    protected abstract void releaseResources();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return {@code true} once a stop has been triggered for this task
     */
    public boolean isStopping() {
        return stopping;
    }

    /**
     * Calls {@link #close()}, logging anything it throws before propagating it unchanged.
     */
    private void doClose() {
        try {
            close();
        } catch (Throwable closeFailure) {
            log.error("{} Task threw an uncaught and unrecoverable exception during shutdown", this, closeFailure);
            throw closeFailure;
        }
    }

    /**
     * Runs the task once: performs the startup handshake, executes the task and closes it.
     *
     * <p>While holding this task's monitor the method returns immediately if a stop was
     * already requested. If the target state is {@code PAUSED} it reports the pause and waits
     * to be unpaused, returning if a stop arrives first. Otherwise it notifies the status
     * listener of startup. {@link #execute()} then runs outside the monitor. Anything thrown
     * is logged and rethrown, and {@code doClose()} is invoked on every exit path.
     *
     * @throws InterruptedException if the thread is interrupted while waiting to be unpaused
     */
    private void doRun() throws InterruptedException {
        try {
            synchronized (this) {
                if (this.stopping) {
                    return;
                }
                if (this.targetState == TargetState.PAUSED) {
                    onPause();
                    if (!awaitUnpause()) {
                        // A stop was requested while the task was paused.
                        return;
                    }
                }
                this.statusListener.onStartup(this.id);
            }
            execute();
        } catch (Throwable t) {
            log.error("{} Task threw an uncaught and unrecoverable exception", this, t);
            log.error("{} Task is being killed and will not recover until manually restarted", this);
            throw t;
        } finally {
            doClose();
        }
    }

    /**
     * Marks the task as stopping and, unless it was cancelled, reports the shutdown to the
     * status listener. Runs entirely under this task's monitor.
     */
    private synchronized void onShutdown() {
        triggerStop();
        if (cancelled) {
            return;
        }
        statusListener.onShutdown(id);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the task as stopping and, unless it was cancelled, reports the failure to the
     * status listener.
     *
     * @param th cause of the failure
     */
    public void onFailure(Throwable th) {
        synchronized (this) {
            triggerStop();
            if (cancelled) {
                return;
            }
            statusListener.onFailure(id, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports to the status listener that this task has paused.
     */
    public synchronized void onPause() {
        statusListener.onPause(id);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports to the status listener that this task has resumed.
     */
    public synchronized void onResume() {
        statusListener.onResume(id);
    }

    /**
     * Thread entry point for the task.
     *
     * <p>Clears the logging context and installs one for this task, swaps in the task's class
     * loader, renames the current thread, then runs the task and reports its shutdown. Any
     * throwable is reported through {@link #onFailure(Throwable)}; only {@link Error}s are
     * rethrown. Cleanup always runs exactly once and in this order: restore the thread name
     * and class loader, release the shutdown latch, release task resources, close the metrics
     * group. Each later step is guarded by {@code finally} so it still runs if an earlier
     * step throws.
     */
    @Override // java.lang.Runnable
    public void run() {
        LoggingContext.clear();
        try (LoggingContext loggingContext = LoggingContext.forTask(id())) {
            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(this.loader);
            String savedName = Thread.currentThread().getName();
            try {
                Thread.currentThread().setName(THREAD_NAME_PREFIX + this.id);
                doRun();
                onShutdown();
            } catch (Throwable t) {
                onFailure(t);
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                try {
                    Thread.currentThread().setName(savedName);
                    Plugins.compareAndSwapLoaders(savedLoader);
                    this.shutdownLatch.countDown();
                } finally {
                    try {
                        releaseResources();
                    } finally {
                        this.taskMetricsGroup.close();
                    }
                }
            }
        }
    }

    /**
     * @return {@code true} if the current target state is {@code PAUSED}
     */
    public boolean shouldPause() {
        return TargetState.PAUSED == targetState;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Blocks on this task's monitor for as long as the target state is {@code PAUSED}.
     *
     * @return {@code true} once the target state is no longer {@code PAUSED}; {@code false} if
     *         the task is stopping while still paused
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public boolean awaitUnpause() throws InterruptedException {
        synchronized (this) {
            while (true) {
                if (this.targetState != TargetState.PAUSED) {
                    return true;
                }
                if (this.stopping) {
                    return false;
                }
                // Woken by notifyAll() from transitionTo() or triggerStop().
                wait();
            }
        }
    }

    /**
     * Updates the target state and wakes any thread waiting on this task's monitor. The
     * request is ignored once the task is stopping.
     *
     * @param targetState new target state
     */
    public void transitionTo(TargetState targetState) {
        synchronized (this) {
            if (!this.stopping) {
                this.targetState = targetState;
                notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a successful offset commit in the task metrics.
     *
     * @param j duration of the commit
     */
    public void recordCommitSuccess(long j) {
        taskMetricsGroup.recordCommit(j, true, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records a failed offset commit in the task metrics.
     *
     * @param j  duration of the commit attempt
     * @param th error that caused the failure
     */
    public void recordCommitFailure(long j, Throwable th) {
        taskMetricsGroup.recordCommit(j, false, th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the size of a processed batch in the task metrics.
     *
     * @param i number of records in the batch
     */
    public void recordBatch(int i) {
        taskMetricsGroup.recordBatch(i);
    }

    /**
     * @return the metrics group of this task
     */
    TaskMetricsGroup taskMetricsGroup() {
        return taskMetricsGroup;
    }
}
