/*
 * Decompiled with CFR 0.152.
 */
package dev.snowdrop.vertx.kafka;

import dev.snowdrop.vertx.kafka.Header;
import dev.snowdrop.vertx.kafka.KafkaProducer;
import dev.snowdrop.vertx.kafka.PartitionInfo;
import dev.snowdrop.vertx.kafka.ProducerRecord;
import dev.snowdrop.vertx.kafka.RecordMetadata;
import dev.snowdrop.vertx.kafka.SnowdropPartitionInfo;
import dev.snowdrop.vertx.kafka.SnowdropRecordMetadata;
import io.vertx.axle.core.buffer.Buffer;
import io.vertx.axle.kafka.client.producer.KafkaHeader;
import io.vertx.axle.kafka.client.producer.KafkaProducerRecord;
import io.vertx.core.streams.WriteStream;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link KafkaProducer} implementation that forwards every operation to a Vert.x Axle Kafka
 * producer and exposes the results as Reactor {@link Mono} / {@link Flux} publishers.
 *
 * <p>Operations built with {@code Mono.fromCompletionStage(Supplier)} or {@code Mono.create}
 * invoke the delegate from inside the supplied callback rather than in the method body itself.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
final class SnowdropKafkaProducer<K, V> implements KafkaProducer<K, V> {

    /** Axle producer that performs the actual work. */
    private final io.vertx.axle.kafka.client.producer.KafkaProducer<K, V> delegate;

    /**
     * Creates a producer wrapping the given Axle producer.
     *
     * @param delegate Axle Kafka producer to forward all calls to
     */
    SnowdropKafkaProducer(io.vertx.axle.kafka.client.producer.KafkaProducer<K, V> delegate) {
        this.delegate = delegate;
    }

