package org.yamcs.parameterarchive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.rocksdb.RocksDBException;
import org.yamcs.ConfigurationException;
import org.yamcs.Processor;
import org.yamcs.Spec;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsServer;
import org.yamcs.parameter.BasicParameterValue;
import org.yamcs.parameterarchive.ParameterGroupIdDb;
import org.yamcs.utils.IntArray;
import org.yamcs.utils.TimeEncoding;

/* loaded from: input_file:org/yamcs/parameterarchive/RealtimeArchiveFiller.class */
public class RealtimeArchiveFiller extends AbstractArchiveFiller {
    // Name of the processor to subscribe to; read from the "processorName" option, "realtime" when absent.
    String processorName;
    // Yamcs instance name, obtained from the parameter archive in the constructor.
    final String yamcsInstance;
    // Processor looked up in start(); null until then.
    Processor realtimeProcessor;
    // Value returned by subscribeAll() in start() and handed back to unsubscribeAll() in shutDown().
    int subscriptionId;
    // Thread pool on which segment writes are executed; created in start().
    ExecutorService executor;
    // One segment queue per parameter group, keyed by the parameter group id.
    Map<Integer, SegmentQueue> queues;
    // Resolved lazily by getYamcsServer(); can be replaced through setYamcsServer().
    private YamcsServer yamcsServer;
    // Period of the periodic flush task and idle time after which a queue is flushed, in seconds.
    int flushInterval;
    // Tolerance for out-of-order data, in the same units as the timestamps given to processParameters()
    // (presumably milliseconds - confirm); read from the "sortingThreshold" option.
    long sortingThreshold;
    // "pastJumpThreshold" option (documented in seconds) multiplied by 1000 in the constructor.
    long pastJumpThreshold;
    // Size of the writer thread pool; from the "numThreads" option or getDefaultNumThreads().
    int numThreads;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/yamcs/parameterarchive/RealtimeArchiveFiller$SegmentQueue.class */
    public static class SegmentQueue {
        // Number of slots of the circular buffer.
        static final int QSIZE = 16;
        // QSIZE - 1; inc()/dec() wrap indices with "& 15", which requires the size to be a power of two.
        static final int MASK = 15;
        // Ring storage. Slots in [head, tail) hold the queued segments; a slot that has been dequeued keeps its
        // segment until the completion callback installed by toArchive() sets it to null.
        final PGSegment[] segments = new PGSegment[16];
        // Index of the oldest queued segment.
        int head = 0;
        // Index one past the newest queued segment; head == tail means the queue is empty.
        int tail = 0;
        // Parameter group id passed to every PGSegment created by this queue.
        final int parameterGroupId;
        // Size from which a segment is considered full by addRecord() and sendToArchive().
        final int maxSegmentSize;
        // System.currentTimeMillis() taken at the last addRecord() call.
        private long latestUpdateTime;

        /**
         * Creates an empty queue.
         *
         * @param i  parameter group id stored in {@code parameterGroupId}
         * @param i2 segment size limit stored in {@code maxSegmentSize}
         */
        public SegmentQueue(int i, int i2) {
            this.maxSegmentSize = i2;
            this.parameterGroupId = i;
        }

        /**
         * Returns the segment start of the oldest queued segment (the one at {@code head}).
         *
         * @return the value of {@code getSegmentStart()} of the head segment
         * @throws IllegalStateException if the queue is empty
         */
        public long getStart() {
            if (!isEmpty()) {
                PGSegment oldest = this.segments[this.head];
                return oldest.getSegmentStart();
            }
            throw new IllegalStateException("queue is empty");
        }

