package io.vertx.sqlclient.impl;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.DecoderException;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.sqlclient.impl.Connection;
import io.vertx.sqlclient.impl.cache.PreparedStatementCache;
import io.vertx.sqlclient.impl.codec.InvalidCachedStatementEvent;
import io.vertx.sqlclient.impl.command.CloseConnectionCommand;
import io.vertx.sqlclient.impl.command.CloseStatementCommand;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.command.CommandResponse;
import io.vertx.sqlclient.impl.command.ExtendedQueryCommand;
import io.vertx.sqlclient.impl.command.PrepareStatementCommand;
import java.util.ArrayDeque;
import java.util.List;
import java.util.function.Predicate;

/* loaded from: input_file:io/vertx/sqlclient/impl/SocketConnectionBase.class */
public abstract class SocketConnectionBase implements Connection {
    protected static final Logger logger;
    private static final String PENDING_CMD_CONNECTION_CORRUPT_MSG = "Pending requests failed to be sent due to connection has been closed.";
    protected final PreparedStatementCache psCache;
    protected final EventLoopContext context;
    private final Predicate<String> preparedStatementCacheSqlFilter;
    private Connection.Holder holder;
    private final int pipeliningLimit;
    private boolean executing;
    private int inflight;
    protected final NetSocketInternal socket;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ArrayDeque<CommandBase<?>> pending = new ArrayDeque<>();
    protected Status status = Status.CONNECTED;
    private boolean paused = false;

    /* loaded from: input_file:io/vertx/sqlclient/impl/SocketConnectionBase$Status.class */
    public enum Status {
        CLOSED,
        CONNECTED,
        CLOSING
    }

    public SocketConnectionBase(NetSocketInternal netSocketInternal, boolean z, int i, Predicate<String> predicate, int i2, EventLoopContext eventLoopContext) {
        this.socket = netSocketInternal;
        this.context = eventLoopContext;
        this.pipeliningLimit = i2;
        this.psCache = z ? new PreparedStatementCache(i) : null;
        this.preparedStatementCacheSqlFilter = predicate;
    }

    public Context context() {
        return this.context;
    }

    public void init() {
        this.socket.closeHandler(this::handleClosed);
        this.socket.exceptionHandler(this::handleException);
        this.socket.messageHandler(obj -> {
            try {
                handleMessage(obj);
            } catch (Exception e) {
                handleException(e);
            }
        });
    }

