/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.JdbcChangeEventSink;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.Module;
import io.debezium.connector.jdbc.QueryBinderResolver;
import io.debezium.connector.jdbc.RecordWriter;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectResolver;
import io.debezium.pipeline.sink.spi.ChangeEventSink;
import io.debezium.util.Strings;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.hibernate.SessionFactory;
import org.hibernate.SharedSessionContract;
import org.hibernate.StatelessSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcSinkConnectorTask
extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSinkConnectorTask.class);
    private SessionFactory sessionFactory;
    private final AtomicReference<State> state = new AtomicReference<State>(State.STOPPED);
    private final ReentrantLock stateLock = new ReentrantLock();
    private ChangeEventSink changeEventSink;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
    private Throwable previousPutException;

    public String version() {
        return Module.version();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(Map<String, String> props) {
        this.stateLock.lock();
        try {
            if (!this.state.compareAndSet(State.STOPPED, State.RUNNING)) {
                LOGGER.info("Connector has already been started");
                return;
            }
            this.previousPutException = null;
            JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(props);
            config.validate();
            this.sessionFactory = config.getHibernateConfiguration().buildSessionFactory();
            StatelessSession session = this.sessionFactory.openStatelessSession();
            DatabaseDialect databaseDialect = DatabaseDialectResolver.resolve(config, this.sessionFactory);
            QueryBinderResolver queryBinderResolver = new QueryBinderResolver();
            RecordWriter recordWriter = new RecordWriter((SharedSessionContract)session, queryBinderResolver, config, databaseDialect);
            this.changeEventSink = new JdbcChangeEventSink(config, session, databaseDialect, recordWriter);
        }
        finally {
            this.stateLock.unlock();
        }
    }

    public void put(Collection<SinkRecord> records) {
        if (this.previousPutException != null) {
            LOGGER.error("JDBC sink connector failure", this.previousPutException);
            throw new ConnectException("JDBC sink connector failure", this.previousPutException);
        }
        LOGGER.debug("Received {} changes.", (Object)records.size());
        try {
            this.changeEventSink.execute(records);
            records.forEach(this::markProcessed);
        }
        catch (Throwable throwable) {
            LOGGER.error("Failed to process record: {}", (Object)throwable.getMessage(), (Object)throwable);
            this.previousPutException = throwable;
            records.forEach(this::markNotProcessed);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        LOGGER.debug("Flushing offsets: {}", this.offsets);
        this.flush(this.offsets);
        return this.offsets;
    }

    public void stop() {
        block8: {
            this.stateLock.lock();
            try {
                if (this.changeEventSink == null) break block8;
                try {
                    this.changeEventSink.close();
                    if (this.sessionFactory != null && this.sessionFactory.isOpen()) {
                        LOGGER.info("Closing the session factory");
                        this.sessionFactory.close();
                        break block8;
                    }
                    LOGGER.info("Session factory already closed");
                }
                catch (Exception e) {
                    LOGGER.error("Failed to gracefully close resources.", (Throwable)e);
                }
            }
            finally {
                if (this.previousPutException != null) {
                    this.previousPutException = null;
                }
                if (this.changeEventSink != null) {
                    this.changeEventSink = null;
                }
                this.stateLock.unlock();
            }
        }
    }

    private void markProcessed(SinkRecord record) {
        String topicName = this.getOriginalTopicName(record);
        if (Strings.isNullOrBlank((String)topicName)) {
            return;
        }
        LOGGER.trace("Marking processed record for topic {}", (Object)topicName);
        TopicPartition topicPartition = new TopicPartition(topicName, record.kafkaPartition().intValue());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.kafkaOffset() + 1L);
        OffsetAndMetadata existing = this.offsets.put(topicPartition, offsetAndMetadata);
        if (existing == null) {
            LOGGER.trace("Advanced topic {} to offset {}.", (Object)topicName, (Object)record.kafkaOffset());
        } else {
            LOGGER.trace("Updated topic {} from offset {} to {}.", new Object[]{topicName, existing.offset(), record.kafkaOffset()});
        }
    }

    private void markNotProcessed(SinkRecord record) {
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition().intValue());
        if (!this.offsets.containsKey(topicPartition)) {
            LOGGER.debug("Rewinding topic {} offset to {}.", (Object)record.topic(), (Object)record.kafkaOffset());
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.kafkaOffset());
            this.offsets.put(topicPartition, offsetAndMetadata);
        }
    }

    private String getOriginalTopicName(SinkRecord record) {
        if (record instanceof InternalSinkRecord) {
            return ((InternalSinkRecord)record).originalRecord().topic();
        }
        return null;
    }

    private static enum State {
        RUNNING,
        STOPPED;

    }
}

