/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.RequestedCounter;
import io.helidon.common.reactive.SingleSubscriberHolder;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

/**
 * An {@link OutputStream} whose writes are republished as {@link ByteBuffer} items to a
 * single {@link Flow.Subscriber}.
 *
 * <p>Each {@code write} call copies the written bytes into a fresh buffer and hands it to the
 * subscriber through {@code onNext}. A write blocks the calling thread until the subscriber
 * has requested at least one item or the stream has been closed. Closing the stream signals
 * {@code onComplete}.
 *
 * <p>All subscriber signals ({@code onNext}, {@code onComplete}, {@code onError}) are emitted
 * while holding one shared lock, so they never overlap each other.
 */
public class OutputStreamPublisher extends OutputStream implements Flow.Publisher<ByteBuffer> {

    /** Time a blocked writer sleeps between two checks for subscriber demand, in milliseconds. */
    private static final long DEMAND_POLL_INTERVAL_MILLIS = 250L;

    private final SingleSubscriberHolder<ByteBuffer> subscriber = new SingleSubscriberHolder<>();

    /** Guards every signal sent to the subscriber so that signals are serialized. */
    private final Object invocationLock = new Object();

    private final RequestedCounter requested = new RequestedCounter();

    /**
     * Registers the subscriber and, if the registration is accepted, hands it a subscription
     * that forwards demand to the internal counter and cancellation to the subscriber holder.
     *
     * @param subscriberParam subscriber that should receive the written data
     */
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriberParam) {
        if (!subscriber.register(subscriberParam)) {
            return;
        }
        subscriberParam.onSubscribe(new Flow.Subscription() {

            @Override
            public void request(long n) {
                // The callback terminates the stream with an error; presumably the counter
                // invokes it for an invalid request amount - confirm against RequestedCounter.
                requested.increment(n, t -> complete(t));
            }

            @Override
            public void cancel() {
                subscriber.cancel();
            }
        });
    }

    /**
     * Publishes the whole array as one buffer.
     *
     * @param b bytes to publish
     * @throws IOException if the stream is closed or waiting for the subscriber fails
     */
    @Override
    public void write(byte[] b) throws IOException {
        publish(b, 0, b.length);
    }

    /**
     * Publishes {@code len} bytes of {@code b}, starting at index {@code off}, as one buffer.
     *
     * @param b   source array
     * @param off index of the first byte to publish
     * @param len number of bytes to publish
     * @throws IOException               if the stream is closed or waiting for the subscriber fails
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
     *                                   {@code off + len} exceeds {@code b.length}
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        publish(b, off, len);
    }

    /**
     * Publishes the low-order byte of {@code b} as a one-byte buffer.
     *
     * @param b value whose low eight bits are published
     * @throws IOException if the stream is closed or waiting for the subscriber fails
     */
    @Override
    public void write(int b) throws IOException {
        publish(new byte[] {(byte) b}, 0, 1);
    }

    /**
     * Closes the stream by signalling {@code onComplete} through the subscriber holder.
     */
    @Override
    public void close() throws IOException {
        complete();
    }

    /**
     * Waits for subscriber demand and then emits the given range as a single item.
     *
     * @param buffer source array, must not be {@code null}
     * @param offset index of the first byte to emit
     * @param length number of bytes to emit
     * @throws IOException if the stream has been closed, the waiting thread is interrupted,
     *                     or obtaining the subscriber fails
     */
    private void publish(byte[] buffer, int offset, int length) throws IOException {
        Objects.requireNonNull(buffer);
        // Reject bad ranges before blocking so that no demand is consumed for an invalid write.
        checkBounds(buffer, offset, length);
        try {
            Flow.Subscriber<ByteBuffer> sub = subscriber.get();

            // Poll until either one unit of demand could be taken or the stream got closed.
            while (!subscriber.isClosed() && !requested.tryDecrement()) {
                Thread.sleep(DEMAND_POLL_INTERVAL_MILLIS);
            }

            synchronized (invocationLock) {
                if (subscriber.isClosed()) {
                    throw new IOException("Output stream already closed.");
                }
                sub.onNext(createBuffer(buffer, offset, length));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            complete(e);
            throw new IOException(e);
        } catch (ExecutionException e) {
            complete(e.getCause());
            throw new IOException(e.getCause());
        }
    }

    /**
     * Verifies that {@code [offset, offset + length)} lies within {@code buffer}.
     *
     * @throws IndexOutOfBoundsException if the range is not valid for the array
     */
    private static void checkBounds(byte[] buffer, int offset, int length) {
        // "length > buffer.length - offset" avoids the int overflow of "offset + length".
        if (offset < 0 || length < 0 || length > buffer.length - offset) {
            throw new IndexOutOfBoundsException(
                    "offset=" + offset + ", length=" + length + ", array length=" + buffer.length);
        }
    }

    /** Closes the subscriber holder, signalling {@code onComplete} under the invocation lock. */
    private void complete() {
        subscriber.close(sub -> {
            synchronized (invocationLock) {
                sub.onComplete();
            }
        });
    }

    /**
     * Closes the subscriber holder, signalling {@code onError} under the invocation lock.
     *
     * @param t failure to report to the subscriber
     */
    private void complete(Throwable t) {
        subscriber.close(sub -> {
            synchronized (invocationLock) {
                sub.onError(t);
            }
        });
    }

    /**
     * Copies {@code length} bytes of {@code buffer}, starting at {@code offset}, into a new
     * buffer that is ready for reading (position 0, limit {@code length}).
     */
    private ByteBuffer createBuffer(byte[] buffer, int offset, int length) {
        // Capacity is the number of bytes copied; it must not be reduced by the offset.
        ByteBuffer byteBuffer = ByteBuffer.allocate(length);
        byteBuffer.put(buffer, offset, length);
        // The cast keeps the code compilable against Java 8, where flip() returns Buffer.
        return (ByteBuffer) byteBuffer.flip();
    }
}

