package org.yamcs.archive;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.naming.ConfigurationException;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.yamcs.AbstractYamcsService;
import org.yamcs.InitException;
import org.yamcs.NotThreadSafe;
import org.yamcs.StandardTupleDefinitions;
import org.yamcs.StreamConfig;
import org.yamcs.ThreadSafe;
import org.yamcs.YConfiguration;
import org.yamcs.http.HttpServer;
import org.yamcs.protobuf.Yamcs;
import org.yamcs.tctm.CcsdsPacket;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.utils.TimeInterval;
import org.yamcs.utils.parser.ParseException;
import org.yamcs.yarch.HistogramSegment;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;
import org.yamcs.yarch.YarchException;
import org.yamcs.yarch.rocksdb.AscendingRangeIterator;
import org.yamcs.yarch.rocksdb.RdbStorageEngine;
import org.yamcs.yarch.rocksdb.Tablespace;
import org.yamcs.yarch.rocksdb.YRDB;
import org.yamcs.yarch.rocksdb.protobuf.Tablespace;
import org.yamcs.yarch.streamsql.StreamSqlException;

@ThreadSafe
/* loaded from: input_file:org/yamcs/archive/CcsdsTmIndex.class */
public class CcsdsTmIndex extends AbstractYamcsService implements TmIndexService {
    /** Index name stored in (and matched against) the tablespace metadata record by {@code openDb()}. */
    static final String TM_INDEX_NAME = "CCSDS";
    /**
     * Largest time difference for which {@code compare(long, short, long, short)} still treats two packets with
     * consecutive sequence counts as adjacent. Assigned from {@code HistogramSegment.GROUPING_FACTOR} in the static
     * initialiser.
     */
    static long maxApidInterval;
    /** Counter used by {@code rebuild} to give each temporary rebuild stream a distinct name. */
    private static AtomicInteger streamCounter;
    /** Tablespace holding the index records; obtained in {@code init}. */
    protected Tablespace tablespace;
    /** Tablespace index passed to {@code Record.key} for every key of this index; set by {@code openDb()}. */
    int tbsIndex;
    /** Names of the streams this service subscribes to in {@code doStart}. */
    List<String> streamNames;
    // Compiler-generated assertion flag made explicit by the decompiler; read by addPacket.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Walks the index records of one APID, or of all APIDs when constructed with APID {@code -1}, restricted to the
     * {@code [start, stop]} time range.
     * <p>
     * Records are read through an {@link AscendingRangeIterator} that is re-created each time the iteration moves to
     * another APID. The iterator must be {@link #close() closed} after use to release the underlying RocksDB
     * iterator.
     */
    @NotThreadSafe
    class CcsdsIndexIterator {
        long start;
        long stop;
        AscendingRangeIterator rangeIt;
        /** Requested APID, or -1 to iterate over every APID. */
        short apid;
        /**
         * APID currently being iterated; -1 until the first call to {@link #nextApid()}. It has to start at -1:
         * with the implicit default of 0 the initialisation branch of {@code nextApid()} is never taken and an
         * iterator created for a specific APID never returns any record.
         */
        short curApid = -1;
        Record curr;

        /**
         * @param apid
         *            the APID to iterate over, or -1 for all APIDs
         * @param start
         *            lower time bound; a negative value is replaced by 0
         * @param stop
         *            upper time bound; a negative value is replaced by {@code Long.MAX_VALUE}
         */
        public CcsdsIndexIterator(short apid, long start, long stop) {
            this.apid = apid;
            this.start = start < 0 ? 0L : start;
            this.stop = stop < 0 ? Long.MAX_VALUE : stop;
        }

        /**
         * Re-creates {@link #rangeIt} over the keys of {@link #curApid} between {@link #start} and {@link #stop},
         * closing the previous range iterator if there was one.
         *
         * @return true if the new range iterator is positioned on a record
         */
        boolean jumpAtApid() throws RocksDBException {
            byte[] rangeStart = Record.key(CcsdsTmIndex.this.tbsIndex, this.curApid, this.start, (short) 0);
            byte[] rangeStop = Record.key(CcsdsTmIndex.this.tbsIndex, this.curApid, this.stop, (short) -1);
            if (this.rangeIt != null) {
                this.rangeIt.close();
            }
            this.rangeIt = new AscendingRangeIterator(CcsdsTmIndex.this.tablespace.getRdb().newIterator(), rangeStart,
                    rangeStop);
            return this.rangeIt.isValid();
        }

        /**
         * Moves to the next APID that has at least one record in the requested time range.
         *
         * @return true if {@link #rangeIt} is now positioned on a record, false if there is nothing left
         */
        boolean nextApid() throws RocksDBException {
            if (this.curApid == -1) {
                // first call
                if (this.apid != -1) {
                    this.curApid = this.apid;
                    return jumpAtApid();
                }
                this.curApid = (short) 0;
            }
            if (this.apid != -1) {
                // a single APID was requested and it has already been visited
                return false;
            }
            while (true) {
                try (RocksIterator it = CcsdsTmIndex.this.tablespace.getRdb().newIterator()) {
                    // seek past every record of the current APID to land on the first record of the next one
                    it.seek(Record.key(CcsdsTmIndex.this.tbsIndex, this.curApid, Long.MAX_VALUE, Short.MAX_VALUE));
                    if (!it.isValid()) {
                        return false;
                    }
                    this.curApid = new Record(it.key(), it.value()).apid();
                    if (this.curApid == Short.MAX_VALUE) {
                        // this is the APID used by lastKey(): the end marker written by openDb()
                        return false;
                    }
                    if (jumpAtApid()) {
                        return true;
                    }
                    // this APID has nothing in the requested time range, try the one after it
                }
            }
        }

        /**
         * Returns the record under the cursor and advances the cursor.
         *
         * @return the next record, or null when the iteration is finished
         * @throws UncheckedIOException
         *             wrapping the RocksDBException raised while moving to the next APID
         */
        public Record getNextRecord() {
            if (this.rangeIt == null || !this.rangeIt.isValid()) {
                try {
                    if (!nextApid()) {
                        return null;
                    }
                } catch (RocksDBException e) {
                    throw new UncheckedIOException(new IOException((Throwable) e));
                }
            }
            Record record = new Record(this.rangeIt.key(), this.rangeIt.value());
            this.rangeIt.next();
            return record;
        }

        /** Closes the current range iterator, if any. */
        public void close() {
            if (this.rangeIt != null) {
                this.rangeIt.close();
            }
        }
    }

    /**
     * Exposes a {@link CcsdsIndexIterator} running over all APIDs as an {@link IndexIterator}: records are converted
     * to {@code Yamcs.ArchiveRecord} messages named {@code apid_<apid>} and, when a set of APIDs is supplied, records
     * of any other APID are skipped.
     */
    class CcsdsIndexIteratorAdapter implements IndexIterator {
        CcsdsIndexIterator iterator;
        /** APIDs to keep, or null to keep everything. */
        final Set<Short> apids;

        CcsdsIndexIteratorAdapter(Set<Short> set, long j, long j2) {
            this.apids = set;
            this.iterator = new CcsdsIndexIterator((short) -1, j, j2);
        }

        @Override
        public void close() {
            this.iterator.close();
        }

        /**
         * @return the next record whose APID passes the filter, or null once the underlying iterator is exhausted
         */
        @Override
        public Yamcs.ArchiveRecord getNextRecord() {
            Record rec = this.iterator.getNextRecord();
            // skip the records filtered out by the APID set
            while (rec != null && this.apids != null && !this.apids.contains(Short.valueOf(rec.apid))) {
                rec = this.iterator.getNextRecord();
            }
            if (rec == null) {
                return null;
            }
            Yamcs.NamedObjectId id = Yamcs.NamedObjectId.newBuilder()
                    .setName("apid_" + rec.apid)
                    .build();
            return Yamcs.ArchiveRecord.newBuilder()
                    .setId(id)
                    .setNum(rec.numPackets)
                    .setFirst(TimeEncoding.toProtobufTimestamp(rec.firstTime()))
                    .setLast(TimeEncoding.toProtobufTimestamp(rec.lastTime))
                    .setSeqFirst(rec.seqFirst)
                    .setSeqLast(rec.seqLast)
                    .build();
        }
    }

    /**
     * Initialises the service: determines the streams to index, looks up the tablespace of the instance and opens
     * (or creates) the index in it.
     * <p>
     * The stream names come from the {@code streams} configuration key when it is present; otherwise every stream of
     * type TM declared in the stream configuration of the instance is used.
     *
     * @param str
     *            passed to {@code StreamConfig.getInstance} and {@code YarchDatabase.getInstance} (the Yamcs instance
     *            name)
     * @param str2
     *            forwarded unchanged to the superclass
     * @param yConfiguration
     *            forwarded unchanged to the superclass
     * @throws InitException
     *             if opening the index fails with a RocksDBException
     */
    @Override
    public void init(String str, String str2, YConfiguration yConfiguration) throws InitException {
        super.init(str, str2, yConfiguration);

        if (!this.config.containsKey("streams")) {
            this.streamNames = (List) StreamConfig.getInstance(str)
                    .getEntries(StreamConfig.StandardStreamType.TM)
                    .stream()
                    .map(entry -> entry.getName())
                    .collect(Collectors.toList());
        } else {
            this.streamNames = this.config.getList("streams");
        }

        this.tablespace = RdbStorageEngine.getInstance().getTablespace(YarchDatabase.getInstance(str));
        try {
            openDb();
            this.log.debug("Listening to streams {}", this.streamNames);
        } catch (RocksDBException rdbException) {
            throw new InitException("Failed to open rocksdb", rdbException);
        }
    }

    /**
     * Subscribes this service to each configured stream and then reports the service as started. If one of the
     * streams cannot be found the service is reported as failed and the remaining streams are not processed.
     */
    protected void doStart() {
        YarchDatabaseInstance db = YarchDatabase.getInstance(this.yamcsInstance);
        for (String streamName : this.streamNames) {
            Stream source = db.getStream(streamName);
            if (source != null) {
                source.addSubscriber(this);
                continue;
            }
            notifyFailed(new ConfigurationException("Stream " + streamName + " does not exist"));
            return;
        }
        notifyStarted();
    }

    /**
     * Unsubscribes this service from every configured stream that still exists, then reports the service as stopped.
     */
    protected void doStop() {
        YarchDatabaseInstance db = YarchDatabase.getInstance(this.yamcsInstance);
        for (String streamName : this.streamNames) {
            Stream source = db.getStream(streamName);
            if (source == null) {
                // nothing to unsubscribe from
                continue;
            }
            source.removeSubscriber(this);
        }
        notifyStopped();
    }

    /**
     * Locates the TM_INDEX metadata record of this instance (one without an index name, or named
     * {@link #TM_INDEX_NAME}) and stores its tablespace index in {@link #tbsIndex}.
     * <p>
     * When no such record exists a new one is created, and two 14-byte all-zero records are written under
     * {@code firstKey} and {@code lastKey}.
     */
    private void openDb() throws RocksDBException {
        List<Tablespace.TablespaceRecord> existing = this.tablespace.filter(
                Tablespace.TablespaceRecord.Type.TM_INDEX,
                this.yamcsInstance,
                candidate -> !candidate.hasTmIndexName() || TM_INDEX_NAME.equals(candidate.getTmIndexName()));

        if (!existing.isEmpty()) {
            this.tbsIndex = existing.get(0).getTbsIndex();
            return;
        }

        Tablespace.TablespaceRecord created = this.tablespace.createMetadataRecord(
                this.yamcsInstance,
                Tablespace.TablespaceRecord.newBuilder()
                        .setType(Tablespace.TablespaceRecord.Type.TM_INDEX)
                        .setTmIndexName(TM_INDEX_NAME));
        YRDB db = this.tablespace.getRdb();
        // the same zeroed value is stored under both boundary keys
        byte[] zeroes = new byte[14];
        db.put(firstKey(created.getTbsIndex()), zeroes);
        db.put(lastKey(created.getTbsIndex()), zeroes);
        this.tbsIndex = created.getTbsIndex();
    }

    /**
     * Builds the key made of APID 0, time 0 and sequence count 0 for the given tablespace index; {@code openDb()}
     * writes a record under it when the index is created.
     */
    private static byte[] firstKey(int i) {
        final short apid = 0;
        final long time = 0L;
        final short seq = 0;
        return Record.key(i, apid, time, seq);
    }

    /**
     * Builds the key made of the maximum APID, time and sequence count values for the given tablespace index;
     * {@code openDb()} writes a record under it when the index is created.
     */
    private static byte[] lastKey(int i) {
        final short apid = Short.MAX_VALUE;
        final long time = Long.MAX_VALUE;
        final short seq = Short.MAX_VALUE;
        return Record.key(i, apid, time, seq);
    }

    /**
     * Indexes the packet carried by a tuple: the APID and sequence count are read from the packet bytes and the time
     * from {@link #getTime(Tuple)}. Packets shorter than 7 bytes are ignored with a warning; a RocksDBException
     * raised while updating the index is logged and not propagated.
     */
    @Override
    public void onTuple(Stream stream, Tuple tuple) {
        byte[] packet = (byte[]) tuple.getColumn(StandardTupleDefinitions.TM_PACKET_COLUMN);
        int size = packet.length;
        if (size >= 7) {
            try {
                short apid = CcsdsPacket.getAPID(packet);
                long time = getTime(tuple);
                short seqCount = (short) CcsdsPacket.getSequenceCount(packet);
                addPacket(apid, time, seqCount);
            } catch (RocksDBException rdbException) {
                this.log.error("got exception while saving the packet into index", rdbException);
            }
        } else {
            this.log.warn("Short packet (size : {}) received by the CcsdsTmIndex Ignored.", Integer.valueOf(size));
        }
    }

    /**
     * Returns the time under which the packet of the given tuple is indexed: the value of its {@code gentime}
     * column.
     */
    protected long getTime(Tuple tuple) {
        Long gentime = (Long) tuple.getColumn("gentime");
        return gentime.longValue();
    }

    /**
     * Adds one packet to the index.
     * <p>
     * Starting from the key of the packet, the index is scanned forward to the first record that lies after the
     * packet (the "right" record); the record just before it is the "left" record. Depending on how the packet
     * compares with the two records (see {@code compare}) it is dropped as a duplicate, appended to the left record,
     * prepended to the right record, used to merge both records into one, or stored as a new single-packet record.
     *
     * @param s
     *            APID of the packet
     * @param j
     *            time of the packet
     * @param s2
     *            sequence count of the packet
     */
    synchronized void addPacket(short s, long j, short s2) throws RocksDBException {
        YRDB db = this.tablespace.getRdb();
        try (RocksIterator it = this.tablespace.getRdb().newIterator()) {
            it.seek(Record.key(this.tbsIndex, s, j, s2));

            // advance until the packet is no longer after the record under the cursor
            Record right;
            int cmpRight;
            for (;;) {
                if (!$assertionsDisabled && !it.isValid()) {
                    throw new AssertionError();
                }
                right = new Record(it.key(), it.value());
                cmpRight = compare(s, j, s2, right);
                if (cmpRight <= 0) {
                    break;
                }
                it.next();
            }
            if (cmpRight == 0) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("ignored duplicate packet: apid={} time={} seq={}", Short.valueOf(s), TimeEncoding.toOrdinalDateTime(j), Short.valueOf(s2));
                }
                return;
            }

            it.prev();
            Record left = new Record(it.key(), it.value());
            int cmpLeft = compare(s, j, s2, left);
            if (cmpLeft == 0) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace("ignored duplicate packet: apid={} time={} seq={}", Short.valueOf(s), TimeEncoding.toOrdinalDateTime(j), Short.valueOf(s2));
                }
                return;
            }

            boolean followsLeft = cmpLeft == 1;
            boolean precedesRight = cmpRight == -1;
            if (followsLeft && precedesRight) {
                // the packet fills the gap between the two records: merge them into the left one
                left.seqLast = right.seqLast;
                left.lastTime = right.lastTime;
                left.numPackets += right.numPackets + 1;
                db.put(left.key(this.tbsIndex), left.val());
                db.delete(right.key(this.tbsIndex));
            } else if (followsLeft) {
                // the packet extends the left record at its end
                left.seqLast = s2;
                left.lastTime = j;
                left.numPackets++;
                db.put(left.key(this.tbsIndex), left.val());
            } else if (precedesRight) {
                // the packet extends the right record at its beginning; the record is removed under its current
                // key before its start is changed and it is written again
                db.delete(right.key(this.tbsIndex));
                right.seqFirst = s2;
                right.firstTime = j;
                right.numPackets++;
                db.put(right.key(this.tbsIndex), right.val());
            } else {
                // not adjacent to anything: the packet gets a record of its own
                Record single = new Record(s, j, s2, 1);
                db.put(single.key(this.tbsIndex), single.val());
            }
        }
    }

    /**
     * Compares a packet with an index record.
     *
     * @return 0 if the packet falls inside the record; for a different APID, 16383 times the sign of the APID
     *         difference; otherwise the result of comparing the packet with the start of the record when the packet
     *         is not after the start, or with the end of the record when it is not before the end
     */
    private static int compare(short s, long j, short s2, Record record) {
        short recordApid = record.apid();
        if (s == recordApid) {
            int vsFirst = compare(j, s2, record.firstTime(), record.firstSeq());
            if (vsFirst > 0) {
                // after the start of the record: it is inside unless it is also at or after the end
                int vsLast = compare(j, s2, record.lastTime(), record.lastSeq());
                return Math.max(vsLast, 0);
            }
            return vsFirst;
        }
        return 16383 * Integer.signum(s - recordApid);
    }

    /**
     * Compares two (time, sequence count) pairs; sequence counts are compared modulo 16384.
     *
     * @return for equal times, the sequence count difference folded into [-8192, 8191]; for different times, -1 or 1
     *         when the times are at most {@code maxApidInterval} apart and the sequence counts are consecutive,
     *         otherwise -16383 or 16383. The sign is negative when the first pair is the earlier one.
     */
    static int compare(long j, short s, long j2, short s2) {
        if (j == j2) {
            int delta = (s - s2) & 16383;
            return delta < 8192 ? delta : delta - 16384;
        }
        if (j < j2) {
            boolean directlyBefore = j2 - j <= maxApidInterval && ((s2 - s) & 16383) == 1;
            return directlyBefore ? -1 : -16383;
        }
        boolean directlyAfter = j - j2 <= maxApidInterval && ((s - s2) & 16383) == 1;
        return directlyAfter ? 1 : 16383;
    }

    /**
     * Deletes the index records of the time interval built from the two arguments. A RocksDBException raised during
     * the deletion is logged and not propagated.
     */
    @Override
    public synchronized void deleteRecords(long j, long j2) {
        TimeInterval interval = new TimeInterval(j, j2);
        try {
            deleteRecords(interval);
        } catch (RocksDBException rdbException) {
            this.log.error("Error when deleting records from the ccsdstmindex", rdbException);
        }
    }

    /**
     * Creates an iterator over the index records between the two given times.
     *
     * @param list
     *            when null, records of all APIDs are returned; otherwise only the entries whose name starts with
     *            {@code apid_} are considered and the remainder of each such name is parsed as the APID to include
     * @param j
     *            start of the time range
     * @param j2
     *            end of the time range
     */
    @Override
    public IndexIterator getIterator(List<Yamcs.NamedObjectId> list, long j, long j2) {
        if (list == null) {
            return new CcsdsIndexIteratorAdapter(null, j, j2);
        }
        Set<Short> wantedApids = list.stream()
                .map(id -> id.getName())
                .filter(name -> name.startsWith("apid_"))
                .map(name -> Short.valueOf(name.substring(5)))
                .collect(Collectors.toSet());
        return new CcsdsIndexIteratorAdapter(wantedApids, j, j2);
    }

    /**
     * Removes a closed stream from the list of indexed streams; when the list becomes empty the service is reported
     * as failed.
     */
    @Override
    public void streamClosed(Stream stream) {
        String closedName = stream.getName();
        this.log.warn("Stream {} closed", closedName);
        this.streamNames.remove(closedName);
        if (!this.streamNames.isEmpty()) {
            return;
        }
        this.log.warn("No stream left");
        notifyFailed(new Exception("stream clsed"));
    }

    /**
     * Rebuilds the index by replaying the {@code tm} table through {@link #onTuple(Stream, Tuple)}.
     * <p>
     * For an interval with a start or an end, only the records of that interval are deleted first; for an unbounded
     * interval the whole tablespace index is removed and re-created. A temporary stream selecting from {@code tm}
     * is then created and started.
     *
     * @param timeInterval
     *            the interval to rebuild
     * @return a future completed when the temporary stream is closed, or completed exceptionally if removing the
     *         existing index data failed
     * @throws RuntimeException
     *             if creating the temporary stream fails with a ParseException or StreamSqlException
     */
    public synchronized CompletableFuture<Void> rebuild(TimeInterval timeInterval) throws YarchException {
        final CompletableFuture<Void> result = new CompletableFuture<>();

        boolean bounded = timeInterval.hasStart() || timeInterval.hasEnd();
        if (!bounded) {
            this.log.info("{} Rebuilding the CCSDS tm index from scratch", this.yamcsInstance);
            try {
                this.tablespace.removeTbsIndex(Tablespace.TablespaceRecord.Type.TM_INDEX, this.tbsIndex);
                openDb();
            } catch (Exception removalFailure) {
                this.log.error("Error when removing existing tm index", removalFailure);
                result.completeExceptionally(removalFailure);
                return result;
            }
        } else {
            this.log.info("{}: Rebuilding the CCSDS tm index for time interval: {}", this.yamcsInstance, timeInterval.toStringEncoded());
            try {
                deleteRecords(timeInterval);
            } catch (Exception deletionFailure) {
                this.log.error("Error when removing the existing CCSDS tm index", deletionFailure);
                result.completeExceptionally(deletionFailure);
                return result;
            }
        }

        String replayStreamName = "histo_rebuild_" + streamCounter.incrementAndGet();
        YarchDatabaseInstance db = YarchDatabase.getInstance(this.yamcsInstance);
        try {
            String query = "create stream " + replayStreamName + " as select * from tm " + getWhereCondition("gentime", timeInterval);
            db.execute(query, new Object[0]);
            Stream replayStream = db.getStream(replayStreamName);
            replayStream.addSubscriber(new StreamSubscriber() {
                @Override
                public void onTuple(Stream source, Tuple tuple) {
                    // replayed packets go through the same path as live ones
                    CcsdsTmIndex.this.onTuple(source, tuple);
                }

                @Override
                public void streamClosed(Stream source) {
                    result.complete(null);
                }
            });
            replayStream.start();
            return result;
        } catch (ParseException | StreamSqlException streamFailure) {
            throw new RuntimeException(streamFailure);
        }
    }

    /**
     * Deletes, for every APID present in the index, the records whose key lies between the start (inclusive) and
     * the end (exclusive) of the given interval. A missing start is replaced by 0 and a missing end by
     * {@code Long.MAX_VALUE}.
     * <p>
     * The scan starts just after the record stored under {@code firstKey} and stops at the first record whose APID
     * is {@code Short.MAX_VALUE} (the APID used by {@code lastKey}).
     *
     * @param timeInterval
     *            the interval to clear
     */
    private synchronized void deleteRecords(TimeInterval timeInterval) throws RocksDBException {
        YRDB rdb = this.tablespace.getRdb();
        long rangeStart = timeInterval.hasStart() ? timeInterval.getStart() : 0L;
        long rangeEnd = timeInterval.hasEnd() ? timeInterval.getEnd() : Long.MAX_VALUE;

        try (RocksIterator it = rdb.newIterator()) {
            it.seek(firstKey(this.tbsIndex));
            it.next();
            while (it.isValid()) {
                Record record = new Record(it.key(), it.value());
                if (record.apid == Short.MAX_VALUE) {
                    break;
                }
                short apid = record.apid;
                byte[] from = Record.key(this.tbsIndex, apid, rangeStart, (short) 0);
                byte[] to = Record.key(this.tbsIndex, apid, rangeEnd, (short) 0);
                rdb.getDb().deleteRange(from, to);

                // Move past ALL the records of this APID, using the same key as CcsdsIndexIterator.nextApid().
                // Seeking to the end of the deleted range instead would land on the same record again and again
                // whenever the APID has records at or after the end of the interval, and the loop would never
                // terminate.
                it.seek(Record.key(this.tbsIndex, apid, Long.MAX_VALUE, Short.MAX_VALUE));
            }
        }
    }

    /**
     * Builds the {@code where} clause restricting a column to the given interval, with both bounds widened to
     * multiples of {@code HistogramSegment.GROUPING_FACTOR}: the start is rounded down and the end is moved to the
     * multiple following it.
     *
     * @param str
     *            name of the time column
     * @param timeInterval
     *            the interval to select
     * @return {@code " where <col> >= <start> and <col> < <end>"} with only the parts for the bounds that are set,
     *         or {@code HttpServer.TYPE_URL_PREFIX} when the interval has neither bound (NOTE(review): presumably
     *         the empty string - confirm against HttpServer)
     */
    public static String getWhereCondition(String str, TimeInterval timeInterval) {
        boolean hasLowerBound = timeInterval.hasStart();
        boolean hasUpperBound = timeInterval.hasEnd();
        if (!hasLowerBound && !hasUpperBound) {
            return HttpServer.TYPE_URL_PREFIX;
        }

        StringBuilder clause = new StringBuilder(" where ");
        if (hasLowerBound) {
            long from = HistogramSegment.GROUPING_FACTOR * (timeInterval.getStart() / HistogramSegment.GROUPING_FACTOR);
            clause.append(str).append(" >= ").append(from);
            if (hasUpperBound) {
                clause.append(" and ");
            }
        }
        if (hasUpperBound) {
            long to = HistogramSegment.GROUPING_FACTOR * (1 + timeInterval.getEnd() / HistogramSegment.GROUPING_FACTOR);
            clause.append(str).append(" < ").append(to);
        }
        return clause.toString();
    }

    // Static initialisation: the assertion flag, the adjacency interval used by compare() and the counter used to
    // name the rebuild streams.
    static {
        // explicit form of the flag the compiler generates for assert statements
        $assertionsDisabled = !CcsdsTmIndex.class.desiredAssertionStatus();
        maxApidInterval = HistogramSegment.GROUPING_FACTOR;
        streamCounter = new AtomicInteger();
    }
}
