package com.hazelcast.jet.impl.connector;

import com.hazelcast.dataconnection.impl.JdbcDataConnection;
import com.hazelcast.function.BiConsumerEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.concurrent.BackoffIdleStrategy;
import com.hazelcast.internal.util.concurrent.IdleStrategy;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import java.lang.invoke.SerializedLambda;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.sql.CommonDataSource;
import javax.sql.DataSource;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.sql.XADataSource;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.4.0.jar:com/hazelcast/jet/impl/connector/WriteJdbcP.class */
public final class WriteJdbcP<T> extends XaSinkProcessorBase {
    /** Idle strategy invoked with {@link #idleCount} before each reconnect attempt. */
    private static final IdleStrategy IDLER;
    /** Source of JDBC connections; checked at connect time for {@link DataSource} / {@link XADataSource}. */
    private final CommonDataSource dataSource;
    /** Called once per inbox item to bind that item to {@link #statement}. */
    private final BiConsumerEx<? super PreparedStatement, ? super T> bindFn;
    /** When this tests {@code true} for an {@link SQLException}, the exception is rethrown instead of retried. */
    private final PredicateEx<SQLException> isNonTransientPredicate;
    /** SQL text handed to {@link Connection#prepareStatement(String)}. */
    private final String updateQuery;
    /** Number of batched items at which the batch is executed. */
    private final int batchLimit;
    /** Logger taken from the processor context in {@code init}. */
    private ILogger logger;
    /** XA connection; assigned only when the transaction lifecycle is in use. */
    private XAConnection xaConnection;
    /** Current JDBC connection; replaced on every (re)connect. */
    private Connection connection;
    /** Processor context captured in {@code init}. */
    private Processor.Context context;
    /** Statement prepared from {@link #updateQuery} on the current connection. */
    private PreparedStatement statement;
    /** Incremented on each tolerated failure and reset to 0 after a successful write; non-zero triggers a reconnect. */
    private int idleCount;
    /** Result of {@code supportsBatchUpdates()} for the current connection. */
    private boolean supportsBatch;
    /** Number of items added to the statement's batch since the last successful {@code executeBatch()}. */
    private int batchCount;
    // Compiler-generated assertion flag surfaced by the decompiler; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.4.0.jar:com/hazelcast/jet/impl/connector/WriteJdbcP$WriteJdbcSupplier.class */
    /**
     * Processor supplier that obtains its connections from a named
     * {@link JdbcDataConnection} looked up through the data connection service.
     *
     * @param <T> type of the items bound to the prepared statement
     */
    public static class WriteJdbcSupplier<T> implements ProcessorSupplier {
        private static final long serialVersionUID = 1;

        private final String dataConnectionName;
        private final String updateQuery;
        private final BiConsumerEx<? super PreparedStatement, ? super T> bindFn;
        private final boolean exactlyOnce;
        private final int batchLimit;
        // Stored from the constructor; not read anywhere in this class.
        private final String jdbcUrl;
        // Both are re-created in init(), hence transient.
        private transient JdbcDataConnection dataConnection;
        private transient CommonDataSource dataSource;

        /**
         * @param str          name of the data connection to look up
         * @param str2         the update query
         * @param biConsumerEx function binding an item to the prepared statement
         * @param z            {@code true} to create processors in exactly-once mode
         * @param i            batch limit handed to each processor
         * @param str3         JDBC URL (only stored)
         */
        WriteJdbcSupplier(String str, String str2, BiConsumerEx<? super PreparedStatement, ? super T> biConsumerEx, boolean z, int i, String str3) {
            this.dataConnectionName = str;
            this.updateQuery = str2;
            this.bindFn = biConsumerEx;
            this.exactlyOnce = z;
            this.batchLimit = i;
            this.jdbcUrl = str3;
        }

        /**
         * Retains the named data connection and wraps its {@code getConnection}
         * method in a data source for the processors.
         */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void init(@Nonnull ProcessorSupplier.Context context) {
            JdbcDataConnection retained = (JdbcDataConnection) context.dataConnectionService()
                    .getAndRetainDataConnection(this.dataConnectionName, JdbcDataConnection.class);
            this.dataConnection = retained;
            // Creating a bound method reference null-checks its receiver, so a
            // missing data connection still fails here with a NullPointerException.
            this.dataSource = new DataSourceFromConnectionSupplier(retained::getConnection);
        }

        /** Releases the data connection retained in {@link #init}, if there is one. */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        public void close(Throwable th) throws Exception {
            JdbcDataConnection retained = this.dataConnection;
            if (retained == null) {
                return;
            }
            retained.release();
        }

