package org.yamcs.timeline;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import org.yamcs.Spec;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsServer;
import org.yamcs.activities.Activity;
import org.yamcs.activities.ActivityListener;
import org.yamcs.activities.ActivityService;
import org.yamcs.activities.protobuf.ActivityDefinition;
import org.yamcs.http.api.GpbWellKnownHelper;
import org.yamcs.http.api.StreamFactory;
import org.yamcs.logging.Log;
import org.yamcs.protobuf.ExecutionStatus;
import org.yamcs.protobuf.TimelineItemType;
import org.yamcs.security.User;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.SqlBuilder;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;

/* loaded from: input_file:org/yamcs/timeline/ActivityScheduler.class */
public class ActivityScheduler extends AbstractExecutionThreadService implements ItemListener, ActivityListener {
    private static final long CHECK_INTERVAL = 500;
    private static final TimelineActivity POISON = new TimelineActivity(UUID.randomUUID());
    private Log log;
    private String yamcsInstance;
    private ActivityService activityService;
    private TimelineItemDb timelineItemDb;
    private Thread dispatcher;
    private PriorityQueue<TimelineActivity> planned = new PriorityQueue<>();
    private ReentrantLock planningLock = new ReentrantLock();
    private BlockingQueue<TimelineActivity> forDispatch = new LinkedBlockingQueue(200);
    private ConcurrentMap<UUID, TimelineActivity> ongoingItemsByRunId = new ConcurrentHashMap();

    public Spec getSpec() {
        return new Spec();
    }

    public void init(TimelineService timelineService, YConfiguration yConfiguration) {
        this.yamcsInstance = timelineService.getYamcsInstance();
        this.activityService = timelineService.getActivityService();
        this.timelineItemDb = timelineService.getTimelineItemDb();
        this.log = new Log(getClass(), this.yamcsInstance);
    }

    protected void startUp() throws Exception {
        this.timelineItemDb.addItemListener(this);
        this.activityService.addActivityListener(this);
    }

    protected void run() throws Exception {
        Activity prepareActivity;
        try {
            replan();
            User systemUser = YamcsServer.getServer().getSecurityStore().getSystemUser();
            while (true) {
                TimelineActivity take = this.forDispatch.take();
                if (take == POISON) {
                    return;
                }
                ActivityDefinition activityDefinition = take.getActivityDefinition();
                if (activityDefinition == null) {
                    prepareActivity = this.activityService.prepareActivity(ActivityService.ACTIVITY_TYPE_MANUAL, Map.of("name", take.getName()), systemUser, null);
                } else {
                    prepareActivity = this.activityService.prepareActivity(activityDefinition.getType(), GpbWellKnownHelper.toJava(activityDefinition.getArgs()), systemUser, null);
                }
                Activity activity = prepareActivity;
                this.ongoingItemsByRunId.put(activity.getId(), take);
                take.setStatus(ExecutionStatus.IN_PROGRESS);
                take.addRun(activity.getId());
                this.timelineItemDb.updateItem(take);
                this.activityService.startActivity(activity, systemUser);
            }
        } catch (Exception e) {
            this.log.error("Failed to perform initial planning", e);
            throw e;
        }
    }

    protected void triggerShutdown() {
        this.timelineItemDb.removeItemListener(this);
        this.activityService.removeActivityListener(this);
        stopDispatcher();
        this.forDispatch.offer(POISON);
    }

    @Override // org.yamcs.timeline.ItemListener
    public void onItemCreated(TimelineItem timelineItem) {
        if (timelineItem instanceof TimelineActivity) {
            replan();
        }
    }

    @Override // org.yamcs.timeline.ItemListener
    public void onItemUpdated(TimelineItem timelineItem) {
        if (timelineItem instanceof TimelineActivity) {
            replan();
        }
    }

    @Override // org.yamcs.timeline.ItemListener
    public void onItemDeleted(TimelineItem timelineItem) {
        if (timelineItem instanceof TimelineActivity) {
            replan();
        }
    }

    @Override // org.yamcs.activities.ActivityListener
    public void onActivityUpdated(Activity activity) {
        TimelineActivity timelineActivity = this.ongoingItemsByRunId.get(activity.getId());
        if (timelineActivity == null || !activity.isStopped()) {
            return;
        }
        this.ongoingItemsByRunId.remove(activity.getId());
        switch (activity.getStatus()) {
            case SUCCESSFUL:
                timelineActivity.setStatus(ExecutionStatus.COMPLETED);
                break;
            case CANCELLED:
                timelineActivity.setStatus(ExecutionStatus.ABORTED);
                timelineActivity.setFailureReason(activity.getFailureReason());
                break;
            case FAILED:
                timelineActivity.setStatus(ExecutionStatus.FAILED);
                timelineActivity.setFailureReason(activity.getFailureReason());
                break;
            default:
                throw new IllegalStateException("Unexpected terminal state " + activity.getStatus());
        }
        this.timelineItemDb.updateItem(timelineActivity);
    }

    private void replan() {
        try {
            this.planningLock.lock();
            stopDispatcher();
            this.planned.clear();
            SqlBuilder sqlBuilder = new SqlBuilder("timeline");
            sqlBuilder.whereColAfterOrEqual("start", TimeEncoding.getWallclockTime());
            sqlBuilder.where("type = '" + TimelineItemType.ACTIVITY.name() + "'", new Object[0]);
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            StreamFactory.stream(this.yamcsInstance, sqlBuilder.toString(), sqlBuilder.getQueryArguments(), new StreamSubscriber() { // from class: org.yamcs.timeline.ActivityScheduler.1
                @Override // org.yamcs.yarch.StreamSubscriber
                public void onTuple(Stream stream, Tuple tuple) {
                    ActivityScheduler.this.planned.add(new TimelineActivity(TimelineItemType.ACTIVITY, tuple));
                }

                @Override // org.yamcs.yarch.StreamSubscriber
                public void streamClosed(Stream stream) {
                    if (!ActivityScheduler.this.planned.isEmpty()) {
                        ActivityScheduler.this.log.info("Upcoming:");
                        Iterator<TimelineActivity> it = ActivityScheduler.this.planned.iterator();
                        while (it.hasNext()) {
                            ActivityScheduler.this.log.info("- " + it.next());
                        }
                        ActivityScheduler.this.startDispatcher();
                    }
                    countDownLatch.countDown();
                }
            });
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.planningLock.unlock();
        }
    }

    private void startDispatcher() {
        this.dispatcher = new Thread(() -> {
            while (true) {
                try {
                    TimelineActivity poll = this.planned.poll();
                    if (poll != null) {
                        long wallclockTime = TimeEncoding.getWallclockTime();
                        if (wallclockTime < poll.start) {
                            Thread.sleep(poll.start - wallclockTime);
                        }
                        if (!this.forDispatch.offer(poll)) {
                            this.log.error("Failed to dispatch activity " + poll + " (queue full)");
                        }
                    } else {
                        Thread.sleep(CHECK_INTERVAL);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        this.dispatcher.start();
    }

    private void stopDispatcher() {
        Thread thread = this.dispatcher;
        if (thread != null) {
            thread.interrupt();
            this.dispatcher = null;
        }
    }
}
