/*
 * Decompiled with CFR 0.152.
 */
package io.joshworks.stream.client.sse;

import io.joshworks.stream.client.ClientConfiguration;
import io.joshworks.stream.client.ClientException;
import io.joshworks.stream.client.StreamConnection;
import io.joshworks.stream.client.sse.EventStreamChannelListener;
import io.joshworks.stream.client.sse.EventStreamParser;
import io.joshworks.stream.client.sse.SseClientCallback;
import io.joshworks.stream.client.sse.UTF8Output;
import io.undertow.client.ClientCallback;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientExchange;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientStatistics;
import io.undertow.client.UndertowClient;
import io.undertow.connector.ByteBufferPool;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import io.undertow.util.Methods;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/**
 * Server-Sent Events client connection built on the Undertow HTTP client.
 *
 * <p>A connection issues a {@code GET} request with {@code Accept: text/event-stream}
 * and hands the response channel to an {@link EventStreamChannelListener}. Lifecycle
 * events (open, error, close) are reported to the supplied {@link SseClientCallback}.
 *
 * <p>NOTE(review): reconnection scheduling, {@code monitor}, {@code worker}, {@code url},
 * {@code uuid} and {@code shuttingDown} are inherited from {@link StreamConnection};
 * their exact semantics are not visible here.
 */
