package org.apache.falcon.rerun.handler;

import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.LateInput;
import org.apache.falcon.entity.v0.process.LateProcess;
import org.apache.falcon.entity.v0.process.PolicyType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:org/apache/falcon/rerun/handler/LateRerunHandler.class */
/**
 * Rerun handler for late-arriving data.
 *
 * <p>When a workflow instance succeeds and its entity has late-process handling
 * configured, {@link #onSuccess} delegates to {@link #handleRerun}, which either
 * enqueues a {@link LaterunEvent} with the delay computed by the configured
 * policy, or, once the late-arrival cut-off has passed, deletes the late-data
 * log path recorded for that instance.
 *
 * <p>{@link #init} starts a daemon thread running a {@code LateRerunConsumer};
 * {@link #close} interrupts it.
 *
 * @param <M> the delayed queue type holding {@link LaterunEvent}s
 */
public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends AbstractRerunHandler<LaterunEvent, M> {
    /** Sentinel delay meaning "no late rerun should be scheduled". */
    private static final long NO_RERUN = -1L;

    /** Consumer thread created by {@link #init}; {@code null} until then. */
    private Thread daemon;

    /**
     * Schedules a late rerun for the given entity instance, or cleans up its
     * late-data log when a rerun is no longer possible.
     *
     * <p>Any exception is logged and reported through
     * {@link GenericAlert#alertLateRerunFailed}; nothing is propagated to the caller.
     *
     * @param str  cluster name
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 nominal time of the instance, parsed with {@code EntityUtil.parseDateUTC}
     * @param str5 workflow run id; must parse as an integer
     * @param str6 workflow id
     * @param str7 workflow user
     * @param j    execution completion time supplied by the caller; not used by this implementation
     */
    @Override
    public void handleRerun(String str, String str2, String str3, String str4, String str5, String str6, String str7, long j) {
        try {
            Entity entity = EntityUtil.getEntity(str2, str3);
            int runId = Integer.parseInt(str5);
            Date nominalDate = EntityUtil.parseDateUTC(str4);
            long delay = getEventDelay(entity, str4);
            if (delay != NO_RERUN) {
                LOG.debug("Scheduling the late rerun for entity instance: {} ({}): {} And WorkflowId: {}", new Object[]{str2, str3, str4, str6});
                offerToQueue(new LaterunEvent(str, str6, nominalDate.getTime(), delay, str2, str3, str4, runId, str7));
                return;
            }

            // No rerun will happen: remove the late-data log recorded for this instance.
            LOG.info("Late rerun expired for entity: {} ({})", str2, str3);
            Properties workflowProperties = getWfEngine().getWorkflowProperties(str, str6);
            Path lateLogPath = getLateLogPath(workflowProperties.getProperty("logDir"), EntityUtil.fromUTCtoURIDate(str4), workflowProperties.getProperty("srcClusterName"));
            LOG.info("Going to delete path: {}", lateLogPath);
            // NOTE(review): the FileSystem is not closed here; presumably it is a shared instance - confirm.
            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(getConfiguration(workflowProperties.getProperty("nameNode")));
            if (fs.exists(lateLogPath) && fs.delete(lateLogPath, true)) {
                LOG.info("Successfully deleted late file path: {}", lateLogPath);
            }
        } catch (Exception e) {
            // The exception is passed as the trailing array element so the logger can report it.
            LOG.error("Unable to schedule late rerun for entity instance: {} ({}): {} And WorkflowId: {}", new Object[]{str2, str3, str4, str6, e});
            GenericAlert.alertLateRerunFailed(str2, str3, str4, str6, str7, str5, e.getMessage());
        }
    }

    /**
     * Computes how long to wait before the late rerun of an instance.
     *
     * @param entity      the entity the instance belongs to
     * @param nominalTime nominal time of the instance
     * @return the delay produced by the entity's late-process policy, or {@code -1}
     *         when the entity has no late-process or the cut-off time has already passed
     * @throws FalconException if the nominal time or the cut-off cannot be evaluated
     */
    private long getEventDelay(Entity entity, String nominalTime) throws FalconException {
        Date instanceTime = EntityUtil.parseDateUTC(nominalTime);
        LateProcess lateProcess = EntityUtil.getLateProcess(entity);
        if (lateProcess == null) {
            LOG.warn("Late run not applicable for entity: {} ({})", entity.getEntityType(), entity.getName());
            return NO_RERUN;
        }
        PolicyType policy = lateProcess.getPolicy();
        Date cutOffTime = getCutOffTime(entity, nominalTime);
        if (new Date().after(cutOffTime)) {
            LOG.warn("Feed Cut Off time: {} has expired, Late Rerun can not be scheduled", SchemaHelper.formatDateUTC(cutOffTime));
            return NO_RERUN;
        }
        return RerunPolicyFactory.getRetryPolicy(policy).getDelay(lateProcess.getDelay(), instanceTime, cutOffTime);
    }

    /**
     * Returns a new date offset from {@code date} by {@code j} milliseconds.
     *
     * @param date base date; not modified
     * @param j    offset in milliseconds (may be negative)
     * @return a new {@link Date} at {@code date.getTime() + j}
     */
    public static Date addTime(Date date, long j) {
        return new Date(date.getTime() + j);
    }

    /**
     * Determines the late-arrival cut-off time for an entity instance.
     *
     * <p>For a feed, the cut-off is the nominal time plus the feed's late-arrival
     * cut-off. For a process, it is the latest value, over all late inputs whose
     * feed configures late arrival, of the input's evaluated end expression plus
     * that feed's cut-off. When nothing is configured the epoch
     * ({@code new Date(0)}) is returned.
     *
     * @param entity a feed or process entity
     * @param str    nominal time of the instance; also set as the expression reference date
     * @return the cut-off time, or the epoch when no late-arrival cut-off applies
     * @throws FalconException       if the entity is neither a feed nor a process,
     *                               or an expression cannot be evaluated
     * @throws IllegalStateException if a late input does not resolve to a feed
     */
    public static Date getCutOffTime(Entity entity, String str) throws FalconException {
        ExpressionHelper evaluator = ExpressionHelper.get();
        Date nominalDate = EntityUtil.parseDateUTC(str);
        ExpressionHelper.setReferenceDate(nominalDate);
        Date cutOff = new Date(0L);

        if (entity.getEntityType() == EntityType.FEED) {
            Feed feed = (Feed) entity;
            if (feed.getLateArrival() == null) {
                LOG.debug("Feed's {} late arrival cut-off is not configured, returning", entity.getName());
                return cutOff;
            }
            return addTime(nominalDate, evaluateCutOff(evaluator, feed));
        }
        if (entity.getEntityType() != EntityType.PROCESS) {
            throw new FalconException("Invalid entity while getting cut-off time:" + entity.getName());
        }

        Process process = (Process) entity;
        LateProcess lateProcess = process.getLateProcess();
        if (lateProcess == null) {
            // Mirror the feed branch: nothing configured means the epoch cut-off, not an NPE.
            LOG.debug("Process {} has no late-process configured, returning", entity.getName());
            return cutOff;
        }

        ConfigurationStore store = ConfigurationStore.get();
        for (LateInput lateInput : lateProcess.getLateInputs()) {
            Input input = findInput(process, lateInput.getInput());
            Feed feed = null;
            if (input != null) {
                feed = (Feed) store.get(EntityType.FEED, input.getFeed());
            }
            if (feed == null) {
                throw new IllegalStateException("No such feed: " + lateInput.getInput());
            }
            if (feed.getLateArrival() == null) {
                LOG.debug("Feed's {} late arrival cut-off is not configured, ignoring this feed", feed.getName());
                continue;
            }
            Date inputEnd = (Date) evaluator.evaluate(input.getEnd(), Date.class);
            Date candidate = addTime(inputEnd, evaluateCutOff(evaluator, feed));
            if (candidate.after(cutOff)) {
                cutOff = candidate;
            }
        }
        return cutOff;
    }

    /** Evaluates the late-arrival cut-off expression of {@code feed} to a millisecond offset. */
    private static long evaluateCutOff(ExpressionHelper evaluator, Feed feed) throws FalconException {
        return (Long) evaluator.evaluate(feed.getLateArrival().getCutOff().toString(), Long.class);
    }

    /** Returns the first input of {@code process} named {@code inputName}, or {@code null} if there is none. */
    private static Input findInput(Process process, String inputName) {
        if (process.getInputs() == null) {
            return null;
        }
        for (Input input : process.getInputs().getInputs()) {
            if (input.getName().equals(inputName)) {
                return input;
            }
        }
        return null;
    }

    /**
     * Initializes the handler and starts the late-rerun consumer as a daemon thread.
     *
     * @param m the delayed queue passed on to the superclass initializer
     * @throws FalconException if the superclass initialization fails
     */
    @Override
    public void init(M m) throws FalconException {
        super.init(m);
        this.daemon = new Thread(new LateRerunConsumer(this));
        this.daemon.setName("LaterunHandler");
        this.daemon.setDaemon(true);
        this.daemon.start();
        LOG.info("Laterun Handler thread started");
    }

    /**
     * Interrupts the consumer thread, if one was started, and closes the superclass.
     *
     * @throws FalconException if the superclass close fails
     */
    @Override
    public void close() throws FalconException {
        // init() may never have run (or may have failed before creating the thread).
        Thread consumer = this.daemon;
        if (consumer != null) {
            consumer.interrupt();
        }
        super.close();
    }

    /**
     * Builds the late-data log path {@code <str>/latedata/<str2>/<str3>}.
     *
     * @param str  log directory
     * @param str2 path segment for the instance (the caller passes the nominal time in URI date form)
     * @param str3 source cluster name; {@code null} is treated as an empty segment
     * @return the resulting path
     */
    public Path getLateLogPath(String str, String str2, String str3) {
        String clusterSegment = str3 == null ? "" : str3;
        return new Path(str + "/latedata/" + str2 + "/" + clusterSegment);
    }

    /**
     * Creates a Hadoop configuration whose {@code fs.defaultFS} is the given endpoint.
     *
     * @param str storage endpoint to use as the default file system
     * @return a new configuration with {@code fs.defaultFS} set
     * @throws FalconException declared for callers; not thrown by the visible code
     */
    public static Configuration getConfiguration(String str) throws FalconException {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", str);
        return configuration;
    }

    /**
     * Triggers late-rerun handling for a successfully completed workflow, unless
     * the operation is a delete or the entity has no late-process configured.
     *
     * @param workflowExecutionContext context of the completed workflow
     * @throws FalconException if the entity cannot be looked up
     */
    public void onSuccess(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
        Entity entity = EntityUtil.getEntity(workflowExecutionContext.getEntityType(), workflowExecutionContext.getEntityName());
        boolean notApplicable = workflowExecutionContext.getOperation() == WorkflowExecutionContext.EntityOperations.DELETE
                || EntityUtil.getLateProcess(entity) == null;
        if (notApplicable) {
            LOG.info("Late date handling not applicable for entityType: {}, entityName: {} operation: {}",
                    new Object[]{workflowExecutionContext.getEntityType(), workflowExecutionContext.getEntityName(), workflowExecutionContext.getOperation()});
            return;
        }
        handleRerun(workflowExecutionContext.getClusterName(), workflowExecutionContext.getEntityType(), workflowExecutionContext.getEntityName(), workflowExecutionContext.getNominalTimeAsISO8601(), workflowExecutionContext.getWorkflowRunIdString(), workflowExecutionContext.getWorkflowId(), workflowExecutionContext.getWorkflowUser(), workflowExecutionContext.getExecutionCompletionTime());
    }

    /** No-op: failed workflows are not handled here. */
    public void onFailure(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: workflow start is not handled here. */
    public void onStart(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: workflow suspension is not handled here. */
    public void onSuspend(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }

    /** No-op: workflow wait is not handled here. */
    public void onWait(WorkflowExecutionContext workflowExecutionContext) throws FalconException {
    }
}
