package com.hazelcast.client.impl.proxy;

import com.hazelcast.client.config.ClientReliableTopicConfig;
import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.spi.ClientContext;
import com.hazelcast.client.impl.spi.ClientProxy;
import com.hazelcast.config.cp.RaftAlgorithmConfig;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.util.ConcurrencyUtil;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.topic.ITopic;
import com.hazelcast.topic.LocalTopicStats;
import com.hazelcast.topic.MessageListener;
import com.hazelcast.topic.ReliableMessageListener;
import com.hazelcast.topic.TopicOverloadException;
import com.hazelcast.topic.TopicOverloadPolicy;
import com.hazelcast.topic.impl.reliable.MessageRunner;
import com.hazelcast.topic.impl.reliable.ReliableMessageListenerAdapter;
import com.hazelcast.topic.impl.reliable.ReliableTopicMessage;
import com.hazelcast.topic.impl.reliable.ReliableTopicService;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/client/impl/proxy/ClientReliableTopicProxy.class */
/**
 * Client-side {@link ITopic} proxy whose messages are written to a {@link Ringbuffer}.
 *
 * <p>The backing ringbuffer is looked up under the name
 * {@code RingbufferService.TOPIC_RB_PREFIX + topicName}. What happens when an add is
 * rejected by the ringbuffer is decided by the {@link TopicOverloadPolicy} taken from the
 * topic's {@link ClientReliableTopicConfig}:
 * <ul>
 *   <li>{@code ERROR} - a {@link TopicOverloadException} is raised;</li>
 *   <li>{@code DISCARD_OLDEST} - the add is issued with {@link OverflowPolicy#OVERWRITE};</li>
 *   <li>{@code DISCARD_NEWEST} - the add is issued with {@link OverflowPolicy#FAIL} and a
 *       rejection is silently ignored;</li>
 *   <li>{@code BLOCK} - the add is retried with a growing delay until it is accepted.</li>
 * </ul>
 *
 * @param <E> type of the published messages
 */
public class ClientReliableTopicProxy<E> extends ClientProxy implements ITopic<E> {
    private static final String NULL_MESSAGE_IS_NOT_ALLOWED = "Null message is not allowed!";
    private static final String NULL_LISTENER_IS_NOT_ALLOWED = "Null listener is not allowed!";
    /** Upper bound, in milliseconds, for the retry delay used by the BLOCK policy. */
    private static final int MAX_BACKOFF = 2000;
    /** First retry delay, in milliseconds, used by the BLOCK policy. */
    private static final int INITIAL_BACKOFF_MS = 100;
    /** Value this class treats as "the ringbuffer rejected the add" when adding with {@link OverflowPolicy#FAIL}. */
    private static final long ADD_REJECTED = -1L;

    private final ILogger logger;
    /** Active listener runners keyed by the registration id handed out by {@link #addMessageListener}. */
    private final ConcurrentMap<UUID, MessageRunner<E>> runnersMap;
    private final Ringbuffer<ReliableTopicMessage> ringbuffer;
    private final SerializationService serializationService;
    private final ClientReliableTopicConfig config;
    private final Executor executor;
    private final TopicOverloadPolicy overloadPolicy;

    /**
     * Creates the proxy and resolves its collaborators (ringbuffer, serialization service,
     * topic configuration, executor and logger) from the client instance.
     *
     * @param str                         name of the topic
     * @param clientContext               client context passed on to {@link ClientProxy}
     * @param hazelcastClientInstanceImpl client instance the collaborators are obtained from
     */
    public ClientReliableTopicProxy(String str, ClientContext clientContext, HazelcastClientInstanceImpl hazelcastClientInstanceImpl) {
        super(ReliableTopicService.SERVICE_NAME, str, clientContext);
        this.runnersMap = new ConcurrentHashMap<>();
        this.ringbuffer = hazelcastClientInstanceImpl.getRingbuffer(RingbufferService.TOPIC_RB_PREFIX + str);
        this.serializationService = hazelcastClientInstanceImpl.getSerializationService();
        this.config = hazelcastClientInstanceImpl.getClientConfig().getReliableTopicConfig(str);
        this.executor = getExecutor(this.config);
        this.overloadPolicy = this.config.getTopicOverloadPolicy();
        this.logger = hazelcastClientInstanceImpl.getLoggingService().getLogger(getClass());
    }

    /**
     * Returns the executor configured for the topic, or the default async executor when
     * the configuration does not provide one.
     */
    private Executor getExecutor(ClientReliableTopicConfig clientReliableTopicConfig) {
        Executor configured = clientReliableTopicConfig.getExecutor();
        return configured != null ? configured : ConcurrencyUtil.getDefaultAsyncExecutor();
    }

