/*
 * Decompiled with CFR 0.152.
 */
package io.palyvos.haren;

import io.palyvos.haren.AbstractExecutor;
import io.palyvos.haren.HighestPriorityExecutor;
import io.palyvos.haren.ReconfigurationAction;
import io.palyvos.haren.Scheduler;
import io.palyvos.haren.SchedulerState;
import io.palyvos.haren.SchedulerStateBuilder;
import io.palyvos.haren.Task;
import io.palyvos.haren.function.InterThreadSchedulingFunction;
import io.palyvos.haren.function.SingleIntraThreadSchedulingFunction;
import io.palyvos.haren.function.VectorIntraThreadSchedulingFunction;
import io.palyvos.haren.function.VectorIntraThreadSchedulingFunctionImpl;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class HarenScheduler
implements Scheduler {
    private volatile boolean active;
    private static final Logger LOG = LogManager.getLogger();
    private final int nThreads;
    private final List<Task> tasks = new ArrayList<Task>();
    private final Set<Integer> currentTaskIndexes = new HashSet<Integer>();
    private final List<Thread> threads = new ArrayList<Thread>();
    private final int[] workerAffinity;
    private volatile ReconfigurationAction reconfigurationAction;
    private SchedulerState state;
    private final SchedulerStateBuilder stateBuilder = new SchedulerStateBuilder();

    public HarenScheduler(int nThreads, VectorIntraThreadSchedulingFunction intraThreadFunction, InterThreadSchedulingFunction interThreadFunction, boolean caching, int batchSize, long schedulingPeriod, BitSet workerAffinity) {
        Validate.isTrue((nThreads > 0 ? 1 : 0) != 0);
        Validate.notNull((Object)intraThreadFunction);
        Validate.notNull((Object)interThreadFunction);
        Validate.isTrue((batchSize > 0 ? 1 : 0) != 0);
        Validate.isTrue((schedulingPeriod > 0L ? 1 : 0) != 0);
        this.nThreads = nThreads;
        this.stateBuilder.setThreadNumber(nThreads).setInterThreadSchedulingFunction(interThreadFunction).setIntraThreadSchedulingFunction(intraThreadFunction).setPriorityCaching(caching).setBatchSize(batchSize).setSchedulingPeriod(schedulingPeriod);
        this.workerAffinity = workerAffinity.stream().toArray();
        if (this.workerAffinity.length < nThreads) {
            LOG.warn("#CPUs assigned is less than #threads! Performance might suffer.");
        }
    }

    public HarenScheduler(int nThreads, SingleIntraThreadSchedulingFunction intraThreadFunction, InterThreadSchedulingFunction interThreadFunction, boolean priorityCaching, int batchSize, int schedulingPeriod, BitSet workerAffinity) {
        this(nThreads, new VectorIntraThreadSchedulingFunctionImpl(intraThreadFunction), interThreadFunction, priorityCaching, batchSize, (long)schedulingPeriod, workerAffinity);
    }

    @Override
    public void start() {
        int i;
        Validate.isTrue((this.tasks.size() >= this.nThreads ? 1 : 0) != 0, (String)"Tasks less than threads!", (Object[])new Object[0]);
        this.active = true;
        this.stateBuilder.setTaskNumber(this.tasks.size()).setThreadNumber(this.nThreads);
        LOG.info("Starting Scheduler");
        LOG.info(this.stateBuilder.toString());
        this.state = this.stateBuilder.createSchedulerState();
        ArrayList<AbstractExecutor> executors = new ArrayList<AbstractExecutor>();
        this.reconfigurationAction = new ReconfigurationAction(this.tasks, executors, this.state);
        CyclicBarrier barrier = new CyclicBarrier(this.nThreads, this.reconfigurationAction);
        for (i = 0; i < this.nThreads; ++i) {
            int cpuId = this.getAffinity(i);
            executors.add(new HighestPriorityExecutor(this.state, barrier, cpuId));
        }
        for (i = 0; i < executors.size(); ++i) {
            Thread t = new Thread((Runnable)executors.get(i));
            t.setName(String.format("Scheduler-Worker-%d", i));
            this.threads.add(t);
            t.start();
        }
    }

    @Override
    public void stop() {
        this.reconfigurationAction.stop();
        for (Thread thread : this.threads) {
            thread.interrupt();
            try {
                thread.join();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.active = false;
    }

    public void setBatchSize(int batchSize) {
        this.state.setBatchSize(batchSize);
    }

    public void setSchedulingPeriod(long schedulingPeriod) {
        this.state.setSchedulingPeriod(schedulingPeriod);
    }

    public void setIntraThreadFunction(VectorIntraThreadSchedulingFunction intraThreadFunction) {
        this.state.setIntraThreadSchedulingFunction(intraThreadFunction);
    }

    @Override
    public void addTasks(Collection<Task> tasks) {
        if (this.active) {
            this.reconfigurationAction.addTasks(tasks);
        } else {
            Set<Integer> newTaskIndexes = HarenScheduler.taskIndexes(tasks);
            Validate.isTrue((boolean)Collections.disjoint(this.currentTaskIndexes, newTaskIndexes), (String)"Tried to add tasks that have already been added!", (Object[])new Object[0]);
            this.tasks.addAll(tasks);
            this.currentTaskIndexes.addAll(newTaskIndexes);
        }
        LOG.info("{} reconfiguration. Adding: {} tasks", (Object)this.reconfigurationType(), (Object)tasks.size());
    }

    @Override
    public void removeTasks(Collection<Task> tasks) {
        if (this.active) {
            this.reconfigurationAction.removeTasks(tasks);
        } else {
            Set<Integer> newTaskIndexes = HarenScheduler.taskIndexes(tasks);
            Validate.isTrue((boolean)this.currentTaskIndexes.containsAll(newTaskIndexes), (String)"Tried to remove tasks that are not currently being scheduled.", (Object[])new Object[0]);
            this.tasks.removeAll(tasks);
            this.currentTaskIndexes.removeAll(newTaskIndexes);
        }
        LOG.info("{} reconfiguration. Removing: {} tasks", (Object)this.reconfigurationType(), (Object)tasks.size());
    }

    private int getAffinity(int i) {
        return this.workerAffinity != null ? this.workerAffinity[i % this.workerAffinity.length] : -1;
    }

    public List<Task> tasks() {
        return this.tasks;
    }

    private String reconfigurationType() {
        return this.active ? "Live" : "Static";
    }

    static Set<Integer> taskIndexes(Collection<Task> tasks) {
        return tasks.stream().map(t -> t.getIndex()).collect(Collectors.toSet());
    }
}

