package org.mixql.engine.core;

import com.github.nscala_time.time.Imports$;
import com.github.nscala_time.time.RichReadableInstant$;
import com.github.nscala_time.time.RichReadableInterval$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.joda.time.DateTime;
import org.mixql.engine.core.logger.ModuleLogger;
import org.mixql.remote.RemoteMessageConverter;
import org.mixql.remote.messages.Message;
import org.mixql.remote.messages.broker.IBrokerSender;
import org.mixql.remote.messages.broker.PlatformPongHeartBeat;
import org.mixql.remote.messages.client.Execute;
import org.mixql.remote.messages.client.ExecuteFunction;
import org.mixql.remote.messages.client.GetDefinedFunctions;
import org.mixql.remote.messages.client.IModuleReceiver;
import org.mixql.remote.messages.client.IWorkerReceiver;
import org.mixql.remote.messages.client.ShutDown;
import org.mixql.remote.messages.module.ExecuteResult;
import org.mixql.remote.messages.module.ExecuteResultFailed;
import org.mixql.remote.messages.module.ExecutedFunctionResult;
import org.mixql.remote.messages.module.ExecutedFunctionResultFailed;
import org.mixql.remote.messages.module.GetDefinedFunctionsError;
import org.mixql.remote.messages.module.IModuleSendToClient;
import org.mixql.remote.messages.module.toBroker.EngineFailed;
import org.mixql.remote.messages.module.toBroker.EngineIsReady;
import org.mixql.remote.messages.module.toBroker.EnginePingHeartBeat;
import org.mixql.remote.messages.module.toBroker.IBrokerReceiverFromModule;
import org.mixql.remote.messages.module.worker.IWorkerSendToClient;
import org.mixql.remote.messages.module.worker.SendMsgToPlatform;
import org.mixql.remote.messages.module.worker.WorkerFinished;
import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import scala.Function0;
import scala.Function2;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.StringOps$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Random$;
import scala.util.Success;
import scala.util.Try$;
import scala.util.matching.Regex;

/* compiled from: Module.scala */
/* loaded from: input_file:org/mixql/engine/core/Module.class */
/**
 * Engine-side module that connects to a platform broker over a ZeroMQ DEALER socket,
 * exchanges heartbeats with it and dispatches incoming requests to an
 * {@link IModuleExecutor}, running each execute request in its own worker future.
 */
