/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal;

import io.inverno.mod.base.Charsets;
import io.inverno.mod.base.resource.Resource;
import io.inverno.mod.http.base.InternalServerErrorException;
import io.inverno.mod.http.base.NotFoundException;
import io.inverno.mod.http.base.header.Headers;
import io.inverno.mod.http.server.ResponseBody;
import io.inverno.mod.http.server.ResponseData;
import io.inverno.mod.http.server.internal.AbstractResponse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

public class GenericResponseBody
implements ResponseBody {
    /** Content type set on the response headers by the server-sent events producers. */
    private static final String SSE_CONTENT_TYPE = "text/event-stream;charset=utf-8";
    /** The response this body belongs to; the resource and SSE producers update its headers. */
    protected final AbstractResponse response;
    /** Raw payload producer, created on first call to {@code raw()}. */
    protected ResponseData<ByteBuf> rawData;
    /** String payload producer, created on first call to {@code string()}. */
    protected ResponseData<CharSequence> stringData;
    /** Resource payload producer, created on first call to {@code resource()}. */
    protected ResponseBody.Resource resourceData;
    /** Raw server-sent events producer, created on first call to {@code sse()}. */
    protected ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> sseData;
    /** String server-sent events producer, created on first call to {@code sseString()}. */
    protected ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> sseStringData;
    /** Sink captured by the placeholder publisher that {@code dataSubscribe} installs when no payload is set yet; {@code setData} completes it with the payload. */
    private MonoSink<Publisher<ByteBuf>> dataEmitter;
    /** The payload publisher stored by {@code setData}, or the placeholder installed by {@code dataSubscribe}. */
    private Publisher<ByteBuf> data;
    /** Set to true at the end of {@code dataSubscribe}; a second subscription is rejected. */
    private boolean subscribed;
    /** Set to true at the end of {@code setData}. */
    private boolean dataSet;
    /** Set by {@code setData} when the (transformed) payload publisher is a {@code Mono}; exposed by {@code isSingle()}. */
    private boolean single;
    /** Composition of the functions registered through {@code transform}; {@code setData} applies it to incoming payloads. */
    private Function<Publisher<ByteBuf>, Publisher<ByteBuf>> transformer;

    /**
     * Creates a response body bound to the specified response.
     *
     * @param response the response whose headers the payload producers of this body update
     */
    public GenericResponseBody(AbstractResponse response) {
        this.response = response;
    }

    /**
     * Sets the response payload publisher.
     *
     * <p>The registered transformer, if any, is applied to the publisher first. When a subscriber is already
     * waiting (i.e. {@code dataSubscribe} ran before any payload was set), the payload is handed to it through
     * the captured sink; otherwise it is stored until subscription.</p>
     *
     * @param data the payload publisher
     *
     * @throws IllegalStateException if the payload has already been set and subscribed
     */
    protected final void setData(Publisher<ByteBuf> data) {
        if (this.subscribed && this.dataSet) {
            throw new IllegalStateException("Response data already posted");
        }
        Publisher<ByteBuf> transformedData = this.transformer != null ? this.transformer.apply(data) : data;
        // Recompute the flag on every call: the payload may be replaced before subscription, and a
        // previously set Mono must not leave the flag raised for a multi-element publisher.
        this.single = transformedData instanceof Mono;
        if (this.dataEmitter != null) {
            this.dataEmitter.success(transformedData);
        } else {
            this.data = transformedData;
        }
        this.dataSet = true;
    }

    /**
     * Subscribes the specified subscriber to the response payload.
     *
     * <p>When no payload has been set yet, a placeholder publisher is installed: it captures a sink on
     * subscription and switches to the actual payload once {@code setData} completes that sink. Buffers
     * discarded by the pipeline are released.</p>
     *
     * @param s the payload subscriber
     *
     * @throws IllegalStateException if the payload has already been subscribed
     */
    public void dataSubscribe(Subscriber<? super ByteBuf> s) {
        if (this.subscribed) {
            throw new IllegalStateException("Response data already subscribed");
        }
        Publisher<ByteBuf> source = this.data;
        if (source == null) {
            // No payload yet: defer to whatever setData() eventually pushes into the sink.
            Mono<Publisher<ByteBuf>> deferredPayload = Mono.create(sink -> this.dataEmitter = sink);
            source = Flux.switchOnNext(deferredPayload);
            this.data = source;
        }
        Flux.from(source)
            .doOnDiscard(ByteBuf.class, ReferenceCounted::release)
            .subscribe(s);
        // Flag is raised only after the subscription call returned, as before.
        this.subscribed = true;
    }

    /**
     * Indicates whether the response payload was recorded as a single-element publisher.
     *
     * @return the flag set by {@code setData} when the payload publisher is a {@code Mono}
     */
    public boolean isSingle() {
        return this.single;
    }

    /**
     * Registers a function that transforms the response payload publisher.
     *
     * <p>Transformers are chained in registration order and applied by {@code setData} to payloads set later.
     * When a payload has already been set (and not yet subscribed), the new transformer is applied to it
     * immediately.</p>
     *
     * @param transformer the payload transformer
     *
     * @return this response body
     *
     * @throws IllegalStateException if the payload has already been set and subscribed
     */
    @Override
    public ResponseBody transform(Function<Publisher<ByteBuf>, Publisher<ByteBuf>> transformer) {
        if (this.subscribed && this.dataSet) {
            throw new IllegalStateException("Response data already posted");
        }
        this.transformer = this.transformer == null ? transformer : this.transformer.andThen(transformer);
        if (this.dataSet) {
            this.data = transformer.apply(this.data);
            // Keep the flag consistent with setData(): it describes the transformed publisher, and a
            // transformer may turn a Mono into a multi-element publisher or the other way round.
            this.single = this.data instanceof Mono;
        }
        return this;
    }

    /**
     * Sets an empty payload unless a payload has already been set, in which case nothing happens.
     */
    @Override
    public void empty() {
        if (this.dataSet) {
            return;
        }
        Mono<ByteBuf> noContent = Mono.empty();
        this.setData(noContent);
    }

    /**
     * Returns the raw payload producer, creating it on first access.
     *
     * @return the raw payload producer
     */
    @Override
    public ResponseData<ByteBuf> raw() {
        ResponseData<ByteBuf> producer = this.rawData;
        if (producer == null) {
            producer = new GenericResponseBodyRawData();
            this.rawData = producer;
        }
        return producer;
    }

    /**
     * Returns the string payload producer, creating it on first access.
     *
     * @param <T> the type of character sequence the caller intends to send
     *
     * @return the string payload producer
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T extends CharSequence> ResponseData<T> string() {
        if (this.stringData == null) {
            this.stringData = new GenericResponseBodyStringData();
        }
        // Safe: the producer only reads its input as CharSequence, so it can serve any T extends CharSequence.
        return (ResponseData<T>) this.stringData;
    }

    /**
     * Returns the resource payload producer, creating it on first access.
     *
     * @return the resource payload producer
     */
    @Override
    public ResponseBody.Resource resource() {
        ResponseBody.Resource producer = this.resourceData;
        if (producer == null) {
            producer = new GenericResponseBodyResourceData();
            this.resourceData = producer;
        }
        return producer;
    }

    /**
     * Returns the raw server-sent events producer, creating it on first access.
     *
     * @return the raw server-sent events producer
     */
    @Override
    public ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> sse() {
        if (this.sseData != null) {
            return this.sseData;
        }
        this.sseData = new GenericResponseBodySseData();
        return this.sseData;
    }

    /**
     * Returns the string server-sent events producer, creating it on first access.
     *
     * @return the string server-sent events producer
     */
    public ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> sseString() {
        if (this.sseStringData != null) {
            return this.sseStringData;
        }
        this.sseStringData = new GenericResponseBodySseStringData();
        return this.sseStringData;
    }

    /**
     * Raw payload producer: forwards the published buffers to the enclosing response body unchanged.
     */
    protected class GenericResponseBodyRawData
    implements ResponseData<ByteBuf> {
        protected GenericResponseBodyRawData() {
        }

        /**
         * Sets the specified publisher as the response payload.
         *
         * @param <T>  the type of buffer
         * @param data the payload publisher
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T extends ByteBuf> void stream(Publisher<T> data) {
            // Safe: the payload is only ever consumed as ByteBuf, and every T is a ByteBuf.
            GenericResponseBody.this.setData((Publisher<ByteBuf>) data);
        }
    }

    /**
     * String payload producer: encodes character sequences with {@code Charsets.DEFAULT} into unreleasable
     * buffers and sets them as the response payload.
     */
    protected class GenericResponseBodyStringData
    implements ResponseData<CharSequence> {
        protected GenericResponseBodyStringData() {
        }

        /**
         * Sets the specified publisher of character sequences as the response payload.
         *
         * <p>A {@code Mono} input is mapped as a {@code Mono}, any other publisher is converted to a
         * {@code Flux} before mapping.</p>
         *
         * @param <T>   the type of character sequence
         * @param value the publisher of chunks to send
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T extends CharSequence> void stream(Publisher<T> value) throws IllegalStateException {
            Publisher<ByteBuf> encoded;
            if (value instanceof Mono) {
                encoded = ((Mono<T>) value).map(this::encode);
            } else {
                encoded = Flux.from(value).map(this::encode);
            }
            GenericResponseBody.this.setData(encoded);
        }

        /**
         * Sets the specified character sequence as the response payload, or an empty payload when it is null.
         *
         * @param <T>   the type of character sequence
         * @param value the value to send, may be null
         */
        @Override
        public <T extends CharSequence> void value(T value) throws IllegalStateException {
            if (value == null) {
                Mono<ByteBuf> noContent = Mono.empty();
                GenericResponseBody.this.setData(noContent);
                return;
            }
            Mono<ByteBuf> content = Mono.just(this.encode(value));
            GenericResponseBody.this.setData(content);
        }

        /** Copies the text into a new buffer using the default charset and wraps it as unreleasable. */
        private ByteBuf encode(CharSequence text) {
            ByteBuf copy = Unpooled.copiedBuffer(text, Charsets.DEFAULT);
            return Unpooled.unreleasableBuffer(copy);
        }
    }

    /**
     * Resource payload producer: sends the content of a resource and derives missing response headers
     * from the resource metadata.
     */
    protected class GenericResponseBodyResourceData
    implements ResponseBody.Resource {
        protected GenericResponseBodyResourceData() {
        }

        /**
         * Fills in the content-length, content-type and last-modified response headers from the resource.
         *
         * <p>Each header is only populated when it is not already present on the response and the resource
         * provides the corresponding information, so values set explicitly by the application are kept.</p>
         *
         * @param resource the resource being sent
         */
        protected void populateHeaders(Resource resource) {
            GenericResponseBody.this.response.headers(h -> {
                if (GenericResponseBody.this.response.headers().getContentLength() == null) {
                    resource.size().ifPresent(h::contentLength);
                }
                if (GenericResponseBody.this.response.headers().getCharSequence("content-type") == null) {
                    String mediaType = resource.getMediaType();
                    if (mediaType != null) {
                        h.contentType(mediaType);
                    }
                }
                // Only the last-modified header is handled here: the content type must not be touched in
                // this branch, otherwise an explicitly set content type gets overwritten whenever
                // last-modified is absent.
                if (GenericResponseBody.this.response.headers().getCharSequence("last-modified") == null) {
                    resource.lastModified().ifPresent(lastModified -> h.set("last-modified", Headers.FORMATTER_RFC_5322_DATE_TIME.format(lastModified.toInstant())));
                }
            });
        }

        /**
         * Sets the content of the specified resource as the response payload.
         *
         * <p>A resource whose existence cannot be determined is treated as existing.</p>
         *
         * @param resource the resource to send
         *
         * @throws NullPointerException         if the resource is null
         * @throws NotFoundException            if the resource reports that it does not exist
         * @throws InternalServerErrorException if the resource is not readable
         */
        @Override
        public void value(Resource resource) {
            Objects.requireNonNull(resource);
            if (!resource.exists().orElse(true).booleanValue()) {
                throw new NotFoundException();
            }
            this.populateHeaders(resource);
            GenericResponseBody.this.setData((Publisher<ByteBuf>)((Publisher)resource.read().orElseThrow(() -> new InternalServerErrorException("Resource " + resource + " is not readable"))));
        }
    }

    /**
     * Server-sent events producer for events carrying raw {@code ByteBuf} data.
     *
     * <p>Each event is serialized as its optional {@code id:}, {@code event:} and comment lines, followed by
     * a {@code data:} field when data is present, and terminated by a blank line.</p>
     */
    protected class GenericResponseBodySseData
    implements ResponseBody.Sse<ByteBuf, ResponseBody.Sse.Event<ByteBuf>, ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>> {
        protected GenericResponseBodySseData() {
        }

        /**
         * Hands the event factory and the events sink of this producer to the specified consumer.
         *
         * @param data a consumer receiving the event factory and the events sink
         */
        @Override
        public void from(BiConsumer<ResponseBody.Sse.EventFactory<ByteBuf, ResponseBody.Sse.Event<ByteBuf>>, ResponseData<ResponseBody.Sse.Event<ByteBuf>>> data) {
            data.accept(this::create, this::stream);
        }

        /**
         * Sets the SSE content type on the response and sets the serialized events as the response payload.
         *
         * <p>Events must have been created by this producer's factory. Events are serialized in the order
         * they are published.</p>
         *
         * @param <T>   the type of event
         * @param value the events publisher
         */
        protected <T extends ResponseBody.Sse.Event<ByteBuf>> void stream(Publisher<T> value) {
            GenericResponseBody.this.response.headers(headers -> headers.contentType(GenericResponseBody.SSE_CONTENT_TYPE));
            Flux<ByteBuf> events = Flux.from(value).cast(GenericEvent.class).flatMapSequential(this::encodeEvent);
            GenericResponseBody.this.setData(events);
        }

        /** Serializes one event: metadata lines, escaped data chunks (if any), then the blank-line terminator. */
        private Flux<ByteBuf> encodeEvent(GenericEvent sse) {
            StringBuilder sseMetaData = new StringBuilder();
            if (sse.getId() != null) {
                sseMetaData.append("id:").append(sse.getId()).append("\n");
            }
            if (sse.getEvent() != null) {
                sseMetaData.append("event:").append(sse.getEvent()).append("\n");
            }
            if (sse.getComment() != null) {
                // Every line of a multi-line comment must itself start with ':'.
                sseMetaData.append(":").append(sse.getComment().replaceAll("\\r\\n|\\r|\\n", "\r\n:")).append("\n");
            }
            if (sse.getData() != null) {
                sseMetaData.append("data:");
            }
            Flux<ByteBuf> sseData = Flux.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(sseMetaData, Charsets.UTF_8)));
            if (sse.getData() != null) {
                sseData = sseData.concatWith(Flux.from(sse.getData()).map(this::escapeChunk));
            }
            return sseData.concatWith(Mono.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("\r\n\r\n", Charsets.UTF_8))));
        }

        /**
         * Copies a data chunk, replacing every line break (CRLF, CR or LF) with a line break followed by a new
         * {@code data:} field prefix, then releases the consumed chunk.
         *
         * <p>Line breaks are detected per chunk: a CRLF pair split across two chunks is seen as two breaks.</p>
         */
        private ByteBuf escapeChunk(ByteBuf chunk) {
            try {
                ByteBuf escapedChunk = Unpooled.unreleasableBuffer(Unpooled.buffer(chunk.readableBytes(), Integer.MAX_VALUE));
                while (chunk.isReadable()) {
                    byte nextByte = chunk.readByte();
                    if (nextByte == '\r') {
                        // Only peek when a byte is left: with CR as the last readable byte, getByte() at the
                        // reader index would read past the written region or throw.
                        if (chunk.isReadable() && chunk.getByte(chunk.readerIndex()) == '\n') {
                            chunk.readByte();
                        }
                        escapedChunk.writeCharSequence("\r\ndata:", Charsets.UTF_8);
                    } else if (nextByte == '\n') {
                        escapedChunk.writeCharSequence("\r\ndata:", Charsets.UTF_8);
                    } else {
                        escapedChunk.writeByte(nextByte);
                    }
                }
                return escapedChunk;
            } finally {
                // The source chunk is fully replaced by the escaped copy and never leaves this method, so it
                // must be released here to avoid leaking reference-counted buffers.
                chunk.release();
            }
        }

        /**
         * Creates an event and lets the specified configurer populate it.
         *
         * @param configurer the event configurer
         *
         * @return the configured event
         */
        protected ResponseBody.Sse.Event<ByteBuf> create(Consumer<ResponseBody.Sse.Event<ByteBuf>> configurer) {
            GenericEvent sse = new GenericEvent();
            configurer.accept(sse);
            return sse;
        }

        /**
         * Mutable server-sent event holding an optional id, comment, event type and raw data publisher.
         */
        protected final class GenericEvent
        implements ResponseBody.Sse.Event<ByteBuf> {
            private String id;
            private String comment;
            private String event;
            private Publisher<ByteBuf> data;

            protected GenericEvent() {
            }

            /**
             * Sets the event data as a stream of chunks.
             *
             * @param <T>  the type of buffer
             * @param data the data publisher
             */
            @Override
            @SuppressWarnings("unchecked")
            public <T extends ByteBuf> void stream(Publisher<T> data) {
                // Safe: the data is only ever consumed as ByteBuf.
                this.data = (Publisher<ByteBuf>) data;
            }

            /**
             * Sets the event data as a single chunk; a null value results in empty data.
             *
             * @param <T>  the type of buffer
             * @param data the data, may be null
             */
            @Override
            public <T extends ByteBuf> void value(T data) {
                // justOrEmpty rather than just: a null value must not throw, matching the string events.
                this.data = Mono.justOrEmpty(data);
            }

            public GenericEvent id(String id) {
                this.id = id;
                return this;
            }

            public GenericEvent comment(String comment) {
                this.comment = comment;
                return this;
            }

            public GenericEvent event(String event) {
                this.event = event;
                return this;
            }

            public String getId() {
                return this.id;
            }

            public String getComment() {
                return this.comment;
            }

            public String getEvent() {
                return this.event;
            }

            public Publisher<ByteBuf> getData() {
                return this.data;
            }
        }
    }

    /**
     * Server-sent events producer for events carrying character sequence data.
     *
     * <p>Each event is serialized in UTF-8 as its optional {@code id:}, {@code event:} and comment lines,
     * followed by a {@code data:} field when data is present, and terminated by a blank line.</p>
     */
    protected class GenericResponseBodySseStringData
    implements ResponseBody.Sse<CharSequence, ResponseBody.Sse.Event<CharSequence>, ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>> {
        protected GenericResponseBodySseStringData() {
        }

        /**
         * Hands the event factory and the events sink of this producer to the specified consumer.
         *
         * @param data a consumer receiving the event factory and the events sink
         */
        @Override
        public void from(BiConsumer<ResponseBody.Sse.EventFactory<CharSequence, ResponseBody.Sse.Event<CharSequence>>, ResponseData<ResponseBody.Sse.Event<CharSequence>>> data) {
            data.accept(this::create, this::stream);
        }

        /**
         * Sets the SSE content type on the response and sets the serialized events as the response payload.
         *
         * <p>Events must have been created by this producer's factory. Events are serialized in the order
         * they are published.</p>
         *
         * @param <T>   the type of event
         * @param value the events publisher
         */
        protected <T extends ResponseBody.Sse.Event<CharSequence>> void stream(Publisher<T> value) {
            GenericResponseBody.this.response.headers(headers -> headers.contentType(GenericResponseBody.SSE_CONTENT_TYPE));
            Flux<ByteBuf> events = Flux.from(value).cast(GenericEvent.class).flatMapSequential(this::encodeEvent);
            GenericResponseBody.this.setData(events);
        }

        /** Serializes one event: metadata lines, escaped data chunks (if any), then the blank-line terminator. */
        private Flux<ByteBuf> encodeEvent(GenericEvent sse) {
            StringBuilder sseMetaData = new StringBuilder();
            if (sse.getId() != null) {
                sseMetaData.append("id:").append(sse.getId()).append("\n");
            }
            if (sse.getEvent() != null) {
                sseMetaData.append("event:").append(sse.getEvent()).append("\n");
            }
            if (sse.getComment() != null) {
                // Every line of a multi-line comment must itself start with ':'.
                sseMetaData.append(":").append(sse.getComment().replaceAll("\\r\\n|\\r|\\n", "\r\n:")).append("\n");
            }
            if (sse.getData() != null) {
                sseMetaData.append("data:");
            }
            Flux<ByteBuf> sseData = Flux.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(sseMetaData, Charsets.UTF_8)));
            if (sse.getData() != null) {
                sseData = sseData.concatWith(Flux.from(sse.getData()).map(this::escapeChunk));
            }
            return sseData.concatWith(Mono.just(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("\r\n\r\n", Charsets.UTF_8))));
        }

        /**
         * Encodes a data chunk in UTF-8, replacing every line break (CRLF, CR or LF) with a line break
         * followed by a new {@code data:} field prefix.
         *
         * <p>Line breaks are detected per chunk: a CRLF pair split across two chunks is seen as two breaks.</p>
         */
        private ByteBuf escapeChunk(CharSequence chunk) {
            int length = chunk.length();
            StringBuilder escaped = new StringBuilder(length);
            for (int i = 0; i < length; ++i) {
                char nextChar = chunk.charAt(i);
                if (nextChar == '\r') {
                    if (i < length - 1 && chunk.charAt(i + 1) == '\n') {
                        ++i;
                    }
                    escaped.append("\r\ndata:");
                } else if (nextChar == '\n') {
                    escaped.append("\r\ndata:");
                } else {
                    escaped.append(nextChar);
                }
            }
            // Encode the whole text at once: writing each char as a single byte would keep only its low
            // 8 bits and corrupt every non-ASCII character, while the response declares charset=utf-8.
            return Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(escaped, Charsets.UTF_8));
        }

        /**
         * Creates an event and lets the specified configurer populate it.
         *
         * @param configurer the event configurer
         *
         * @return the configured event
         */
        protected ResponseBody.Sse.Event<CharSequence> create(Consumer<ResponseBody.Sse.Event<CharSequence>> configurer) {
            GenericEvent sse = new GenericEvent();
            configurer.accept(sse);
            return sse;
        }

        /**
         * Mutable server-sent event holding an optional id, comment, event type and character data publisher.
         */
        protected final class GenericEvent
        implements ResponseBody.Sse.Event<CharSequence> {
            private String id;
            private String comment;
            private String event;
            private Publisher<CharSequence> data;

            protected GenericEvent() {
            }

            /**
             * Sets the event data as a stream of chunks.
             *
             * @param <T>  the type of character sequence
             * @param data the data publisher
             */
            @Override
            @SuppressWarnings("unchecked")
            public <T extends CharSequence> void stream(Publisher<T> data) {
                // Safe: the data is only ever consumed as CharSequence.
                this.data = (Publisher<CharSequence>) data;
            }

            /**
             * Sets the event data as a single chunk; a null value results in empty data.
             *
             * @param <T>  the type of character sequence
             * @param data the data, may be null
             */
            @Override
            public <T extends CharSequence> void value(T data) {
                this.data = Mono.justOrEmpty(data);
            }

            public GenericEvent id(String id) {
                this.id = id;
                return this;
            }

            public GenericEvent comment(String comment) {
                this.comment = comment;
                return this;
            }

            public GenericEvent event(String event) {
                this.event = event;
                return this;
            }

            public String getId() {
                return this.id;
            }

            public String getComment() {
                return this.comment;
            }

            public String getEvent() {
                return this.event;
            }

            public Publisher<CharSequence> getData() {
                return this.data;
            }
        }
    }
}

