package org.noear.solon.cloud.extend.water.service;

import org.noear.solon.Solon;
import org.noear.solon.Utils;
import org.noear.solon.cloud.CloudEventHandler;
import org.noear.solon.cloud.CloudProps;
import org.noear.solon.cloud.annotation.EventLevel;
import org.noear.solon.cloud.exception.CloudEventException;
import org.noear.solon.cloud.extend.water.WaterProps;
import org.noear.solon.cloud.model.Event;
import org.noear.solon.cloud.model.Instance;
import org.noear.solon.cloud.service.CloudEventObserverManger;
import org.noear.solon.cloud.service.CloudEventServicePlus;
import org.noear.water.WaterClient;
import org.noear.water.utils.EncryptUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/noear/solon/cloud/extend/water/service/CloudEventServiceWaterImp.class */
public class CloudEventServiceWaterImp implements CloudEventServicePlus {
    static Logger log = LoggerFactory.getLogger(CloudEventServiceWaterImp.class);
    private final CloudProps cloudProps;
    private String seal;
    private boolean unstable;
    private String eventChannelName;
    private String eventBroker;
    private String channel;
    private String group;
    private final String DEFAULT_SEAL = "Pckb6BpGzDE6RUIy";
    private CloudEventObserverManger instanceObserverManger = new CloudEventObserverManger();
    private CloudEventObserverManger clusterObserverManger = new CloudEventObserverManger();

    public CloudEventServiceWaterImp(CloudProps cloudProps) {
        this.cloudProps = cloudProps;
        this.unstable = cloudProps.getDiscoveryUnstable() || Solon.cfg().isFilesMode() || Solon.cfg().isDriftMode();
        this.eventChannelName = cloudProps.getEventChannel();
        this.eventBroker = cloudProps.getEventBroker();
        this.seal = getEventSeal();
        if (Utils.isEmpty(this.seal)) {
            this.seal = "Pckb6BpGzDE6RUIy";
        }
    }

    public String getSeal() {
        return this.seal;
    }

    public boolean publish(Event event) throws CloudEventException {
        if (Utils.isEmpty(event.topic())) {
            throw new IllegalArgumentException("Event missing topic");
        }
        if (Utils.isEmpty(event.content())) {
            throw new IllegalArgumentException("Event missing content");
        }
        try {
            return WaterClient.Message.sendMessageAndTags(this.eventBroker, event.key(), Utils.isEmpty(event.group()) ? event.topic() : event.group() + WaterProps.GROUP_TOPIC_SPLIT_MART + event.topic(), event.content(), event.scheduled(), event.tags());
        } catch (Throwable th) {
            throw new CloudEventException(th);
        }
    }

    public void attention(EventLevel eventLevel, String str, String str2, String str3, CloudEventHandler cloudEventHandler) {
        String str4 = Utils.isEmpty(str2) ? str3 : str2 + WaterProps.GROUP_TOPIC_SPLIT_MART + str3;
        if (eventLevel == EventLevel.instance) {
            this.instanceObserverManger.add(str4, eventLevel, str2, str3, cloudEventHandler);
        } else {
            this.clusterObserverManger.add(str4, eventLevel, str2, str3, cloudEventHandler);
        }
    }

    public void subscribe() {
        try {
            subscribe0();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void subscribe0() throws Exception {
        Instance local = Instance.local();
        if (this.instanceObserverManger.topicSize() > 0) {
            String str = "http://" + local.address() + "/_run/msg";
            WaterClient.Message.subscribeTopic(this.eventBroker, EncryptUtils.md5(local.service() + "_instance_" + str), local.service(), Solon.cfg().appGroup(), str, this.seal, "", 1, this.unstable, (String[]) this.instanceObserverManger.topicAll().toArray(new String[this.instanceObserverManger.topicAll().size()]));
        }
        if (this.clusterObserverManger.topicSize() > 0) {
            String eventReceive = getEventReceive();
            String str2 = Utils.isEmpty(eventReceive) ? "@" + Solon.cfg().appName() + "/_run/msg" : eventReceive.indexOf("://") > 0 ? eventReceive + "/_run/msg" : "http://" + eventReceive + "/_run/msg";
            WaterClient.Message.subscribeTopic(this.eventBroker, EncryptUtils.md5(local.service() + "_cluster_" + str2), local.service(), Solon.cfg().appGroup(), str2, this.seal, "", 1, false, (String[]) this.clusterObserverManger.topicAll().toArray(new String[this.clusterObserverManger.topicSize()]));
        }
    }

    public boolean onReceive(String str, Event event) throws Throwable {
        boolean z = true;
        boolean z2 = false;
        event.channel(this.eventChannelName);
        CloudEventHandler cloudEventHandler = this.instanceObserverManger.get(str);
        if (cloudEventHandler != null) {
            z2 = true;
            z = cloudEventHandler.handler(event);
        }
        CloudEventHandler cloudEventHandler2 = this.clusterObserverManger.get(str);
        if (cloudEventHandler2 != null) {
            z2 = true;
            z = z && cloudEventHandler2.handler(event);
        }
        if (!z2) {
            log.warn("There is no observer for this event topic[{}]", event.topic());
        }
        return z;
    }

    public String getChannel() {
        if (this.channel == null) {
            this.channel = this.cloudProps.getEventChannel();
        }
        return this.channel;
    }

    public String getGroup() {
        if (this.group == null) {
            this.group = this.cloudProps.getEventGroup();
        }
        return this.group;
    }

    public String getEventSeal() {
        return this.cloudProps.getProp(WaterProps.PROP_EVENT_seal);
    }

    public String getEventReceive() {
        return this.cloudProps.getProp(WaterProps.PROP_EVENT_receive);
    }
}
