/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.RecordBuffer;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link RecordBuffer}.
 *
 * <p>Each test feeds a sequence of {@link SinkRecordDescriptor}s into a buffer configured with
 * {@code batch.size = 5} and counts how many calls to {@link RecordBuffer#add} hand back a
 * non-empty list. The tests treat every such non-empty result as one flush of the buffer.
 */
@Tag("UnitTests")
class RecordBufferTest {
    /** Connector property that controls the buffer capacity. */
    private static final String BATCH_SIZE_PROPERTY = "batch.size";

    /** Buffer capacity used by every test in this class. */
    private static final String BATCH_SIZE = "5";

    /** Topic name used for all generated sink records. */
    private static final String TOPIC = "topic";

    /** Mocked dialect handed to every descriptor; re-created before each test. */
    private DatabaseDialect dialect;

    /**
     * Creates a mocked {@link DatabaseDialect} whose {@code getSchemaType} always yields a mocked
     * {@link Type}, which in turn reports an empty type name for this dialect.
     */
    @BeforeEach
    void setUp() {
        this.dialect = Mockito.mock(DatabaseDialect.class);
        Type type = Mockito.mock(Type.class);
        Mockito.when(type.getTypeName(
                ArgumentMatchers.eq(this.dialect),
                ArgumentMatchers.<Schema>any(),
                ArgumentMatchers.anyBoolean()))
                .thenReturn("");
        Mockito.when(this.dialect.getSchemaType(ArgumentMatchers.<Schema>any())).thenReturn(type);
    }

    /**
     * Ten records with an unchanged schema and a batch size of five must produce two flushes.
     *
     * @param factory record factory supplied by {@link SinkRecordFactoryArgumentsProvider}
     */
    @ParameterizedTest
    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When 10 sink records arrives and buffer size is 5 then the buffer will be flushed 2 times")
    void correctlyBuffer(SinkRecordFactory factory) {
        RecordBuffer recordBuffer = newBuffer();
        List<SinkRecordDescriptor> sinkRecords = createDescriptors(factory, 10, Set.of());

        List<List<SinkRecordDescriptor>> batches = flushedBatches(recordBuffer, sinkRecords);

        Assertions.assertThat(batches).hasSize(2);
    }

    /**
     * Three records followed by one whose key schema differs (INT16 key instead of the factory's
     * default) must produce exactly one flush, even though the batch size is not reached.
     *
     * @param factory record factory supplied by {@link SinkRecordFactoryArgumentsProvider}
     */
    @ParameterizedTest
    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When key schema changes then the buffer will be flushed")
    void keySchemaChange(SinkRecordFactory factory) {
        RecordBuffer recordBuffer = newBuffer();
        List<SinkRecordDescriptor> sinkRecords = createDescriptors(factory, 3, Set.of());

        // The boxed Short/Byte values are intentional: they must match the INT16/INT8 schemas.
        // NOTE(review): "ts_ms" is filled with epoch *seconds* narrowed to int - presumably only
        // the type matters for this test; confirm against the source schema.
        SinkRecord sinkRecordWithDifferentKeySchema = factory.updateBuilder()
                .name("prefix")
                .topic("topic")
                .keySchema(factory.keySchema(UnaryOperator.identity(), Schema.INT16_SCHEMA))
                .recordSchema((Schema) SchemaBuilder.struct().field("id", Schema.INT8_SCHEMA))
                .sourceSchema(factory.basicSourceSchema())
                .key("id", (short) 1)
                .before("id", (byte) 1)
                .after("id", (byte) 1)
                .source("ts_ms", (int) Instant.now().getEpochSecond())
                .build();
        sinkRecords.add(describe(sinkRecordWithDifferentKeySchema, Set.of("id")));

        List<List<SinkRecordDescriptor>> batches = flushedBatches(recordBuffer, sinkRecords);

        Assertions.assertThat(batches).hasSize(1);
    }

    /**
     * Three records followed by one whose value schema differs (INT16 {@code id} field) must
     * produce exactly one flush, even though the batch size is not reached.
     *
     * @param factory record factory supplied by {@link SinkRecordFactoryArgumentsProvider}
     */
    @ParameterizedTest
    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When value schema changes then the buffer will be flushed")
    void valueSchemaChange(SinkRecordFactory factory) {
        RecordBuffer recordBuffer = newBuffer();
        List<SinkRecordDescriptor> sinkRecords = createDescriptors(factory, 3, Set.of("id"));

        // The boxed Short/Byte values are intentional: they must match the INT16/INT8 schemas.
        SinkRecord sinkRecordWithDifferentValueSchema = factory.updateBuilder()
                .name("prefix")
                .topic("topic")
                .keySchema(factory.basicKeySchema())
                .recordSchema((Schema) SchemaBuilder.struct().field("id", Schema.INT16_SCHEMA))
                .sourceSchema(factory.basicSourceSchema())
                .key("id", (byte) 1)
                .before("id", (short) 1)
                .after("id", (short) 1)
                .source("ts_ms", (int) Instant.now().getEpochSecond())
                .build();
        sinkRecords.add(describe(sinkRecordWithDifferentValueSchema, Set.of("id")));

        List<List<SinkRecordDescriptor>> batches = flushedBatches(recordBuffer, sinkRecords);

        Assertions.assertThat(batches).hasSize(1);
    }

    /** Creates a fresh buffer configured with the batch size shared by all tests. */
    private static RecordBuffer newBuffer() {
        JdbcSinkConnectorConfig config = new JdbcSinkConnectorConfig(Map.of(BATCH_SIZE_PROPERTY, BATCH_SIZE));
        return new RecordBuffer(config);
    }

    /** Wraps a sink record in a descriptor bound to the mocked dialect, using RECORD_KEY mode. */
    private SinkRecordDescriptor describe(SinkRecord sinkRecord, Set<String> primaryKeyFields) {
        return SinkRecordDescriptor.builder()
                .withSinkRecord(sinkRecord)
                .withDialect(this.dialect)
                .withPrimaryKeyFields(primaryKeyFields)
                .withPrimaryKeyMode(JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY)
                .build();
    }

    /**
     * Builds {@code count} descriptors for factory-created records keyed 0..count-1.
     *
     * <p>The result is an explicit {@link ArrayList} because callers append further descriptors;
     * {@code Collectors.toList()} gives no mutability guarantee.
     */
    private List<SinkRecordDescriptor> createDescriptors(SinkRecordFactory factory, int count, Set<String> primaryKeyFields) {
        return IntStream.range(0, count)
                .mapToObj(i -> describe(factory.createRecord(TOPIC, (byte) i), primaryKeyFields))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /** Adds the descriptors to the buffer in order and returns every non-empty result of {@code add}. */
    private static List<List<SinkRecordDescriptor>> flushedBatches(RecordBuffer recordBuffer, List<SinkRecordDescriptor> descriptors) {
        return descriptors.stream()
                .map(recordBuffer::add)
                .filter(Predicate.not(List::isEmpty))
                .collect(Collectors.toList());
    }
}

