/*
 * Decompiled with CFR 0.152.
 */
package io.leoplatform.sdk.oracle;

import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.oracle.OracleChangeDestination;
import io.leoplatform.sdk.oracle.OracleChangeSource;
import io.leoplatform.sdk.oracle.OracleChangeWriter;
import java.sql.CallableStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.ToLongFunction;
import javax.inject.Inject;
import javax.inject.Singleton;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.DatabaseChangeRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates and removes Oracle Database Change Notification (DCN) registrations
 * for the tables reported by the configured {@link OracleChangeSource}.
 *
 * <p>Every operation that talks to the database obtains its own connection from
 * the source and closes it before returning.
 */
@Singleton
public final class OracleChangeRegistrar {
    private static final Logger log = LoggerFactory.getLogger(OracleChangeRegistrar.class);

    /** Second argument passed to {@code DBMS_CQ_NOTIFICATION.SET_ROWID_THRESHOLD} for each table. */
    private static final int ROWID_THRESHOLD = 100000;

    /** Fetch size hint applied to the statement used to read the watched tables. */
    private static final int FETCH_SIZE = 100000;

    private final OracleChangeSource source;
    private final OracleChangeWriter ocw;
    private final ExecutorManager executorManager;

    @Inject
    public OracleChangeRegistrar(OracleChangeSource source, OracleChangeWriter ocw, ExecutorManager executorManager) {
        this.source = source;
        this.ocw = ocw;
        this.executorManager = executorManager;
    }

    /**
     * Registers a change notification, attaches the writer as its listener, associates the
     * source tables with it and finally tries to raise the ROWID threshold of each table.
     *
     * @param destination supplies the listener host, port and base properties
     * @return the new registration
     * @throws IllegalStateException if any step raises an {@link SQLException} (the original
     *         exception is kept as the cause), if the listener cannot be added, or if one of
     *         the tables cannot be read
     */
    public DatabaseChangeRegistration create(OracleChangeDestination destination) {
        try (OracleConnection conn = this.source.connection()) {
            DatabaseChangeRegistration dcr = this.register(destination, conn);
            this.addChangeListener(dcr);
            this.addObjects(conn, dcr);
            this.increaseRowThreshold(conn);
            return dcr;
        } catch (SQLException e) {
            // Every SQL failure in this method is reported with the missing-grant hint;
            // the real reason is available through the exception cause.
            String msg = "'grant change notification to <User>' required for registration";
            log.error(msg);
            throw new IllegalStateException(msg, e);
        }
    }

    /**
     * Unregisters the given change notification.
     *
     * @param dcr the registration to remove
     * @return the source tables when the registration was removed; an empty list when the
     *         removal failed with an {@link SQLException} (the failure is logged, not thrown)
     */
    public List<String> remove(DatabaseChangeRegistration dcr) {
        try (OracleConnection conn = this.source.connection()) {
            conn.unregisterDatabaseChangeNotification(dcr);
            return this.source.tables();
        } catch (SQLException e) {
            log.warn("Unable to remove database change notification", e);
            return Collections.emptyList();
        }
    }

    /**
     * Calls {@code end()} on the change writer and then on the executor manager.
     *
     * @return the source tables
     */
    public List<String> end() {
        this.ocw.end();
        this.executorManager.end();
        return this.source.tables();
    }

    /**
     * Requests a new registration from the database.
     *
     * @throws SQLException if the driver fails or hands back {@code null}
     */
    private DatabaseChangeRegistration register(OracleChangeDestination destination, OracleConnection conn) throws SQLException {
        Properties props = this.listenerProps(destination);
        DatabaseChangeRegistration dcr = conn.registerDatabaseChangeNotification(props);
        if (dcr == null) {
            throw new SQLException("Missing change registration");
        }
        log.info("Registered listener from DB to {}:{}", props.get("NTF_LOCAL_HOST"), props.get("NTF_LOCAL_TCP_PORT"));
        return dcr;
    }

    /**
     * Attaches the change writer to the registration, using the executor obtained from the
     * executor manager.
     *
     * @throws IllegalStateException if the listener cannot be added
     */
    private void addChangeListener(DatabaseChangeRegistration dcr) {
        try {
            dcr.addListener((DatabaseChangeListener) this.ocw, this.executorManager.get());
        } catch (SQLException e) {
            throw new IllegalStateException("Could not add listener to registrar", e);
        }
    }