    /**
     * Sends a record through the delegate.
     *
     * @param record record to send
     * @return the delegate's send result wrapped in a {@link SnowdropRecordMetadata}
     * @throws NullPointerException if {@code record} is {@code null}
     */
    @Override
    public Mono<RecordMetadata> send(ProducerRecord<K, V> record) {
        Objects.requireNonNull(record, "Record cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.send(this.toAxleProducerRecord(record)))
                .map(SnowdropRecordMetadata::new);
    }

    /**
     * Looks up the partitions of a topic through the delegate.
     *
     * @param topic topic name
     * @return each partition reported by the delegate, wrapped in a {@link SnowdropPartitionInfo}
     * @throws IllegalArgumentException if {@code topic} is {@code null} or empty; thrown directly
     *         by this method, not signalled through the returned {@link Flux}
     */
    @Override
    public Flux<PartitionInfo> partitionsFor(String topic) {
        // Same null-or-zero-length check as the deprecated StringUtils.isEmpty(Object).
        if (!StringUtils.hasLength(topic)) {
            throw new IllegalArgumentException("Topic cannot be empty");
        }
        return Mono.fromCompletionStage(() -> this.delegate.partitionsFor(topic))
                .flatMapMany(Flux::fromIterable)
                .map(SnowdropPartitionInfo::new);
    }

    /**
     * Asks the delegate to flush.
     *
     * @return a {@link Mono} completed from the delegate's flush callback, or failed with whatever
     *         the {@code flush} call itself throws
     */
    @Override
    public Mono<Void> flush() {
        return Mono.create(sink -> {
            try {
                this.delegate.flush(v -> sink.success());
            }
            catch (Throwable t) {
                sink.error(t);
            }
        });
    }

    /**
     * Closes the delegate.
     *
     * @return a {@link Mono} backed by the completion stage returned from the delegate's
     *         {@code close()}
     */
    @Override
    public Mono<Void> close() {
        return Mono.fromCompletionStage(() -> this.delegate.close());
    }

    /**
     * Closes the delegate with a timeout.
     *
     * @param timeout value passed unchanged to the delegate's {@code close(long)}; its unit is
     *        defined by the delegate and is not visible here
     * @return a {@link Mono} backed by the completion stage returned from the delegate
     */
    @Override
    public Mono<Void> close(long timeout) {
        return Mono.fromCompletionStage(() -> this.delegate.close(timeout));
    }

    /**
     * Applies a function to the underlying core Vert.x producer.
     *
     * @param function function to apply to {@code delegate.getDelegate()}
     * @param <T> result type
     * @return a {@link Mono} carrying the function's result, or failed with whatever the function
     *         throws
     * @throws NullPointerException if {@code function} is {@code null}
     */
    @Override
    public <T> Mono<T> doOnVertxProducer(Function<io.vertx.kafka.client.producer.KafkaProducer<K, V>, T> function) {
        Objects.requireNonNull(function, "Function cannot be null");
        return Mono.create(sink -> {
            try {
                T result = function.apply(this.delegate.getDelegate());
                sink.success(result);
            }
            catch (Throwable t) {
                sink.error(t);
            }
        });
    }

    /**
     * Registers an exception handler on the delegate.
     *
     * @param handler handler passed to the delegate
     * @return this producer
     */
    @Override
    public KafkaProducer<K, V> exceptionHandler(Consumer<Throwable> handler) {
        this.delegate.exceptionHandler(handler);
        return this;
    }

    /**
     * Registers a drain handler on the delegate.
     *
     * @param handler handler passed to the delegate
     * @return this producer
     */
    @Override
    public KafkaProducer<K, V> drainHandler(Consumer<Void> handler) {
        this.delegate.drainHandler(handler);
        return this;
    }

    /**
     * Sets the delegate's maximum write queue size.
     *
     * @param maxSize value passed to the delegate
     * @return this producer
     */
    @Override
    public KafkaProducer<K, V> setWriteQueueMaxSize(int maxSize) {
        this.delegate.setWriteQueueMaxSize(maxSize);
        return this;
    }

    /**
     * Reports whether the delegate's write queue is full.
     *
     * @return the value returned by the delegate's {@code writeQueueFull()}
     */
    public boolean writeQueueFull() {
        return this.delegate.writeQueueFull();
    }

    /**
     * Writes a record through the delegate.
     *
     * @param record record to write
     * @return a {@link Mono} backed by the completion stage returned from the delegate's
     *         {@code write}
     * @throws NullPointerException if {@code record} is {@code null}
     */
    public Mono<Void> write(ProducerRecord<K, V> record) {
        Objects.requireNonNull(record, "Record cannot be null");
        return Mono.fromCompletionStage(() -> this.delegate.write(this.toAxleProducerRecord(record)));
    }

    /**
     * Ends the delegate stream.
     *
     * @return a {@link Mono} backed by the completion stage returned from the delegate's
     *         {@code end()}
     */
    public Mono<Void> end() {
        return Mono.fromCompletionStage(() -> this.delegate.end());
    }

    /**
     * Exposes the underlying core Vert.x producer as a write stream.
     *
     * @return the object returned by {@code delegate.getDelegate()}
     */
    public WriteStream vertxWriteStream() {
        return this.delegate.getDelegate();
    }

    /**
     * Converts a {@link ProducerRecord} into an Axle {@link KafkaProducerRecord}, copying topic,
     * key, value, timestamp, partition and headers.
     */
    private KafkaProducerRecord<K, V> toAxleProducerRecord(ProducerRecord<K, V> record) {
        List<KafkaHeader> axleHeaders = record.headers()
                .stream()
                .map(this::toAxleHeader)
                .collect(Collectors.toList());
        return KafkaProducerRecord
                .create(record.topic(), record.key(), record.value(), record.timestamp(), record.partition())
                .addHeaders(axleHeaders);
    }

    /**
     * Converts a {@link Header} into an Axle {@link KafkaHeader} carrying the same key and a copy
     * of the header value's bytes.
     */
    private KafkaHeader toAxleHeader(Header header) {
        // ByteBuffer.array() would expose the whole backing array (ignoring position, limit and
        // array offset) and throws for direct or read-only buffers. Copy only the remaining bytes
        // instead, reading from a duplicate so the buffer returned by the header is not advanced.
        java.nio.ByteBuffer source = header.value().asByteBuffer().duplicate();
        byte[] bytes = new byte[source.remaining()];
        source.get(bytes);
        return KafkaHeader.header(header.key(), Buffer.buffer(bytes));
    }
}