        /**
         * Adds one record to the queue, either into an existing segment of the matching interval or into a new
         * segment inserted at the position that keeps the queue ordered by interval.
         *
         * @param j        timestamp of the record
         * @param intArray parameter ids of the record
         * @param list     values of the record
         * @return {@code true} if the record was stored; {@code false} if a new segment was needed but the ring
         *         is full or the slot at {@code tail} still holds a segment that has not been cleared by
         *         {@code toArchive()} yet
         */
        public synchronized boolean addRecord(long j, IntArray intArray, List<BasicParameterValue> list) {
            this.latestUpdateTime = System.currentTimeMillis();
            long interval = ParameterArchive.getInterval(j);

            // Look for a segment that can take the record; pos ends up being the insertion position otherwise.
            int pos = this.head;
            for (; pos != this.tail; pos = inc(pos)) {
                PGSegment segment = this.segments[pos];
                long segmentInterval = segment.getInterval();
                if (segmentInterval < interval) {
                    continue;
                }
                if (segmentInterval > interval) {
                    break;
                }
                // If only the first condition holds (data falling inside a full segment), the segment grows
                // beyond maxSegmentSize.
                if (j <= segment.getSegmentEnd() || segment.size() < this.maxSegmentSize) {
                    segment.addRecord(j, intArray, list);
                    return true;
                }
            }

            // A new segment is required at position pos; it needs a free slot at tail.
            if (inc(this.tail) == this.head || this.segments[this.tail] != null) {
                return false;
            }
            PGSegment created = new PGSegment(this.parameterGroupId, j, intArray.size());
            created.addRecord(j, intArray, list);

            // Shift [pos, tail) one slot to the right. The copy has to run from the tail backwards: copying
            // from pos forwards would overwrite each slot before it is moved and duplicate segments[pos].
            for (int k = this.tail; k != pos; k = dec(k)) {
                this.segments[k] = this.segments[dec(k)];
            }
            this.tail = inc(this.tail);
            this.segments[pos] = created;
            return true;
        }

        /**
         * Dequeues segments from the head and hands them to {@code function} until the head segment may still
         * receive data for time {@code j}.
         * <p>
         * A head segment is kept (and the method returns) when its interval is not older than the interval of
         * {@code j} and it is either not full or its end is not before {@code j}.
         *
         * @param j        time threshold
         * @param function receives each dequeued segment and returns the future of its write
         */
        void sendToArchive(long j, Function<PGSegment, CompletableFuture<Void>> function) {
            while (this.tail != this.head) {
                PGSegment oldest = this.segments[this.head];
                boolean olderInterval = oldest.getInterval() < ParameterArchive.getInterval(j);
                if (!olderInterval && (oldest.size() < this.maxSegmentSize || oldest.getSegmentEnd() >= j)) {
                    return;
                }
                // advance head first, then hand over the slot that was just dequeued
                int slot = this.head;
                this.head = inc(slot);
                toArchive(slot, function);
            }
        }

        /**
         * Dequeues every queued segment, oldest first, handing each one to {@code function}.
         *
         * @param function receives each segment and returns the future of its write
         */
        synchronized void flush(Function<PGSegment, CompletableFuture<Void>> function) {
            for (; this.head != this.tail; this.head = inc(this.head)) {
                toArchive(this.head, function);
            }
        }

        /**
         * Hands the segment stored in slot {@code i} to {@code function} and clears the slot once the returned
         * future completes.
         * <p>
         * The slot is cleared on exceptional completion as well: addRecord() refuses to create a segment while
         * the slot at {@code tail} is occupied, so a slot that is never cleared would block the queue for good.
         *
         * @param i        index of the slot to archive
         * @param function receives the segment and returns the future of its write
         */
        private void toArchive(int i, Function<PGSegment, CompletableFuture<Void>> function) {
            function.apply(this.segments[i]).whenComplete((ignoredResult, ignoredError) -> {
                this.segments[i] = null;
            });
        }

        /**
         * Returns the number of queued segments, i.e. the distance from {@code head} to {@code tail} wrapped to
         * the 16-slot ring.
         */
        public int size() {
            return (this.tail - this.head) & 15;
        }

        /** Returns {@code true} when no segment is queued ({@code head} equals {@code tail}). */
        public boolean isEmpty() {
            return this.head == this.tail;
        }

        /**
         * Returns the value segments of one parameter held by this queue.
         *
         * @param i parameter id passed to {@code PGSegment.getParameterValue}
         * @param z {@code true} to walk the ring from old to new, {@code false} from new to old
         * @return the segments found; an empty list when the queue is empty
         */
        public synchronized List<ParameterValueSegment> getPVSegments(int i, boolean z) {
            if (this.head == this.tail) {
                return Collections.emptyList();
            }
            if (z) {
                return getSegmentsAscending(i);
            }
            return getSegmentsDescending(i);
        }

