/*
 * Decompiled with CFR 0.152.
 */
package io.kcache.keta.server;

import com.google.protobuf.ByteString;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.common.exception.CompactedException;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.kcache.keta.server.utils.RemoteClusterTestHarness;
import io.kcache.keta.server.utils.TestUtils;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={VertxExtension.class})
@TestInstance(value=TestInstance.Lifecycle.PER_CLASS)
public class WatchTest
extends RemoteClusterTestHarness {
    public static final ByteSequence namespace = TestUtils.bytesOf("test-namespace/");
    private Client client;

    @BeforeAll
    public void deployVerticle(Vertx vertx, VertxTestContext testContext) throws Exception {
        vertx.deployVerticle((Verticle)this.createKeta(), testContext.completing());
        this.client = Client.builder().endpoints(new String[]{this.endpoints}).build();
    }

    @Override
    @BeforeEach
    public void setUp(Vertx vertx) throws Exception {
        super.setUp(vertx);
    }

    @Override
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
    }

    @Test
    public void testNamespacedAndNotNamespacedClient() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        ByteSequence nsKey = ByteSequence.from((ByteString)ByteString.copyFrom((byte[])namespace.getBytes()).concat(ByteString.copyFrom((byte[])key.getBytes())));
        Client client = Client.builder().endpoints(new String[]{this.endpoints}).build();
        Client nsClient = Client.builder().endpoints(new String[]{this.endpoints}).namespace(namespace).build();
        CountDownLatch latch = new CountDownLatch(1);
        ByteSequence value = TestUtils.randomByteSequence();
        AtomicReference ref = new AtomicReference();
        try (Watch.Watcher watcher = nsClient.getWatchClient().watch(key, response -> {
            ref.set(response);
            latch.countDown();
        });){
            client.getKVClient().put(nsKey, value).get();
            latch.await(4L, TimeUnit.SECONDS);
            Assertions.assertThat(ref.get()).isNotNull();
            Assertions.assertThat((int)((WatchResponse)ref.get()).getEvents().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)((WatchEvent)((WatchResponse)ref.get()).getEvents().get(0)).getEventType()).isEqualTo((Object)WatchEvent.EventType.PUT);
            Assertions.assertThat((Object)((WatchEvent)((WatchResponse)ref.get()).getEvents().get(0)).getKeyValue().getKey()).isEqualTo((Object)key);
        }
    }

    @Test
    public void testWatchOnPut() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        CountDownLatch latch = new CountDownLatch(1);
        ByteSequence value = TestUtils.randomByteSequence();
        AtomicReference ref = new AtomicReference();
        try (Watch.Watcher watcher = this.client.getWatchClient().watch(key, response -> {
            ref.set(response);
            latch.countDown();
        });){
            this.client.getKVClient().put(key, value).get();
            latch.await(4L, TimeUnit.SECONDS);
            Assertions.assertThat(ref.get()).isNotNull();
            Assertions.assertThat((int)((WatchResponse)ref.get()).getEvents().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)((WatchEvent)((WatchResponse)ref.get()).getEvents().get(0)).getEventType()).isEqualTo((Object)WatchEvent.EventType.PUT);
            Assertions.assertThat((Object)((WatchEvent)((WatchResponse)ref.get()).getEvents().get(0)).getKeyValue().getKey()).isEqualTo((Object)key);
        }
    }

    @Test
    public void testMultipleWatch() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        CountDownLatch latch = new CountDownLatch(2);
        ByteSequence value = TestUtils.randomByteSequence();
        List res = Collections.synchronizedList(new ArrayList(2));
        try (Watch.Watcher w1 = this.client.getWatchClient().watch(key, response -> {
            res.add(response);
            latch.countDown();
        });
             Watch.Watcher w2 = this.client.getWatchClient().watch(key, response -> {
            res.add(response);
            latch.countDown();
        });){
            this.client.getKVClient().put(key, value).get();
            latch.await(4L, TimeUnit.SECONDS);
            Assertions.assertThat(res).hasSize(2);
            Assertions.assertThat((int)((WatchResponse)res.get(0)).getEvents().size()).isEqualTo(1);
            Assertions.assertThat((Comparable)((WatchEvent)((WatchResponse)res.get(0)).getEvents().get(0)).getEventType()).isEqualTo((Object)WatchEvent.EventType.PUT);
            Assertions.assertThat((Object)((WatchEvent)((WatchResponse)res.get(0)).getEvents().get(0)).getKeyValue().getKey()).isEqualTo((Object)key);
        }
    }

    @Test
    public void testWatchOnDelete() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        CountDownLatch latch = new CountDownLatch(1);
        ByteSequence value = TestUtils.randomByteSequence();
        AtomicReference ref = new AtomicReference();
        this.client.getKVClient().put(key, value).get();
        try (Watch.Watcher watcher = this.client.getWatchClient().watch(key, response -> {
            ref.set(response);
            latch.countDown();
        });){
            this.client.getKVClient().delete(key).get();
            latch.await(4L, TimeUnit.SECONDS);
            Assertions.assertThat(ref.get()).isNotNull();
            Assertions.assertThat((int)((WatchResponse)ref.get()).getEvents().size()).isEqualTo(1);
            WatchEvent event = (WatchEvent)((WatchResponse)ref.get()).getEvents().get(0);
            Assertions.assertThat((Comparable)event.getEventType()).isEqualTo((Object)WatchEvent.EventType.DELETE);
            Assertions.assertThat((boolean)Arrays.equals(event.getKeyValue().getKey().getBytes(), key.getBytes())).isTrue();
        }
    }

    @Test
    @Disabled
    public void testWatchCompacted() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        ByteSequence value = TestUtils.randomByteSequence();
        this.client.getKVClient().put(key, value).get();
        PutResponse putResponse = (PutResponse)this.client.getKVClient().put(key, value).get();
        this.client.getKVClient().compact(putResponse.getHeader().getRevision()).get();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference ref = new AtomicReference();
        WatchOption watchOption = WatchOption.newBuilder().withRevision(putResponse.getHeader().getRevision() - 1L).build();
        try (Watch.Watcher watcher = this.client.getWatchClient().watch(key, watchOption, Watch.listener(response -> {}, error -> {
            ref.set(error);
            latch.countDown();
        }));){
            latch.await(4L, TimeUnit.SECONDS);
            Assertions.assertThat((Throwable)((Throwable)ref.get())).isNotNull();
            Assertions.assertThat(((Throwable)ref.get()).getClass()).isEqualTo(CompactedException.class);
        }
    }

    @Test
    public void testWatchClose() throws Exception {
        ByteSequence key = TestUtils.randomByteSequence();
        ByteSequence value = TestUtils.randomByteSequence();
        ArrayList events = new ArrayList();
        CountDownLatch l1 = new CountDownLatch(1);
        CountDownLatch l2 = new CountDownLatch(1);
        try (Watch.Watcher watcher = this.client.getWatchClient().watch(key, response -> {
            events.add(response);
            l1.countDown();
        });){
            this.client.getKVClient().put(key, value).get();
            l1.await();
        }
        this.client.getKVClient().put(key, TestUtils.randomByteSequence()).get();
        l2.await(4L, TimeUnit.SECONDS);
        Assertions.assertThat(events).hasSize(1);
        Assertions.assertThat((List)((WatchResponse)events.get(0)).getEvents()).hasSize(1);
        Assertions.assertThat((Comparable)((WatchEvent)((WatchResponse)events.get(0)).getEvents().get(0)).getEventType()).isEqualTo((Object)WatchEvent.EventType.PUT);
        Assertions.assertThat((Object)((WatchEvent)((WatchResponse)events.get(0)).getEvents().get(0)).getKeyValue().getKey()).isEqualTo((Object)key);
        Assertions.assertThat((Object)((WatchEvent)((WatchResponse)events.get(0)).getEvents().get(0)).getKeyValue().getValue()).isEqualTo((Object)value);
    }
}

