/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.jdbc.offset;

import io.debezium.config.Configuration;
import io.debezium.storage.jdbc.RetriableConnection;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link OffsetBackingStore} implementation that persists connector offsets in a JDBC table.
 *
 * <p>Offsets are held in memory in {@link #data} as UTF-8 decoded strings. {@link #start()}
 * creates the table when it is missing and loads its rows into memory; every {@link #set}
 * call updates the in-memory map and then rewrites the whole table through {@link #save()}.
 * Reads and writes requested through {@link #get} and {@link #set} run on a one-thread
 * executor that is created in {@link #start()} and shut down in {@link #stop()}.
 */
public class JdbcOffsetBackingStore
implements OffsetBackingStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcOffsetBackingStore.class);
    /** Store settings (JDBC URL, credentials, table name and SQL statements); set by {@link #configure}. */
    private JdbcOffsetBackingStoreConfig config;
    /** In-memory view of the offsets, keyed and valued by the UTF-8 decoded buffers. */
    protected ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
    /** Executor running the {@link #get} and {@link #set} tasks; {@code null} until started and after stop. */
    protected ExecutorService executor;
    /** Counter supplying the value bound to the fifth insert parameter of each saved row. */
    private final AtomicInteger recordInsertSeq = new AtomicInteger(0);
    /** Connection wrapper used for every database operation; created by {@link #configure}. */
    private RetriableConnection conn;

    /**
     * Decodes a buffer as UTF-8 text.
     *
     * <p>Decoding works on a read-only view, so the position of the passed buffer is not moved.
     *
     * @param data the buffer to decode, may be {@code null}
     * @return the decoded text, or {@code null} when {@code data} is {@code null}
     */
    public String fromByteBuffer(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        return StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer()).toString();
    }

    /**
     * Encodes text as UTF-8 and wraps the bytes in a buffer.
     *
     * @param data the text to encode, may be {@code null}
     * @return a buffer wrapping the UTF-8 bytes, or {@code null} when {@code data} is {@code null}
     */
    public ByteBuffer toByteBuffer(String data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Builds the store configuration from the worker's original string settings and creates
     * the retriable JDBC connection.
     *
     * @param config the worker configuration supplying the original string properties
     * @throws IllegalStateException if the configuration cannot be built or the connection
     *         cannot be created; the original exception is kept as the cause
     */
    public void configure(WorkerConfig config) {
        try {
            Configuration configuration = Configuration.from(config.originalsStrings());
            this.config = new JdbcOffsetBackingStoreConfig(configuration);
            this.conn = new RetriableConnection(this.config.getJdbcUrl(), this.config.getUser(), this.config.getPassword(), this.config.getWaitRetryDelay(), this.config.getMaxRetryCount());
        }
        catch (Exception e) {
            // Report only the property names: the values include the JDBC password, which
            // must not leak into exception messages or logs.
            throw new IllegalStateException("Failed to connect JDBC offset backing store: " + config.originalsStrings().keySet(), e);
        }
    }

    /**
     * Creates the task executor, makes sure the offset table exists and loads the stored
     * offsets into memory.
     *
     * @throws IllegalStateException if checking for or creating the table fails
     * @throws ConnectException if the stored offsets cannot be read
     */
    public synchronized void start() {
        this.executor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory(this.getClass().getSimpleName() + "-%d", false));
        LOGGER.info("Starting JdbcOffsetBackingStore db '{}'", this.config.getJdbcUrl());
        try {
            this.initializeTable();
        }
        catch (SQLException e) {
            throw new IllegalStateException("Failed to create JDBC offset table: " + this.config.getJdbcUrl(), e);
        }
        this.load();
    }

    /**
     * Creates the offset table with the configured DDL unless the database metadata already
     * lists a table with the configured name.
     *
     * @throws SQLException if the metadata lookup or the table creation fails
     */
    private void initializeTable() throws SQLException {
        this.conn.executeWithRetry(connection -> {
            DatabaseMetaData dbMeta = connection.getMetaData();
            try (ResultSet existing = dbMeta.getTables(null, null, this.config.getTableName(), null)) {
                if (existing.next()) {
                    // The table is already there, nothing to create.
                    return;
                }
            }
            LOGGER.info("Creating table {} to store offset", this.config.getTableName());
            try (PreparedStatement create = connection.prepareStatement(this.config.getTableCreate())) {
                create.execute();
            }
        }, "checking / creating table", false);
    }

    /**
     * Rewrites the offset table from the in-memory map: runs the configured delete statement,
     * inserts one row per entry and commits.
     *
     * <p>Each insert binds, in order: a random UUID, the offset key, the offset value, the
     * current timestamp and the next value of the insert sequence counter.
     *
     * @throws ConnectException if the database operation fails
     */
    protected void save() {
        LOGGER.debug("Saving data to state table...");
        try {
            this.conn.executeWithRetry(connection -> {
                // The insert statement is prepared once and re-bound for every entry.
                try (PreparedStatement deleteAll = connection.prepareStatement(this.config.getTableDelete());
                     PreparedStatement insertRow = connection.prepareStatement(this.config.getTableInsert())) {
                    deleteAll.executeUpdate();
                    for (Map.Entry<String, String> offset : this.data.entrySet()) {
                        insertRow.setString(1, UUID.randomUUID().toString());
                        insertRow.setString(2, offset.getKey());
                        insertRow.setString(3, offset.getValue());
                        insertRow.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
                        insertRow.setInt(5, this.recordInsertSeq.incrementAndGet());
                        insertRow.executeUpdate();
                    }
                }
                connection.commit();
            }, "Saving offset", true);
        }
        catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Replaces the in-memory map with the rows returned by the configured select statement,
     * reading the {@code offset_key} and {@code offset_val} columns.
     *
     * <p>Rows whose key or value is SQL {@code NULL} are skipped with a warning, because the
     * in-memory map cannot hold {@code null} and an absent key already reads as "no offset".
     *
     * @throws ConnectException if the offsets cannot be read from the database
     */
    private void load() {
        try {
            this.conn.executeWithRetry(connection -> {
                // Built inside the retried action so a retry starts from an empty map.
                ConcurrentHashMap<String, String> loaded = new ConcurrentHashMap<>();
                try (Statement stmt = connection.createStatement();
                     ResultSet rows = stmt.executeQuery(this.config.getTableSelect())) {
                    while (rows.next()) {
                        String key = rows.getString("offset_key");
                        String value = rows.getString("offset_val");
                        if (key == null || value == null) {
                            LOGGER.warn("Skipping offset row with null key or value (key '{}')", key);
                            continue;
                        }
                        loaded.put(key, value);
                    }
                }
                this.data = loaded;
            }, "loading offset data", false);
        }
        catch (SQLException e) {
            throw new ConnectException("Failed recover records from database: " + this.config.getJdbcUrl(), e);
        }
    }

    /**
     * Shuts the executor down, waiting up to 30 seconds for submitted tasks to finish.
     *
     * @throws ConnectException if tasks were still queued when the executor was forced to stop
     */
    private void stopExecutor() {
        if (this.executor == null) {
            return;
        }
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Keep the interrupt flag set for the caller and continue with the forced shutdown.
            Thread.currentThread().interrupt();
        }
        if (!this.executor.shutdownNow().isEmpty()) {
            throw new ConnectException("Failed to stop JdbcOffsetBackingStore. Exiting without cleanly shutting down pending tasks and/or callbacks.");
        }
        this.executor = null;
    }

    /**
     * Stops the executor and closes the database connection.
     *
     * <p>The connection is closed even when stopping the executor fails; a failure to close
     * the connection is logged and not rethrown.
     *
     * @throws ConnectException if the executor could not be stopped cleanly
     */
    public synchronized void stop() {
        try {
            this.stopExecutor();
        }
        finally {
            // Release the connection regardless of how the executor shutdown went.
            try {
                if (this.conn != null) {
                    this.conn.close();
                }
            }
            catch (SQLException e) {
                LOGGER.error("Exception while stopping JdbcOffsetBackingStore", e);
            }
        }
        LOGGER.info("Stopped JdbcOffsetBackingStore");
    }

    /**
     * Asynchronously stores the given offsets and persists the whole map to the database.
     *
     * <p>Entries with a {@code null} key are ignored. An entry with a {@code null} value
     * removes the key from the store, because the in-memory map cannot hold {@code null}.
     *
     * @param values the offsets to store, as UTF-8 encoded key and value buffers
     * @param callback invoked with {@code (null, null)} after a successful save; may be {@code null}
     * @return a future that completes once the offsets have been saved
     */
    public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
        Callable<Void> task = () -> {
            for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
                if (entry.getKey() == null) {
                    continue;
                }
                String key = this.fromByteBuffer(entry.getKey());
                String value = this.fromByteBuffer(entry.getValue());
                if (value == null) {
                    this.data.remove(key);
                }
                else {
                    this.data.put(key, value);
                }
            }
            this.save();
            if (callback != null) {
                callback.onCompletion(null, null);
            }
            return null;
        };
        return this.executor.submit(task);
    }

    /**
     * Asynchronously looks up the stored offsets for the given keys.
     *
     * @param keys the UTF-8 encoded keys to look up
     * @return a future holding a map from each requested key to its stored value; the value
     *         is {@code null} for keys without a stored offset and for a {@code null} key
     */
    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
        Callable<Map<ByteBuffer, ByteBuffer>> task = () -> {
            Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
            for (ByteBuffer key : keys) {
                // The in-memory map rejects null keys, so a null key never has an offset.
                String stored = key == null ? null : this.data.get(this.fromByteBuffer(key));
                result.put(key, this.toByteBuffer(stored));
            }
            return result;
        };
        return this.executor.submit(task);
    }

    /**
     * Returns the partitions known for a connector. This store does not compute them.
     *
     * @param connectorName the connector name (unused)
     * @return always an empty set, never {@code null}
     */
    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
        return Collections.emptySet();
    }
}