        /**
         * Collects the value segments of parameter {@code i}, walking the ring in increasing index order.
         * <p>
         * Besides the queued slots [head, tail), the walk starts at the earliest non-null slot that is
         * contiguous with {@code head}: those slots hold segments that were dequeued but not yet cleared by
         * {@code toArchive()}.
         *
         * @param i parameter id passed to {@code PGSegment.getParameterValue}
         * @return the non-null results, in ring order; empty when the queue is empty
         */
        private List<ParameterValueSegment> getSegmentsAscending(int i) {
            List<ParameterValueSegment> result = new ArrayList<>();
            if (this.head == this.tail) {
                return result;
            }
            int start = this.head;
            int count = (this.tail - this.head) & (this.segments.length - 1);
            // Extend backwards over not-yet-cleared slots, but never beyond one full turn of the ring;
            // otherwise start could wrap onto tail and the walk below would visit nothing.
            while (count < this.segments.length && this.segments[dec(start)] != null) {
                start = dec(start);
                count++;
            }
            int k = start;
            for (int n = 0; n < count; n++) {
                PGSegment segment = this.segments[k];
                // always advance: a slot cleared concurrently by toArchive() must be skipped, not retried
                k = inc(k);
                if (segment == null) {
                    continue;
                }
                ParameterValueSegment pvs = segment.getParameterValue(i);
                if (pvs != null) {
                    result.add(pvs);
                }
            }
            return result;
        }

        /**
         * Collects the value segments of parameter {@code i}, walking the ring backwards from the newest
         * queued segment until a null slot is found.
         * <p>
         * The walk is limited to one turn of the ring: when every slot is occupied (all queued slots plus
         * slots not yet cleared by {@code toArchive()}) there is no null slot to stop at, and an unbounded walk
         * would return the same segments repeatedly.
         *
         * @param i parameter id passed to {@code PGSegment.getParameterValue}
         * @return the non-null results, newest slot first
         */
        private List<ParameterValueSegment> getSegmentsDescending(int i) {
            List<ParameterValueSegment> result = new ArrayList<>();
            int k = dec(this.tail);
            for (int visited = 0; visited < this.segments.length; visited++) {
                PGSegment segment = this.segments[k];
                if (segment == null) {
                    break;
                }
                ParameterValueSegment pvs = segment.getParameterValue(i);
                if (pvs != null) {
                    result.add(pvs);
                }
                k = dec(k);
            }
            return result;
        }

        /**
         * Returns the multi-parameter value segments held by this queue for the given parameters.
         *
         * @param parameterIdArr parameters passed to {@code PGSegment.getParametersValues}
         * @param z              {@code true} to walk the ring from old to new, {@code false} from new to old
         * @return the segments found; an empty list when the queue is empty
         */
        public synchronized List<MultiParameterValueSegment> getPVSegments(ParameterId[] parameterIdArr, boolean z) {
            if (this.head == this.tail) {
                return Collections.emptyList();
            }
            if (z) {
                return getSegmentsAscending(parameterIdArr);
            }
            return getSegmentsDescending(parameterIdArr);
        }

        /**
         * Collects the multi-parameter value segments for {@code parameterIdArr}, walking the ring in
         * increasing index order.
         * <p>
         * Besides the queued slots [head, tail), the walk starts at the earliest non-null slot that is
         * contiguous with {@code head}: those slots hold segments that were dequeued but not yet cleared by
         * {@code toArchive()}.
         *
         * @param parameterIdArr parameters passed to {@code PGSegment.getParametersValues}
         * @return the non-null results, in ring order; empty when the queue is empty
         */
        private List<MultiParameterValueSegment> getSegmentsAscending(ParameterId[] parameterIdArr) {
            List<MultiParameterValueSegment> result = new ArrayList<>();
            if (this.head == this.tail) {
                return result;
            }
            int start = this.head;
            int count = (this.tail - this.head) & (this.segments.length - 1);
            // Extend backwards over not-yet-cleared slots, but never beyond one full turn of the ring;
            // otherwise start could wrap onto tail and the walk below would visit nothing.
            while (count < this.segments.length && this.segments[dec(start)] != null) {
                start = dec(start);
                count++;
            }
            int k = start;
            for (int n = 0; n < count; n++) {
                PGSegment segment = this.segments[k];
                // always advance: a slot cleared concurrently by toArchive() must be skipped, not retried
                k = inc(k);
                if (segment == null) {
                    continue;
                }
                MultiParameterValueSegment values = segment.getParametersValues(parameterIdArr);
                if (values != null) {
                    result.add(values);
                }
            }
            return result;
        }

