package org.apache.streams.sysomos.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.sysomos.SysomosConfiguration;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.data.util.RFC3339Utils;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/sysomos/provider/SysomosProvider.class */
public class SysomosProvider implements StreamsProvider {
    public static final String STREAMS_ID = "SysomosProvider";
    private static final Logger LOGGER;
    public static final String ENDING_TIME_KEY = "addedBefore";
    public static final String STARTING_TIME_KEY = "addedAfter";
    public static final String MODE_KEY = "mode";
    public static final String STARTING_DOCS_KEY = "startingDocs";
    public static final int LATENCY = 10000;
    public static final long PROVIDER_BATCH_SIZE = 10000;
    public static final long API_BATCH_SIZE = 1000;
    protected volatile Queue<StreamsDatum> providerQueue;
    private final long maxQueued;
    private final long minLatency;
    private final long scheduledLatency;
    private final long maxApiBatch;
    private SysomosClient client;
    private SysomosConfiguration config;
    private ScheduledExecutorService stream;
    private Map<String, String> documentIds;
    private Map<String, String> addedBefore;
    private Map<String, String> addedAfter;
    private AtomicInteger count;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<String> completedHeartbeats = Sets.newHashSet();
    private Mode mode = Mode.CONTINUOUS;
    private boolean started = false;

    /* loaded from: input_file:org/apache/streams/sysomos/provider/SysomosProvider$Mode.class */
    public enum Mode {
        CONTINUOUS,
        BACKFILL_AND_TERMINATE
    }

    public SysomosProvider(SysomosConfiguration sysomosConfiguration) {
        this.config = sysomosConfiguration;
        this.client = new SysomosClient(sysomosConfiguration.getApiKey());
        this.maxQueued = sysomosConfiguration.getMaxBatchSize() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMaxBatchSize().longValue();
        this.minLatency = sysomosConfiguration.getMinDelayMs() == null ? PROVIDER_BATCH_SIZE : sysomosConfiguration.getMinDelayMs().longValue();
        this.scheduledLatency = sysomosConfiguration.getScheduledDelayMs() == null ? 150000L : sysomosConfiguration.getScheduledDelayMs().longValue();
        this.maxApiBatch = sysomosConfiguration.getMinDelayMs() == null ? 1000L : sysomosConfiguration.getApiBatchSize().longValue();
        this.count = new AtomicInteger();
    }

    public SysomosConfiguration getConfig() {
        return this.config;
    }

    public void setConfig(SysomosConfiguration sysomosConfiguration) {
        this.config = sysomosConfiguration;
    }

    public Mode getMode() {
        return this.mode;
    }

    public long getMinLatency() {
        return this.minLatency;
    }

    public long getMaxApiBatch() {
        return this.maxApiBatch;
    }

