package org.streampipes.wrapper.kafka;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.base.InvocableStreamPipesEntity;
import org.streampipes.wrapper.context.RuntimeContext;
import org.streampipes.wrapper.distributed.runtime.DistributedRuntime;
import org.streampipes.wrapper.params.binding.BindingParams;
import org.streampipes.wrapper.params.runtime.RuntimeParams;

/* loaded from: input_file:org/streampipes/wrapper/kafka/KafkaStreamsRuntime.class */
public abstract class KafkaStreamsRuntime<RP extends RuntimeParams<B, I, RC>, B extends BindingParams<I>, I extends InvocableStreamPipesEntity, RC extends RuntimeContext> extends DistributedRuntime<RP, B, I, RC> {
    Properties config;
    KafkaStreams streams;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsRuntime(RP rp) {
        super(rp);
    }

    public void prepareRuntime() throws SpRuntimeException {
        this.config = new Properties();
        this.config.put("application.id", gneerateApplicationId(this.runtimeParams.getBindingParams().getGraph().getElementId()));
        this.config.put("bootstrap.servers", getKafkaUrl((SpDataStream) this.runtimeParams.getBindingParams().getGraph().getInputStreams().get(0)));
        this.config.put("default.key.serde", Serdes.String().getClass());
        this.config.put("default.value.serde", Serdes.String().getClass());
    }

    private String gneerateApplicationId(String str) {
        return str.replaceAll("/", "-").replaceAll(":", "-");
    }

    public void postDiscard() throws SpRuntimeException {
    }

    public void discardRuntime() throws SpRuntimeException {
        this.streams.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract KStream<String, Map<String, Object>> getApplicationLogic(KStream<String, Map<String, Object>>... kStreamArr);
}
