/*
 * Decompiled with CFR 0.152.
 */
package io.aiven.kafka.connect.common.grouper;

import io.aiven.kafka.connect.common.config.FilenameTemplateVariable;
import io.aiven.kafka.connect.common.config.TimestampSource;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.templating.Template;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * A {@link RecordGrouper} that buffers sink records under keys rendered from a filename template.
 *
 * <p>For every topic-partition the grouper remembers a "head" record. The head record supplies the
 * partition and start-offset values bound into the template, so all records put after the head
 * resolve to the same key until the rotator decides the buffer behind that key must be rotated.
 * On rotation the incoming record becomes the new head and the key is rendered again from it.
 *
 * <p>The class keeps its state in plain {@link HashMap}s and performs no synchronization of its own.
 */
class TopicPartitionRecordGrouper implements RecordGrouper {

    /** Formatters for the timestamp units this grouper can render, keyed by the unit parameter value. */
    private static final Map<String, DateTimeFormatter> TIMESTAMP_FORMATTERS = Map.of(
            "yyyy", DateTimeFormatter.ofPattern("yyyy"),
            "MM", DateTimeFormatter.ofPattern("MM"),
            "dd", DateTimeFormatter.ofPattern("dd"),
            "HH", DateTimeFormatter.ofPattern("HH"));

    private final Template filenameTemplate;

    /** Head record per topic-partition; its partition and offset are bound into the rendered key. */
    private final Map<TopicPartition, SinkRecord> currentHeadRecords = new HashMap<>();

    /** Buffered records, keyed by the rendered filename template. */
    private final Map<String, List<SinkRecord>> fileBuffers = new HashMap<>();

    /** Renders the timestamp template variable for the unit carried by the parameter. */
    private final Function<VariableTemplatePart.Parameter, String> setTimestamp;

    /** Decides whether the buffer currently behind a key must be replaced by a new one. */
    private final RecordGrouper.Rotator<List<SinkRecord>> rotator;

    /**
     * Creates a grouper.
     *
     * @param filenameTemplate  template rendered into the key of each buffer; must not be {@code null}
     * @param maxRecordsPerFile number of records at which a buffer is rotated; {@code null} disables
     *                          rotation
     * @param tsSource          source of the time used for the timestamp variable; must not be
     *                          {@code null}
     * @throws NullPointerException if {@code filenameTemplate} or {@code tsSource} is {@code null}
     */
    TopicPartitionRecordGrouper(final Template filenameTemplate, final Integer maxRecordsPerFile,
            final TimestampSource tsSource) {
        Objects.requireNonNull(filenameTemplate, "filenameTemplate cannot be null");
        Objects.requireNonNull(tsSource, "tsSource cannot be null");
        this.filenameTemplate = filenameTemplate;

        // The time is read from tsSource on every rendering, not captured once at construction.
        this.setTimestamp = parameter -> tsSource.time().format(TIMESTAMP_FORMATTERS.get(parameter.value()));

        this.rotator = buffer -> {
            if (maxRecordsPerFile == null) {
                // No limit configured: never rotate.
                return false;
            }
            // A missing buffer also counts as "rotate", which makes the incoming record the head.
            return buffer == null || buffer.size() >= maxRecordsPerFile;
        };
    }

    /**
     * Appends the record to the buffer selected by {@link #resolveRecordKeyFor(SinkRecord)},
     * creating the buffer when it does not exist yet.
     *
     * @param record the record to buffer; must not be {@code null}
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public void put(final SinkRecord record) {
        Objects.requireNonNull(record, "record cannot be null");
        final String recordKey = resolveRecordKeyFor(record);
        fileBuffers.computeIfAbsent(recordKey, ignored -> new ArrayList<>()).add(record);
    }

    /**
     * Determines the buffer key for the record.
     *
     * <p>The key is first rendered from the current head record of the record's topic-partition
     * (the record itself becomes the head when none is registered). If the rotator reports that the
     * buffer behind that key must be rotated, the record is promoted to head and the key is rendered
     * again from it.
     *
     * @param record the record being added
     * @return the key of the buffer the record belongs to
     */
    protected String resolveRecordKeyFor(final SinkRecord record) {
        final TopicPartition tp = topicPartitionOf(record);
        final SinkRecord headRecord = currentHeadRecords.computeIfAbsent(tp, ignored -> record);
        final String recordKey = generateRecordKey(tp, headRecord);
        if (rotator.rotate(fileBuffers.get(recordKey))) {
            return generateNewRecordKey(record);
        }
        return recordKey;
    }

    /**
     * Renders the filename template for the given topic-partition and head record.
     *
     * <p>Padded values are formatted with {@link Locale#ROOT} so the digits in the rendered key do
     * not depend on the default locale of the JVM.
     */
    private String generateRecordKey(final TopicPartition tp, final SinkRecord headRecord) {
        final Function<VariableTemplatePart.Parameter, String> setKafkaOffset =
                usePadding -> usePadding.asBoolean()
                        ? String.format(Locale.ROOT, "%020d", headRecord.kafkaOffset())
                        : Long.toString(headRecord.kafkaOffset());
        final Function<VariableTemplatePart.Parameter, String> setKafkaPartition =
                usePadding -> usePadding.asBoolean()
                        ? String.format(Locale.ROOT, "%010d", headRecord.kafkaPartition())
                        : Integer.toString(headRecord.kafkaPartition());

        return filenameTemplate.instance()
                .bindVariable(FilenameTemplateVariable.TOPIC.name, tp::topic)
                .bindVariable(FilenameTemplateVariable.PARTITION.name, setKafkaPartition)
                .bindVariable(FilenameTemplateVariable.START_OFFSET.name, setKafkaOffset)
                .bindVariable(FilenameTemplateVariable.TIMESTAMP.name, setTimestamp)
                .render();
    }

    /**
     * Registers the record as the head of its topic-partition and renders the key from it.
     *
     * @param record the record that becomes the new head
     * @return the key rendered from the new head record
     */
    protected String generateNewRecordKey(final SinkRecord record) {
        final TopicPartition tp = topicPartitionOf(record);
        currentHeadRecords.put(tp, record);
        return generateRecordKey(tp, record);
    }

    /** Builds the topic-partition identifying where the record came from. */
    private static TopicPartition topicPartitionOf(final SinkRecord record) {
        return new TopicPartition(record.topic(), record.kafkaPartition());
    }

    /** Discards all buffered records and all registered head records. */
    @Override
    public void clear() {
        currentHeadRecords.clear();
        fileBuffers.clear();
    }

    /**
     * Returns the buffered records keyed by rendered filename.
     *
     * @return an unmodifiable view of the internal buffers (not a copy)
     */
    @Override
    public Map<String, List<SinkRecord>> records() {
        return Collections.unmodifiableMap(fileBuffers);
    }
}

