package org.realityforge.replicant.server.ee.rest;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.PreDestroy;
import javax.enterprise.concurrent.ContextService;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.validation.constraints.NotNull;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Response;

/* loaded from: input_file:org/realityforge/replicant/server/ee/rest/AbstractReplicantPollResource.class */
public abstract class AbstractReplicantPollResource {
    private final Map<AsyncResponse, SuspendedRequest> _requests = new ConcurrentHashMap();
    private ScheduledFuture<?> _future;

    /* loaded from: input_file:org/realityforge/replicant/server/ee/rest/AbstractReplicantPollResource$PendingDataChecker.class */
    public class PendingDataChecker implements Runnable {
        private final Lock _lock = new ReentrantLock(true);
        private final Map<AsyncResponse, SuspendedRequest> _requests;
        private final PollSource _source;

        PendingDataChecker(@Nonnull Map<AsyncResponse, SuspendedRequest> map, @Nonnull PollSource pollSource) {
            this._requests = map;
            this._source = pollSource;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this._lock.lockInterruptibly();
                try {
                    doPoll();
                    this._lock.unlock();
                } catch (Throwable th) {
                    this._lock.unlock();
                    throw th;
                }
            } catch (InterruptedException e) {
            }
        }

        private void doPoll() {
            Iterator<SuspendedRequest> it = this._requests.values().iterator();
            while (it.hasNext()) {
                SuspendedRequest next = it.next();
                if (!next.getResponse().isSuspended() || next.getResponse().isCancelled()) {
                    it.remove();
                } else {
                    try {
                        String poll = this._source.poll(next.getSessionID(), next.getRxSequence());
                        if (null != poll) {
                            AbstractReplicantPollResource.this.resume(next.getResponse(), poll);
                            it.remove();
                        }
                    } catch (Exception e) {
                        AbstractReplicantPollResource.this.handleException(next.getSessionID(), next.getResponse(), e);
                        it.remove();
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/realityforge/replicant/server/ee/rest/AbstractReplicantPollResource$PollSource.class */
    public interface PollSource {
        @Nullable
        String poll(@Nonnull String str, int i) throws Exception;
    }

    /* loaded from: input_file:org/realityforge/replicant/server/ee/rest/AbstractReplicantPollResource$PollSourceImpl.class */
    public class PollSourceImpl implements PollSource {
        public PollSourceImpl() {
        }

        @Override // org.realityforge.replicant.server.ee.rest.AbstractReplicantPollResource.PollSource
        @Nullable
        public String poll(@Nonnull String str, int i) throws Exception {
            return AbstractReplicantPollResource.this.poll(str, i);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/realityforge/replicant/server/ee/rest/AbstractReplicantPollResource$SuspendedRequest.class */
    public static class SuspendedRequest {
        private final String _sessionID;
        private final int _rxSequence;
        private final AsyncResponse _response;

        SuspendedRequest(@Nonnull String str, int i, @Nonnull AsyncResponse asyncResponse) {
            this._sessionID = str;
            this._rxSequence = i;
            this._response = asyncResponse;
        }

        String getSessionID() {
            return this._sessionID;
        }

        int getRxSequence() {
            return this._rxSequence;
        }

        AsyncResponse getResponse() {
            return this._response;
        }
    }

    public void postConstruct() {
        this._future = schedule(new PendingDataChecker(this._requests, createContextualProxyPollSource()));
    }

    ScheduledFuture<?> schedule(PendingDataChecker pendingDataChecker) {
        return getScheduledExecutorService().scheduleWithFixedDelay(pendingDataChecker, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    @Nonnull
    PollSource createContextualProxyPollSource() {
        return (PollSource) getContextService().createContextualProxy(new PollSourceImpl(), PollSource.class);
    }

    @PreDestroy
    public void preDestroy() {
        this._future.cancel(true);
    }

    @GET
    @Produces({"text/plain"})
    public void poll(@Suspended AsyncResponse asyncResponse, @NotNull @HeaderParam("X-GWT-SessionID") String str, @NotNull @QueryParam("rx") int i) {
        asyncResponse.setTimeout(getPollTime(), TimeUnit.SECONDS);
        asyncResponse.register(new ConnectionCallback() { // from class: org.realityforge.replicant.server.ee.rest.AbstractReplicantPollResource.1
            public void onDisconnect(AsyncResponse asyncResponse2) {
                AbstractReplicantPollResource.this.doDisconnect(asyncResponse2);
            }
        });
        asyncResponse.setTimeoutHandler(new TimeoutHandler() { // from class: org.realityforge.replicant.server.ee.rest.AbstractReplicantPollResource.2
            public void handleTimeout(AsyncResponse asyncResponse2) {
                AbstractReplicantPollResource.this.doTimeout(asyncResponse2);
            }
        });
        try {
            String poll = poll(str, i);
            if (null != poll) {
                resume(asyncResponse, poll);
            } else {
                this._requests.put(asyncResponse, new SuspendedRequest(str, i, asyncResponse));
            }
        } catch (Exception e) {
            handleException(str, asyncResponse, e);
        }
    }

    @Nonnull
    protected abstract ManagedScheduledExecutorService getScheduledExecutorService();

    @Nonnull
    protected abstract ContextService getContextService();

    /* JADX INFO: Access modifiers changed from: private */
    public void handleException(@Nonnull String str, @Nonnull AsyncResponse asyncResponse, @Nonnull Exception exc) {
        if (isSessionConnected(str)) {
            resume(asyncResponse, exc);
        } else {
            resume(asyncResponse, "");
        }
    }

    @Nullable
    protected abstract String poll(@Nonnull String str, int i) throws Exception;

    protected abstract boolean isSessionConnected(@Nonnull String str);

    @Nonnull
    protected Map<AsyncResponse, SuspendedRequest> getRequests() {
        return this._requests;
    }

    protected int getPollTime() {
        return 30;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doTimeout(@Nonnull AsyncResponse asyncResponse) {
        doDisconnect(asyncResponse);
        resume(asyncResponse, "");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resume(@Nonnull AsyncResponse asyncResponse, @Nonnull Object obj) {
        asyncResponse.resume(toResponse(obj));
    }

    @Nonnull
    private Response toResponse(@Nonnull Object obj) {
        Response.ResponseBuilder ok = Response.ok();
        CacheUtil.configureNoCacheHeaders(ok);
        return ok.entity(obj).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doDisconnect(@Nonnull AsyncResponse asyncResponse) {
        this._requests.remove(asyncResponse);
    }
}
