package org.streampipes.wrapper.kafka;

import java.util.regex.Pattern;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.graph.DataProcessorInvocation;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.streampipes.wrapper.kafka.converter.JsonToMapFormat;
import org.streampipes.wrapper.kafka.converter.MapToJsonFormat;
import org.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import org.streampipes.wrapper.params.runtime.EventProcessorRuntimeParams;

/* loaded from: input_file:org/streampipes/wrapper/kafka/KafkaStreamsDataProcessorRuntime.class */
/**
 * Kafka Streams runtime for data processors.
 *
 * <p>{@link #bindRuntime()} wires a topology that reads from the topic of the
 * first input stream of the bound graph, converts each record with
 * {@link JsonToMapFormat}, hands the resulting stream to
 * {@code getApplicationLogic(...)}, converts the result with
 * {@link MapToJsonFormat} and writes it to the topic of the graph's output
 * stream.
 *
 * @param <B> binding parameter type of the concrete data processor
 */
public abstract class KafkaStreamsDataProcessorRuntime<B extends EventProcessorBindingParams> extends KafkaStreamsRuntime<EventProcessorRuntimeParams<B>, B, DataProcessorInvocation, EventProcessorRuntimeContext> {

    /**
     * Creates the runtime and forwards the runtime parameters to the superclass.
     *
     * @param eventProcessorRuntimeParams runtime parameters of the data processor
     */
    public KafkaStreamsDataProcessorRuntime(EventProcessorRuntimeParams<B> eventProcessorRuntimeParams) {
        super(eventProcessorRuntimeParams);
    }

    /**
     * Builds the Kafka Streams topology for this processor and starts it.
     *
     * <p>The source is subscribed by topic name when the input stream's topic
     * definition is a {@link SimpleTopicDefinition}; otherwise the topic string
     * is passed through {@code replaceWildcardWithPatternFormat(...)} and the
     * source is subscribed by the compiled {@link Pattern}.
     *
     * @throws SpRuntimeException if anything fails while preparing, building or
     *         starting the topology; the message is that of the original
     *         exception, which is attached as the cause
     */
    public void bindRuntime() throws SpRuntimeException {
        try {
            prepareRuntime();
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            // Only the first input stream of the graph is consumed here.
            SpDataStream spDataStream = (SpDataStream) this.runtimeParams.getBindingParams().getGraph().getInputStreams().get(0);
            getApplicationLogic(
                    (protocol(spDataStream).getTopicDefinition() instanceof SimpleTopicDefinition
                            ? streamsBuilder.stream(getTopic(spDataStream))
                            : streamsBuilder.stream(Pattern.compile(replaceWildcardWithPatternFormat(getTopic(spDataStream)))))
                            .flatMapValues(str -> new JsonToMapFormat(getGraph()).apply(str)))
                    .flatMapValues(new MapToJsonFormat())
                    .to(getTopic(this.runtimeParams.getBindingParams().getGraph().getOutputStream()));
            this.streams = new KafkaStreams(streamsBuilder.build(), this.config);
            this.streams.start();
        } catch (Exception e) {
            // Keep the original message for callers, but attach the cause so the
            // underlying stack trace is not lost (e.getMessage() may even be null).
            SpRuntimeException wrapped = new SpRuntimeException(e.getMessage());
            wrapped.initCause(e);
            throw wrapped;
        }
    }
}
