/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webserver.tyrus;

import io.helidon.common.http.DataChunk;
import jakarta.websocket.CloseReason;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import org.glassfish.tyrus.spi.Connection;

public class TyrusReaderSubscriber
implements Flow.Subscriber<DataChunk> {
    private static final int MAX_RETRIES = 5;
    private static final CloseReason CONNECTION_CLOSED = new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.NORMAL_CLOSURE, "Connection closed");
    private final Connection connection;
    private final ExecutorService executorService;
    private Flow.Subscription subscription;

    TyrusReaderSubscriber(Connection connection) {
        this(connection, null);
    }

    TyrusReaderSubscriber(Connection connection, ExecutorService executorService) {
        if (connection == null) {
            throw new IllegalArgumentException("Connection cannot be null");
        }
        this.connection = connection;
        this.executorService = executorService;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
    }

    @Override
    public void onNext(DataChunk item) {
        if (this.subscription != null) {
            if (this.executorService == null) {
                this.submitDataChunk(item);
            } else {
                this.executorService.submit(() -> this.submitDataChunk(item));
            }
        } else {
            item.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitDataChunk(DataChunk item) {
        try {
            for (ByteBuffer byteBuffer : item.data()) {
                this.submitBuffer(byteBuffer);
            }
        }
        finally {
            item.release();
        }
        if (this.subscription != null) {
            this.subscription.request(1L);
        }
    }

    private void submitBuffer(ByteBuffer data) {
        int retries = 5;
        while (data.remaining() > 0 && retries-- > 0) {
            this.connection.getReadHandler().handle(data);
        }
        if (retries == 0) {
            this.subscription.cancel();
            this.subscription = null;
            this.connection.close(new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Tyrus did not consume all data after 5 retries"));
        }
    }

    @Override
    public void onError(Throwable throwable) {
        this.connection.close(new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
    }

    @Override
    public void onComplete() {
        this.connection.close(CONNECTION_CLOSED);
    }
}