    /**
     * Publishes a single message and waits until the ringbuffer has answered.
     *
     * @param e the message to publish; must not be {@code null}
     * @throws TopicOverloadException if the policy is {@code ERROR} and the add was rejected
     */
    @Override // com.hazelcast.topic.ITopic
    public void publish(@Nonnull E e) {
        Preconditions.checkNotNull(e, NULL_MESSAGE_IS_NOT_ALLOWED);
        try {
            ReliableTopicMessage message = new ReliableTopicMessage(this.serializationService.toData(e), null);
            switch (this.overloadPolicy) {
                case ERROR:
                    addOrFail(message);
                    break;
                case DISCARD_OLDEST:
                    addOrOverwrite(message);
                    break;
                case DISCARD_NEWEST:
                    // The result is ignored on purpose: a rejected add simply drops the new message.
                    this.ringbuffer.addAsync(message, OverflowPolicy.FAIL).toCompletableFuture().get();
                    break;
                case BLOCK:
                    addWithBackoff(Collections.singleton(message));
                    break;
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + this.overloadPolicy);
            }
        } catch (Exception e2) {
            throw publishFailure(e2, "Failed to publish message: " + e + " to topic:" + getName());
        }
    }

    /**
     * Publishes a single message without blocking the caller.
     *
     * @param e the message to publish; must not be {@code null}
     * @return a stage that completes once the publication has been handled
     */
    @Override // com.hazelcast.topic.ITopic
    public CompletionStage<Void> publishAsync(@Nonnull E e) {
        Preconditions.checkNotNull(e, NULL_MESSAGE_IS_NOT_ALLOWED);
        return publishAllAsync(Collections.singleton(e));
    }

    /**
     * Restores the thread's interrupt status when {@code cause} is an
     * {@link InterruptedException} and converts {@code cause} into the exception to rethrow.
     *
     * <p>Without the re-interrupt, an interruption received while waiting for the ringbuffer
     * (or while sleeping between retries) would be invisible to the caller's thread.
     */
    private RuntimeException publishFailure(Exception cause, String message) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return (RuntimeException) ExceptionUtil.peel(cause, null, message);
    }

    /** Adds the message with {@link OverflowPolicy#OVERWRITE} and waits for the answer. */
    private void addOrOverwrite(ReliableTopicMessage reliableTopicMessage) throws Exception {
        this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.OVERWRITE).toCompletableFuture().get();
    }

    /**
     * Adds the message with {@link OverflowPolicy#FAIL} and raises a
     * {@link TopicOverloadException} when the add was rejected.
     */
    private void addOrFail(ReliableTopicMessage reliableTopicMessage) throws Exception {
        long sequence = this.ringbuffer.addAsync(reliableTopicMessage, OverflowPolicy.FAIL).toCompletableFuture().get();
        if (sequence == ADD_REJECTED) {
            throw new TopicOverloadException("Failed to publish message: " + reliableTopicMessage + " on topic:" + this.name);
        }
    }

    /**
     * Repeats the add with {@link OverflowPolicy#FAIL} until it is accepted, sleeping between
     * attempts. The delay starts at {@link #INITIAL_BACKOFF_MS}, doubles after every rejected
     * attempt and is capped at {@link #MAX_BACKOFF}.
     */
    private void addWithBackoff(Collection<ReliableTopicMessage> messages) throws Exception {
        long delayMs = INITIAL_BACKOFF_MS;
        while (this.ringbuffer.addAllAsync(messages, OverflowPolicy.FAIL).toCompletableFuture().get() == ADD_REJECTED) {
            TimeUnit.MILLISECONDS.sleep(delayMs);
            delayMs = Math.min(delayMs * 2, MAX_BACKOFF);
        }
    }

    /**
     * Registers a listener and starts its runner.
     *
     * @param messageListener the listener to register; must not be {@code null}
     * @return the registration id to use with {@link #removeMessageListener(UUID)}
     */
    @Override // com.hazelcast.topic.ITopic
    @Nonnull
    public UUID addMessageListener(@Nonnull MessageListener<E> messageListener) {
        Preconditions.checkNotNull(messageListener, NULL_LISTENER_IS_NOT_ALLOWED);
        UUID registrationId = UuidUtil.newUnsecureUUID();
        ReliableMessageListener<E> reliableListener = toReliableMessageListener(messageListener);
        ClientReliableMessageRunner<E> runner = new ClientReliableMessageRunner<>(
                registrationId,
                reliableListener,
                this.ringbuffer,
                this.name,
                this.config.getReadBatchSize(),
                this.serializationService,
                this.executor,
                this.runnersMap,
                this.logger);
        // Register before starting so the runner is visible in the map from its first step on.
        this.runnersMap.put(registrationId, runner);
        runner.next();
        return registrationId;
    }

    /**
     * Tells whether the listener with the given registration id is no longer active.
     *
     * @param uuid the registration id; must not be {@code null}
     * @return {@code true} if no runner is registered under the id or the runner reports
     *         itself as cancelled
     */
    public boolean isListenerCancelled(@Nonnull UUID uuid) {
        Preconditions.checkNotNull(uuid, "registrationId can't be null");
        MessageRunner<E> runner = this.runnersMap.get(uuid);
        return runner == null || runner.isCancelled();
    }

    /**
     * Returns the listener itself when it already is a {@link ReliableMessageListener},
     * otherwise wraps it in a {@link ReliableMessageListenerAdapter}.
     */
    private ReliableMessageListener<E> toReliableMessageListener(MessageListener<E> messageListener) {
        if (messageListener instanceof ReliableMessageListener) {
            return (ReliableMessageListener<E>) messageListener;
        }
        return new ReliableMessageListenerAdapter<>(messageListener);
    }

    /**
     * Cancels the runner registered under the given id.
     *
     * @param uuid the registration id; must not be {@code null}
     * @return {@code false} if no runner is registered under the id, {@code true} otherwise
     */
    @Override // com.hazelcast.topic.ITopic
    public boolean removeMessageListener(@Nonnull UUID uuid) {
        Preconditions.checkNotNull(uuid, "registrationId can't be null");
        MessageRunner<E> runner = this.runnersMap.get(uuid);
        if (runner == null) {
            return false;
        }
        // NOTE(review): this method does not remove the map entry itself; presumably
        // cancel() takes care of that - confirm against MessageRunner.
        runner.cancel();
        return true;
    }

    /**
     * Not supported on the client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // com.hazelcast.topic.ITopic
    @Nonnull
    public LocalTopicStats getLocalTopicStats() {
        throw new UnsupportedOperationException("Locality is ambiguous for client!");
    }

    /**
     * Publishes all messages of the collection in one ringbuffer call and waits until the
     * ringbuffer has answered.
     *
     * @param collection the messages to publish; neither the collection nor any element may
     *                   be {@code null}
     * @throws TopicOverloadException if the policy is {@code ERROR} and the add was rejected
     */
    @Override // com.hazelcast.topic.ITopic
    public void publishAll(@Nonnull Collection<? extends E> collection) {
        Preconditions.checkNotNull(collection, NULL_MESSAGE_IS_NOT_ALLOWED);
        Preconditions.checkNoNullInside(collection, NULL_MESSAGE_IS_NOT_ALLOWED);
        try {
            List<ReliableTopicMessage> messages = toReliableTopicMessages(collection);
            switch (this.overloadPolicy) {
                case ERROR:
                    if (this.ringbuffer.addAllAsync(messages, OverflowPolicy.FAIL).toCompletableFuture().get() == ADD_REJECTED) {
                        throw new TopicOverloadException("Failed to publish messages: " + collection + " on topic:" + getName());
                    }
                    break;
                case DISCARD_OLDEST:
                    this.ringbuffer.addAllAsync(messages, OverflowPolicy.OVERWRITE).toCompletableFuture().get();
                    break;
                case DISCARD_NEWEST:
                    // The result is ignored on purpose: a rejected add simply drops the new messages.
                    this.ringbuffer.addAllAsync(messages, OverflowPolicy.FAIL).toCompletableFuture().get();
                    break;
                case BLOCK:
                    addWithBackoff(messages);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + this.overloadPolicy);
            }
        } catch (Exception e) {
            throw publishFailure(e, "Failed to publish messages: " + collection + " to topic:" + getName());
        }
    }

    /**
     * Publishes all messages of the collection without blocking the caller.
     *
     * <p>For every overload policy the returned stage is the one that is actually completed
     * by the ringbuffer callback. (Earlier the {@code DISCARD_OLDEST} and
     * {@code DISCARD_NEWEST} branches returned a future nobody ever completed.)
     *
     * @param collection the messages to publish; neither the collection nor any element may
     *                   be {@code null}
     * @return a stage that completes normally once the add has been handled, or exceptionally
     *         if the ringbuffer call failed or, under the {@code ERROR} policy, was rejected
     */
    @Override // com.hazelcast.topic.ITopic
    public CompletionStage<Void> publishAllAsync(@Nonnull Collection<? extends E> collection) {
        Preconditions.checkNotNull(collection, NULL_MESSAGE_IS_NOT_ALLOWED);
        Preconditions.checkNoNullInside(collection, NULL_MESSAGE_IS_NOT_ALLOWED);
        try {
            List<ReliableTopicMessage> messages = toReliableTopicMessages(collection);
            switch (this.overloadPolicy) {
                case ERROR: {
                    InternalCompletableFuture<Void> result = new InternalCompletableFuture<>();
                    addAsyncOrFail(collection, result, messages);
                    return result;
                }
                case DISCARD_OLDEST:
                    return addAsync(messages, OverflowPolicy.OVERWRITE);
                case DISCARD_NEWEST:
                    return addAsync(messages, OverflowPolicy.FAIL);
                case BLOCK: {
                    InternalCompletableFuture<Void> result = new InternalCompletableFuture<>();
                    addAsyncAndBlock(result, messages, INITIAL_BACKOFF_MS);
                    return result;
                }
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + this.overloadPolicy);
            }
        } catch (Exception e) {
            throw ((RuntimeException) ExceptionUtil.peel(e, null, String.format("Failed to publish messages: %s on topic: %s", collection, getName())));
        }
    }

    /** Serializes every payload and wraps it in a {@link ReliableTopicMessage}. */
    private List<ReliableTopicMessage> toReliableTopicMessages(Collection<? extends E> payloads) {
        return payloads.stream()
                .map(payload -> new ReliableTopicMessage(toData(payload), null))
                .collect(Collectors.toList());
    }

    /**
     * Asynchronous counterpart of the BLOCK policy: issues the add with
     * {@link OverflowPolicy#FAIL} and, when it is rejected, schedules another attempt after
     * {@code delayMs} with the delay doubled (capped at {@link #MAX_BACKOFF}).
     *
     * @param resultFuture future completed once an attempt is accepted or fails with an error
     * @param messages     messages to add
     * @param delayMs      delay, in milliseconds, before the next attempt if this one is rejected
     */
    private void addAsyncAndBlock(InternalCompletableFuture<Void> resultFuture, List<ReliableTopicMessage> messages, long delayMs) {
        this.ringbuffer.addAllAsync(messages, OverflowPolicy.FAIL).whenCompleteAsync((sequence, failure) -> {
            if (failure != null) {
                resultFuture.completeExceptionally(failure);
            } else if (sequence == ADD_REJECTED) {
                long nextDelayMs = Math.min(delayMs * 2, MAX_BACKOFF);
                getContext().getTaskScheduler().schedule(
                        () -> addAsyncAndBlock(resultFuture, messages, nextDelayMs),
                        delayMs, TimeUnit.MILLISECONDS);
            } else {
                resultFuture.complete(null);
            }
        });
    }

    /**
     * Asynchronous counterpart of the ERROR policy: issues the add with
     * {@link OverflowPolicy#FAIL} and fails {@code resultFuture} with a
     * {@link TopicOverloadException} when the add is rejected.
     */
    private void addAsyncOrFail(@Nonnull Collection<? extends E> collection, InternalCompletableFuture<Void> resultFuture, List<ReliableTopicMessage> messages) {
        this.ringbuffer.addAllAsync(messages, OverflowPolicy.FAIL).whenCompleteAsync((sequence, failure) -> {
            if (failure != null) {
                resultFuture.completeExceptionally(failure);
            } else if (sequence == ADD_REJECTED) {
                resultFuture.completeExceptionally(new TopicOverloadException("Failed to publish messages: " + collection + " on topic:" + getName()));
            } else {
                resultFuture.complete(null);
            }
        });
    }

    /**
     * Issues the add with the given overflow policy and returns a future that completes
     * normally whatever sequence the ringbuffer reports, or exceptionally if the call failed.
     */
    private InternalCompletableFuture<Void> addAsync(List<ReliableTopicMessage> messages, OverflowPolicy overflowPolicy) {
        InternalCompletableFuture<Void> resultFuture = new InternalCompletableFuture<>();
        this.ringbuffer.addAllAsync(messages, overflowPolicy).whenCompleteAsync((sequence, failure) -> {
            if (failure != null) {
                resultFuture.completeExceptionally(failure);
            } else {
                resultFuture.complete(null);
            }
        });
        return resultFuture;
    }

    /** Returns the ringbuffer backing this topic. */
    public Ringbuffer getRingbuffer() {
        return this.ringbuffer;
    }

    @Override
    public String toString() {
        return "ITopic{name='" + this.name + "'}";
    }

    /** Destroys the backing ringbuffer as part of destroying this proxy. */
    @Override // com.hazelcast.client.impl.spi.ClientProxy
    public void postDestroy() {
        this.ringbuffer.destroy();
    }
}
