/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.master.worker;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.net.BlockingSendException;
import edu.iu.dsc.tws.api.net.StatusCode;
import edu.iu.dsc.tws.api.net.request.ConnectHandler;
import edu.iu.dsc.tws.api.net.request.MessageHandler;
import edu.iu.dsc.tws.api.net.request.RequestID;
import edu.iu.dsc.tws.api.resource.IAllJoinedListener;
import edu.iu.dsc.tws.api.resource.IReceiverFromDriver;
import edu.iu.dsc.tws.api.resource.IScalerListener;
import edu.iu.dsc.tws.checkpointing.client.CheckpointingClientImpl;
import edu.iu.dsc.tws.common.net.tcp.Progress;
import edu.iu.dsc.tws.common.net.tcp.request.RRClient;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.master.worker.JMSenderToDriver;
import edu.iu.dsc.tws.master.worker.JMWorkerController;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class JMWorkerAgent {
    private static final Logger LOG = Logger.getLogger(JMWorkerAgent.class.getName());
    private static Progress looper;
    private boolean stopLooper = false;
    private Config config;
    private JobMasterAPI.WorkerInfo thisWorker;
    private String masterAddress;
    private int masterPort;
    private RRClient rrClient;
    private JMWorkerController workerController;
    private JMSenderToDriver senderToDriver;
    private boolean registrationSucceeded;
    private int numberOfWorkers;
    private boolean connectionRefused = false;
    private IReceiverFromDriver receiverFromDriver;
    private LinkedList<JobMasterAPI.DriverMessage> messageBuffer = new LinkedList();
    private IScalerListener scalerListener;
    private LinkedList<JobMasterAPI.WorkersScaled> scaledEventBuffer = new LinkedList();
    private IAllJoinedListener allJoinedListener;
    private LinkedList<JobMasterAPI.WorkersJoined> allJoinedEventBuffer = new LinkedList();
    private static JMWorkerAgent workerAgent;
    private static final long CONNECTION_TRY_TIME_LIMIT = 100000L;
    private CheckpointingClientImpl checkpointClient;
    private JobMasterAPI.WorkerState initialState;

    private JMWorkerAgent(Config config, JobMasterAPI.WorkerInfo thisWorker, String masterHost, int masterPort, int numberOfWorkers, JobMasterAPI.WorkerState initialState) {
        this.config = config;
        this.thisWorker = thisWorker;
        this.masterAddress = masterHost;
        this.masterPort = masterPort;
        this.numberOfWorkers = numberOfWorkers;
        this.initialState = initialState;
    }

    public static JMWorkerAgent createJMWorkerAgent(Config config, JobMasterAPI.WorkerInfo thisWorker, String masterHost, int masterPort, int numberOfWorkers, JobMasterAPI.WorkerState initialState) {
        if (workerAgent != null) {
            return workerAgent;
        }
        workerAgent = new JMWorkerAgent(config, thisWorker, masterHost, masterPort, numberOfWorkers, initialState);
        return workerAgent;
    }

    public static JMWorkerAgent getJMWorkerAgent() {
        return workerAgent;
    }

    private void init() {
        looper = new Progress();
        ClientConnectHandler connectHandler = new ClientConnectHandler();
        this.rrClient = new RRClient(this.masterAddress, this.masterPort, null, looper, this.thisWorker.getWorkerID(), (ConnectHandler)connectHandler);
        this.senderToDriver = new JMSenderToDriver(this);
        JobMasterAPI.RegisterWorker.Builder registerWorkerBuilder = JobMasterAPI.RegisterWorker.newBuilder();
        JobMasterAPI.RegisterWorkerResponse.Builder registerWorkerResponseBuilder = JobMasterAPI.RegisterWorkerResponse.newBuilder();
        JobMasterAPI.WorkerStateChange.Builder stateChangeBuilder = JobMasterAPI.WorkerStateChange.newBuilder();
        JobMasterAPI.WorkerStateChangeResponse.Builder stateChangeResponseBuilder = JobMasterAPI.WorkerStateChangeResponse.newBuilder();
        JobMasterAPI.WorkerMessage.Builder workerMessageBuilder = JobMasterAPI.WorkerMessage.newBuilder();
        JobMasterAPI.WorkerMessageResponse.Builder workerResponseBuilder = JobMasterAPI.WorkerMessageResponse.newBuilder();
        JobMasterAPI.WorkersScaled.Builder scaledMessageBuilder = JobMasterAPI.WorkersScaled.newBuilder();
        JobMasterAPI.DriverMessage.Builder driverMessageBuilder = JobMasterAPI.DriverMessage.newBuilder();
        JobMasterAPI.WorkersJoined.Builder joinedBuilder = JobMasterAPI.WorkersJoined.newBuilder();
        ResponseMessageHandler responseMessageHandler = new ResponseMessageHandler();
        this.rrClient.registerResponseHandler((Message.Builder)registerWorkerBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)registerWorkerResponseBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)stateChangeBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)stateChangeResponseBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)workerMessageBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)workerResponseBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)scaledMessageBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)driverMessageBuilder, (MessageHandler)responseMessageHandler);
        this.rrClient.registerResponseHandler((Message.Builder)joinedBuilder, (MessageHandler)responseMessageHandler);
        this.tryUntilConnected(100000L);
        if (!this.rrClient.isConnected()) {
            throw new RuntimeException("JMWorkerAgent can not connect to Job Master. Exiting .....");
        }
    }

    private void initJMWorkerController() {
        this.checkpointClient = new CheckpointingClientImpl(this.rrClient);
        this.workerController = new JMWorkerController(this.config, this.thisWorker, this.rrClient, this.numberOfWorkers, (CheckpointingClient)this.checkpointClient);
        JobMasterAPI.ListWorkersRequest.Builder listRequestBuilder = JobMasterAPI.ListWorkersRequest.newBuilder();
        JobMasterAPI.ListWorkersResponse.Builder listResponseBuilder = JobMasterAPI.ListWorkersResponse.newBuilder();
        this.rrClient.registerResponseHandler((Message.Builder)listRequestBuilder, (MessageHandler)this.workerController);
        this.rrClient.registerResponseHandler((Message.Builder)listResponseBuilder, (MessageHandler)this.workerController);
        JobMasterAPI.BarrierRequest.Builder barrierRequestBuilder = JobMasterAPI.BarrierRequest.newBuilder();
        JobMasterAPI.BarrierResponse.Builder barrierResponseBuilder = JobMasterAPI.BarrierResponse.newBuilder();
        this.rrClient.registerResponseHandler((Message.Builder)barrierRequestBuilder, (MessageHandler)this.workerController);
        this.rrClient.registerResponseHandler((Message.Builder)barrierResponseBuilder, (MessageHandler)this.workerController);
        this.checkpointClient.init();
    }

    private void startLooping() {
        while (!this.stopLooper) {
            looper.loopBlocking();
        }
        this.rrClient.disconnect();
    }

    public Thread startThreaded() {
        this.init();
        Thread jmThread = new Thread(this::startLooping);
        jmThread.setName("JM Agent");
        jmThread.setDaemon(true);
        jmThread.start();
        boolean registered = this.registerWorker(this.initialState);
        if (!registered) {
            this.close();
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
        return jmThread;
    }

    public void startBlocking() {
        this.init();
        this.startLooping();
        boolean registered = this.registerWorker(this.initialState);
        if (!registered) {
            throw new RuntimeException("Could not register Worker with JobMaster. Exiting .....");
        }
    }

    public boolean tryUntilConnected(long timeLimit) {
        long logInterval;
        long startTime = System.currentTimeMillis();
        long duration = 0L;
        long sleepInterval = 50L;
        long nextLogTime = logInterval = 1000L;
        this.connectionRefused = true;
        while (duration < timeLimit) {
            if (this.connectionRefused) {
                this.rrClient.tryConnecting();
                this.connectionRefused = false;
            }
            looper.loop();
            if (this.rrClient.isConnected()) {
                return true;
            }
            try {
                Thread.sleep(sleepInterval);
            }
            catch (InterruptedException e) {
                LOG.warning("Sleep interrupted.");
            }
            if (this.rrClient.isConnected()) {
                return true;
            }
            duration = System.currentTimeMillis() - startTime;
            if (duration <= nextLogTime) continue;
            LOG.fine("Still trying to connect to the Job Master: " + this.masterAddress + ":" + this.masterPort);
            nextLogTime += logInterval;
        }
        return false;
    }

    public JobMasterAPI.WorkerInfo getWorkerInfo() {
        return this.thisWorker;
    }

    public JMWorkerController getJMWorkerController() {
        return this.workerController;
    }

    public JMSenderToDriver getSenderToDriver() {
        return this.senderToDriver;
    }

    public CheckpointingClientImpl getCheckpointClient() {
        return this.checkpointClient;
    }

    public static boolean addScalerListener(IScalerListener scalerListener) {
        if (JMWorkerAgent.workerAgent.scalerListener != null) {
            return false;
        }
        JMWorkerAgent.workerAgent.scalerListener = scalerListener;
        workerAgent.deliverBufferedScaledEvents();
        return true;
    }

    public static boolean addAllJoinedListener(IAllJoinedListener iAllJoinedListener) {
        if (JMWorkerAgent.workerAgent.allJoinedListener != null) {
            return false;
        }
        JMWorkerAgent.workerAgent.allJoinedListener = iAllJoinedListener;
        workerAgent.deliverBufferedAllJoinedEvents();
        return true;
    }

    public static boolean addReceiverFromDriver(IReceiverFromDriver receiverFromDriver) {
        if (JMWorkerAgent.workerAgent.receiverFromDriver != null) {
            return false;
        }
        JMWorkerAgent.workerAgent.receiverFromDriver = receiverFromDriver;
        workerAgent.deliverBufferedMessages();
        return true;
    }

    private boolean registerWorker(JobMasterAPI.WorkerState initState) {
        JobMasterAPI.RegisterWorker registerWorker = JobMasterAPI.RegisterWorker.newBuilder().setWorkerID(this.thisWorker.getWorkerID()).setWorkerInfo(this.thisWorker).setInitialState(initState).build();
        LOG.fine("Sending RegisterWorker message: \n" + registerWorker);
        try {
            this.rrClient.sendRequestWaitResponse((Message)registerWorker, JobMasterContext.responseWaitDuration(this.config));
            if (this.registrationSucceeded) {
                this.initJMWorkerController();
            }
            return this.registrationSucceeded;
        }
        catch (BlockingSendException bse) {
            LOG.log(Level.SEVERE, bse.getMessage(), bse);
            return false;
        }
    }

    public boolean sendWorkerCompletedMessage() {
        JobMasterAPI.WorkerStateChange workerStateChange = JobMasterAPI.WorkerStateChange.newBuilder().setWorkerID(this.thisWorker.getWorkerID()).setState(JobMasterAPI.WorkerState.COMPLETED).build();
        LOG.fine("Sending Worker COMPLETED message: \n" + workerStateChange);
        try {
            this.rrClient.sendRequestWaitResponse((Message)workerStateChange, JobMasterContext.responseWaitDuration(this.config));
        }
        catch (BlockingSendException e) {
            LOG.log(Level.SEVERE, String.format("%d Worker completed message failed", this.thisWorker.getWorkerID()), e);
            return false;
        }
        return true;
    }

    public boolean sendWorkerToDriverMessage(Message message) {
        JobMasterAPI.WorkerMessage workerMessage = JobMasterAPI.WorkerMessage.newBuilder().setData(Any.pack((Message)message).toByteString()).setWorkerID(this.thisWorker.getWorkerID()).build();
        RequestID requestID = this.rrClient.sendRequest((Message)workerMessage);
        if (requestID == null) {
            LOG.severe("Could not send WorkerToDriver message.");
            return false;
        }
        LOG.fine("Sent WorkerToDriver message: \n" + workerMessage);
        return true;
    }

    public void close() {
        this.stopLooper = true;
        looper.wakeup();
    }

    private void deliverBufferedScaledEvents() {
        while (!this.scaledEventBuffer.isEmpty()) {
            this.deliverToScalerListener(this.scaledEventBuffer.poll());
        }
    }

    private void deliverBufferedAllJoinedEvents() {
        while (!this.allJoinedEventBuffer.isEmpty()) {
            this.deliverToAllJoinedListener(this.allJoinedEventBuffer.poll());
        }
    }

    private void deliverBufferedMessages() {
        while (!this.messageBuffer.isEmpty()) {
            this.deliverMessageToReceiver(this.messageBuffer.poll());
        }
    }

    private void deliverToScalerListener(JobMasterAPI.WorkersScaled event) {
        if (event.getChange() > 0) {
            this.scalerListener.workersScaledUp(event.getChange());
        } else if (event.getChange() < 0) {
            this.scalerListener.workersScaledDown(0 - event.getChange());
        }
    }

    private void deliverToAllJoinedListener(JobMasterAPI.WorkersJoined event) {
        this.allJoinedListener.allWorkersJoined(event.getWorkerList());
    }

    private void deliverMessageToReceiver(JobMasterAPI.DriverMessage message) {
        try {
            Any any = Any.parseFrom((ByteString)message.getData());
            this.receiverFromDriver.driverMessageReceived(any);
        }
        catch (InvalidProtocolBufferException e) {
            LOG.log(Level.SEVERE, "Can not parse received protocol buffer message to Any", e);
        }
    }

    public class ClientConnectHandler
    implements ConnectHandler {
        public void onError(SocketChannel channel) {
        }

        public void onConnect(SocketChannel channel, StatusCode status) {
            if (status == StatusCode.SUCCESS) {
                LOG.fine(JMWorkerAgent.this.thisWorker.getWorkerID() + " JMWorkerAgent connected to JobMaster: " + channel);
            }
            if (status == StatusCode.CONNECTION_REFUSED) {
                JMWorkerAgent.this.connectionRefused = true;
            }
        }

        public void onClose(SocketChannel channel) {
        }
    }

    class ResponseMessageHandler
    implements MessageHandler {
        ResponseMessageHandler() {
        }

        public void onMessage(RequestID id, int workerId, Message message) {
            if (message instanceof JobMasterAPI.RegisterWorkerResponse) {
                LOG.fine("Received a RegisterWorkerResponse message from the master. \n" + message);
                JobMasterAPI.RegisterWorkerResponse responseMessage = (JobMasterAPI.RegisterWorkerResponse)message;
                JMWorkerAgent.this.registrationSucceeded = responseMessage.getResult();
            } else if (message instanceof JobMasterAPI.WorkerStateChangeResponse) {
                LOG.fine("Received a WorkerStateChange response from the master. \n" + message);
            } else if (message instanceof JobMasterAPI.WorkerMessageResponse) {
                LOG.fine("Received a WorkerMessageResponse from the master. \n" + message);
            } else if (message instanceof JobMasterAPI.DriverMessage) {
                JobMasterAPI.DriverMessage driverMessage = (JobMasterAPI.DriverMessage)message;
                if (JMWorkerAgent.this.receiverFromDriver == null) {
                    JMWorkerAgent.this.messageBuffer.add(driverMessage);
                } else {
                    JMWorkerAgent.this.deliverMessageToReceiver(driverMessage);
                }
            } else if (message instanceof JobMasterAPI.WorkersScaled) {
                LOG.fine("Received " + message.getClass().getSimpleName() + " message from the master. \n" + message);
                JobMasterAPI.WorkersScaled scaledMessage = (JobMasterAPI.WorkersScaled)message;
                if (JMWorkerAgent.this.scalerListener == null) {
                    JMWorkerAgent.this.scaledEventBuffer.add(scaledMessage);
                } else {
                    JMWorkerAgent.this.deliverToScalerListener(scaledMessage);
                }
                JMWorkerAgent.this.workerController.scaled(scaledMessage.getChange(), scaledMessage.getNumberOfWorkers());
            } else if (message instanceof JobMasterAPI.WorkersJoined) {
                JobMasterAPI.WorkersJoined joinedMessage = (JobMasterAPI.WorkersJoined)message;
                if (JMWorkerAgent.this.allJoinedListener == null) {
                    JMWorkerAgent.this.allJoinedEventBuffer.add(joinedMessage);
                } else {
                    JMWorkerAgent.this.deliverToAllJoinedListener(joinedMessage);
                }
            } else {
                LOG.warning("Received message unrecognized. \n" + message);
            }
        }
    }
}