        /** Creates {@code i} processors, all sharing this supplier's data source. */
        @Override // com.hazelcast.jet.core.ProcessorSupplier
        @Nonnull
        public Collection<? extends Processor> get(int i) {
            return IntStream.range(0, i)
                    .mapToObj(ignored -> new WriteJdbcP<T>(
                            this.updateQuery, this.dataSource, this.bindFn, this.exactlyOnce, this.batchLimit))
                    .collect(Collectors.toList());
        }
    }

    /**
     * Creates a processor that classifies SQL errors with the built-in
     * {@link #isNonTransientException} check.
     *
     * @param str              the update query to prepare
     * @param commonDataSource source of JDBC connections
     * @param biConsumerEx     function binding an item to the prepared statement
     * @param z                {@code true} selects the exactly-once guarantee, {@code false} at-least-once
     * @param i                number of batched items at which the batch is executed
     */
    public WriteJdbcP(@Nonnull String str, @Nonnull CommonDataSource commonDataSource, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> biConsumerEx, boolean z, int i) {
        super(z ? ProcessingGuarantee.EXACTLY_ONCE : ProcessingGuarantee.AT_LEAST_ONCE);
        this.dataSource = commonDataSource;
        this.updateQuery = str;
        this.batchLimit = i;
        this.bindFn = biConsumerEx;
        // Default classification: the instance method of this class.
        this.isNonTransientPredicate = this::isNonTransientException;
    }

    /**
     * Creates a processor with a caller-supplied predicate for classifying
     * SQL errors.
     *
     * @param str              the update query to prepare
     * @param commonDataSource source of JDBC connections
     * @param biConsumerEx     function binding an item to the prepared statement
     * @param predicateEx      returns {@code true} for exceptions that must be rethrown rather than retried
     * @param z                {@code true} selects the exactly-once guarantee, {@code false} at-least-once
     * @param i                number of batched items at which the batch is executed
     */
    public WriteJdbcP(@Nonnull String str, @Nonnull CommonDataSource commonDataSource, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> biConsumerEx, @Nonnull PredicateEx<SQLException> predicateEx, boolean z, int i) {
        super(z ? ProcessingGuarantee.EXACTLY_ONCE : ProcessingGuarantee.AT_LEAST_ONCE);
        this.dataSource = commonDataSource;
        this.updateQuery = str;
        this.batchLimit = i;
        this.bindFn = biConsumerEx;
        this.isNonTransientPredicate = predicateEx;
    }

    /**
     * Builds a meta-supplier (preferred local parallelism of one) whose
     * processors write through a data source created by {@code functionEx}.
     *
     * @param str          not used by this overload
     * @param str2         the update query to prepare
     * @param functionEx   creates the data source from the supplier context; must be serializable
     * @param biConsumerEx function binding an item to the prepared statement; must be serializable
     * @param z            {@code true} selects the exactly-once guarantee
     * @param i            batch limit; must be positive
     * @param <T>          type of the items written
     * @return the meta-supplier wrapping the processor supplier
     */
    public static <T> ProcessorMetaSupplier metaSupplier(@Nullable String str, @Nonnull final String str2, @Nonnull final FunctionEx<ProcessorMetaSupplier.Context, ? extends CommonDataSource> functionEx, @Nonnull final BiConsumerEx<? super PreparedStatement, ? super T> biConsumerEx, final boolean z, final int i) {
        Util.checkSerializable(functionEx, "dataSourceSupplier");
        Util.checkSerializable(biConsumerEx, "bindFn");
        Preconditions.checkPositive(i, "batchLimit");
        return ProcessorMetaSupplier.preferLocalParallelismOne(new ProcessorSupplier() {
            private static final long serialVersionUID = 1;
            // Created in init(), hence transient.
            private transient CommonDataSource dataSource;

            /** Creates the data source by applying the captured factory function to the context. */
            @Override // com.hazelcast.jet.core.ProcessorSupplier
            public void init(@Nonnull ProcessorSupplier.Context context) {
                // The factory is the captured method parameter, not an enclosing instance.
                this.dataSource = functionEx.apply(context);
            }

            /** Closes the data source if it happens to be {@link AutoCloseable}. */
            @Override // com.hazelcast.jet.core.ProcessorSupplier
            public void close(Throwable th) throws Exception {
                if (this.dataSource instanceof AutoCloseable) {
                    ((AutoCloseable) this.dataSource).close();
                }
            }

            /** Creates {@code i2} processors, all sharing the data source built in {@link #init}. */
            @Override // com.hazelcast.jet.core.ProcessorSupplier
            @Nonnull
            public Collection<? extends Processor> get(int i2) {
                return IntStream.range(0, i2)
                        .mapToObj(ignored -> new WriteJdbcP<T>(str2, this.dataSource, biConsumerEx, z, i))
                        .collect(Collectors.toList());
            }
        });
    }

