/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.spanner.task.leader;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.kafka.internal.model.PartitionStateEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.TaskTestHelper;
import io.debezium.connector.spanner.task.leader.LeaderService;
import io.debezium.pipeline.ErrorHandler;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
class LeaderActionServiceTest {
    private static final String TASK_UID = "leader-007";
    @Mock
    private MetricsEventPublisher metricsEventPublisher;
    private TaskSyncContextHolder taskSyncContextHolder;
    private LeaderService leaderService;

    LeaderActionServiceTest() {
    }

    @BeforeEach
    void init() {
        this.taskSyncContextHolder = new TaskSyncContextHolder(this.metricsEventPublisher);
        this.taskSyncContextHolder.init(TaskSyncContext.builder().taskUid(TASK_UID).rebalanceState(RebalanceState.START_INITIAL_SYNC).build());
        SpannerConnectorConfig spannerConnectorConfig = (SpannerConnectorConfig)Mockito.mock(SpannerConnectorConfig.class);
        Mockito.when((Object)spannerConnectorConfig.startTime()).thenReturn((Object)Timestamp.now());
        Mockito.when((Object)spannerConnectorConfig.getAwaitTaskAnswerTimeout()).thenReturn((Object)Duration.of(120000L, ChronoUnit.MILLIS));
        this.leaderService = new LeaderService(this.taskSyncContextHolder, spannerConnectorConfig, event -> {}, (ErrorHandler)Mockito.mock(ErrorHandler.class), (PartitionFactory)Mockito.mock(PartitionFactory.class), (MetricsEventPublisher)Mockito.mock(MetricsEventPublisher.class));
    }

    @Test
    void isStartFromScratch_false_leaderInProgress() {
        TaskState taskState1 = this.generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState taskState2 = this.generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.FINISHED);
        TaskState leaderState = this.generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.CREATED, PartitionStateEnum.CREATED);
        this.taskSyncContextHolder.updateAndGet(c -> c.toBuilder().currentTaskState(leaderState).taskStates(TaskTestHelper.createTaskStateMap(taskState1, taskState2)).build());
        boolean startFromScratch = this.leaderService.isStartFromScratch();
        Assertions.assertThat((boolean)startFromScratch).isFalse();
    }

    @Test
    void isStartFromScratch_false_someTaskInProgress() {
        TaskState taskState1 = this.generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState taskState2 = this.generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.READY_FOR_STREAMING);
        TaskState leaderState = this.generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.FINISHED, PartitionStateEnum.FINISHED);
        this.taskSyncContextHolder.updateAndGet(c -> c.toBuilder().currentTaskState(leaderState).taskStates(TaskTestHelper.createTaskStateMap(taskState1, taskState2)).build());
        boolean startFromScratch = this.leaderService.isStartFromScratch();
        Assertions.assertThat((boolean)startFromScratch).isFalse();
    }

    @Test
    void isStartFromScratch_true_someTaskInProgress() {
        TaskState taskState1 = this.generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState taskState2 = this.generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.REMOVED);
        TaskState leaderState = this.generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.FINISHED, PartitionStateEnum.FINISHED);
        this.taskSyncContextHolder.updateAndGet(c -> c.toBuilder().currentTaskState(leaderState).taskStates(TaskTestHelper.createTaskStateMap(taskState1, taskState2)).build());
        boolean startFromScratch = this.leaderService.isStartFromScratch();
        Assertions.assertThat((boolean)startFromScratch).isTrue();
    }

    @Test
    @Timeout(value=10L)
    void awaitAllNewTaskStateUpdatesWorks() throws InterruptedException {
        String consumer0 = "consumer0";
        String consumer1 = "consumer1";
        String consumer2 = "consumer2";
        String consumer3 = "consumer3";
        TaskState leaderState = TaskState.builder().taskUid(consumer1.toUpperCase()).consumerId(consumer1).build();
        this.taskSyncContextHolder.updateAndGet(context -> context.toBuilder().consumerId(consumer1).build());
        this.taskSyncContextHolder.updateAndGet(c -> c.toBuilder().currentTaskState(leaderState).taskStates(Map.of()).build());
        Set<String> consumers = Set.of(consumer1, consumer2, consumer3);
        Set<String> consumersWithoutLeader = Set.of(consumer2, consumer3);
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(consumers.size(), false, consumersWithoutLeader);
        Thread populator = new Thread(() -> {
            try {
                while (!queue.isEmpty()) {
                    Thread.sleep(40L);
                    String consumer = (String)queue.take();
                    TaskState newTask = TaskState.builder().taskUid(consumer.toUpperCase()).consumerId(consumer).build();
                    HashMap<String, TaskState> newTaskStates = new HashMap<String, TaskState>(this.taskSyncContextHolder.get().getTaskStates());
                    newTaskStates.put(newTask.getTaskUid(), newTask);
                    this.taskSyncContextHolder.updateAndGet(c -> c.toBuilder().taskStates(newTaskStates).build());
                }
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
        populator.start();
        Map result = this.leaderService.awaitAllNewTaskStateUpdates(consumers, this.taskSyncContextHolder.get().getRebalanceGenerationId());
        Assertions.assertThat((Map)result).hasSameSizeAs(consumers);
        Assertions.assertThat((Map)result).doesNotContainKey((Object)consumer0);
        Assertions.assertThat((String)((String)result.get(consumer1))).isEqualTo(consumer1.toUpperCase());
        Assertions.assertThat((String)((String)result.get(consumer2))).isEqualTo(consumer2.toUpperCase());
        Assertions.assertThat((String)((String)result.get(consumer3))).isEqualTo(consumer3.toUpperCase());
    }

    private TaskState generateTaskState(String taskUid, int numPartitions, int numSharedPartitions, PartitionStateEnum partitionStatus, PartitionStateEnum sharedPartitionStatus) {
        List<PartitionState> partitions = TaskTestHelper.generatePartitions(numPartitions, () -> PartitionState.builder().token(UUID.randomUUID().toString()).state(partitionStatus).build());
        List<PartitionState> sharedPartitions = TaskTestHelper.generatePartitions(numSharedPartitions, () -> PartitionState.builder().token(UUID.randomUUID().toString()).state(sharedPartitionStatus).build());
        return TaskState.builder().taskUid(taskUid).partitions(partitions).sharedPartitions(sharedPartitions).build();
    }
}

