/*
 * Decompiled with CFR 0.152.
 */
package io.kcache.keta.server.grpc;

import io.etcd.jetcd.api.Event;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.kcache.keta.KetaEngine;
import io.kcache.keta.auth.KetaAuthManager;
import io.kcache.keta.server.grpc.LeaseService;
import io.kcache.keta.server.grpc.errors.KetaErrorType;
import io.kcache.keta.server.grpc.utils.AuthServerInterceptor;
import io.kcache.keta.server.grpc.utils.GrpcUtils;
import io.kcache.keta.server.leader.KetaLeaderElector;
import io.kcache.keta.watch.KetaWatchManager;
import io.kcache.keta.watch.Watch;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WatchService
extends WatchGrpc.WatchImplBase {
    private static final Logger LOG = LoggerFactory.getLogger(LeaseService.class);
    private final KetaLeaderElector elector;

    public WatchService(KetaLeaderElector elector) {
        this.elector = elector;
    }

    @Override
    public StreamObserver<WatchRequest> watch(final StreamObserver<WatchResponse> responseObserver) {
        if (!KetaEngine.getInstance().isInitialized()) {
            responseObserver.onError((Throwable)KetaErrorType.Starting.toException());
            return null;
        }
        return new StreamObserver<WatchRequest>(){

            public void onNext(WatchRequest request) {
                LOG.debug("received a watchRequest {}", (Object)request);
                switch (request.getRequestUnionCase()) {
                    case CREATE_REQUEST: {
                        WatchService.this.handleCreateRequest(request.getCreateRequest(), (StreamObserver<WatchResponse>)responseObserver);
                        break;
                    }
                    case CANCEL_REQUEST: {
                        WatchService.this.handleCancelRequest(request.getCancelRequest(), (StreamObserver<WatchResponse>)responseObserver);
                        break;
                    }
                    case REQUESTUNION_NOT_SET: {
                        LOG.warn("received an empty watch request");
                    }
                }
            }

            public void onError(Throwable t) {
            }

            public void onCompleted() {
            }
        };
    }

    private void handleCreateRequest(WatchCreateRequest createRequest, StreamObserver<WatchResponse> responseObserver) {
        KetaWatchManager watchMgr = KetaEngine.getInstance().getWatchManager();
        if (watchMgr == null) {
            return;
        }
        KetaAuthManager authMgr = KetaEngine.getInstance().getAuthManager();
        if (authMgr != null) {
            authMgr.checkRangePermitted((String)AuthServerInterceptor.USER_CTX_KEY.get(), createRequest.getKey(), createRequest.getRangeEnd());
        }
        Watch watch = new Watch(0L, createRequest.getKey(), createRequest.getRangeEnd());
        watch = watchMgr.add(watch);
        long watchId = watch.getID();
        List<WatchCreateRequest.FilterType> filters = createRequest.getFiltersList();
        boolean prevKv = createRequest.getPrevKv();
        watchMgr.watch(watch, event -> {
            LOG.debug("inside WatchService");
            try {
                List<Object> events = Collections.singletonList(event);
                events = events.stream().filter(e -> e.getType() == Event.EventType.PUT && !filters.contains((Object)WatchCreateRequest.FilterType.NOPUT) || e.getType() == Event.EventType.DELETE && !filters.contains((Object)WatchCreateRequest.FilterType.NODELETE)).map(e -> prevKv ? e : Event.newBuilder().mergeFrom(event).clearPrevKv().build()).collect(Collectors.toList());
                responseObserver.onNext((Object)WatchResponse.newBuilder().setHeader(GrpcUtils.toResponseHeader(this.elector.getMemberId())).setWatchId(watchId).addAllEvents(events).build());
            }
            catch (StatusRuntimeException e2) {
                if (e2.getStatus().equals((Object)Status.CANCELLED)) {
                    LOG.warn("connection was closed");
                    return;
                }
                LOG.error("caught an error writing response: {}", (Object)e2.getMessage());
            }
        });
        responseObserver.onNext((Object)WatchResponse.newBuilder().setHeader(GrpcUtils.toResponseHeader(this.elector.getMemberId())).setWatchId(watchId).setCreated(true).build());
        LOG.debug("successfully registered new Watch");
    }

    private void handleCancelRequest(WatchCancelRequest cancelRequest, StreamObserver<WatchResponse> responseObserver) {
        LOG.debug("cancel watch");
        KetaWatchManager watchMgr = KetaEngine.getInstance().getWatchManager();
        if (watchMgr == null) {
            return;
        }
        long watchId = cancelRequest.getWatchId();
        watchMgr.delete(watchId);
        responseObserver.onNext((Object)WatchResponse.newBuilder().setHeader(GrpcUtils.toResponseHeader(this.elector.getMemberId())).setWatchId(watchId).setCanceled(true).build());
    }
}

