/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net;

import java.util.Iterator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;

public abstract class PeerStream<IN, OUT, CONN extends ChannelStream<IN, OUT>>
extends Stream<CONN> {
    private final Dispatcher dispatcher;
    protected final Broadcaster<CONN> channels;
    protected final long prefetch;
    private final FastList<OUT> writePublishers = new FastList();
    private final Environment env;
    private final Codec<Buffer, IN, OUT> defaultCodec;

    protected PeerStream(Environment env, Dispatcher dispatcher, Codec<Buffer, IN, OUT> codec) {
        this(env, dispatcher, codec, Long.MAX_VALUE);
    }

    protected PeerStream(Environment env, Dispatcher dispatcher, Codec<Buffer, IN, OUT> codec, long prefetch) {
        this.env = env == null && Environment.alive() ? Environment.get() : env;
        this.defaultCodec = codec;
        this.prefetch = prefetch > 0L ? prefetch : Long.MAX_VALUE;
        this.dispatcher = dispatcher != null ? dispatcher : SynchronousDispatcher.INSTANCE;
        this.channels = Broadcaster.create((Environment)env, (Dispatcher)SynchronousDispatcher.INSTANCE);
    }

    public void subscribe(Subscriber<? super CONN> s) {
        this.channels.subscribe(s);
    }

    protected void doPipeline(final Function<? super CONN, ? extends Publisher<? extends OUT>> serviceFunction) {
        this.consume(new Consumer<CONN>(){

            public void accept(CONN inoutChannelStream) {
                PeerStream.this.addWritePublisher((Publisher)serviceFunction.apply(inoutChannelStream));
            }
        }, (Consumer)new Consumer<Throwable>(){

            public void accept(Throwable throwable) {
                PeerStream.this.notifyError(throwable);
            }
        });
    }

    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    protected final void notifyError(Throwable t) {
        this.channels.onError(t);
    }

    protected final void notifyNewChannel(CONN channel) {
        this.channels.onNext(channel);
    }

    protected final void notifyShutdown() {
        this.channels.onComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final Publisher<? extends OUT> addWritePublisher(Publisher<? extends OUT> publisher) {
        PeerStream peerStream = this;
        synchronized (peerStream) {
            this.writePublishers.add(publisher);
            return publisher;
        }
    }

    protected abstract CONN bindChannel(Object var1, long var2);

    protected Consumer<Throwable> createErrorConsumer(ChannelStream<IN, OUT> ch) {
        return new Consumer<Throwable>(){

            public void accept(Throwable throwable) {
                try {
                    PeerStream.this.channels.onError(throwable);
                }
                catch (Throwable t2) {
                    PeerStream.this.channels.onError(t2);
                }
            }
        };
    }

    protected Action<Long, Long> createBatchAction(final CONN ch, final Consumer<Throwable> errorConsumer, Consumer<Void> completionConsumer) {
        return new Action<Long, Long>(){
            boolean first = true;

            protected void doNext(Long aLong) {
                this.shouldFlush();
                this.broadcastNext(aLong);
            }

            protected void doComplete() {
                ch.flush();
                super.doComplete();
            }

            protected void doError(Throwable ev) {
                errorConsumer.accept((Object)ev);
                super.doError(ev);
            }

            private void shouldFlush() {
                if (this.first) {
                    this.first = false;
                } else {
                    ch.flush();
                }
            }
        };
    }

    protected Function<Stream<Long>, ? extends Publisher<? extends Long>> createAdaptiveDemandMapper(final CONN ch, final Consumer<Throwable> errorConsumer) {
        return new Function<Stream<Long>, Publisher<? extends Long>>(){

            public Publisher<? extends Long> apply(Stream<Long> requests) {
                return (Publisher)requests.broadcastTo(PeerStream.this.createBatchAction(ch, (Consumer<Throwable>)errorConsumer, PeerStream.this.completeConsumer(ch)));
            }
        };
    }

    protected Iterable<Publisher<? extends OUT>> routeChannel(CONN ch) {
        return this.writePublishers;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void mergeWrite(CONN ch) {
        Iterable<Publisher<OUT>> publishers = this.routeChannel(ch);
        if (publishers == this.writePublishers) {
            int size;
            Publisher publisher = null;
            PeerStream peerStream = this;
            synchronized (peerStream) {
                size = this.writePublishers.size;
                if (size > 0) {
                    publisher = this.writePublishers.array[0];
                }
            }
            if (size == 0) {
                return;
            }
            if (size == 1 && publisher != null) {
                this.subscribeChannelHandlers(Streams.create((Publisher)publisher), ch);
                return;
            }
        }
        this.subscribeChannelHandlers(Streams.concat(publishers), ch);
    }

    protected Consumer<Void> completeConsumer(CONN ch) {
        return null;
    }

    protected void subscribeChannelHandlers(Stream<? extends OUT> writeStream, CONN ch) {
        if (writeStream.getCapacity() != Long.MAX_VALUE) {
            writeStream.adaptiveConsumeOn(((ChannelStream)ch).getIODispatcher(), ((ChannelStream)ch).writeThrough(false), this.createAdaptiveDemandMapper(ch, this.createErrorConsumer((ChannelStream<IN, OUT>)ch)));
        } else {
            writeStream.consumeOn(((ChannelStream)ch).getIODispatcher(), ((ChannelStream)ch).writeThrough(true), this.createErrorConsumer((ChannelStream<IN, OUT>)ch), this.completeConsumer(ch));
        }
    }

    @Nullable
    public final Codec<Buffer, IN, OUT> getDefaultCodec() {
        return this.defaultCodec;
    }

    @Nonnull
    public final Environment getEnvironment() {
        return this.env;
    }

    public final long getPrefetchSize() {
        return this.prefetch;
    }

    static final class FastList<T>
    implements Iterable<Publisher<? extends T>> {
        Publisher[] array;
        int size;

        FastList() {
        }

        public void add(Publisher o) {
            int s = this.size;
            Publisher[] a = this.array;
            if (a == null) {
                this.array = a = new Publisher[16];
            } else if (s == a.length) {
                Publisher[] array2 = new Publisher[s + (s >> 2)];
                System.arraycopy(a, 0, array2, 0, s);
                this.array = a = array2;
            }
            a[s] = o;
            this.size = s + 1;
        }

        @Override
        public Iterator<Publisher<? extends T>> iterator() {
            return new Iterator<Publisher<? extends T>>(){
                int i = 0;

                @Override
                public boolean hasNext() {
                    return this.i < FastList.this.size;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("");
                }

                @Override
                public Publisher<? extends T> next() {
                    return FastList.this.array[this.i];
                }
            };
        }
    }
}