    public SysomosClient getClient() {
        return this.client;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void startStream() {
        LOGGER.trace("Starting Producer");
        if (this.started) {
            return;
        }
        LOGGER.trace("Producer not started.  Initializing");
        this.stream = Executors.newScheduledThreadPool(getConfig().getHeartbeatIds().size() + 1);
        for (String str : getConfig().getHeartbeatIds()) {
            this.stream.scheduleWithFixedDelay(createStream(str), 0L, this.scheduledLatency, TimeUnit.MILLISECONDS);
            LOGGER.info("Started producer task for heartbeat {}", str);
        }
        this.started = true;
    }

    public StreamsResultSet readCurrent() {
        try {
            this.lock.writeLock().lock();
            LOGGER.debug("Creating new result set for {} items", Integer.valueOf(this.providerQueue.size()));
            this.count.addAndGet(this.providerQueue.size());
            StreamsResultSet streamsResultSet = new StreamsResultSet(this.providerQueue);
            this.providerQueue = constructQueue();
            return streamsResultSet;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        throw new NotImplementedException("readNew not currently implemented");
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        throw new NotImplementedException("readRange not currently implemented");
    }

    public boolean isRunning() {
        return this.providerQueue.size() > 0 || !(this.completedHeartbeats.size() >= getConfig().getHeartbeatIds().size() || this.stream.isTerminated() || this.stream.isShutdown());
    }

    public void prepare(Object obj) {
        this.providerQueue = constructQueue();
        if (obj instanceof Map) {
            extractConfigFromMap((Map) obj);
        } else if (obj instanceof String) {
            this.documentIds = Splitter.on(";").trimResults().withKeyValueSeparator("=").split((String) obj);
        }
    }

    public void cleanUp() {
        this.stream.shutdown();
        try {
            if (!this.stream.awaitTermination(60L, TimeUnit.SECONDS)) {
                this.stream.shutdownNow();
                if (!this.stream.awaitTermination(60L, TimeUnit.SECONDS)) {
                    LOGGER.error("Stream did not terminate");
                }
            }
        } catch (InterruptedException e) {
            this.stream.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public void signalComplete(String str) {
        try {
            this.lock.writeLock().lock();
            this.completedHeartbeats.add(str);
            if (!isRunning()) {
                cleanUp();
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void enqueueItem(StreamsDatum streamsDatum) {
        boolean offer;
        do {
            try {
                pauseForSpace();
                this.lock.readLock().lock();
                offer = this.providerQueue.offer(streamsDatum);
                Thread.yield();
            } finally {
                this.lock.readLock().unlock();
            }
        } while (!offer);
    }

    protected SysomosHeartbeatStream createStream(String str) {
        String str2 = (this.addedAfter == null || !this.addedAfter.containsKey(str)) ? null : this.addedAfter.get(str);
        String str3 = (this.addedBefore == null || !this.addedBefore.containsKey(str)) ? null : this.addedBefore.get(str);
        return (this.documentIds == null || !this.documentIds.containsKey(str)) ? str2 != null ? str3 != null ? new SysomosHeartbeatStream(this, str, RFC3339Utils.parseToUTC(str3), RFC3339Utils.parseToUTC(str2)) : new SysomosHeartbeatStream(this, str, null, RFC3339Utils.parseToUTC(str2)) : new SysomosHeartbeatStream(this, str) : new SysomosHeartbeatStream(this, str, this.documentIds.get(str));
    }

    protected void pauseForSpace() {
        while (this.providerQueue.size() >= this.maxQueued) {
            LOGGER.trace("Sleeping the current thread due to a full queue");
            try {
                Thread.sleep(100L);
                LOGGER.trace("Resuming thread after wait period");
            } catch (InterruptedException e) {
                LOGGER.warn("Thread was interrupted", e);
            }
        }
    }

    protected void extractConfigFromMap(Map map) {
        if (map.containsKey(MODE_KEY)) {
            Object obj = map.get(MODE_KEY);
            if (!(obj instanceof Mode)) {
                throw new IllegalStateException("Invalid configuration.  Mode must be an instance of the Mode enum but was " + obj);
            }
            this.mode = (Mode) obj;
        }
        if (map.containsKey(STARTING_DOCS_KEY)) {
            Object obj2 = map.get(STARTING_DOCS_KEY);
            if (!(obj2 instanceof Map)) {
                throw new IllegalStateException("Invalid configuration.  StartingDocs must be an instance of Map<String,String> but was " + obj2);
            }
            this.documentIds = (Map) obj2;
        }
        if (map.containsKey(STARTING_TIME_KEY)) {
            Object obj3 = map.get(STARTING_TIME_KEY);
            if (!(obj3 instanceof Map)) {
                throw new IllegalStateException("Invalid configuration.  Added after key must be an instance of Map<String,String> but was " + obj3);
            }
            this.addedAfter = (Map) obj3;
        }
        if (map.containsKey(ENDING_TIME_KEY)) {
            Object obj4 = map.get(ENDING_TIME_KEY);
            if (!(obj4 instanceof Map)) {
                throw new IllegalStateException("Invalid configuration.  Added before key must be an instance of Map<String,String> but was " + obj4);
            }
            this.addedBefore = (Map) obj4;
        }
    }

    private Queue<StreamsDatum> constructQueue() {
        return Queues.newConcurrentLinkedQueue();
    }

    public int getCount() {
        return this.count.get();
    }

    public static void main(String[] strArr) throws Exception {
        Preconditions.checkArgument(strArr.length >= 2);
        String str = strArr[0];
        String str2 = strArr[1];
        Config load = ConfigFactory.load();
        File file = new File(str);
        if (!$assertionsDisabled && !file.exists()) {
            throw new AssertionError();
        }
        Config resolve = ConfigFactory.parseFileAnySyntax(file, ConfigParseOptions.defaults().setAllowMissing(false)).withFallback(load).resolve();
        StreamsConfiguration detectConfiguration = StreamsConfigurator.detectConfiguration(resolve);
        SysomosConfiguration sysomosConfiguration = (SysomosConfiguration) new ComponentConfigurator(SysomosConfiguration.class).detectConfiguration(resolve, "rss");
        SysomosProvider sysomosProvider = new SysomosProvider(sysomosConfiguration);
        StreamsJacksonMapper streamsJacksonMapper = StreamsJacksonMapper.getInstance();
        PrintStream printStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(str2)));
        sysomosProvider.prepare(sysomosConfiguration);
        sysomosProvider.startStream();
        do {
            Uninterruptibles.sleepUninterruptibly(detectConfiguration.getBatchFrequencyMs().longValue(), TimeUnit.MILLISECONDS);
            Iterator it = sysomosProvider.readCurrent().iterator();
            while (it.hasNext()) {
                try {
                    printStream.println(streamsJacksonMapper.writeValueAsString(((StreamsDatum) it.next()).getDocument()));
                } catch (JsonProcessingException e) {
                    System.err.println(e.getMessage());
                }
            }
        } while (sysomosProvider.isRunning());
        sysomosProvider.cleanUp();
        printStream.flush();
    }

    static {
        $assertionsDisabled = !SysomosProvider.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(SysomosProvider.class);
    }
}