public class Module {
    // Callback object that implements the engine-specific reactions (execute, shutdown, ...).
    private final IModuleExecutor executor;
    // Identity set on the DEALER socket and used in log/error messages.
    private final String identity;
    // Broker host and port used to build the tcp:// connect address.
    private final String host;
    private final int port;
    private final ModuleLogger logger;
    // Configuration loaded once via ConfigFactory.load(); source of the timeouts below.
    private final Config config = ConfigFactory.load();
    // ZeroMQ resources; all stay null until startServer() creates them.
    private ZMQ.Context ctx = null;
    private ZMQ.Socket server = null;
    private ZMQ.Poller poller = null;
    private ZMQ.Poller workerPoller = null;
    // "org.mixql.engine.module.pollerTimeout", falling back to 100 when the lookup throws.
    private final long pollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(this::$init$$$anonfun$1).getOrElse(Module::$init$$$anonfun$2));
    // "org.mixql.engine.module.workerPollerTimeout", falling back to 95 when the lookup throws.
    private final long workerPollerTimeout = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(this::$init$$$anonfun$3).getOrElse(Module::$init$$$anonfun$4));
    // "org.mixql.engine.module.heartBeatInterval", falling back to 16500 when the lookup throws.
    private final long heartBeatInterval = BoxesRunTime.unboxToLong(Try$.MODULE$.apply(this::$init$$$anonfun$5).getOrElse(Module::$init$$$anonfun$6));
    // Time of the last received broker message or sent heartbeat; basis for the elapsed-time check.
    private DateTime processStart = null;
    // "org.mixql.engine.module.liveness", falling back to 3 when the lookup throws.
    private final int livenessInit = BoxesRunTime.unboxToInt(Try$.MODULE$.apply(this::$init$$$anonfun$7).getOrElse(Module::$init$$$anonfun$8));
    // Decremented on every heartbeat sent, reset on every received message; below zero stops the loop.
    private int liveness = this.livenessInit;
    // Worker name -> module-side PAIR socket bound for that worker.
    private final Map workersMap = (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[0]));
    // Random source used when generating worker names.
    private final Random$ r = Random$.MODULE$;

    /**
     * Records the collaborators and connection settings; the constructor body opens no sockets.
     *
     * @param iModuleExecutor executor that implements the engine-specific reactions
     * @param str             identity of this module
     * @param str2            broker host
     * @param i               broker port
     * @param moduleLogger    logger used for all module output
     */
    public Module(IModuleExecutor iModuleExecutor, String str, String str2, int i, ModuleLogger moduleLogger) {
        this.logger = moduleLogger;
        this.executor = iModuleExecutor;
        this.identity = str;
        this.port = i;
        this.host = str2;
    }

    /** @return the configuration loaded when this module was constructed */
    public Config config() {
        return config;
    }

    /** @return the current ZeroMQ context, or {@code null} if none has been set */
    public ZMQ.Context ctx() {
        return ctx;
    }

    /** Replaces the ZeroMQ context held by this module. */
    public void ctx_$eq(ZMQ.Context context) {
        ctx = context;
    }

    /** @return the socket connected to the broker, or {@code null} if none has been set */
    public ZMQ.Socket server() {
        return server;
    }

    /** Replaces the broker-facing socket held by this module. */
    public void server_$eq(ZMQ.Socket socket) {
        server = socket;
    }

    /** @return the poller watching the broker socket, or {@code null} if none has been set */
    public ZMQ.Poller poller() {
        return poller;
    }

    /** Replaces the poller that watches the broker socket. */
    public void poller_$eq(ZMQ.Poller poller) {
        this.poller = poller;
    }

    /** @return the poller watching the worker sockets, or {@code null} if none has been set */
    public ZMQ.Poller workerPoller() {
        return workerPoller;
    }

    /** Replaces the poller that watches the worker sockets. */
    public void workerPoller_$eq(ZMQ.Poller poller) {
        workerPoller = poller;
    }

    /** @return the timeout passed to {@code poll} on the broker poller */
    public long pollerTimeout() {
        return pollerTimeout;
    }

    /** @return the timeout passed to {@code poll} on the worker poller */
    public long workerPollerTimeout() {
        return workerPollerTimeout;
    }

    /**
     * Connects to the broker and runs the module's event loop until a {@code BrakeException}
     * (shutdown request or heartbeat failure) or any other exception ends it.
     *
     * <p>Each iteration polls the broker socket, polls the worker sockets when any are registered,
     * then either handles a broker message or does heartbeat bookkeeping, and finally forwards
     * pending worker messages. On an unexpected exception an {@code EngineFailed} message is sent
     * to the broker. {@link #close()} is always called exactly once before this method returns
     * or propagates a throwable.
     */
    public void startServer() {
        this.logger.logInfo("Starting main client");
        this.logger.logInfo("host of server is " + this.host + " and port is " + this.port);
        try {
            int serverPollIndex = connectAndRegister();
            while (true) {
                poller().poll(pollerTimeout());
                // -1 means "worker poller was not polled at all" (no workers registered).
                int readyWorkers = -1;
                if (workerPoller().getSize() != 0) {
                    readyWorkers = workerPoller().poll(workerPollerTimeout());
                }
                if (poller().pollin(serverPollIndex)) {
                    handleServerMessage();
                } else {
                    handleHeartBeat();
                }
                if (readyWorkers > 0) {
                    handleWorkerMessages();
                }
            }
        } catch (BrakeException unused) {
            // Regular way out of the loop: shutdown request or exhausted liveness.
            this.logger.logDebug("BrakeException");
        } catch (Exception e) {
            this.logger.logError("Error: " + e.getMessage());
            sendMsgToPlatformBroker(new EngineFailed("Module " + this.identity + " to broker: fatal error: " + e.getMessage()), this.logger);
        } finally {
            close();
        }
        this.logger.logInfo("Stopped.");
    }

    /**
     * Creates the context, the DEALER socket and both pollers, connects to the broker and sends
     * the {@code EngineIsReady} message.
     *
     * @return the index under which the broker socket is registered in {@link #poller()}
     */
    private int connectAndRegister() {
        ctx_$eq(ZMQ.context(1));
        server_$eq(ctx().socket(SocketType.DEALER));
        server().setIdentity(this.identity.getBytes());
        this.logger.logInfo("connected: " + server().connect("tcp://" + this.host + ":" + this.port));
        this.logger.logInfo("Connection established.");
        this.logger.logDebug("Setting processStart for timer");
        this.processStart = Imports$.MODULE$.DateTime().now();
        this.logger.logInfo("Setting poller");
        poller_$eq(ctx().poller(1));
        this.logger.logInfo("Setting workers poller");
        workerPoller_$eq(ctx().poller(14));
        this.logger.logInfo("Register server's socket pollin in poller");
        int serverPollIndex = poller().register(server(), ZMQ.Poller.POLLIN);
        this.logger.logInfo("Sending READY message to server's broker");
        sendMsgToPlatformBroker(new EngineIsReady(Predef$.MODULE$.long2Long(this.heartBeatInterval), Predef$.MODULE$.long2Long(pollerTimeout())), this.logger);
        return serverPollIndex;
    }

    /** Reads one message from the broker socket, dispatches it and resets the heartbeat state. */
    private void handleServerMessage() {
        this.logger.logDebug("Setting processStart for timer, as message was received");
        Message received = readMsgFromServerBroker(this.logger);
        if (received instanceof IBrokerSender) {
            this.logger.logDebug("got broker's service message");
            reactOnReceivedBrokerMsg((IBrokerSender) received);
        } else if (received instanceof IModuleReceiver) {
            reactOnReceivedMsgForEngine((IModuleReceiver) received);
        } else {
            throw new MatchError(received);
        }
        // Any traffic from the broker counts as proof of life.
        this.processStart = Imports$.MODULE$.DateTime().now();
        this.liveness = this.livenessInit;
    }

    /**
     * Sends a ping when the heartbeat interval has elapsed and throws {@code BrakeException}
     * once liveness has dropped below zero.
     */
    private void handleHeartBeat() {
        long elapsed = RichReadableInterval$.MODULE$.millis$extension(Imports$.MODULE$.richReadableInterval(RichReadableInstant$.MODULE$.to$extension(Imports$.MODULE$.richReadableInstant(this.processStart), Imports$.MODULE$.DateTime().now())));
        this.logger.logDebug("elapsed: " + elapsed);
        if (elapsed >= this.heartBeatInterval) {
            this.processStart = Imports$.MODULE$.DateTime().now();
            this.logger.logDebug("heartbeat work. Sending heart beat. Liveness: " + this.liveness);
            sendMsgToPlatformBroker(new EnginePingHeartBeat(), this.logger);
            this.liveness--;
            this.logger.logDebug("heartbeat work. After sending heart beat. Liveness: " + this.liveness);
        }
        if (this.liveness < 0) {
            this.logger.logError("heartbeat failure, can't reach server's broker. Shutting down");
            throw new BrakeException();
        }
    }

    /** Handles one message from every worker socket that the last worker poll marked readable. */
    private void handleWorkerMessages() {
        // Size is read once up front, as the original range-based iteration did.
        int registered = workerPoller().getSize();
        for (int index = 0; index < registered; index++) {
            if (workerPoller().pollin(index)) {
                handleWorkerMessage(workerPoller().getSocket(index));
            }
        }
    }

    /**
     * Receives one message from a worker socket: a {@code WorkerFinished} tears the worker's
     * module-side socket down, anything else is forwarded to the platform.
     */
    private void handleWorkerMessage(ZMQ.Socket socket) {
        Message fromWorker = RemoteMessageConverter.unpackAnyMsgFromArray(socket.recv(0));
        if (fromWorker instanceof WorkerFinished) {
            WorkerFinished workerFinished = (WorkerFinished) fromWorker;
            this.logger.logInfo("Received message WorkerFinished from worker " + workerFinished.workerIdentity() + " Remove socket from workersMap");
            workersMap().remove(workerFinished.Id);
            this.logger.logInfo("Unregister worker's " + workerFinished.workerIdentity() + " socket from workerPoller");
            workerPoller().unregister(socket);
            this.logger.logInfo("Closing worker's " + workerFinished.workerIdentity() + " socket");
            socket.close();
        } else if (fromWorker instanceof SendMsgToPlatform) {
            SendMsgToPlatform sendMsgToPlatform = (SendMsgToPlatform) fromWorker;
            this.logger.logInfo("Received message SendMsgToPlatform from worker " + sendMsgToPlatform.workerIdentity() + " and send it to platform");
            sendMsgToClient(sendMsgToPlatform.msg, this.logger);
        } else if (fromWorker instanceof IWorkerSendToClient) {
            IWorkerSendToClient iWorkerSendToClient = (IWorkerSendToClient) fromWorker;
            this.logger.logInfo("Received message of type IWorkerSendToPlatform from worker " + iWorkerSendToClient.workerIdentity() + " and proxy it (type: " + iWorkerSendToClient.type() + ") to platform");
            sendMsgToClient(iWorkerSendToClient, this.logger);
        } else {
            throw new MatchError(fromWorker);
        }
    }

    /**
     * Handles a service message from the broker. Only {@code PlatformPongHeartBeat} is
     * accepted; any other message type raises a {@code MatchError}.
     */
    private void reactOnReceivedBrokerMsg(Message message) {
        if (message instanceof PlatformPongHeartBeat) {
            this.logger.logDebug("got pong heart beat message from broker server");
            return;
        }
        throw new MatchError(message);
    }

    /**
     * Dispatches a message addressed to this engine.
     *
     * <ul>
     *   <li>{@code Execute} / {@code ExecuteFunction}: handled asynchronously in a worker future.</li>
     *   <li>{@code ShutDown}: notifies the executor (errors are only logged), then throws
     *       {@code BrakeException} to stop the event loop.</li>
     *   <li>{@code GetDefinedFunctions}: answered synchronously; a failure is reported to the
     *       client as {@code GetDefinedFunctionsError}.</li>
     *   <li>{@code IWorkerReceiver}: forwarded to the addressed worker.</li>
     * </ul>
     *
     * @throws MatchError for any other message type
     */
    private void reactOnReceivedMsgForEngine(IModuleReceiver iModuleReceiver) {
        if (iModuleReceiver instanceof Execute) {
            reactOnExecuteMessageAsync((Execute) iModuleReceiver);
        } else if (iModuleReceiver instanceof ShutDown) {
            this.logger.logInfo("Started shutdown");
            try {
                this.executor.reactOnShutDown(this.identity, iModuleReceiver.clientIdentity(), this.logger);
            } catch (Throwable th) {
                // Shutdown proceeds regardless of what the executor's hook does.
                this.logger.logWarn("Warning: error while reacting on shutdown: " + th.getMessage());
            }
            throw new BrakeException();
        } else if (iModuleReceiver instanceof ExecuteFunction) {
            reactOnExecuteFunctionMessageAsync((ExecuteFunction) iModuleReceiver);
        } else if (iModuleReceiver instanceof GetDefinedFunctions) {
            try {
                sendMsgToClient(this.executor.reactOnGetDefinedFunctions(this.identity, iModuleReceiver.clientIdentity(), this.logger), this.logger);
            } catch (Throwable th) {
                // ": " separates the fixed text from the cause; previously they ran together.
                String errorText = "Module " + this.identity + " to " + iModuleReceiver.clientIdentity()
                        + ": error while reacting on getting functions list: " + th.getMessage();
                sendMsgToClient(new GetDefinedFunctionsError(errorText, iModuleReceiver.clientIdentity()), this.logger);
            }
        } else if (iModuleReceiver instanceof IWorkerReceiver) {
            sendMessageToWorker((IWorkerReceiver) iModuleReceiver);
        } else {
            throw new MatchError(iModuleReceiver);
        }
    }

    /**
     * Forwards a platform message to the worker it is addressed to.
     *
     * <p>A message for a worker that is not (or no longer) present in {@link #workersMap()} is
     * logged and dropped instead of raising {@code NoSuchElementException}, which would
     * otherwise propagate into the event loop and stop the whole module.
     *
     * @param iWorkerReceiver message carrying the target worker's identity
     * @return the result of the socket send, or {@code false} when the worker is unknown
     */
    public boolean sendMessageToWorker(IWorkerReceiver iWorkerReceiver) {
        String workerIdentity = iWorkerReceiver.workerIdentity();
        this.logger.logInfo("received message " + iWorkerReceiver.type() + " from platfrom to workers-future-" + workerIdentity + " " + "Sending it to worker");
        Option<ZMQ.Socket> workerSocket = workersMap().get(workerIdentity);
        if (workerSocket.isEmpty()) {
            this.logger.logWarn("No worker socket registered for " + workerIdentity + ", dropping message " + iWorkerReceiver.type());
            return false;
        }
        return workerSocket.get().send(iWorkerReceiver.toByteArray());
    }

    /**
     * Runs an {@code Execute} request in a worker future.
     *
     * <p>The executor's result is wrapped in {@code ExecuteResult}, a failure in
     * {@code ExecuteResultFailed}; either is sent through the worker's socket as
     * {@code SendMsgToPlatform}.
     *
     * @param execute the request received from the client
     */
    public void reactOnExecuteMessageAsync(Execute execute) {
        reactOnRemoteMessageAsync(execute.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo("[workers-future-" + str + "]: triggering onExecute");
            return this.executor.reactOnExecuteAsync(execute, this.identity, execute.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            ExecuteResult result = new ExecuteResult(execute.statement, message, execute.clientIdentity());
            socket.send(new SendMsgToPlatform(result, str2).toByteArray());
            // The callback type is Function3<..., BoxedUnit>, so a unit value must be returned.
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            String errorText = "Module " + this.identity + " to " + execute.clientIdentity() + ": error while reacting on execute: " + th.getMessage();
            socket2.send(new SendMsgToPlatform(new ExecuteResultFailed(errorText, execute.clientIdentity()), str3).toByteArray());
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs an {@code ExecuteFunction} request in a worker future.
     *
     * <p>The executor's result is wrapped in {@code ExecutedFunctionResult}, a failure in
     * {@code ExecutedFunctionResultFailed}; either is sent through the worker's socket as
     * {@code SendMsgToPlatform}.
     *
     * @param executeFunction the request received from the client
     */
    public void reactOnExecuteFunctionMessageAsync(ExecuteFunction executeFunction) {
        reactOnRemoteMessageAsync(executeFunction.clientIdentity(), (str, platformContext) -> {
            this.logger.logInfo("[workers-future-" + str + "]: triggering onExecuteFunction");
            return this.executor.reactOnExecuteFunctionAsync(executeFunction, this.identity, executeFunction.clientIdentity(), this.logger, platformContext);
        }, (message, socket, str2) -> {
            ExecutedFunctionResult result = new ExecutedFunctionResult(executeFunction.name, message, executeFunction.clientIdentity());
            socket.send(new SendMsgToPlatform(result, str2).toByteArray());
            // The callback type is Function3<..., BoxedUnit>, so a unit value must be returned.
            return BoxedUnit.UNIT;
        }, (th, socket2, str3) -> {
            // A space now separates "function" from the function name; previously they ran together.
            String errorText = "Module " + this.identity + " to " + executeFunction.clientIdentity()
                    + ": error while reacting on execute function " + executeFunction.name + ": " + th.getMessage();
            socket2.send(new SendMsgToPlatform(new ExecutedFunctionResultFailed(errorText, executeFunction.clientIdentity()), str3).toByteArray());
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Starts a worker future for one client request.
     *
     * <p>A module-side PAIR socket is bound to {@code inproc://<worker name>}, registered in
     * {@link #workerPoller()} and stored in {@link #workersMap()}. The future connects its own
     * PAIR socket to that address, runs {@code function2}, and on completion reports the outcome
     * through that socket followed by a {@code WorkerFinished} message.
     *
     * @param str        identity of the client that issued the request
     * @param function2  work to run in the future; receives the worker name and its context
     * @param function3  called with (result, worker socket, worker name) on success
     * @param function32 called with (error, worker socket, worker name) on failure
     */
    public void reactOnRemoteMessageAsync(String str, Function2<String, PlatformContext, Message> function2, Function3<Message, ZMQ.Socket, String, BoxedUnit> function3, Function3<Throwable, ZMQ.Socket, String, BoxedUnit> function32) {
        String workerName = generateUnusedWorkersName();
        this.logger.logInfo("Creating worker " + workerName);
        this.logger.logInfo("Register module's pair socket pollin in workersPoller for worker " + workerName);
        ZMQ.Socket moduleSide = ctx().socket(SocketType.PAIR);
        workerPoller().register(moduleSide, ZMQ.Poller.POLLIN);
        moduleSide.bind("inproc://" + workerName);
        workersMap().put(workerName, moduleSide);
        // Filled in by the future once it has created its own end of the pair.
        ObjectRef workerSide = ObjectRef.create((Object) null);
        Future$.MODULE$.apply(() -> {
            return reactOnRemoteMessageAsync$$anonfun$1(str, function2, workerName, workerSide);
        }, ExecutionContext$Implicits$.MODULE$.global()).onComplete(outcome -> {
            finishWorker(outcome, (ZMQ.Socket) workerSide.elem, workerName, function3, function32);
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    /**
     * Reports a finished future's outcome over its worker socket, announces
     * {@code WorkerFinished} and closes the socket, even if reporting throws.
     */
    private void finishWorker(Object outcome, ZMQ.Socket workerSocket, String workerName, Function3<Message, ZMQ.Socket, String, BoxedUnit> onSuccess, Function3<Throwable, ZMQ.Socket, String, BoxedUnit> onFailure) {
        if (workerSocket == null) {
            // The future failed before it could create its socket, so there is no channel to
            // report through. NOTE(review): the module-side socket stays registered in this case.
            String cause = outcome instanceof Failure ? String.valueOf(((Failure) outcome).exception().getMessage()) : "unknown";
            this.logger.logError("[workers-future-" + workerName + "]: finished without a pair socket, cannot report result: " + cause);
            return;
        }
        try {
            if (outcome instanceof Success) {
                onSuccess.apply((Message) ((Success) outcome).value(), workerSocket, workerName);
            } else if (outcome instanceof Failure) {
                onFailure.apply(((Failure) outcome).exception(), workerSocket, workerName);
            } else {
                throw new MatchError(outcome);
            }
        } finally {
            try {
                this.logger.logInfo("[workers-future-" + workerName + "]: Sending WorkerFinished to inproc://" + workerName);
                workerSocket.send(new WorkerFinished(workerName).toByteArray());
            } finally {
                this.logger.logInfo("[workers-future-" + workerName + "]: Close future's pair socket inproc://" + workerName);
                workerSocket.close();
            }
        }
    }

    /**
     * Serializes a broker-bound message and sends it over the server socket.
     *
     * @param iBrokerReceiverFromModule message to send
     * @param moduleLogger              logger for the debug trace
     * @return the value returned by the socket's {@code send}
     */
    public boolean sendMsgToPlatformBroker(IBrokerReceiverFromModule iBrokerReceiverFromModule, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToPlatformBroker: Send msg to server ");
        ZMQ.Socket brokerSocket = server();
        byte[] payload = iBrokerReceiverFromModule.toByteArray();
        return brokerSocket.send(payload);
    }

    /**
     * Serializes a client-bound message and sends it over the server socket.
     *
     * @param iModuleSendToClient message to send
     * @param moduleLogger        logger for the debug trace
     * @return the value returned by the socket's {@code send}
     */
    public boolean sendMsgToClient(IModuleSendToClient iModuleSendToClient, ModuleLogger moduleLogger) {
        moduleLogger.logDebug("sendMsgToClient: Send msg to server ");
        ZMQ.Socket brokerSocket = server();
        byte[] payload = iModuleSendToClient.toByteArray();
        return brokerSocket.send(payload);
    }

    /**
     * Reads two frames from the server socket: the first is only logged (as the engine
     * identity), the second is unpacked into a {@code Message}.
     *
     * @param moduleLogger logger for the debug traces
     * @return the message decoded from the second frame
     */
    public Message readMsgFromServerBroker(ModuleLogger moduleLogger) {
        byte[] identityFrame = server().recv(0);
        moduleLogger.logDebug("readMsgFromServerBroker: received Identity of engine " + new String(identityFrame));
        byte[] payload = server().recv(0);
        moduleLogger.logDebug("have received message from server: " + new String(payload));
        return RemoteMessageConverter.unpackAnyMsgFromArray(payload);
    }

    /** @return the live map from worker name to that worker's module-side socket */
    public Map<String, ZMQ.Socket> workersMap() {
        return workersMap;
    }

    /** @return the random source used for worker name generation */
    public Random$ r() {
        return r;
    }

    /**
     * Generates a worker name of the form {@code worker<N>} that is not currently a key of
     * {@link #workersMap()}.
     *
     * <p>{@code N} is drawn from {@code [0, Integer.MAX_VALUE)}. The previous
     * {@code abs(nextInt())} could yield a negative number for {@code Integer.MIN_VALUE},
     * producing a name whose digits no longer fit an {@code int} and breaking every later call
     * that parsed the existing names. Collisions are now checked by direct key lookup, so
     * existing names are not parsed at all.
     *
     * @return a worker name unused at the time of the call
     */
    public String generateUnusedWorkersName() {
        String candidate;
        do {
            candidate = "worker" + r().nextInt(Integer.MAX_VALUE);
        } while (workersMap().contains(candidate));
        return candidate;
    }

    /**
     * Releases all ZeroMQ resources on a best-effort basis: the server socket, every worker
     * socket in {@link #workersMap()}, both pollers and finally the context.
     *
     * <p>Each step is wrapped so that a failure in one does not prevent the following steps;
     * each individual close call is given 5000 ms via {@link #runWithTimeout(long, Function0)}.
     */
    public void close() {
        Try$.MODULE$.apply(() -> {
            close$$anonfun$1();
            return BoxedUnit.UNIT;
        });
        // foreach on an empty map is a no-op, so no separate emptiness check is needed.
        workersMap().foreach(workerEntry -> {
            return Try$.MODULE$.apply(() -> {
                return close$$anonfun$2$$anonfun$1(workerEntry);
            });
        });
        Try$.MODULE$.apply(() -> {
            close$$anonfun$3();
            return BoxedUnit.UNIT;
        });
        Try$.MODULE$.apply(() -> {
            close$$anonfun$4();
            return BoxedUnit.UNIT;
        });
        try {
            if (ctx() != null) {
                this.logger.logInfo("finally close context");
                runWithTimeout(5000L, () -> {
                    close$$anonfun$5();
                    return BoxedUnit.UNIT;
                });
            }
        } catch (Throwable ignored) {
            // Deliberately not rethrown: close() must never fail its caller.
            this.logger.logError("tiemout of closing context exceeded:(");
        }
    }

    public <T> Option<T> runWithTimeout(long j, Function0<T> function0) {
        return Some$.MODULE$.apply(Await$.MODULE$.result(Future$.MODULE$.apply(function0, ExecutionContext$Implicits$.MODULE$.global()), new package.DurationLong(package$.MODULE$.DurationLong(j)).milliseconds()));
    }

    /** Reads the poller timeout from configuration; used by the {@code pollerTimeout} field initializer. */
    private final long $init$$$anonfun$1() {
        return config().getLong("org.mixql.engine.module.pollerTimeout");
    }

    /** Fallback poller timeout used when the configuration lookup fails. */
    private static final long $init$$$anonfun$2() {
        return 100L;
    }

    /** Reads the worker poller timeout from configuration; used by the {@code workerPollerTimeout} field initializer. */
    private final long $init$$$anonfun$3() {
        return config().getLong("org.mixql.engine.module.workerPollerTimeout");
    }

    /** Fallback worker poller timeout used when the configuration lookup fails. */
    private static final long $init$$$anonfun$4() {
        return 95L;
    }

    /** Reads the heartbeat interval from configuration; used by the {@code heartBeatInterval} field initializer. */
    private final long $init$$$anonfun$5() {
        return config().getLong("org.mixql.engine.module.heartBeatInterval");
    }

    /** Fallback heartbeat interval used when the configuration lookup fails. */
    private static final long $init$$$anonfun$6() {
        return 16500L;
    }

    /** Reads the initial liveness counter from configuration; used by the {@code livenessInit} field initializer. */
    private final int $init$$$anonfun$7() {
        return config().getInt("org.mixql.engine.module.liveness");
    }

    /** Fallback initial liveness used when the configuration lookup fails. */
    private static final int $init$$$anonfun$8() {
        return 3;
    }

    /**
     * Body of a worker future: creates the worker's PAIR socket, publishes it through
     * {@code objectRef}, connects it to {@code inproc://<str2>} and then runs {@code function2}.
     *
     * @param str       identity of the client that issued the request
     * @param function2 work to run; receives the worker name and a fresh {@code PlatformContext}
     * @param str2      worker name
     * @param objectRef holder through which the created socket is handed to the completion callback
     * @return the message produced by {@code function2}
     */
    private final Message reactOnRemoteMessageAsync$$anonfun$1(String str, Function2 function2, String str2, ObjectRef objectRef) {
        String logPrefix = "[workers-future-" + str2 + "]: ";
        this.logger.logInfo(logPrefix + "Creating future's pair socket for communicating with module");
        ZMQ.Socket workerSocket = ctx().socket(SocketType.PAIR);
        // Published before connecting so the completion callback can always reach the socket.
        objectRef.elem = workerSocket;
        this.logger.logInfo(logPrefix + "Bind future's pair socket in inproc://" + str2);
        workerSocket.connect("inproc://" + str2);
        PlatformContext context = new PlatformContext(workerSocket, str2, str, this.logger);
        return (Message) function2.apply(str2, context);
    }

    /** Closes the server socket; run under a timeout by {@code close$$anonfun$1}. */
    private final void close$$anonfun$1$$anonfun$1() {
        ZMQ.Socket brokerSocket = server();
        brokerSocket.close();
    }

    /** Closes the server socket with a 5000 ms limit, if one was ever created. */
    private final void close$$anonfun$1() {
        if (server() == null) {
            return;
        }
        this.logger.logInfo("finally close server");
        runWithTimeout(5000L, () -> {
            close$$anonfun$1$$anonfun$1();
            return BoxedUnit.UNIT;
        });
    }

    /** Closes the socket stored as the value of one {@code workersMap} entry. */
    private static final void close$$anonfun$2$$anonfun$1$$anonfun$1(Tuple2 tuple2) {
        ZMQ.Socket workerSocket = (ZMQ.Socket) tuple2._2();
        workerSocket.close();
    }

    /** Closes one worker entry's socket with a 5000 ms limit. */
    private final Option close$$anonfun$2$$anonfun$1(Tuple2 tuple2) {
        Option outcome = runWithTimeout(5000L, () -> {
            close$$anonfun$2$$anonfun$1$$anonfun$1(tuple2);
            return BoxedUnit.UNIT;
        });
        return outcome;
    }

    /** Closes the broker poller; run under a timeout by {@code close$$anonfun$3}. */
    private final void close$$anonfun$3$$anonfun$1() {
        ZMQ.Poller brokerPoller = poller();
        brokerPoller.close();
    }

    /** Closes the broker poller with a 5000 ms limit, if one was ever created. */
    private final void close$$anonfun$3() {
        if (poller() == null) {
            return;
        }
        this.logger.logInfo("finally close poller");
        runWithTimeout(5000L, () -> {
            close$$anonfun$3$$anonfun$1();
            return BoxedUnit.UNIT;
        });
    }

    /** Closes the worker poller; run under a timeout by {@code close$$anonfun$4}. */
    private final void close$$anonfun$4$$anonfun$1() {
        ZMQ.Poller workersPoller = workerPoller();
        workersPoller.close();
    }

    /** Closes the worker poller with a 5000 ms limit, if one was ever created. */
    private final void close$$anonfun$4() {
        if (workerPoller() == null) {
            return;
        }
        this.logger.logInfo("finally close workerPoller");
        runWithTimeout(5000L, () -> {
            close$$anonfun$4$$anonfun$1();
            return BoxedUnit.UNIT;
        });
    }

    /** Closes the ZeroMQ context; run under a timeout as the last step of {@code close()}. */
    private final void close$$anonfun$5() {
        ctx().close();
    }
}
