/*
 * Decompiled with CFR 0.152.
 */
package org.opentripplanner.ext.siri.updater.azure;

import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusFailureReason;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import jakarta.xml.bind.JAXBException;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.xml.stream.XMLStreamException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.entur.siri21.util.SiriXml;
import org.opentripplanner.ext.siri.updater.azure.AuthenticationType;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdater;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureETUpdaterParameters;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureInitializationException;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureMessageHandler;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdater;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureSXUpdaterParameters;
import org.opentripplanner.ext.siri.updater.azure.SiriAzureUpdaterParameters;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
import org.opentripplanner.framework.io.OtpHttpClient;
import org.opentripplanner.framework.io.OtpHttpClientException;
import org.opentripplanner.framework.io.OtpHttpClientFactory;
import org.opentripplanner.routing.services.TransitAlertService;
import org.opentripplanner.transit.service.TimetableRepository;
import org.opentripplanner.updater.alert.TransitAlertProvider;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.HttpHeaders;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.siri.SiriRealTimeTripUpdateAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri21.ServiceDelivery;
import uk.org.siri.siri21.Siri;

/**
 * Graph updater that subscribes to an Azure Service Bus topic, parses the received
 * messages as SIRI XML and hands every {@link ServiceDelivery} to a
 * {@link SiriAzureMessageHandler}. Optionally an initial ("history") delivery is fetched
 * over HTTP before the Service Bus processor is started.
 */
