/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.yarn;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.sasl.SaslServerBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * YARN auxiliary service, registered under the name {@code "spark_shuffle"}, that starts a
 * shuffle block server inside the hosting process.
 *
 * <p>On {@link #serviceInit(Configuration)} it creates an {@link ExternalShuffleBlockHandler}
 * and a {@link TransportServer} listening on {@code spark.shuffle.service.port}
 * (default {@value #DEFAULT_SPARK_SHUFFLE_SERVICE_PORT}). When {@code spark.authenticate} is
 * {@code true} a {@link ShuffleSecretManager} is created, a SASL bootstrap is installed on the
 * server, and per-application secrets are persisted to (and reloaded from) a LevelDB store so
 * they are available again after this service is re-initialized.
 */
public class YarnShuffleService
extends AuxiliaryService {
    private static final Logger logger = LoggerFactory.getLogger(YarnShuffleService.class);

    // Port on which the shuffle server listens for fetch requests.
    private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
    private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;

    // Whether the shuffle server should require SASL authentication.
    private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
    private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;

    // File names looked up (or created) under the configured local directories.
    private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
    private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";

    /** Port the server actually bound to; {@code -1} until {@code serviceInit} succeeds. */
    @VisibleForTesting
    static int boundPort = -1;

    private static final ObjectMapper mapper = new ObjectMapper();

    // Keys in the secrets DB have the form "AppCreds;" + JSON-serialized AppId.
    private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
    private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);

    /** The most recently constructed instance (set by the constructor). */
    @VisibleForTesting
    static YarnShuffleService instance;

    // Non-null only when authentication is enabled; see isAuthenticationEnabled().
    private ShuffleSecretManager secretManager;
    private TransportServer shuffleServer = null;
    private Configuration _conf = null;
    @VisibleForTesting
    ExternalShuffleBlockHandler blockHandler;
    @VisibleForTesting
    File registeredExecutorFile;
    @VisibleForTesting
    File secretsFile;
    // Store for application secrets; stays null when authentication is disabled.
    private DB db;

    /** Creates the service under the name {@code "spark_shuffle"} and records it in {@link #instance}. */
    public YarnShuffleService() {
        super("spark_shuffle");
        logger.info("Initializing YARN shuffle service for Spark");
        instance = this;
    }

    /** Authentication is considered enabled exactly when a secret manager has been created. */
    private boolean isAuthenticationEnabled() {
        return this.secretManager != null;
    }

    /**
     * Starts the shuffle server, optionally with SASL authentication.
     *
     * <p>Any failure is logged and NOT rethrown, so a broken shuffle service does not propagate
     * an exception to whoever initializes this service.
     *
     * @param conf configuration supplying the port, the authentication flag and the local dirs
     */
    // Raw List is kept as in the original: the element type expected by createServer is not
    // imported by this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected void serviceInit(Configuration conf) {
        this._conf = conf;
        try {
            this.registeredExecutorFile = this.findRecoveryDb(RECOVERY_FILE_NAME);
            TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
            this.blockHandler = new ExternalShuffleBlockHandler(transportConf, this.registeredExecutorFile);

            boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
            List bootstraps = new ArrayList();
            if (authEnabled) {
                this.createSecretManager();
                bootstraps.add(new SaslServerBootstrap(transportConf, this.secretManager));
            }

            int requestedPort = conf.getInt(SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT);
            TransportContext transportContext = new TransportContext(transportConf, this.blockHandler);
            this.shuffleServer = transportContext.createServer(requestedPort, bootstraps);

            // Report the port the server says it is bound to rather than the requested one.
            int port = this.shuffleServer.getPort();
            boundPort = port;
            String authEnabledString = authEnabled ? "enabled" : "not enabled";
            logger.info("Started YARN shuffle service for Spark on port {}. Authentication is {}.  Registered executor file is {}", port, authEnabledString, this.registeredExecutorFile);
        }
        catch (Exception e) {
            logger.error("Failed to initialize external shuffle service", e);
        }
    }

    /**
     * Creates the secret manager, opens (or creates) the secrets store and re-registers every
     * application secret found in it.
     *
     * @throws IOException if the store cannot be prepared, opened or read
     */
    private void createSecretManager() throws IOException {
        this.secretManager = new ShuffleSecretManager();
        this.secretsFile = this.findRecoveryDb(SECRETS_RECOVERY_FILE_NAME);

        // Create the store directory with mode 0700 (owner rwx only) since it holds secrets.
        LocalFileSystem fs = FileSystem.getLocal(this._conf);
        fs.mkdirs(new Path(this.secretsFile.getPath()), new FsPermission((short) 0700));

        this.db = LevelDBProvider.initLevelDB(this.secretsFile, CURRENT_VERSION, mapper);
        logger.info("Recovery location is: " + this.secretsFile.getPath());
        if (this.db == null) {
            return;
        }

        logger.info("Going to reload spark shuffle data");
        // The iterator must be closed once the scan is done; try-with-resources guarantees that
        // even if parsing an entry throws.
        try (DBIterator itr = this.db.iterator()) {
            itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
            while (itr.hasNext()) {
                Map.Entry<byte[], byte[]> entry = itr.next();
                String key = new String(entry.getKey(), StandardCharsets.UTF_8);
                if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
                    // Stop at the first key that is not an application-credentials entry.
                    break;
                }
                String id = YarnShuffleService.parseDbAppKey(key);
                ByteBuffer secret = mapper.readValue(entry.getValue(), ByteBuffer.class);
                logger.info("Reloading tokens for app: " + id);
                this.secretManager.registerApp(id, secret);
            }
        }
    }

    /**
     * Extracts the application id from a secrets-store key produced by {@link #dbAppKey(AppId)}.
     *
     * @param s key of the form {@code "AppCreds" + separator + JSON}
     * @return the {@code appId} field of the JSON payload
     * @throws IllegalArgumentException if {@code s} lacks the prefix or carries no payload
     * @throws IOException if the JSON payload cannot be parsed
     */
    private static String parseDbAppKey(String s) throws IOException {
        if (!s.startsWith(APP_CREDS_KEY_PREFIX)) {
            throw new IllegalArgumentException("expected a string starting with AppCreds");
        }
        if (s.length() <= APP_CREDS_KEY_PREFIX.length()) {
            // Nothing follows the prefix; fail clearly instead of with an index error below.
            throw new IllegalArgumentException("expected a JSON payload after AppCreds");
        }
        // Skip the prefix plus the one-character separator.
        String json = s.substring(APP_CREDS_KEY_PREFIX.length() + 1);
        AppId parsed = mapper.readValue(json, AppId.class);
        return parsed.appId;
    }

    /**
     * Builds the secrets-store key for an application: {@code "AppCreds;"} followed by the
     * JSON form of {@code appExecId}, encoded as UTF-8.
     *
     * @throws IOException if the id cannot be serialized to JSON
     */
    private static byte[] dbAppKey(AppId appExecId) throws IOException {
        String appExecJson = mapper.writeValueAsString(appExecId);
        String key = APP_CREDS_KEY_PREFIX + ";" + appExecJson;
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Registers an application's shuffle secret (when authentication is enabled), persisting it
     * first if a secrets store is open. Failures are logged, not propagated.
     */
    public void initializeApplication(ApplicationInitializationContext context) {
        String appId = context.getApplicationId().toString();
        try {
            ByteBuffer shuffleSecret = context.getApplicationDataForService();
            logger.info("Initializing application {}", appId);
            if (this.isAuthenticationEnabled()) {
                AppId fullId = new AppId(appId);
                if (this.db != null) {
                    byte[] key = YarnShuffleService.dbAppKey(fullId);
                    byte[] value = mapper.writeValueAsString(shuffleSecret).getBytes(StandardCharsets.UTF_8);
                    this.db.put(key, value);
                }
                this.secretManager.registerApp(appId, shuffleSecret);
            }
        }
        catch (Exception e) {
            logger.error("Exception when initializing application {}", appId, e);
        }
    }

    /**
     * Forgets an application's secret (when authentication is enabled) and notifies the block
     * handler that the application was removed. Failures are logged, not propagated.
     */
    public void stopApplication(ApplicationTerminationContext context) {
        String appId = context.getApplicationId().toString();
        try {
            logger.info("Stopping application {}", appId);
            if (this.isAuthenticationEnabled()) {
                AppId fullId = new AppId(appId);
                if (this.db != null) {
                    try {
                        this.db.delete(YarnShuffleService.dbAppKey(fullId));
                    }
                    catch (IOException e) {
                        // A failed delete must not prevent unregistering the in-memory secret.
                        logger.error("Error deleting {} from executor state db", appId, e);
                    }
                }
                this.secretManager.unregisterApp(appId);
            }
            this.blockHandler.applicationRemoved(appId, false);
        }
        catch (Exception e) {
            logger.error("Exception when stopping application {}", appId, e);
        }
    }

    /** Logs the container start; no other action is taken. */
    public void initializeContainer(ContainerInitializationContext context) {
        ContainerId containerId = context.getContainerId();
        logger.info("Initializing container {}", containerId);
    }

    /** Logs the container stop; no other action is taken. */
    public void stopContainer(ContainerTerminationContext context) {
        ContainerId containerId = context.getContainerId();
        logger.info("Stopping container {}", containerId);
    }

    /**
     * Locates {@code fileName} under the directories listed in
     * {@code yarn.nodemanager.local-dirs}.
     *
     * @return the first existing match, or, if none exists, the (not yet existing) path under
     *         the first configured directory
     * @throws IllegalStateException if no local directory is configured
     */
    private File findRecoveryDb(String fileName) {
        String[] localDirs = this._conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
        if (localDirs == null || localDirs.length == 0) {
            throw new IllegalStateException(
                "yarn.nodemanager.local-dirs is not set; cannot locate " + fileName);
        }
        for (String dir : localDirs) {
            File candidate = new File(new Path(dir).toUri().getPath(), fileName);
            if (candidate.exists()) {
                return candidate;
            }
        }
        return new File(new Path(localDirs[0]).toUri().getPath(), fileName);
    }

    /**
     * Closes the server, the block handler and the secrets store. Each resource is closed
     * independently so that a failure in one does not leave the others open; failures are
     * logged, not propagated.
     */
    protected void serviceStop() {
        try {
            if (this.shuffleServer != null) {
                this.shuffleServer.close();
            }
        }
        catch (Exception e) {
            logger.error("Exception when stopping service", e);
        }
        try {
            if (this.blockHandler != null) {
                this.blockHandler.close();
            }
        }
        catch (Exception e) {
            logger.error("Exception when stopping service", e);
        }
        try {
            if (this.db != null) {
                this.db.close();
            }
        }
        catch (Exception e) {
            logger.error("Exception when stopping service", e);
        }
    }

    /** Returns an empty buffer: this service exposes no metadata. */
    public ByteBuffer getMetaData() {
        return ByteBuffer.allocate(0);
    }

    /**
     * Value wrapper around an application id, serialized to JSON to form secrets-store keys.
     */
    public static class AppId {
        public final String appId;

        @JsonCreator
        public AppId(@JsonProperty(value="appId") String appId) {
            this.appId = appId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AppId other = (AppId)o;
            return Objects.equal(this.appId, other.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.appId);
        }

        @Override
        public String toString() {
            return Objects.toStringHelper(this).add("appId", this.appId).toString();
        }
    }
}

