/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.ezorm.rdb.executor.jdbc;

import java.sql.Connection;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/**
 * Reactive facade over the JDBC executor base class.
 *
 * <p>Each operation obtains a {@link Connection} from {@link #getConnection()} and then
 * delegates every incoming {@link SqlRequest} to the corresponding {@code doXxx} method
 * inherited from {@link JdbcSqlExecutor}. Those inherited methods are invoked directly
 * inside the reactive operators, on whichever thread delivers the request.
 *
 * <p>NOTE(review): this class never closes the connection it receives; presumably the
 * {@link #getConnection()} implementation owns the connection lifecycle - confirm
 * against the subclasses.
 */
public abstract class JdbcReactiveSqlExecutor extends JdbcSqlExecutor implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(JdbcReactiveSqlExecutor.class);

    /**
     * Creates the executor, handing this class's logger to the base class.
     */
    public JdbcReactiveSqlExecutor() {
        super(log);
    }

    /**
     * Supplies the JDBC connection used to run the requests of a single operation.
     *
     * @return a {@link Mono} emitting the connection to use
     */
    public abstract Mono<Connection> getConnection();

    /**
     * Runs every request through {@code doUpdate} and sums the returned counts.
     *
     * <p>The counts are combined with {@link Math#addExact(int, int)}, so an {@code int}
     * overflow surfaces as an {@link ArithmeticException} error signal instead of
     * silently wrapping around.
     *
     * @param request the update statements to run
     * @return the sum of all counts, or {@code 0} when no connection or no request was emitted
     */
    @Override
    public Mono<Integer> update(Publisher<SqlRequest> request) {
        return getConnection()
                .flatMap(connection -> toFlux(request)
                        .map(sql -> doUpdate(connection, sql))
                        .reduce(Math::addExact))
                // Covers both an empty connection Mono and an empty request stream.
                .defaultIfEmpty(0);
    }

    /**
     * Runs every request through {@code doExecute}, discarding any result.
     *
     * @param request the statements to run
     * @return a {@link Mono} that completes once all requests have been processed
     */
    @Override
    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return getConnection()
                .flatMap(connection -> toFlux(request)
                        .doOnNext(sql -> doExecute(connection, sql))
                        .then());
    }

    /**
     * Runs every request through {@code doSelect} and publishes the wrapped results.
     *
     * <p>The supplied wrapper is decorated with
     * {@link ResultWrappers#consumer} so that each value it produces is pushed into the
     * returned {@link Flux} via {@link FluxSink#next(Object)}.
     *
     * @param request the queries to run
     * @param wrapper converts query results into elements of type {@code E}
     * @param <E>     element type emitted by the returned flux
     * @return a flux of wrapped results that completes after all requests were processed
     */
    @Override
    public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
        return Flux.create(sink -> {
            Disposable disposable = getConnection()
                    .flatMap(connection -> toFlux(request)
                            .doOnNext(sql -> doSelect(connection, sql, ResultWrappers.consumer(wrapper, sink::next)))
                            .then())
                    // Forward failures of the inner pipeline to the downstream subscriber.
                    .doOnError(sink::error)
                    // Make the downstream subscriber's context visible to the inner pipeline.
                    .subscriberContext(sink.currentContext())
                    .doOnSuccess(ignore -> sink.complete())
                    .subscribe();
            // Stop the inner pipeline when downstream cancels or the sink is disposed.
            sink.onCancel(disposable).onDispose(disposable);
        });
    }

    /**
     * Adapts the incoming publisher to a {@link Flux}; overridable by subclasses.
     *
     * @param request the publisher of SQL requests
     * @return the same requests as a {@link Flux}
     */
    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
        return Flux.from(request);
    }
}

