package io.debezium.connector.oracle.logminer.logwriter;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-oracle-1.7.0.Final.jar:io/debezium/connector/oracle/logminer/logwriter/RacCommitLogWriterFlushStrategy.class */
/**
 * A {@link LogWriterFlushStrategy} that fans a flush request out to several hosts, keeping one
 * {@link CommitLogWriterFlushStrategy} per configured RAC node address.
 *
 * <p>Node addresses are taken from {@link OracleConnectorConfig#getRacNodes()} and are expected in
 * {@code host:port} form. If flushing any node fails, all per-node strategies are closed and
 * recreated and the call then waits a fixed interval before returning.
 */
public class RacCommitLogWriterFlushStrategy implements LogWriterFlushStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(RacCommitLogWriterFlushStrategy.class);

    /** Separator between the host name and the port in a configured RAC node address. */
    private static final String HOST_PORT_DELIMITER = ":";

    /** How long {@link #flush(Scn)} pauses after at least one node could not be flushed. */
    private static final Duration FAILED_FLUSH_WAIT = Duration.ofSeconds(3L);

    /** Per-node flush strategies, keyed by the upper-cased {@code host:port} address. */
    private final Map<String, CommitLogWriterFlushStrategy> flushStrategies = new HashMap<>();
    private final OracleStreamingChangeEventSourceMetrics streamingMetrics;
    private final JdbcConfiguration jdbcConfiguration;
    private final Set<String> hosts;

    /**
     * Creates the strategy and immediately opens one flush strategy per configured RAC node.
     *
     * @param oracleConnectorConfig connector configuration supplying the RAC node addresses
     * @param jdbcConfiguration base JDBC configuration; host name and port are overridden per node
     * @param oracleStreamingChangeEventSourceMetrics metrics used to record warnings
     * @throws DebeziumException if a node address is malformed or a node cannot be connected to
     */
    public RacCommitLogWriterFlushStrategy(OracleConnectorConfig oracleConnectorConfig, JdbcConfiguration jdbcConfiguration, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        this.jdbcConfiguration = jdbcConfiguration;
        this.streamingMetrics = oracleStreamingChangeEventSourceMetrics;
        // Locale.ROOT keeps the casing of host names independent of the JVM's default locale.
        this.hosts = oracleConnectorConfig.getRacNodes().stream()
                .map(node -> node.toUpperCase(Locale.ROOT))
                .collect(Collectors.toSet());
        recreateRacNodeFlushStrategies();
    }

    /** Closes every per-node flush strategy and forgets them. */
    @Override // java.lang.AutoCloseable
    public void close() {
        closeRacNodeFlushStrategies();
        this.flushStrategies.clear();
    }

    /**
     * @return the configured node addresses joined with {@code ", "}
     */
    @Override // io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy
    public String getHost() {
        return String.join(", ", this.hosts);
    }

    /**
     * Flushes every known RAC node. A failure on one node is logged and does not prevent the
     * remaining nodes from being flushed; afterwards all node strategies are recreated and the
     * call pauses for a fixed interval.
     *
     * @param scn the SCN handed to each per-node flush strategy
     * @throws InterruptedException if the pause after a failed flush is interrupted
     * @throws DebeziumException if there are no node strategies, or recreating them fails
     */
    @Override // io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy
    public void flush(Scn scn) throws InterruptedException {
        final Instant startTime = Instant.now();
        if (this.flushStrategies.isEmpty()) {
            throw new DebeziumException("No RAC node addresses supplied or currently connected");
        }

        boolean anyFlushFailed = false;
        for (CommitLogWriterFlushStrategy strategy : this.flushStrategies.values()) {
            try {
                strategy.flush(scn);
            } catch (Exception e) {
                // Best effort: keep flushing the remaining nodes and recover afterwards.
                LOGGER.warn("Failed to flush LGWR buffer on RAC node '{}'", strategy.getHost(), e);
                anyFlushFailed = true;
            }
        }

        if (anyFlushFailed) {
            recreateRacNodeFlushStrategies();
            LOGGER.warn("Not all LGWR buffers were flushed, waiting 3 seconds for Oracle to flush automatically.");
            try {
                Metronome.sleeper(FAILED_FLUSH_WAIT, Clock.SYSTEM).pause();
            } catch (InterruptedException e) {
                LOGGER.warn("The LGWR buffer wait was interrupted.");
                throw e;
            }
        }
        LOGGER.trace("LGWR flush took {} to complete.", Duration.between(startTime, Instant.now()));
    }

    /**
     * Closes all current per-node strategies and opens a fresh one for every configured address.
     *
     * @throws DebeziumException if an address is not in {@code host:port} form, its port is not
     *         an integer, or connecting to the node fails
     */
    private void recreateRacNodeFlushStrategies() {
        closeRacNodeFlushStrategies();
        this.flushStrategies.clear();
        for (String hostAddress : this.hosts) {
            final String[] addressParts = hostAddress.split(HOST_PORT_DELIMITER);
            if (addressParts.length < 2) {
                throw new DebeziumException("RAC node '" + hostAddress + "' must be specified as 'host:port'");
            }

            int port;
            try {
                port = Integer.parseInt(addressParts[1]);
            } catch (NumberFormatException e) {
                throw new DebeziumException("RAC node '" + hostAddress + "' has an invalid port", e);
            }

            try {
                this.flushStrategies.put(hostAddress, createHostFlushStrategy(addressParts[0], port));
            } catch (SQLException e) {
                throw new DebeziumException("Cannot connect to RAC node '" + hostAddress + "'", e);
            }
        }
    }

    /**
     * Builds a flush strategy for a single node by overriding the host name and port of the base
     * JDBC configuration.
     *
     * @param str host name of the node
     * @param num port of the node
     * @return a new flush strategy bound to the given node
     * @throws SQLException if creating the underlying strategy fails
     */
    private CommitLogWriterFlushStrategy createHostFlushStrategy(String str, Integer num) throws SQLException {
        JdbcConfiguration nodeConfiguration = JdbcConfiguration.adapt(
                this.jdbcConfiguration.edit()
                        .with(JdbcConfiguration.HOSTNAME, str)
                        .with(JdbcConfiguration.PORT, num)
                        .build());
        LOGGER.debug("Creating flush connection to RAC node '{}'", str);
        return new CommitLogWriterFlushStrategy(nodeConfiguration);
    }

    /**
     * Closes every per-node strategy. A failure to close one node is logged, counted as a warning
     * in the streaming metrics, and does not stop the remaining nodes from being closed.
     */
    private void closeRacNodeFlushStrategies() {
        for (CommitLogWriterFlushStrategy strategy : this.flushStrategies.values()) {
            try {
                strategy.close();
            } catch (Exception e) {
                LOGGER.warn("Failed to close RAC connection to node '{}'", strategy.getHost(), e);
                this.streamingMetrics.incrementWarningCount();
            }
        }
    }
}
