package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import jakarta.xml.bind.JAXBException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.stream.XMLStreamException;
import org.apache.hc.core5.net.URIBuilder;
import org.opentripplanner.ext.siri.SiriAlertsUpdateHandler;
import org.opentripplanner.framework.time.DurationUtils;
import org.opentripplanner.routing.impl.TransitAlertServiceImpl;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.alert.TransitAlertProvider;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.Siri;

/**
 * Updater that feeds SIRI SX (situation exchange) messages received through Azure Service Bus
 * into a {@link TransitAlertService}.
 *
 * <p>On start-up, {@link #initializeData(String, Consumer)} may load the currently published
 * situations from a history endpoint; afterwards every Service Bus message is parsed and handed
 * to the {@link SiriAlertsUpdateHandler}.
 */
public class SiriAzureSXUpdater extends AbstractAzureSiriUpdater implements TransitAlertProvider {
    /**
     * Pause between two attempts to load the history, so a failing endpoint or a payload that
     * cannot be processed does not turn the priming loop into a busy loop.
     */
    private static final Duration HISTORY_RETRY_DELAY = Duration.ofSeconds(5);

    /** Number of Service Bus messages handed to {@link #messageConsumer}. */
    private static final AtomicLong messageCounter = new AtomicLong(0);

    private final Logger LOG = LoggerFactory.getLogger(getClass());
    private final SiriAlertsUpdateHandler updateHandler;
    private final TransitAlertService transitAlertService;
    private final LocalDate fromDateTime;
    private final LocalDate toDateTime;

    /** Start of the current history load attempt; used to report how long priming took. */
    private Instant startTime;

    /**
     * Creates the updater together with its own alert service and update handler.
     *
     * @param siriAzureSXUpdaterParameters updater configuration; supplies the date range that is
     *     requested from the history endpoint
     * @param transitModel transit model passed to the base class, the alert service and the
     *     update handler
     */
    public SiriAzureSXUpdater(SiriAzureSXUpdaterParameters siriAzureSXUpdaterParameters, TransitModel transitModel) {
        super(siriAzureSXUpdaterParameters, transitModel);
        this.fromDateTime = siriAzureSXUpdaterParameters.getFromDateTime();
        this.toDateTime = siriAzureSXUpdaterParameters.getToDateTime();
        this.transitAlertService = new TransitAlertServiceImpl(transitModel);
        this.updateHandler = new SiriAlertsUpdateHandler(this.feedId, transitModel, this.transitAlertService, fuzzyTripMatcher(), Duration.ZERO);
    }

    /**
     * Handles one message received from the Service Bus: counts it and processes its body as a
     * SIRI XML document.
     *
     * @param serviceBusReceivedMessageContext context carrying the received message
     */
    @Override
    protected void messageConsumer(ServiceBusReceivedMessageContext serviceBusReceivedMessageContext) {
        ServiceBusReceivedMessage message = serviceBusReceivedMessageContext.getMessage();
        LOG.debug(
            "Processing message. messageId={}, sequenceNumber={}, enqueued time={}",
            message.getMessageId(),
            message.getSequenceNumber(),
            message.getEnqueuedTime()
        );
        messageCounter.incrementAndGet();
        processMessage(message.getBody().toString(), message.getMessageId());
    }

    /**
     * Delegates Service Bus errors to the default handling of the base class.
     *
     * @param serviceBusErrorContext error reported by the Service Bus client
     */
    @Override
    protected void errorConsumer(ServiceBusErrorContext serviceBusErrorContext) {
        defaultErrorConsumer(serviceBusErrorContext);
    }

