package com.barchart.feed.ddf.resolver.provider;

import com.barchart.feed.api.model.meta.Instrument;
import com.barchart.feed.ddf.instrument.provider.DDF_InstrumentProvider;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

/* loaded from: input_file:com/barchart/feed/ddf/resolver/provider/TaskUpdate.class */
/**
 * Task that, for every symbol prefix returned by {@code ConstResolver.getSymbolPrefixList()},
 * downloads the XML symbol listing, queues the symbols found in it, resolves them in batches
 * through {@code DDF_InstrumentProvider.fromSymbols} and writes every instrument that
 * {@code CodecHelper.isPresent} does not find via the searcher to the index writer.
 *
 * <p>When all prefixes have been processed a {@code Status} document is written and the index
 * writer is closed. If the executing thread is interrupted the task stops early: it restores the
 * thread's interrupt flag and returns without writing the status document or closing the writer.
 */
class TaskUpdate implements Callable<Void> {

    /** Upper bound on the number of symbols collected into one batch. */
    static final int BATCH_SIZE = 500;

    /**
     * Upper bound on the accumulated size of one batch, where each symbol counts as its length
     * plus one. NOTE(review): presumably sized to keep a lookup request within a length limit -
     * confirm against the instrument provider.
     */
    static final int FETCH_SIZE = 4000;

    private static final Logger log = LoggerFactory.getLogger(TaskUpdate.class);

    /** Symbols parsed by {@link #fetchQueue(URL)} and awaiting {@link #storeQueue()}. */
    private final BlockingQueue<String> symbolQueue = new LinkedBlockingDeque<String>();

    private final IndexSearcher searcher;
    private final IndexWriter writer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param indexSearcher searcher used to check whether an instrument is already indexed
     * @param indexWriter   writer receiving new instruments and the final status document;
     *                      it is closed by this task once the status has been written
     */
    public TaskUpdate(IndexSearcher indexSearcher, IndexWriter indexWriter) {
        this.searcher = indexSearcher;
        this.writer = indexWriter;
    }

