package org.occurrent.subscription.mongodb.spring.blocking;

import com.mongodb.MongoCommandException;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.cloudevents.CloudEvent;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.PreDestroy;
import org.bson.BsonDocument;
import org.bson.Document;
import org.occurrent.mongodb.timerepresentation.TimeRepresentation;
import org.occurrent.retry.RetryStrategy;
import org.occurrent.retry.internal.RetryExecution;
import org.occurrent.subscription.PositionAwareCloudEvent;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionFilter;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.PositionAwareSubscriptionModel;
import org.occurrent.subscription.mongodb.MongoOperationTimeSubscriptionPosition;
import org.occurrent.subscription.mongodb.MongoResumeTokenSubscriptionPosition;
import org.occurrent.subscription.mongodb.internal.MongoCloudEventsToJsonDeserializer;
import org.occurrent.subscription.mongodb.internal.MongoCommons;
import org.occurrent.subscription.mongodb.spring.internal.ApplyFilterToChangeStreamOptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.mongodb.UncategorizedMongoDbException;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.Subscription;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;

/* loaded from: input_file:org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel.class */
public class SpringMongoSubscriptionModel implements PositionAwareSubscriptionModel, SmartLifecycle {
    private static final Logger log = LoggerFactory.getLogger(SpringMongoSubscriptionModel.class);
    private final String eventCollection;
    private final MessageListenerContainer messageListenerContainer;
    private final ConcurrentMap<String, InternalSubscription> runningSubscriptions;
    private final ConcurrentMap<String, InternalSubscription> pausedSubscriptions;
    private final TimeRepresentation timeRepresentation;
    private final MongoOperations mongoOperations;
    private final RetryStrategy retryStrategy;
    private final boolean restartSubscriptionsOnChangeStreamHistoryLost;
    private volatile boolean shutdown;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/occurrent/subscription/mongodb/spring/blocking/SpringMongoSubscriptionModel$InternalSubscription.class */
    public static class InternalSubscription {
        private final SpringMongoSubscription occurrentSubscription;
        private final Function<StartAt, ChangeStreamRequest<Document>> changeStreamRequestBuilder;

        private InternalSubscription(SpringMongoSubscription springMongoSubscription, Function<StartAt, ChangeStreamRequest<Document>> function) {
            this.occurrentSubscription = springMongoSubscription;
            this.changeStreamRequestBuilder = function;
        }

        InternalSubscription copy(Subscription subscription) {
            return new InternalSubscription(new SpringMongoSubscription(this.occurrentSubscription.id(), subscription), this.changeStreamRequestBuilder);
        }

        ChangeStreamRequest<Document> newChangeStreamRequest() {
            return this.changeStreamRequestBuilder.apply(null);
        }

