package org.apache.asterix.external.input.stream;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.generator.TweetGenerator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/input/stream/TwitterFirehoseInputStream.class */
public class TwitterFirehoseInputStream extends AsterixInputStream {
    private static final Logger LOGGER = LogManager.getLogger();
    private final DataProvider dataProvider;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final PipedOutputStream outputStream = new PipedOutputStream();
    private final PipedInputStream inputStream = new PipedInputStream(this.outputStream);
    private boolean started = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/asterix/external/input/stream/TwitterFirehoseInputStream$DataProvider.class */
    public static class DataProvider implements Runnable {
        public static final String KEY_MODE = "mode";
        private final TweetGenerator tweetGenerator;
        private boolean continuePush = true;
        private int batchSize;
        private final Mode mode;
        private final OutputStream os;

        /* loaded from: input_file:org/apache/asterix/external/input/stream/TwitterFirehoseInputStream$DataProvider$Mode.class */
        public enum Mode {
            AGGRESSIVE,
            CONTROLLED
        }

        public DataProvider(Map<String, String> map, int i, OutputStream outputStream) {
            this.tweetGenerator = new TweetGenerator(map, i);
            this.tweetGenerator.registerSubscriber(outputStream);
            this.os = outputStream;
            this.mode = map.get("mode") != null ? Mode.valueOf(map.get("mode").toUpperCase()) : Mode.AGGRESSIVE;
            switch (this.mode) {
                case CONTROLLED:
                    String str = map.get(TweetGenerator.KEY_TPS);
                    if (str == null) {
                        throw new IllegalArgumentException("TPS value not configured. use tps=<value>");
                    }
                    this.batchSize = Integer.parseInt(str);
                    return;
                case AGGRESSIVE:
                    this.batchSize = 5000;
                    return;
                default:
                    return;
            }
        }

        /* JADX WARN: Failed to find 'out' block for switch in B:9:0x0018. Please report as an issue. */
        @Override // java.lang.Runnable
        public void run() {
            boolean z = true;
            while (true) {
                if (z) {
                    try {
                    } catch (Exception e) {
                        TwitterFirehoseInputStream.LOGGER.warn("Exception in adapter " + e.getMessage());
                    }
                    if (this.continuePush) {
                        switch (this.mode) {
                            case CONTROLLED:
                                long currentTimeMillis = System.currentTimeMillis();
                                z = this.tweetGenerator.generateNextBatch(this.batchSize);
                                long currentTimeMillis2 = System.currentTimeMillis();
                                if (currentTimeMillis2 - currentTimeMillis < 1000) {
                                    Thread.sleep(1000 - (currentTimeMillis2 - currentTimeMillis));
                                    break;
                                }
                                break;
                            case AGGRESSIVE:
                                z = this.tweetGenerator.generateNextBatch(this.batchSize);
                                break;
                        }
                    }
                }
                this.os.close();
                return;
            }
        }

        public void stop() {
            this.continuePush = false;
        }
    }

    public TwitterFirehoseInputStream(Map<String, String> map, int i) throws IOException {
        this.dataProvider = new DataProvider(map, i, this.outputStream);
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean stop() throws IOException {
        this.dataProvider.stop();
        return true;
    }

    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.executorService.execute(this.dataProvider);
        this.started = true;
    }

    @Override // java.io.InputStream
    public int read() throws IOException {
        if (!this.started) {
            start();
        }
        return this.inputStream.read();
    }

    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        if (!this.started) {
            start();
        }
        return this.inputStream.read(bArr, i, i2);
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean handleException(Throwable th) {
        return false;
    }
}