public class SSEConnection
extends StreamConnection {
    private static final Logger logger = LoggerFactory.getLogger(SSEConnection.class);

    /** Size, in bytes, of the heap buffers used for the connection and the stream listener. */
    private static final int BUFFER_SIZE = 8192;

    final SseClientCallback callback;
    private ClientConnection connection;
    String lastEventId;

    /**
     * Creates a connection that is not yet established.
     *
     * @param clientConfiguration configuration forwarded to {@link StreamConnection}
     * @param lastEventId         id sent as {@code Last-Event-ID} when non-empty; may be {@code null}
     * @param callback            receiver of open / error / close notifications
     */
    public SSEConnection(ClientConfiguration clientConfiguration, String lastEventId, SseClientCallback callback) {
        super(clientConfiguration);
        this.lastEventId = lastEventId;
        this.callback = callback;
    }

    /**
     * Opens the underlying client connection and sends the event-stream request.
     * Does nothing if a connection is already held.
     *
     * <p>On failure the partially opened connection (if any) is released so that a later
     * attempt can start from scratch, the callback's {@code onError} is invoked, and the
     * original exception is rethrown.
     *
     * @throws Exception whatever caused the connection attempt to fail
     */
    @Override
    protected synchronized void tryConnect() throws Exception {
        try {
            this.shuttingDown = false;
            logger.info("Connecting to {}", this.url);
            if (this.connection != null) {
                return;
            }
            URI uri = URI.create(this.url);
            this.connection = UndertowClient.getInstance()
                    .connect(uri, this.worker, new DefaultByteBufferPool(false, BUFFER_SIZE), OptionMap.EMPTY)
                    .get();

            ClientRequest request = new ClientRequest().setMethod(Methods.GET).setPath(this.url);
            request.getRequestHeaders().put(Headers.CONNECTION, "keep-alive");
            request.getRequestHeaders().put(Headers.ACCEPT, "text/event-stream");
            // The Host header carries only host[:port], never the full URL.
            request.getRequestHeaders().put(Headers.HOST, SSEConnection.hostHeader(uri));
            if (this.lastEventId != null && !this.lastEventId.isEmpty()) {
                request.getRequestHeaders().put(HttpString.tryFromString("Last-Event-ID"), this.lastEventId);
            }
            this.connection.sendRequest(request, this.createClientCallback());
        }
        catch (Exception e) {
            logger.warn("Could not tryConnect to " + this.url, (Throwable)e);
            // A non-null connection here was opened by this very call (an existing one
            // returns early above). Release it, otherwise every later attempt would
            // short-circuit on the stale reference and the channel would leak.
            ClientConnection stale = this.connection;
            if (stale != null) {
                this.connection = null;
                StreamConnection.closeChannel((Channel)stale);
            }
            try {
                this.callback.onError(e);
            }
            catch (Exception ex) {
                // Log the callback's own failure; the original error was logged above.
                logger.error(ex.getMessage(), (Throwable)ex);
            }
            throw e;
        }
    }

    /**
     * Builds the value of the HTTP {@code Host} header for the given target.
     *
     * @param uri request target
     * @return {@code host} or {@code host:port}; falls back to the raw authority, and
     *         finally to the full URI text, when no host component can be parsed
     */
    private static String hostHeader(URI uri) {
        String host = uri.getHost();
        if (host == null) {
            String authority = uri.getAuthority();
            return authority != null ? authority : uri.toString();
        }
        int port = uri.getPort();
        return port == -1 ? host : host + ":" + port;
    }

    /**
     * Marks the connection as shutting down and closes the channel.
     *
     * @return the current {@code lastEventId}
     */
    public String close() {
        this.shuttingDown = true;
        this.closeChannel();
        return this.lastEventId;
    }

    /**
     * Returns the statistics of the live connection, or an all-zero
     * {@link DisconnectedStatistics} when no connection is held.
     *
     * @return connection statistics, never {@code null} for a disconnected client
     */
    public ClientStatistics statistics() {
        // Read the field once so a concurrent close cannot null it between check and use.
        ClientConnection current = this.connection;
        return current == null ? new DisconnectedStatistics() : current.getStatistics();
    }

    /**
     * Closes the held connection (if any), notifies the callback's {@code onClose} with the
     * last event id, and removes this connection from the monitor.
     */
    @Override
    protected void closeChannel() {
        if (this.connection != null) {
            StreamConnection.closeChannel((Channel)this.connection);
            this.connection = null;
            this.callback.onClose(this.lastEventId);
        }
        this.monitor.remove(this.uuid);
    }

    /**
     * @return {@code true} while a client connection reference is held
     */
    public boolean isOpen() {
        return this.connection != null;
    }

    /**
     * Requests a new connection attempt after the given delay.
     *
     * @param timeMilli delay in milliseconds, forwarded to {@code StreamConnection.tryConnect}
     */
    void retryAfter(long timeMilli) {
        logger.info("Reconnecting after {}ms", (Object)timeMilli);
        super.tryConnect(false, timeMilli);
    }

    /**
     * Creates the callback invoked once the request has been sent (or failed to be sent).
     * On success it installs a {@link StreamHandler} as response listener and registers
     * this connection with the monitor; on failure it reports the error, releases the
     * channel and triggers {@code reconnect()}.
     */
    private ClientCallback<ClientExchange> createClientCallback() {
        final EventStreamParser eventStreamParser = new EventStreamParser(this);
        return new ClientCallback<ClientExchange>(){

            @Override
            public void completed(ClientExchange connectedExchange) {
                connectedExchange.setResponseListener(new StreamHandler(SSEConnection.this.callback, eventStreamParser));
                SSEConnection.this.monitor.add(SSEConnection.this.uuid, () -> SSEConnection.this.close());
                logger.info("Connected to {}", SSEConnection.this.url);
            }

            @Override
            public void failed(IOException e) {
                SSEConnection.this.callback.onError(e);
                SSEConnection.this.closeChannel();
                SSEConnection.this.reconnect();
            }
        };
    }

    /**
     * Statistics placeholder returned while no connection is held: every counter is zero
     * and {@link #reset()} is a no-op.
     */
    public class DisconnectedStatistics
    implements ClientStatistics {
        @Override
        public long getRequests() {
            return 0L;
        }

        @Override
        public long getRead() {
            return 0L;
        }

        @Override
        public long getWritten() {
            return 0L;
        }

        @Override
        public void reset() {
        }
    }

    /**
     * Response listener: validates the HTTP status and, for {@code 200}, wires the
     * response channel to the event-stream listener.
     */
    private class StreamHandler
    implements ClientCallback<ClientExchange> {
        private final SseClientCallback callback;
        private final EventStreamChannelListener listener;
        private final UTF8Output dataReader;

        StreamHandler(SseClientCallback callback, EventStreamParser streamParser) {
            this.callback = callback;
            this.dataReader = new UTF8Output(streamParser);
            this.listener = new EventStreamChannelListener(new DefaultByteBufferPool(false, BUFFER_SIZE), this.dataReader);
        }

        /**
         * Handles the received response. Any status other than 200 is reported through
         * {@code onError} and the channel is closed without reconnecting. Otherwise
         * {@code onOpen} is fired, a close listener that closes and reconnects is
         * installed, and the stream listener is attached to the response channel.
         */
        @Override
        public void completed(ClientExchange result) {
            int responseCode = result.getResponse().getResponseCode();
            if (responseCode != 200) {
                String status = result.getResponse().getStatus();
                this.callback.onError(new ClientException(responseCode, "Server returned [" + responseCode + " - " + status + "] after connecting"));
                SSEConnection.this.closeChannel();
                return;
            }
            this.callback.onOpen();
            result.getResponseChannel().getCloseSetter().set(channel -> {
                SSEConnection.this.closeChannel();
                SSEConnection.this.reconnect();
            });
            this.listener.setup(result.getResponseChannel());
        }

        /**
         * Handles a failure to obtain the response. Mirrors the request-level failure
         * path: the error is reported, the connection is released so the client does not
         * keep reporting itself as open, and a reconnect is triggered.
         */
        @Override
        public void failed(IOException e) {
            this.callback.onError(e);
            SSEConnection.this.closeChannel();
            SSEConnection.this.reconnect();
        }
    }
}

