/*
 * Decompiled with CFR 0.152.
 */
package mantis.io.reactivex.netty.channel;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.FileRegion;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import mantis.io.reactivex.netty.channel.ChannelMetricEventProvider;
import mantis.io.reactivex.netty.channel.ChannelWriter;
import mantis.io.reactivex.netty.channel.ContentTransformer;
import mantis.io.reactivex.netty.metrics.Clock;
import mantis.io.reactivex.netty.metrics.MetricEventsSubject;
import mantis.io.reactivex.netty.pipeline.PrimitiveConversionHandler;
import mantis.io.reactivex.netty.util.MultipleFutureListener;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

public class DefaultChannelWriter<O>
implements ChannelWriter<O> {
    protected static final Observable<Void> CONNECTION_ALREADY_CLOSED = Observable.error(new IllegalStateException("Connection is already closed."));
    protected final AtomicBoolean closeIssued = new AtomicBoolean();
    private final Channel nettyChannel;
    private final AtomicReference<MultipleFutureListener> unflushedWritesListener;
    private final MetricEventsSubject eventsSubject;
    private final ChannelMetricEventProvider metricEventProvider;

    protected DefaultChannelWriter(Channel nettyChannel, MetricEventsSubject<?> eventsSubject, ChannelMetricEventProvider metricEventProvider) {
        this.eventsSubject = eventsSubject;
        this.metricEventProvider = metricEventProvider;
        if (null == nettyChannel) {
            throw new NullPointerException("Channel can not be null.");
        }
        this.nettyChannel = nettyChannel;
        this.unflushedWritesListener = new AtomicReference<MultipleFutureListener>(new MultipleFutureListener(nettyChannel.newPromise()));
    }

    @Override
    public Observable<Void> writeAndFlush(O msg) {
        this.write(msg);
        return this.flush();
    }

    @Override
    public <R> Observable<Void> writeAndFlush(R msg, ContentTransformer<R> transformer) {
        this.write(msg, transformer);
        return this.flush();
    }

    @Override
    public Observable<Void> writeBytesAndFlush(ByteBuf msg) {
        this.writeBytes(msg);
        return this.flush();
    }

    @Override
    public void write(O msg) {
        this.writeOnChannel(msg);
    }

    @Override
    public <R> void write(final R msg, final ContentTransformer<R> transformer) {
        this.writeOnChannel(new PrimitiveConversionHandler.DelayedTransformationMessage(){

            @Override
            public Object getTransformed(ByteBufAllocator allocator) {
                return transformer.call(msg, allocator);
            }
        });
    }

    @Override
    public void writeBytes(ByteBuf msg) {
        this.writeOnChannel(msg);
    }

    @Override
    public void writeBytes(byte[] msg) {
        this.writeOnChannel(msg);
    }

    @Override
    public void writeString(String msg) {
        this.writeOnChannel(msg);
    }

    @Override
    public Observable<Void> writeBytesAndFlush(byte[] msg) {
        this.writeBytes(msg);
        return this.flush();
    }

    @Override
    public Observable<Void> writeStringAndFlush(String msg) {
        this.writeString(msg);
        return this.flush();
    }

    @Override
    public void writeFileRegion(FileRegion region) {
        this.writeOnChannel(region);
    }

    @Override
    public Observable<Void> flush() {
        final long startTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(this.metricEventProvider.getFlushStartEvent());
        MultipleFutureListener existingListener = this.unflushedWritesListener.getAndSet(new MultipleFutureListener(this.nettyChannel.newPromise()));
        this.nettyChannel.flush();
        return existingListener.asObservable().doOnCompleted(new Action0(){

            @Override
            public void call() {
                DefaultChannelWriter.this.eventsSubject.onEvent(DefaultChannelWriter.this.metricEventProvider.getFlushSuccessEvent(), Clock.onEndMillis(startTimeMillis));
            }
        }).doOnError(new Action1<Throwable>(){

            @Override
            public void call(Throwable throwable) {
                DefaultChannelWriter.this.eventsSubject.onEvent(DefaultChannelWriter.this.metricEventProvider.getFlushFailedEvent(), Clock.onEndMillis(startTimeMillis), throwable);
            }
        });
    }

    @Override
    public void cancelPendingWrites(boolean mayInterruptIfRunning) {
        this.unflushedWritesListener.get().cancelPendingFutures(mayInterruptIfRunning);
    }

    @Override
    public ByteBufAllocator getAllocator() {
        return this.nettyChannel.alloc();
    }

    protected ChannelFuture writeOnChannel(Object msg) {
        ChannelFuture writeFuture = this.getChannel().write(msg);
        this.unflushedWritesListener.get().listen(writeFuture);
        return writeFuture;
    }

    public Channel getChannel() {
        return this.nettyChannel;
    }

    public boolean isCloseIssued() {
        return this.closeIssued.get();
    }

    @Override
    public Observable<Void> close() {
        return this.close(false);
    }

    @Override
    public Observable<Void> close(boolean flush2) {
        if (this.closeIssued.compareAndSet(false, true)) {
            return this._close(flush2);
        }
        return CONNECTION_ALREADY_CLOSED;
    }

    protected Observable<Void> _close(boolean flush2) {
        return Observable.empty();
    }
}

