package org.streampipes.connect.adapter.specific.wikipedia;

import com.google.gson.Gson;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.streampipes.connect.adapter.specific.SpecificDataStreamAdapter;
import org.streampipes.connect.adapter.specific.wikipedia.model.WikipediaModel;
import org.streampipes.connect.exception.AdapterException;
import org.streampipes.connect.exception.ParseException;
import org.streampipes.model.connect.adapter.SpecificAdapterStreamDescription;
import org.streampipes.model.connect.guess.GuessSchema;
import org.streampipes.sdk.builder.adapter.GuessSchemaBuilder;
import org.streampipes.sdk.helpers.EpProperties;
import org.streampipes.sdk.helpers.Labels;

/* loaded from: input_file:org/streampipes/connect/adapter/specific/wikipedia/WikipediaAdapter.class */
/**
 * Base class for adapters that read the Wikimedia "recentchange" server-sent
 * event stream and forward every event whose change type matches the type
 * supplied at construction time to the adapter pipeline.
 */
public abstract class WikipediaAdapter extends SpecificDataStreamAdapter {
    public static final String TIMESTAMP = "timestamp";
    public static final String TYPE = "type";
    public static final String EVENT_ID = "id";
    public static final String NAMESPACE = "namespace";
    public static final String TITLE = "title";
    public static final String USER = "user";
    public static final String BOT = "bot";
    public static final String MINOR = "minor";
    public static final String OLDLENGTH = "oldlength";
    public static final String NEWLENGTH = "newlength";
    public static final String OLDREVISION = "oldrevision";
    public static final String NEWREVISION = "newrevision";
    public static final String SERVERURL = "serverurl";
    public static final String SERVERNAME = "servername";
    public static final String WIKI = "wiki";
    public static final String URI = "uri";
    public static final String COMMENT = "comment";
    public static final String DOMAIN = "domain";

    /** Prefix used to build the domain property URI of every schema field. */
    private static final String VOCAB_PREFIX = "http://wikipedia.org/";
    /** Endpoint of the event stream consumed by {@link #startAdapter()}. */
    private static final String WIKIPEDIA_API_URL = "https://stream.wikimedia.org/v2/stream/recentchange";
    /** Shared JSON parser; one instance is enough for all adapters. */
    private static final Gson GSON = new Gson();

    private Thread thread;
    /** Change type an event must carry to be forwarded; {@code null} when built via the no-arg constructor. */
    private String type;
    // NOTE(review): volatile because stopAdapter() is presumably invoked from a
    // different thread than startAdapter() -- confirm against the caller.
    private volatile WikipediaSseConsumer consumer;

    /**
     * @param specificAdapterStreamDescription adapter description handed to the superclass
     * @param str change type that events must match to be forwarded
     */
    public WikipediaAdapter(SpecificAdapterStreamDescription specificAdapterStreamDescription, String str) {
        super(specificAdapterStreamDescription);
        this.type = str;
    }

    public WikipediaAdapter() {
    }

    /**
     * Starts a background thread that consumes the event stream, parses each
     * payload into a {@link WikipediaModel} and forwards matching events to the
     * adapter pipeline.
     *
     * @throws AdapterException declared by the overridden method; not thrown by this implementation
     */
    @Override // org.streampipes.connect.adapter.Adapter
    public void startAdapter() throws AdapterException {
        // Create the consumer before the worker thread starts so that a
        // stopAdapter() call arriving early never sees a null consumer.
        final WikipediaSseConsumer sseConsumer = new WikipediaSseConsumer();
        this.consumer = sseConsumer;
        this.thread = new Thread(() -> {
            try {
                sseConsumer.consumeEventStream(WIKIPEDIA_API_URL, json -> {
                    WikipediaModel event = GSON.fromJson(json, WikipediaModel.class);
                    boolean matches = event != null
                            && event.getType() != null
                            && event.getType().equals(this.type);
                    if (matches) {
                        this.adapterPipeline.process(new WikipediaModelConverter(event).makeMap());
                    }
                });
            } catch (Exception e) {
                // The failure cannot propagate out of the worker thread; report it
                // here (no logger is available in this class).
                e.printStackTrace();
            }
        });
        this.thread.start();
    }

