package org.streampipes.connect.adapter.generic.protocol.stream;

import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.connect.SendToPipeline;
import org.streampipes.connect.adapter.generic.format.Format;
import org.streampipes.connect.adapter.generic.format.Parser;
import org.streampipes.connect.adapter.generic.pipeline.AdapterPipeline;
import org.streampipes.connect.adapter.generic.protocol.Protocol;
import org.streampipes.connect.exception.ParseException;

/* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/PullProtocol.class */
/**
 * Base class for protocols that periodically pull data from an endpoint.
 *
 * <p>After {@link #run(AdapterPipeline)} is called, {@link #getDataFromEndpoint()} is invoked
 * at a fixed rate (every {@code interval} seconds, after an initial delay of one second) and
 * the returned stream is handed to the configured parser, which forwards events through a
 * {@link SendToPipeline} into the adapter pipeline. {@link #stop()} ends the polling.
 */
public abstract class PullProtocol extends Protocol {
    private static final Logger LOGGER = LoggerFactory.getLogger(PullProtocol.class);

    /** Delay before the first pull, in seconds. */
    private static final long INITIAL_DELAY_SECONDS = 1L;

    /** Executor running the periodic pull task; {@code null} until polling has been started. */
    private ScheduledExecutorService scheduler;

    /** Polling period in seconds. */
    private long interval;

    /**
     * Creates a protocol without parser, format or interval. The interval stays at 0, so
     * {@link #run(AdapterPipeline)} will refuse to start polling on such an instance.
     */
    public PullProtocol() {
    }

    /**
     * @param parser   parser used to read the data pulled from the endpoint
     * @param format   format handed to the {@link SendToPipeline} for every pull
     * @param interval polling period in seconds; must be greater than zero for polling to start
     */
    public PullProtocol(Parser parser, Format format, long interval) {
        super(parser, format);
        this.interval = interval;
    }

    /**
     * Starts polling the endpoint at a fixed rate on a single background thread.
     *
     * <p>The call returns immediately. If the configured interval is not positive, an error
     * is logged and nothing is scheduled.
     *
     * @param adapterPipeline pipeline that receives the parsed events
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void run(AdapterPipeline adapterPipeline) {
        if (this.interval <= 0) {
            // scheduleAtFixedRate rejects a non-positive period; report it instead of
            // failing silently on a background thread.
            LOGGER.error("Cannot start pulling: interval must be greater than 0 seconds but was {}.",
                    this.interval);
            return;
        }
        // A single executor owns the periodic task, so stop() can always shut it down.
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.scheduleAtFixedRate(
                () -> executeProtocolLogic(adapterPipeline),
                INITIAL_DELAY_SECONDS,
                this.interval,
                TimeUnit.SECONDS);
    }

    /**
     * Performs one pull: resets the format, fetches the data and lets the parser process it.
     *
     * <p>Parse errors are logged and polling continues. Any other unchecked failure is logged
     * and rethrown, which makes the executor suppress all further executions of the task.
     */
    private void executeProtocolLogic(AdapterPipeline adapterPipeline) {
        try {
            this.format.reset();
            SendToPipeline sendToPipeline = new SendToPipeline(this.format, adapterPipeline);
            InputStream dataFromEndpoint = getDataFromEndpoint();
            if (dataFromEndpoint != null) {
                this.parser.parse(dataFromEndpoint, sendToPipeline);
            } else {
                LOGGER.warn("Could not receive data from Endpoint. Try again in {} seconds.", this.interval);
            }
        } catch (ParseException e) {
            // Pass the exception itself so the stack trace is not lost.
            LOGGER.error("Error while parsing: {}", e.getMessage(), e);
        } catch (RuntimeException | Error e) {
            // Log here because nobody waits on the task's future; rethrowing keeps the
            // previous behaviour of ending the periodic polling after such a failure.
            LOGGER.error("Error", e);
            throw e;
        }
    }

    /**
     * Stops the periodic polling. Does nothing if polling was never started.
     */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void stop() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
    }

    /**
     * Fetches the current data from the endpoint.
     *
     * @return stream with the pulled data, or {@code null} if no data could be received
     *         (a warning is logged and the next scheduled pull tries again)
     */
    abstract InputStream getDataFromEndpoint();
}
