/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.EmittingPublisher;
import io.helidon.common.reactive.Multi;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/**
 * An {@link OutputStream} whose written bytes are published as {@link ByteBuffer} items
 * through an internal {@link EmittingPublisher}.
 *
 * <p>Every write (and every {@link #flush()}) copies the requested bytes into a fresh buffer and
 * keeps offering it to the publisher until it is accepted. While the publisher refuses the item,
 * the writing thread blocks until a request callback fires, the subscription is cancelled, the
 * stream is failed, or the configured timeout (10 minutes by default) elapses.
 *
 * <p>NOTE(review): the blocking logic reassigns {@code demandUpdated} from the writing thread
 * without locking, so it looks like it expects a single writer thread - confirm with callers.
 *
 * @deprecated since 2.0.0, marked for removal.
 */
@Deprecated(since="2.0.0", forRemoval=true)
public class MultiFromOutputStream
extends OutputStream
implements Multi<ByteBuffer> {
    /** Zero-length payload published by {@link #flush()}. */
    private static final byte[] FLUSH_BUFFER = new byte[0];

    /** Maximum time, in milliseconds, a single publish may spend waiting for the subscriber. */
    private long timeout = Duration.ofMinutes(10L).toMillis();
    private final EmittingPublisher<ByteBuffer> emittingPublisher = EmittingPublisher.create();
    /** Completed by the request callback; replaced with a fresh future after each wait. */
    private volatile CompletableFuture<Void> demandUpdated = new CompletableFuture<>();

    /**
     * Creates the stream and registers callbacks on the publisher so that a cancel cancels the
     * current wait and a request completes it, releasing a blocked writer.
     */
    protected MultiFromOutputStream() {
        this.emittingPublisher.onCancel(() -> this.demandUpdated.cancel(true));
        this.emittingPublisher.onRequest((n, demand) -> this.demandUpdated.complete(null));
    }

    /**
     * Sets the maximum time a single publish may wait for the subscriber.
     *
     * @param timeout timeout in milliseconds
     */
    void timeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Registers a request callback on the underlying publisher.
     *
     * @param requestCallback callback handed to {@code EmittingPublisher.onRequest}
     * @return this instance, to allow chaining
     */
    public MultiFromOutputStream onRequest(BiConsumer<Long, Long> requestCallback) {
        this.emittingPublisher.onRequest(requestCallback);
        return this;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        this.emittingPublisher.subscribe(subscriber);
    }

    @Override
    public void write(byte[] b) throws IOException {
        this.publish(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.publish(b, off, len);
    }

    @Override
    public void write(int b) throws IOException {
        // Only the low-order byte is written, as OutputStream.write(int) specifies.
        byte bb = (byte) b;
        this.publish(new byte[]{bb}, 0, 1);
    }

    /** Completes the publisher; see {@link #complete()}. */
    @Override
    public void close() throws IOException {
        this.complete();
    }

    /** Publishes an empty buffer; like a write, this blocks until the publisher accepts it. */
    @Override
    public void flush() throws IOException {
        this.publish(FLUSH_BUFFER, 0, 0);
    }

    /**
     * Copies {@code length} bytes of {@code buffer} starting at {@code offset} and offers the copy
     * to the publisher until it is accepted.
     *
     * @param buffer source bytes, must not be {@code null}
     * @param offset index of the first byte to publish
     * @param length number of bytes to publish
     * @throws IOException if the subscription was cancelled, the publisher has failed, the wait
     *         timed out, the thread was interrupted, or the wait completed exceptionally
     * @throws IndexOutOfBoundsException if {@code offset}/{@code length} do not describe a valid
     *         range of {@code buffer}
     */
    private void publish(byte[] buffer, int offset, int length) throws IOException {
        Objects.requireNonNull(buffer, "buffer");
        // Fail fast with the exception type the OutputStream.write contract prescribes.
        Objects.checkFromIndexSize(offset, length, buffer.length);
        try {
            long start = System.currentTimeMillis();
            ByteBuffer byteBuffer = this.createBuffer(buffer, offset, length);
            while (!this.emittingPublisher.emit(byteBuffer)) {
                if (this.emittingPublisher.isCancelled()) {
                    throw new IOException("Output stream already closed.");
                }
                if (this.emittingPublisher.isFailed()) {
                    Throwable throwable = this.emittingPublisher.failCause().get();
                    throw new IOException(throwable);
                }
                this.await(start, this.timeout, this.demandUpdated);
                // Arm a fresh future before retrying so the next wait needs a new signal.
                this.demandUpdated = new CompletableFuture<>();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            this.fail(e);
            throw new IOException(e);
        }
        catch (ExecutionException e) {
            this.fail(e.getCause());
            throw new IOException(e.getCause());
        }
        catch (IllegalStateException e) {
            // Also covers CancellationException (an IllegalStateException subclass), which
            // Future.get throws once the cancel callback has cancelled the wait.
            this.fail(e);
            throw new IOException(e);
        }
    }

    /** Completes the publisher and releases any writer blocked on the current wait. */
    void complete() {
        this.emittingPublisher.complete();
        this.demandUpdated.complete(null);
    }

    /**
     * Fails the publisher and completes the current wait exceptionally with the same cause.
     *
     * @param t the failure cause
     */
    void fail(Throwable t) {
        this.emittingPublisher.fail(t);
        this.demandUpdated.completeExceptionally(t);
    }

    /**
     * Blocks until {@code future} completes or the deadline {@code startTime + waitTime} passes.
     *
     * <p>Only the time still remaining until the deadline is waited for, so repeated calls within
     * one publish cannot add up to more than {@code waitTime} in total.
     *
     * @param startTime wall-clock time in milliseconds at which the publish started
     * @param waitTime total time budget in milliseconds, counted from {@code startTime}
     * @param future future to wait on
     * @throws ExecutionException if the future completed exceptionally
     * @throws InterruptedException if the waiting thread was interrupted
     * @throws IOException if the deadline passed before the future completed
     */
    private void await(long startTime, long waitTime, CompletableFuture<?> future) throws ExecutionException, InterruptedException, IOException {
        long elapsed = System.currentTimeMillis() - startTime;
        // Clamp to [0, waitTime]; the upper bound guards against the wall clock stepping backwards.
        long remaining = Math.min(waitTime, Math.max(0L, waitTime - elapsed));
        try {
            future.get(remaining, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            if (System.currentTimeMillis() - startTime >= waitTime) {
                throw new IOException("Timed out while waiting for subscriber to read data", e);
            }
            // Deadline not reached yet (clock granularity): return and let the caller retry.
        }
    }

    /**
     * Copies {@code length} bytes of {@code buffer}, starting at {@code offset}, into a new
     * heap buffer positioned at 0 and limited to {@code length}, ready for reading.
     */
    private ByteBuffer createBuffer(byte[] buffer, int offset, int length) {
        // Capacity is the number of bytes copied; it does not depend on the source offset.
        ByteBuffer byteBuffer = ByteBuffer.allocate(length);
        byteBuffer.put(buffer, offset, length);
        byteBuffer.flip();
        return byteBuffer;
    }
}

