package io.zeebe.redis.exporter;

import io.camunda.zeebe.exporter.api.context.Controller;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisConnectionStateListener;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/redis/exporter/RedisSender.class */
public class RedisSender {
    private final Logger logger;
    private final Controller controller;
    private final UniversalRedisConnection<String, ?> redisConnection;
    private final int batchSize;
    private final AtomicBoolean redisConnected = new AtomicBoolean(true);
    private final List<ImmutablePair<Long, RedisEvent>> deQueue = new ArrayList();

    public RedisSender(ExporterConfiguration exporterConfiguration, Controller controller, UniversalRedisConnection<String, ?> universalRedisConnection, Logger logger) {
        this.batchSize = exporterConfiguration.getBatchSize();
        this.controller = controller;
        this.redisConnection = universalRedisConnection;
        this.logger = logger;
        this.redisConnection.setAutoFlushCommands(false);
        this.redisConnection.addListener(new RedisConnectionStateListener() { // from class: io.zeebe.redis.exporter.RedisSender.1
            @Override // io.lettuce.core.RedisConnectionStateListener
            public void onRedisConnected(RedisChannelHandler<?, ?> redisChannelHandler, SocketAddress socketAddress) {
                RedisSender.this.redisConnected.set(true);
            }

            @Override // io.lettuce.core.RedisConnectionStateListener
            public void onRedisDisconnected(RedisChannelHandler<?, ?> redisChannelHandler) {
                RedisSender.this.redisConnected.set(false);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendFrom(EventQueue eventQueue) {
        if (this.redisConnected.get() && sendDeQueue()) {
            try {
                Long l = -1L;
                RedisStreamAsyncCommands<String, ?> asyncStreamCommands = this.redisConnection.asyncStreamCommands();
                ArrayList arrayList = new ArrayList();
                ImmutablePair<Long, RedisEvent> nextEvent = eventQueue.getNextEvent();
                while (nextEvent != null) {
                    for (int i = 0; i < this.batchSize; i++) {
                        this.deQueue.add(nextEvent);
                        RedisEvent value = nextEvent.getValue();
                        arrayList.add(asyncStreamCommands.xadd((RedisStreamAsyncCommands<String, ?>) value.stream, String.valueOf(value.key), value.value));
                        l = nextEvent.getKey();
                        nextEvent = eventQueue.getNextEvent();
                        if (nextEvent == null) {
                            break;
                        }
                    }
                    if (arrayList.size() > 0) {
                        this.redisConnection.flushCommands();
                        if (!LettuceFutures.awaitAll(7L, TimeUnit.SECONDS, (Future[]) arrayList.toArray(new RedisFuture[arrayList.size()]))) {
                            break;
                        }
                        this.controller.updateLastExportedRecordPosition(l.longValue());
                        this.deQueue.clear();
                        this.logger.debug("Exported {} events to Redis", Integer.valueOf(arrayList.size()));
                        arrayList.clear();
                    }
                }
            } catch (RedisCommandTimeoutException | RedisConnectionException e) {
                this.logger.error("Error when sending events to Redis due to possible Redis unavailability: {}", e.getMessage());
            } catch (Exception e2) {
                this.logger.error("Error when sending events to Redis", e2);
            }
        }
    }

    private boolean sendDeQueue() {
        if (this.deQueue.isEmpty()) {
            return true;
        }
        try {
            Long l = -1L;
            RedisStreamAsyncCommands<String, ?> asyncStreamCommands = this.redisConnection.asyncStreamCommands();
            ArrayList arrayList = new ArrayList();
            for (ImmutablePair<Long, RedisEvent> immutablePair : this.deQueue) {
                RedisEvent value = immutablePair.getValue();
                arrayList.add(asyncStreamCommands.xadd((RedisStreamAsyncCommands<String, ?>) value.stream, String.valueOf(value.key), value.value));
                l = immutablePair.getKey();
            }
            this.redisConnection.flushCommands();
            if (!LettuceFutures.awaitAll(7L, TimeUnit.SECONDS, (Future[]) arrayList.toArray(new RedisFuture[arrayList.size()]))) {
                return false;
            }
            this.controller.updateLastExportedRecordPosition(l.longValue());
            this.logger.debug("Exported {} dequeued events to Redis", Integer.valueOf(arrayList.size()));
            this.deQueue.clear();
            return true;
        } catch (RedisCommandTimeoutException | RedisConnectionException e) {
            this.logger.error("Error when sending dequeued events to Redis due to possible Redis unavailability: {}", e.getMessage());
            return false;
        } catch (Exception e2) {
            this.logger.error("Error when sending dequeued events to Redis", e2);
            return false;
        }
    }
}
