/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal;

import io.inverno.mod.base.Charsets;
import io.inverno.mod.base.resource.Resource;
import io.inverno.mod.http.base.NotFoundException;
import io.inverno.mod.http.base.OutboundData;
import io.inverno.mod.http.base.header.Headers;
import io.inverno.mod.http.server.ResponseBody;
import io.inverno.mod.http.server.internal.AbstractResponseBody;
import io.inverno.mod.http.server.internal.AbstractResponseHeaders;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public abstract class AbstractResponseBody<A extends AbstractResponseHeaders<?>, B extends AbstractResponseBody<A, B>>
implements ResponseBody {
    /** Content type applied to server-sent event responses: {@code text/event-stream} with the display name of {@code Charsets.DEFAULT} as charset. */
    protected static final String SSE_CONTENT_TYPE = "text/event-stream;charset=" + Charsets.DEFAULT.displayName();
    /** Response headers supplied at construction; the resource and SSE producers write to them before setting the payload. */
    protected final A headers;
    /** Raw payload producer, created on the first call to {@link #raw()}. */
    protected OutboundData<ByteBuf> rawData;
    /** Character payload producer, created on the first call to {@link #string()}. */
    protected OutboundData<CharSequence> stringData;
    /** Resource payload producer, created on the first call to {@link #resource()}. */
    protected ResponseBody.Resource resourceData;
    /** Raw server-sent events producer, created on the first call to {@link #sse()}. */
    protected ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> sseData;
    /** Character server-sent events producer, created on the first call to {@link #sseString()}. */
    protected ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> sseStringData;
    /** Sink captured when the deferred publisher created by {@link #getData()} is subscribed; {@code setData} completes it with the payload. */
    private MonoSink<Publisher<ByteBuf>> dataEmitter;
    /** Payload publisher: either the one stored by {@code setData} or the deferred one created by {@link #getData()}. */
    private Publisher<ByteBuf> data;
    /** Set to {@code true} at the end of {@code setData}. */
    private boolean dataSet;
    /**
     * Read by the "already sent" guards in {@code setData} and {@code transform}.
     * NOTE(review): no statement in this class ever assigns this field, so it always holds {@code false} here - confirm
     * whether subscription tracking is missing.
     */
    private boolean subscribed;
    /** Composition of every function registered through {@code transform}; applied to the payload in {@code setData}. */
    private Function<Publisher<ByteBuf>, Publisher<ByteBuf>> transformer;

    /**
     * Creates a response body bound to the specified headers.
     *
     * @param headers the headers stored in {@link #headers}
     */
    public AbstractResponseBody(A headers) {
        this.headers = headers;
    }

    /**
     * Sets the response payload publisher.
     *
     * <p>The registered transformer, if any, is applied first, then a discard hook releasing {@link ByteBuf} instances is
     * attached. When the deferred publisher returned by {@link #getData()} has already been subscribed (its sink has been
     * captured), the payload is pushed through that sink; otherwise it is simply stored.</p>
     *
     * <p>NOTE(review): {@code subscribed} is never assigned in this class, so the guard below cannot trip as written -
     * confirm whether subscription tracking is missing.</p>
     *
     * @param data the payload publisher
     *
     * @throws IllegalStateException if the payload was already set and subscribed
     */
    protected void setData(Publisher<ByteBuf> data) throws IllegalStateException {
        if (this.subscribed && this.dataSet) {
            throw new IllegalStateException("Response data already sent");
        }
        Publisher<ByteBuf> transformedData = this.transformer != null ? this.transformer.apply(data) : data;
        // Preserve the Mono/Flux nature of the publisher while making sure discarded buffers are released.
        if (transformedData instanceof Mono) {
            transformedData = ((Mono<ByteBuf>)transformedData).doOnDiscard(ByteBuf.class, ReferenceCounted::release);
        } else {
            transformedData = Flux.from(transformedData).doOnDiscard(ByteBuf.class, ReferenceCounted::release);
        }
        if (this.dataEmitter != null) {
            this.dataEmitter.success(transformedData);
        } else {
            this.data = transformedData;
        }
        this.dataSet = true;
    }

    /**
     * Returns the response payload publisher.
     *
     * <p>When no payload has been stored yet, a deferred publisher is created and stored: on subscription it captures a
     * sink in {@code dataEmitter} and switches to whatever publisher is later emitted through that sink.</p>
     *
     * @return the payload publisher
     */
    public final Publisher<ByteBuf> getData() {
        if (this.data == null) {
            // The explicit type witness keeps the sink typed as MonoSink<Publisher<ByteBuf>> so it can be stored.
            this.data = Flux.switchOnNext(Mono.<Publisher<ByteBuf>>create(emitter -> this.dataEmitter = emitter));
        }
        return this.data;
    }

    /**
     * Registers a function transforming the payload publisher.
     *
     * <p>The function is chained after any previously registered one. If a payload has already been set, the function is
     * also applied immediately to the stored publisher.</p>
     *
     * <p>Note that the guard below throws an {@link IllegalStateException}, even though the signature declares
     * {@link IllegalArgumentException}.</p>
     *
     * @param transformer the function to apply to the payload publisher
     *
     * @return this response body
     */
    public final B transform(Function<Publisher<ByteBuf>, Publisher<ByteBuf>> transformer) throws IllegalArgumentException {
        if (this.subscribed && this.dataSet) {
            throw new IllegalStateException("Response data already sent");
        }
        if (this.transformer == null) {
            this.transformer = transformer;
        } else {
            this.transformer = this.transformer.andThen(transformer);
        }
        if (this.dataSet) {
            Publisher<ByteBuf> current = this.data;
            this.data = transformer.apply(current);
        }
        @SuppressWarnings("unchecked")
        B self = (B)this;
        return self;
    }

    /**
     * Sets a payload made of a single {@link Unpooled#EMPTY_BUFFER}.
     */
    @Override
    public void empty() {
        this.setData(Mono.just(Unpooled.EMPTY_BUFFER));
    }

    /**
     * Returns the raw payload producer, creating it on first access.
     *
     * @return the raw payload producer
     */
    @Override
    public OutboundData<ByteBuf> raw() {
        OutboundData<ByteBuf> producer = this.rawData;
        if (producer == null) {
            producer = new RawOutboundData();
            this.rawData = producer;
        }
        return producer;
    }

    /**
     * Returns the character payload producer, creating it on first access.
     *
     * @param <T> the type of character sequence the caller intends to produce
     *
     * @return the character payload producer
     */
    @Override
    @SuppressWarnings("unchecked") // the producer accepts any CharSequence, so narrowing its type argument is safe
    public <T extends CharSequence> OutboundData<T> string() {
        if (this.stringData == null) {
            this.stringData = new StringOutboundData();
        }
        return (OutboundData<T>)this.stringData;
    }

    /**
     * Returns the resource payload producer, creating it on first access.
     *
     * @return the resource payload producer
     */
    @Override
    public ResponseBody.Resource resource() {
        ResponseBody.Resource producer = this.resourceData;
        if (producer == null) {
            producer = new ResourceOutboundData();
            this.resourceData = producer;
        }
        return producer;
    }

    /**
     * Returns the raw server-sent events producer, creating it on first access.
     *
     * @return the raw server-sent events producer
     */
    @Override
    public ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> sse() {
        ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> producer = this.sseData;
        if (producer == null) {
            producer = new SseRawOutboundData();
            this.sseData = producer;
        }
        return producer;
    }

    /**
     * Returns the character server-sent events producer, creating it on first access.
     *
     * @return the character server-sent events producer
     */
    public ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> sseString() {
        ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> producer = this.sseStringData;
        if (producer == null) {
            producer = new SseStringOutboundData();
            this.sseStringData = producer;
        }
        return producer;
    }

    /**
     * Payload producer that forwards a publisher of buffers to the enclosing response body unchanged.
     */
    protected class RawOutboundData
    implements OutboundData<ByteBuf> {
        protected RawOutboundData() {
        }

        /**
         * Sets the specified publisher as the response payload.
         *
         * @param <T>  the type of buffer emitted
         * @param data the payload publisher
         */
        @SuppressWarnings("unchecked") // the publisher is only read from, so widening its element type to ByteBuf is safe
        public <T extends ByteBuf> void stream(Publisher<T> data) {
            AbstractResponseBody.this.setData((Publisher<ByteBuf>)data);
        }
    }

    /**
     * Payload producer that encodes character sequences with {@code Charsets.DEFAULT} before handing them to the
     * enclosing response body.
     */
    protected class StringOutboundData
    implements OutboundData<CharSequence> {
        protected StringOutboundData() {
        }

        /**
         * Encodes every emitted character sequence and sets the result as the response payload.
         *
         * <p>A {@link Mono} source stays a {@link Mono}; any other publisher is handled as a {@link Flux}.</p>
         *
         * @param <T>   the type of character sequence emitted
         * @param value the publisher of character sequences
         *
         * @throws IllegalStateException if the enclosing body rejects the payload
         */
        public <T extends CharSequence> void stream(Publisher<T> value) throws IllegalStateException {
            Publisher<ByteBuf> data;
            if (value instanceof Mono) {
                data = ((Mono<T>)value).map(this::encode);
            } else {
                data = Flux.from(value).map(this::encode);
            }
            AbstractResponseBody.this.setData(data);
        }

        /**
         * Encodes a single character sequence and sets it as the response payload; a {@code null} value results in an
         * empty publisher.
         *
         * @param <T>   the type of character sequence
         * @param value the character sequence or {@code null}
         *
         * @throws IllegalStateException if the enclosing body rejects the payload
         */
        public <T extends CharSequence> void value(T value) throws IllegalStateException {
            Mono<ByteBuf> data = value != null ? Mono.just(this.encode(value)) : Mono.empty();
            AbstractResponseBody.this.setData(data);
        }

        /** Copies the character sequence into an unreleasable buffer using {@code Charsets.DEFAULT}. */
        private ByteBuf encode(CharSequence chunk) {
            return Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(chunk, Charsets.DEFAULT));
        }
    }

    protected class ResourceOutboundData
    implements ResponseBody.Resource {
        protected ResourceOutboundData() {
        }

        protected void populateHeaders(Resource resource) {
            String mediaType;
            if (AbstractResponseBody.this.headers.getContentLength() == null) {
                resource.size().ifPresent(arg_0 -> AbstractResponseBody.this.headers.contentLength(arg_0));
            }
            if (!AbstractResponseBody.this.headers.contains((CharSequence)"content-type") && (mediaType = resource.getMediaType()) != null) {
                ((AbstractResponseHeaders)AbstractResponseBody.this.headers).contentType(mediaType);
            }
            if (!AbstractResponseBody.this.headers.contains((CharSequence)"last-modified")) {
                resource.lastModified().ifPresent(lastModified -> ((AbstractResponseHeaders)AbstractResponseBody.this.headers).set("last-modified", Headers.FORMATTER_RFC_5322_DATE_TIME.format(lastModified.toInstant())));
            }
        }

        @Override
        public void value(Resource resource) {
            Objects.requireNonNull(resource);
            if (!resource.exists().orElse(true).booleanValue()) {
                throw new NotFoundException();
            }
            this.populateHeaders(resource);
            AbstractResponseBody.this.setData((Publisher<ByteBuf>)resource.read());
        }
    }

    protected class SseRawOutboundData
    implements ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> {
        protected SseRawOutboundData() {
        }

        @Override
        public void from(BiConsumer<ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>, OutboundData<ResponseBody.Sse.Event<ByteBuf>>> data) {
            data.accept(this::create, (OutboundData<ResponseBody.Sse.Event<ByteBuf>>)((OutboundData)this::stream));
        }

        protected <T extends ResponseBody.Sse.Event<ByteBuf>> void stream(Publisher<T> value) {
            ((AbstractResponseHeaders)AbstractResponseBody.this.headers).contentType(SSE_CONTENT_TYPE);
            AbstractResponseBody.this.setData((Publisher<ByteBuf>)Flux.from(value).flatMapSequential(sse -> {
                StringBuilder sseMetaData = new StringBuilder();
                if (((GenericEvent)sse).getId() != null) {
                    sseMetaData.append("id:").append(((GenericEvent)sse).getId()).append("\n");
                }
                if (((GenericEvent)sse).getEvent() != null) {
                    sseMetaData.append("event:").append(((GenericEvent)sse).getEvent()).append("\n");
                }
                if (((GenericEvent)sse).getComment() != null) {
                    sseMetaData.append(":").append(((GenericEvent)sse).getComment().replaceAll("\\r\\n|\\r|\\n", "\r\n:")).append("\n");
                }
                if (((GenericEvent)sse).getData() != null) {
                    sseMetaData.append("data:");
                }
                Flux sseData = Flux.just((Object)Unpooled.unreleasableBuffer((ByteBuf)Unpooled.copiedBuffer((CharSequence)sseMetaData, (Charset)Charsets.UTF_8)));
                if (((GenericEvent)sse).getData() != null) {
                    sseData = sseData.concatWith((Publisher)Flux.from(((GenericEvent)sse).getData()).map(chunk -> {
                        ByteBuf escapedChunk = Unpooled.unreleasableBuffer((ByteBuf)Unpooled.buffer((int)chunk.readableBytes(), (int)Integer.MAX_VALUE));
                        while (chunk.isReadable()) {
                            byte nextByte = chunk.readByte();
                            if (nextByte == 13) {
                                if (chunk.getByte(chunk.readerIndex()) == 10) {
                                    chunk.readByte();
                                }
                                escapedChunk.writeCharSequence((CharSequence)"\r\ndata:", Charsets.UTF_8);
                                continue;
                            }
                            if (nextByte == 10) {
                                escapedChunk.writeCharSequence((CharSequence)"\r\ndata:", Charsets.UTF_8);
                                continue;
                            }
                            escapedChunk.writeByte((int)nextByte);
                        }
                        return escapedChunk;
                    }));
                }
                sseData = sseData.concatWith((Publisher)Mono.just((Object)Unpooled.unreleasableBuffer((ByteBuf)Unpooled.copiedBuffer((CharSequence)"\r\n\r\n", (Charset)Charsets.UTF_8))));
                return sseData;
            }));
        }

        protected ResponseBody.Sse.Event<ByteBuf> create(Consumer<ResponseBody.Sse.Event<ByteBuf>> configurer) {
            GenericEvent sse = new GenericEvent(this);
            configurer.accept(sse);
            return sse;
        }

        protected final class GenericEvent
        implements ResponseBody.Sse.Event<ByteBuf> {
            private String id;
            private String comment;
            private String event;
            private Publisher<ByteBuf> data;

            protected GenericEvent(SseRawOutboundData this$1) {
            }

            public <T extends ByteBuf> void stream(Publisher<T> data) {
                this.data = data;
            }

            public <T extends ByteBuf> void value(T data) {
                this.data = Mono.just(data);
            }

            public io.inverno.mod.http.server.internal.AbstractResponseBody$SseRawOutboundData.GenericEvent id(String id) {
                this.id = id;
                return this;
            }

            public io.inverno.mod.http.server.internal.AbstractResponseBody$SseRawOutboundData.GenericEvent comment(String comment) {
                this.comment = comment;
                return this;
            }

            public io.inverno.mod.http.server.internal.AbstractResponseBody$SseRawOutboundData.GenericEvent event(String event) {
                this.event = event;
                return this;
            }

            public String getId() {
                return this.id;
            }

            public String getComment() {
                return this.comment;
            }

            public String getEvent() {
                return this.event;
            }

            public Publisher<ByteBuf> getData() {
                return this.data;
            }
        }
    }

    /**
     * Server-sent events producer whose event data is made of character sequences.
     *
     * <p>Each event is serialized as optional {@code id:}, {@code event:} and comment lines, an optional {@code data:}
     * field whose line breaks are rewritten so that every line carries the {@code data:} prefix, and a terminating blank
     * line. Everything is encoded in UTF-8.</p>
     */
    protected class SseStringOutboundData
    implements ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> {
        protected SseStringOutboundData() {
        }

        /**
         * Hands the event factory and the event sink of this producer to the specified consumer.
         *
         * @param data the consumer receiving the event factory and the events sink
         */
        @Override
        public void from(BiConsumer<ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>, OutboundData<ResponseBody.Sse.Event<CharSequence>>> data) {
            data.accept(this::create, this::stream);
        }

        /**
         * Sets the SSE content type on the response and sets the serialized events as the response payload, preserving
         * the order of the source events.
         *
         * @param <T>   the type of event emitted
         * @param value the publisher of events; events are expected to have been created by {@link #create(Consumer)}
         */
        protected <T extends ResponseBody.Sse.Event<CharSequence>> void stream(Publisher<T> value) {
            AbstractResponseBody.this.headers.contentType(SSE_CONTENT_TYPE);
            AbstractResponseBody.this.setData(Flux.from(value).flatMapSequential(sse -> this.encode((GenericEvent)sse)));
        }

        /**
         * Creates an event and lets the specified configurer populate it.
         *
         * @param configurer the event configurer
         *
         * @return the configured event
         */
        protected ResponseBody.Sse.Event<CharSequence> create(Consumer<ResponseBody.Sse.Event<CharSequence>> configurer) {
            GenericEvent sse = new GenericEvent(this);
            configurer.accept(sse);
            return sse;
        }

        /** Serializes one event: metadata buffer, escaped data chunks (if any), then the terminating blank line. */
        private Flux<ByteBuf> encode(GenericEvent sse) {
            StringBuilder sseMetaData = new StringBuilder();
            if (sse.getId() != null) {
                sseMetaData.append("id:").append(sse.getId()).append("\n");
            }
            if (sse.getEvent() != null) {
                sseMetaData.append("event:").append(sse.getEvent()).append("\n");
            }
            if (sse.getComment() != null) {
                // Every line of a multi-line comment must start with ':'.
                sseMetaData.append(":").append(sse.getComment().replaceAll("\\r\\n|\\r|\\n", "\r\n:")).append("\n");
            }
            if (sse.getData() != null) {
                sseMetaData.append("data:");
            }
            Flux<ByteBuf> sseData = Flux.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(sseMetaData, Charsets.UTF_8)));
            if (sse.getData() != null) {
                sseData = sseData.concatWith(Flux.from(sse.getData()).map(this::escape));
            }
            return sseData.concatWith(Mono.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("\r\n\r\n", Charsets.UTF_8))));
        }

        /**
         * Encodes a data chunk in UTF-8, replacing every line break (CRLF, CR or LF) with CRLF followed by
         * {@code data:}.
         *
         * <p>Text between line breaks is written segment by segment through the charset encoder rather than one
         * {@code char} at a time, so characters outside the single-byte range are encoded correctly instead of being
         * truncated to their low 8 bits.</p>
         */
        private ByteBuf escape(CharSequence chunk) {
            int length = chunk.length();
            ByteBuf escapedChunk = Unpooled.unreleasableBuffer(Unpooled.buffer(length, Integer.MAX_VALUE));
            int segmentStart = 0;
            for (int i = 0; i < length; ++i) {
                char nextChar = chunk.charAt(i);
                if (nextChar != '\r' && nextChar != '\n') {
                    continue;
                }
                if (segmentStart < i) {
                    escapedChunk.writeCharSequence(chunk.subSequence(segmentStart, i), Charsets.UTF_8);
                }
                // A CRLF pair counts as a single line break.
                if (nextChar == '\r' && i + 1 < length && chunk.charAt(i + 1) == '\n') {
                    ++i;
                }
                escapedChunk.writeCharSequence("\r\ndata:", Charsets.UTF_8);
                segmentStart = i + 1;
            }
            if (segmentStart < length) {
                escapedChunk.writeCharSequence(chunk.subSequence(segmentStart, length), Charsets.UTF_8);
            }
            return escapedChunk;
        }

        /**
         * Mutable event holding the id, comment, event type and character data of a server-sent event.
         */
        protected final class GenericEvent
        implements ResponseBody.Sse.Event<CharSequence> {
            private String id;
            private String comment;
            private String event;
            private Publisher<CharSequence> data;

            /**
             * @param enclosing unused; retained so that the constructor signature is unchanged
             */
            protected GenericEvent(SseStringOutboundData enclosing) {
            }

            /**
             * Sets the event data as a stream of character sequences.
             *
             * @param <T>  the type of character sequence emitted
             * @param data the data publisher
             */
            @SuppressWarnings("unchecked") // the publisher is only read from, so widening its element type is safe
            public <T extends CharSequence> void stream(Publisher<T> data) {
                this.data = (Publisher<CharSequence>)data;
            }

            /**
             * Sets the event data as a single character sequence; a {@code null} value results in empty data.
             *
             * @param <T>  the type of character sequence
             * @param data the data or {@code null}
             */
            public <T extends CharSequence> void value(T data) {
                this.data = Mono.justOrEmpty(data);
            }

            /**
             * @param id the event id
             * @return this event
             */
            public GenericEvent id(String id) {
                this.id = id;
                return this;
            }

            /**
             * @param comment the event comment
             * @return this event
             */
            public GenericEvent comment(String comment) {
                this.comment = comment;
                return this;
            }

            /**
             * @param event the event type
             * @return this event
             */
            public GenericEvent event(String event) {
                this.event = event;
                return this;
            }

            /** @return the event id or {@code null} */
            public String getId() {
                return this.id;
            }

            /** @return the event comment or {@code null} */
            public String getComment() {
                return this.comment;
            }

            /** @return the event type or {@code null} */
            public String getEvent() {
                return this.event;
            }

            /** @return the event data publisher or {@code null} when no data was set */
            public Publisher<CharSequence> getData() {
                return this.data;
            }
        }
    }
}

