package org.kinotic.continuum.internal.util;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.cache.Cache;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.EventType;
import org.apache.commons.lang3.Validate;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.lang.IgniteFuture;
import org.kinotic.continuum.core.api.event.StreamData;
import org.kinotic.continuum.core.api.event.StreamOperation;
import org.kinotic.continuum.internal.core.api.aignite.Observer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/kinotic/continuum/internal/util/IgniteUtils.class */
public class IgniteUtils {
    private static final Logger log = LoggerFactory.getLogger(IgniteUtils.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.kinotic.continuum.internal.util.IgniteUtils$1, reason: invalid class name */
    /* loaded from: input_file:org/kinotic/continuum/internal/util/IgniteUtils$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$javax$cache$event$EventType = new int[EventType.values().length];

        static {
            try {
                $SwitchMap$javax$cache$event$EventType[EventType.CREATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$javax$cache$event$EventType[EventType.UPDATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$javax$cache$event$EventType[EventType.REMOVED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$javax$cache$event$EventType[EventType.EXPIRED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    public static <T> Mono<T> futureToMono(IgniteFuture<T> igniteFuture) {
        return Mono.create(monoSink -> {
            try {
                futureToMonoSink(monoSink, igniteFuture);
            } catch (Exception e) {
                monoSink.error(e);
            }
        });
    }

    public static <T> Mono<T> futureToMono(Supplier<IgniteFuture<T>> supplier) {
        return Mono.create(monoSink -> {
            try {
                futureToMonoSink(monoSink, (IgniteFuture) supplier.get());
            } catch (Exception e) {
                monoSink.error(e);
            }
        });
    }

    public static <T> void futureToMonoSink(MonoSink<T> monoSink, IgniteFuture<T> igniteFuture) {
        igniteFuture.listen(igniteFuture2 -> {
            try {
                monoSink.success(igniteFuture2.get());
            } catch (Exception e) {
                monoSink.error(e);
            }
        });
    }

    public static <T> Flux<T> observerToFlux(Supplier<Observer<T>> supplier) {
        return Flux.create(fluxSink -> {
            Observer observer = (Observer) supplier.get();
            observer.completionHandler(r3 -> {
                fluxSink.complete();
            });
            Objects.requireNonNull(fluxSink);
            observer.exceptionHandler(fluxSink::error);
            Objects.requireNonNull(fluxSink);
            observer.handler(fluxSink::next);
            fluxSink.onCancel(() -> {
                try {
                    observer.close();
                } catch (Exception e) {
                    log.error("Exception trying to close Observer", e);
                }
            });
            observer.start();
        });
    }

    public static <I, T> StreamData<I, T> cacheEntryEventToStreamData(CacheEntryEvent<? extends I, ? extends T> cacheEntryEvent) {
        StreamOperation streamOperation = toStreamOperation(cacheEntryEvent.getEventType());
        return streamOperation == StreamOperation.REMOVE ? new StreamData<>(streamOperation, cacheEntryEvent.getKey(), cacheEntryEvent.getOldValue()) : new StreamData<>(streamOperation, cacheEntryEvent.getKey(), cacheEntryEvent.getValue());
    }

    public static <I, T> StreamData<I, T> cacheEntryToStreamData(Cache.Entry<I, T> entry) {
        return new StreamData<>(StreamOperation.EXISTING, entry.getKey(), entry.getValue());
    }

    public static StreamOperation toStreamOperation(EventType eventType) {
        StreamOperation streamOperation;
        switch (AnonymousClass1.$SwitchMap$javax$cache$event$EventType[eventType.ordinal()]) {
            case 1:
                streamOperation = StreamOperation.EXISTING;
                break;
            case 2:
                streamOperation = StreamOperation.UPDATE;
                break;
            case 3:
            case 4:
                streamOperation = StreamOperation.REMOVE;
                break;
            default:
                throw new IllegalArgumentException("Unknown EventType " + eventType.name());
        }
        return streamOperation;
    }

    public static <K, V> Flux<Long> countCacheEntriesContinuous(Ignite ignite, Vertx vertx, IgniteCache<K, V> igniteCache) {
        Validate.notNull(ignite, "Ignite must not be null", new Object[0]);
        Validate.notNull(vertx, "Vertx must not be null", new Object[0]);
        Validate.notNull(igniteCache, "The IgniteCache provided must not be null", new Object[0]);
        return Flux.create(fluxSink -> {
            AtomicLong atomicLong = new AtomicLong();
            Context orCreateContext = vertx.getOrCreateContext();
            ContinuousQueryWithTransformer continuousQueryWithTransformer = new ContinuousQueryWithTransformer();
            continuousQueryWithTransformer.setIncludeExpired(true);
            continuousQueryWithTransformer.setRemoteTransformerFactory(FactoryBuilder.factoryOf(cacheEntryEvent -> {
                long j = 0;
                if (cacheEntryEvent.getEventType() == EventType.CREATED) {
                    j = 1;
                } else if (cacheEntryEvent.getEventType() == EventType.REMOVED || cacheEntryEvent.getEventType() == EventType.EXPIRED) {
                    j = -1;
                }
                return Long.valueOf(j);
            }));
            continuousQueryWithTransformer.setRemoteFilterFactory(() -> {
                return cacheEntryEvent2 -> {
                    return cacheEntryEvent2.getEventType() != EventType.UPDATED;
                };
            });
            continuousQueryWithTransformer.setLocalListener(iterable -> {
                orCreateContext.executeBlocking(promise -> {
                    Iterator it = iterable.iterator();
                    while (it.hasNext()) {
                        Long l = (Long) it.next();
                        if (!fluxSink.isCancelled()) {
                            fluxSink.next(Long.valueOf(atomicLong.addAndGet(l.longValue())));
                        }
                    }
                }, (Handler) null);
            });
            QueryCursor queryCursor = null;
            try {
                atomicLong.set(igniteCache.sizeLong(new CachePeekMode[0]));
                fluxSink.next(Long.valueOf(atomicLong.get()));
                queryCursor = igniteCache.query(continuousQueryWithTransformer);
                fluxSink.onDispose(() -> {
                    safeCloseCursor(queryCursor);
                });
            } catch (Exception e) {
                safeCloseCursor(queryCursor);
                fluxSink.error(e);
            }
        }).subscribeOn(Schedulers.fromExecutor(runnable -> {
            vertx.executeBlocking(promise -> {
                runnable.run();
            }, (Handler) null);
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void safeCloseCursor(QueryCursor<?> queryCursor) {
        if (queryCursor != null) {
            try {
                queryCursor.close();
            } catch (Exception e) {
                log.warn("Exception closing continuous query", e);
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1091358579:
                if (implMethodName.equals("lambda$countCacheEntriesContinuous$b58c1c50$1")) {
                    z = 2;
                    break;
                }
                break;
            case 137735934:
                if (implMethodName.equals("lambda$futureToMonoSink$933fcfc1$1")) {
                    z = false;
                    break;
                }
                break;
            case 486244726:
                if (implMethodName.equals("lambda$countCacheEntriesContinuous$59cde477$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteInClosure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/kinotic/continuum/internal/util/IgniteUtils") && serializedLambda.getImplMethodSignature().equals("(Lreactor/core/publisher/MonoSink;Lorg/apache/ignite/lang/IgniteFuture;)V")) {
                    MonoSink monoSink = (MonoSink) serializedLambda.getCapturedArg(0);
                    return igniteFuture2 -> {
                        try {
                            monoSink.success(igniteFuture2.get());
                        } catch (Exception e) {
                            monoSink.error(e);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("javax/cache/configuration/Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/kinotic/continuum/internal/util/IgniteUtils") && serializedLambda.getImplMethodSignature().equals("()Ljavax/cache/event/CacheEntryEventFilter;")) {
                    return () -> {
                        return cacheEntryEvent2 -> {
                            return cacheEntryEvent2.getEventType() != EventType.UPDATED;
                        };
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteClosure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/kinotic/continuum/internal/util/IgniteUtils") && serializedLambda.getImplMethodSignature().equals("(Ljavax/cache/event/CacheEntryEvent;)Ljava/lang/Long;")) {
                    return cacheEntryEvent -> {
                        long j = 0;
                        if (cacheEntryEvent.getEventType() == EventType.CREATED) {
                            j = 1;
                        } else if (cacheEntryEvent.getEventType() == EventType.REMOVED || cacheEntryEvent.getEventType() == EventType.EXPIRED) {
                            j = -1;
                        }
                        return Long.valueOf(j);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
