/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.connect.common.grouper;

import io.aiven.kafka.connect.common.config.FilenameTemplateVariable;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * A {@link RecordGrouper} that buffers sink records per rendered object key, where the object key
 * is produced from a filename template bound to the record's topic, partition, record key, start
 * offset and timestamp.
 *
 * <p>For every (topic, partition, record key) combination the grouper remembers a "head" record;
 * the head record supplies the partition and start-offset values used when rendering the object
 * key. When a maximum number of records per file is configured and the buffer for the rendered key
 * is missing or full, the current record becomes the new head and a fresh object key is rendered.
 *
 * <p>State is held in plain {@link HashMap}s and this class performs no synchronization of its own.
 */
public class TopicPartitionKeyRecordGrouper implements RecordGrouper {
    /** Formatters for the timestamp template parameter, indexed by the pattern string they render. */
    private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS = Map.of(
            "yyyy", DateTimeFormatter.ofPattern("yyyy"),
            "MM", DateTimeFormatter.ofPattern("MM"),
            "dd", DateTimeFormatter.ofPattern("dd"),
            "HH", DateTimeFormatter.ofPattern("HH"));

    private final Template filenameTemplate;

    /** Head record per (topic, partition, record key); source of the partition/offset bindings. */
    private final Map<TopicPartitionKey, SinkRecord> currentHeadRecords = new HashMap<>();

    /** Buffered records, indexed by rendered object key. */
    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();

    /** For a given record, yields the binding that renders the timestamp template variable. */
    private final Function<SinkRecord, Function<VariableTemplatePart.Parameter, String>> setTimestampBasedOnRecord;

    /** Decides whether a new object key must be started instead of appending to a buffer. */
    private final RecordGrouper.Rotator<List<SinkRecord>> rotator;

    /**
     * Creates a grouper.
     *
     * @param filenameTemplate  template used to render object keys; must not be {@code null}
     * @param maxRecordsPerFile maximum number of records per buffer, or {@code null} for no limit
     * @param tsSource          source of the time used for the timestamp variable; must not be
     *                          {@code null}
     * @throws NullPointerException if {@code filenameTemplate} or {@code tsSource} is {@code null}
     */
    TopicPartitionKeyRecordGrouper(Template filenameTemplate, Integer maxRecordsPerFile, TimestampSource tsSource) {
        Objects.requireNonNull(filenameTemplate, "filenameTemplate cannot be null");
        Objects.requireNonNull(tsSource, "tsSource cannot be null");
        this.filenameTemplate = filenameTemplate;
        // The formatter is looked up by the parameter's value; the time is taken per record.
        this.setTimestampBasedOnRecord = record -> parameter ->
                tsSource.time(record).format(TIMESTAMP_FORMATTERS.get(parameter.value()));
        this.rotator = buffer -> {
            if (maxRecordsPerFile == null) {
                // No limit configured: never rotate.
                return false;
            }
            // With a limit configured, a missing buffer is treated like a full one.
            return buffer == null || buffer.size() >= maxRecordsPerFile;
        };
    }

    /**
     * Appends the record to the buffer of the object key resolved for it, creating the buffer on
     * first use.
     *
     * @param record the record to buffer; must not be {@code null}
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public void put(SinkRecord record) {
        Objects.requireNonNull(record, "record cannot be null");
        String recordKey = this.resolveRecordKeyFor(record);
        this.fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
    }

    /**
     * Resolves the object key under which the record is to be buffered.
     *
     * <p>The key is first rendered against the head record of the record's (topic, partition, key)
     * group, registering the record itself as head when the group has none. If the rotator then
     * reports that the buffer for that key must be rotated, the record becomes the new head and
     * the key is rendered again.
     *
     * @param record the record to resolve an object key for
     * @return the object key to buffer the record under
     */
    protected String resolveRecordKeyFor(SinkRecord record) {
        TopicPartitionKey tpk = this.topicPartitionKeyOf(record);
        SinkRecord currentHeadRecord = this.currentHeadRecords.computeIfAbsent(tpk, ignored -> record);
        String objectKey = this.generateObjectKey(tpk, currentHeadRecord, record);
        if (this.rotator.rotate(this.fileBuffers.get(objectKey))) {
            objectKey = this.generateNewRecordKey(record);
        }
        return objectKey;
    }

