/*
 * Decompiled with CFR 0.152.
 */
package mantis.io.reactivex.netty.protocol.http.websocket;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineException;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import mantis.io.reactivex.netty.client.ClientChannelFactory;
import mantis.io.reactivex.netty.client.ClientConnectionFactory;
import mantis.io.reactivex.netty.client.ClientMetricsEvent;
import mantis.io.reactivex.netty.client.RxClient;
import mantis.io.reactivex.netty.client.RxClientImpl;
import mantis.io.reactivex.netty.metrics.MetricEventsSubject;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurator;
import mantis.io.reactivex.netty.protocol.http.websocket.WebSocketClientHandler;
import rx.Observable;
import rx.Subscriber;

/**
 * An {@link RxClientImpl} for WebSocket frames whose {@link #connect()} holds back the
 * connection until the {@link WebSocketClientHandler} in the channel pipeline reports
 * that the handshake has finished.
 *
 * @param <I> frame type used as the first type argument of {@link RxClientImpl}
 * @param <O> frame type used as the second type argument of {@link RxClientImpl}
 */
public class WebSocketClient<I extends WebSocketFrame, O extends WebSocketFrame>
extends RxClientImpl<I, O> {
    /*
     * The operator keeps no per-subscription state of its own, so one instance is shared.
     * It is deliberately held as a raw type: connect() yields ObservableConnection<O, I>,
     * which cannot be expressed as the operator's ObservableConnection<T, T>.
     */
    @SuppressWarnings("rawtypes")
    private static final HandshakeOperator HANDSHAKE_OPERATOR = new HandshakeOperator();

    /**
     * Creates the client. All arguments are forwarded unchanged to the
     * {@link RxClientImpl} constructor.
     */
    public WebSocketClient(String name2, RxClient.ServerInfo serverInfo, Bootstrap clientBootstrap, PipelineConfigurator<O, I> pipelineConfigurator, RxClient.ClientConfig clientConfig, ClientChannelFactory<O, I> channelFactory, ClientConnectionFactory<O, I, ? extends ObservableConnection<O, I>> connectionFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        super(name2, serverInfo, clientBootstrap, pipelineConfigurator, clientConfig, channelFactory, connectionFactory, eventsSubject);
    }

    /**
     * Connects through the parent implementation and lifts the result with the handshake
     * operator, so subscribers receive the connection only once the handshake listener
     * reports success.
     *
     * @return the parent's connection observable, lifted with the handshake operator
     */
    @SuppressWarnings("unchecked") // raw operator: the element type is not altered by the lift
    @Override
    public Observable<ObservableConnection<O, I>> connect() {
        return super.connect().lift(HANDSHAKE_OPERATOR);
    }

    /**
     * Operator that delays delivery of a connection until the handshake-finished listener
     * registered on the pipeline's {@link WebSocketClientHandler} fires.
     *
     * @param <T> frame type of the connection
     */
    static class HandshakeOperator<T extends WebSocketFrame>
    implements Observable.Operator<ObservableConnection<T, T>, ObservableConnection<T, T>> {
        HandshakeOperator() {
        }

        /**
         * Wraps the downstream subscriber.
         *
         * @param originalSubscriber the downstream subscriber to notify
         * @return the subscriber to attach upstream; it is also added to
         *         {@code originalSubscriber} as a subscription
         */
        @Override
        public Subscriber<ObservableConnection<T, T>> call(final Subscriber<? super ObservableConnection<T, T>> originalSubscriber) {
            Subscriber<ObservableConnection<T, T>> liftSubscriber = new Subscriber<ObservableConnection<T, T>>(){

                @Override
                public void onCompleted() {
                    // Intentionally not forwarded: downstream is completed by the
                    // handshake listener right after it emits the connection.
                }

                @Override
                public void onError(Throwable e2) {
                    originalSubscriber.onError(e2);
                }

                @Override
                public void onNext(final ObservableConnection<T, T> connection) {
                    ChannelPipeline pipeline = connection.getChannel().pipeline();
                    // Single lookup: a null result means the pipeline has no such handler.
                    WebSocketClientHandler handler = pipeline.get(WebSocketClientHandler.class);
                    if (handler == null) {
                        originalSubscriber.onError(new ChannelPipelineException("invalid pipeline configuration - WebSocket pipeline with no WebSocketClientHandler"));
                        return;
                    }
                    handler.addHandshakeFinishedListener(new ChannelFutureListener(){

                        @Override
                        public void operationComplete(ChannelFuture future2) throws Exception {
                            if (future2.isSuccess()) {
                                originalSubscriber.onNext(connection);
                                originalSubscriber.onCompleted();
                            } else {
                                originalSubscriber.onError(future2.cause());
                            }
                        }
                    });
                }
            };
            originalSubscriber.add(liftSubscriber);
            return liftSubscriber;
        }
    }
}

