/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.signal;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.data.Envelope;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.Log;
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.junit.Test;

public class SignalTest {
    @Test
    public void shouldDetectSignal() {
        Signal signal = new Signal(this.config());
        Assertions.assertThat((boolean)signal.isSignal((DataCollectionId)new TableId("dbo", null, "mytable"))).isFalse();
        Assertions.assertThat((boolean)signal.isSignal((DataCollectionId)new TableId("debezium", null, "signal"))).isTrue();
    }

    @Test
    public void shouldExecuteLog() throws Exception {
        Signal signal = new Signal(this.config());
        LogInterceptor log = new LogInterceptor(Log.class);
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), "log1", "log", "{\"message\": \"signallog {}\"}")).isTrue();
        Assertions.assertThat((boolean)log.containsMessage("signallog <none>")).isTrue();
    }

    @Test
    public void shouldIgnoreInvalidSignalType() throws Exception {
        Signal signal = new Signal(this.config());
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), "log1", "log1", "{\"message\": \"signallog\"}")).isFalse();
    }

    @Test
    public void shouldIgnoreUnparseableData() throws Exception {
        Signal signal = new Signal(this.config());
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), "log1", "log", "{\"message: \"signallog\"}")).isFalse();
    }

    @Test
    public void shouldRegisterAdditionalAction() throws Exception {
        Signal signal = new Signal(this.config());
        AtomicInteger called = new AtomicInteger();
        Signal.Action testAction = signalPayload -> {
            called.set(signalPayload.data.getInteger((CharSequence)"v"));
            return true;
        };
        signal.registerSignalAction("custom", testAction);
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), "log1", "custom", "{\"v\": 5}")).isTrue();
        Assertions.assertThat((int)called.intValue()).isEqualTo(5);
    }

    @Test
    public void shouldExecuteFromEnvelope() throws Exception {
        Signal signal = new Signal(this.config());
        Schema afterSchema = SchemaBuilder.struct().name("signal").field("col1", Schema.OPTIONAL_STRING_SCHEMA).field("col2", Schema.OPTIONAL_STRING_SCHEMA).field("col3", Schema.OPTIONAL_STRING_SCHEMA).build();
        Envelope env = Envelope.defineSchema().withName("someName").withRecord(afterSchema).withSource(SchemaBuilder.struct().name("source").build()).build();
        Struct record = new Struct(afterSchema);
        record.put("col1", (Object)"log1");
        record.put("col2", (Object)"custom");
        record.put("col3", (Object)"{\"v\": 5}");
        AtomicInteger called = new AtomicInteger();
        Signal.Action testAction = signalPayload -> {
            called.set(signalPayload.data.getInteger((CharSequence)"v"));
            return true;
        };
        signal.registerSignalAction("custom", testAction);
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), env.create((Object)record, null, null), null)).isTrue();
        Assertions.assertThat((int)called.intValue()).isEqualTo(5);
    }

    @Test
    public void shouldIgnoreInvalidEnvelope() throws Exception {
        Signal signal = new Signal(this.config());
        Schema afterSchema = SchemaBuilder.struct().name("signal").field("col1", Schema.OPTIONAL_STRING_SCHEMA).field("col2", Schema.OPTIONAL_STRING_SCHEMA).build();
        Envelope env = Envelope.defineSchema().withName("someName").withRecord(afterSchema).withSource(SchemaBuilder.struct().name("source").build()).build();
        Struct record = new Struct(afterSchema);
        record.put("col1", (Object)"log1");
        record.put("col2", (Object)"custom");
        AtomicInteger called = new AtomicInteger();
        Signal.Action testAction = signalPayload -> {
            called.set(signalPayload.data.getInteger((CharSequence)"v"));
            return true;
        };
        signal.registerSignalAction("custom", testAction);
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), env.create((Object)record, null, null), null)).isFalse();
        Assertions.assertThat((int)called.intValue()).isEqualTo(0);
        Assertions.assertThat((boolean)signal.process((Partition)new TestPartition(), record, null)).isFalse();
        Assertions.assertThat((int)called.intValue()).isEqualTo(0);
    }

    protected CommonConnectorConfig config() {
        return new CommonConnectorConfig(((Configuration.Builder)Configuration.create().with(CommonConnectorConfig.SIGNAL_DATA_COLLECTION, "debezium.signal")).build(), "core", 0){

            protected SourceInfoStructMaker<?> getSourceInfoStructMaker(CommonConnectorConfig.Version version) {
                return null;
            }

            public String getContextName() {
                return null;
            }

            public String getConnectorName() {
                return null;
            }
        };
    }

    private static class TestPartition
    implements Partition {
        private TestPartition() {
        }

        public Map<String, String> getSourcePartition() {
            throw new UnsupportedOperationException();
        }
    }
}

