/*
 * Decompiled with CFR 0.152.
 */
package risesoft.data.transfer.base.executor;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import risesoft.data.transfer.core.close.Closed;
import risesoft.data.transfer.core.executor.Executor;
import risesoft.data.transfer.core.executor.ExecutorFacotry;
import risesoft.data.transfer.core.executor.ExecutorListener;
import risesoft.data.transfer.core.executor.ExecutorTaskQueue;
import risesoft.data.transfer.core.log.Logger;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.util.CloseUtils;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.pool.SimpledObjectPool;

/**
 * {@link ExecutorTaskQueue} that runs queued tasks on a fixed-size thread pool.
 *
 * <p>Tasks are buffered in a {@link ConcurrentLinkedQueue}. Before {@link #start()} is called
 * they only accumulate; once started, one pool job is submitted per task. Each job borrows an
 * {@link Executor} from {@link SimpledObjectPool}, runs a single task with it and hands the
 * executor back to the pool.
 *
 * <p>Configuration keys read by the constructor: {@code size} (thread count and executor pool
 * size, default 5) and {@code name} (logger name, default {@code ThreadPoolExecutorTaskQueue}).
 *
 * <p>{@link #setExecutorFacoty(ExecutorFacotry)} and
 * {@link #setExecutorListener(ExecutorListener)} must be called before {@link #start()};
 * neither collaborator is null-checked here.
 */
public class ThreadPoolExecutorTaskQueue implements ExecutorTaskQueue {
    private ExecutorListener executorListener;
    private final int size;
    // Not final: the factory lambda created in the constructor refers back to this field.
    private SimpledObjectPool<Executor> executorPool;
    private ExecutorFacotry executor;
    private final ThreadPoolExecutor executorService;
    private final ConcurrentLinkedQueue<Object> linkedQueue;
    /** Serialises {@link #add(Object)} against {@link #start()} so every task gets exactly one job. */
    private final Object startLock = new Object();
    // Written by start()/close() and read by add() from arbitrary threads, hence volatile.
    private volatile boolean isStart = false;
    private volatile boolean isShutdown;
    private final Logger logger;
    private final Object source;

