/*
 * Decompiled with CFR 0.152.
 */
package org.opentripplanner.updater.trip.siri.updater.google;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import jakarta.xml.bind.JAXBException;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import javax.xml.stream.XMLStreamException;
import org.entur.siri21.util.SiriXml;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
import org.opentripplanner.framework.io.OtpHttpClient;
import org.opentripplanner.framework.io.OtpHttpClientFactory;
import org.opentripplanner.framework.retry.OtpRetry;
import org.opentripplanner.framework.retry.OtpRetryBuilder;
import org.opentripplanner.updater.trip.siri.updater.AsyncEstimatedTimetableSource;
import org.opentripplanner.utils.text.FileSizeToTextConverter;
import org.opentripplanner.utils.time.DurationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri21.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri21.EstimatedVersionFrameStructure;
import uk.org.siri.siri21.ServiceDelivery;
import uk.org.siri.siri21.Siri;

public class GooglePubsubEstimatedTimetableSource
implements AsyncEstimatedTimetableSource {
    private static final Logger LOG = LoggerFactory.getLogger(GooglePubsubEstimatedTimetableSource.class);
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0L);
    private static final AtomicLong UPDATE_COUNTER = new AtomicLong(0L);
    private static final AtomicLong SIZE_COUNTER = new AtomicLong(0L);
    private static final String SUBSCRIPTION_PREFIX = "siri-et-";
    private static final int RETRY_MAX_ATTEMPTS = Integer.MAX_VALUE;
    private static final java.time.Duration RETRY_INITIAL_DELAY = java.time.Duration.ofSeconds(1L);
    private static final int RETRY_BACKOFF = 2;
    private final URI dataInitializationUrl;
    private final java.time.Duration reconnectPeriod;
    private final java.time.Duration initialGetDataTimeout;
    private final String subscriptionName;
    private final ProjectTopicName topic;
    private final Subscriber subscriber;
    private final PushConfig pushConfig;
    private final Instant startTime = Instant.now();
    private final OtpRetry retry;
    private Function<ServiceDelivery, Future<?>> serviceDeliveryConsumer;
    private volatile boolean primed;

    public GooglePubsubEstimatedTimetableSource(String dataInitializationUrl, java.time.Duration reconnectPeriod, java.time.Duration initialGetDataTimeout, String subscriptionProjectName, String topicProjectName, String topicName) {
        this.dataInitializationUrl = URI.create(dataInitializationUrl);
        this.reconnectPeriod = reconnectPeriod;
        this.initialGetDataTimeout = initialGetDataTimeout;
        String subscriptionId = GooglePubsubEstimatedTimetableSource.buildSubscriptionId();
        this.subscriptionName = ProjectSubscriptionName.of((String)subscriptionProjectName, (String)subscriptionId).toString();
        this.subscriber = Subscriber.newBuilder((String)this.subscriptionName, (MessageReceiver)new EstimatedTimetableMessageReceiver()).build();
        this.topic = ProjectTopicName.of((String)topicProjectName, (String)topicName);
        this.pushConfig = PushConfig.getDefaultInstance();
        this.retry = new OtpRetryBuilder().withName("SIRI-ET Google PubSub Updater setup").withMaxAttempts(Integer.MAX_VALUE).withInitialRetryInterval(RETRY_INITIAL_DELAY).withBackoffMultiplier(2).build();
        this.addShutdownHook();
    }

    @Override
    public void start(Function<ServiceDelivery, Future<?>> serviceDeliveryConsumer) {
        this.serviceDeliveryConsumer = serviceDeliveryConsumer;
        try {
            LOG.info("Creating subscription {}", (Object)this.subscriptionName);
            this.retry.execute(this::createSubscription);
            LOG.info("Created subscription {}", (Object)this.subscriptionName);
            this.retry.execute(this::initializeData);
            while (true) {
                try {
                    this.subscriber.startAsync().awaitRunning();
                    this.primed = true;
                    this.subscriber.awaitTerminated();
                }
                catch (IllegalStateException e) {
                    this.subscriber.stopAsync();
                }
                Thread.sleep(this.reconnectPeriod.toMillis());
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOG.info("OTP is shutting down, stopping the SIRI ET Google PubSub Updater.");
            return;
        }
    }

    @Override
    public boolean isPrimed() {
        return this.primed;
    }

    private static String buildSubscriptionId() {
        String hostname = System.getenv("HOSTNAME");
        if (hostname == null || hostname.isEmpty()) {
            return "siri-et-otp-" + String.valueOf(UUID.randomUUID());
        }
        return SUBSCRIPTION_PREFIX + hostname + "-" + Instant.now().toEpochMilli();
    }

    private void createSubscription() {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
            subscriptionAdminClient.createSubscription(Subscription.newBuilder().setTopic(this.topic.toString()).setName(this.subscriptionName).setPushConfig(this.pushConfig).setMessageRetentionDuration(Duration.newBuilder().setSeconds(600L).build()).setExpirationPolicy(ExpirationPolicy.newBuilder().setTtl(Duration.newBuilder().setSeconds(86400L).build()).build()).build());
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to initialize Google Pubsub-updater: System.getenv('GOOGLE_APPLICATION_CREDENTIALS') = " + System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
        }
    }

    private void deleteSubscription() {
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
            LOG.info("Deleting subscription {}", (Object)this.subscriptionName);
            subscriptionAdminClient.deleteSubscription(this.subscriptionName);
            LOG.info("Subscription deleted {} - time since startup: {}", (Object)this.subscriptionName, (Object)DurationUtils.durationToStr((java.time.Duration)java.time.Duration.between(this.startTime, Instant.now())));
        }
        catch (IOException e) {
            LOG.error("Could not delete subscription {}", (Object)this.subscriptionName);
        }
        catch (NotFoundException nfe) {
            LOG.info("Subscription {} not found, ignoring deletion request", (Object)this.subscriptionName);
        }
    }

    private Optional<ServiceDelivery> serviceDelivery(ByteString data) {
        Siri siri;
        try {
            siri = SiriXml.parseXml((String)data.toStringUtf8());
        }
        catch (JAXBException | XMLStreamException e) {
            throw new RuntimeException(e);
        }
        return Optional.ofNullable(siri.getServiceDelivery());
    }

    private void initializeData() {
        if (this.dataInitializationUrl != null) {
            LOG.info("Fetching initial data from {}", (Object)this.dataInitializationUrl);
            long t1 = System.currentTimeMillis();
            ByteString value = this.fetchInitialData();
            long t2 = System.currentTimeMillis();
            LOG.info("Fetching initial data - finished after {} ms, got {}", (Object)(t2 - t1), (Object)FileSizeToTextConverter.fileSizeToString((long)value.size()));
            this.serviceDelivery(value).map(this.serviceDeliveryConsumer).ifPresent(future -> {
                try {
                    future.get();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
            LOG.info("Pubsub updater initialized after {} ms: [messages: {},  updates: {}, total size: {}, time since startup: {}]", new Object[]{System.currentTimeMillis() - t2, MESSAGE_COUNTER.get(), UPDATE_COUNTER.get(), FileSizeToTextConverter.fileSizeToString((long)SIZE_COUNTER.get()), this.getTimeSinceStartupString()});
        }
    }

    private ByteString fetchInitialData() {
        try (OtpHttpClientFactory otpHttpClientFactory = new OtpHttpClientFactory();){
            OtpHttpClient otpHttpClient = otpHttpClientFactory.create(LOG);
            ByteString byteString = otpHttpClient.getAndMap(this.dataInitializationUrl, this.initialGetDataTimeout, Map.of("Content-Type", "application/xml"), ByteString::readFrom);
            return byteString;
        }
    }

    private void addShutdownHook() {
        ApplicationShutdownSupport.addShutdownHook("siri-et-google-pubsub-shutdown", () -> {
            if (this.subscriber != null) {
                LOG.info("Stopping SIRI-ET PubSub subscriber '{}'.", (Object)this.subscriptionName);
                this.subscriber.stopAsync();
            }
            this.deleteSubscription();
        });
    }

    private String getTimeSinceStartupString() {
        return DurationUtils.durationToStr((java.time.Duration)java.time.Duration.between(this.startTime, Instant.now()));
    }

    private void logPubsubMessage(ServiceDelivery serviceDelivery) {
        int numberOfUpdatedTrips = 0;
        try {
            numberOfUpdatedTrips = ((EstimatedVersionFrameStructure)((EstimatedTimetableDeliveryStructure)serviceDelivery.getEstimatedTimetableDeliveries().getFirst()).getEstimatedJourneyVersionFrames().getFirst()).getEstimatedVehicleJourneies().size();
        }
        catch (Exception exception) {
            // empty catch block
        }
        long numberOfUpdates = UPDATE_COUNTER.addAndGet(numberOfUpdatedTrips);
        long numberOfMessages = MESSAGE_COUNTER.incrementAndGet();
        if (numberOfMessages % 1000L == 0L) {
            LOG.info("Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", new Object[]{numberOfMessages, numberOfUpdates, FileSizeToTextConverter.fileSizeToString((long)SIZE_COUNTER.get()), java.time.Duration.between(serviceDelivery.getResponseTimestamp().toInstant(), Instant.now()).toMillis(), this.getTimeSinceStartupString()});
        }
    }

    class EstimatedTimetableMessageReceiver
    implements MessageReceiver {
        EstimatedTimetableMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            Optional<ServiceDelivery> serviceDelivery = GooglePubsubEstimatedTimetableSource.this.serviceDelivery(message.getData());
            serviceDelivery.ifPresent(sd -> {
                GooglePubsubEstimatedTimetableSource.this.logPubsubMessage((ServiceDelivery)sd);
                GooglePubsubEstimatedTimetableSource.this.serviceDeliveryConsumer.apply((ServiceDelivery)sd);
            });
            consumer.ack();
        }
    }
}

