/*
 * Decompiled with CFR 0.152.
 */
package mantis.io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import mantis.io.reactivex.netty.channel.ChannelMetricEventProvider;
import mantis.io.reactivex.netty.channel.DefaultChannelWriter;
import mantis.io.reactivex.netty.channel.NewRxConnectionEvent;
import mantis.io.reactivex.netty.metrics.Clock;
import mantis.io.reactivex.netty.metrics.MetricEventsSubject;
import mantis.io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator;
import mantis.io.reactivex.netty.util.NoOpSubscriber;
import rx.Observable;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * A {@link DefaultChannelWriter} that also exposes an input stream ({@link #getInput()}) and
 * publishes channel-close metric events when the underlying channel is closed.
 *
 * <p>On creation via {@link #create}, a {@link NewRxConnectionEvent} carrying this connection and
 * its input subject is fired through the channel pipeline. Presumably a handler in the pipeline
 * reacts to that event and feeds inbound data into the subject; that handler is not part of this
 * class.
 *
 * @param <I> type of the items emitted by {@link #getInput()}
 * @param <O> type argument forwarded to {@link DefaultChannelWriter}
 */
public class ObservableConnection<I, O>
extends DefaultChannelWriter<O> {
    /**
     * Channel attribute key named {@code "rxnetty_auto_release_buffers"}. It is not read inside
     * this class; presumably pipeline handlers consult it (confirm at the usage sites).
     */
    public static final AttributeKey<Boolean> AUTO_RELEASE_BUFFERS = AttributeKey.valueOf("rxnetty_auto_release_buffers");

    /** Subject backing {@link #getInput()}; replaceable through {@link #updateInputSubject}. */
    private Subject<I, I> inputSubject;

    // Deliberately a raw type: the constructor only receives a MetricEventsSubject<?>, and events
    // are published through it with onEvent(...), presumably not callable via a wildcard reference.
    @SuppressWarnings("rawtypes")
    private final MetricEventsSubject eventsSubject;
    private final ChannelMetricEventProvider metricEventProvider;

    /** Start time recorded by {@link #_closeChannel()}; {@code -1} until a close has been started. */
    protected volatile long closeStartTimeMillis = -1L;

    /**
     * Creates a connection over {@code channel} with a fresh, serialized publish subject as input.
     *
     * <p>Note that the parameter order here (provider, then subject) differs from
     * {@link #create} and from the superclass constructor (subject, then provider).
     *
     * @param channel             channel this connection wraps
     * @param metricEventProvider supplies the metric event instances published on close
     * @param eventsSubject       subject to which metric events are published
     */
    protected ObservableConnection(Channel channel, ChannelMetricEventProvider metricEventProvider, MetricEventsSubject<?> eventsSubject) {
        super(channel, eventsSubject, metricEventProvider);
        this.eventsSubject = eventsSubject;
        this.metricEventProvider = metricEventProvider;
        // Explicit type witness so this does not depend on Java 8 target-type inference.
        this.inputSubject = new SerializedSubject<I, I>(PublishSubject.<I>create());
    }

    /**
     * Returns the current input subject as an {@link Observable}.
     *
     * @return the input stream of this connection
     */
    public Observable<I> getInput() {
        return this.inputSubject;
    }

    /**
     * Creates a connection and announces it on the channel pipeline via
     * {@link #fireNewRxConnectionEvent()}.
     *
     * @param channel             channel the connection wraps
     * @param eventsSubject       subject to which metric events are published
     * @param metricEventProvider supplies the metric event instances
     * @param <I>                 input item type
     * @param <O>                 type argument forwarded to {@link DefaultChannelWriter}
     * @return the newly created connection
     */
    public static <I, O> ObservableConnection<I, O> create(Channel channel, MetricEventsSubject<?> eventsSubject, ChannelMetricEventProvider metricEventProvider) {
        ObservableConnection<I, O> toReturn = new ObservableConnection<I, O>(channel, metricEventProvider, eventsSubject);
        toReturn.fireNewRxConnectionEvent();
        return toReturn;
    }

    /**
     * Fires a {@link NewRxConnectionEvent}, carrying this connection and its current input
     * subject, as a user event starting from the first context of the channel pipeline.
     */
    protected void fireNewRxConnectionEvent() {
        ChannelHandlerContext firstContext = this.getChannel().pipeline().firstContext();
        firstContext.fireUserEventTriggered(new NewRxConnectionEvent(this, this.inputSubject));
    }

    /**
     * Closes this connection by delegating to the superclass implementation.
     *
     * @return the observable returned by the superclass {@code close()}
     */
    @Override
    public Observable<Void> close() {
        return super.close();
    }

    /**
     * Disables read timeouts on the pipeline, closes the channel (optionally after a flush) and
     * completes the input subject that was current when this method was called.
     *
     * <p>Without flush, the channel close is issued immediately. With flush, the channel is closed
     * once the observable returned by {@code flush()} completes. That flush-then-close pipeline is
     * subscribed to internally so the close proceeds even if the caller never subscribes, and it
     * is cached so that a caller subscribing to the returned observable observes the same outcome
     * instead of running the close (and its metric events) a second time.
     *
     * @param flush2 whether to flush before closing the channel
     * @return an observable that terminates with the outcome of the channel close, or with the
     *         flush error when the flush fails
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Observable<Void> _close(boolean flush2) {
        // Capture now: updateInputSubject() may swap the field before the close finishes.
        final Subject<I, I> thisSubject = this.inputSubject;
        ReadTimeoutPipelineConfigurator.disableReadTimeout(this.getChannel().pipeline());
        if (flush2) {
            Observable<Void> toReturn = this.flush().lift(new Observable.Operator<Void, Void>(){

                @Override
                public Subscriber<? super Void> call(final Subscriber<? super Void> child) {
                    return new Subscriber<Void>(){

                        @Override
                        public void onCompleted() {
                            // Flush finished: close the channel and hand its outcome to the child.
                            ObservableConnection.this._closeChannel().subscribe(child);
                            thisSubject.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e2) {
                            // NOTE(review): on a flush failure the channel is not closed and the
                            // input subject is not completed here - confirm this is intended.
                            child.onError(e2);
                        }

                        @Override
                        public void onNext(Void aVoid) {
                            // Nothing to forward; only the terminal event matters.
                        }
                    };
                }
            }).cache(); // run flush+close exactly once; replay the terminal event to later subscribers
            // Eager subscription so the close happens even when the caller ignores the result.
            toReturn.subscribe(new NoOpSubscriber());
            return toReturn;
        }
        Observable<Void> toReturn = this._closeChannel();
        thisSubject.onCompleted();
        return toReturn;
    }

    /**
     * Issues the close on the underlying channel and publishes close metrics.
     *
     * <p>Records the close start time, publishes the close-start event, calls
     * {@code close()} on the channel right away, and registers a listener that publishes the
     * close-success or close-failed event (with the elapsed time) when the close future finishes.
     *
     * @return an observable that, per subscriber, attaches a listener to the close future and
     *         completes on success or errors with the future's cause on failure
     */
    @SuppressWarnings("unchecked")
    protected Observable<Void> _closeChannel() {
        this.closeStartTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(this.metricEventProvider.getChannelCloseStartEvent());
        final ChannelFuture closeFuture = this.getChannel().close();
        // Metrics listener: registered once per call, independent of any subscriber.
        closeFuture.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future2) throws Exception {
                if (future2.isSuccess()) {
                    ObservableConnection.this.eventsSubject.onEvent(ObservableConnection.this.metricEventProvider.getChannelCloseSuccessEvent(), Clock.onEndMillis(ObservableConnection.this.closeStartTimeMillis));
                } else {
                    ObservableConnection.this.eventsSubject.onEvent(ObservableConnection.this.metricEventProvider.getChannelCloseFailedEvent(), Clock.onEndMillis(ObservableConnection.this.closeStartTimeMillis), future2.cause());
                }
            }
        });
        return Observable.create(new Observable.OnSubscribe<Void>(){

            @Override
            public void call(final Subscriber<? super Void> subscriber2) {
                // Each subscriber gets its own listener on the shared close future.
                closeFuture.addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future2) throws Exception {
                        if (future2.isSuccess()) {
                            subscriber2.onCompleted();
                        } else {
                            subscriber2.onError(future2.cause());
                        }
                    }
                });
            }
        });
    }

    /**
     * Replaces the input subject with a serialized wrapper around {@code newSubject}.
     *
     * <p>Only the field is swapped: an observable previously returned by {@link #getInput()},
     * the subject carried by an already-fired {@link NewRxConnectionEvent}, and the subject
     * captured by an in-progress {@link #_close(boolean)} all keep referring to the old subject.
     *
     * @param newSubject subject to use for input from now on
     */
    protected void updateInputSubject(Subject<I, I> newSubject) {
        this.inputSubject = new SerializedSubject<I, I>(newSubject);
    }
}