        /**
         * Collects the multi-parameter value segments for {@code parameterIdArr}, walking the ring backwards
         * from the newest queued segment until a null slot is found.
         * <p>
         * The walk is limited to one turn of the ring: when every slot is occupied (all queued slots plus
         * slots not yet cleared by {@code toArchive()}) there is no null slot to stop at, and an unbounded walk
         * would return the same segments repeatedly.
         *
         * @param parameterIdArr parameters passed to {@code PGSegment.getParametersValues}
         * @return the non-null results, newest slot first
         */
        private List<MultiParameterValueSegment> getSegmentsDescending(ParameterId[] parameterIdArr) {
            List<MultiParameterValueSegment> result = new ArrayList<>();
            int k = dec(this.tail);
            for (int visited = 0; visited < this.segments.length; visited++) {
                PGSegment segment = this.segments[k];
                if (segment == null) {
                    break;
                }
                MultiParameterValueSegment values = segment.getParametersValues(parameterIdArr);
                if (values != null) {
                    result.add(values);
                }
                k = dec(k);
            }
            return result;
        }

        /**
         * Returns the ring index following {@code i}; index 15 wraps to 0.
         */
        static final int inc(int i) {
            int next = i + 1;
            return next & 0xF;
        }

        /**
         * Returns the ring index preceding {@code i}; index 0 wraps to 15.
         */
        static final int dec(int i) {
            int previous = i - 1;
            return previous & 0xF;
        }

        /**
         * Returns the {@code System.currentTimeMillis()} value recorded by the most recent {@code addRecord()}
         * call (0 if none happened yet).
         */
        public long getLatestUpdateTime() {
            return this.latestUpdateTime;
        }
    }

