/*
 * Decompiled with CFR 0.152.
 */
package net.soundvibe.reacto.discovery.couchbase;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.AbstractDocument;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.error.TemporaryFailureException;
import com.couchbase.client.java.error.TemporaryLockFailureException;
import com.couchbase.client.java.view.AsyncViewResult;
import com.couchbase.client.java.view.AsyncViewRow;
import com.couchbase.client.java.view.DefaultView;
import com.couchbase.client.java.view.DesignDocument;
import com.couchbase.client.java.view.Stale;
import com.couchbase.client.java.view.ViewQuery;
import hu.akarnokd.rxjava.interop.RxJavaInterop;
import io.reactivex.Flowable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.soundvibe.reacto.client.events.CommandHandlerRegistry;
import net.soundvibe.reacto.discovery.AbstractServiceRegistry;
import net.soundvibe.reacto.discovery.types.ServiceRecord;
import net.soundvibe.reacto.discovery.types.ServiceType;
import net.soundvibe.reacto.discovery.types.Status;
import net.soundvibe.reacto.mappers.ServiceRegistryMapper;
import net.soundvibe.reacto.types.Any;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.json.JsonObject;
import net.soundvibe.reacto.utils.Scheduler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

public final class CouchbaseServiceRegistry
extends AbstractServiceRegistry {
    private static final Logger log = LoggerFactory.getLogger(CouchbaseServiceRegistry.class);
    public static final ViewQuery DEFAULT_VIEW_QUERY = ViewQuery.from((String)"reacto", (String)"services");
    public static final ServiceRecord DEFAULT_SERVICE_RECORD = ServiceRecord.create((String)"UNKNOWN", (Status)Status.UNKNOWN, (ServiceType)ServiceType.LOCAL, (String)UUID.randomUUID().toString(), (JsonObject)JsonObject.empty(), (JsonObject)JsonObject.empty());
    public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS = 60;
    private final Supplier<Bucket> bucketSupplier;
    private final ViewQuery viewQuery;
    private final ServiceRecord serviceRecord;
    private final AtomicBoolean isOpen = new AtomicBoolean(false);
    private final AtomicReference<Timer> timer = new AtomicReference();
    private final com.couchbase.client.java.document.json.JsonObject serviceObject;
    private final int heartBeatIntervalInSeconds;
    private static final String viewMapFunction = "function (doc, meta) {\n  if (doc.status && doc.objectType) {\n    if (doc.objectType === \"reacto-service-registry\" && doc.status === \"UP\") {\n      emit(doc.registration, null);\n    }\n  }\n}";

    public CouchbaseServiceRegistry(Supplier<Bucket> bucketSupplier, CommandHandlerRegistry eventHandlerRegistry, ServiceRegistryMapper mapper, ServiceRecord serviceRecord) {
        this(bucketSupplier, DEFAULT_VIEW_QUERY, eventHandlerRegistry, mapper, serviceRecord, 60);
    }

    public CouchbaseServiceRegistry(Supplier<Bucket> bucketSupplier, ViewQuery viewQuery, CommandHandlerRegistry eventHandlerRegistry, ServiceRegistryMapper mapper, ServiceRecord serviceRecord, int heartBeatIntervalInSeconds) {
        super(eventHandlerRegistry, mapper);
        Objects.requireNonNull(bucketSupplier, "bucketSupplier cannot be null");
        Objects.requireNonNull(viewQuery, "viewQuery cannot be null");
        Objects.requireNonNull(serviceRecord, "serviceRecord cannot be null");
        this.bucketSupplier = bucketSupplier;
        this.viewQuery = viewQuery;
        this.serviceRecord = serviceRecord;
        this.serviceObject = CouchbaseServiceRegistry.toCouchbaseObject(serviceRecord);
        this.heartBeatIntervalInSeconds = heartBeatIntervalInSeconds;
    }

    public static com.couchbase.client.java.document.json.JsonObject toCouchbaseObject(ServiceRecord serviceRecord) {
        return com.couchbase.client.java.document.json.JsonObject.fromJson((String)serviceRecord.toJson());
    }

    public Flowable<DesignDocument> updateDefaultView(String designDocument, String viewName) {
        return Flowable.just((Object)DesignDocument.create((String)designDocument, Collections.singletonList(DefaultView.create((String)viewName, (String)viewMapFunction)))).flatMap(doc -> RxJavaInterop.toV2Flowable((Observable)this.bucketSupplier.get().bucketManager().async().upsertDesignDocument(doc)));
    }

    public Flowable<ServiceRecord> findRecords() {
        return RxJavaInterop.toV2Flowable((Observable)this.bucketSupplier.get().async().query(this.viewQuery).retry(CouchbaseServiceRegistry::RETRY_DEFAULT).flatMap(AsyncViewResult::rows).flatMap(AsyncViewRow::document).map(AbstractDocument::content).map(CouchbaseServiceRegistry::toRecord));
    }

    protected Flowable<List<ServiceRecord>> findRecordsOf(Command command) {
        return this.findRecords().filter(serviceRecord -> serviceRecord.isCompatibleWith(command)).toList().toFlowable().defaultIfEmpty(Collections.emptyList());
    }

    private static Boolean RETRY_DEFAULT(Integer retryCount, Throwable error) {
        return retryCount < 10 && (error instanceof TemporaryFailureException || error instanceof TemporaryLockFailureException || error instanceof CASMismatchException);
    }

    public static ServiceRecord toRecord(com.couchbase.client.java.document.json.JsonObject jsonObject) {
        return ServiceRecord.fromJson((String)jsonObject.toString());
    }

    public Flowable<Any> unpublish(ServiceRecord serviceRecord) {
        return RxJavaInterop.toV2Flowable((Observable)this.bucketSupplier.get().async().remove(serviceRecord.registrationId, PersistTo.NONE, ReplicateTo.NONE).flatMap(jsonDocument -> this.bucketSupplier.get().async().query(ViewQuery.from((String)this.viewQuery.getDesign(), (String)this.viewQuery.getView()).stale(Stale.FALSE).limit(1)).map(AsyncViewResult::success)).retry(CouchbaseServiceRegistry::RETRY_DEFAULT).filter(wasRefreshed -> wasRefreshed).map(__ -> Any.VOID).switchIfEmpty(Observable.error((Throwable)new IllegalStateException("Service record was not unpublished"))));
    }

    public Flowable<Any> publish() {
        return RxJavaInterop.toV2Flowable((Observable)this.bucketSupplier.get().async().upsert((Document)JsonDocument.create((String)this.serviceRecord.registrationId, (int)this.ttl(), (com.couchbase.client.java.document.json.JsonObject)this.serviceObject), PersistTo.NONE, ReplicateTo.NONE).flatMap(jsonDocument -> this.bucketSupplier.get().async().query(ViewQuery.from((String)this.viewQuery.getDesign(), (String)this.viewQuery.getView()).stale(Stale.FALSE).limit(1)).map(AsyncViewResult::success)).retry(CouchbaseServiceRegistry::RETRY_DEFAULT).filter(wasAdded -> wasAdded).map(__ -> Any.VOID).switchIfEmpty(Observable.error((Throwable)new IllegalStateException("Service record was not published"))));
    }

    public Flowable<Any> update() {
        return RxJavaInterop.toV2Flowable((Observable)this.bucketSupplier.get().async().touch(this.serviceRecord.registrationId, this.ttl())).onErrorResumeNext(error -> error instanceof DocumentDoesNotExistException ? this.publish().map(any -> true) : Flowable.error((Throwable)error)).retry(CouchbaseServiceRegistry::RETRY_DEFAULT).filter(wasUpdated -> wasUpdated).map(__ -> Any.VOID).switchIfEmpty((Publisher)Flowable.error((Throwable)new IllegalStateException("Service record was not updated")));
    }

    private int ttl() {
        return (int)((double)this.heartBeatIntervalInSeconds * 1.5);
    }

    private void startHeartBeat() {
        this.timer.set(Scheduler.scheduleAtFixedInterval((long)TimeUnit.SECONDS.toMillis(this.heartBeatIntervalInSeconds), () -> Flowable.just((Object)this.serviceRecord).filter(rec -> this.isOpen.get()).flatMap(rec -> this.update()).subscribe(any -> log.info("Service was updated successfully"), error -> log.error("Error when updating service registration: " + error, error)), (String)"Couchbase service registry heartbeat"));
    }

    public Flowable<Any> register() {
        return Flowable.just((Object)this.serviceRecord).filter(rec -> !rec.equals((Object)DEFAULT_SERVICE_RECORD)).filter(rec -> !this.isOpen.get()).flatMap(rec -> this.publish()).doOnNext(any -> this.startHeartBeat()).doOnNext(any -> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Executing shutdown hook...");
            this.unregister().subscribe(__ -> log.info("Service was successfully unregistered before shutting down"), error -> log.error("Service was unable to unregister before shutting down: " + error));
        }))).doOnNext(any -> this.isOpen.set(true));
    }

    public Flowable<Any> unregister() {
        return Flowable.just((Object)this.serviceRecord).filter(rec -> !rec.equals((Object)DEFAULT_SERVICE_RECORD)).filter(rec -> this.isOpen.get()).doOnNext(rec -> this.timer.get().cancel()).flatMap(this::unpublish).doOnNext(any -> this.isOpen.set(false));
    }
}