        ChangeStreamRequest<Document> newChangeStreamRequest(StartAt startAt) {
            return this.changeStreamRequestBuilder.apply(startAt);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof InternalSubscription)) {
                return false;
            }
            InternalSubscription internalSubscription = (InternalSubscription) obj;
            return Objects.equals(this.occurrentSubscription, internalSubscription.occurrentSubscription) && Objects.equals(this.changeStreamRequestBuilder, internalSubscription.changeStreamRequestBuilder);
        }

        Subscription getSpringSubscription() {
            return this.occurrentSubscription.getSubscriptionReference().get();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void shutdown() {
            this.occurrentSubscription.shutdown();
        }

        public int hashCode() {
            return Objects.hash(this.occurrentSubscription, this.changeStreamRequestBuilder);
        }

        public String toString() {
            return new StringJoiner(", ", InternalSubscription.class.getSimpleName() + "[", "]").add("occurrentSubscription=" + this.occurrentSubscription).add("changeStreamRequestBuilder=" + this.changeStreamRequestBuilder).toString();
        }
    }

    public SpringMongoSubscriptionModel(MongoTemplate mongoTemplate, String str, TimeRepresentation timeRepresentation) {
        this(mongoTemplate, SpringMongoSubscriptionModelConfig.withConfig(str, timeRepresentation));
    }

    public SpringMongoSubscriptionModel(MongoTemplate mongoTemplate, String str, TimeRepresentation timeRepresentation, RetryStrategy retryStrategy) {
        this(mongoTemplate, SpringMongoSubscriptionModelConfig.withConfig(str, timeRepresentation).retryStrategy(retryStrategy));
    }

    public SpringMongoSubscriptionModel(MongoTemplate mongoTemplate, SpringMongoSubscriptionModelConfig springMongoSubscriptionModelConfig) {
        this.shutdown = false;
        Objects.requireNonNull(mongoTemplate, MongoOperations.class.getSimpleName() + " cannot be null");
        Objects.requireNonNull(springMongoSubscriptionModelConfig, SpringMongoSubscriptionModelConfig.class.getSimpleName() + " cannot be null");
        this.mongoOperations = mongoTemplate;
        this.timeRepresentation = springMongoSubscriptionModelConfig.timeRepresentation;
        this.eventCollection = springMongoSubscriptionModelConfig.eventCollection;
        this.runningSubscriptions = new ConcurrentHashMap();
        this.pausedSubscriptions = new ConcurrentHashMap();
        this.retryStrategy = springMongoSubscriptionModelConfig.retryStrategy;
        this.restartSubscriptionsOnChangeStreamHistoryLost = springMongoSubscriptionModelConfig.restartSubscriptionsOnChangeStreamHistoryLost;
        this.messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, springMongoSubscriptionModelConfig.executor);
        this.messageListenerContainer.start();
    }

    public synchronized org.occurrent.subscription.api.blocking.Subscription subscribe(String str, SubscriptionFilter subscriptionFilter, StartAt startAt, Consumer<CloudEvent> consumer) {
        Objects.requireNonNull(str, "subscriptionId cannot be null");
        Objects.requireNonNull(consumer, "Action cannot be null");
        Objects.requireNonNull(startAt, "StartAt cannot be null");
        if (this.runningSubscriptions.containsKey(str) || this.pausedSubscriptions.containsKey(str)) {
            throw new IllegalArgumentException("Subscription " + str + " is already defined.");
        }
        Function function = startAt2 -> {
            return new ChangeStreamRequest.ChangeStreamRequestOptions((String) null, this.eventCollection, ApplyFilterToChangeStreamOptionsBuilder.applyFilter(this.timeRepresentation, subscriptionFilter, (ChangeStreamOptions.ChangeStreamOptionsBuilder) MongoCommons.applyStartPosition(ChangeStreamOptions.builder(), (v0, v1) -> {
                return v0.startAfter(v1);
            }, (v0, v1) -> {
                return v0.resumeAt(v1);
            }, startAt2 == null ? startAt : startAt2)));
        };
        MessageListener messageListener = message -> {
            ChangeStreamDocument changeStreamDocument = (ChangeStreamDocument) message.getRaw();
            BsonDocument resumeToken = ((ChangeStreamDocument) Objects.requireNonNull(changeStreamDocument)).getResumeToken();
            MongoCloudEventsToJsonDeserializer.deserializeToCloudEvent(changeStreamDocument, this.timeRepresentation).map(cloudEvent -> {
                return new PositionAwareCloudEvent(cloudEvent, new MongoResumeTokenSubscriptionPosition(resumeToken));
            }).ifPresent(RetryExecution.executeWithRetry(consumer, th -> {
                return !this.shutdown;
            }, this.retryStrategy));
        };
        Function function2 = startAt3 -> {
            return new ChangeStreamRequest(messageListener, (SubscriptionRequest.RequestOptions) function.apply(startAt3));
        };
        SpringMongoSubscription springMongoSubscription = new SpringMongoSubscription(str, registerNewSpringSubscription(str, (ChangeStreamRequest) function2.apply(null)));
        if (this.messageListenerContainer.isRunning()) {
            this.runningSubscriptions.put(str, new InternalSubscription(springMongoSubscription, function2));
        } else {
            this.pausedSubscriptions.put(str, new InternalSubscription(springMongoSubscription, function2));
        }
        return springMongoSubscription;
    }

    public void cancelSubscription(String str) {
        InternalSubscription remove = this.runningSubscriptions.remove(str);
        if (remove != null) {
            this.messageListenerContainer.remove(remove.getSpringSubscription());
        }
    }

    @PreDestroy
    public synchronized void shutdown() {
        this.shutdown = true;
        this.runningSubscriptions.forEach((str, internalSubscription) -> {
            internalSubscription.shutdown();
        });
        this.runningSubscriptions.clear();
        this.pausedSubscriptions.forEach((str2, internalSubscription2) -> {
            internalSubscription2.shutdown();
        });
        this.pausedSubscriptions.clear();
        stopMessageListenerContainer();
    }

    public SubscriptionPosition globalSubscriptionPosition() {
        try {
            return new MongoOperationTimeSubscriptionPosition(MongoCommons.getServerOperationTime(this.mongoOperations.executeCommand(new Document("hostInfo", 1)), 1));
        } catch (UncategorizedMongoDbException e) {
            if (!(e.getCause() instanceof MongoCommandException)) {
                throw e;
            }
            log.warn(MongoCommons.cannotFindGlobalSubscriptionPositionErrorMessage(e.getCause()));
            return null;
        }
    }

    public synchronized void pauseSubscription(String str) {
        InternalSubscription remove = this.runningSubscriptions.remove(str);
        if (remove == null) {
            throw new IllegalArgumentException("Subscription " + str + " isn't running.");
        }
        this.messageListenerContainer.remove(remove.getSpringSubscription());
        this.pausedSubscriptions.put(str, remove);
    }

    public synchronized org.occurrent.subscription.api.blocking.Subscription resumeSubscription(String str) {
        InternalSubscription remove = this.pausedSubscriptions.remove(str);
        if (remove == null) {
            throw new IllegalArgumentException("Subscription " + str + " isn't paused.");
        }
        if (!this.messageListenerContainer.isRunning()) {
            this.messageListenerContainer.start();
        }
        Subscription registerNewSpringSubscription = registerNewSpringSubscription(str, remove.newChangeStreamRequest());
        this.runningSubscriptions.put(str, remove.copy(registerNewSpringSubscription));
        return new SpringMongoSubscription(str, registerNewSpringSubscription);
    }

    public boolean isRunning(String str) {
        return !this.shutdown && this.runningSubscriptions.containsKey(str);
    }

    public boolean isPaused(String str) {
        return !this.shutdown && this.pausedSubscriptions.containsKey(str);
    }

    public synchronized void start() {
        if (this.shutdown) {
            return;
        }
        this.messageListenerContainer.start();
        this.pausedSubscriptions.forEach((str, internalSubscription) -> {
            resumeSubscription(str).waitUntilStarted();
        });
    }

    public synchronized void stop() {
        if (this.shutdown) {
            return;
        }
        this.runningSubscriptions.forEach((str, internalSubscription) -> {
            pauseSubscription(str);
        });
        stopMessageListenerContainer();
    }

    public boolean isRunning() {
        return !this.shutdown && this.messageListenerContainer.isRunning();
    }

    public boolean isAutoStartup() {
        return this.messageListenerContainer.isAutoStartup();
    }

    private void stopMessageListenerContainer() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MessageListenerContainer messageListenerContainer = this.messageListenerContainer;
        countDownLatch.getClass();
        messageListenerContainer.stop(countDownLatch::countDown);
        try {
            if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                log.warn("Failed to stop " + SpringMongoSubscriptionModel.class.getSimpleName() + " after 10 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private Subscription registerNewSpringSubscription(String str, ChangeStreamRequest<Document> changeStreamRequest) {
        return this.messageListenerContainer.register(changeStreamRequest, Document.class, th -> {
            if (!(th instanceof UncategorizedMongoDbException)) {
                if (!isCursorNoLongerOpen(th)) {
                    log.error("An error occurred for subscription {}", str, th);
                    return;
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("Cursor is not longer open for subscription {}, this may happen if you pause a subscription very soon after subscribing.", str, th);
                        return;
                    }
                    return;
                }
            }
            if (th.getCause().getErrorCode() == 286) {
                String str2 = this.restartSubscriptionsOnChangeStreamHistoryLost ? "will restart subscription from current time." : "will not restart subscription! Consider remove the subscription from the durable storage or use a catch-up subscription to get up to speed if needed.";
                if (!this.restartSubscriptionsOnChangeStreamHistoryLost) {
                    log.error("There was not enough oplog to resume subscription {}, {}", new Object[]{str, str2, th});
                    return;
                }
                log.warn("There was not enough oplog to resume subscription {}, {}", new Object[]{str, str2, th});
                InternalSubscription internalSubscription = this.runningSubscriptions.get(str);
                if (internalSubscription != null) {
                    new Thread(() -> {
                        Subscription springSubscription = internalSubscription.getSpringSubscription();
                        internalSubscription.occurrentSubscription.changeSubscription(registerNewSpringSubscription(str, internalSubscription.newChangeStreamRequest(StartAt.now())));
                        this.messageListenerContainer.remove(springSubscription);
                        log.info("Subscription {} successfully restarted", str);
                    }).start();
                }
            }
        });
    }

    private static boolean isCursorNoLongerOpen(Throwable th) {
        return (th instanceof IllegalStateException) && th.getMessage().startsWith("Cursor") && th.getMessage().endsWith("is not longer open.");
    }
}
