package com.hazelcast.topic.impl.reliable;

import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.ReliableTopicConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.MessageListener;
import com.hazelcast.monitor.LocalTopicStats;
import com.hazelcast.monitor.impl.LocalTopicStatsImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ClassLoaderUtil;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.spi.AbstractDistributedObject;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.topic.ReliableMessageListener;
import com.hazelcast.topic.TopicOverloadException;
import com.hazelcast.topic.TopicOverloadPolicy;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.Preconditions;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:lib/hazelcast-3.5.3.jar:com/hazelcast/topic/impl/reliable/ReliableTopicProxy.class */
/**
 * {@link ITopic} proxy that publishes messages by appending them to a {@link Ringbuffer}
 * and delivers them to listeners through {@link ReliableMessageListenerRunner}s.
 *
 * <p>The backing ringbuffer is looked up under the name
 * {@code RingbufferService.TOPIC_RB_PREFIX + <topic name>}. What happens when the ringbuffer
 * rejects a message is decided by the configured {@link TopicOverloadPolicy}; see
 * {@link #publish(Object)}.
 *
 * @param <E> type of the published message payload
 */
public class ReliableTopicProxy<E> extends AbstractDistributedObject<ReliableTopicService> implements ITopic<E> {
    /** Upper bound, in milliseconds, for the sleep between publish retries under the BLOCK policy. */
    public static final int MAX_BACKOFF = 2000;
    /** First sleep, in milliseconds, between publish retries under the BLOCK policy. */
    public static final int INITIAL_BACKOFF_MS = 100;

    final Ringbuffer<ReliableTopicMessage> ringbuffer;
    final Executor executor;
    /** Listener runners keyed by the registration id returned from {@link #addMessageListener(MessageListener)}. */
    final ConcurrentMap<String, ReliableMessageListenerRunner> runnersMap;
    final LocalTopicStatsImpl localTopicStats;
    final ReliableTopicConfig topicConfig;
    final TopicOverloadPolicy overloadPolicy;
    private final NodeEngine nodeEngine;
    private final Address thisAddress;
    private final String name;

    /**
     * Creates the proxy, resolves its ringbuffer and executor, and registers every listener
     * declared in the topic configuration.
     *
     * @param str                  name of the topic
     * @param nodeEngine           node engine used for serialization, executors and class loading
     * @param reliableTopicService owning service, passed to the superclass
     * @param reliableTopicConfig  configuration supplying the executor, overload policy and listeners
     */
    public ReliableTopicProxy(String str, NodeEngine nodeEngine, ReliableTopicService reliableTopicService, ReliableTopicConfig reliableTopicConfig) {
        super(nodeEngine, reliableTopicService);
        this.runnersMap = new ConcurrentHashMap<String, ReliableMessageListenerRunner>();
        this.localTopicStats = new LocalTopicStatsImpl();
        this.name = str;
        this.topicConfig = reliableTopicConfig;
        this.nodeEngine = nodeEngine;
        this.ringbuffer = nodeEngine.getHazelcastInstance().getRingbuffer(RingbufferService.TOPIC_RB_PREFIX + str);
        this.executor = initExecutor(nodeEngine, reliableTopicConfig);
        this.thisAddress = nodeEngine.getThisAddress();
        this.overloadPolicy = reliableTopicConfig.getTopicOverloadPolicy();
        // Listeners are registered last: registration relies on the fields assigned above.
        for (ListenerConfig listenerConfig : reliableTopicConfig.getMessageListenerConfigs()) {
            addMessageListener(listenerConfig);
        }
    }

    @Override // com.hazelcast.spi.AbstractDistributedObject, com.hazelcast.core.DistributedObject
    public String getServiceName() {
        return ReliableTopicService.SERVICE_NAME;
    }

    @Override // com.hazelcast.core.DistributedObject
    public String getName() {
        return this.name;
    }

