package org.apache.pulsar.io.jdbc;

import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.jdbc.JdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-jdbc-core-2.9.0-rc-202110122205.jar:org/apache/pulsar/io/jdbc/JdbcAbstractSink.class */
/**
 * Base class for Pulsar sinks that write records to a single table over JDBC.
 *
 * <p>Records passed to {@link #write(Record)} are buffered in memory and written in batches by a
 * single background thread. A batch is flushed either when the buffer reaches the configured batch
 * size or when the periodic flush timer fires. Each batch is executed inside one JDBC transaction:
 * on success the transaction is committed and every record in the batch is acknowledged; on any
 * failure the transaction is rolled back and every record in the batch is failed.
 *
 * <p>The statement used for a record is selected by the record property {@link #ACTION}
 * ({@link #INSERT}, {@link #UPDATE} or {@link #DELETE}); a record without that property is treated
 * as an insert. Subclasses supply the column binding via
 * {@link #bindValue(PreparedStatement, Record, String)}.
 *
 * @param <T> payload type of the records consumed by this sink
 */
public abstract class JdbcAbstractSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(JdbcAbstractSink.class);

    /** Name of the record property that selects the SQL operation. */
    protected static final String ACTION = "ACTION";
    protected static final String INSERT = "INSERT";
    protected static final String UPDATE = "UPDATE";
    protected static final String DELETE = "DELETE";

    private JdbcSinkConfig jdbcSinkConfig;
    private Connection connection;
    private String jdbcUrl;
    private String tableName;
    private JdbcUtils.TableId tableId;
    private PreparedStatement insertStatement;
    /** Only prepared when non-key columns are configured; otherwise {@code null}. */
    private PreparedStatement updateStatement;
    /** Only prepared when key columns are configured; otherwise {@code null}. */
    private PreparedStatement deleteStatement;
    protected JdbcUtils.TableDefinition tableDefinition;

    /** Buffer that {@link #write(Record)} appends to; swapped with {@link #swapList} under the instance lock. */
    private List<Record<T>> incomingList;
    /** Batch currently being flushed; empty whenever no flush is in progress. */
    private List<Record<T>> swapList;
    /** Guards against two flushes running at the same time. */
    private AtomicBoolean isFlushing;
    private int batchSize;
    private ScheduledExecutorService flushExecutor;

    /**
     * Loads the sink configuration, opens the JDBC connection with auto-commit disabled, prepares
     * the SQL statements and starts the periodic flush task.
     *
     * @param map         raw sink configuration, parsed by {@code JdbcSinkConfig.load}
     * @param sinkContext sink context supplied by the runtime (not used by this class)
     * @throws IllegalArgumentException if no JDBC URL is configured
     * @throws Exception                if the driver cannot be loaded, the connection cannot be
     *                                  opened, or the statements cannot be prepared
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        this.jdbcSinkConfig = JdbcSinkConfig.load(map);
        this.jdbcUrl = this.jdbcSinkConfig.getJdbcUrl();
        if (this.jdbcUrl == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }

        Properties properties = new Properties();
        String userName = this.jdbcSinkConfig.getUserName();
        String password = this.jdbcSinkConfig.getPassword();
        if (userName != null) {
            properties.setProperty("user", userName);
        }
        if (password != null) {
            properties.setProperty("password", password);
        }

        // Load the driver class explicitly so it is registered with DriverManager.
        Class.forName(JdbcUtils.getDriverClassName(this.jdbcUrl));
        this.connection = DriverManager.getConnection(this.jdbcUrl, properties);
        // Batches are committed explicitly in flush(), so auto-commit must be off.
        this.connection.setAutoCommit(false);
        log.info("Opened jdbc connection: {}, autoCommit: {}", this.jdbcUrl, this.connection.getAutoCommit());

        this.tableName = this.jdbcSinkConfig.getTableName();
        this.tableId = JdbcUtils.getTableId(this.connection, this.tableName);
        initStatement();

        int timeoutMs = this.jdbcSinkConfig.getTimeoutMs();
        this.batchSize = this.jdbcSinkConfig.getBatchSize();
        this.incomingList = Lists.newArrayList();
        this.swapList = Lists.newArrayList();
        this.isFlushing = new AtomicBoolean(false);
        this.flushExecutor = Executors.newScheduledThreadPool(1);
        this.flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Resolves the table definition from the configured key / non-key column lists and prepares
     * the insert statement, plus the update statement when non-key columns are configured and the
     * delete statement when key columns are configured.
     *
     * @throws Exception if the table definition cannot be read or a statement cannot be prepared
     */
    private void initStatement() throws Exception {
        List<String> keyList = Lists.newArrayList();
        String key = this.jdbcSinkConfig.getKey();
        if (key != null && !key.isEmpty()) {
            // Column names are taken verbatim from the comma-separated setting (no trimming).
            keyList = Arrays.asList(key.split(","));
        }

        List<String> nonKeyList = Lists.newArrayList();
        String nonKey = this.jdbcSinkConfig.getNonKey();
        if (nonKey != null && !nonKey.isEmpty()) {
            nonKeyList = Arrays.asList(nonKey.split(","));
        }

        this.tableDefinition = JdbcUtils.getTableDefinition(this.connection, this.tableId, keyList, nonKeyList);
        this.insertStatement =
                JdbcUtils.buildInsertStatement(this.connection, JdbcUtils.buildInsertSql(this.tableDefinition));
        if (!nonKeyList.isEmpty()) {
            this.updateStatement =
                    JdbcUtils.buildUpdateStatement(this.connection, JdbcUtils.buildUpdateSql(this.tableDefinition));
        }
        if (!keyList.isEmpty()) {
            this.deleteStatement =
                    JdbcUtils.buildDeleteStatement(this.connection, JdbcUtils.buildDeleteSql(this.tableDefinition));
        }
    }

    /**
     * Stops the flush executor, commits any open transaction and closes the connection.
     *
     * <p>Safe to call when {@link #open(Map, SinkContext)} failed part-way: resources that were
     * never created are skipped. The connection is closed even if the final commit fails.
     *
     * @throws Exception if committing or closing the connection fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.flushExecutor != null) {
            this.flushExecutor.shutdown();
        }
        if (this.connection != null) {
            try {
                if (!this.connection.getAutoCommit()) {
                    this.connection.commit();
                }
            } finally {
                this.connection.close();
            }
            log.info("Closed jdbc connection: {}", this.jdbcUrl);
        }
    }

    /**
     * Buffers a record for a later batched write. When the buffer size reaches exactly the
     * configured batch size an immediate flush is scheduled on the flush executor; otherwise the
     * record waits for the periodic flush.
     *
     * @param record record to buffer; it is acknowledged or failed later by the flush
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) throws Exception {
        int size;
        synchronized (this) {
            this.incomingList.add(record);
            size = this.incomingList.size();
        }
        if (size == this.batchSize) {
            this.flushExecutor.schedule(this::flush, 0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Binds the values of {@code record} to the parameters of {@code preparedStatement}.
     *
     * @param preparedStatement statement selected for the record's action
     * @param record            record whose values are to be bound
     * @param str               the record's action: {@link #INSERT}, {@link #UPDATE} or {@link #DELETE}
     * @throws Exception if the record cannot be bound
     */
    public abstract void bindValue(PreparedStatement preparedStatement, Record<T> record, String str) throws Exception;

    /**
     * Writes all currently buffered records as one transaction. Does nothing when the buffer is
     * empty or another flush is already running.
     */
    private void flush() {
        if (this.incomingList.isEmpty() || !this.isFlushing.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("Already in flushing state, will not flush, queue size: {}", this.incomingList.size());
            }
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Starting flush, queue size: {}", this.incomingList.size());
            }
            if (!this.swapList.isEmpty()) {
                throw new IllegalStateException(
                        "swapList should be empty since last flush. swapList.size: " + this.swapList.size());
            }
            // Swap the buffers under the same lock write() uses, so writers keep appending to a
            // fresh list while this batch is being executed.
            synchronized (this) {
                List<Record<T>> drained = this.swapList;
                this.swapList = this.incomingList;
                this.incomingList = drained;
            }
            try {
                int executed = executeBatch(this.swapList);
                if (this.swapList.size() != executed) {
                    log.error("Update count {}  not match total number of records {}", executed, this.swapList.size());
                }
                if (log.isDebugEnabled()) {
                    log.debug("Finish flush, queue size: {}", this.swapList.size());
                }
            } finally {
                this.swapList.clear();
            }
        } finally {
            // Always release the flag, otherwise a single failure would block every later flush.
            this.isFlushing.set(false);
        }
    }

    /**
     * Executes one statement per record, then commits and acknowledges the whole batch. On any
     * failure the transaction is rolled back and every record in the batch is failed.
     *
     * @param batch records to write
     * @return number of records for which a statement was bound before the batch finished or failed
     */
    private int executeBatch(List<Record<T>> batch) {
        int executed = 0;
        try {
            for (Record<T> record : batch) {
                String action = record.getProperties().get(ACTION);
                if (action == null) {
                    action = INSERT;
                }
                PreparedStatement statement = statementFor(action);
                bindValue(statement, record, action);
                executed++;
                statement.execute();
            }
            this.connection.commit();
            batch.forEach(Record::ack);
        } catch (Exception e) {
            log.error("Got exception ", e);
            // Discard the statements already executed in this transaction; without this they
            // would be committed by the next successful flush although their records are failed.
            rollbackQuietly();
            batch.forEach(Record::fail);
        }
        return executed;
    }

    /**
     * Returns the prepared statement for the given action.
     *
     * @throws IllegalArgumentException if the action is not one of the supported values
     * @throws IllegalStateException    if the statement for the action was never prepared
     */
    private PreparedStatement statementFor(String action) {
        final PreparedStatement statement;
        switch (action) {
            case DELETE:
                statement = this.deleteStatement;
                break;
            case UPDATE:
                statement = this.updateStatement;
                break;
            case INSERT:
                statement = this.insertStatement;
                break;
            default:
                throw new IllegalArgumentException(String.format(
                        "Unsupported action %s, can be one of %s, or not set which indicate %s",
                        action, Arrays.asList(INSERT, UPDATE, DELETE), INSERT));
        }
        if (statement == null) {
            // Update / delete statements only exist when nonKey / key columns are configured.
            throw new IllegalStateException(
                    "No statement prepared for action " + action + "; check the key / nonKey sink configuration");
        }
        return statement;
    }

    /** Rolls back the current transaction, logging (not propagating) any failure to do so. */
    private void rollbackQuietly() {
        try {
            this.connection.rollback();
        } catch (Exception rollbackFailure) {
            log.error("Failed to roll back jdbc transaction", rollbackFailure);
        }
    }

    /**
     * Returns the JDBC connection used by this sink, or {@code null} if the sink has not been
     * opened successfully.
     */
    public Connection getConnection() {
        return this.connection;
    }
}
