/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.transforms.partitions;

import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.transforms.partitions.PartitionRouting;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Unit tests for the {@link PartitionRouting} transformation.
 *
 * <p>Each test configures the transformation through {@code configure(Map)}, feeds it a
 * {@link SourceRecord} built by {@link #buildSourceRecord(Struct, Envelope.Operation)} and
 * checks either the resulting {@code kafkaPartition()} or that the record comes back equal
 * to the input.
 */
public class PartitionRoutingTest {

    /** Schema of the row carried by the change events built in these tests. */
    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("server1.inventory.products.Value")
            .field("id", Schema.INT64_SCHEMA)
            .field("price", Schema.FLOAT32_SCHEMA)
            .field("product", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

    /** Configuration key listing the payload fields used to compute the partition. */
    private static final String PAYLOAD_FIELDS_CONF = "partition.payload.fields";

    /** Configuration key holding the number of partitions of the target topic. */
    private static final String TOPIC_NUM_CONF = "partition.topic.num";

    /** Configuration key selecting the hash function. */
    private static final String HASH_FUNCTION_CONF = "partition.hash.function";

    private final PartitionRouting<SourceRecord> partitionRoutingTransformation = new PartitionRouting<>();

    /** Configuring without {@code partition.payload.fields} must fail with a {@link ConfigException}. */
    @Test
    public void whenNoPartitionPayloadFieldDeclaredAConfigExceptionIsThrew() {
        Assertions.assertThatThrownBy(() -> partitionRoutingTransformation.configure(Map.of(TOPIC_NUM_CONF, 2)))
                .isInstanceOf(ConfigException.class)
                .hasMessageContaining(
                        "Invalid value null for configuration partition.payload.fields: The 'partition.payload.fields' value is invalid: A value is required");
    }

    /** Configuring without {@code partition.topic.num} must fail with a {@link ConfigException}. */
    @Test
    public void whenNoPartitionTopicNumFieldDeclaredAConfigExceptionIsThrew() {
        Assertions.assertThatThrownBy(() -> partitionRoutingTransformation.configure(Map.of(PAYLOAD_FIELDS_CONF, 2)))
                .isInstanceOf(ConfigException.class)
                .hasMessageContaining(
                        "Invalid value null for configuration partition.topic.num: The 'partition.topic.num' value is invalid: A value is required");
    }

    /** An empty element in the {@code partition.payload.fields} list must be rejected. */
    @Test
    public void whenPartitionPayloadFieldContainsEmptyElementAConfigExceptionIsThrew() {
        Assertions.assertThatThrownBy(() -> partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, ",source.table",
                TOPIC_NUM_CONF, 2)))
                .isInstanceOf(ConfigException.class)
                .hasMessageContaining(
                        "Invalid value ,source.table for configuration partition.payload.fields: The 'partition.payload.fields' value is invalid: Empty string element(s) not permitted");
    }

    /** Whitespace around the dot of a nested field name must not prevent partition computation. */
    @Test
    public void spaceBetweenNestedFiledSeparatedWillBeCorrectManaged() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change . product",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isZero();
    }

    /** A field addressed through {@code after.} is used to compute the partition of a create event. */
    @Test
    public void correctComputeKafkaPartitionBasedOnNewConfiguredFieldOnCreateAndUpdateEvents() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "after.product",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isZero();
    }

    /**
     * A field addressed through the {@code change.} prefix is used to compute the partition of a
     * create event.
     */
    @Test
    public void correctComputeKafkaPartitionBasedOnSpecialChangeNestedFieldOnCreateEvent() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.product",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isZero();
    }

    /** When the only configured field does not exist in the payload, the record comes back equal to the input. */
    @Test
    public void whenASpecifiedFieldIsNotFoundOnPayloadItWillBeIgnored() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "after.not_existing",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(eventRecord).isEqualTo(transformed);
    }

    /**
     * When the configured field is optional and was never set on the row, the record comes back
     * equal to the input.
     */
    @Test
    @FixFor("DBZ-6543")
    public void whenAnSpecifiedOptionalFieldIsNotFoundOnPayloadItWillBeIgnored() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.product",
                TOPIC_NUM_CONF, 2));
        // The optional "product" field is deliberately left unset on this row.
        SourceRecord eventRecord = buildSourceRecord(
                productRow(Map.of("id", 1L, "price", 1.0f)),
                Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(eventRecord).isEqualTo(transformed);
    }

    /** With one missing and one existing configured field, a partition is still computed. */
    @Test
    public void onlyFieldThatExistForCurrentEventWillBeUsedForPartitionComputation() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "after.not_existing,change.product",
                TOPIC_NUM_CONF, 3));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "orange"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isOne();
    }

    /**
     * A field addressed through the {@code change.} prefix is used to compute the partition of a
     * delete event.
     */
    @Test
    public void correctComputeKafkaPartitionBasedOnSpecialChangeNestedFieldOnCreateDelete() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.product",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.DELETE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isZero();
    }

    /** A truncate event comes back equal to the input record. */
    @Test
    public void truncateOperationRecordWillBeSkipped() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.product",
                TOPIC_NUM_CONF, 2));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.TRUNCATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(eventRecord).isEqualTo(transformed);
    }

    /**
     * Routing on the top-level {@code op} field: two create events get the same partition, and an
     * update event is changed by the transformation.
     */
    @Test
    public void correctComputeKafkaPartitionBasedOnNotNestedField() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "op",
                TOPIC_NUM_CONF, 2));
        SourceRecord createRecord1 = buildSourceRecord(productRow(1L, 1.0f, "APPLE"), Envelope.Operation.CREATE);
        SourceRecord createRecord2 = buildSourceRecord(productRow(1L, 1.0f, "ORANGE"), Envelope.Operation.CREATE);
        SourceRecord updateRecord = buildSourceRecord(productRow(1L, 1.0f, "ORANGE"), Envelope.Operation.UPDATE);

        SourceRecord transformedCreateRecord1 = partitionRoutingTransformation.apply(createRecord1);
        SourceRecord transformedCreateRecord2 = partitionRoutingTransformation.apply(createRecord2);
        SourceRecord transformedUpdateRecord = partitionRoutingTransformation.apply(updateRecord);

        Assertions.assertThat(transformedCreateRecord1.kafkaPartition())
                .isEqualTo(transformedCreateRecord2.kafkaPartition());
        Assertions.assertThat(transformedUpdateRecord).isNotEqualTo(updateRecord);
    }

    /** Without {@code partition.hash.function} the expected partition for this input is 39. */
    @Test
    public void byDefaultJavaHashIsUsed() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.id, change.product",
                TOPIC_NUM_CONF, 100));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "orange"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isEqualTo(39);
    }

    /** With {@code partition.hash.function=murmur} the expected partition for the same input is 65. */
    @Test
    public void murmurHashWillBeUsed() {
        partitionRoutingTransformation.configure(Map.of(
                PAYLOAD_FIELDS_CONF, "change.id, change.product",
                TOPIC_NUM_CONF, 100,
                HASH_FUNCTION_CONF, "murmur"));
        SourceRecord eventRecord = buildSourceRecord(productRow(1L, 1.0f, "orange"), Envelope.Operation.CREATE);

        SourceRecord transformed = partitionRoutingTransformation.apply(eventRecord);

        Assertions.assertThat(transformed.kafkaPartition()).isEqualTo(65);
    }

    /**
     * Builds a change-event record on topic {@code prefix.inventory.products} for the given row.
     *
     * @param row the row value, expected to use {@link #VALUE_SCHEMA}
     * @param operation selects the envelope factory: {@code DELETE} uses {@code delete},
     *        {@code TRUNCATE} uses {@code truncate} (the row is not passed), and every other
     *        operation uses {@code create}
     * @return a record with empty source partition and offset maps and the envelope as its value
     */
    private SourceRecord buildSourceRecord(Struct row, Envelope.Operation operation) {
        Schema sourceSchema = SchemaBuilder.struct()
                .name("source")
                .field("connector", Schema.STRING_SCHEMA)
                .field("db", Schema.STRING_SCHEMA)
                .field("table", Schema.STRING_SCHEMA)
                .build();

        Envelope envelope = Envelope.defineSchema()
                .withName("server1.inventory.product.Envelope")
                .withRecord(VALUE_SCHEMA)
                .withSource(sourceSchema)
                .build();

        Struct source = new Struct(sourceSchema);
        source.put("connector", "mysql");
        source.put("db", "inventory");
        source.put("table", "products");

        final Struct payload;
        switch (operation) {
            case DELETE:
                payload = envelope.delete(row, source, Instant.now());
                break;
            case TRUNCATE:
                payload = envelope.truncate(source, Instant.now());
                break;
            default:
                // CREATE, UPDATE, READ and any operation not listed above share the create payload.
                payload = envelope.create(row, source, Instant.now());
                break;
        }

        return new SourceRecord(new HashMap<>(), new HashMap<>(), "prefix.inventory.products", envelope.schema(), payload);
    }

    /**
     * Creates a fully populated product row.
     *
     * @param id value of the {@code id} field
     * @param price value of the {@code price} field
     * @param name value of the {@code product} field
     * @return a {@link Struct} using {@link #VALUE_SCHEMA}
     */
    private Struct productRow(long id, float price, String name) {
        return new Struct(VALUE_SCHEMA)
                .put("id", id)
                .put("price", price)
                .put("product", name);
    }

    /**
     * Creates a product row containing only the supplied fields, so optional fields can be left unset.
     *
     * @param rowValues field name to value; each entry is put on the struct as-is
     * @return a {@link Struct} using {@link #VALUE_SCHEMA}
     */
    private Struct productRow(Map<String, Object> rowValues) {
        Struct struct = new Struct(VALUE_SCHEMA);
        rowValues.forEach((fieldName, fieldValue) -> struct.put(fieldName, fieldValue));
        return struct;
    }
}

