package io.vertx.rxcore;

import io.vertx.rxcore.java.impl.SingleSubscriptionHandler;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.atomic.AtomicLong;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.streams.ReadStream;
import org.vertx.java.core.streams.WriteStream;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/vertx/rxcore/RxSupport.class */
public class RxSupport {
    public static Func2<Buffer, Buffer, Buffer> mergeBuffers = new Func2<Buffer, Buffer, Buffer>() { // from class: io.vertx.rxcore.RxSupport.1
        public Buffer call(Buffer buffer, Buffer buffer2) {
            buffer.appendBuffer(buffer2);
            return buffer;
        }
    };
    public static Func1<Message, Object> unwrapMessage = new Func1<Message, Object>() { // from class: io.vertx.rxcore.RxSupport.9
        public Object call(Message message) {
            return message.body();
        }
    };

    public static Observable<Long> stream(Observable<Buffer> observable, final WriteStream writeStream) {
        final PublishSubject create = PublishSubject.create();
        final AtomicLong atomicLong = new AtomicLong();
        writeStream.exceptionHandler(new Handler<Throwable>() { // from class: io.vertx.rxcore.RxSupport.2
            public void handle(Throwable th) {
                create.onError(th);
            }
        });
        observable.subscribe(new Action1<Buffer>() { // from class: io.vertx.rxcore.RxSupport.3
            public void call(Buffer buffer) {
                writeStream.write(buffer);
                atomicLong.addAndGet(buffer.length());
                create.onNext(Long.valueOf(atomicLong.get()));
            }
        }, new Action1<Throwable>() { // from class: io.vertx.rxcore.RxSupport.4
            public void call(Throwable th) {
                create.onError(th);
            }
        }, new Action0() { // from class: io.vertx.rxcore.RxSupport.5
            public void call() {
                create.onCompleted();
            }
        });
        return create;
    }

    public static Observable<Buffer> toObservable(final ReadStream readStream) {
        return Observable.create(new SingleSubscriptionHandler<Buffer, Buffer>() { // from class: io.vertx.rxcore.RxSupport.6
            @Override // io.vertx.rxcore.java.impl.SingleSubscriptionHandler
            public void execute() {
                readStream.dataHandler(this);
                readStream.exceptionHandler(new Handler<Throwable>() { // from class: io.vertx.rxcore.RxSupport.6.1
                    public void handle(Throwable th) {
                        fireError(th);
                    }
                });
                readStream.endHandler(new Handler<Void>() { // from class: io.vertx.rxcore.RxSupport.6.2
                    public void handle(Void r3) {
                        fireComplete();
                    }
                });
            }

            @Override // io.vertx.rxcore.java.impl.SingleSubscriptionHandler
            public void onUnsubscribed() {
                try {
                    readStream.dataHandler((Handler) null);
                    readStream.exceptionHandler((Handler) null);
                    readStream.endHandler((Handler) null);
                } catch (Exception e) {
                }
            }
        });
    }

    public static Func1<JsonObject, Buffer> encodeJson(final String str) {
        return new Func1<JsonObject, Buffer>() { // from class: io.vertx.rxcore.RxSupport.7
            public Buffer call(JsonObject jsonObject) {
                try {
                    return new Buffer(jsonObject.encode().getBytes(str));
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException("Unable to encode JSON (charset=" + str + ")", e);
                }
            }
        };
    }

    public static Func1<Buffer, JsonObject> decodeJson(final String str) {
        return new Func1<Buffer, JsonObject>() { // from class: io.vertx.rxcore.RxSupport.8
            public JsonObject call(Buffer buffer) {
                try {
                    return new JsonObject(buffer.toString(str));
                } catch (Exception e) {
                    throw new RuntimeException("Unable to decode json request (e=" + e + ")");
                }
            }
        };
    }
}
