/*
 * Decompiled with CFR 0.152.
 */
package risesoft.data.transfer.base.executor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import risesoft.data.transfer.core.close.Closed;
import risesoft.data.transfer.core.exception.CommonErrorCode;
import risesoft.data.transfer.core.exception.ErrorCode;
import risesoft.data.transfer.core.exception.TransferException;
import risesoft.data.transfer.core.executor.Executor;
import risesoft.data.transfer.core.executor.ExecutorFacotry;
import risesoft.data.transfer.core.executor.ExecutorListener;
import risesoft.data.transfer.core.executor.ExecutorTaskQueue;
import risesoft.data.transfer.core.log.Logger;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.util.CloseUtils;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.pool.ObjectPool;
import risesoft.data.transfer.core.util.pool.SyncObjectPool;

public class ConcurrentThreadExecutorTaskQueue
implements ExecutorTaskQueue {
    private ExecutorListener executorListener;
    private ObjectPool<Executor> executorPool;
    private ExecutorFacotry executor;
    private ConcurrentLinkedQueue<Object> linkedQueue = new ConcurrentLinkedQueue();
    private boolean isStart = false;
    private volatile boolean isShutdown;
    private Logger logger;
    private Object source;
    private int size;

    public ConcurrentThreadExecutorTaskQueue(Configuration configuration, LoggerFactory loggerFactory) {
        this.size = configuration.getInt("size", 5);
        this.logger = loggerFactory.getLogger(configuration.getString("name", "ThreadPoolExecutorTaskQueue"));
        this.isShutdown = false;
        this.source = this;
        this.executorPool = new SyncObjectPool(this.size, () -> {
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "create executor instance:" + this.executorPool.getConcurrentSize() + " created instance size:" + this.size);
            }
            return this.executor.getInstance();
        });
        if (this.logger.isInfo()) {
            this.logger.info(this.source, "inited max size " + this.size);
        }
    }

    public synchronized void close() throws Exception {
        int size = this.executorPool.getConcurrentSize();
        this.logger.info((Object)this, "close:" + this.executorPool.getConcurrentSize());
        for (int i = 0; i < size; ++i) {
            CloseUtils.close((Closed)((Closed)this.executorPool.getInstance()));
        }
        this.executor.close();
        this.isStart = false;
    }

    public void add(Object task) {
        if (!this.isStart) {
            throw TransferException.as((ErrorCode)CommonErrorCode.CONFIG_ERROR, (String)"\u4e0d\u652f\u6301\u5728\u672a\u542f\u52a8\u7684\u72b6\u6001\u4e0b\u6267\u884c\u4efb\u52a1,\u8bf7\u786e\u4fdd\u60a8\u672a\u5c06\u6b64\u5b9e\u73b0\u7c7b\u7528\u4e8e\u751f\u4ea7\u8005,\u751f\u4ea7\u8005\u5fc5\u987b\u662f\u5f02\u6b65\u7684\u7ebf\u7a0b!");
        }
        this.linkedQueue.add(task);
        this.runJob();
    }

    public void addBatch(Collection task) {
        for (Object object : task) {
            this.add(object);
        }
    }

    public Collection<Object> getResidue() {
        return new ArrayList<Object>();
    }

    public int getResidueSize() {
        return this.executorPool.getConcurrentSize();
    }

    public void setExecutorFacoty(ExecutorFacotry executor) {
        this.executor = executor;
    }

    public void start() {
        this.executorPool.clear();
        this.executorListener.start();
        this.isStart = true;
    }

    public int getExecutorSize() {
        return this.size;
    }

    public void shutdown() throws Exception {
        this.logger.info((Object)this, "shutdown");
        this.isShutdown = true;
        this.close();
    }

    public void setExecutorListener(ExecutorListener executorListener) {
        this.executorListener = executorListener;
    }

    private void runJob() {
        try {
            if (this.isShutdown) {
                return;
            }
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "run job: ");
            }
            Executor executor = (Executor)this.executorPool.getInstance();
            Object taskObject = this.linkedQueue.poll();
            this.executorListener.taskStart(taskObject);
            if (this.isShutdown) {
                return;
            }
            executor.run(taskObject);
            this.executorPool.back((Object)executor);
            this.logger.debug(this.source, "end job");
            if (this.linkedQueue.size() == 0) {
                this.logger.debug(this.source, "task end");
                this.executorListener.taskEnd(taskObject);
            }
        }
        catch (Throwable e) {
            this.logger.error(this.source, e.getMessage());
            this.executorListener.onError(e);
        }
    }
}