    /**
     * Builds a meta-supplier (preferred local parallelism of one) backed by a
     * {@link WriteJdbcSupplier}, which resolves its connections from a named
     * data connection.
     *
     * @param str          JDBC URL, passed through to the supplier
     * @param str2         the update query to prepare
     * @param str3         name of the data connection
     * @param biConsumerEx function binding an item to the prepared statement; must be serializable
     * @param z            {@code true} selects the exactly-once guarantee
     * @param i            batch limit; must be positive
     * @param <T>          type of the items written
     * @return the meta-supplier wrapping the processor supplier
     */
    public static <T> ProcessorMetaSupplier metaSupplier(@Nullable String str, @Nonnull String str2, @Nonnull String str3, @Nonnull BiConsumerEx<? super PreparedStatement, ? super T> biConsumerEx, boolean z, int i) {
        Util.checkSerializable(biConsumerEx, "bindFn");
        Preconditions.checkPositive(i, "batchLimit");
        // Note the argument order: data connection name first, JDBC URL last.
        WriteJdbcSupplier<T> supplier = new WriteJdbcSupplier<T>(str3, str2, biConsumerEx, z, i, str);
        return ProcessorMetaSupplier.preferLocalParallelismOne(supplier);
    }

    /**
     * Initializes the base class, captures the context and its logger, and
     * makes the first connection attempt. The result of that attempt is not
     * inspected here; a tolerated failure is recorded in {@code idleCount}.
     */
    @Override // com.hazelcast.jet.impl.connector.XaSinkProcessorBase, com.hazelcast.jet.core.Processor
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws Exception {
        super.init(outbox, context);
        this.context = context;
        this.logger = context.logger();
        connectAndPrepareStatement();
    }

    /**
     * Delegates to the base class once a usable connection is available.
     *
     * @return {@code false} if reconnecting failed, otherwise whatever the base class returns
     */
    @Override // com.hazelcast.jet.impl.connector.XaSinkProcessorBase, com.hazelcast.jet.core.Processor
    public boolean tryProcess() {
        // Short-circuit: the base class is only consulted after a successful (re)connect.
        return reconnectIfNecessary() && super.tryProcess();
    }

    /**
     * Binds every inbox item to the prepared statement and writes it, batching
     * when the driver supports it. The inbox is cleared only after all items
     * were written (and committed, when not using the transaction lifecycle).
     *
     * <p>On an {@link SQLException} the connection is rolled back. The
     * exception is rethrown if the non-transient predicate accepts it or the
     * transaction lifecycle is in use; otherwise it is logged, the inbox is
     * left untouched and {@code idleCount} is incremented so that the next
     * call reconnects first.
     *
     * @param i     ordinal of the inbound edge (unused)
     * @param inbox items to write
     */
    @Override // com.hazelcast.jet.core.Processor
    public void process(int i, @Nonnull Inbox inbox) {
        if (!reconnectIfNecessary() || this.snapshotUtility.activeTransaction() == null) {
            return;
        }
        try {
            Iterator<Object> it = inbox.iterator();
            while (it.hasNext()) {
                // The inbox is untyped; items on this edge are expected to be of type T.
                @SuppressWarnings("unchecked")
                T item = (T) it.next();
                this.bindFn.accept(this.statement, item);
                addBatchOrExecute();
            }
            // Flush whatever is left below the batch limit.
            executeBatch();
            if (!this.snapshotUtility.usesTransactionLifecycle()) {
                this.connection.commit();
            }
            this.idleCount = 0;
            inbox.clear();
        } catch (SQLException e) {
            try {
                this.connection.rollback();
            } catch (SQLException e2) {
                this.logger.severe("Exception during rollback", e2);
            }
            if (this.isNonTransientPredicate.test(e) || this.snapshotUtility.usesTransactionLifecycle()) {
                throw ExceptionUtil.rethrow(e);
            }
            this.logger.warning("Exception during update", e);
            // The statement is replaced on the next reconnect and starts with an
            // empty batch. Without this reset a failed executeBatch() would leave
            // batchCount == batchLimit, and the equality check in
            // addBatchOrExecute() would never fire for the retried items.
            this.batchCount = 0;
            this.idleCount++;
        }
    }

