/*
 * Decompiled with CFR 0.152.
 */
package io.littlehorse.sdk.worker.internal;

import io.grpc.stub.StreamObserver;
import io.littlehorse.sdk.common.config.LHConfig;
import io.littlehorse.sdk.common.proto.LHHostInfo;
import io.littlehorse.sdk.common.proto.LittleHorseGrpc;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerRequest;
import io.littlehorse.sdk.common.proto.RegisterTaskWorkerResponse;
import io.littlehorse.sdk.common.proto.TaskDef;
import io.littlehorse.sdk.worker.internal.LHLivenessController;
import io.littlehorse.sdk.worker.internal.PollThread;
import io.littlehorse.sdk.worker.internal.util.VariableMapping;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class RebalanceThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(RebalanceThread.class);
    private final LittleHorseGrpc.LittleHorseStub bootstrapStub;
    private final String taskWorkerId;
    private final String connectListenerName;
    private final TaskDef taskDef;
    private final HeartBeatCallback heartBeatCallback = new HeartBeatCallback();
    private final Method taskMethod;
    private final List<VariableMapping> mappings;
    private final Object executable;
    private final LHConfig config;
    private final Map<LHHostInfo, List<PollThread>> runningConnections = new ConcurrentHashMap<LHHostInfo, List<PollThread>>();
    private final LHLivenessController livenessController;
    private final long heartbeatIntervalMs;

    public RebalanceThread(LittleHorseGrpc.LittleHorseStub bootstrapStub, String taskWorkerId, String connectListenerName, TaskDef taskDef, Method taskMethod, List<VariableMapping> mappings, Object executable, LHConfig config, LHLivenessController livenessController, long heartbeatIntervalMs) {
        this.bootstrapStub = bootstrapStub;
        this.taskWorkerId = taskWorkerId;
        this.connectListenerName = connectListenerName;
        this.taskDef = taskDef;
        this.taskMethod = taskMethod;
        this.mappings = mappings;
        this.executable = executable;
        this.config = config;
        this.livenessController = livenessController;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    @Override
    public void run() {
        while (this.livenessController.keepWorkerRunning()) {
            this.doHeartBeat();
            this.waitForInterval();
        }
    }

    public void doHeartBeat() {
        this.bootstrapStub.registerTaskWorker(RegisterTaskWorkerRequest.newBuilder().setTaskDefId(this.taskDef.getId()).setTaskWorkerId(this.taskWorkerId).setListenerName(this.connectListenerName).build(), this.heartBeatCallback);
    }

    private PollThread createConnection(LittleHorseGrpc.LittleHorseStub stub, String threadName) {
        return new PollThread(threadName, stub, this.bootstrapStub, this.taskDef.getId(), this.taskWorkerId, this.config.getTaskWorkerVersion(), this.mappings, this.executable, this.taskMethod);
    }

    private void waitForInterval() {
        try {
            Thread.sleep(this.heartbeatIntervalMs);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private final class HeartBeatCallback
    implements StreamObserver<RegisterTaskWorkerResponse> {
        private HeartBeatCallback() {
        }

        public void onNext(RegisterTaskWorkerResponse response) {
            RebalanceThread.this.livenessController.notifySuccessCall(response);
            List<LHHostInfo> availableHosts = response.getYourHostsList();
            for (LHHostInfo runningConnection : RebalanceThread.this.runningConnections.keySet()) {
                if (!availableHosts.contains(runningConnection)) {
                    for (PollThread removed : RebalanceThread.this.runningConnections.remove(runningConnection)) {
                        removed.close();
                    }
                }
                LittleHorseGrpc.LittleHorseStub stub = RebalanceThread.this.config.getAsyncStub(runningConnection.getHost(), runningConnection.getPort());
                List<PollThread> originalPollThreads = RebalanceThread.this.runningConnections.get(runningConnection);
                boolean hasAPollThreadFailed = false;
                for (PollThread p : originalPollThreads) {
                    if (p.isRunning()) continue;
                    hasAPollThreadFailed = true;
                    break;
                }
                if (!hasAPollThreadFailed) continue;
                ArrayList<PollThread> newPollThreads = new ArrayList<PollThread>();
                for (int i = 0; i < originalPollThreads.size(); ++i) {
                    PollThread currentPollThread = originalPollThreads.get(i);
                    if (currentPollThread.isRunning()) {
                        newPollThreads.add(currentPollThread);
                        continue;
                    }
                    String threadName = String.format("lh-poll-%s", i);
                    PollThread connection = RebalanceThread.this.createConnection(stub, threadName);
                    connection.start();
                    newPollThreads.add(connection);
                }
                RebalanceThread.this.runningConnections.put(runningConnection, newPollThreads);
            }
            for (LHHostInfo lhHostInfo : availableHosts) {
                if (RebalanceThread.this.runningConnections.containsKey(lhHostInfo)) continue;
                ArrayList<PollThread> connections = new ArrayList<PollThread>();
                LittleHorseGrpc.LittleHorseStub stub = RebalanceThread.this.config.getAsyncStub(lhHostInfo.getHost(), lhHostInfo.getPort());
                for (int i = 0; i < RebalanceThread.this.config.getWorkerThreads(); ++i) {
                    String threadName = String.format("lh-poll-%s", i);
                    PollThread connection = RebalanceThread.this.createConnection(stub, threadName);
                    connection.start();
                    connections.add(connection);
                }
                RebalanceThread.this.runningConnections.put(lhHostInfo, connections);
            }
        }

        public void onError(Throwable t) {
            RebalanceThread.this.livenessController.notifyWorkerFailure();
        }

        public void onCompleted() {
        }
    }
}