    /** Builds the (topic, partition, record key) grouping key for a record. */
    private TopicPartitionKey topicPartitionKeyOf(SinkRecord record) {
        String key = this.recordKey(record);
        return new TopicPartitionKey(new TopicPartition(record.topic(), record.kafkaPartition().intValue()), key);
    }

    /**
     * Renders the record key as a string: the literal {@code "null"} for a {@code null} key, the
     * key itself when its schema is of type {@code STRING}, and {@code toString()} otherwise.
     */
    private String recordKey(SinkRecord record) {
        if (record.key() == null) {
            return "null";
        }
        if (record.keySchema() != null && record.keySchema().type() == Schema.Type.STRING) {
            return (String) record.key();
        }
        return record.key().toString();
    }

    /**
     * Renders the filename template for the given group.
     *
     * @param tpk           supplies the topic and record-key bindings
     * @param headRecord    supplies the partition and start-offset bindings; each is zero-padded
     *                      (10 and 20 digits respectively) when its template parameter is true
     * @param currentRecord supplies the timestamp binding
     * @return the rendered object key
     */
    public String generateObjectKey(TopicPartitionKey tpk, SinkRecord headRecord, SinkRecord currentRecord) {
        Function<VariableTemplatePart.Parameter, String> setKafkaOffset = usePaddingParameter ->
                usePaddingParameter.asBoolean()
                        ? String.format("%020d", headRecord.kafkaOffset())
                        : Long.toString(headRecord.kafkaOffset());
        Function<VariableTemplatePart.Parameter, String> setKafkaPartition = usePaddingParameter ->
                usePaddingParameter.asBoolean()
                        ? String.format("%010d", headRecord.kafkaPartition())
                        : Integer.toString(headRecord.kafkaPartition());
        return this.filenameTemplate.instance()
                .bindVariable(FilenameTemplateVariable.TOPIC.name, () -> tpk.topicPartition.topic())
                .bindVariable(FilenameTemplateVariable.PARTITION.name, setKafkaPartition)
                .bindVariable(FilenameTemplateVariable.KEY.name, tpk::key)
                .bindVariable(FilenameTemplateVariable.START_OFFSET.name, setKafkaOffset)
                .bindVariable(FilenameTemplateVariable.TIMESTAMP.name, this.setTimestampBasedOnRecord.apply(currentRecord))
                .render();
    }

    /**
     * Makes the record the head of its (topic, partition, key) group, replacing any previous head,
     * and renders the object key with the record acting as both head and current record.
     *
     * @param record the record that starts a new object key
     * @return the newly rendered object key
     */
    protected String generateNewRecordKey(SinkRecord record) {
        TopicPartitionKey tpk = this.topicPartitionKeyOf(record);
        this.currentHeadRecords.put(tpk, record);
        return this.generateObjectKey(tpk, record, record);
    }

    /** Discards all head records and all buffered records. */
    @Override
    public void clear() {
        this.currentHeadRecords.clear();
        this.fileBuffers.clear();
    }

    /**
     * Returns the buffered records indexed by object key.
     *
     * @return an unmodifiable view of the buffer map; the lists it contains are the live buffers
     */
    @Override
    public Map<String, List<SinkRecord>> records() {
        return Collections.unmodifiableMap(this.fileBuffers);
    }

    /** Grouping key made of a topic-partition and the string form of a record key. */
    public static class TopicPartitionKey {
        final TopicPartition topicPartition;
        final String key;

        TopicPartitionKey(TopicPartition topicPartition, String key) {
            this.topicPartition = topicPartition;
            this.key = key;
        }

        /** Returns the string form of the record key. */
        public String key() {
            return this.key;
        }

        /** Two keys are equal when they are of the same class and both components are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TopicPartitionKey that = (TopicPartitionKey) o;
            return Objects.equals(this.topicPartition, that.topicPartition) && Objects.equals(this.key, that.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.topicPartition, this.key);
        }
    }
}

