/*
 * Decompiled with CFR 0.152.
 */
package io.eventuate.messaging.redis.spring.consumer;

import io.eventuate.messaging.partitionmanagement.Assignment;
import io.eventuate.messaging.partitionmanagement.Coordinator;
import io.eventuate.messaging.partitionmanagement.CoordinatorFactory;
import io.eventuate.messaging.partitionmanagement.SubscriptionLeaderHook;
import io.eventuate.messaging.partitionmanagement.SubscriptionLifecycleHook;
import io.eventuate.messaging.redis.spring.common.RedisUtil;
import io.eventuate.messaging.redis.spring.consumer.ChannelProcessor;
import io.eventuate.messaging.redis.spring.consumer.RedisKeyUtil;
import io.eventuate.messaging.redis.spring.consumer.RedisMessageHandler;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * A single subscription of a subscriber to a set of channels.
 *
 * <p>On construction the subscription asks the supplied {@link CoordinatorFactory} for a
 * {@link Coordinator} and registers {@link #assignmentUpdated(Assignment)} as the assignment
 * callback. Whenever the assignment changes, a {@link ChannelProcessor} is started for every newly
 * assigned (channel, partition) pair and the processors of partitions that are no longer assigned
 * are stopped.
 */
public class Subscription {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final String subscriptionId;
    private final String consumerId;
    private final RedisTemplate<String, String> redisTemplate;
    private final String subscriberId;
    private final RedisMessageHandler handler;
    private final long timeInMillisecondsToSleepWhenKeyDoesNotExist;
    private final long blockStreamTimeInMilliseconds;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final Coordinator coordinator;
    /** Partitions currently processed by this subscription, keyed by channel name. */
    private final Map<String, Set<Integer>> currentPartitionsByChannel = new HashMap<>();
    /** Running processors, keyed by the (channel, partition) pair they serve. */
    private final Map<ChannelPartition, ChannelProcessor> channelProcessorsByChannelAndPartition = new ConcurrentHashMap<>();
    private Optional<SubscriptionLifecycleHook> subscriptionLifecycleHook = Optional.empty();
    private Optional<SubscriptionLeaderHook> leaderHook = Optional.empty();
    // NOTE(review): the closing callback is stored but never invoked anywhere in this class -
    // confirm whether close() is expected to run it.
    private Optional<Runnable> closingCallback = Optional.empty();

    /**
     * Creates the subscription and its coordinator.
     *
     * @param subscriptionId identifier of this subscription; passed to the coordinator and to the hooks
     * @param consumerId identifier of the owning consumer; only used in log output
     * @param redisTemplate template handed to every {@link ChannelProcessor}
     * @param subscriberId identifier of the subscriber; also used to derive the leader lock key
     * @param channels names of the channels to subscribe to
     * @param handler message handler handed to every {@link ChannelProcessor}
     * @param coordinatorFactory factory used to create the {@link Coordinator}
     * @param timeInMillisecondsToSleepWhenKeyDoesNotExist passed through to every {@link ChannelProcessor}
     * @param blockStreamTimeInMilliseconds passed through to every {@link ChannelProcessor}
     */
    public Subscription(String subscriptionId, String consumerId, RedisTemplate<String, String> redisTemplate, String subscriberId, Set<String> channels, RedisMessageHandler handler, CoordinatorFactory coordinatorFactory, long timeInMillisecondsToSleepWhenKeyDoesNotExist, long blockStreamTimeInMilliseconds) {
        this.subscriptionId = subscriptionId;
        this.consumerId = consumerId;
        this.redisTemplate = redisTemplate;
        this.subscriberId = subscriberId;
        this.handler = handler;
        this.timeInMillisecondsToSleepWhenKeyDoesNotExist = timeInMillisecondsToSleepWhenKeyDoesNotExist;
        this.blockStreamTimeInMilliseconds = blockStreamTimeInMilliseconds;
        // Every subscribed channel starts with no partitions; the coordinator assigns them later.
        for (String channelName : channels) {
            this.currentPartitionsByChannel.put(channelName, new HashSet<>());
        }
        this.coordinator = coordinatorFactory.makeCoordinator(
                subscriberId,
                channels,
                subscriptionId,
                this::assignmentUpdated,
                RedisKeyUtil.keyForLeaderLock(subscriberId),
                leadershipController -> this.leaderHook.ifPresent(hook -> hook.leaderUpdated(true, subscriptionId)),
                () -> this.leaderHook.ifPresent(hook -> hook.leaderUpdated(false, subscriptionId)));
        this.logger.info("subscription created (channels = {}, {})", channels, this.identificationInformation());
    }

    /**
     * Sets the hook notified after the partitions of a channel were updated.
     *
     * @param subscriptionLifecycleHook the hook, or {@code null} to remove it
     */
    public void setSubscriptionLifecycleHook(SubscriptionLifecycleHook subscriptionLifecycleHook) {
        this.subscriptionLifecycleHook = Optional.ofNullable(subscriptionLifecycleHook);
    }

    /**
     * Sets the hook notified by the coordinator's leadership callbacks.
     *
     * @param leaderHook the hook, or {@code null} to remove it
     */
    public void setLeaderHook(SubscriptionLeaderHook leaderHook) {
        this.leaderHook = Optional.ofNullable(leaderHook);
    }

    /**
     * Stores the closing callback.
     *
     * @param closingCallback the callback, or {@code null} to remove it (accepted for consistency
     *     with the other setters)
     */
    public void setClosingCallback(Runnable closingCallback) {
        this.closingCallback = Optional.ofNullable(closingCallback);
    }

    /**
     * Reconciles the running processors with a new assignment: for every subscribed channel the
     * processors of resigned partitions are stopped, processors for newly assigned partitions are
     * started, and the lifecycle hook (if any) is notified.
     *
     * @param assignment the new assignment delivered by the coordinator
     */
    private void assignmentUpdated(Assignment assignment) {
        this.logger.info("assignment is updated (assignment = {}, {})", assignment, this.identificationInformation());
        for (Map.Entry<String, Set<Integer>> entry : this.currentPartitionsByChannel.entrySet()) {
            String channelName = entry.getKey();
            Set<Integer> currentPartitions = entry.getValue();
            Set<Integer> expectedPartitions = this.expectedPartitionsOf(assignment, channelName);

            Set<Integer> resignedPartitions = currentPartitions.stream()
                    .filter(partition -> !expectedPartitions.contains(partition))
                    .collect(Collectors.toSet());
            this.logger.info("partitions resigned (resigned partitions = {}, {})", resignedPartitions, this.identificationInformation());

            Set<Integer> assignedPartitions = expectedPartitions.stream()
                    .filter(partition -> !currentPartitions.contains(partition))
                    .collect(Collectors.toSet());
            this.logger.info("partitions assigned (assigned partitions = {}, {})", assignedPartitions, this.identificationInformation());

            resignedPartitions.forEach(partition -> this.stopProcessor(channelName, partition));
            assignedPartitions.forEach(partition -> this.startProcessor(channelName, partition));

            // Replacing the value of an existing key is not a structural modification,
            // so it is safe while iterating over the entry set.
            entry.setValue(expectedPartitions);
            this.subscriptionLifecycleHook.ifPresent(hook -> hook.partitionsUpdated(channelName, this.subscriptionId, expectedPartitions));
        }
    }

    /**
     * Returns a private copy of the partitions the assignment holds for the channel, or an empty
     * set when the assignment has no entry for it.
     */
    private Set<Integer> expectedPartitionsOf(Assignment assignment, String channelName) {
        Set<Integer> partitions = assignment.getPartitionAssignmentsByChannel().get(channelName);
        if (partitions == null) {
            return new HashSet<>();
        }
        return new HashSet<>(partitions);
    }

    /** Unregisters and stops the processor of the given channel partition, if one is registered. */
    private void stopProcessor(String channelName, int partition) {
        ChannelProcessor channelProcessor = this.channelProcessorsByChannelAndPartition.remove(new ChannelPartition(channelName, partition));
        if (channelProcessor != null) {
            channelProcessor.stop();
        }
    }

    /** Creates a processor for the given channel partition, submits it to the executor and registers it. */
    private void startProcessor(String channelName, int partition) {
        ChannelProcessor channelProcessor = new ChannelProcessor(
                this.redisTemplate,
                this.subscriberId,
                RedisUtil.channelToRedisStream(channelName, partition),
                this.handler,
                this.identificationInformation(),
                this.timeInMillisecondsToSleepWhenKeyDoesNotExist,
                this.blockStreamTimeInMilliseconds);
        this.executorService.submit(channelProcessor::process);
        this.channelProcessorsByChannelAndPartition.put(new ChannelPartition(channelName, partition), channelProcessor);
    }

    /**
     * Closes the coordinator, stops every running processor and shuts the executor down so that
     * its worker threads are released once the already submitted tasks have finished.
     */
    public void close() {
        this.coordinator.close();
        this.channelProcessorsByChannelAndPartition.values().forEach(ChannelProcessor::stop);
        // shutdown() (not shutdownNow()) lets tasks that are still winding down complete normally.
        this.executorService.shutdown();
    }

    /** Builds the identification suffix appended to every log message of this subscription. */
    private String identificationInformation() {
        return String.format("(consumerId = %s, subscriptionId = %s, subscriberId = %s)", this.consumerId, this.subscriptionId, this.subscriberId);
    }

    /**
     * Immutable (channel, partition) pair used as the key of the processor map.
     * Equality and hash code are based on both components.
     */
    private static final class ChannelPartition {
        private final String channel;
        private final int partition;

        public ChannelPartition(String channel, int partition) {
            this.channel = channel;
            this.partition = partition;
        }

        public String getChannel() {
            return this.channel;
        }

        public int getPartition() {
            return this.partition;
        }

        @Override
        public int hashCode() {
            int channelHash = this.channel == null ? 0 : this.channel.hashCode();
            return 31 * channelHash + this.partition;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof ChannelPartition)) {
                return false;
            }
            ChannelPartition other = (ChannelPartition) obj;
            if (this.partition != other.partition) {
                return false;
            }
            return this.channel == null ? other.channel == null : this.channel.equals(other.channel);
        }
    }
}

