package org.opentripplanner.ext.siri.updater;

import ch.qos.logback.core.CoreConstants;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.pubsub.v1.ExpirationPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.entur.protobuf.mapper.SiriMapper;
import org.opentripplanner.ext.siri.EntityResolver;
import org.opentripplanner.ext.siri.SiriFuzzyTripMatcher;
import org.opentripplanner.ext.siri.SiriTimetableSnapshotSource;
import org.opentripplanner.framework.application.ApplicationShutdownSupport;
import org.opentripplanner.framework.io.HttpUtils;
import org.opentripplanner.framework.io.OtpHttpClient;
import org.opentripplanner.framework.retry.OtpRetry;
import org.opentripplanner.framework.retry.OtpRetryBuilder;
import org.opentripplanner.framework.text.FileSizeToTextConverter;
import org.opentripplanner.framework.time.DurationUtils;
import org.opentripplanner.transit.service.DefaultTransitService;
import org.opentripplanner.transit.service.TransitModel;
import org.opentripplanner.updater.spi.GraphUpdater;
import org.opentripplanner.updater.spi.UpdateResult;
import org.opentripplanner.updater.spi.WriteToGraphCallback;
import org.opentripplanner.updater.trip.metrics.TripUpdateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.siri.siri20.EstimatedTimetableDeliveryStructure;
import uk.org.siri.siri20.Siri;
import uk.org.siri.www.siri.SiriType;