    /**
     * Best-effort call of {@code DBMS_CQ_NOTIFICATION.SET_ROWID_THRESHOLD} for every source
     * table. Failures are logged together with their cause and never propagated; a failure
     * for one table does not stop the remaining tables from being processed.
     */
    private void increaseRowThreshold(OracleConnection conn) {
        try (CallableStatement cs = conn.prepareCall("{call DBMS_CQ_NOTIFICATION.SET_ROWID_THRESHOLD(?, ?)}")) {
            for (String table : this.source.tables()) {
                try {
                    cs.setString(1, table);
                    cs.setInt(2, ROWID_THRESHOLD);
                    cs.executeUpdate();
                } catch (SQLException e) {
                    // The trailing exception is logged by SLF4J as the throwable of the event.
                    log.warn("Could not set threshold parameter for {}", table, e);
                }
            }
        } catch (SQLException e) {
            log.warn("Could not set DBMS_CQ_NOTIFICATION.SET_ROWID_THRESHOLD", e);
        }
    }

    /**
     * Binds the registration to a fresh statement and runs one query per source table on it.
     *
     * <p>On any failure the registration is unregistered again before the failure is
     * rethrown. Runtime exceptions are handled as well because {@link #consume} reports
     * query failures as {@link IllegalStateException}; without that the registration would
     * be left behind whenever a table could not be read.
     *
     * @throws SQLException if the statement cannot be created or configured
     */
    private void addObjects(OracleConnection conn, DatabaseChangeRegistration dcr) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            ((OracleStatement) stmt).setDatabaseChangeRegistration(dcr);
            stmt.setFetchSize(FETCH_SIZE);
            stmt.setFetchDirection(ResultSet.FETCH_FORWARD);
            this.registerTables(stmt);
        } catch (SQLException | RuntimeException e) {
            log.warn("Could not register tables for notification", e);
            this.unregisterAfterFailure(conn, dcr, e);
            throw e;
        }
    }

    /**
     * Unregisters {@code dcr} while handling {@code failure}. A problem during the cleanup is
     * attached to {@code failure} as a suppressed exception so that it cannot hide the
     * original error.
     */
    private void unregisterAfterFailure(OracleConnection conn, DatabaseChangeRegistration dcr, Exception failure) {
        try {
            conn.unregisterDatabaseChangeNotification(dcr);
        } catch (SQLException cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
    }

    /**
     * Reads every source table through the given statement and logs the row count per table
     * followed by a summary line.
     */
    private void registerTables(Statement stmt) {
        List<String> tables = this.source.tables();
        long totalEntries = 0L;
        for (String table : tables) {
            long entries = this.consume(stmt, table);
            log.info("Watching for changes to {} with {} {}", table, entries, entries == 1L ? "entry" : "entries");
            totalEntries += entries;
        }
        int numTables = tables.size();
        String entriesLbl = totalEntries == 1L ? "entry" : "entries";
        String tablesLbl = numTables == 1 ? "table" : "tables";
        log.info("Watching {} {} with a total of {} {}", numTables, tablesLbl, totalEntries, entriesLbl);
    }

    /**
     * Selects all rows of {@code table} and iterates the complete result to count them.
     *
     * @return the number of rows read
     * @throws IllegalStateException if the query or the iteration fails
     */
    private long consume(Statement stmt, String table) {
        // The table name is concatenated because identifiers cannot be bound as parameters.
        // NOTE(review): presumably the names come from trusted configuration - confirm.
        try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + table)) {
            long entries = 0L;
            while (rs.next()) {
                ++entries;
            }
            return entries;
        } catch (SQLException e) {
            throw new IllegalStateException("Could not read from registered table " + table, e);
        }
    }

    /**
     * Builds the registration properties: the destination's own properties serve as
     * defaults, on top of which ROWID notification, reliable QoS and the local listener
     * host and port are set.
     */
    private Properties listenerProps(OracleChangeDestination destination) {
        // NOTE(review): defaults are only visible through getProperty(), not get() or
        // iteration - confirm the driver reads them that way.
        Properties p = new Properties(destination.getProps());
        p.put("DCN_NOTIFY_ROWIDS", "true");
        p.put("NTF_QOS_RELIABLE", "true");
        p.put("NTF_LOCAL_HOST", destination.getHost());
        p.put("NTF_LOCAL_TCP_PORT", String.valueOf(destination.getPort()));
        return p;
    }
}