    /**
     * Streams the XML document at {@code url} through a SAX parser and puts the {@code symbol}
     * attribute of every {@code Result} element on the symbol queue. {@code Result} elements
     * that carry no {@code symbol} attribute are skipped.
     *
     * @param url location of the XML symbol listing
     * @throws InterruptedException if the thread was interrupted while parsing; the thread's
     *                              interrupt flag is set again before this is thrown
     * @throws Exception            on any I/O or parse failure
     */
    private void fetchQueue(URL url) throws Exception {
        log.trace("fetch url : {}", url);
        final AtomicInteger fetchCount = new AtomicInteger(0);

        final DefaultHandler handler = new DefaultHandler() {
            @Override // org.xml.sax.helpers.DefaultHandler, org.xml.sax.ContentHandler
            public void startElement(String uri, String localName, String qName, Attributes attributes) throws SAXException {
                if (!"Result".equals(qName)) {
                    return;
                }
                final String symbol = attributes.getValue("symbol");
                if (symbol == null) {
                    // BlockingQueue rejects null; one malformed element must not abort the prefix.
                    return;
                }
                try {
                    TaskUpdate.this.symbolQueue.put(symbol);
                } catch (InterruptedException e) {
                    // put() cleared the interrupt flag when it threw; set it again.
                    Thread.currentThread().interrupt();
                    final InterruptedException interrupted = new InterruptedException("symbol queue");
                    interrupted.initCause(e);
                    throw new SAXException(interrupted);
                }
                if (Thread.currentThread().isInterrupted()) {
                    throw new SAXException(new InterruptedException("sax handler"));
                }
                if (fetchCount.incrementAndGet() % 2000 == 0) {
                    TaskUpdate.log.debug("fetch count : {}", fetchCount);
                }
            }
        };

        // NOTE(review): the parser factory is used with its default configuration. If the lookup
        // endpoint is not fully trusted, consider disabling DTDs / external entities here.
        try (InputStream stream = url.openStream()) {
            final SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
            parser.parse(new BufferedInputStream(stream), handler);
        } catch (SAXException e) {
            // The handler can only signal interruption wrapped in a SAXException; unwrap it so
            // that callers catching InterruptedException actually see it.
            if (e.getException() instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                throw (InterruptedException) e.getException();
            }
            throw e;
        }
        log.debug("fetch count : {}", fetchCount);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Fetches and stores the symbols of every configured prefix, then writes the status document.
     * A failure while processing one prefix is logged and the next prefix is tried. An
     * interruption ends the task immediately, before any status is written.
     *
     * @return always {@code null}
     */
    @Override // java.util.concurrent.Callable
    public Void call() throws Exception {
        log.info("fetch start");
        for (String prefix : ConstResolver.getSymbolPrefixList()) {
            log.info("fetch prefix : [{}]", prefix);
            try {
                fetchQueue(new URL(ConstResolver.getSymbolLookupURI(prefix)));
                storeQueue();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this task.
                Thread.currentThread().interrupt();
                log.warn("fetch interrupted in : {}", e.getMessage());
                return null;
            } catch (Exception e) {
                log.error("prefix fetch failed", e);
            }
        }
        try {
            setStatus(new Status(System.currentTimeMillis(), true));
        } catch (Exception e) {
            log.error("failed to set status", e);
        }
        log.info("fetch finish");
        return null;
    }

    /**
     * Writes {@code status} to the index under {@code Status.TERM} and closes the index writer.
     *
     * @param status status to encode and store
     * @throws Exception if updating or closing the writer fails
     */
    private void setStatus(Status status) throws Exception {
        this.writer.updateDocument(Status.TERM, Status.encode(status));
        this.writer.close();
        log.info("status updated");
    }

    /**
     * Drains the symbol queue batch by batch, resolves each batch to instruments and writes those
     * not yet present in the index. Commits the writer once the queue is empty.
     *
     * @throws InterruptedException if the thread is interrupted between instruments or batches
     * @throws Exception            if resolving or indexing fails
     */
    private void storeQueue() throws Exception {
        log.info("store start");
        int symbolCount = 0;
        int updateCount = 0;
        while (true) {
            final List<String> symbolBatch = getSymbolBatch();
            if (symbolBatch.isEmpty()) {
                this.writer.commit();
                log.info("store update : {} ;  batch : {}; ", Integer.valueOf(updateCount), Integer.valueOf(symbolCount));
                return;
            }
            symbolCount += symbolBatch.size();
            for (Map.Entry<?, ?> entry : DDF_InstrumentProvider.fromSymbols(symbolBatch).entrySet()) {
                checkInterrupt("intrument update");
                final Instrument instrument = (Instrument) entry.getValue();
                if (!CodecHelper.isPresent(this.searcher, instrument)) {
                    CodecHelper.update(this.writer, instrument);
                    updateCount++;
                }
            }
        }
    }

    /**
     * Polls symbols off the queue until it is empty, {@link #BATCH_SIZE} symbols have been
     * collected, or their accumulated size (length plus one per symbol) reaches
     * {@link #FETCH_SIZE}. Empty symbols are dropped.
     *
     * @return the collected symbols; empty when the queue held no non-empty symbol
     * @throws InterruptedException if the thread is interrupted while collecting
     */
    private List<String> getSymbolBatch() throws Exception {
        final List<String> batch = new LinkedList<String>();
        int batchChars = 0;
        while (true) {
            checkInterrupt("symbol batch");
            final String symbol = this.symbolQueue.poll();
            if (symbol == null) {
                break;
            }
            final int length = symbol.length();
            if (length == 0) {
                continue;
            }
            batchChars += length + 1;
            batch.add(symbol);
            if (batchChars >= FETCH_SIZE || batch.size() >= BATCH_SIZE) {
                break;
            }
        }
        return batch;
    }

    /**
     * Throws if the current thread's interrupt flag is set; the flag itself is left untouched.
     *
     * @param str message for the exception, naming the place where the interrupt was noticed
     * @throws InterruptedException if the current thread is interrupted
     */
    private void checkInterrupt(String str) throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException(str);
        }
    }
}
