/*
 * Decompiled with CFR 0.152.
 */
package pro.chenggang.project.reactive.mybatis.support.r2dbc.defaults;

import org.reactivestreams.Publisher;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.MybatisReactiveContextManager;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.ReactiveSqlSession;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.ReactiveSqlSessionFactory;
import pro.chenggang.project.reactive.mybatis.support.r2dbc.ReactiveSqlSessionOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultReactiveSqlSessionOperator
implements ReactiveSqlSessionOperator {
    private final ReactiveSqlSession reactiveSqlSession;
    private final MybatisReactiveContextManager mybatisReactiveContextManager;

    public DefaultReactiveSqlSessionOperator(ReactiveSqlSessionFactory reactiveSqlSessionFactory) {
        this(reactiveSqlSessionFactory, true);
    }

    public DefaultReactiveSqlSessionOperator(ReactiveSqlSessionFactory reactiveSqlSessionFactory, boolean enableTransaction) {
        this.reactiveSqlSession = reactiveSqlSessionFactory.openSession().usingTransaction(enableTransaction);
        this.mybatisReactiveContextManager = (MybatisReactiveContextManager)((Object)this.reactiveSqlSession);
    }

    @Override
    public <T> Mono<T> execute(Mono<T> monoExecution) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> Mono.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> monoExecution, ReactiveSqlSession::close, (session, err) -> Mono.empty(), ReactiveSqlSession::close).onErrorResume(ex -> this.reactiveSqlSession.close().then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }

    @Override
    public <T> Mono<T> executeAndCommit(Mono<T> monoExecution) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> Mono.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> monoExecution, session -> session.commit(true).then(Mono.defer(session::close)), (session, err) -> Mono.empty(), session -> session.rollback(true).then(Mono.defer(session::close))).onErrorResume(ex -> this.reactiveSqlSession.rollback(true).then(Mono.defer(this.reactiveSqlSession::close)).then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }

    @Override
    public <T> Mono<T> executeAndRollback(Mono<T> monoExecution) {
        return MybatisReactiveContextManager.currentContext().flatMap(reactiveExecutorContext -> Mono.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> monoExecution, session -> session.rollback(true).then(Mono.defer(session::close)), (session, err) -> Mono.empty(), session -> session.rollback(true).then(Mono.defer(session::close))).onErrorResume(ex -> this.reactiveSqlSession.rollback(true).then(Mono.defer(this.reactiveSqlSession::close)).then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }

    @Override
    public <T> Flux<T> executeMany(Flux<T> fluxExecution) {
        return MybatisReactiveContextManager.currentContext().flatMapMany(reactiveExecutorContext -> Flux.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> fluxExecution, ReactiveSqlSession::close, (session, err) -> Mono.empty(), ReactiveSqlSession::close).onErrorResume(ex -> this.reactiveSqlSession.close().then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }

    @Override
    public <T> Flux<T> executeManyAndCommit(Flux<T> fluxExecution) {
        return MybatisReactiveContextManager.currentContext().flatMapMany(reactiveExecutorContext -> Flux.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> fluxExecution, session -> session.commit(true).then(Mono.defer(session::close)), (session, err) -> Mono.empty(), session -> session.rollback(true).then(Mono.defer(session::close))).onErrorResume(ex -> this.reactiveSqlSession.rollback(true).then(Mono.defer(this.reactiveSqlSession::close)).then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }

    @Override
    public <T> Flux<T> executeManyAndRollback(Flux<T> fluxExecution) {
        return MybatisReactiveContextManager.currentContext().flatMapMany(reactiveExecutorContext -> Flux.usingWhen((Publisher)Mono.just((Object)this.reactiveSqlSession), session -> fluxExecution, session -> session.commit(true).then(Mono.defer(session::close)), (session, err) -> Mono.empty(), session -> session.rollback(true).then(Mono.defer(session::close))).onErrorResume(ex -> this.reactiveSqlSession.rollback(true).then(Mono.defer(this.reactiveSqlSession::close)).then(Mono.defer(() -> Mono.error((Throwable)ex))))).contextWrite(this.mybatisReactiveContextManager::initReactiveExecutorContext).contextWrite(MybatisReactiveContextManager::initReactiveExecutorContextAttribute);
    }
}

