/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webserver.tyrus;

import io.helidon.common.http.DataChunk;
import io.helidon.webserver.tyrus.TyrusSupport;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.logging.Logger;
import javax.websocket.CloseReason;
import org.glassfish.tyrus.spi.Connection;

public class TyrusReaderSubscriber
implements Flow.Subscriber<DataChunk> {
    private static final Logger LOGGER = Logger.getLogger(TyrusSupport.class.getName());
    private static final int MAX_RETRIES = 5;
    private static final CloseReason CONNECTION_CLOSED = new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.NORMAL_CLOSURE, "Connection closed");
    private final Connection connection;
    private final ExecutorService executorService;
    private Flow.Subscription subscription;

    TyrusReaderSubscriber(Connection connection) {
        this(connection, null);
    }

    TyrusReaderSubscriber(Connection connection, ExecutorService executorService) {
        if (connection == null) {
            throw new IllegalArgumentException("Connection cannot be null");
        }
        this.connection = connection;
        this.executorService = executorService;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
    }

    @Override
    public void onNext(DataChunk item) {
        if (this.executorService == null) {
            for (ByteBuffer byteBuffer : item.data()) {
                this.submitBuffer(byteBuffer);
            }
        } else {
            this.executorService.submit(() -> {
                for (ByteBuffer byteBuffer : item.data()) {
                    this.submitBuffer(byteBuffer);
                }
            });
        }
    }

    private void submitBuffer(ByteBuffer data) {
        int retries = 5;
        while (data.remaining() > 0 && retries-- > 0) {
            this.connection.getReadHandler().handle(data);
        }
        if (retries == 0) {
            LOGGER.warning("Tyrus did not consume all data buffer after 5 retries");
        }
        this.subscription.request(1L);
    }

    @Override
    public void onError(Throwable throwable) {
        this.connection.close(new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
    }

    @Override
    public void onComplete() {
        this.connection.close(CONNECTION_CLOSED);
    }
}

