/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ReactorChannel;
import reactor.rx.IOStreams;
import reactor.rx.Stream;
import reactor.rx.Streams;

/**
 * Abstract {@link Stream} of inbound {@code IN} values that is also a
 * {@link ReactorChannel} able to write outbound {@code OUT} values.
 *
 * <p>When a {@link Codec} is supplied at construction time, its decoder and encoder
 * functions are captured and exposed through {@link #getDecoder()} and
 * {@link #getEncoder()}; decoded values are handed to {@link #doDecoded(Object)}.
 * Without a codec both functions are {@code null}.
 *
 * @param <IN>  type of the values emitted by this stream
 * @param <OUT> type of the values accepted by {@link #writeWith(Publisher)}
 */
public abstract class ChannelStream<IN, OUT>
extends Stream<IN>
implements ReactorChannel<IN, OUT> {
    protected static final Logger log = LoggerFactory.getLogger(ChannelStream.class);
    private final Environment env;
    private final Dispatcher eventsDispatcher;
    private final Function<Buffer, IN> decoder;
    private final Function<OUT, Buffer> encoder;
    private final long prefetch;

    /**
     * Creates the channel stream and, if a codec is given, wires its decoder so that
     * every decoded value is forwarded to {@link #doDecoded(Object)}.
     *
     * @param env              environment returned by {@link #getEnvironment()}; stored as given
     * @param codec            codec providing the decoder/encoder functions, or {@code null}
     *                         to leave both {@code null}
     * @param prefetch         value returned by {@link #getCapacity()}
     * @param eventsDispatcher dispatcher returned by {@link #getDispatcher()}; must not be {@code null}
     */
    protected ChannelStream(Environment env, Codec<Buffer, IN, OUT> codec, long prefetch, Dispatcher eventsDispatcher) {
        Assert.notNull(eventsDispatcher, "Events Reactor cannot be null");
        this.env = env;
        this.prefetch = prefetch;
        this.eventsDispatcher = eventsDispatcher;
        if (null != codec) {
            this.decoder = codec.decoder(new Consumer<IN>() {

                public void accept(IN in) {
                    ChannelStream.this.doDecoded(in);
                }
            });
            this.encoder = codec.encoder();
        } else {
            this.decoder = null;
            this.encoder = null;
        }
    }

    /**
     * Returns a stream that, on each subscription, asks the implementation to write
     * the given source via {@link #doSubscribeWriter(Publisher, Subscriber)}.
     *
     * <p>A source that is already a {@link Stream} is used as is. Any other publisher is
     * wrapped in a {@link Stream} that delegates {@code subscribe} to it and reports this
     * channel's prefetch as its capacity.
     *
     * @param source publisher of the values to write; must not be {@code null}
     * @return a stream whose subscription triggers the write
     * @throws NullPointerException if {@code source} is {@code null}
     */
    public final Stream<Void> writeWith(final Publisher<? extends OUT> source) {
        // Fail fast, as the previous source.getClass() call did, but with a named argument.
        if (source == null) {
            throw new NullPointerException("source");
        }
        final Stream<? extends OUT> sourceStream;
        if (source instanceof Stream) {
            sourceStream = (Stream<? extends OUT>) source;
        } else {
            sourceStream = new Stream<OUT>() {

                public void subscribe(Subscriber<? super OUT> subscriber) {
                    source.subscribe(subscriber);
                }

                public long getCapacity() {
                    return ChannelStream.this.prefetch;
                }
            };
        }
        return new Stream<Void>() {

            public void subscribe(Subscriber<? super Void> s) {
                ChannelStream.this.doSubscribeWriter(sourceStream, s);
            }
        };
    }

    /**
     * Writes a publisher of {@link Buffer}s through {@link #writeWith(Publisher)}.
     *
     * <p>Each buffer is passed through the encoder when one is configured; otherwise the
     * buffer itself is forwarded. In both cases the value is treated as an {@code OUT}.
     *
     * @param source publisher of the buffers to write
     * @return a stream whose subscription triggers the write
     */
    public final Stream<Void> writeBufferWith(Publisher<? extends Buffer> source) {
        Stream<? extends OUT> encodedSource = Streams.create(source).map(new Function<Buffer, OUT>() {

            // The buffer is handed to the encoder and returned as OUT without a runtime
            // type check (erasure). NOTE(review): this presumably relies on OUT being
            // compatible with Buffer for channels that use this method - confirm with callers.
            @SuppressWarnings("unchecked")
            public OUT apply(Buffer data) {
                if (null != ChannelStream.this.encoder) {
                    Buffer bytes = ChannelStream.this.encoder.apply((OUT) data);
                    return (OUT) bytes;
                }
                return (OUT) data;
            }
        });
        return this.writeWith(encodedSource);
    }

    /**
     * @return the environment given at construction time (may be {@code null} if none was given)
     */
    public final Environment getEnvironment() {
        return this.env;
    }

    /**
     * @return the events dispatcher given at construction time; never {@code null}
     */
    public final Dispatcher getDispatcher() {
        return this.eventsDispatcher;
    }

    /**
     * @return the prefetch value given at construction time
     */
    public final long getCapacity() {
        return this.prefetch;
    }

    /**
     * @return the decoder obtained from the codec, or {@code null} when no codec was supplied
     */
    public final Function<Buffer, IN> getDecoder() {
        return this.decoder;
    }

    /**
     * @return the encoder obtained from the codec, or {@code null} when no codec was supplied
     */
    public final Function<OUT, Buffer> getEncoder() {
        return this.encoder;
    }

    /**
     * Decodes the values of this stream with the given codec by delegating to
     * {@link IOStreams#decode}.
     *
     * @param codec     codec whose source type is this stream's {@code IN}
     * @param <DECODED> type produced by the codec
     * @return the stream returned by {@code IOStreams.decode} for this stream
     */
    public final <DECODED> Stream<DECODED> decode(Codec<IN, DECODED, ?> codec) {
        return IOStreams.decode(codec, this);
    }

    /**
     * @return the underlying object this channel stream wraps, as defined by the implementation
     */
    public abstract Object delegate();

    /**
     * Implementation hook invoked on each subscription to a stream returned by
     * {@link #writeWith(Publisher)}.
     *
     * @param var1 publisher of the values to write
     * @param var2 subscriber of the stream returned by {@link #writeWith(Publisher)}
     */
    protected abstract void doSubscribeWriter(Publisher<? extends OUT> var1, Subscriber<? super Void> var2);

    /**
     * Implementation hook receiving each value passed to the consumer registered with
     * the codec's decoder.
     *
     * @param var1 the decoded value
     */
    protected abstract void doDecoded(IN var1);
}

