package com.spotify.styx;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.spotify.styx.model.Workflow;
import com.spotify.styx.schedule.ScheduleSource;
import com.sun.nio.file.SensitivityWatchEventModifier;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/spotify/styx/LocalFileScheduleSource.class */
/**
 * A {@link ScheduleSource} that loads workflow definitions from YAML files in a local directory.
 *
 * <p>The directory is read from the {@code styx.source.local.dir} configuration key. On
 * {@link #start()} every {@code *.yaml} / {@code *.yml} file directly inside that directory is
 * parsed and each resulting {@link Workflow} is passed to the change listener. Afterwards the
 * directory is watched for created, modified and deleted files; the watch loop runs as a task on
 * the supplied executor until the supplied {@link Closer} is closed, the watch key becomes
 * invalid, or the polling thread is interrupted.
 *
 * <p>The component id of a workflow is the file name of the file it was read from.
 */
class LocalFileScheduleSource implements ScheduleSource {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileScheduleSource.class);

    private static final String LOCAL_DIR_CONFIG_KEY = "styx.source.local.dir";
    private static final long POLL_TIMEOUT_MILLIS = 100L;
    private static final WatchEvent.Kind<?>[] EVENTS = {
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_MODIFY,
        StandardWatchEventKinds.ENTRY_DELETE
    };

    private final Config config;
    private final Closer closer;
    private final ScheduledExecutorService executor;
    private final Consumer<Workflow> changeListener;
    private final Consumer<Workflow> removeListener;

    // Known workflows keyed by component id. Populated by start() before the poll task is
    // submitted, and only touched by the poll task afterwards.
    private final Map<String, Set<Workflow>> workflows = Maps.newHashMap();

    // Cleared through the Closer to ask the poll loop to stop.
    private volatile boolean running;

    /**
     * Creates a new source. Nothing is read or watched until {@link #start()} is called.
     *
     * @param config configuration holding the {@code styx.source.local.dir} key
     * @param closer closer used to stop the watch loop when the application shuts down
     * @param executor executor on which the watch loop is run
     * @param changeListener invoked for every workflow read from a created or modified file
     * @param removeListener invoked for every known workflow of a deleted file
     * @throws NullPointerException if any argument is {@code null}
     */
    public LocalFileScheduleSource(
            Config config,
            Closer closer,
            ScheduledExecutorService executor,
            Consumer<Workflow> changeListener,
            Consumer<Workflow> removeListener) {
        this.config = Objects.requireNonNull(config, "config");
        this.closer = Objects.requireNonNull(closer, "closer");
        this.executor = Objects.requireNonNull(executor, "executor");
        this.changeListener = Objects.requireNonNull(changeListener, "changeListener");
        this.removeListener = Objects.requireNonNull(removeListener, "removeListener");
    }

    /**
     * Loads all YAML files currently in the configured directory and starts watching it.
     *
     * @throws RuntimeException if the directory is not configured, the configured value is not a
     *     valid path, the initial listing fails, or the directory cannot be watched
     */
    public void start() {
        if (!config.hasPath(LOCAL_DIR_CONFIG_KEY)) {
            LOG.error("Configuration key '{}' not set", LOCAL_DIR_CONFIG_KEY);
            throw new RuntimeException("Can't load local file schedule source: not configured");
        }

        final String dirName = config.getString(LOCAL_DIR_CONFIG_KEY);
        final Path dir;
        try {
            dir = Paths.get(dirName);
        } catch (InvalidPathException e) {
            LOG.error("Invalid path: {}", dirName, e);
            throw new RuntimeException("Can't load local file schedule source: invalid path", e);
        }

        // Files.list holds an open directory handle until the stream is closed.
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(this::isYamlFile).forEach(this::readFile);
        } catch (IOException e) {
            LOG.error("Failed to List: {}", dirName, e);
            throw new RuntimeException(
                "Can't load local file schedule source: initial listing failed", e);
        }

        final WatchService watchService;
        try {
            watchService = FileSystems.getDefault().newWatchService();
        } catch (IOException e) {
            LOG.error("Could not watch: {}", dir, e);
            throw new RuntimeException("Can't load local file schedule source", e);
        }
        try {
            dir.register(watchService, EVENTS, SensitivityWatchEventModifier.HIGH);
        } catch (IOException e) {
            // The poll task will never run, so nobody else would release the watch service.
            closeWatchService(watchService, dir);
            LOG.error("Could not watch: {}", dir, e);
            throw new RuntimeException("Can't load local file schedule source", e);
        }

        running = true;
        closer.register(() -> {
            running = false;
        });
        executor.submit(() -> {
            poll(dir, watchService);
        });
    }

    /**
     * Watch loop: dispatches file events for {@code path} until stopped, then closes
     * {@code watchService}.
     */
    private void poll(Path path, WatchService watchService) {
        LOG.info("Watching {} for schedule definitions", path);
        try {
            while (running) {
                final WatchKey key;
                try {
                    key = watchService.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    LOG.warn("interrupted", e);
                    // Preserve the interrupt for the executor and stop instead of spinning on.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (key == null) {
                    // Timed out without events; re-check the running flag.
                    continue;
                }
                for (WatchEvent<?> event : key.pollEvents()) {
                    handleEvent(path, event);
                }
                if (!key.reset()) {
                    // The key is no longer valid, so no further events will arrive.
                    break;
                }
            }
        } finally {
            closeWatchService(watchService, path);
        }
        LOG.info("Stopped watching {}", path);
    }

    /** Applies a single watch event reported for directory {@code dir}. */
    private void handleEvent(Path dir, WatchEvent<?> event) {
        final WatchEvent.Kind<?> kind = event.kind();
        if (kind == StandardWatchEventKinds.OVERFLOW) {
            return;
        }

        // For the ENTRY_* kinds registered in EVENTS the context is the path relative to dir.
        final Path file = dir.resolve(LocalFileScheduleSource.<Path>cast(event).context());
        if (!isYamlFile(file)) {
            return;
        }

        final String componentId = componentId(file);
        LOG.debug("{} event for component {}, from file {}", kind, componentId, file);

        if (kind == StandardWatchEventKinds.ENTRY_CREATE
                || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
            // NOTE(review): a modified file only adds workflows; ones dropped from the file are
            // not reported to removeListener until the file itself is deleted.
            readFile(file);
        } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
            removeComponent(componentId);
        }
    }

    /** Forgets all workflows of {@code componentId} and reports each to the remove listener. */
    private void removeComponent(String componentId) {
        final Set<Workflow> removed = workflows.remove(componentId);
        if (removed != null) {
            removed.forEach(removeListener);
        }
    }

    /** Closes {@code watchService}, logging rather than propagating a failure to do so. */
    private void closeWatchService(WatchService watchService, Path path) {
        try {
            watchService.close();
        } catch (IOException e) {
            LOG.warn("Failed to close watch service for {}", path, e);
        }
    }

    /** Returns whether the file name of {@code path} ends with {@code .yaml} or {@code .yml}. */
    private boolean isYamlFile(Path path) {
        final String fileName = path.getFileName().toString();
        return fileName.endsWith(".yaml") || fileName.endsWith(".yml");
    }

    /**
     * Reads the workflows defined in {@code path}, records them and notifies the change listener
     * for each. An {@link IOException} while reading is logged and otherwise ignored.
     */
    private void readFile(Path path) {
        try {
            for (Workflow workflow : readWorkflows(path)) {
                workflows
                    .computeIfAbsent(workflow.componentId(), id -> Sets.newHashSet())
                    .add(workflow);
                changeListener.accept(workflow);
            }
        } catch (IOException e) {
            LOG.warn("Failed to read schedule definition {}", path, e);
        }
    }

    /**
     * Parses {@code path} as a YAML schedule definition and creates one workflow per schedule.
     *
     * @throws IOException if the file cannot be read or parsed
     */
    private List<Workflow> readWorkflows(Path path) throws IOException {
        final byte[] bytes = Files.readAllBytes(path);
        if (LOG.isDebugEnabled()) {
            // Only decode the file contents when they are actually going to be logged.
            LOG.debug("Read yaml file \n{}", ByteString.of(bytes).utf8());
        }
        final YamlScheduleDefinition definitions = Yaml.parseScheduleDefinition(bytes);
        LOG.debug("Parsed schedule definitions: {}", definitions);

        final String componentId = componentId(path);
        final URI componentUri = path.toUri();
        return definitions.schedules().stream()
            .map(schedule -> Workflow.create(componentId, componentUri, schedule))
            .collect(Collectors.toList());
    }

    /** Returns the component id for a definition file, which is its file name. */
    private String componentId(Path path) {
        return path.getFileName().toString();
    }

    /**
     * Narrows a wildcard watch event to the context type the caller expects. The cast is
     * unchecked; the caller is responsible for choosing a {@code T} that matches the event kind.
     */
    @SuppressWarnings("unchecked")
    private static <T> WatchEvent<T> cast(WatchEvent<?> watchEvent) {
        return (WatchEvent<T>) watchEvent;
    }
}