    /**
     * Creates the filler and reads its options.
     *
     * @param parameterArchive archive passed to the superclass; also provides the Yamcs instance name
     * @param yConfiguration   configuration providing "processorName" (default "realtime"),
     *                         "sortingThreshold", "numThreads" (default {@link #getDefaultNumThreads()}) and
     *                         "pastJumpThreshold" (multiplied by 1000 before being stored)
     */
    public RealtimeArchiveFiller(ParameterArchive parameterArchive, YConfiguration yConfiguration) {
        super(parameterArchive);
        this.processorName = "realtime";
        // The map is iterated by the periodic flush task scheduled in start() while processParameters() may
        // insert new queues and getSegments() reads it; a plain HashMap is not safe for that.
        this.queues = new ConcurrentHashMap<>();
        this.flushInterval = 600;
        this.yamcsInstance = parameterArchive.getYamcsInstance();
        this.processorName = yConfiguration.getString("processorName", this.processorName);
        this.sortingThreshold = yConfiguration.getInt("sortingThreshold");
        this.numThreads = yConfiguration.getInt("numThreads", getDefaultNumThreads());
        this.pastJumpThreshold = yConfiguration.getLong("pastJumpThreshold") * 1000;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the specification of the options accepted by this filler: "enabled", "processorName",
     * "sortingThreshold", "numThreads" and "pastJumpThreshold".
     *
     * @return a new {@code Spec} describing those options and their defaults
     */
    public static Spec getSpec() {
        String pastJumpDescription = "When receiving data with an old timestamp differing from the previous data by more than"
                + " this threshold in seconds, the old segments are flushed to archinve and a new one is started."
                + " This is to avoid that the data is rejected because the time is reinitialized on-board for example.";

        Spec options = new Spec();
        options.addOption("enabled", Spec.OptionType.BOOLEAN);
        options.addOption("processorName", Spec.OptionType.STRING)
                .withDefault("realtime");
        options.addOption("sortingThreshold", Spec.OptionType.INTEGER)
                .withDefault(1000);
        options.addOption("numThreads", Spec.OptionType.INTEGER);
        options.addOption("pastJumpThreshold", Spec.OptionType.INTEGER)
                .withDescription(pastJumpDescription)
                .withDefault(86400);
        return options;
    }

    /**
     * Returns the server to use, resolving it through {@code YamcsServer.getServer()} on first use unless one
     * was injected with {@link #setYamcsServer(YamcsServer)}.
     */
    private synchronized YamcsServer getYamcsServer() {
        YamcsServer server = this.yamcsServer;
        if (server == null) {
            server = YamcsServer.getServer();
            this.yamcsServer = server;
        }
        return server;
    }

    /**
     * Replaces the server returned by getYamcsServer(); package-private, presumably meant for tests (confirm).
     *
     * @param yamcsServer server instance to use from now on
     */
    synchronized void setYamcsServer(YamcsServer yamcsServer) {
        this.yamcsServer = yamcsServer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the filler: resolves the processor, subscribes to all its parameters, creates the writer thread
     * pool and, when the server provides a scheduled executor, schedules the periodic flush every
     * {@code flushInterval} seconds.
     *
     * @throws ConfigurationException if the configured processor does not exist in the instance
     */
    public void start() {
        Processor processor = getYamcsServer().getProcessor(this.yamcsInstance, this.processorName);
        this.realtimeProcessor = processor;
        if (processor == null) {
            throw new ConfigurationException("No processor named '" + this.processorName + "' in instance " + this.yamcsInstance);
        }
        boolean cacheConfigured = processor.getParameterCache() != null;
        if (cacheConfigured) {
            this.log.warn("Both realtime archive filler and parameter cache configured for processor {}.The parameter cache can be safely disabled (to save memory) by setting parameterCache->enabled to false in processor.yaml", this.processorName);
        }
        this.subscriptionId = processor.getParameterRequestManager().subscribeAll(this);

        this.log.debug("Starting executor for archive writing with {} threads", this.numThreads);
        this.executor = Executors.newFixedThreadPool(
                this.numThreads,
                new ThreadFactoryBuilder().setNameFormat("realtime-parameter-archive-writer-%d").build());

        ScheduledThreadPoolExecutor timer = getYamcsServer().getThreadPoolExecutor();
        if (timer == null) {
            // no scheduler available: the periodic flush is simply not installed
            return;
        }
        timer.scheduleAtFixedRate(this::flushPeriodically, this.flushInterval, this.flushInterval, TimeUnit.SECONDS);
    }

    /**
     * Periodic task: flushes every queue that still holds segments but has not received a record for more than
     * {@code flushInterval} seconds, so that idle data does not stay in memory indefinitely.
     */
    private void flushPeriodically() {
        long now = System.currentTimeMillis();
        // long arithmetic so that large intervals cannot overflow int
        long maxIdleMillis = this.flushInterval * 1000L;
        for (Map.Entry<Integer, SegmentQueue> entry : this.queues.entrySet()) {
            SegmentQueue queue = entry.getValue();
            synchronized (queue) {
                // only a non-empty queue has something to flush
                if (!queue.isEmpty() && now > queue.getLatestUpdateTime() + maxIdleMillis) {
                    queue.flush(this::scheduleWriteToArchive);
                    this.log.debug("Flush interval reached without new segment for parameter group {}, flushing queue", entry.getKey());
                }
            }
        }
    }

    /**
     * Stops the filler: unsubscribes from the processor, flushes every queue to the writer pool, shuts the
     * pool down and waits up to 30 seconds for the pending writes.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    public void shutDown() throws InterruptedException {
        this.realtimeProcessor.getParameterRequestManager().unsubscribeAll(this.subscriptionId);
        this.log.info("Shutting down, writing all pending segments");
        for (SegmentQueue queue : this.queues.values()) {
            queue.flush(this::scheduleWriteToArchive);
        }
        this.executor.shutdown();
        boolean terminated = this.executor.awaitTermination(30L, TimeUnit.SECONDS);
        if (!terminated) {
            this.log.warn("Timed out before flushing all pending segments");
        }
    }

    /**
     * Stores one list of parameter values, all with timestamp {@code j}, into the queue of its parameter group.
     * <p>
     * When the queue is not empty, the timestamp is first compared with the start of the oldest queued segment:
     * <ul>
     * <li>older by more than {@code pastJumpThreshold}: the whole queue is flushed and the data is accepted;</li>
     * <li>older by more than {@code sortingThreshold}: the data is dropped with a warning;</li>
     * <li>otherwise: segments that can no longer receive data for {@code j - sortingThreshold} are sent to the
     * archive.</li>
     * </ul>
     * A failure to obtain the parameter group is logged and the data is discarded.
     *
     * @param j                  timestamp of the values
     * @param basicParameterList parameter ids and values to store
     */
    @Override // org.yamcs.parameterarchive.AbstractArchiveFiller
    protected void processParameters(long j, BasicParameterList basicParameterList) {
        try {
            ParameterGroupIdDb.ParameterGroup group = this.parameterGroupIdMap.getGroup(basicParameterList.getPids());
            SegmentQueue queue = this.queues.computeIfAbsent(group.id,
                    groupId -> new SegmentQueue(group.id, this.maxSegmentSize));
            synchronized (queue) {
                if (!queue.isEmpty()) {
                    long queueStart = queue.getStart();
                    if (j < queueStart - this.pastJumpThreshold) {
                        this.log.warn("Time jumped in the past; current timestamp: {}, new timestamp: {}. Flushing old data.", TimeEncoding.toString(queueStart), TimeEncoding.toString(j));
                        queue.flush(this::scheduleWriteToArchive);
                    } else if (j < queueStart - this.sortingThreshold) {
                        this.log.warn("Dropping old data with timestamp {} (minimum allowed is {}).Unsorted data received in the realtime filler? Consider using a backfiller instead", TimeEncoding.toString(j), TimeEncoding.toString(queueStart - this.sortingThreshold));
                        return;
                    } else {
                        queue.sendToArchive(j - this.sortingThreshold, this::scheduleWriteToArchive);
                    }
                }
                boolean stored = queue.addRecord(j, basicParameterList.getPids(), basicParameterList.getValues());
                if (!stored) {
                    // the option controlling the writer pool size is named "numThreads" (see getSpec())
                    this.log.warn("Realtime parameter archive queue full.Consider increasing the numThreads (if CPUs are available) or using a back filler");
                }
            }
        } catch (RocksDBException e) {
            this.log.error("Error creating parameter group id", e);
        }
    }

    /**
     * Submits the write of one segment to the writer pool.
     * <p>
     * {@code RocksDBException} and {@code IOException} raised by the write are logged inside the task, so the
     * returned future completes normally in those cases.
     *
     * @param pGSegment segment to write
     * @return future completed (with {@code null}) once the task has run
     */
    private CompletableFuture<Void> scheduleWriteToArchive(PGSegment pGSegment) {
        return CompletableFuture.supplyAsync(() -> {
            long startNanos = System.nanoTime();
            try {
                this.parameterArchive.writeToArchive(pGSegment);
                long elapsedMillis = (System.nanoTime() - startNanos) / 1000000;
                this.log.debug("Wrote segment {} to archive in {} millisec", pGSegment, elapsedMillis);
            } catch (RocksDBException | IOException e) {
                this.log.error("Error writing segment to the parameter archive", e);
            }
            return null;
        }, this.executor);
    }

    /**
     * Discards all queues; segments still held by them are dropped without being handed to the archive.
     */
    @Override // org.yamcs.parameterarchive.AbstractArchiveFiller
    protected void abort() {
        this.queues.clear();
    }

    /**
     * Returns the default size of the writer pool: one less than the number of available processors, but at
     * least 1.
     */
    private int getDefaultNumThreads() {
        int spareProcessors = Runtime.getRuntime().availableProcessors() - 1;
        return spareProcessors > 0 ? spareProcessors : 1;
    }

    /**
     * Returns the value segments of one parameter currently held in memory for a parameter group.
     *
     * @param i  parameter id
     * @param i2 parameter group id selecting the queue
     * @param z  {@code true} for ascending order, {@code false} for descending
     * @return the segments, or an empty list when the group has no queue
     */
    public List<ParameterValueSegment> getSegments(int i, int i2, boolean z) {
        SegmentQueue queue = this.queues.get(i2);
        if (queue != null) {
            return queue.getPVSegments(i, z);
        }
        return Collections.emptyList();
    }

    /**
     * Returns the multi-parameter value segments currently held in memory for a parameter group.
     *
     * @param parameterIdArr parameters to extract
     * @param i              parameter group id selecting the queue
     * @param z              {@code true} for ascending order, {@code false} for descending
     * @return the segments, or an empty list when the group has no queue
     */
    public List<MultiParameterValueSegment> getSegments(ParameterId[] parameterIdArr, int i, boolean z) {
        SegmentQueue queue = this.queues.get(i);
        if (queue != null) {
            return queue.getPVSegments(parameterIdArr, z);
        }
        return Collections.emptyList();
    }

    // Compiler-generated bridge (marked bridge/synthetic by the decompiler): only delegates to the superclass.
    @Override // org.yamcs.parameterarchive.AbstractArchiveFiller
    public /* bridge */ /* synthetic */ long getNumProcessedParameters() {
        return super.getNumProcessedParameters();
    }

    // Compiler-generated bridge (marked bridge/synthetic by the decompiler): only delegates to the superclass.
    // The raw List parameter mirrors the erased signature and is left as is.
    @Override // org.yamcs.parameterarchive.AbstractArchiveFiller, org.yamcs.parameter.ParameterConsumer
    public /* bridge */ /* synthetic */ void updateItems(int i, List list) {
        super.updateItems(i, list);
    }
}