/* loaded from: input_file:org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater.class */
public class SiriETGooglePubsubUpdater implements GraphUpdater {
    private static final String SUBSCRIPTION_PREFIX = "siri-et-";
    private static final int RETRY_MAX_ATTEMPTS = Integer.MAX_VALUE;
    private static final int RETRY_BACKOFF = 2;
    private final URI dataInitializationUrl;
    private final String feedId;
    private final Duration reconnectPeriod;
    private final Duration initialGetDataTimeout;
    private final String subscriptionName;
    private final ProjectTopicName topic;
    private final Subscriber subscriber;
    private final PushConfig pushConfig;
    private final String configRef;
    private final SiriTimetableSnapshotSource snapshotSource;
    private final SiriFuzzyTripMatcher fuzzyTripMatcher;
    private final Instant startTime = Instant.now();
    private final Consumer<UpdateResult> recordMetrics;
    private final EntityResolver entityResolver;
    private final OtpRetry retry;
    private WriteToGraphCallback saveResultOnGraph;
    private volatile boolean primed;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SiriETGooglePubsubUpdater.class);
    private static final AtomicLong MESSAGE_COUNTER = new AtomicLong(0);
    private static final AtomicLong UPDATE_COUNTER = new AtomicLong(0);
    private static final AtomicLong SIZE_COUNTER = new AtomicLong(0);
    private static final Duration RETRY_INITIAL_DELAY = Duration.ofSeconds(1);

    /* loaded from: input_file:org/opentripplanner/ext/siri/updater/SiriETGooglePubsubUpdater$EstimatedTimetableMessageReceiver.class */
    class EstimatedTimetableMessageReceiver implements MessageReceiver {
        EstimatedTimetableMessageReceiver() {
        }

        @Override // com.google.cloud.pubsub.v1.MessageReceiver
        public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            SiriETGooglePubsubUpdater.this.processSiriData(pubsubMessage.getData());
            ackReplyConsumer.ack();
        }
    }

    public SiriETGooglePubsubUpdater(SiriETGooglePubsubUpdaterParameters siriETGooglePubsubUpdaterParameters, TransitModel transitModel, SiriTimetableSnapshotSource siriTimetableSnapshotSource) {
        this.configRef = siriETGooglePubsubUpdaterParameters.configRef();
        this.dataInitializationUrl = URI.create(siriETGooglePubsubUpdaterParameters.dataInitializationUrl());
        this.feedId = siriETGooglePubsubUpdaterParameters.feedId();
        this.reconnectPeriod = siriETGooglePubsubUpdaterParameters.reconnectPeriod();
        this.initialGetDataTimeout = siriETGooglePubsubUpdaterParameters.initialGetDataTimeout();
        this.snapshotSource = siriTimetableSnapshotSource;
        String buildSubscriptionId = buildSubscriptionId();
        String subscriptionProjectName = siriETGooglePubsubUpdaterParameters.subscriptionProjectName();
        String str = siriETGooglePubsubUpdaterParameters.topicProjectName();
        String str2 = siriETGooglePubsubUpdaterParameters.topicName();
        this.subscriptionName = ProjectSubscriptionName.of(subscriptionProjectName, buildSubscriptionId).toString();
        this.subscriber = Subscriber.newBuilder(this.subscriptionName, new EstimatedTimetableMessageReceiver()).build();
        this.topic = ProjectTopicName.of(str, str2);
        this.pushConfig = PushConfig.getDefaultInstance();
        DefaultTransitService defaultTransitService = new DefaultTransitService(transitModel);
        this.entityResolver = new EntityResolver(defaultTransitService, this.feedId);
        this.fuzzyTripMatcher = siriETGooglePubsubUpdaterParameters.fuzzyTripMatching() ? SiriFuzzyTripMatcher.of(defaultTransitService) : null;
        this.recordMetrics = TripUpdateMetrics.streaming(siriETGooglePubsubUpdaterParameters);
        addShutdownHook();
        this.retry = new OtpRetryBuilder().withName("SIRI-ET Google PubSub Updater setup").withMaxAttempts(Integer.MAX_VALUE).withInitialRetryInterval(RETRY_INITIAL_DELAY).withBackoffMultiplier(2).build();
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public void setGraphUpdaterManager(WriteToGraphCallback writeToGraphCallback) {
        this.saveResultOnGraph = writeToGraphCallback;
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public void run() {
        try {
            LOG.info("Creating subscription {}", this.subscriptionName);
            this.retry.execute(this::createSubscription);
            LOG.info("Created subscription {}", this.subscriptionName);
            this.retry.execute(this::initializeData);
            while (true) {
                try {
                    this.subscriber.startAsync().awaitRunning();
                    this.subscriber.awaitTerminated();
                } catch (IllegalStateException e) {
                    this.subscriber.stopAsync();
                }
                Thread.sleep(this.reconnectPeriod.toMillis());
            }
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            LOG.info("OTP is shutting down, stopping the SIRI ET Google PubSub Updater.");
        }
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public void teardown() {
        if (this.subscriber != null) {
            LOG.info("Stopping SIRI-ET PubSub subscriber  {}", this.subscriptionName);
            this.subscriber.stopAsync();
        }
        deleteSubscription();
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public boolean isPrimed() {
        return this.primed;
    }

    @Override // org.opentripplanner.updater.spi.GraphUpdater
    public String getConfigRef() {
        return this.configRef;
    }

    private void addShutdownHook() {
        Thread thread = new Thread(this::teardown, "siri-et-google-pubsub-shutdown");
        if (ApplicationShutdownSupport.addShutdownHook(thread, thread.getName())) {
            return;
        }
        LOG.info("Instance is already shutting down - cleaning up immediately.");
        teardown();
    }

    private static String buildSubscriptionId() {
        String str = System.getenv(CoreConstants.HOSTNAME_KEY);
        return (str == null || str.isEmpty()) ? "siri-et-otp-" + UUID.randomUUID() : "siri-et-" + str + "-" + Instant.now().toEpochMilli();
    }

    private void createSubscription() {
        try {
            SubscriptionAdminClient create = SubscriptionAdminClient.create();
            try {
                create.createSubscription(Subscription.newBuilder().setTopic(this.topic.toString()).setName(this.subscriptionName).setPushConfig(this.pushConfig).setMessageRetentionDuration(com.google.protobuf.Duration.newBuilder().setSeconds(600L).build()).setExpirationPolicy(ExpirationPolicy.newBuilder().setTtl(com.google.protobuf.Duration.newBuilder().setSeconds(86400L).build()).build()).build());
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException("Unable to initialize Google Pubsub-updater: System.getenv('GOOGLE_APPLICATION_CREDENTIALS') = " + System.getenv(ServiceOptions.CREDENTIAL_ENV_NAME));
        }
    }

    private void deleteSubscription() {
        try {
            SubscriptionAdminClient create = SubscriptionAdminClient.create();
            try {
                LOG.info("Deleting subscription {}", this.subscriptionName);
                create.deleteSubscription(this.subscriptionName);
                LOG.info("Subscription deleted {} - time since startup: {}", this.subscriptionName, DurationUtils.durationToStr(Duration.between(this.startTime, Instant.now())));
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (NotFoundException e) {
            LOG.info("Subscription {} not found, ignoring deletion request", this.subscriptionName);
        } catch (IOException e2) {
            LOG.error("Could not delete subscription {}", this.subscriptionName);
        }
    }

    private String getTimeSinceStartupString() {
        return DurationUtils.durationToStr(Duration.between(this.startTime, Instant.now()));
    }

    private void initializeData() {
        if (this.dataInitializationUrl != null) {
            LOG.info("Fetching initial data from {}", this.dataInitializationUrl);
            long currentTimeMillis = System.currentTimeMillis();
            ByteString fetchInitialData = fetchInitialData();
            long currentTimeMillis2 = System.currentTimeMillis();
            LOG.info("Fetching initial data - finished after {} ms, got {}", Long.valueOf(currentTimeMillis2 - currentTimeMillis), FileSizeToTextConverter.fileSizeToString(fetchInitialData.size()));
            processSiriData(fetchInitialData);
            this.primed = true;
            LOG.info("Pubsub updater initialized after {} ms: [messages: {},  updates: {}, total size: {}, time since startup: {}]", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2), Long.valueOf(MESSAGE_COUNTER.get()), Long.valueOf(UPDATE_COUNTER.get()), FileSizeToTextConverter.fileSizeToString(SIZE_COUNTER.get()), getTimeSinceStartupString());
        }
    }

    private ByteString fetchInitialData() {
        OtpHttpClient otpHttpClient = new OtpHttpClient();
        try {
            ByteString byteString = (ByteString) otpHttpClient.getAndMap(this.dataInitializationUrl, this.initialGetDataTimeout, Map.of("Content-Type", HttpUtils.APPLICATION_X_PROTOBUF), ByteString::readFrom);
            otpHttpClient.close();
            return byteString;
        } catch (Throwable th) {
            try {
                otpHttpClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void processSiriData(ByteString byteString) {
        try {
            SIZE_COUNTER.addAndGet(byteString.size());
            Siri mapToJaxb = SiriMapper.mapToJaxb(SiriType.parseFrom(byteString));
            if (mapToJaxb.getServiceDelivery() != null) {
                List<EstimatedTimetableDeliveryStructure> estimatedTimetableDeliveries = mapToJaxb.getServiceDelivery().getEstimatedTimetableDeliveries();
                int i = 0;
                try {
                    i = estimatedTimetableDeliveries.get(0).getEstimatedJourneyVersionFrames().get(0).getEstimatedVehicleJourneies().size();
                } catch (Exception e) {
                }
                long addAndGet = UPDATE_COUNTER.addAndGet(i);
                long incrementAndGet = MESSAGE_COUNTER.incrementAndGet();
                if (incrementAndGet % 1000 == 0) {
                    LOG.info("Pubsub stats: [messages: {}, updates: {}, total size: {}, current delay {} ms, time since startup: {}]", Long.valueOf(incrementAndGet), Long.valueOf(addAndGet), FileSizeToTextConverter.fileSizeToString(SIZE_COUNTER.get()), Long.valueOf(Duration.between(mapToJaxb.getServiceDelivery().getResponseTimestamp().toInstant(), Instant.now()).toMillis()), getTimeSinceStartupString());
                }
                Future<?> execute = this.saveResultOnGraph.execute((graph, transitModel) -> {
                    this.recordMetrics.accept(this.snapshotSource.applyEstimatedTimetable(this.fuzzyTripMatcher, this.entityResolver, this.feedId, false, estimatedTimetableDeliveries));
                });
                if (isPrimed()) {
                    return;
                }
                try {
                    execute.get();
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e2);
                } catch (ExecutionException e3) {
                    throw new RuntimeException(e3);
                }
            }
        } catch (InvalidProtocolBufferException e4) {
            throw new RuntimeException(e4);
        }
    }
}
