package org.apache.hadoop.yarn.nodelabels;

import com.google.common.collect.Sets;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.text.lookup.StringLookupFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;

/* loaded from: input_file:org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.class */
public class FileSystemNodeLabelsStore extends NodeLabelsStore {
    protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
    protected static final String DEFAULT_DIR_NAME = "node-labels";
    protected static final String MIRROR_FILENAME = "nodelabel.mirror";
    protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
    Path fsWorkingPath;
    FileSystem fs;
    FSDataOutputStream editlogOs;
    Path editLogPath;

    /* loaded from: input_file:org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore$SerializedLogType.class */
    protected enum SerializedLogType {
        ADD_LABELS,
        NODE_TO_LABELS,
        REMOVE_LABELS
    }

    public FileSystemNodeLabelsStore(CommonNodeLabelsManager commonNodeLabelsManager) {
        super(commonNodeLabelsManager);
    }

    private String getDefaultFSNodeLabelsRootDir() throws IOException {
        return "file:///tmp/hadoop-yarn-" + UserGroupInformation.getCurrentUser().getShortUserName() + "/" + DEFAULT_DIR_NAME;
    }

    @Override // org.apache.hadoop.yarn.nodelabels.NodeLabelsStore
    public void init(Configuration configuration) throws Exception {
        this.fsWorkingPath = new Path(configuration.get(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR, getDefaultFSNodeLabelsRootDir()));
        setFileSystem(configuration);
        if (this.fs.exists(this.fsWorkingPath)) {
            return;
        }
        this.fs.mkdirs(this.fsWorkingPath);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.fs.close();
            this.editlogOs.close();
        } catch (IOException e) {
            LOG.warn("Exception happened whiling shutting down,", e);
        }
    }

    void setFileSystem(Configuration configuration) throws IOException {
        Configuration configuration2 = new Configuration(configuration);
        configuration2.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
        configuration2.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, configuration2.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC, "2000, 500"));
        this.fs = this.fsWorkingPath.getFileSystem(configuration2);
        if (this.fs.getScheme().equals(StringLookupFactory.KEY_FILE)) {
            this.fs = this.fs.getRaw();
        }
    }

    private void ensureAppendEditlogFile() throws IOException {
        this.editlogOs = this.fs.append(this.editLogPath);
    }

    private void ensureCloseEditlogFile() throws IOException {
        this.editlogOs.close();
    }

    @Override // org.apache.hadoop.yarn.nodelabels.NodeLabelsStore
    public void updateNodeToLabelsMappings(Map<NodeId, Set<String>> map) throws IOException {
        ensureAppendEditlogFile();
        this.editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
        ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest.newInstance(map)).getProto().writeDelimitedTo(this.editlogOs);
        ensureCloseEditlogFile();
    }

    @Override // org.apache.hadoop.yarn.nodelabels.NodeLabelsStore
    public void storeNewClusterNodeLabels(Set<String> set) throws IOException {
        ensureAppendEditlogFile();
        this.editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
        ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest.newInstance(set)).getProto().writeDelimitedTo(this.editlogOs);
        ensureCloseEditlogFile();
    }

    @Override // org.apache.hadoop.yarn.nodelabels.NodeLabelsStore
    public void removeClusterNodeLabels(Collection<String> collection) throws IOException {
        ensureAppendEditlogFile();
        this.editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
        ((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets.newHashSet(collection.iterator()))).getProto().writeDelimitedTo(this.editlogOs);
        ensureCloseEditlogFile();
    }

    @Override // org.apache.hadoop.yarn.nodelabels.NodeLabelsStore
    public void recover() throws IOException {
        Path path = new Path(this.fsWorkingPath, MIRROR_FILENAME);
        Path path2 = new Path(this.fsWorkingPath, "nodelabel.mirror.old");
        FSDataInputStream fSDataInputStream = null;
        if (this.fs.exists(path)) {
            fSDataInputStream = this.fs.open(path);
        } else if (this.fs.exists(path2)) {
            fSDataInputStream = this.fs.open(path2);
        }
        if (null != fSDataInputStream) {
            Set<String> nodeLabels = new AddToClusterNodeLabelsRequestPBImpl(YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(fSDataInputStream)).getNodeLabels();
            Map<NodeId, Set<String>> nodeToLabels = new ReplaceLabelsOnNodeRequestPBImpl(YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(fSDataInputStream)).getNodeToLabels();
            this.mgr.addToCluserNodeLabels(nodeLabels);
            this.mgr.replaceLabelsOnNode(nodeToLabels);
            fSDataInputStream.close();
        }
        this.editLogPath = new Path(this.fsWorkingPath, EDITLOG_FILENAME);
        if (this.fs.exists(this.editLogPath)) {
            FSDataInputStream open = this.fs.open(this.editLogPath);
            while (true) {
                try {
                    switch (SerializedLogType.values()[open.readInt()]) {
                        case ADD_LABELS:
                            this.mgr.addToCluserNodeLabels(Sets.newHashSet(YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(open).getNodeLabelsList().iterator()));
                            break;
                        case REMOVE_LABELS:
                            this.mgr.removeFromClusterNodeLabels(YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(open).getNodeLabelsList());
                            break;
                        case NODE_TO_LABELS:
                            this.mgr.replaceLabelsOnNode(new ReplaceLabelsOnNodeRequestPBImpl(YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(open)).getNodeToLabels());
                            break;
                    }
                } catch (EOFException e) {
                }
            }
        }
        Path path3 = new Path(this.fsWorkingPath, "nodelabel.mirror.writing");
        OutputStream create = this.fs.create(path3, true);
        ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl.newInstance(this.mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(create);
        ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest.newInstance(this.mgr.getNodeLabels())).getProto().writeDelimitedTo(create);
        create.close();
        if (this.fs.exists(path)) {
            this.fs.delete(path2, false);
            this.fs.rename(path, path2);
        }
        this.fs.rename(path3, path);
        this.fs.delete(path3, false);
        this.fs.delete(path2, false);
        this.editlogOs = this.fs.create(this.editLogPath, true);
        this.editlogOs.close();
        LOG.info("Finished write mirror at:" + path.toString());
        LOG.info("Finished create editlog file at:" + this.editLogPath.toString());
    }
}