public class SiriAzureUpdater
implements GraphUpdater {
    /** Service Bus failure reasons that {@link #shouldRetry(Exception)} treats as transient. */
    private static final Set<ServiceBusFailureReason> RETRYABLE_REASONS = Set.of(ServiceBusFailureReason.GENERAL_ERROR, ServiceBusFailureReason.QUOTA_EXCEEDED, ServiceBusFailureReason.SERVICE_BUSY, ServiceBusFailureReason.SERVICE_COMMUNICATION_ERROR, ServiceBusFailureReason.SERVICE_TIMEOUT, ServiceBusFailureReason.UNAUTHORIZED, ServiceBusFailureReason.MESSAGE_LOCK_LOST, ServiceBusFailureReason.SESSION_LOCK_LOST, ServiceBusFailureReason.SESSION_CANNOT_BE_LOCKED);
    /** Service Bus failure reasons that {@link #shouldRetry(Exception)} never retries. */
    private static final Set<ServiceBusFailureReason> NON_RETRYABLE_REASONS = Set.of(ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND, ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED, ServiceBusFailureReason.MESSAGE_SIZE_EXCEEDED, ServiceBusFailureReason.MESSAGE_NOT_FOUND, ServiceBusFailureReason.MESSAGING_ENTITY_ALREADY_EXISTS);
    private static final Logger log = LoggerFactory.getLogger(SiriAzureUpdater.class);
    /** Updater type from the configuration; used in log messages and in the shutdown hook name. */
    private final String updaterType;
    /** Decides whether clients are built from a token credential or from a connection string. */
    private final AuthenticationType authenticationType;
    /** Set only for FederatedIdentity authentication; {@code null} otherwise. */
    private final String fullyQualifiedNamespace;
    private final String configRef;
    /** Connection string; set only for SharedAccessKey authentication, {@code null} otherwise. */
    private final String serviceBusUrl;
    private final String topicName;
    /** Passed to the subscription options when the subscription is created. */
    private final Duration autoDeleteOnIdle;
    /** Passed to the processor builder as the prefetch count. */
    private final int prefetchCount;
    /** Created by the startup sequence in {@code run()}; closed on shutdown. */
    private ServiceBusProcessorClient eventProcessor;
    /** Created by the startup sequence in {@code run()}; used on shutdown to delete the subscription. */
    private ServiceBusAdministrationClient serviceBusAdmin;
    // NOTE(review): plain (non-volatile) field written in run() and read via isPrimed();
    // confirm whether those calls can happen on different threads.
    private boolean isPrimed = false;
    /** Value of the HOSTNAME environment variable, or "otp-" plus a random UUID when that is unset or blank. */
    private String subscriptionName;
    /** Static, so the count is shared by every updater instance in the JVM. */
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0L);
    /** Intended number of messages between debug log lines that report the running total. */
    private static final int MESSAGE_COUNTER_LOG_INTERVAL = 100;
    /** Intended pause, in seconds, after a busy/unauthorized error reported by the processor. */
    private static final int ERROR_RETRY_WAIT_SECONDS = 5;
    /** Intended first back-off delay, in milliseconds, between startup retries. */
    private static final int INITIAL_RETRY_DELAY_MS = 1000;
    /** Intended upper bound, in milliseconds, for the back-off delay between startup retries. */
    private static final int MAX_RETRY_DELAY_MS = 60000;
    /** Receives every parsed {@link ServiceDelivery}; never {@code null}. */
    protected final SiriAzureMessageHandler messageHandler;
    /** History endpoint; when {@code null} no initial data is fetched. */
    @Nullable
    private final URI dataInitializationUrl;
    /** Timeout in milliseconds for the initial data HTTP request. */
    private final int timeout;
    /** Maximum time each startup step may spend retrying. */
    private final Duration startupTimeout;

    SiriAzureUpdater(SiriAzureUpdaterParameters config, SiriAzureMessageHandler messageHandler) {
        this.messageHandler = Objects.requireNonNull(messageHandler);
        try {
            this.dataInitializationUrl = config.buildDataInitializationUrl().orElse(null);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid history url", e);
        }
        this.configRef = Objects.requireNonNull(config.configRef(), "configRef must not be null");
        this.authenticationType = Objects.requireNonNull(config.getAuthenticationType(), "authenticationType must not be null");
        this.topicName = Objects.requireNonNull(config.getTopicName(), "topicName must not be null");
        this.updaterType = Objects.requireNonNull(config.getType(), "type must not be null");
        this.timeout = config.getTimeout();
        this.startupTimeout = config.getStartupTimeout();
        this.autoDeleteOnIdle = config.getAutoDeleteOnIdle();
        this.prefetchCount = config.getPrefetchCount();
        if (this.authenticationType == AuthenticationType.FederatedIdentity) {
            this.fullyQualifiedNamespace = Objects.requireNonNull(config.getFullyQualifiedNamespace(), "fullyQualifiedNamespace must not be null when using FederatedIdentity authentication");
            this.serviceBusUrl = null;
        } else if (this.authenticationType == AuthenticationType.SharedAccessKey) {
            this.serviceBusUrl = Objects.requireNonNull(config.getServiceBusUrl(), "serviceBusUrl must not be null when using SharedAccessKey authentication");
            this.fullyQualifiedNamespace = null;
        } else {
            throw new IllegalArgumentException("Unsupported authentication type: " + String.valueOf((Object)this.authenticationType));
        }
    }

    /**
     * Builds an updater whose messages are handled by a {@link SiriAzureETUpdater}.
     *
     * @param config  ET updater configuration
     * @param adapter adapter handed to the ET message handler
     * @return a new updater wired to the ET handler
     */
    public static SiriAzureUpdater createETUpdater(SiriAzureETUpdaterParameters config, SiriRealTimeTripUpdateAdapter adapter) {
        return new SiriAzureUpdater(config, new SiriAzureETUpdater(config, adapter));
    }

    /**
     * Builds an updater whose messages are handled by a {@link SiriAzureSXUpdater}. The
     * returned instance is an {@link SxWrapper}, so it also exposes the transit alert service.
     *
     * @param config              SX updater configuration
     * @param timetableRepository repository handed to the SX message handler
     * @return a new updater wired to the SX handler
     */
    public static SiriAzureUpdater createSXUpdater(SiriAzureSXUpdaterParameters config, TimetableRepository timetableRepository) {
        SiriAzureSXUpdater sxHandler = new SiriAzureSXUpdater(config, timetableRepository);
        SiriAzureUpdaterParameters baseConfig = (SiriAzureUpdaterParameters)config;
        return new SxWrapper(baseConfig, sxHandler);
    }

    /**
     * Forwards the graph write callback to the message handler.
     *
     * @param writeToGraphCallback callback the handler is set up with
     */
    @Override
    public void setup(WriteToGraphCallback writeToGraphCallback) {
        messageHandler.setup(writeToGraphCallback);
    }

    /**
     * Runs the startup sequence: create the subscription, load the historical data, start the
     * Service Bus processor, mark the updater as primed and register the shutdown hook.
     * A failed or timed-out step is logged by {@code tryStartupStep} and the sequence
     * continues; only an interrupt aborts it.
     */
    @Override
    public void run() {
        try {
            // Prefer the host name as subscription name; fall back to a random one.
            String hostName = System.getenv("HOSTNAME");
            boolean hostNameUsable = hostName != null && !hostName.isBlank();
            this.subscriptionName = hostNameUsable ? hostName : "otp-" + UUID.randomUUID();

            tryStartupStep(this::setupSubscription, "ServiceBusSubscription");
            tryStartupStep(() -> {
                Optional<ServiceDelivery> history = fetchInitialSiriData();
                if (history.isPresent()) {
                    processInitialSiriData(history.get());
                } else {
                    log.info("Got empty response from history endpoint");
                }
            }, "HistoricalSiriData");
            tryStartupStep(this::startEventProcessor, "ServiceBusEventProcessor");

            setPrimed();
            registerShutdownHook();
        } catch (InterruptedException e) {
            log.info("Startup interrupted, aborting updater initialization");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks the current thread for the given time. Package-private and non-final, so it can
     * be overridden.
     *
     * @param millis time to sleep in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    void sleep(int millis) throws InterruptedException {
        long duration = millis;
        Thread.sleep(duration);
    }

    /**
     * Runs {@code task} until it succeeds, fails with a non-retryable error or the time budget
     * is used up. Between attempts the method sleeps with an exponential back-off that starts
     * at {@code INITIAL_RETRY_DELAY_MS} and is capped at {@code MAX_RETRY_DELAY_MS}.
     *
     * @param task        the work to execute
     * @param description name of the task, used in log messages
     * @param timeoutMs   time budget in milliseconds; checked before each attempt only, so a
     *                    running attempt or sleep is never cut short
     * @return {@code true} if the task succeeded, {@code false} if the budget ran out
     * @throws InterruptedException if the task or the back-off sleep was interrupted
     * @throws Exception            the task's exception when {@link #shouldRetry(Exception)}
     *                              rejects it
     */
    boolean executeWithRetry(CheckedRunnable task, String description, long timeoutMs) throws Exception {
        int sleepPeriod = INITIAL_RETRY_DELAY_MS;
        int attemptCounter = 1;
        // Use the monotonic clock: wall-clock adjustments (NTP, manual changes) must not
        // shorten or extend the retry budget.
        long startNanos = System.nanoTime();
        while (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos) < timeoutMs) {
            try {
                task.run();
                log.info("{} succeeded after {} attempts.", description, attemptCounter);
                return true;
            }
            catch (InterruptedException ie) {
                log.warn("{} was interrupted.", description);
                Thread.currentThread().interrupt();
                throw ie;
            }
            catch (Exception e) {
                log.warn("{} failed. Error: {} (Attempt {})", description, e.getMessage(), attemptCounter);
                if (!this.shouldRetry(e)) {
                    log.error("{} encountered a non-retryable error: {}.", description, e.getMessage());
                    throw e;
                }
                log.debug("{} will retry in {} ms.", description, sleepPeriod);
                ++attemptCounter;
                try {
                    this.sleep(sleepPeriod);
                }
                catch (InterruptedException ie) {
                    log.warn("{} was interrupted during sleep.", description);
                    Thread.currentThread().interrupt();
                    throw ie;
                }
                // Double the delay for the next attempt, up to the configured ceiling.
                sleepPeriod = Math.min(sleepPeriod * 2, MAX_RETRY_DELAY_MS);
            }
        }
        log.warn("{} timed out after {} ms", description, timeoutMs);
        return false;
    }

    /**
     * Decides whether a failed startup step is worth another attempt.
     * <ul>
     *   <li>{@link ServiceBusException}: retried unless its reason is listed in
     *       {@code NON_RETRYABLE_REASONS}; unknown or missing reasons are retried.</li>
     *   <li>Anything caused by an {@link OtpHttpClientException}: retried.</li>
     *   <li>Recognised network errors (DNS failure, socket or connect timeout): retried.</li>
     *   <li>Everything else: not retried.</li>
     * </ul>
     *
     * @param e the exception thrown by the step
     * @return {@code true} if the step should be attempted again
     */
    boolean shouldRetry(Exception e) {
        if (e instanceof ServiceBusException) {
            ServiceBusFailureReason reason = ((ServiceBusException)e).getReason();
            // The reason sets are immutable Set.of(...) instances whose contains(null) throws
            // NullPointerException, so a missing reason must be filtered out first.
            if (reason != null && RETRYABLE_REASONS.contains(reason)) {
                log.warn("Transient error encountered: {}. Retrying...", reason);
                return true;
            }
            if (reason != null && NON_RETRYABLE_REASONS.contains(reason)) {
                log.error("Non-recoverable error encountered: {}. Not retrying.", reason);
                return false;
            }
            log.warn("Unhandled ServiceBusFailureReason: {}. Retrying by default.", reason);
            return true;
        }
        if (ExceptionUtils.hasCause(e, OtpHttpClientException.class)) {
            return true;
        }
        // Walk the cause chain once and reuse the result for both the check and the log line.
        Optional<String> networkErrorType = this.getNetworkErrorType(e);
        if (networkErrorType.isPresent()) {
            log.warn("Network connectivity error encountered: {}. Retrying...", networkErrorType.get());
            return true;
        }
        log.warn("Non-ServiceBus exception encountered: {}. Not retrying.", e.getClass().getName());
        return false;
    }

    /**
     * Classifies an exception as a network connectivity problem by inspecting its cause chain.
     *
     * @param e the exception to inspect
     * @return a short description of the network error, or empty if none was recognised
     */
    private Optional<String> getNetworkErrorType(Exception e) {
        if (ExceptionUtils.hasCause(e, UnknownHostException.class)) {
            return Optional.of("DNS resolution failure");
        }
        if (ExceptionUtils.hasCause(e, SocketTimeoutException.class)) {
            return Optional.of("Socket timeout");
        }
        if (!ExceptionUtils.hasCause(e, UncheckedIOException.class)) {
            return Optional.empty();
        }
        // The connect-timeout type is matched by simple class name on the root cause.
        Throwable rootCause = ExceptionUtils.getRootCause(e);
        boolean connectTimeout = rootCause != null
            && rootCause.getClass().getSimpleName().contains("ConnectTimeoutException");
        if (connectTimeout) {
            return Optional.of("Connection timeout");
        }
        return Optional.empty();
    }

    /**
     * Executes one startup step with retries, bounded by the startup timeout. Failures and
     * timeouts are reported as {@code REALTIME_STARTUP_ALERT} log lines and do not propagate;
     * only an interrupt is rethrown.
     *
     * @param task            the step to execute
     * @param stepDescription component name used in the log lines
     * @throws InterruptedException if the step was interrupted
     */
    private void tryStartupStep(CheckedRunnable task, String stepDescription) throws InterruptedException {
        try {
            long budgetMs = startupTimeout.toMillis();
            if (executeWithRetry(task, stepDescription, budgetMs)) {
                log.info("{} completed successfully", stepDescription);
                return;
            }
            log.warn("REALTIME_STARTUP_ALERT component={} status=TIMEOUT error=\"{} timed out after {} ms\"", stepDescription, stepDescription, startupTimeout.toMillis());
        } catch (InterruptedException e) {
            log.warn("REALTIME_STARTUP_ALERT component={} status=INTERRUPTED error=\"Aborting startup due to interrupt\"", stepDescription);
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            // Fall back to the exception type when there is no message to report.
            String detail = e.getMessage();
            if (detail == null) {
                detail = e.getClass().getSimpleName();
            }
            log.warn("REALTIME_STARTUP_ALERT component={} status=FAILED error=\"{}\"", stepDescription, detail);
        }
    }

    /** Registers {@code performShutdown} as a shutdown hook named after the updater type. */
    private void registerShutdownHook() {
        String hookName = "azure-siri-updater-shutdown-" + this.updaterType;
        ApplicationShutdownSupport.addShutdownHook(hookName, this::performShutdown);
    }

    /**
     * Closes the event processor and deletes the Service Bus subscription. Both actions are
     * best-effort: errors are logged and do not stop the rest of the shutdown.
     */
    private void performShutdown() {
        log.info("Starting shutdown for {} updater", updaterType);
        closeEventProcessorQuietly();
        deleteSubscriptionQuietly();
        log.info("Shutdown complete for {} updater", updaterType);
    }

    /** Closes the event processor if one was created, logging any error. */
    private void closeEventProcessorQuietly() {
        if (eventProcessor == null) {
            return;
        }
        try {
            eventProcessor.close();
            log.debug("Event processor closed successfully");
        } catch (Exception e) {
            log.warn("Error closing event processor: {}", e.getMessage());
        }
    }

    /** Deletes the subscription if the admin client and subscription name are set, logging any error. */
    private void deleteSubscriptionQuietly() {
        if (serviceBusAdmin == null || subscriptionName == null) {
            return;
        }
        try {
            serviceBusAdmin.deleteSubscription(topicName, subscriptionName);
            log.info("Subscription '{}' deleted on topic '{}'", subscriptionName, topicName);
        } catch (Exception e) {
            log.warn("Error deleting subscription '{}': {}", subscriptionName, e.getMessage());
        }
    }

    /**
     * Builds the administration client for the configured authentication type and
     * (re)creates the subscription: an existing subscription with the same name is deleted
     * first, then a new one is created with the configured auto-delete-on-idle duration.
     *
     * @throws ServiceBusException if a Service Bus call fails
     */
    private void setupSubscription() throws ServiceBusException, URISyntaxException {
        if (authenticationType == AuthenticationType.FederatedIdentity) {
            TokenCredential credential = new DefaultAzureCredentialBuilder()
                .executorService(MoreExecutors.newDirectExecutorService())
                .build();
            serviceBusAdmin = new ServiceBusAdministrationClientBuilder()
                .credential(fullyQualifiedNamespace, credential)
                .buildClient();
        } else if (authenticationType == AuthenticationType.SharedAccessKey) {
            serviceBusAdmin = new ServiceBusAdministrationClientBuilder()
                .connectionString(serviceBusUrl)
                .buildClient();
        }

        CreateSubscriptionOptions subscriptionOptions = new CreateSubscriptionOptions();
        subscriptionOptions.setAutoDeleteOnIdle(autoDeleteOnIdle);

        boolean alreadyExists = serviceBusAdmin.getSubscriptionExists(topicName, subscriptionName);
        if (alreadyExists) {
            log.info("Subscription '{}' already exists. Deleting existing subscription.", subscriptionName);
            serviceBusAdmin.deleteSubscription(topicName, subscriptionName);
            log.info("Service Bus deleted subscription {}.", subscriptionName);
        }

        serviceBusAdmin.createSubscription(topicName, subscriptionName, subscriptionOptions);
        log.info("{} updater created subscription {}", updaterType, subscriptionName);
    }

    /**
     * Builds a Service Bus processor for the configured topic and subscription and starts it.
     * Messages go to {@code handleMessage}, errors to {@code errorConsumer}.
     *
     * <p>The processor is stored in {@code eventProcessor} only after it started. If starting
     * fails the processor is closed before the error is rethrown, so that a retry of this step
     * does not leave an unclosed processor behind.
     *
     * @throws ServiceBusException      if the Service Bus client reports a failure
     * @throws IllegalArgumentException if the authentication type is not supported
     */
    private void startEventProcessor() throws ServiceBusException {
        ServiceBusClientBuilder clientBuilder = new ServiceBusClientBuilder();
        if (this.authenticationType == AuthenticationType.FederatedIdentity) {
            Preconditions.checkNotNull((Object)this.fullyQualifiedNamespace, (Object)"fullyQualifiedNamespace must be set for FederatedIdentity authentication");
            clientBuilder.fullyQualifiedNamespace(this.fullyQualifiedNamespace).credential((TokenCredential)new DefaultAzureCredentialBuilder().build());
        } else if (this.authenticationType == AuthenticationType.SharedAccessKey) {
            Preconditions.checkNotNull((Object)this.serviceBusUrl, (Object)"serviceBusUrl must be set for SharedAccessKey authentication");
            clientBuilder.connectionString(this.serviceBusUrl);
        } else {
            throw new IllegalArgumentException("Unsupported authentication type: " + String.valueOf((Object)this.authenticationType));
        }
        ServiceBusProcessorClient processor = clientBuilder.processor().topicName(this.topicName).subscriptionName(this.subscriptionName).receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE).disableAutoComplete().prefetchCount(this.prefetchCount).processError(this::errorConsumer).processMessage(this::handleMessage).buildProcessorClient();
        try {
            processor.start();
        }
        catch (RuntimeException startFailure) {
            // Release the half-started processor; the caller may retry and build a new one.
            try {
                processor.close();
            }
            catch (RuntimeException closeFailure) {
                startFailure.addSuppressed(closeFailure);
            }
            throw startFailure;
        }
        this.eventProcessor = processor;
        log.info("Service Bus processor started for topic '{}' and subscription '{}', prefetching {} messages.", new Object[]{this.topicName, this.subscriptionName, this.prefetchCount});
    }

    /**
     * Callback for every message received by the Service Bus processor. Counts the message,
     * parses its body as SIRI XML and forwards the contained {@link ServiceDelivery} to the
     * message handler. Messages without a service delivery (for example heartbeats) are only
     * logged. XML parse errors are logged and the message is dropped.
     *
     * @param messageContext context holding the received message
     */
    private void handleMessage(ServiceBusReceivedMessageContext messageContext) {
        ServiceBusReceivedMessage message = messageContext.getMessage();
        // Use the value returned by the increment itself: a separate get() could observe
        // increments made by concurrent callbacks and skip or repeat the interval log line.
        long received = MESSAGE_COUNTER.incrementAndGet();
        if (received % MESSAGE_COUNTER_LOG_INTERVAL == 0L) {
            log.debug("Total SIRI-{} messages received={}", this.updaterType, received);
        }
        try {
            String siriXmlMessage = message.getBody().toString();
            Siri siri = SiriXml.parseXml((String)siriXmlMessage);
            ServiceDelivery serviceDelivery = siri.getServiceDelivery();
            if (serviceDelivery == null) {
                if (siri.getHeartbeatNotification() != null) {
                    log.debug("Updater {} received SIRI heartbeat message", this.updaterType);
                } else {
                    log.debug("Updater {} received SIRI message without ServiceDelivery", this.updaterType);
                }
            } else {
                this.messageHandler.handleMessage(serviceDelivery, message.getMessageId());
            }
        }
        catch (JAXBException | XMLStreamException e) {
            log.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * @return {@code true} once the startup sequence in {@code run()} has marked the updater
     *         as primed
     */
    @Override
    public boolean isPrimed() {
        return isPrimed;
    }

    /** Marks the updater as primed. */
    private void setPrimed() {
        isPrimed = true;
    }

    /** @return the configuration reference this updater was created with */
    @Override
    public String getConfigRef() {
        return configRef;
    }

    /**
     * Fetches the initial SIRI data from the configured history endpoint.
     *
     * @return the service delivery of the response; empty when no history URL is configured,
     *         when the endpoint returned no content, or when the response holds no service
     *         delivery
     */
    private Optional<ServiceDelivery> fetchInitialSiriData() {
        if (dataInitializationUrl == null) {
            return Optional.empty();
        }

        Map<String, String> requestHeaders = HttpHeaders.of().acceptApplicationXML().build().asMap();
        log.info("Fetching initial Siri data from {}, timeout is {} ms.", dataInitializationUrl, timeout);

        try (OtpHttpClientFactory clientFactory = new OtpHttpClientFactory()) {
            OtpHttpClient client = clientFactory.create(log);
            long startedAt = System.currentTimeMillis();
            HttpUriRequestBase request = new HttpGet(dataInitializationUrl);
            Optional<Siri> response = client.executeAndMapOptional(request, Duration.ofMillis(timeout), requestHeaders, SiriXml::parseXml);
            long elapsedMs = System.currentTimeMillis() - startedAt;
            log.info("Fetched initial data in {} ms", elapsedMs);
            if (!response.isPresent()) {
                log.info("Got status 204 'No Content'.");
            }
            return response.map(Siri::getServiceDelivery);
        }
    }

    /**
     * Applies the historical service delivery through the message handler and waits until the
     * returned future, if any, has completed.
     *
     * @param serviceDelivery the initial data to apply
     * @throws SiriAzureInitializationException if applying the data fails or the wait is
     *         interrupted; on interruption the thread's interrupt flag is restored first
     */
    public void processInitialSiriData(ServiceDelivery serviceDelivery) {
        try {
            long t1 = System.currentTimeMillis();
            Future<?> f = this.messageHandler.handleMessage(serviceDelivery, "history-message");
            if (f != null) {
                f.get();
            }
            log.info("{} updater initialized in {} ms.", this.updaterType, System.currentTimeMillis() - t1);
        }
        catch (InterruptedException e) {
            // Wrapping the exception would otherwise hide the interrupt from the callers.
            Thread.currentThread().interrupt();
            throw new SiriAzureInitializationException("Error applying history", e);
        }
        catch (ExecutionException e) {
            throw new SiriAzureInitializationException("Error applying history", e);
        }
    }

    /**
     * Error callback of the Service Bus processor. Every error is logged; for busy or
     * unauthorized failures the calling thread additionally pauses for a few seconds.
     *
     * @param errorContext details of the failure reported by the processor
     */
    private void errorConsumer(ServiceBusErrorContext errorContext) {
        log.error("Error when receiving messages from namespace={}, Entity={}", errorContext.getFullyQualifiedNamespace(), errorContext.getEntityPath());

        Throwable failure = errorContext.getException();
        if (!(failure instanceof ServiceBusException)) {
            log.error("Non-ServiceBusException occurred!", failure);
            return;
        }

        ServiceBusException busError = (ServiceBusException)failure;
        ServiceBusFailureReason reason = busError.getReason();

        boolean entityUnavailable = reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
            || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND;
        if (entityUnavailable) {
            log.error("An unrecoverable error occurred. Stopping processing with reason {} {}", reason, busError.getMessage());
            return;
        }
        if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
            log.error("Message lock lost for message", busError);
            return;
        }

        boolean busyOrUnauthorized = reason == ServiceBusFailureReason.SERVICE_BUSY
            || reason == ServiceBusFailureReason.UNAUTHORIZED;
        if (!busyOrUnauthorized) {
            log.error(busError.getLocalizedMessage(), busError);
            return;
        }

        log.error("Service Bus is busy or unauthorized, wait and try again");
        try {
            TimeUnit.SECONDS.sleep(ERROR_RETRY_WAIT_SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            log.info("OTP is shutting down, stopping processing of ServiceBus error messages");
        }
    }

    /**
     * Variant of the updater used for SIRI-SX: in addition to the regular updater behaviour it
     * exposes the {@link TransitAlertService} of its {@link SiriAzureSXUpdater} handler.
     */
    public static class SxWrapper extends SiriAzureUpdater implements TransitAlertProvider {

        /**
         * @param config         updater configuration
         * @param messageHandler the SX handler that owns the transit alert service
         */
        SxWrapper(SiriAzureUpdaterParameters config, SiriAzureSXUpdater messageHandler) {
            super(config, messageHandler);
        }

        /** @return the transit alert service of the SX message handler */
        @Override
        public TransitAlertService getTransitAlertService() {
            SiriAzureSXUpdater sxHandler = (SiriAzureSXUpdater)messageHandler;
            return sxHandler.getTransitAlertService();
        }
    }

    /**
     * A {@link Runnable}-like task that is allowed to throw checked exceptions.
     */
    @FunctionalInterface
    interface CheckedRunnable {
        /**
         * Executes the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }
}

