package io.neonbee.data;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import io.neonbee.NeonBee;
import io.neonbee.NeonBeeDeployable;
import io.neonbee.data.DataRequest;
import io.neonbee.data.internal.DataContextImpl;
import io.neonbee.entity.EntityVerticle;
import io.neonbee.internal.helper.FunctionalHelper;
import io.neonbee.logging.LoggingFacade;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.ReplyException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/neonbee/data/DataVerticle.class */
public abstract class DataVerticle<T> extends AbstractVerticle implements DataAdapter<T> {
    /** Name of the event bus message header that carries the encoded {@link DataContext}. */
    public static final String CONTEXT_HEADER = "context";
    /** Name of the event bus message header that carries the name of the requested resolution strategy. */
    static final String RESOLUTION_STRATEGY_HEADER = "resolutionStrategy";
    /** Logger shared by all data verticle instances. */
    private static final LoggingFacade LOGGER = LoggingFacade.create();
    /**
     * Supplies the namespace of this verticle, taken from the {@link NeonBeeDeployable} annotation on the concrete
     * class. Yields {@code null} if the annotation is absent or its namespace is empty.
     *
     * <p>
     * The namespace is lower-cased with {@link Locale#ROOT} so that the result does not depend on the default locale
     * of the JVM (and agrees with {@link #createQualifiedName(String, String)}, which uses the same locale).
     */
    private final Supplier<String> namespaceSupplier = () -> Optional
            .ofNullable(getClass().getAnnotation(NeonBeeDeployable.class))
            .map(NeonBeeDeployable::namespace)
            .map(Strings::emptyToNull)
            .map(namespace -> namespace.toLowerCase(Locale.ROOT))
            .orElse(null);

    /* loaded from: input_file:io/neonbee/data/DataVerticle$ManipulationRoutine.class */
    /**
     * Resolution routine that delegates to {@code manipulateData} of the enclosing verticle.
     */
    private class ManipulationRoutine implements ResolutionRoutine {
        private ManipulationRoutine() {
        }

