package org.opentripplanner.ext.siri.updater.azure;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import jakarta.xml.bind.JAXBException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.xml.stream.XMLStreamException;
import org.apache.hc.core5.net.URIBuilder;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.framework.time.DurationUtils;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.rutebanken.siri20.util.SiriXml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.Siri;

/* loaded from: input_file:org/opentripplanner/ext/siri/updater/azure/SiriAzureETUpdater.class */
/**
 * Azure Service Bus updater for SIRI Estimated Timetable (ET) messages.
 *
 * <p>On start-up, {@link #initializeData} fetches the ET history from an optional history
 * endpoint and applies it to the {@link SiriTimetableSnapshotSource}. Afterwards each message
 * handed to {@link #messageConsumer} is parsed and its estimated timetable deliveries are
 * applied through {@code saveResultOnGraph}.
 */
public class SiriAzureETUpdater extends AbstractAzureSiriUpdater {
    // Log under this class, not the SX updater, so ET log lines are attributed correctly.
    private static final Logger LOG = LoggerFactory.getLogger(SiriAzureETUpdater.class);

    /** Number of messages passed to {@link #messageConsumer}, shared by all instances. */
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0);

    /** Value sent as the {@code fromDateTime} query parameter of the history request. */
    private final LocalDate fromDateTime;
    private final SiriTimetableSnapshotSource snapshotSource;
    /** Time at which the most recent history fetch attempt was started. */
    private Instant startTime;
    private final Consumer<UpdateResult> recordMetrics;

    /**
     * Creates the updater.
     *
     * @param siriAzureETUpdaterParameters updater configuration; supplies the history start date
     *     and is used to set up metrics recording
     * @param transitModel transit model passed on to the parent updater
     * @param siriTimetableSnapshotSource snapshot source that the ET deliveries are applied to
     */
    public SiriAzureETUpdater(SiriAzureETUpdaterParameters siriAzureETUpdaterParameters, TransitModel transitModel, SiriTimetableSnapshotSource siriTimetableSnapshotSource) {
        super(siriAzureETUpdaterParameters, transitModel);
        this.fromDateTime = siriAzureETUpdaterParameters.getFromDateTime();
        this.snapshotSource = siriTimetableSnapshotSource;
        this.recordMetrics = TripUpdateMetrics.streaming(siriAzureETUpdaterParameters);
    }

    /**
     * Handles one Service Bus message: counts it, logs the running total on every 100th
     * message and forwards the message body for processing.
     *
     * @param serviceBusReceivedMessageContext context holding the received message
     */
    @Override // org.opentripplanner.ext.siri.updater.azure.AbstractAzureSiriUpdater
    protected void messageConsumer(ServiceBusReceivedMessageContext serviceBusReceivedMessageContext) {
        ServiceBusReceivedMessage message = serviceBusReceivedMessageContext.getMessage();
        // Use the value returned by the increment itself; re-reading the counter could observe
        // another thread's increment and skip or duplicate the periodic log line.
        long received = MESSAGE_COUNTER.incrementAndGet();
        if (received % 100 == 0) {
            LOG.info("Total SIRI-ET messages received={}", received);
        }
        processMessage(message.getBody().toString(), message.getMessageId());
    }

    /**
     * Delegates Service Bus errors to the default handler of the parent class.
     *
     * @param serviceBusErrorContext context describing the error
     */
    @Override // org.opentripplanner.ext.siri.updater.azure.AbstractAzureSiriUpdater
    protected void errorConsumer(ServiceBusErrorContext serviceBusErrorContext) {
        defaultErrorConsumer(serviceBusErrorContext);
    }

    /**
     * Fetches and applies the ET history, retrying until the updater is primed or the current
     * thread is interrupted. Does nothing except log when no history URL is configured.
     *
     * @param str history endpoint URL, or {@code null} if no history endpoint is set up
     * @param consumer message consumer; not used by this implementation
     * @throws URISyntaxException if the history URL cannot be turned into a valid URI
     */
    @Override // org.opentripplanner.ext.siri.updater.azure.AbstractAzureSiriUpdater
    protected void initializeData(String str, Consumer<ServiceBusReceivedMessageContext> consumer) throws URISyntaxException {
        if (str == null) {
            LOG.info("No history url set up for Siri Azure ET Updater");
            return;
        }
        URI historyUri = new URIBuilder(str)
            .addParameter("fromDateTime", this.fromDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE))
            .build();
        // NOTE(review): an empty history response never primes the updater, so this loop keeps
        // refetching without any delay - confirm whether that is intended.
        // Stop retrying once the thread has been interrupted so the updater can shut down.
        while (!isPrimed() && !Thread.currentThread().isInterrupted()) {
            this.startTime = Instant.now();
            LOG.info("Fetching initial Siri ET data from {}, timeout is {}ms", historyUri, this.timeout);
            long fetchStartedMillis = System.currentTimeMillis();
            String history = fetchInitialData(historyUri);
            LOG.info("Fetching initial data - finished after {} ms, got {} bytes", System.currentTimeMillis() - fetchStartedMillis, history.length());
            processHistory(history, "ET-INITIAL-1");
        }
        if (!isPrimed()) {
            LOG.warn("Interrupted before initial Siri ET data could be applied");
        }
    }

    /**
     * Parses a single live message and, if it contains ET deliveries, schedules them to be
     * applied to the snapshot source. Parse errors are logged and the message is dropped.
     *
     * @param xml SIRI XML message body
     * @param messageId message id, used for logging only
     */
    private void processMessage(String xml, String messageId) {
        try {
            List<EstimatedTimetableDeliveryStructure> updates = getUpdates(xml, messageId);
            if (updates.isEmpty()) {
                return;
            }
            this.saveResultOnGraph.execute((graph, transitModel) -> {
                this.snapshotSource.applyEstimatedTimetable(fuzzyTripMatcher(), entityResolver(), this.feedId, false, updates);
            });
        } catch (JAXBException | XMLStreamException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Parses the history payload and applies it, blocking until the scheduled task has
     * finished. On success the result is passed to the metrics consumer and the updater is
     * marked as primed. Failures are logged and leave the updater unprimed.
     *
     * @param xml SIRI XML history payload
     * @param messageId identifier used for logging only
     */
    private void processHistory(String xml, String messageId) {
        try {
            List<EstimatedTimetableDeliveryStructure> updates = getUpdates(xml, messageId);
            if (updates.isEmpty()) {
                LOG.info("Did not receive any ET messages from history endpoint");
                return;
            }
            this.saveResultOnGraph.execute((graph, transitModel) -> {
                try {
                    long applyStartedMillis = System.currentTimeMillis();
                    UpdateResult result = this.snapshotSource.applyEstimatedTimetable(fuzzyTripMatcher(), entityResolver(), this.feedId, false, updates);
                    this.recordMetrics.accept(result);
                    setPrimed(true);
                    LOG.info("Azure ET updater initialized after {} ms: [time since startup: {}]", System.currentTimeMillis() - applyStartedMillis, DurationUtils.durationToStr(Duration.between(this.startTime, Instant.now())));
                } catch (Exception e) {
                    LOG.error("Could not process ET history", e);
                }
            }).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption and stop.
            Thread.currentThread().interrupt();
            LOG.error(e.getLocalizedMessage(), e);
        } catch (JAXBException | XMLStreamException | ExecutionException e) {
            LOG.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * Parses a SIRI XML document and returns its estimated timetable deliveries.
     *
     * @param xml SIRI XML document
     * @param messageId identifier included in the warning logged for an empty message
     * @return the ET deliveries of the document, or an empty list when there are none
     *     (heartbeats are logged at info level, other empty messages at warn level)
     * @throws JAXBException if the XML cannot be parsed
     * @throws XMLStreamException if the XML cannot be parsed
     */
    private List<EstimatedTimetableDeliveryStructure> getUpdates(String xml, String messageId) throws JAXBException, XMLStreamException {
        Siri siri = SiriXml.parseXml(xml);
        List<EstimatedTimetableDeliveryStructure> deliveries = siri.getServiceDelivery() == null
            ? null
            : siri.getServiceDelivery().getEstimatedTimetableDeliveries();
        if (deliveries != null && !deliveries.isEmpty()) {
            return deliveries;
        }
        if (siri.getHeartbeatNotification() != null) {
            LOG.info("Received SIRI heartbeat message");
        } else {
            LOG.warn("Empty Siri message {}: {}", messageId, xml);
        }
        return new ArrayList<>();
    }
}
