/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.worker;

import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.net.BlockingSendException;
import edu.iu.dsc.tws.api.net.StatusCode;
import edu.iu.dsc.tws.api.net.request.ConnectHandler;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.api.resource.IAllJoinedListener;
import edu.iu.dsc.tws.api.resource.IReceiverFromDriver;
import edu.iu.dsc.tws.api.resource.IScalerListener;
import edu.iu.dsc.tws.api.resource.IWorkerFailureListener;
import edu.iu.dsc.tws.checkpointing.client.CheckpointingClientImpl;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.common.net.tcp.Progress;
import edu.iu.dsc.tws.common.net.tcp.request.RRClient;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMDriverAgent;
import edu.iu.dsc.tws.master.worker.JMWorkerController;
import edu.iu.dsc.tws.master.worker.JMWorkerStatusUpdater;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class JMWorkerAgent {
    private static final Logger LOG = Logger.getLogger(JMWorkerAgent.class.getName());
    private static Progress looper;
    private boolean stopLooper = false;
    private Config config;
    private JobMasterAPI.WorkerInfo thisWorker;
    private String jmAddress;
    private int jmPort;
    private RRClient rrClient;
    private JMWorkerController workerController;
    private JMDriverAgent driverAgent;
    private JMWorkerStatusUpdater statusUpdater;
    private boolean registrationSucceeded;
    private boolean disconnected = false;
    private boolean reconnect = false;
    private boolean reconnected = false;
    private int numberOfWorkers;
    private IScalerListener scalerListener;
    private LinkedList<JobMasterAPI.JobScaled> scaledEventBuffer = new LinkedList();
    private IAllJoinedListener allJoinedListener;
    private LinkedList<JobMasterAPI.AllJoined> allJoinedEventBuffer = new LinkedList();
    private static JMWorkerAgent workerAgent;
    private static final long CONNECTION_TRY_TIME_LIMIT = 100000L;
    private CheckpointingClientImpl checkpointClient;
    private int restartCount;
    private JobMasterAPI.WorkerState initialState;

    private JMWorkerAgent(Config config, JobMasterAPI.WorkerInfo thisWorker, String jmAddress, int jmPort, int numberOfWorkers, int restartCount) {
        this.config = config;
        this.thisWorker = thisWorker;
        this.jmAddress = jmAddress;
        this.jmPort = jmPort;
        this.numberOfWorkers = numberOfWorkers;
        this.restartCount = restartCount;
        this.initialState = restartCount > 0 ? JobMasterAPI.WorkerState.RESTARTED : JobMasterAPI.WorkerState.STARTED;
    }

    public static JMWorkerAgent createJMWorkerAgent(Config config, JobMasterAPI.WorkerInfo thisWorker, String jmAddress, int jmPort, int numberOfWorkers, int restartCount) {
        if (workerAgent != null) {
            return workerAgent;
        }
        workerAgent = new JMWorkerAgent(config, thisWorker, jmAddress, jmPort, numberOfWorkers, restartCount);
        return workerAgent;
    }

    public static JMWorkerAgent getJMWorkerAgent() {
        return workerAgent;
    }

    private void init() {
        looper = new Progress();
        ClientConnectHandler connectHandler = new ClientConnectHandler();
        this.rrClient = new RRClient(this.jmAddress, this.jmPort, null, looper, this.thisWorker.getWorkerID(), (ConnectHandler)connectHandler);
        this.driverAgent = new JMDriverAgent(this.rrClient, this.thisWorker.getWorkerID());
        this.statusUpdater = new JMWorkerStatusUpdater(this.rrClient, this.thisWorker.getWorkerID(), this.config);
        ResponseMessageHandler handler = new ResponseMessageHandler();
        this.rrClient.registerResponseHandler((Message.Builder)JobMasterAPI.RegisterWorker.newBuilder(), (MessageHandler)handler);
        this.rrClient.registerResponseHandler((Message.Builder)JobMasterAPI.RegisterWorkerResponse.newBuilder(), (MessageHandler)handler);
        this.rrClient.registerResponseHandler((Message.Builder)JobMasterAPI.JobScaled.newBuilder(), (MessageHandler)handler);
        this.rrClient.registerResponseHandler((Message.Builder)JobMasterAPI.AllJoined.newBuilder(), (MessageHandler)handler);
        this.checkpointClient = new CheckpointingClientImpl(this.rrClient, CheckpointingContext.getRequestTimeout((Config)this.config));
        this.workerController = new JMWorkerController(this.config, this.thisWorker, this.numberOfWorkers, this.restartCount, this.rrClient, (CheckpointingClient)this.checkpointClient);
        this.tryUntilConnected(100000L);
        if (!this.rrClient.isConnected()) {
            throw new RuntimeException("JMWorkerAgent can not connect to Job Master. Exiting .....");
        }
        this.checkpointClient.init();
    }

    private void startLooping() {
        while (!this.stopLooper) {
            looper.loopBlocking();
            if (!this.reconnect) continue;
            LOG.fine("Worker is disconnecting from JobMaster from previous session.");
            this.rrClient.disconnect();
            this.rrClient.setHostAndPort(this.jmAddress, this.jmPort);
            this.reconnected = this.tryUntilConnected(100000L);
            if (this.reconnected) {
                LOG.info("Worker " + this.thisWorker.getWorkerID() + " Re-connected to JobMaster.");
            } else {
                LOG.info("Worker " + this.thisWorker.getWorkerID() + " could not re-connect to JobMaster.");
            }
            this.reconnect = false;
        }
        this.rrClient.disconnect();
    }

    public Thread startThreaded() {
        this.init();
        Thread jmThread = new Thread(this::startLooping);
        jmThread.setName("JM-Agent-WorkerID " + this.thisWorker.getWorkerID());
        jmThread.setDaemon(true);
        jmThread.start();
        boolean registered = this.registerWorker();
        if (!registered && this.disconnected) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            registered = this.reconnect(this.jmAddress);
        }
        if (!registered && this.disconnected) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            registered = this.reconnect(this.jmAddress);
        }
        if (!registered) {
            this.close();
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
        return jmThread;
    }

    public void startBlocking() {
        this.init();
        this.startLooping();
        boolean registered = this.registerWorker();
        if (!registered) {
            this.close();
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
    }

    public boolean tryUntilConnected(long timeLimit) {
        long logInterval;
        long startTime = System.currentTimeMillis();
        long duration = 0L;
        long connectAttemptInterval = 100L;
        long maxConnectAttemptInterval = 1000L;
        long nextLogTime = logInterval = 1000L;
        while (duration < timeLimit && !this.rrClient.isConnected()) {
            this.rrClient.tryConnecting();
            try {
                looper.loop();
            }
            catch (CancelledKeyException cke) {
                LOG.severe(cke.getMessage() + " Will try to reconnect ...");
                this.rrClient.disconnect();
            }
            if (this.rrClient.isConnected()) {
                return true;
            }
            try {
                Thread.sleep(connectAttemptInterval);
            }
            catch (InterruptedException e) {
                LOG.warning("Sleep interrupted.");
            }
            if (connectAttemptInterval < maxConnectAttemptInterval) {
                connectAttemptInterval = Math.min(connectAttemptInterval * 2L, maxConnectAttemptInterval);
            }
            if ((duration = System.currentTimeMillis() - startTime) <= nextLogTime) continue;
            LOG.info("Still trying to connect to the Job Master: " + this.jmAddress + ":" + this.jmPort);
            nextLogTime += logInterval;
        }
        return false;
    }

    public boolean reconnect(String jobMasterAddress) {
        this.jmAddress = jobMasterAddress;
        this.reconnect = true;
        this.reconnected = false;
        looper.wakeup();
        long startTime = System.currentTimeMillis();
        long delay = 0L;
        while (!this.reconnected && delay < 100000L) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                LOG.warning("Sleep interrupted. Will try again.");
            }
            delay = System.currentTimeMillis() - startTime;
        }
        if (!this.reconnected) {
            throw new RuntimeException("Could not reconnect Worker with JobMaster. Exiting .....");
        }
        LOG.info("Worker re-registering with JobMaster to initialize things.");
        return this.registerWorker();
    }

    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return this.thisWorker;
    }

    public JMWorkerController getJMWorkerController() {
        return this.workerController;
    }

    public JMDriverAgent getDriverAgent() {
        return this.driverAgent;
    }

    public JMWorkerStatusUpdater getStatusUpdater() {
        return this.statusUpdater;
    }

    public CheckpointingClientImpl getCheckpointClient() {
        return this.checkpointClient;
    }

    public static boolean addScalerListener(IScalerListener scalerListener) {
        if (JMWorkerAgent.workerAgent.scalerListener != null) {
            return false;
        }
        JMWorkerAgent.workerAgent.scalerListener = scalerListener;
        workerAgent.deliverBufferedScaledEvents();
        return true;
    }

    public static boolean addAllJoinedListener(IAllJoinedListener iAllJoinedListener) {
        if (JMWorkerAgent.workerAgent.allJoinedListener != null) {
            return false;
        }
        JMWorkerAgent.workerAgent.allJoinedListener = iAllJoinedListener;
        workerAgent.deliverBufferedAllJoinedEvents();
        return true;
    }

    public static boolean addReceiverFromDriver(IReceiverFromDriver receiverFromDriver) {
        return workerAgent.getDriverAgent().addReceiverFromDriver(receiverFromDriver);
    }

    public static boolean addWorkerFailureListener(IWorkerFailureListener workerFailureListener) {
        return workerAgent.getStatusUpdater().addWorkerFailureListener(workerFailureListener);
    }

    private boolean registerWorker() {
        JobMasterAPI.RegisterWorker registerWorker = JobMasterAPI.RegisterWorker.newBuilder().setWorkerID(this.thisWorker.getWorkerID()).setWorkerInfo(this.thisWorker).setRestartCount(this.restartCount).build();
        LOG.fine("Sending RegisterWorker message: \n" + registerWorker);
        try {
            this.rrClient.sendRequestWaitResponse((Message)registerWorker, JobMasterContext.responseWaitDuration(this.config));
            if (this.registrationSucceeded) {
                LOG.info("Registered worker[" + this.thisWorker.getWorkerID() + "] with JobMaster.");
            }
            return this.registrationSucceeded;
        }
        catch (BlockingSendException bse) {
            LOG.log(Level.SEVERE, bse.getMessage(), bse);
            return false;
        }
    }

    public boolean sendWorkerCompletedMessage(JobMasterAPI.WorkerState finalState) {
        return this.statusUpdater.updateWorkerStatus(finalState);
    }

    public void close() {
        this.stopLooper = true;
        looper.wakeup();
    }

    private void deliverBufferedScaledEvents() {
        while (!this.scaledEventBuffer.isEmpty()) {
            this.deliverToScalerListener(this.scaledEventBuffer.poll());
        }
    }

    private void deliverBufferedAllJoinedEvents() {
        while (!this.allJoinedEventBuffer.isEmpty()) {
            this.deliverToAllJoinedListener(this.allJoinedEventBuffer.poll());
        }
    }

    private void deliverToScalerListener(JobMasterAPI.JobScaled event) {
        if (event.getChange() > 0) {
            this.scalerListener.workersScaledUp(event.getChange());
        } else if (event.getChange() < 0) {
            this.scalerListener.workersScaledDown(0 - event.getChange());
        }
    }

    private void deliverToAllJoinedListener(JobMasterAPI.AllJoined event) {
        this.allJoinedListener.allWorkersJoined(event.getWorkerInfoList());
    }

    public class ClientConnectHandler
    implements ConnectHandler {
        public void onError(SocketChannel channel, StatusCode status) {
            JMWorkerAgent.this.disconnected = true;
        }

        public void onConnect(SocketChannel channel) {
            JMWorkerAgent.this.disconnected = false;
            LOG.info("Worker " + JMWorkerAgent.this.thisWorker.getWorkerID() + " connected to JobMaster: " + channel);
        }

        public void onClose(SocketChannel channel) {
        }
    }

    class ResponseMessageHandler
    implements MessageHandler {
        ResponseMessageHandler() {
        }

        public void onMessage(RequestID id, int workerId, Message message) {
            if (message instanceof JobMasterAPI.RegisterWorkerResponse) {
                LOG.fine("Received a RegisterWorkerResponse message from the master. \n" + message);
                JobMasterAPI.RegisterWorkerResponse responseMessage = (JobMasterAPI.RegisterWorkerResponse)message;
                JMWorkerAgent.this.registrationSucceeded = responseMessage.getResult();
            } else if (message instanceof JobMasterAPI.JobScaled) {
                LOG.fine("Received " + message.getClass().getSimpleName() + " message from the master. \n" + message);
                JobMasterAPI.JobScaled scaledMessage = (JobMasterAPI.JobScaled)message;
                if (JMWorkerAgent.this.scalerListener == null) {
                    JMWorkerAgent.this.scaledEventBuffer.add(scaledMessage);
                } else {
                    JMWorkerAgent.this.deliverToScalerListener(scaledMessage);
                }
                JMWorkerAgent.this.workerController.scaled(scaledMessage.getChange(), scaledMessage.getNumberOfWorkers());
            } else if (message instanceof JobMasterAPI.AllJoined) {
                JobMasterAPI.AllJoined joinedMessage = (JobMasterAPI.AllJoined)message;
                if (JMWorkerAgent.this.allJoinedListener == null) {
                    JMWorkerAgent.this.allJoinedEventBuffer.add(joinedMessage);
                } else {
                    JMWorkerAgent.this.deliverToAllJoinedListener(joinedMessage);
                }
            } else {
                LOG.warning("Received message unrecognized. \n" + message);
            }
        }
    }
}

