package org.apache.asterix.external.input.record.reader.twitter;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.input.record.GenericRecord;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.TwitterUtil;
import twitter4j.FilterQuery;
import twitter4j.TwitterStream;

/* loaded from: input_file:org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.class */
/**
 * {@link IRecordReader} of {@code String} records backed by an in-memory queue.
 *
 * <p>On construction the reader hands its queue to the supplied listener via
 * {@code setInputQ}, registers that listener on the given {@link TwitterStream},
 * and starts the stream in one of three modes (filter, sample, or user).
 * Records are then pulled from the queue without blocking by {@link #next()}.
 * NOTE(review): the listener is presumably what enqueues incoming tweets;
 * that code is not part of this class - confirm in {@code TwitterUtil}.
 */
public class TwitterPushRecordReader implements IRecordReader<String> {
    /** Queue shared with the stream listener; drained by {@link #next()}. */
    private LinkedBlockingQueue<String> inputQ;
    /** Stream the listener is attached to; set to {@code null} once closed. */
    private TwitterStream twitterStream;
    /** Single record instance that is refilled and returned on every read. */
    private GenericRecord<String> record;
    /** Set by {@link #close()}; drives {@link #hasNext()}. */
    private boolean closed = false;

    /**
     * Creates a reader over a filtered stream.
     *
     * @param twitterStream stream to attach to
     * @param tweetListener listener that receives this reader's queue
     * @param filterQuery   query passed to {@code TwitterStream.filter}
     */
    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener, FilterQuery filterQuery) {
        init(twitterStream);
        tweetListener.setInputQ(inputQ);
        this.twitterStream.addListener(tweetListener);
        this.twitterStream.filter(filterQuery);
    }

    /**
     * Creates a reader over the sample stream.
     *
     * @param twitterStream stream to attach to
     * @param tweetListener listener that receives this reader's queue
     */
    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) {
        init(twitterStream);
        tweetListener.setInputQ(inputQ);
        this.twitterStream.addListener(tweetListener);
        // init() stored the argument in the field, so both refer to the same stream.
        this.twitterStream.sample();
    }

    /**
     * Creates a reader over the user stream.
     *
     * @param twitterStream      stream to attach to
     * @param userTweetsListener listener that receives this reader's queue
     */
    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener userTweetsListener) {
        init(twitterStream);
        userTweetsListener.setInputQ(inputQ);
        this.twitterStream.addListener(userTweetsListener);
        // init() stored the argument in the field, so both refer to the same stream.
        this.twitterStream.user();
    }

    /** Allocates the queue and the reusable record, and remembers the stream. */
    private void init(TwitterStream twitterStream) {
        this.twitterStream = twitterStream;
        inputQ = new LinkedBlockingQueue<>();
        record = new GenericRecord<>();
    }

    /**
     * Detaches all listeners from the stream, cleans it up and drops the
     * reference. Calls after the first successful one do nothing.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!closed) {
            twitterStream.clearListeners();
            twitterStream.cleanUp();
            twitterStream = null;
            // Flag is flipped last, so a failed cleanup leaves the reader open.
            closed = true;
        }
    }

    /**
     * @return {@code true} until the reader has been closed, regardless of
     *         whether the queue currently holds anything
     */
    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean hasNext() throws Exception {
        return !closed;
    }

    /**
     * Takes the head of the queue without waiting.
     *
     * @return the shared record refilled with the dequeued string, or
     *         {@code null} when the queue is empty
     */
    @Override // org.apache.asterix.external.api.IRecordReader
    public IRawRecord<String> next() throws IOException, InterruptedException {
        String tweet = inputQ.poll();
        if (tweet != null) {
            record.set(tweet);
            return record;
        }
        return null;
    }

    /**
     * Closes the reader.
     *
     * @return {@code true} if {@link #close()} completed, {@code false} if it threw
     */
    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean stop() {
        boolean stopped;
        try {
            close();
            stopped = true;
        } catch (Exception e) {
            stopped = false;
        }
        return stopped;
    }

    /** No-op: this reader does not use the feed log manager. */
    @Override // org.apache.asterix.external.api.IRecordReader
    public void setFeedLogManager(FeedLogManager feedLogManager) {
    }

    /** No-op: this reader does not use the data flow controller. */
    @Override // org.apache.asterix.external.api.IRecordReader
    public void setController(AbstractFeedDataFlowController abstractFeedDataFlowController) {
    }

    /**
     * @return always {@code false}
     */
    @Override // org.apache.asterix.external.api.IRecordReader
    public boolean handleException(Throwable th) {
        return false;
    }
}