    /**
     * Stops the stream consumer if the adapter has been started; otherwise does nothing.
     *
     * @throws AdapterException declared by the overridden method; not thrown by this implementation
     */
    @Override // org.streampipes.connect.adapter.Adapter
    public void stopAdapter() throws AdapterException {
        WikipediaSseConsumer activeConsumer = this.consumer;
        if (activeConsumer != null) {
            activeConsumer.stop();
        }
    }

    /**
     * Builds the static schema describing one change event.
     *
     * @param specificAdapterStreamDescription unused; the schema is fixed
     * @return the guessed schema with one property per event field
     */
    @Override // org.streampipes.connect.adapter.Adapter
    public GuessSchema getSchema(SpecificAdapterStreamDescription specificAdapterStreamDescription) throws AdapterException, ParseException {
        // NOTE(review): the "ID" and "User" labels below are written as literals; the
        // previous code borrowed them from constants of unrelated Hadoop/Kafka classes.
        return GuessSchemaBuilder.create()
                .property(EpProperties.timestampProperty(TIMESTAMP))
                .property(EpProperties.stringEp(Labels.from(EVENT_ID, "ID", ""), EVENT_ID, dp(EVENT_ID)))
                // The change type is textual ("edit", "new"), so it is a string property.
                .property(EpProperties.stringEp(Labels.from(TYPE, "Type", "The change type (edit|new)"), TYPE, dp(TYPE)))
                .property(EpProperties.integerEp(Labels.from(NAMESPACE, "Namespace", "The Wikipedia namespace"), NAMESPACE, dp(NAMESPACE)))
                .property(EpProperties.stringEp(Labels.from(TITLE, "Title", "The article title"), TITLE, dp(TITLE)))
                .property(EpProperties.stringEp(Labels.from(USER, "User", "The user ID"), USER, dp(USER)))
                .property(EpProperties.booleanEp(Labels.from(BOT, "Bot", "Edited by a bot"), BOT, dp(BOT)))
                .property(EpProperties.booleanEp(Labels.from(MINOR, "Minor", "Minor edit"), MINOR, dp(MINOR)))
                .property(EpProperties.integerEp(Labels.from(OLDLENGTH, "Old length", ""), OLDLENGTH, dp(OLDLENGTH)))
                .property(EpProperties.integerEp(Labels.from(NEWLENGTH, "New length", ""), NEWLENGTH, dp(NEWLENGTH)))
                .property(EpProperties.longEp(Labels.from(OLDREVISION, "Old revision ID", ""), OLDREVISION, dp(OLDREVISION)))
                .property(EpProperties.longEp(Labels.from(NEWREVISION, "New revision ID", ""), NEWREVISION, dp(NEWREVISION)))
                .property(EpProperties.stringEp(Labels.from(SERVERURL, "Server URL", ""), SERVERURL, dp(SERVERURL)))
                .property(EpProperties.stringEp(Labels.from(SERVERNAME, "Server Name", ""), SERVERNAME, dp(SERVERNAME)))
                .property(EpProperties.stringEp(Labels.from(WIKI, "Wiki Name", ""), WIKI, dp(WIKI)))
                .property(EpProperties.stringEp(Labels.from(URI, "Internal URI", ""), URI, dp(URI)))
                .property(EpProperties.stringEp(Labels.from(COMMENT, "Comment", "Comment field"), COMMENT, dp(COMMENT)))
                .property(EpProperties.stringEp(Labels.from(DOMAIN, "Domain", "Wiki Domain"), DOMAIN, dp(DOMAIN)))
                .build();
    }

    /**
     * Builds the domain property URI for a field.
     *
     * @param str field name
     * @return the vocabulary prefix followed by {@code str}
     */
    public String dp(String str) {
        return VOCAB_PREFIX + str;
    }
}