        /**
         * Invokes {@code manipulateData} on the enclosing verticle.
         *
         * @param dataQuery   the query to pass on
         * @param dataContext the context to pass on
         * @return the future returned by {@code manipulateData}, or a failed future wrapping any exception it threw
         */
        @Override
        public Future<T> execute(DataQuery dataQuery, DataContext dataContext) {
            Future<T> outcome;
            try {
                outcome = DataVerticle.this.manipulateData(dataQuery, dataContext);
            } catch (Exception thrown) {
                // a synchronously thrown exception is reported through the future instead
                outcome = Future.failedFuture(thrown);
            }
            return outcome;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/neonbee/data/DataVerticle$OptimizedResolutionRoutine.class */
    /**
     * Resolution routine for the optimized strategy. It always fails, reporting that the strategy is not available.
     */
    public class OptimizedResolutionRoutine implements ResolutionRoutine {
        private OptimizedResolutionRoutine() {
        }

        /**
         * Always fails.
         *
         * @param dataQuery   ignored
         * @param dataContext ignored
         * @return a future failed with an {@link UnsupportedOperationException}
         */
        @Override
        public Future<T> execute(DataQuery dataQuery, DataContext dataContext) {
            UnsupportedOperationException notAvailable =
                    new UnsupportedOperationException("Optimized resolution strategy not available.");
            return Future.failedFuture(notAvailable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/neonbee/data/DataVerticle$RecursiveResolutionRoutine.class */
    /**
     * Resolution routine that first asks the enclosing verticle which data it requires, requests all of it, and then
     * hands the collected results to {@code retrieveData}.
     */
    public class RecursiveResolutionRoutine implements ResolutionRoutine {
        private RecursiveResolutionRoutine() {
        }

        /**
         * Runs {@code requireData}, requests every returned {@link DataRequest} once (each with its own copy of the
         * context), waits for all requests to complete and finally calls {@code retrieveData} with a {@link DataMap}
         * of the results.
         *
         * @param dataQuery   the query to resolve
         * @param dataContext the context of the current request
         * @return the future returned by {@code retrieveData}, a failed future wrapping any exception it threw, or the
         *         failure of {@code requireData}
         */
        @Override
        public Future<T> execute(DataQuery dataQuery, DataContext dataContext) {
            // insertion-ordered, so the results are handed over in the order the requests were declared
            Map<DataRequest, AsyncResult<?>> results = new LinkedHashMap<>();
            return DataVerticle.this.requireData(dataQuery, dataContext).compose(requests -> {
                Stream<DataRequest> requestStream =
                        requests == null ? Stream.<DataRequest>empty() : requests.stream();
                // computeIfAbsent makes sure that equal requests are only sent once
                List<Future> pending = requestStream
                        .map(request -> results.computeIfAbsent(request,
                                key -> DataVerticle.requestData(DataVerticle.this.vertx, key, dataContext.copy())))
                        .map(Future.class::cast)
                        .collect(Collectors.toList());
                // join waits for all requests; otherwiseEmpty keeps a failed request from failing the routine, the
                // individual outcomes stay available through the results map
                return CompositeFuture.join(pending).otherwiseEmpty();
            }).compose(joined -> {
                try {
                    return DataVerticle.this.retrieveData(dataQuery, new DataMap(results), dataContext);
                } catch (Exception thrown) {
                    return Future.failedFuture(thrown);
                }
            });
        }
    }

    /* loaded from: input_file:io/neonbee/data/DataVerticle$ResolutionRoutine.class */
    /**
     * Strategy for processing an incoming {@link DataQuery}. The consumer registered in {@code start} picks one
     * implementation per message and replies with the outcome of the returned future.
     */
    private interface ResolutionRoutine {
        /**
         * Processes the given query.
         *
         * @param dataQuery   the query received via the event bus
         * @param dataContext the context decoded from the message headers
         * @return a future holding the result to reply with, or the failure to report
         */
        Future<?> execute(DataQuery dataQuery, DataContext dataContext);
    }

    /**
     * Returns the name of this data verticle. Together with the optional namespace it forms the qualified name from
     * which the event bus address is derived.
     *
     * @return the verticle name
     */
    public abstract String getName();

    /**
     * Returns the namespace of this data verticle as provided by the namespace supplier.
     *
     * @return the namespace, or {@code null} if the verticle has none
     */
    public final String getNamespace() {
        final String namespace = namespaceSupplier.get();
        return namespace;
    }

    /**
     * Returns the message codec of this verticle. A non-null codec with a non-null name is registered on the event
     * bus during {@code init}, and the codec name is set on the delivery options of replies sent by this verticle.
     *
     * @return the codec, {@code null} by default
     */
    public MessageCodec<T, T> getMessageCodec() {
        return null;
    }

    /**
     * Initializes the verticle and registers the codec returned by {@link #getMessageCodec()} on the event bus, if
     * there is one and it has a name.
     *
     * <p>
     * A codec that is already registered is tolerated. Any other {@link IllegalStateException} raised by the
     * registration is logged as a warning and does not abort initialization.
     *
     * @param vertx   the Vert.x instance
     * @param context the verticle context
     */
    @Override
    public void init(Vertx vertx, Context context) {
        super.init(vertx, context);
        MessageCodec<T, T> messageCodec = getMessageCodec();
        if (messageCodec == null || messageCodec.name() == null) {
            return;
        }
        try {
            vertx.eventBus().registerCodec(messageCodec);
        } catch (IllegalStateException e) {
            // the exception message may be null, so it must not be dereferenced blindly
            String reason = e.getMessage();
            if (reason != null && reason.startsWith("Already a codec registered with name")) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Codec {} is already registered. Ignore the exception.", messageCodec.name());
                }
            } else {
                // still best-effort, but no longer silent
                LOGGER.warn("Failed to register codec {}", messageCodec.name(), e);
            }
        }
    }

    /**
     * Registers the event bus consumer of this verticle and, once the registration completed, calls {@code start()}
     * and registers the address as a local consumer with NeonBee.
     *
     * @param promise completed when start-up succeeded, failed if the consumer registration, {@code start()} or the
     *                local consumer registration failed
     */
    @Override
    public void start(Promise<Void> promise) {
        Promise<Void> consumerRegistered = Promise.promise();
        String address = getAddress();
        this.vertx.eventBus().<DataQuery>consumer(address, this::handleMessage).completionHandler(consumerRegistered);
        consumerRegistered.future().compose(ignored -> {
            try {
                start();
                NeonBee.get(this.vertx).registerLocalConsumer(address);
                return Future.<Void>succeededFuture();
            } catch (Exception e) {
                return Future.<Void>failedFuture(e);
            }
        }).onComplete(promise);
    }

    /**
     * Handles one event bus message: selects the resolution routine, decodes the context from the message headers,
     * executes the routine and replies with its outcome.
     *
     * @param message the received message
     */
    private void handleMessage(Message<DataQuery> message) {
        MultiMap headers = message.headers();
        try {
            DataQuery query = message.body();
            // only READ queries are resolved via a strategy, everything else is a manipulation
            ResolutionRoutine routine = query.getAction() == DataAction.READ
                    ? resolutionRoutineForStrategy(Optional.ofNullable(headers.get(RESOLUTION_STRATEGY_HEADER))
                            .map(DataRequest.ResolutionStrategy::valueOf)
                            .orElse(DataRequest.ResolutionStrategy.RECURSIVE))
                    : new ManipulationRoutine();
            DataContext context = DataContextImpl.decodeContextFromString(headers.get(CONTEXT_HEADER));
            if (context instanceof DataContextImpl) {
                ((DataContextImpl) context).amendTopVerticleCoordinate(deploymentID());
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.correlateWith(context).debug(
                        "Data verticle {} received event bus message from {}, using resolution routine {}",
                        getQualifiedName(), message.replyAddress(), routine.getClass().getSimpleName());
            }
            try {
                routine.execute(query, context).onComplete(result -> replyToMessage(message, context, result));
            } catch (DataException dataException) {
                LOGGER.correlateWith(context).error("Processing of message failed", (Throwable) dataException);
                message.fail(dataException.failureCode(), dataException.getMessage());
            } catch (IllegalArgumentException missingCodec) {
                LOGGER.correlateWith(context).error("Missing message codec", (Throwable) missingCodec);
                message.fail(1001, missingCodec.getMessage());
            }
        } catch (IllegalArgumentException unknownStrategy) {
            // e.g. thrown by ResolutionStrategy.valueOf for an unknown strategy name in the header
            message.fail(1000, "Unknown data resolution strategy");
        }
    }

    /**
     * Replies to the message with the result of a resolution routine, or fails the message if the routine failed or
     * replying itself throws.
     *
     * @param message the message to answer
     * @param context the context of the request, sent back in the reply headers
     * @param result  the outcome of the resolution routine
     */
    private void replyToMessage(Message<DataQuery> message, DataContext context, AsyncResult<?> result) {
        try {
            if (result.succeeded()) {
                message.reply(result.result(), deliveryOptions(this.vertx, getMessageCodec(), context));
                return;
            }
            Throwable cause = result.cause();
            if (LOGGER.isWarnEnabled()) {
                LOGGER.correlateWith(context).warn("Data verticle {} routine execution failed", getQualifiedName(),
                        cause instanceof DataException ? cause.toString() : "", cause);
            }
            if (cause instanceof DataException) {
                message.fail(((DataException) cause).failureCode(), cause.getMessage());
            } else {
                message.fail(DataException.FAILURE_CODE_PROCESSING_FAILED,
                        "Processing of message failed. " + cause.getMessage());
            }
        } catch (Exception e) {
            LOGGER.correlateWith(context).error("Processing of message failed", (Throwable) e);
            message.fail(DataException.FAILURE_CODE_PROCESSING_FAILED, e.getMessage());
        }
    }

    /**
     * Unregisters the address of this verticle as a local consumer, provided a NeonBee instance is available for the
     * Vert.x instance, and then stops the verticle.
     *
     * @throws Exception if stopping fails
     */
    @Override
    public void stop() throws Exception {
        final NeonBee instance = NeonBee.get(vertx);
        if (instance != null) {
            final String address = getAddress();
            instance.unregisterLocalConsumer(address);
        }
        super.stop();
    }

    /**
     * Declares which data this verticle requires before it can retrieve its own data. The default implementation
     * requires nothing.
     *
     * @param dataQuery   the query to resolve
     * @param dataContext the context of the current request
     * @return a succeeded future holding an empty collection
     */
    public Future<Collection<DataRequest>> requireData(DataQuery dataQuery, DataContext dataContext) {
        Collection<DataRequest> nothingRequired = Collections.emptyList();
        return Future.succeededFuture(nothingRequired);
    }

    /**
     * Retrieves the data of this verticle once the required data is available. The default implementation ignores
     * the data map and delegates to the two-argument {@code retrieveData}.
     *
     * @param dataQuery   the query to resolve
     * @param dataMap     the results of the required data requests (unused by this default implementation)
     * @param dataContext the context of the current request
     * @return the future returned by the two-argument {@code retrieveData}
     */
    public Future<T> retrieveData(DataQuery dataQuery, DataMap dataMap, DataContext dataContext) {
        final Future<T> data = retrieveData(dataQuery, dataContext);
        return data;
    }

    /**
     * Requests data on behalf of this verticle, using the Vert.x instance the verticle is deployed on.
     *
     * @param dataRequest the request describing which data to fetch
     * @param dataContext the context of the current request
     * @param <U>         the type of the requested data
     * @return a future holding the requested data
     */
    public <U> Future<U> requestData(DataRequest dataRequest, DataContext dataContext) {
        // guard the log statement, so the qualified name is only built when it is actually logged
        if (LOGGER.isDebugEnabled()) {
            LOGGER.correlateWith(dataContext).debug("Data verticle {} requesting data from {}", getQualifiedName(),
                    dataRequest);
        }
        return requestData(this.vertx, dataRequest, dataContext);
    }

    /**
     * Requests data as described by the given request. In order of precedence the data is obtained from the data
     * source of the request, its data sink, the data verticle with the requested qualified name (via the event bus),
     * or the entity verticles if an entity type name was given.
     *
     * @param vertx       the Vert.x instance to use
     * @param dataRequest the request describing which data to fetch
     * @param dataContext the context of the current request; for event bus requests its data is replaced by the
     *                    data of the context returned in the reply
     * @param <U>         the type of the requested data
     * @return a future holding the requested data, or failed if the request specified nothing to request or the
     *         request itself failed
     */
    public static <U> Future<U> requestData(Vertx vertx, DataRequest dataRequest, DataContext dataContext) {
        DataSource<?> source = dataRequest.getDataSource();
        if (source != null) {
            return source.retrieveData(dataRequest.getQuery(), dataContext).map(FunctionalHelper::uncheckedMapper);
        }

        DataSink<?> sink = dataRequest.getDataSink();
        if (sink != null) {
            return sink.manipulateData(dataRequest.getQuery(), dataContext).map(FunctionalHelper::uncheckedMapper);
        }

        String qualifiedName = dataRequest.getQualifiedName();
        if (qualifiedName == null) {
            if (dataRequest.getEntityTypeName() == null) {
                return Future.failedFuture(
                        new IllegalArgumentException("Data request did not specify what data to request"));
            }
            return EntityVerticle.requestEntity(vertx, dataRequest, dataContext)
                    .map(entity -> FunctionalHelper.uncheckedMapper(entity));
        }

        LOGGER.correlateWith(dataContext).debug("Sending message via the event bus to {}", qualifiedName);
        String address = getAddress(qualifiedName);
        DeliveryOptions options = requestDeliveryOptions(vertx, dataRequest, dataContext, address);
        return vertx.eventBus().<U>request(address, dataRequest.getQuery(), options).transform(reply -> {
            LOGGER.correlateWith(dataContext).debug("Received event bus reply");
            if (reply.failed()) {
                Throwable cause = reply.cause();
                if (LOGGER.isWarnEnabled()) {
                    LOGGER.correlateWith(dataContext).warn("Failed to receive event bus reply from {}", qualifiedName,
                            cause);
                }
                return Future.failedFuture(mapException(cause));
            }
            // carry the data of the context returned by the remote verticle over into the local context
            Message<U> message = reply.result();
            dataContext.setData(
                    Optional.ofNullable(DataContextImpl.decodeContextFromString(message.headers().get("context")))
                            .map(DataContext::data).orElse(null));
            return Future.succeededFuture(message.body());
        });
    }

    /**
     * Returns the qualified name of this verticle: the plain name if the verticle has no namespace, otherwise the
     * result of {@link #createQualifiedName(String, String)} for namespace and name.
     *
     * @return the qualified name
     */
    @VisibleForTesting
    public final String getQualifiedName() {
        final String verticleName = getName();
        final String verticleNamespace = getNamespace();
        if (verticleNamespace == null) {
            return verticleName;
        }
        return createQualifiedName(verticleNamespace, verticleName);
    }

    /**
     * Builds a qualified name of the form {@code namespace/name}. The namespace is lower-cased using
     * {@link Locale#ROOT}, the name is used as is.
     *
     * @param str  the namespace, must not be {@code null}
     * @param str2 the verticle name
     * @return the qualified name
     */
    public static String createQualifiedName(String str, String str2) {
        final String namespace = str.toLowerCase(Locale.ROOT);
        return namespace + "/" + str2;
    }

    /**
     * Returns the event bus address of this verticle, derived from its qualified name.
     *
     * @return the event bus address
     */
    protected final String getAddress() {
        final String qualifiedName = getQualifiedName();
        return getAddress(qualifiedName);
    }

    /**
     * Builds the event bus address for a qualified verticle name: the simple class name of {@code DataVerticle}
     * followed by the qualified name in square brackets.
     *
     * @param str the qualified name of the verticle
     * @return the event bus address
     */
    protected static String getAddress(String str) {
        final String prefix = DataVerticle.class.getSimpleName();
        return prefix + "[" + str + "]";
    }

    /**
     * Creates the delivery options for sending a data request via the event bus.
     *
     * <p>
     * For a {@link DataContextImpl} the requested verticle is pushed onto the context path while the context is
     * encoded into the options and popped again afterwards. The pop happens in a {@code finally} block, so the
     * caller's context is restored even if building the options throws.
     *
     * @param vertx       the Vert.x instance
     * @param dataRequest the request to send
     * @param dataContext the context of the current request
     * @param str         the event bus address the request is sent to
     * @return the delivery options for the request
     */
    private static DeliveryOptions requestDeliveryOptions(Vertx vertx, DataRequest dataRequest, DataContext dataContext, String str) {
        DeliveryOptions deliveryOptions;
        if (dataContext instanceof DataContextImpl) {
            DataContextImpl contextImpl = (DataContextImpl) dataContext;
            contextImpl.pushVerticleToPath(dataRequest.getQualifiedName());
            try {
                deliveryOptions = deliveryOptions(vertx, null, dataContext);
            } finally {
                contextImpl.popVerticleFromPath();
            }
        } else {
            deliveryOptions = deliveryOptions(vertx, null, dataContext);
        }

        // local delivery is enforced if requested, or preferred whenever a local consumer is available
        deliveryOptions.setLocalOnly(dataRequest.isLocalOnly()
                || (dataRequest.isLocalPreferred() && NeonBee.get(vertx).isLocalConsumerAvailable(str)));
        if (dataRequest.getSendTimeout() > 0) {
            // a positive timeout of the request overrides the configured default
            deliveryOptions.setSendTimeout(dataRequest.getSendTimeout());
        }
        DataRequest.ResolutionStrategy strategy = dataRequest.getResolutionStrategy();
        if (strategy != null) {
            deliveryOptions.addHeader(RESOLUTION_STRATEGY_HEADER, strategy.name());
        }
        return deliveryOptions;
    }

    /**
     * Creates delivery options with the event bus timeout from the NeonBee configuration (interpreted as seconds),
     * the name of the given codec and, if a context is given and encodes to a non-null string, the encoded context
     * as a message header.
     *
     * @param vertx        the Vert.x instance used to look up the NeonBee configuration
     * @param messageCodec the codec whose name to set, may be {@code null}
     * @param dataContext  the context to encode into the headers, may be {@code null}
     * @return the delivery options
     */
    private static DeliveryOptions deliveryOptions(Vertx vertx, MessageCodec<?, ?> messageCodec, DataContext dataContext) {
        DeliveryOptions options = new DeliveryOptions();
        long timeoutMillis = TimeUnit.SECONDS.toMillis(NeonBee.get(vertx).getConfig().getEventBusTimeout());
        options.setSendTimeout(timeoutMillis);

        String codecName = messageCodec == null ? null : messageCodec.name();
        options.setCodecName(codecName);

        if (dataContext != null) {
            String encodedContext = DataContextImpl.encodeContextToString(dataContext);
            if (encodedContext != null) {
                options.addHeader("context", encodedContext);
            }
        }
        return options;
    }

    /**
     * Converts a failure into a {@link DataException}. A {@link DataException} is returned unchanged. For a
     * {@link ReplyException} the failure code depends on the failure type: 1010 if there were no handlers, 1020 on a
     * timeout, otherwise the failure code of the reply. Any other throwable is mapped to failure code 1030. The
     * message of the throwable is kept in all cases.
     *
     * @param th the failure to convert
     * @return the resulting data exception
     */
    private static DataException mapException(Throwable th) {
        if (th instanceof DataException) {
            return (DataException) th;
        }
        if (!(th instanceof ReplyException)) {
            return new DataException(1030, th.getMessage());
        }

        ReplyException reply = (ReplyException) th;
        int failureCode;
        switch (reply.failureType()) {
            case NO_HANDLERS:
                failureCode = 1010;
                break;
            case TIMEOUT:
                failureCode = 1020;
                break;
            default:
                failureCode = reply.failureCode();
                break;
        }
        return new DataException(failureCode, reply.getMessage());
    }

    /**
     * Selects the resolution routine for the given strategy.
     *
     * @param resolutionStrategy the requested strategy
     * @return an {@link OptimizedResolutionRoutine} for the optimized strategy, a
     *         {@link RecursiveResolutionRoutine} for anything else
     */
    private ResolutionRoutine resolutionRoutineForStrategy(DataRequest.ResolutionStrategy resolutionStrategy) {
        if (resolutionStrategy == DataRequest.ResolutionStrategy.OPTIMIZED) {
            return new OptimizedResolutionRoutine();
        }
        return new RecursiveResolutionRoutine();
    }
}