    /**
     * Accepts every watermark without acting on it.
     *
     * @param watermark the watermark (ignored)
     * @return always {@code true}
     */
    @Override // com.hazelcast.jet.impl.connector.XaSinkProcessorBase, com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    /**
     * Closes the base class first, then the statement, the XA connection (if
     * any) and finally the JDBC connection. Failures while closing the JDBC
     * resources are logged and otherwise ignored.
     */
    @Override // com.hazelcast.jet.impl.connector.XaSinkProcessorBase, com.hazelcast.jet.core.Processor
    public void close() throws Exception {
        super.close();
        closeWithLogging(this.statement);
        // The cast selects the PooledConnection overload, which is a no-op for null.
        closeWithLogging((PooledConnection) this.xaConnection);
        closeWithLogging(this.connection);
    }

    /**
     * Opens a connection from the data source, switches off auto-commit and
     * prepares the update statement.
     *
     * <ul>
     * <li>With the transaction lifecycle in use, the data source must be an
     *     {@link XADataSource}; its XA resource is registered with the base
     *     class.</li>
     * <li>Otherwise a plain {@link DataSource} is preferred; an
     *     {@link XADataSource} is accepted as a fallback, with a warning.</li>
     * </ul>
     *
     * @return {@code true} on success; {@code false} if a tolerated
     *         {@link SQLException} occurred, in which case {@code idleCount}
     *         has been incremented
     * @throws JetException if the data source has an unsuitable type
     */
    private boolean connectAndPrepareStatement() {
        try {
            if (this.snapshotUtility.usesTransactionLifecycle()) {
                if (!(this.dataSource instanceof XADataSource)) {
                    throw new JetException("When using exactly-once, the dataSource must implement " + XADataSource.class.getName());
                }
                // CommonDataSource does not declare getXAConnection(); the cast is
                // safe after the instanceof check above.
                this.xaConnection = ((XADataSource) this.dataSource).getXAConnection();
                this.connection = this.xaConnection.getConnection();
                if (!$assertionsDisabled && this.idleCount != 0) {
                    throw new AssertionError("idleCount=" + this.idleCount);
                }
                setXaResource(this.xaConnection.getXAResource());
            } else if (this.dataSource instanceof DataSource) {
                this.connection = ((DataSource) this.dataSource).getConnection();
            } else {
                if (!(this.dataSource instanceof XADataSource)) {
                    throw new JetException("The dataSource implements neither " + DataSource.class.getName() + " nor " + XADataSource.class.getName());
                }
                this.logger.warning("Using " + XADataSource.class.getName() + " when no XA transactions are needed");
                // This branch can run again on reconnect: release the XA connection
                // of the previous attempt before replacing it.
                closeWithLogging((PooledConnection) this.xaConnection);
                this.xaConnection = null;
                // Keep the XA connection in the field so that close() releases it
                // instead of leaking the underlying physical connection.
                this.xaConnection = ((XADataSource) this.dataSource).getXAConnection();
                this.connection = this.xaConnection.getConnection();
            }
            this.supportsBatch = this.connection.getMetaData().supportsBatchUpdates();
            this.connection.setAutoCommit(false);
            this.statement = this.connection.prepareStatement(this.updateQuery);
            return true;
        } catch (SQLException e) {
            if (this.isNonTransientPredicate.test(e) || this.snapshotUtility.usesTransactionLifecycle()) {
                throw ExceptionUtil.rethrow(e);
            }
            this.logger.warning("Exception when connecting and preparing the statement", e);
            this.idleCount++;
            return false;
        }
    }

    /**
     * Writes the currently bound row: executes it immediately when batching is
     * unsupported, otherwise adds it to the batch and flushes the batch once
     * the count reaches {@code batchLimit}.
     *
     * @throws SQLException if the driver rejects the update or the batch
     */
    private void addBatchOrExecute() throws SQLException {
        if (this.supportsBatch) {
            this.statement.addBatch();
            this.batchCount++;
            if (this.batchCount == this.batchLimit) {
                executeBatch();
            }
        } else {
            this.statement.executeUpdate();
        }
    }

    /**
     * Executes the pending batch, if batching is supported and at least one
     * row is pending, and resets the pending count afterwards.
     *
     * @throws SQLException if the batch execution fails; the count is then left unchanged
     */
    private void executeBatch() throws SQLException {
        if (this.supportsBatch && this.batchCount > 0) {
            this.statement.executeBatch();
            this.batchCount = 0;
        }
    }

