package org.yamcs;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.yamcs.StreamConfig;
import org.yamcs.mdb.Mdb;
import org.yamcs.mdb.MdbFactory;
import org.yamcs.xtce.SequenceContainer;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;

/**
 * Telemetry packet provider that feeds a processor from one or more streams.
 *
 * <p>At {@code init} time the set of stream names is assembled from two places: the optional
 * {@code streams} list in this service's configuration, and every TM entry of the instance's
 * {@link StreamConfig} whose processor name equals the name of the processor being initialised.
 * One {@link StreamReader} is created per stream name; the readers subscribe to their streams in
 * {@link #doStart()} and unsubscribe in {@link #doStop()}.
 */
public class StreamTmPacketProvider extends AbstractProcessorService implements TmPacketProvider {
    // Not assigned anywhere in this class; kept because it is package-visible.
    Stream stream;
    // Receives every packet built by the readers.
    TmProcessor tmProcessor;
    // Used to resolve root containers, both per stream and per tuple.
    Mdb mdb;
    // Value of the "gentime" column of the most recently received tuple.
    volatile long lastPacketTime;
    // Not read or written anywhere in this class; kept because it is package-visible.
    volatile boolean disabled = false;
    // One reader per configured stream, populated by readStreamConfig.
    List<StreamReader> readers = new ArrayList<>();

    /**
     * Stream subscriber that converts each tuple into a {@link TmPacket} and passes it to the
     * enclosing provider's {@link TmProcessor}.
     */
    public class StreamReader implements StreamSubscriber {
        // Stream this reader is (un)subscribed to by doStart/doStop.
        Stream stream;
        // Container handed to the TM processor together with every packet from this stream.
        SequenceContainer rootContainer;

        /**
         * @param stream the stream this reader will be subscribed to
         * @param sequenceContainer the container passed to the TM processor with each packet
         */
        public StreamReader(Stream stream, SequenceContainer sequenceContainer) {
            this.stream = stream;
            this.rootContainer = sequenceContainer;
        }

        /**
         * Builds a {@link TmPacket} from the {@code rectime}, {@code gentime}, {@code seqNum} and
         * packet columns of the tuple, records the {@code gentime} value as the provider's last
         * packet time, and passes the packet to the TM processor.
         *
         * <p>If the tuple carries a non-null root container column, the container of that name is
         * looked up in the MDB and set on the packet.
         *
         * @param stream the stream that produced the tuple (not used)
         * @param tuple the incoming tuple
         */
        @Override
        public void onTuple(Stream stream, Tuple tuple) {
            long recTime = (Long) tuple.getColumn("rectime");
            long genTime = (Long) tuple.getColumn("gentime");
            int seqNum = (Integer) tuple.getColumn("seqNum");
            byte[] payload = (byte[]) tuple.getColumn(StandardTupleDefinitions.TM_PACKET_COLUMN);

            TmPacket packet = new TmPacket(recTime, genTime, seqNum, payload);
            StreamTmPacketProvider.this.lastPacketTime = genTime;

            String containerName = (String) tuple.getColumn(StandardTupleDefinitions.TM_ROOT_CONTAINER_COLUMN);
            if (containerName != null) {
                packet.setRootContainer(StreamTmPacketProvider.this.mdb.getSequenceContainer(containerName));
            }
            StreamTmPacketProvider.this.tmProcessor.processPacket(packet, this.rootContainer);
        }

        /**
         * Marks the enclosing provider as stopped when the stream closes.
         *
         * <p>NOTE(review): with several readers, the first stream to close reports the whole
         * provider as stopped while the other readers stay subscribed; a later {@code doStop()}
         * then calls {@code notifyStopped()} a second time. Confirm this is intended.
         *
         * @param stream the stream that was closed (not used)
         */
        @Override
        public void streamClosed(Stream stream) {
            StreamTmPacketProvider.this.notifyStopped();
        }
    }