    /**
     * Registers the listener described by a configuration entry. Entries that carry neither an
     * implementation nor a class name are skipped. Listeners implementing
     * {@link HazelcastInstanceAware} receive the Hazelcast instance before registration.
     */
    private void addMessageListener(ListenerConfig listenerConfig) {
        NodeEngine engine = getNodeEngine();
        MessageListener<E> listener = loadListener(listenerConfig);
        if (listener == null) {
            return;
        }
        if (listener instanceof HazelcastInstanceAware) {
            ((HazelcastInstanceAware) listener).setHazelcastInstance(engine.getHazelcastInstance());
        }
        addMessageListener(listener);
    }

    /**
     * Resolves the listener of a configuration entry: the configured implementation if present,
     * otherwise a new instance of the configured class name created through the config class loader.
     *
     * @return the listener, or {@code null} when the entry has neither an implementation nor a class name
     * @throws HazelcastException if the configured class is not a {@link MessageListener}
     *                            (rethrown through {@link ExceptionUtil#rethrow})
     */
    private MessageListener loadListener(ListenerConfig listenerConfig) {
        try {
            MessageListener listener = (MessageListener) listenerConfig.getImplementation();
            if (listener != null) {
                return listener;
            }
            String className = listenerConfig.getClassName();
            if (className == null) {
                return null;
            }
            Object instance = ClassLoaderUtil.newInstance(this.nodeEngine.getConfigClassLoader(), className);
            if (!(instance instanceof MessageListener)) {
                throw new HazelcastException("class '" + className + "' is not an instance of " + MessageListener.class.getName());
            }
            return (MessageListener) instance;
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Picks the executor from the topic configuration, falling back to the node's
     * {@link ExecutionService#ASYNC_EXECUTOR} when none is configured.
     */
    private Executor initExecutor(NodeEngine nodeEngine, ReliableTopicConfig reliableTopicConfig) {
        Executor configured = reliableTopicConfig.getExecutor();
        if (configured != null) {
            return configured;
        }
        return nodeEngine.getExecutionService().getExecutor(ExecutionService.ASYNC_EXECUTOR);
    }

    /**
     * Publishes a message by appending it to the ringbuffer according to the overload policy:
     * <ul>
     *   <li>{@code ERROR} - throws {@link TopicOverloadException} when the add is rejected;</li>
     *   <li>{@code DISCARD_OLDEST} - adds with {@link OverflowPolicy#OVERWRITE};</li>
     *   <li>{@code DISCARD_NEWEST} - adds with {@link OverflowPolicy#FAIL} and ignores the result,
     *       so a rejected message is dropped silently;</li>
     *   <li>{@code BLOCK} - retries with a growing sleep until the add is accepted.</li>
     * </ul>
     * The publish counter is incremented whenever no exception is thrown.
     *
     * @param e the message to publish
     * @throws TopicOverloadException if the policy is {@code ERROR} and the add is rejected
     * @throws HazelcastException     if the add fails with a checked exception, including
     *                                interruption (the thread's interrupt flag is restored first)
     */
    @Override // com.hazelcast.core.ITopic
    public void publish(E e) {
        try {
            ReliableTopicMessage reliableTopicMessage = new ReliableTopicMessage(this.nodeEngine.toData(e), this.thisAddress);
            switch (this.overloadPolicy) {
                case ERROR:
                    addOrFail(reliableTopicMessage);
                    break;
                case DISCARD_OLDEST:
                    addOrOverwrite(reliableTopicMessage);
                    break;
                case DISCARD_NEWEST:
                    // Result deliberately ignored: a rejected add means the new message is discarded.
                    this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.FAIL).get();
                    break;
                case BLOCK:
                    addWithBackoff(reliableTopicMessage);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + this.overloadPolicy);
            }
            this.localTopicStats.incrementPublishes();
        } catch (RuntimeException e2) {
            throw e2;
        } catch (InterruptedException interrupted) {
            // Keep the interruption visible to the caller's thread; wrapping alone would swallow it.
            Thread.currentThread().interrupt();
            throw new HazelcastException("Failed to publish message: " + e + " to topic:" + getName(), interrupted);
        } catch (Exception e3) {
            throw new HazelcastException("Failed to publish message: " + e + " to topic:" + getName(), e3);
        }
    }

    /** Adds the message with {@link OverflowPolicy#OVERWRITE} and returns the result of the add. */
    private Long addOrOverwrite(ReliableTopicMessage message) throws Exception {
        return this.ringbuffer.addAsync(message, OverflowPolicy.OVERWRITE).get();
    }

    /**
     * Adds the message with {@link OverflowPolicy#FAIL}.
     *
     * @throws TopicOverloadException if the add returns {@code -1}
     */
    private void addOrFail(ReliableTopicMessage message) throws Exception {
        long result = this.ringbuffer.addAsync(message, OverflowPolicy.FAIL).get().longValue();
        if (result == -1) {
            throw new TopicOverloadException("Failed to publish message: " + message + " on topic:" + getName());
        }
    }

    /**
     * Retries the add with {@link OverflowPolicy#FAIL} until it no longer returns {@code -1},
     * sleeping between attempts. The sleep starts at {@link #INITIAL_BACKOFF_MS}, doubles after
     * every rejected attempt and is capped at {@link #MAX_BACKOFF}.
     */
    private void addWithBackoff(ReliableTopicMessage message) throws Exception {
        long sleepMs = INITIAL_BACKOFF_MS;
        while (this.ringbuffer.addAsync(message, OverflowPolicy.FAIL).get().longValue() == -1) {
            TimeUnit.MILLISECONDS.sleep(sleepMs);
            sleepMs = Math.min(sleepMs * 2, MAX_BACKOFF);
        }
    }

    /**
     * Registers a listener under a freshly generated UUID and starts its runner.
     * A listener that is not already a {@link ReliableMessageListener} is wrapped in a
     * {@link ReliableMessageListenerAdapter}.
     *
     * @param messageListener the listener to register; must not be {@code null}
     * @return the registration id, usable with {@link #removeMessageListener(String)}
     */
    @Override // com.hazelcast.core.ITopic
    public String addMessageListener(MessageListener<E> messageListener) {
        Preconditions.checkNotNull(messageListener, "listener can't be null");
        String uuid = UUID.randomUUID().toString();

        // Raw types are kept for the runner and adapter, matching the declaration of runnersMap.
        ReliableMessageListener reliableListener;
        if (messageListener instanceof ReliableMessageListener) {
            reliableListener = (ReliableMessageListener) messageListener;
        } else {
            reliableListener = new ReliableMessageListenerAdapter(messageListener);
        }

        ReliableMessageListenerRunner runner = new ReliableMessageListenerRunner(uuid, reliableListener, this);
        // The runner is stored before it is started so it can be looked up by id from then on.
        this.runnersMap.put(uuid, runner);
        runner.next();
        return uuid;
    }

    /**
     * Cancels the runner registered under the given id.
     *
     * <p>NOTE(review): this method does not remove the entry from {@code runnersMap} itself;
     * presumably {@code ReliableMessageListenerRunner.cancel()} does - confirm.
     *
     * @param str the registration id; must not be {@code null}
     * @return {@code true} if a runner was found and cancelled, {@code false} otherwise
     */
    @Override // com.hazelcast.core.ITopic
    public boolean removeMessageListener(String str) {
        Preconditions.checkNotNull(str, "registrationId can't be null");
        ReliableMessageListenerRunner runner = this.runnersMap.get(str);
        if (runner == null) {
            return false;
        }
        runner.cancel();
        return true;
    }

    /** Destroys the backing ringbuffer when this proxy is destroyed. */
    @Override // com.hazelcast.spi.AbstractDistributedObject
    protected void postDestroy() {
        this.ringbuffer.destroy();
    }

    @Override // com.hazelcast.core.ITopic
    public LocalTopicStats getLocalTopicStats() {
        return this.localTopicStats;
    }
}
