/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.server.agent;

import com.mantisrx.common.utils.Services;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.TaskFactory;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.server.agent.BlobStore;
import io.mantisrx.server.agent.BlobStoreAwareClassLoaderHandle;
import io.mantisrx.server.agent.TaskExecutor;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.MantisAkkaRpcSystemLoader;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.beans.ConstructorProperties;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Guava service that owns the lifecycle of a {@link TaskExecutor} together with the
 * {@link HighAvailabilityServices} and {@link RpcSystem} it was built with.
 *
 * <p>Instances are created through {@link #builder(WorkerConfiguration)}.
 */
public class TaskExecutorStarter extends AbstractIdleService {
    private static final Logger log = LoggerFactory.getLogger(TaskExecutorStarter.class);
    private final TaskExecutor taskExecutor;
    private final HighAvailabilityServices highAvailabilityServices;
    private final RpcSystem rpcSystem;

    /**
     * Starts the high-availability services, blocks until they are running, and then starts the
     * task executor.
     */
    @Override
    protected void startUp() {
        this.highAvailabilityServices.startAsync().awaitRunning();
        this.taskExecutor.start();
    }

    /**
     * Shuts the components down in order: task executor, high-availability services, RPC system.
     * Blocks until the whole chain has completed.
     *
     * <p>A failure while closing the task executor is logged and does not stop the remaining
     * steps. The RPC system is closed even when stopping the high-availability services fails, so
     * that it is not leaked; the failure is still propagated to the caller.
     *
     * @throws Exception if stopping the high-availability services or closing the RPC system
     *     fails, or if the waiting thread is interrupted
     */
    @Override
    protected void shutDown() throws Exception {
        this.taskExecutor
            .closeAsync()
            .exceptionally(throwable -> {
                // Best effort: keep going so the remaining components are still released.
                log.warn("Failed to close the task executor cleanly; continuing with shutdown", throwable);
                return null;
            })
            .thenCompose(ignored ->
                Services.stopAsync(this.highAvailabilityServices, MoreExecutors.directExecutor()))
            // whenCompleteAsync (rather than thenRunAsync) runs on both the success and the
            // failure path and keeps the original failure, if any, as the outcome.
            .whenCompleteAsync((ignored, failure) -> this.rpcSystem.close())
            .get();
    }

    /**
     * Creates a builder for a {@link TaskExecutorStarter}.
     *
     * @param workerConfiguration worker configuration used to create the high-availability
     *     services and the default RPC service and class loader handle
     * @return a new builder
     */
    public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfiguration) {
        return new TaskExecutorStarterBuilder(workerConfiguration);
    }

    @ConstructorProperties(value={"taskExecutor", "highAvailabilityServices", "rpcSystem"})
    private TaskExecutorStarter(TaskExecutor taskExecutor, HighAvailabilityServices highAvailabilityServices, RpcSystem rpcSystem) {
        this.taskExecutor = taskExecutor;
        this.highAvailabilityServices = highAvailabilityServices;
        this.rpcSystem = rpcSystem;
    }

    /**
     * Builder for {@link TaskExecutorStarter}. Optional collaborators that are not supplied are
     * replaced by defaults derived from the {@link WorkerConfiguration} when {@link #build()} is
     * called.
     */
    public static class TaskExecutorStarterBuilder {
        private final WorkerConfiguration workerConfiguration;
        private Configuration configuration;
        @Nullable
        private RpcSystem rpcSystem;
        @Nullable
        private RpcService rpcService;
        @Nullable
        private ClassLoaderHandle classLoaderHandle;
        private final HighAvailabilityServices highAvailabilityServices;
        @Nullable
        private SinkSubscriptionStateHandler.Factory sinkSubscriptionHandlerFactory;
        @Nullable
        private TaskFactory taskFactory;
        private final List<Tuple2<TaskExecutor.Listener, Executor>> listeners = new ArrayList<>();

        /**
         * Initializes the builder with an empty Flink {@link Configuration} and creates the
         * high-availability services from the given worker configuration.
         */
        private TaskExecutorStarterBuilder(WorkerConfiguration workerConfiguration) {
            this.workerConfiguration = workerConfiguration;
            this.configuration = new Configuration();
            this.highAvailabilityServices = HighAvailabilityServicesUtil.createHAServices(workerConfiguration);
        }

        /**
         * Sets the Flink configuration used when the default RPC service is created.
         *
         * @param configuration configuration to use instead of the empty default
         * @return this builder
         */
        public TaskExecutorStarterBuilder configuration(Configuration configuration) {
            this.configuration = configuration;
            return this;
        }

        /**
         * Sets the RPC system to use.
         *
         * @param rpcSystem RPC system; must not be null
         * @return this builder
         */
        public TaskExecutorStarterBuilder rpcSystem(RpcSystem rpcSystem) {
            Preconditions.checkNotNull(rpcSystem);
            this.rpcSystem = rpcSystem;
            return this;
        }

        /** Returns the configured RPC system, or the one from {@link MantisAkkaRpcSystemLoader} if none was set. */
        private RpcSystem getRpcSystem() {
            if (this.rpcSystem == null) {
                return MantisAkkaRpcSystemLoader.getInstance();
            }
            return this.rpcSystem;
        }

        /**
         * Sets the RPC service to use.
         *
         * @param rpcService RPC service; must not be null
         * @return this builder
         */
        public TaskExecutorStarterBuilder rpcService(RpcService rpcService) {
            Preconditions.checkNotNull(rpcService);
            this.rpcService = rpcService;
            return this;
        }

        /**
         * Returns the configured RPC service, or creates a remote RPC service from the worker
         * configuration's external address, external port range, bind address and bind port.
         */
        private RpcService getRpcService() throws Exception {
            if (this.rpcService == null) {
                return RpcUtils.createRemoteRpcService(
                    this.getRpcSystem(),
                    this.configuration,
                    this.workerConfiguration.getExternalAddress(),
                    this.workerConfiguration.getExternalPortRange(),
                    this.workerConfiguration.getBindAddress(),
                    Optional.ofNullable(this.workerConfiguration.getBindPort()));
            }
            return this.rpcService;
        }

        /**
         * Sets the task factory handed to the task executor; it is passed through as is and may
         * be left unset.
         *
         * @param taskFactory task factory
         * @return this builder
         */
        public TaskExecutorStarterBuilder taskFactory(TaskFactory taskFactory) {
            this.taskFactory = taskFactory;
            return this;
        }

        /**
         * Sets the class loader handle to use instead of the blob-store backed default.
         *
         * @param classLoaderHandle class loader handle
         * @return this builder
         */
        public TaskExecutorStarterBuilder classLoaderHandle(ClassLoaderHandle classLoaderHandle) {
            this.classLoaderHandle = classLoaderHandle;
            return this;
        }

        /**
         * Returns the configured class loader handle, or a {@link BlobStoreAwareClassLoaderHandle}
         * built from the worker configuration's blob store artifact dir and local storage dir.
         */
        private ClassLoaderHandle getClassLoaderHandle() throws Exception {
            if (this.classLoaderHandle == null) {
                return new BlobStoreAwareClassLoaderHandle(
                    BlobStore.forHadoopFileSystem(
                        this.workerConfiguration.getBlobStoreArtifactDir(),
                        this.workerConfiguration.getLocalStorageDir()));
            }
            return this.classLoaderHandle;
        }

        /**
         * Sets the sink subscription handler factory to use instead of the default.
         *
         * @param sinkSubscriptionHandlerFactory factory
         * @return this builder
         */
        public TaskExecutorStarterBuilder sinkSubscriptionHandlerFactory(SinkSubscriptionStateHandler.Factory sinkSubscriptionHandlerFactory) {
            this.sinkSubscriptionHandlerFactory = sinkSubscriptionHandlerFactory;
            return this;
        }

        /**
         * Registers a listener that is added to the task executor when it is built.
         *
         * @param listener listener to register
         * @param executor executor registered together with the listener
         * @return this builder
         */
        public TaskExecutorStarterBuilder addListener(TaskExecutor.Listener listener, Executor executor) {
            // Let inference produce Tuple2<Listener, Executor> directly; no cast is needed.
            this.listeners.add(Tuple.of(listener, executor));
            return this;
        }

        /**
         * Returns the configured sink subscription handler factory, or the default created by
         * {@code forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber} from the master client
         * API and the system default-zone clock.
         */
        private SinkSubscriptionStateHandler.Factory getSinkSubscriptionHandlerFactory() {
            if (this.sinkSubscriptionHandlerFactory == null) {
                return SinkSubscriptionStateHandler.Factory.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(
                    this.highAvailabilityServices.getMasterClientApi(),
                    Clock.systemDefaultZone());
            }
            return this.sinkSubscriptionHandlerFactory;
        }

        /**
         * Creates the {@link TaskExecutor}, attaches all registered listeners in registration
         * order, and wraps everything in a {@link TaskExecutorStarter}.
         *
         * @return the starter; it has not been started yet
         * @throws Exception if creating the default RPC service, the default class loader handle,
         *     or the task executor fails
         */
        public TaskExecutorStarter build() throws Exception {
            TaskExecutor taskExecutor = new TaskExecutor(
                this.getRpcService(),
                this.workerConfiguration,
                this.highAvailabilityServices,
                this.getClassLoaderHandle(),
                this.getSinkSubscriptionHandlerFactory(),
                this.taskFactory);
            for (Tuple2<TaskExecutor.Listener, Executor> registration : this.listeners) {
                taskExecutor.addListener(registration._1(), registration._2());
            }
            return new TaskExecutorStarter(taskExecutor, this.highAvailabilityServices, this.getRpcSystem());
        }
    }
}