    /**
     * Initialises the service: fetches the processor's TM processor and the MDB of its instance,
     * creates the stream readers, and registers this object as the processor's packet provider.
     *
     * @param processor the processor this provider belongs to
     * @param yConfiguration the service configuration
     * @param obj passed through unchanged to the superclass
     * @throws ConfigurationException if no usable stream or root container can be determined
     */
    @Override
    public void init(Processor processor, YConfiguration yConfiguration, Object obj) {
        super.init(processor, yConfiguration, obj);
        this.tmProcessor = processor.getTmProcessor();
        this.mdb = MdbFactory.getInstance(processor.getInstance());
        readStreamConfig(processor.getName());
        processor.setPacketProvider(this);
    }

    /**
     * Creates one {@link StreamReader} for every stream this processor should read from.
     *
     * @param str the processor name used to select TM entries from the stream configuration
     * @throws ConfigurationException if a stream cannot be found, if no root container can be
     *         determined for a stream, or if no stream at all is configured
     */
    private void readStreamConfig(String str) {
        String yamcsInstance = getYamcsInstance();
        YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
        StreamConfig streamConfig = StreamConfig.getInstance(yamcsInstance);

        // A set, so that a stream named in both places yields a single reader.
        HashSet<String> streamNames = new HashSet<>();
        if (this.config.containsKey("streams")) {
            streamNames.addAll(this.config.getList("streams"));
        }
        for (StreamConfig.StreamConfigEntry entry : streamConfig.getEntries(StreamConfig.StandardStreamType.TM)) {
            if (str.equals(entry.getProcessor())) {
                streamNames.add(entry.getName());
            }
        }

        for (String streamName : streamNames) {
            SequenceContainer rootContainer = resolveRootContainer(streamConfig, streamName);
            this.log.debug("Processing packets from stream {} starting with root container {}", streamName, rootContainer.getQualifiedName());
            Stream tmStream = ydb.getStream(streamName);
            if (tmStream == null) {
                throw new ConfigurationException("Cannot find stream '" + streamName + "'");
            }
            this.readers.add(new StreamReader(tmStream, rootContainer));
        }

        if (this.readers.isEmpty()) {
            throw new ConfigurationException("Processor " + str + " found no tm_stream to process data from. Please configure the processor: under streamConfig->tm; If tm processing has to be excluded from this processor, please configure the entry in processors.yaml appropiately");
        }
    }

    /**
     * Determines the root container for a stream: the one of the stream's TM configuration entry
     * if present, otherwise the root sequence container of the MDB.
     *
     * @param streamConfig the stream configuration of the instance
     * @param streamName the name of the stream
     * @return the root container, never {@code null}
     * @throws ConfigurationException if neither source provides a root container
     */
    private SequenceContainer resolveRootContainer(StreamConfig streamConfig, String streamName) {
        SequenceContainer rootContainer = null;
        // Stream names taken from the service's "streams" list are not checked against the TM
        // entries, so the lookup is guarded instead of being dereferenced unconditionally.
        // Assumes getTmEntry returns null for an unknown stream - TODO confirm.
        // The lookup is repeated rather than stored so that the entry's declared type, which is
        // not referenced elsewhere in this class, does not have to be named here.
        if (streamConfig.getTmEntry(streamName) != null) {
            rootContainer = streamConfig.getTmEntry(streamName).getRootContainer();
        }
        if (rootContainer == null) {
            rootContainer = this.mdb.getRootSequenceContainer();
        }
        if (rootContainer == null) {
            throw new ConfigurationException("MDB does not have a root sequence container and none was defined under streamConfig -> tm");
        }
        return rootContainer;
    }

    /**
     * Subscribes every reader to its stream and then reports the service as started.
     */
    protected void doStart() {
        for (StreamReader reader : this.readers) {
            reader.stream.addSubscriber(reader);
        }
        notifyStarted();
    }

    /**
     * Unsubscribes every reader from its stream and then reports the service as stopped.
     */
    protected void doStop() {
        for (StreamReader reader : this.readers) {
            reader.stream.removeSubscriber(reader);
        }
        notifyStopped();
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isArchiveReplay() {
        return false;
    }
}