    /**
     * Re-establishes the connection after a previously tolerated failure.
     *
     * @return {@code true} if no failure is pending or the reconnect
     *         succeeded, {@code false} if the reconnect failed
     */
    private boolean reconnectIfNecessary() {
        if (this.idleCount != 0) {
            if (!$assertionsDisabled) {
                // Failures are never tolerated when the transaction lifecycle is in use.
                if (this.snapshotUtility.usesTransactionLifecycle()) {
                    throw new AssertionError("attempt to reconnect in XA mode");
                }
            }
            IDLER.idle(this.idleCount);
            // Drop the old resources before opening fresh ones.
            closeWithLogging(this.statement);
            closeWithLogging(this.connection);
            return connectAndPrepareStatement();
        }
        return true;
    }

    /**
     * Closes the given pooled connection, logging any failure as a warning
     * instead of propagating it.
     *
     * @param pooledConnection the connection to close; {@code null} is ignored
     */
    private void closeWithLogging(PooledConnection pooledConnection) {
        if (pooledConnection != null) {
            try {
                pooledConnection.close();
            } catch (Exception e) {
                this.logger.warning("Exception when closing " + pooledConnection + ", ignoring it: " + e, e);
            }
        }
    }

    /**
     * Closes the given resource, logging any failure as a warning instead of
     * propagating it.
     *
     * @param autoCloseable the resource to close; {@code null} is ignored
     */
    private void closeWithLogging(AutoCloseable autoCloseable) {
        if (autoCloseable != null) {
            try {
                autoCloseable.close();
            } catch (Exception e) {
                this.logger.warning("Exception when closing " + autoCloseable + ", ignoring it: " + e, e);
            }
        }
    }

    /**
     * Tells whether the exception, its direct cause, or any exception further
     * down its "next exception" chain is an {@link SQLNonTransientException}.
     *
     * @param sQLException the exception to classify
     * @return {@code true} if a non-transient exception was found
     */
    private boolean isNonTransientException(SQLException sQLException) {
        SQLException chained = sQLException.getNextException();
        if (sQLException instanceof SQLNonTransientException) {
            return true;
        }
        if (sQLException.getCause() instanceof SQLNonTransientException) {
            return true;
        }
        // Stop at the end of the chain, or when an exception is chained to itself.
        if (chained == null || chained == sQLException) {
            return false;
        }
        return isNonTransientException(chained);
    }

    /**
     * Deserialization hook for the serializable method reference
     * {@code this::isNonTransientException} created in the constructor.
     *
     * <p>NOTE(review): this method is normally synthesized by the compiler and
     * appears here only because the file was decompiled; when rebuilding from
     * source it may have to be removed in favour of the generated one.
     *
     * @param serializedLambda the serialized form of the lambda
     * @return a predicate bound to the captured {@code WriteJdbcP} instance
     * @throws IllegalArgumentException if the serialized form does not
     *         describe the expected method reference
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Implementation-method kind 5 is MethodHandleInfo.REF_invokeVirtual.
        boolean matches = implMethodName.equals("isNonTransientException")
                && serializedLambda.getImplMethodKind() == 5
                && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx")
                && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx")
                && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z")
                && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/WriteJdbcP")
                && serializedLambda.getImplMethodSignature().equals("(Ljava/sql/SQLException;)Z");
        if (matches) {
            WriteJdbcP<?> receiver = (WriteJdbcP<?>) serializedLambda.getCapturedArg(0);
            // A method reference needs a functional-interface target type; it
            // cannot be returned directly as Object.
            PredicateEx<SQLException> predicate = receiver::isNonTransientException;
            return predicate;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Class initialization: assertion flag, reconnect idle strategy, JDBC driver lookup.
    static {
        // Compiler-generated assertion flag: true when assertions are disabled for this class.
        $assertionsDisabled = !WriteJdbcP.class.desiredAssertionStatus();
        // Last two arguments are 1 s and 3 s in nanoseconds — presumably the
        // minimum and maximum park period, with spinning and yielding disabled
        // by the two zeros; TODO confirm against BackoffIdleStrategy.
        IDLER = new BackoffIdleStrategy(0L, 0L, TimeUnit.SECONDS.toNanos(1L), TimeUnit.SECONDS.toNanos(3L));
        // The result is discarded. NOTE(review): presumably called only to force
        // DriverManager initialization / driver loading up front — confirm.
        DriverManager.getDrivers();
    }
}
