/*
 * Decompiled with CFR 0.152.
 */
package pro.chenggang.project.reactive.mybatis.support.r2dbc.executor;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import java.util.function.Function;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.RowBounds;
import org.reactivestreams.Publisher;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.MybatisReactiveContextManager;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.connection.ConnectionCloseHolder;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.delegate.R2dbcMybatisConfiguration;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.executor.ReactiveMybatisExecutor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Base {@link ReactiveMybatisExecutor} that runs mapped statements on an R2DBC
 * {@link Connection} obtained from the configured {@link ConnectionFactory}.
 *
 * <p>Subclasses supply the actual statement execution through
 * {@link #doUpdateWithConnection} and {@link #doQueryWithConnection}; this class
 * handles acquiring the connection, releasing it again, and updating the flags
 * on the executor context returned by
 * {@code MybatisReactiveContextManager.currentContext()}.
 */
public abstract class AbstractReactiveMybatisExecutor
implements ReactiveMybatisExecutor {
    private static final Log log = LogFactory.getLog(AbstractReactiveMybatisExecutor.class);
    /** Configuration handed to the constructor; not read by this class itself. */
    protected final R2dbcMybatisConfiguration configuration;
    /** Factory every statement execution obtains its connection from. */
    protected final ConnectionFactory connectionFactory;

    /**
     * Stores the collaborators; no validation is performed on either argument.
     *
     * @param configuration     the R2DBC MyBatis configuration
     * @param connectionFactory the factory used to create connections
     */
    protected AbstractReactiveMybatisExecutor(R2dbcMybatisConfiguration configuration, ConnectionFactory connectionFactory) {
        this.configuration = configuration;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Marks the current executor context as dirty and then executes the update
     * on a connection from {@link #connectionFactory}.
     *
     * @param mappedStatement the statement to execute
     * @param parameter       the statement parameter object
     * @return the result produced by {@link #doUpdateWithConnection}
     */
    @Override
    public Mono<Integer> update(MappedStatement mappedStatement, Object parameter) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setDirty();
            return this.inConnection(
                    this.connectionFactory,
                    connection -> this.doUpdateWithConnection(connection, mappedStatement, parameter));
        });
    }

    /**
     * Executes the query on a connection from {@link #connectionFactory}.
     * Unlike {@link #update}, the executor context is not marked dirty.
     *
     * @param mappedStatement the statement to execute
     * @param parameter       the statement parameter object
     * @param rowBounds       the row bounds passed through to the subclass
     * @param <E>             element type of the result
     * @return the elements produced by {@link #doQueryWithConnection}
     */
    @Override
    public <E> Flux<E> query(MappedStatement mappedStatement, Object parameter, RowBounds rowBounds) {
        return this.inConnectionMany(
                this.connectionFactory,
                connection -> this.doQueryWithConnection(connection, mappedStatement, parameter, rowBounds));
    }

    /**
     * Sets the context's force-commit flag to {@code isDirty() || required},
     * closes the connection held by the context (if any) and finally resets the
     * dirty flag.
     *
     * <p>NOTE(review): this method only calls {@code close()} on the context's
     * connection; presumably that connection reacts to the force-commit flag
     * when closed - confirm against the connection implementation.
     *
     * @param required whether a commit is requested even if the context is not dirty
     * @return a {@code Mono} completing after the dirty flag has been reset
     */
    @Override
    public Mono<Void> commit(boolean required) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceCommit(reactiveExecutorContext.isDirty() || required);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection())
                    .flatMap(connection -> Mono.from(connection.close()))
                    // deferred so the flag is reset only once the close step has completed
                    .then(Mono.<Void>defer(() -> {
                        reactiveExecutorContext.resetDirty();
                        return Mono.empty();
                    }));
        });
    }

    /**
     * Sets the context's force-rollback flag to {@code isDirty() || required},
     * closes the connection held by the context (if any) and finally resets the
     * dirty flag.
     *
     * <p>NOTE(review): this method only calls {@code close()} on the context's
     * connection; presumably that connection reacts to the force-rollback flag
     * when closed - confirm against the connection implementation.
     *
     * @param required whether a rollback is requested even if the context is not dirty
     * @return a {@code Mono} completing after the dirty flag has been reset
     */
    @Override
    public Mono<Void> rollback(boolean required) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceRollback(reactiveExecutorContext.isDirty() || required);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection())
                    .flatMap(connection -> Mono.from(connection.close()))
                    // deferred so the flag is reset only once the close step has completed
                    .then(Mono.<Void>defer(() -> {
                        reactiveExecutorContext.resetDirty();
                        return Mono.empty();
                    }));
        });
    }

    /**
     * Sets the context's force-rollback flag to {@code forceRollback}, marks the
     * context as requiring close, closes the connection held by the context
     * (if any) and finally resets the dirty flag.
     *
     * @param forceRollback value stored in the context's force-rollback flag
     * @return a {@code Mono} completing after the dirty flag has been reset
     */
    @Override
    public Mono<Void> close(boolean forceRollback) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> {
            reactiveExecutorContext.setForceRollback(forceRollback);
            reactiveExecutorContext.setRequireClosed(true);
            return Mono.justOrEmpty(reactiveExecutorContext.getConnection())
                    .flatMap(connection -> Mono.from(connection.close()))
                    // deferred so the flag is reset only once the close step has completed
                    .then(Mono.<Void>defer(() -> {
                        reactiveExecutorContext.resetDirty();
                        return Mono.empty();
                    }));
        });
    }

    /**
     * Executes an update statement on the given connection.
     *
     * @param var1 the connection to execute on
     * @param var2 the statement to execute
     * @param var3 the statement parameter object
     * @return the update result
     */
    protected abstract Mono<Integer> doUpdateWithConnection(Connection var1, MappedStatement var2, Object var3);

    /**
     * Executes a query statement on the given connection.
     *
     * @param var1 the connection to execute on
     * @param var2 the statement to execute
     * @param var3 the statement parameter object
     * @param var4 the row bounds of the query
     * @param <E>  element type of the result
     * @return the query result elements
     */
    protected abstract <E> Flux<E> doQueryWithConnection(Connection var1, MappedStatement var2, Object var3, RowBounds var4);

    /**
     * Runs {@code action} against a freshly created connection and releases the
     * connection afterwards.
     *
     * <p>{@link Mono#usingWhen} is given the holder's {@code close()} as the
     * handler for completion, error and cancellation alike.
     *
     * @param connectionFactory the factory to create the connection from
     * @param action            the work to perform with the connection
     * @param <T>               result type of the action
     * @return the result of {@code action}
     */
    protected <T> Mono<T> inConnection(ConnectionFactory connectionFactory, Function<Connection, Mono<T>> action) {
        Mono<ConnectionCloseHolder> connectionMono = this.acquireConnection(connectionFactory, "Mono");
        return Mono.usingWhen(
                connectionMono,
                holder -> action.apply(holder.getTarget()),
                ConnectionCloseHolder::close,
                (holder, err) -> holder.close(),
                ConnectionCloseHolder::close);
    }

    /**
     * Multi-element counterpart of {@link #inConnection}: runs {@code action}
     * against a freshly created connection and releases the connection
     * afterwards.
     *
     * <p>{@link Flux#usingWhen} is given the holder's {@code close()} as the
     * handler for completion, error and cancellation alike.
     *
     * @param connectionFactory the factory to create the connection from
     * @param action            the work to perform with the connection
     * @param <T>               element type of the action's result
     * @return the elements produced by {@code action}
     */
    protected <T> Flux<T> inConnectionMany(ConnectionFactory connectionFactory, Function<Connection, Flux<T>> action) {
        Mono<ConnectionCloseHolder> connectionMono = this.acquireConnection(connectionFactory, "Flux");
        return Flux.usingWhen(
                connectionMono,
                holder -> action.apply(holder.getTarget()),
                ConnectionCloseHolder::close,
                (holder, err) -> holder.close(),
                ConnectionCloseHolder::close);
    }

    /**
     * Closes the given connection; if that close signals an error, the close is
     * attempted one more time and the outcome of the second attempt is returned.
     *
     * @param connection the connection to close
     * @return a {@code Mono} completing when the connection has been closed
     */
    protected Mono<Void> closeConnection(Connection connection) {
        return Mono.from(connection.close())
                .onErrorResume(e -> Mono.from(connection.close()));
    }

    /**
     * Creates a connection from the factory and wraps it in a
     * {@link ConnectionCloseHolder} whose close action is {@link #closeConnection}.
     *
     * @param connectionFactory the factory to create the connection from
     * @param publisherKind     label inserted into the debug message ("Mono" or "Flux")
     * @return a {@code Mono} emitting the holder of the new connection
     */
    private Mono<ConnectionCloseHolder> acquireConnection(ConnectionFactory connectionFactory, String publisherKind) {
        return MybatisReactiveContextManager.currentContext()
                // the context value is unused; going through it keeps connection creation
                // conditional on currentContext() emitting
                .flatMap(reactiveExecutorContext -> Mono.<Connection>from(connectionFactory.create())
                        .doOnNext(connection -> {
                            // skip building the message (and connection.toString()) when debug is off
                            if (log.isDebugEnabled()) {
                                log.debug("Execute Statement With " + publisherKind + ",Get Connection [" + connection + "] From Connection Factory ");
                            }
                        }))
                .map(connection -> new ConnectionCloseHolder(connection, this::closeConnection));
    }
}

