/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.file;

import java.io.File;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.file.FileConsumerThread;
import org.apache.pulsar.io.file.FileListingThread;
import org.apache.pulsar.io.file.FileSourceConfig;
import org.apache.pulsar.io.file.ProcessedFileThread;

/**
 * Push-style source that hands {@link File} entries between a set of background
 * tasks through three shared queues.
 *
 * <p>On {@link #open} the source starts one {@code FileListingThread}, one
 * {@code ProcessedFileThread} and {@code numWorkers} {@code FileConsumerThread}
 * instances on a single fixed-size thread pool. The tasks themselves are defined
 * elsewhere; judging by their names they list, consume and post-process files,
 * but their exact behavior is not visible from this class.
 */
public class FileSource
extends PushSource<byte[]> {
    /** How long {@link #close()} waits for the tasks to finish before forcing a shutdown. */
    private static final long SHUTDOWN_TIMEOUT_MS = 800L;

    /** Number of pool threads reserved for the listing task and the processed-file task. */
    private static final int HOUSEKEEPING_THREADS = 2;

    /** Pool running all background tasks; {@code null} until {@link #open} has created it. */
    private ExecutorService executor;

    /** Queue shared by the listing task and the consumer tasks. */
    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<File>();

    /** Queue shared by the listing task and the consumer tasks. */
    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<File>();

    /** Queue shared by the listing, consumer and processed-file tasks. */
    private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<File>();

    /**
     * Loads and validates the connector configuration, then starts the background tasks.
     *
     * @param config        raw connector configuration, parsed via {@code FileSourceConfig.load}
     * @param sourceContext context supplied by the runtime; not used by this method
     * @throws Exception if the configuration cannot be loaded or fails validation
     */
    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        FileSourceConfig fileConfig = FileSourceConfig.load(config);
        fileConfig.validate();

        // Read the worker count once so the pool size and the number of submitted
        // consumers are guaranteed to agree.
        int numWorkers = fileConfig.getNumWorkers();

        // One thread per consumer plus one each for the listing and processed-file tasks.
        this.executor = Executors.newFixedThreadPool(numWorkers + HOUSEKEEPING_THREADS);
        this.executor.execute(new FileListingThread(fileConfig, this.workQueue, this.inProcess, this.recentlyProcessed));
        this.executor.execute(new ProcessedFileThread(fileConfig, this.recentlyProcessed));
        for (int idx = 0; idx < numWorkers; ++idx) {
            this.executor.execute(new FileConsumerThread(this, this.workQueue, this.inProcess, this.recentlyProcessed));
        }
    }

    /**
     * Stops the background tasks: requests an orderly shutdown, waits up to
     * {@value #SHUTDOWN_TIMEOUT_MS} ms, then cancels whatever is still running.
     *
     * <p>Does nothing if {@link #open} never created the executor (for example because
     * it was not called or configuration validation failed).
     *
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     */
    public void close() throws Exception {
        ExecutorService pool = this.executor;
        if (pool == null) {
            // Nothing was started, so there is nothing to stop.
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            pool.shutdownNow();
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }
}