    /**
     * Creates the thread pool, the task queue and the lazily populated executor pool.
     *
     * @param configuration supplies the optional {@code size} and {@code name} settings
     * @param loggerFactory used to obtain the logger for this queue
     */
    // The pool is constructed through its raw type exactly as before; its generic
    // constructor signature is declared outside this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public ThreadPoolExecutorTaskQueue(Configuration configuration, LoggerFactory loggerFactory) {
        this.size = configuration.getInt("size", 5);
        this.executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(this.size);
        this.logger = loggerFactory.getLogger(configuration.getString("name", "ThreadPoolExecutorTaskQueue"));
        this.linkedQueue = new ConcurrentLinkedQueue<>();
        this.isShutdown = false;
        this.source = this;
        this.executorPool = new SimpledObjectPool(this.size, () -> {
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "create executor instance:" + this.executorPool.getConcurrentSize() + " created instance size:" + this.size);
            }
            return this.executor.getInstance();
        });
        if (this.logger.isInfo()) {
            this.logger.info(this.source, "inited max size " + this.size);
        }
    }

    /**
     * Closes as many pooled executors as the pool reports via {@code getConcurrentSize()},
     * then closes the executor factory and marks the queue as not started.
     *
     * <p>The worker thread pool itself is left running; use {@link #shutdown()} to stop it.
     *
     * @throws Exception if closing an executor or the factory fails
     */
    public synchronized void close() throws Exception {
        int pooled = this.executorPool.getConcurrentSize();
        this.logger.info(this.source, "close:" + pooled);
        try {
            for (int i = 0; i < pooled; ++i) {
                CloseUtils.close((Closed) this.executorPool.getInstance());
            }
            this.executor.close();
        } finally {
            // Stop add() from submitting further jobs even when closing failed part-way.
            this.isStart = false;
        }
    }

    /**
     * Queues a task. If the queue has already been started, a job is submitted for it
     * immediately; otherwise the task waits for {@link #start()}.
     *
     * @param task the task to enqueue; must not be {@code null}
     *             ({@link ConcurrentLinkedQueue} rejects null elements)
     */
    public void add(Object task) {
        // Held together with start(): the task is either counted in start()'s snapshot or gets
        // its own job here - never neither, never both.
        synchronized (this.startLock) {
            this.linkedQueue.add(task);
            if (this.isStart) {
                this.runJob();
            }
        }
    }

    /**
     * Queues every element of the given collection via {@link #add(Object)}.
     *
     * @param task the tasks to enqueue
     */
    public void addBatch(Collection task) {
        for (Object object : task) {
            this.add(object);
        }
    }

    /**
     * Returns the tasks that have not been picked up by a job yet.
     *
     * @return the live backing queue (not a copy)
     */
    public Collection<Object> getResidue() {
        return this.linkedQueue;
    }

    /**
     * @return the number of tasks still waiting in the queue
     */
    public int getResidueSize() {
        return this.linkedQueue.size();
    }

    /**
     * Sets the factory used to create executors for the pool and closed in {@link #close()}.
     *
     * @param executor the executor factory
     */
    public void setExecutorFacoty(ExecutorFacotry executor) {
        this.executor = executor;
    }

    /**
     * Clears the executor pool, notifies the listener and submits one job for every task that
     * is queued at this point. Tasks added afterwards are dispatched by {@link #add(Object)}.
     */
    public void start() {
        this.executorPool.clear();
        this.executorListener.start();
        synchronized (this.startLock) {
            // Snapshot the count once. Workers begin polling as soon as the first job is
            // submitted, so re-reading size() in the loop condition would end the loop early
            // and leave tasks without a job.
            int pending = this.linkedQueue.size();
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "start job " + pending);
            }
            for (int i = 0; i < pending; ++i) {
                this.runJob();
            }
            this.isStart = true;
        }
    }

    /**
     * @return the configured number of worker threads / pooled executors
     */
    public int getExecutorSize() {
        return this.size;
    }

    /**
     * Flags the queue as shut down (jobs that have not started yet become no-ops), closes the
     * executors and stops the thread pool.
     *
     * @throws Exception if {@link #close()} fails; the thread pool is stopped regardless
     */
    public void shutdown() throws Exception {
        this.logger.info(this.source, "shutdown");
        this.isShutdown = true;
        try {
            this.close();
        } finally {
            // Always stop the non-daemon worker threads, even if close() threw.
            this.executorService.shutdownNow();
        }
    }

    /**
     * Sets the listener notified about start, per-task start, completion and errors.
     *
     * @param executorListener the listener
     */
    public void setExecutorListener(ExecutorListener executorListener) {
        this.executorListener = executorListener;
    }

    /**
     * Submits one job to the thread pool. The job borrows an executor, polls a single task,
     * runs it and returns the executor. Any {@link Throwable} is logged and forwarded to
     * {@link ExecutorListener#onError(Throwable)}.
     */
    private void runJob() {
        this.executorService.execute(() -> {
            try {
                if (this.isShutdown) {
                    return;
                }
                if (this.logger.isDebug()) {
                    this.logger.debug(this.source, "run job: " + this.linkedQueue.size());
                }
                // Borrow the executor before polling so that a task is only removed from the
                // queue once something is available to run it.
                Executor worker = (Executor) this.executorPool.getInstance();
                Object taskObject;
                try {
                    taskObject = this.linkedQueue.poll();
                    if (taskObject == null) {
                        // More jobs than tasks (e.g. start() invoked twice): nothing to do.
                        return;
                    }
                    this.executorListener.taskStart(taskObject);
                    worker.run(taskObject);
                } finally {
                    // Return the executor on failure as well, and before any taskEnd/onError
                    // callback runs, so it is back in the pool when close() drains it.
                    this.executorPool.back(worker);
                }
                this.logger.debug(this.source, "end job");
                // NOTE(review): getActiveCount() is documented as approximate, so this
                // "last job finished" detection is best-effort - confirm it is sufficient.
                if (this.executorService.getActiveCount() == 1 && this.linkedQueue.isEmpty()) {
                    this.logger.debug(this.source, "task end");
                    this.executorListener.taskEnd(taskObject);
                }
            }
            catch (Throwable e) {
                this.logger.error(this.source, e.getMessage());
                this.executorListener.onError(e);
            }
        });
    }
}

