package org.yamcs.artemis;

import com.google.common.util.concurrent.AbstractService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yamcs.ConfigurationException;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsService;
import org.yamcs.api.artemis.YamcsSession;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;

/* loaded from: input_file:org/yamcs/artemis/AbstractArtemisTranslatorService.class */
public class AbstractArtemisTranslatorService extends AbstractService implements YamcsService {
    private final TupleTranslator translator;
    YamcsSession yamcsSession;
    public static final String UNIQUEID_HDR_NAME = "_y_uniqueid";
    public static final int UNIQUEID = new Random().nextInt();
    public static final String ARTEMIS_URL_KEY = "artemisUrl";
    String instance;
    ServerLocator locator;
    List<Stream> streams = new ArrayList();
    Map<Stream, StreamSubscriber> streamSubscribers = new HashMap();
    Logger log = LoggerFactory.getLogger(getClass().getName());
    private final ThreadLocal<ArtemisClient> artemisClient = new ThreadLocal<ArtemisClient>() { // from class: org.yamcs.artemis.AbstractArtemisTranslatorService.1
        ArtemisClient client;

        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public ArtemisClient initialValue() {
            try {
                this.client = new ArtemisClient();
                ClientSessionFactory createSessionFactory = AbstractArtemisTranslatorService.this.locator.createSessionFactory();
                this.client.session = createSessionFactory.createSession();
                this.client.producer = this.client.session.createProducer();
                return this.client;
            } catch (Exception e) {
                throw new ConfigurationException("Cannot create a artemis client", e);
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/yamcs/artemis/AbstractArtemisTranslatorService$ArtemisClient.class */
    public static class ArtemisClient {
        ClientSession session;
        ClientProducer producer;

        ArtemisClient() {
        }
    }

    public AbstractArtemisTranslatorService(String str, List<String> list, TupleTranslator tupleTranslator) throws ConfigurationException {
        this.locator = getServerLocator(str);
        this.instance = str;
        YarchDatabaseInstance yarchDatabase = YarchDatabase.getInstance(str);
        for (String str2 : list) {
            Stream stream = yarchDatabase.getStream(str2);
            if (stream == null) {
                throw new ConfigurationException("Cannot find stream '" + str2 + "'");
            }
            this.streams.add(stream);
        }
        this.translator = tupleTranslator;
    }

    public String toString() {
        return "ArtemisTmService";
    }

    protected void doStart() {
        for (Stream stream : this.streams) {
            final SimpleString simpleString = new SimpleString(this.instance + "." + stream.getName());
            this.log.debug("Starting providing tuples from stream {} as messages on {} ActiveMQ address", stream.getName(), simpleString.toString());
            StreamSubscriber streamSubscriber = new StreamSubscriber() { // from class: org.yamcs.artemis.AbstractArtemisTranslatorService.2
                public void onTuple(Stream stream2, Tuple tuple) {
                    try {
                        ArtemisClient artemisClient = (ArtemisClient) AbstractArtemisTranslatorService.this.artemisClient.get();
                        ClientMessage buildMessage = AbstractArtemisTranslatorService.this.translator.buildMessage(artemisClient.session.createMessage(false), tuple);
                        buildMessage.putIntProperty(AbstractArtemisTranslatorService.UNIQUEID_HDR_NAME, AbstractArtemisTranslatorService.UNIQUEID);
                        artemisClient.producer.send(simpleString, buildMessage);
                    } catch (IllegalArgumentException | ActiveMQException e) {
                        AbstractArtemisTranslatorService.this.log.warn("Got exception when sending message:", e);
                    }
                }

                public void streamClosed(Stream stream2) {
                    AbstractArtemisTranslatorService.this.log.info("Stream " + stream2 + " closed");
                }
            };
            stream.addSubscriber(streamSubscriber);
            this.streamSubscribers.put(stream, streamSubscriber);
        }
        notifyStarted();
    }

    protected void doStop() {
        for (Stream stream : this.streams) {
            stream.removeSubscriber(this.streamSubscribers.get(stream));
        }
        notifyStopped();
    }

    public static ServerLocator getServerLocator(String str) {
        String str2 = "vm:///";
        YConfiguration configuration = YConfiguration.getConfiguration("yamcs." + str);
        if (configuration.containsKey(ARTEMIS_URL_KEY)) {
            str2 = configuration.getString(ARTEMIS_URL_KEY);
        } else {
            YConfiguration configuration2 = YConfiguration.getConfiguration("yamcs");
            if (configuration2.containsKey(ARTEMIS_URL_KEY)) {
                str2 = configuration2.getString(ARTEMIS_URL_KEY);
            }
        }
        try {
            return ActiveMQClient.createServerLocator(str2);
        } catch (Exception e) {
            throw new ConfigurationException("Cannot create Artemis connection", e);
        }
    }
}
