package io.trino.plugin.kafka;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.kafka.schema.ContentSchemaReader;
import io.trino.spi.HostAddress;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/trino/plugin/kafka/KafkaSplitManager.class */
public class KafkaSplitManager implements ConnectorSplitManager {
    private final KafkaConsumerFactory consumerFactory;
    private final KafkaFilterManager kafkaFilterManager;
    private final ContentSchemaReader contentSchemaReader;
    private final int messagesPerSplit;

    @Inject
    public KafkaSplitManager(KafkaConsumerFactory kafkaConsumerFactory, KafkaConfig kafkaConfig, KafkaFilterManager kafkaFilterManager, ContentSchemaReader contentSchemaReader) {
        this.consumerFactory = (KafkaConsumerFactory) Objects.requireNonNull(kafkaConsumerFactory, "consumerFactory is null");
        this.messagesPerSplit = ((KafkaConfig) Objects.requireNonNull(kafkaConfig, "kafkaConfig is null")).getMessagesPerSplit();
        this.kafkaFilterManager = (KafkaFilterManager) Objects.requireNonNull(kafkaFilterManager, "kafkaFilterManager is null");
        this.contentSchemaReader = (ContentSchemaReader) Objects.requireNonNull(contentSchemaReader, "contentSchemaReader is null");
    }

    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy, DynamicFilter dynamicFilter, Constraint constraint) {
        KafkaTableHandle kafkaTableHandle = (KafkaTableHandle) connectorTableHandle;
        try {
            KafkaConsumer<byte[], byte[]> create = this.consumerFactory.create(connectorSession);
            try {
                List<PartitionInfo> partitionsFor = create.partitionsFor(kafkaTableHandle.getTopicName());
                List list = (List) partitionsFor.stream().map(KafkaSplitManager::toTopicPartition).collect(ImmutableList.toImmutableList());
                KafkaFilteringResult kafkaFilterResult = this.kafkaFilterManager.getKafkaFilterResult(connectorSession, kafkaTableHandle, partitionsFor, create.beginningOffsets(list), create.endOffsets(list));
                List<PartitionInfo> partitionInfos = kafkaFilterResult.getPartitionInfos();
                Map<TopicPartition, Long> partitionBeginOffsets = kafkaFilterResult.getPartitionBeginOffsets();
                Map<TopicPartition, Long> partitionEndOffsets = kafkaFilterResult.getPartitionEndOffsets();
                ImmutableList.Builder builder = ImmutableList.builder();
                Optional<String> readKeyContentSchema = this.contentSchemaReader.readKeyContentSchema(kafkaTableHandle);
                Optional<String> readValueContentSchema = this.contentSchemaReader.readValueContentSchema(kafkaTableHandle);
                for (PartitionInfo partitionInfo : partitionInfos) {
                    TopicPartition topicPartition = toTopicPartition(partitionInfo);
                    HostAddress fromParts = HostAddress.fromParts(partitionInfo.leader().host(), partitionInfo.leader().port());
                    Stream<R> map = new Range(partitionBeginOffsets.get(topicPartition).longValue(), partitionEndOffsets.get(topicPartition).longValue()).partition(this.messagesPerSplit).stream().map(range -> {
                        return new KafkaSplit(kafkaTableHandle.getTopicName(), kafkaTableHandle.getKeyDataFormat(), kafkaTableHandle.getMessageDataFormat(), readKeyContentSchema, readValueContentSchema, partitionInfo.partition(), range, fromParts);
                    });
                    Objects.requireNonNull(builder);
                    map.forEach((v1) -> {
                        r1.add(v1);
                    });
                }
                FixedSplitSource fixedSplitSource = new FixedSplitSource(builder.build());
                if (create != null) {
                    create.close();
                }
                return fixedSplitSource;
            } finally {
            }
        } catch (Exception e) {
            if (e instanceof TrinoException) {
                throw e;
            }
            throw new TrinoException(KafkaErrorCode.KAFKA_SPLIT_ERROR, String.format("Cannot list splits for table '%s' reading topic '%s'", kafkaTableHandle.getTableName(), kafkaTableHandle.getTopicName()), e);
        }
    }

    private static TopicPartition toTopicPartition(PartitionInfo partitionInfo) {
        return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
    }
}