    /**
     * Loads the published situations from the history endpoint until the updater is primed.
     *
     * <p>A failed attempt is retried after {@link #HISTORY_RETRY_DELAY}. The method returns
     * without priming if the calling thread is interrupted.
     *
     * @param str base URL of the history endpoint; when {@code null} no history is loaded
     * @param consumer message consumer supplied by the caller; not used by this implementation
     * @throws URISyntaxException if {@code str} cannot be turned into a valid URI
     */
    @Override
    protected void initializeData(String str, Consumer<ServiceBusReceivedMessageContext> consumer) throws URISyntaxException {
        if (str == null) {
            LOG.info("No history url set up for Siri Azure Sx Updater");
            return;
        }

        // The request URI does not change between attempts, so build it once.
        URI historyUri = new URIBuilder(str)
            .addParameter("publishFromDateTime", fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
            .addParameter("publishToDateTime", toDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
            .build();

        while (!isPrimed()) {
            startTime = Instant.now();
            LOG.info("Fetching initial Siri SX data from {}, timeout is {}ms", historyUri, this.timeout);

            long fetchStarted = System.currentTimeMillis();
            String history = fetchInitialData(historyUri);
            long fetchMillis = System.currentTimeMillis() - fetchStarted;

            if (history == null) {
                // Defensive: the contract of fetchInitialData is not visible from this class.
                LOG.warn("History endpoint {} returned no data", historyUri);
            } else {
                LOG.info("Fetching initial data - finished after {} ms, got {} bytes", fetchMillis, history.length());
                processHistory(history, "SX-INITIAL-1");
            }

            if (!isPrimed() && !pauseBeforeRetry()) {
                return;
            }
        }
    }

    /**
     * Waits {@link #HISTORY_RETRY_DELAY} before the next history load attempt.
     *
     * @return {@code true} if the wait completed, {@code false} if the thread was interrupted
     *     (the interrupt flag is restored in that case)
     */
    private boolean pauseBeforeRetry() {
        LOG.warn("Siri SX history was not loaded, retrying in {}", DurationUtils.durationToStr(HISTORY_RETRY_DELAY));
        try {
            Thread.sleep(HISTORY_RETRY_DELAY.toMillis());
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting to retry the Siri SX history load");
            return false;
        }
    }

    /**
     * Parses a SIRI XML document and returns it only if it carries situation exchange
     * deliveries.
     *
     * @param str SIRI XML document
     * @param str2 message id, used for logging only
     * @return the parsed document, or {@code null} for a heartbeat or a document without any
     *     situation exchange delivery
     * @throws XMLStreamException if the XML cannot be read
     * @throws JAXBException if the XML cannot be unmarshalled
     */
    private Siri getSiri(String str, String str2) throws XMLStreamException, JAXBException {
        Siri siri = SiriXml.parseXml(str);
        var serviceDelivery = siri.getServiceDelivery();
        if (serviceDelivery != null) {
            var deliveries = serviceDelivery.getSituationExchangeDeliveries();
            if (deliveries != null && !deliveries.isEmpty()) {
                return siri;
            }
        }
        if (siri.getHeartbeatNotification() != null) {
            LOG.info("Received SIRI heartbeat message");
            return null;
        }
        LOG.warn("Empty Siri message for messageId {}", str2);
        LOG.debug(str);
        return null;
    }

    /**
     * Parses one Service Bus message and, if it carries situations, submits the alert update to
     * {@code saveResultOnGraph}. Parse errors are logged and the message is dropped.
     *
     * @param str SIRI XML document
     * @param str2 message id, used for logging only
     */
    private void processMessage(String str, String str2) {
        try {
            Siri siri = getSiri(str, str2);
            if (siri == null) {
                return;
            }
            this.saveResultOnGraph.execute((graph, transitModel) -> updateHandler.update(siri.getServiceDelivery()));
        } catch (JAXBException | XMLStreamException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Applies the history payload and marks the updater as primed.
     *
     * <p>A payload without situations counts as a successful (empty) initial state and primes
     * the updater as well. If parsing or applying the payload fails, the updater stays
     * un-primed so the caller can retry.
     *
     * @param str SIRI XML document returned by the history endpoint
     * @param str2 id used for logging only
     */
    private void processHistory(String str, String str2) {
        try {
            Siri siri = getSiri(str, str2);
            if (siri == null) {
                LOG.info("Did not receive any SX messages from history endpoint.");
                // Nothing to load is a valid result; without this the caller would re-fetch forever.
                setPrimed(true);
                return;
            }
            // Wait for the submitted task so the caller sees the primed flag when this returns.
            this.saveResultOnGraph.execute((graph, transitModel) -> {
                try {
                    long updateStarted = System.currentTimeMillis();
                    updateHandler.update(siri.getServiceDelivery());
                    LOG.info(
                        "Azure SX updater initialized after {} ms: [time since startup: {}]",
                        System.currentTimeMillis() - updateStarted,
                        DurationUtils.durationToStr(Duration.between(startTime, Instant.now()))
                    );
                    setPrimed(true);
                } catch (Exception e) {
                    LOG.error("Could not process SX history", e);
                }
            }).get();
        } catch (InterruptedException e) {
            // Restore the flag so the retry loop in initializeData can stop.
            Thread.currentThread().interrupt();
            LOG.error(e.getLocalizedMessage(), e);
        } catch (JAXBException | XMLStreamException | ExecutionException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * @return the alert service that this updater keeps up to date
     */
    @Override
    public TransitAlertService getTransitAlertService() {
        return this.transitAlertService;
    }
}