    public NetSocketInternal socket() {
        return this.socket;
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public SocketAddress server() {
        return this.socket.remoteAddress();
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public boolean isSsl() {
        return this.socket.isSsl();
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public boolean isValid() {
        return this.status == Status.CONNECTED;
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public void init(Connection.Holder holder) {
        if (Vertx.currentContext() != this.context) {
            throw new IllegalStateException();
        }
        if (this.status != Status.CONNECTED) {
            throw new IllegalStateException();
        }
        this.holder = holder;
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public int getProcessId() {
        throw new UnsupportedOperationException();
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public int getSecretKey() {
        throw new UnsupportedOperationException();
    }

    @Override // io.vertx.sqlclient.impl.Connection
    public void close(Connection.Holder holder, Promise<Void> promise) {
        if (Vertx.currentContext() != this.context) {
            this.context.runOnContext(r7 -> {
                close(holder, promise);
            });
        } else if (this.status == Status.CONNECTED) {
            this.status = Status.CLOSING;
            this.socket.channelHandlerContext().channel().closeFuture().addListener(channelFuture -> {
                promise.complete();
            });
            this.pending.add(CloseConnectionCommand.INSTANCE);
            checkPending();
        }
    }

    @Override // io.vertx.sqlclient.impl.command.CommandScheduler
    public <R> Future<R> schedule(ContextInternal contextInternal, CommandBase<R> commandBase) {
        PromiseInternal promise = contextInternal.promise();
        this.context.emit(r7 -> {
            doSchedule(commandBase, promise);
        });
        return promise.future();
    }

    protected <R> void doSchedule(CommandBase<R> commandBase, Handler<AsyncResult<R>> handler) {
        if (handler == null) {
            throw new IllegalArgumentException();
        }
        if (Vertx.currentContext() != this.context) {
            throw new IllegalStateException();
        }
        commandBase.handler = handler;
        if (this.status != Status.CONNECTED) {
            commandBase.fail((Throwable) new NoStackTraceThrowable("Connection is not active now, current status: " + this.status));
        } else {
            this.pending.add(commandBase);
            checkPending();
        }
    }

    private void checkPending() {
        CloseStatementCommand evictStatementIfNecessary;
        if (this.executing) {
            return;
        }
        try {
            this.executing = true;
            ChannelHandlerContext channelHandlerContext = this.socket.channelHandlerContext();
            int i = 0;
            while (!this.paused && this.inflight < this.pipeliningLimit) {
                Object obj = (CommandBase) this.pending.poll();
                Object obj2 = obj;
                if (obj == null) {
                    break;
                }
                this.inflight++;
                if (obj2 instanceof ExtendedQueryCommand) {
                    ExtendedQueryCommand<?> extendedQueryCommand = (ExtendedQueryCommand) obj2;
                    if (extendedQueryCommand.ps == null && this.psCache != null) {
                        extendedQueryCommand.ps = this.psCache.get(extendedQueryCommand.sql());
                    }
                    if (extendedQueryCommand.ps == null) {
                        boolean z = this.psCache != null && this.preparedStatementCacheSqlFilter.test(extendedQueryCommand.sql());
                        if (z && (evictStatementIfNecessary = evictStatementIfNecessary()) != null) {
                            this.inflight++;
                            i++;
                            channelHandlerContext.write(evictStatementIfNecessary, channelHandlerContext.voidPromise());
                        }
                        Object prepareCommand = prepareCommand(extendedQueryCommand, z, false);
                        this.paused = true;
                        this.inflight++;
                        obj2 = prepareCommand;
                    } else {
                        String prepare = extendedQueryCommand.prepare();
                        if (prepare != null) {
                            this.inflight--;
                            extendedQueryCommand.fail((Throwable) new NoStackTraceThrowable(prepare));
                        }
                    }
                }
                i++;
                channelHandlerContext.write(obj2, channelHandlerContext.voidPromise());
            }
            if (i > 0) {
                channelHandlerContext.flush();
            }
        } finally {
            this.executing = false;
        }
    }

    private PrepareStatementCommand prepareCommand(ExtendedQueryCommand<?> extendedQueryCommand, boolean z, boolean z2) {
        PrepareStatementCommand prepareStatementCommand = new PrepareStatementCommand(extendedQueryCommand.sql(), z, z2 ? extendedQueryCommand.parameterTypes() : null);
        prepareStatementCommand.handler = asyncResult -> {
            this.paused = false;
            if (!asyncResult.succeeded()) {
                Throwable cause = asyncResult.cause();
                if (!isIndeterminatePreparedStatementError(cause) || z2) {
                    this.inflight--;
                    extendedQueryCommand.fail(cause);
                    return;
                } else {
                    ChannelHandlerContext channelHandlerContext = this.socket.channelHandlerContext();
                    channelHandlerContext.write(prepareCommand(extendedQueryCommand, false, true), channelHandlerContext.voidPromise());
                    channelHandlerContext.flush();
                    return;
                }
            }
            PreparedStatement preparedStatement = (PreparedStatement) asyncResult.result();
            if (z) {
                cacheStatement(preparedStatement);
            }
            extendedQueryCommand.ps = preparedStatement;
            String prepare = extendedQueryCommand.prepare();
            if (prepare != null) {
                this.inflight--;
                extendedQueryCommand.fail((Throwable) new NoStackTraceThrowable(prepare));
            } else {
                ChannelHandlerContext channelHandlerContext2 = this.socket.channelHandlerContext();
                channelHandlerContext2.write(extendedQueryCommand, channelHandlerContext2.voidPromise());
                channelHandlerContext2.flush();
            }
        };
        return prepareStatementCommand;
    }

    protected void handleMessage(Object obj) {
        if (obj instanceof CommandResponse) {
            this.inflight--;
            ((CommandResponse) obj).fire();
            checkPending();
        } else if (obj instanceof InvalidCachedStatementEvent) {
            removeCachedStatement(((InvalidCachedStatementEvent) obj).sql());
        }
    }

    protected void handleEvent(Object obj) {
        if (this.holder != null) {
            this.holder.handleEvent(obj);
        }
    }

    private CloseStatementCommand evictStatementIfNecessary() {
        if (this.psCache == null || !this.psCache.isFull()) {
            return null;
        }
        CloseStatementCommand closeStatementCommand = new CloseStatementCommand(this.psCache.evict());
        closeStatementCommand.handler = asyncResult -> {
            if (asyncResult.failed()) {
                logger.error("Error when closing cached prepared statement", asyncResult.cause());
            }
        };
        return closeStatementCommand;
    }

    private void cacheStatement(PreparedStatement preparedStatement) {
        if (this.psCache != null) {
            List<PreparedStatement> put = this.psCache.put(preparedStatement);
            if (!$assertionsDisabled && put.size() != 0) {
                throw new AssertionError();
            }
        }
    }

    private void removeCachedStatement(String str) {
        if (this.psCache != null) {
            this.psCache.remove(str);
        }
    }

    private void handleClosed(Void r4) {
        handleClose(null);
    }

    private synchronized void handleException(Throwable th) {
        if (th instanceof DecoderException) {
            th = ((DecoderException) th).getCause();
        }
        handleClose(th);
    }

    protected void handleClose(Throwable th) {
        if (this.status != Status.CLOSED) {
            this.status = Status.CLOSED;
            if (th != null) {
                synchronized (this) {
                    if (this.holder != null) {
                        this.holder.handleException(th);
                    }
                }
            }
            NoStackTraceThrowable noStackTraceThrowable = th == null ? new NoStackTraceThrowable(PENDING_CMD_CONNECTION_CORRUPT_MSG) : new VertxException(PENDING_CMD_CONNECTION_CORRUPT_MSG, th);
            while (true) {
                CommandBase<?> poll = this.pending.poll();
                if (poll == null) {
                    break;
                } else {
                    this.context.runOnContext(r5 -> {
                        poll.fail(noStackTraceThrowable);
                    });
                }
            }
            if (this.holder != null) {
                this.holder.handleClosed();
            }
        }
    }

    static {
        $assertionsDisabled = !SocketConnectionBase.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(SocketConnectionBase.class);
    }
}
