package org.apache.falcon.entity.store;

import java.io.IOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.v0.AccessControlList;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/falcon/entity/store/ConfigurationStore.class */
public final class ConfigurationStore implements FalconService {
    private static final String UTF_8 = "UTF-8";
    private final boolean shouldPersist;
    private Set<ConfigurationChangeListener> listeners = new LinkedHashSet();
    private ThreadLocal<Entity> updatesInProgress = new ThreadLocal<>();
    private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary = new HashMap();
    private FileSystem fs;
    private Path storePath;
    private static final EntityType[] ENTITY_LOAD_ORDER = {EntityType.CLUSTER, EntityType.FEED, EntityType.PROCESS, EntityType.DATASOURCE};
    public static final EntityType[] ENTITY_DELETE_ORDER = {EntityType.PROCESS, EntityType.FEED, EntityType.CLUSTER};
    private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class);
    private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT");
    private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
    private static final Entity NULL = new Entity() { // from class: org.apache.falcon.entity.store.ConfigurationStore.1
        public String getName() {
            return "NULL";
        }

        public String getTags() {
            return null;
        }

        public AccessControlList getACL() {
            return null;
        }
    };
    private static final ConfigurationStore STORE = new ConfigurationStore();

    public static ConfigurationStore get() {
        return STORE;
    }

    private ConfigurationStore() {
        for (EntityType entityType : EntityType.values()) {
            this.dictionary.put(entityType, new ConcurrentHashMap<>());
        }
        this.shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true"));
        if (this.shouldPersist) {
            this.storePath = new Path(StartupProperties.get().getProperty("config.store.uri"));
            this.fs = initializeFileSystem();
        }
    }

    private FileSystem initializeFileSystem() {
        try {
            FileSystem createFalconFileSystem = HadoopClientFactory.get().createFalconFileSystem(this.storePath.toUri());
            if (!createFalconFileSystem.exists(this.storePath)) {
                LOG.info("Creating configuration store directory: {}", this.storePath);
                HadoopClientFactory.mkdirs(createFalconFileSystem, this.storePath, STORE_PERMISSION);
            }
            return createFalconFileSystem;
        } catch (Exception e) {
            throw new RuntimeException("Unable to bring up config store for path: " + this.storePath, e);
        }
    }

    @Override // org.apache.falcon.service.FalconService
    public void init() throws FalconException {
        for (String str : StartupProperties.get().getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph").split(",")) {
            String trim = str.trim();
            if (!trim.isEmpty()) {
                registerListener((ConfigurationChangeListener) ReflectionUtils.getInstanceByClassName(trim));
            }
        }
        if (this.shouldPersist) {
            for (EntityType entityType : ENTITY_LOAD_ORDER) {
                loadEntity(entityType);
            }
        }
    }

    private void loadEntity(final EntityType entityType) throws FalconException {
        try {
            final ConcurrentHashMap<String, Entity> concurrentHashMap = this.dictionary.get(entityType);
            FileStatus[] globStatus = this.fs.globStatus(new Path(this.storePath, entityType.name() + CatalogStorage.OUTPUT_PATH_SEPARATOR + "*"));
            if (globStatus != null) {
                ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(100);
                for (final FileStatus fileStatus : globStatus) {
                    newFixedThreadPool.execute(new Runnable() { // from class: org.apache.falcon.entity.store.ConfigurationStore.2
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                String name = fileStatus.getPath().getName();
                                String decode = URLDecoder.decode(name.substring(0, name.length() - 4), ConfigurationStore.UTF_8);
                                concurrentHashMap.put(decode, ConfigurationStore.this.restore(entityType, decode));
                            } catch (IOException | FalconException e) {
                                ConfigurationStore.LOG.error("Unable to restore entity of", fileStatus);
                            }
                        }
                    });
                }
                newFixedThreadPool.shutdown();
                if (newFixedThreadPool.awaitTermination(10L, TimeUnit.MINUTES)) {
                    LOG.info("Restored Configurations for entity type: {} ", entityType.name());
                } else {
                    LOG.warn("Time out happened while waiting for all threads to finish while restoring entities for type: {}", entityType.name());
                }
                if (concurrentHashMap.size() != globStatus.length) {
                    throw new FalconException("Unable to restore configurations for entity type " + entityType.name());
                }
                Iterator<Entity> it = concurrentHashMap.values().iterator();
                while (it.hasNext()) {
                    onReload(it.next());
                }
            }
        } catch (IOException e) {
            throw new FalconException("Unable to restore configurations", e);
        } catch (InterruptedException e2) {
            throw new FalconException("Failed to restore configurations in 10 minutes for entity type " + entityType.name());
        }
    }

    public void registerListener(ConfigurationChangeListener configurationChangeListener) {
        this.listeners.add(configurationChangeListener);
    }

    public void unregisterListener(ConfigurationChangeListener configurationChangeListener) {
        this.listeners.remove(configurationChangeListener);
    }

    public synchronized void publish(EntityType entityType, Entity entity) throws FalconException {
        try {
            if (get(entityType, entity.getName()) != null) {
                throw new EntityAlreadyExistsException(entity.toShortString() + " already registered with configuration store. Can't be submitted again. Try removing before submitting.");
            }
            persist(entityType, entity);
            onAdd(entity);
            this.dictionary.get(entityType).put(entity.getName(), entity);
            AUDIT.info(entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + entity.getName() + " is published into config store");
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
    }

    private synchronized void updateInternal(EntityType entityType, Entity entity) throws FalconException {
        try {
            if (get(entityType, entity.getName()) == null) {
                throw new FalconException(entity.toShortString() + " doesn't exist");
            }
            persist(entityType, entity);
            ConcurrentHashMap<String, Entity> concurrentHashMap = this.dictionary.get(entityType);
            onChange(concurrentHashMap.get(entity.getName()), entity);
            concurrentHashMap.put(entity.getName(), entity);
            AUDIT.info(entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + entity.getName() + " is replaced into config store");
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
    }

    public synchronized void update(EntityType entityType, Entity entity) throws FalconException {
        if (this.updatesInProgress.get() != entity) {
            throw new FalconException(entity.toShortString() + " is not initialized for update");
        }
        updateInternal(entityType, entity);
    }

    private void onAdd(Entity entity) throws FalconException {
        Iterator<ConfigurationChangeListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onAdd(entity);
        }
    }

    private void onChange(Entity entity, Entity entity2) throws FalconException {
        Iterator<ConfigurationChangeListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onChange(entity, entity2);
        }
    }

    private void onReload(Entity entity) throws FalconException {
        Iterator<ConfigurationChangeListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onReload(entity);
        }
    }

    public synchronized void initiateUpdate(Entity entity) throws FalconException {
        if (get(entity.getEntityType(), entity.getName()) == null || this.updatesInProgress.get() != null) {
            throw new FalconException("An update for " + entity.toShortString() + " is already in progress or doesn't exist");
        }
        this.updatesInProgress.set(entity);
    }

    public <T extends Entity> T get(EntityType entityType, String str) throws FalconException {
        ConcurrentHashMap concurrentHashMap = this.dictionary.get(entityType);
        if (!concurrentHashMap.containsKey(str)) {
            return null;
        }
        if (this.updatesInProgress.get() != null && this.updatesInProgress.get().getEntityType() == entityType && this.updatesInProgress.get().getName().equals(str)) {
            return (T) this.updatesInProgress.get();
        }
        T t = (T) concurrentHashMap.get(str);
        if (t != NULL || !this.shouldPersist) {
            return t;
        }
        try {
            T t2 = (T) restore(entityType, str);
            concurrentHashMap.put(str, t2);
            return t2;
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
    }

    public Collection<String> getEntities(EntityType entityType) {
        return Collections.unmodifiableCollection(this.dictionary.get(entityType).keySet());
    }

    public synchronized boolean remove(EntityType entityType, String str) throws FalconException {
        ConcurrentHashMap<String, Entity> concurrentHashMap = this.dictionary.get(entityType);
        if (!concurrentHashMap.containsKey(str)) {
            return false;
        }
        try {
            archive(entityType, str);
            onRemove(concurrentHashMap.get(str));
            concurrentHashMap.remove(str);
            AUDIT.info(entityType + " " + str + " is removed from config store");
            return true;
        } catch (IOException e) {
            throw new StoreAccessException(e);
        }
    }

    private void onRemove(Entity entity) throws FalconException {
        Iterator<ConfigurationChangeListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onRemove(entity);
        }
    }

    private void persist(EntityType entityType, Entity entity) throws IOException, FalconException {
        if (this.shouldPersist) {
            FSDataOutputStream create = this.fs.create(new Path(this.storePath, entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml"));
            try {
                try {
                    entityType.getMarshaller().marshal(entity, create);
                    LOG.info("Persisted configuration {}/{}", entityType, entity.getName());
                    create.close();
                } catch (JAXBException e) {
                    LOG.error("Unable to serialize the entity object {}/{}", new Object[]{entityType, entity.getName(), e});
                    throw new StoreAccessException("Unable to serialize the entity object " + entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + entity.getName(), e);
                }
            } catch (Throwable th) {
                create.close();
                throw th;
            }
        }
    }

    private void archive(EntityType entityType, String str) throws IOException {
        if (this.shouldPersist) {
            Path path = new Path(this.storePath, "archive/" + entityType);
            HadoopClientFactory.mkdirs(this.fs, path, STORE_PERMISSION);
            this.fs.rename(new Path(this.storePath, entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + URLEncoder.encode(str, UTF_8) + ".xml"), new Path(path, URLEncoder.encode(str, UTF_8) + "." + System.currentTimeMillis()));
            LOG.info("Archived configuration {}/{}", entityType, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized <T extends Entity> T restore(EntityType entityType, String str) throws IOException, FalconException {
        FSDataInputStream open = this.fs.open(new Path(this.storePath, entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + URLEncoder.encode(str, UTF_8) + ".xml"));
        try {
            try {
                T t = (T) entityType.getUnmarshaller().unmarshal(open);
                open.close();
                LOG.info("Restored configuration {}/{}", entityType, str);
                return t;
            } catch (JAXBException e) {
                throw new StoreAccessException("Unable to un-marshall xml definition for " + entityType + CatalogStorage.OUTPUT_PATH_SEPARATOR + str, e);
            }
        } catch (Throwable th) {
            open.close();
            LOG.info("Restored configuration {}/{}", entityType, str);
            throw th;
        }
    }

    public void cleanupUpdateInit() {
        this.updatesInProgress.set(null);
    }

    @Override // org.apache.falcon.service.FalconService
    public String getName() {
        return getClass().getName();
    }

    @Override // org.apache.falcon.service.FalconService
    public void destroy() {
    }
}
