/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net;

import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.IOStreams;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Channel;
import reactor.io.net.PeerStream;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.broadcast.Broadcaster;

public abstract class ChannelStream<IN, OUT>
extends Stream<IN>
implements Channel<IN, OUT> {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final PeerStream<IN, OUT, ChannelStream<IN, OUT>> peer;
    protected final Broadcaster<IN> contentStream;
    private final Environment env;
    private final Dispatcher ioDispatcher;
    private final Dispatcher eventsDispatcher;
    private final Function<Buffer, IN> decoder;
    private final Function<OUT, Buffer> encoder;
    private final long prefetch;

    protected ChannelStream(@Nonnull Environment env, @Nullable Codec<Buffer, IN, OUT> codec, long prefetch, @Nonnull PeerStream<IN, OUT, ChannelStream<IN, OUT>> peer, @Nonnull Dispatcher ioDispatcher, @Nonnull Dispatcher eventsDispatcher) {
        Assert.notNull((Object)env, (String)"IO Dispatcher cannot be null");
        Assert.notNull((Object)env, (String)"Events Reactor cannot be null");
        this.env = env;
        this.prefetch = prefetch;
        this.ioDispatcher = ioDispatcher;
        this.peer = peer;
        this.eventsDispatcher = eventsDispatcher;
        this.contentStream = Broadcaster.create((Environment)env, (Dispatcher)eventsDispatcher);
        if (null != codec) {
            this.decoder = codec.decoder(new Consumer<IN>(){

                public void accept(IN in) {
                    ChannelStream.this.doDecoded(in);
                }
            });
            this.encoder = codec;
        } else {
            this.decoder = null;
            this.encoder = null;
        }
    }

    public void subscribe(Subscriber<? super IN> s) {
        this.contentStream.subscribe(s);
    }

    @Override
    public final void sink(Publisher<? extends OUT> source) {
        this.peer.subscribeChannelHandlers(Streams.create(source), this);
    }

    public final void sinkBuffers(Publisher<? extends Buffer> source) {
        Stream encodedSource = Streams.create(source).map(new Function<Buffer, OUT>(){

            public OUT apply(Buffer data) {
                if (null != ChannelStream.this.encoder) {
                    Buffer bytes = (Buffer)ChannelStream.this.encoder.apply((Object)data);
                    return bytes;
                }
                return data;
            }
        });
        this.peer.subscribeChannelHandlers(encodedSource, this);
    }

    public final Environment getEnvironment() {
        return this.env;
    }

    public final Dispatcher getDispatcher() {
        return this.eventsDispatcher;
    }

    public final long getCapacity() {
        return this.prefetch;
    }

    public final Dispatcher getIODispatcher() {
        return this.ioDispatcher;
    }

    public final Function<Buffer, IN> getDecoder() {
        return this.decoder;
    }

    public final Function<OUT, Buffer> getEncoder() {
        return this.encoder;
    }

    public final Subscriber<IN> in() {
        return this.contentStream;
    }

    public final <DECODED> Stream<DECODED> decode(Codec<IN, DECODED, ?> codec) {
        return IOStreams.decode(codec, (Publisher)this);
    }

    public abstract Object delegate();

    public void registerOnPeer() {
        this.peer.notifyNewChannel(this);
        this.peer.mergeWrite(this);
    }

    Consumer<OUT> writeThrough(boolean autoflush) {
        return new WriteConsumer(autoflush);
    }

    protected final void cascadeErrorToPeer(Throwable t) {
        this.log.error("", t);
        this.peer.notifyError(t);
    }

    protected void doDecoded(IN in) {
        this.contentStream.onNext(in);
    }

    protected void write(Buffer data, Subscriber<?> onComplete, boolean flush) {
        this.write(data.byteBuffer(), onComplete, flush);
    }

    protected abstract void write(ByteBuffer var1, Subscriber<?> var2, boolean var3);

    protected abstract void write(Object var1, Subscriber<?> var2, boolean var3);

    protected abstract void flush();

    final class WriteConsumer
    implements Consumer<OUT> {
        final boolean autoflush;

        public WriteConsumer(boolean autoflush) {
            this.autoflush = autoflush;
        }

        public void accept(OUT data) {
            try {
                if (null != ChannelStream.this.encoder) {
                    Buffer bytes = (Buffer)ChannelStream.this.encoder.apply(data);
                    if (bytes.remaining() > 0) {
                        ChannelStream.this.write(bytes, (Subscriber<?>)null, this.autoflush);
                    }
                } else if (Buffer.class == data.getClass()) {
                    ChannelStream.this.write((Buffer)data, (Subscriber<?>)null, this.autoflush);
                } else {
                    ChannelStream.this.write(data, null, this.autoflush);
                }
            }
            catch (Throwable t) {
                ChannelStream.this.peer.notifyError(t);
            }
        }
    }
}

