package org.apache.asterix.external.dataflow;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.asterix.common.exceptions.ExceptionUtils;
import org.apache.asterix.external.api.IFeedMarker;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/dataflow/FeedRecordDataFlowController.class */
/**
 * Feed data-flow controller that pulls raw records from an {@link IRecordReader}, parses each one
 * with an {@link IRecordDataParser} and hands the resulting tuple to the tuple forwarder.
 *
 * <p>When {@code sendMarker} is set, a background {@link DataflowMarker} periodically asks the
 * reader's progress reporter to write a marker and flushes it downstream. Record processing and
 * marker emission are serialized through {@link #mutex}.
 *
 * <p>{@link #start(IFrameWriter)} and {@link #stop()} are expected to run on different threads:
 * {@code stop()} blocks until {@code start()} signals completion through {@link #closed}.
 * The fields {@code ctx}, {@code tb}, {@code tupleForwarder} and {@code feedLogManager} used below
 * are not declared in this class; they are inherited.
 *
 * @param <T> the record payload type understood by the parser
 */
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
    private static final Logger LOGGER = Logger.getLogger(FeedRecordDataFlowController.class.getName());

    /** Parser that turns a raw record into the first field of the output tuple. */
    protected final IRecordDataParser<T> dataParser;

    /** Source of raw records. */
    protected final IRecordReader<? extends T> recordReader;

    /** Set (and notified on) once {@link #start(IFrameWriter)} has finished its normal shutdown. */
    protected final AtomicBoolean closed = new AtomicBoolean(false);

    /** Milliseconds to wait on {@link #mutex} when the reader has no record available. */
    protected static final long INTERVAL = 1000;

    /** Serializes record forwarding in {@code start()} with marker emission in the marker thread. */
    protected final Object mutex = new Object();

    /** Whether a background {@link DataflowMarker} should be run. */
    protected final boolean sendMarker;

    /**
     * Set when {@code start()} terminates with a failure. Volatile because it is written by the
     * thread running {@code start()} and read by the thread calling {@code stop()}.
     */
    protected volatile boolean failed = false;

    /** The marker task, created on the first {@code start()}; read by {@code stop()} from another thread. */
    private volatile DataflowMarker dataflowMarker;

    /** Handle of the submitted marker task, used to cancel it during shutdown. */
    private Future<?> dataflowMarkerResult;

    /**
     * Background task that periodically lets the feed marker write into the message frame and
     * flushes the tuple forwarder when a marker was produced.
     */
    public class DataflowMarker implements Runnable {
        private final IFeedMarker marker;
        private final VSizeFrame mark;
        private volatile boolean stopped = false;

        /**
         * @param iFeedMarker the marker that decides whether (and what) to write into the frame
         * @param vSizeFrame the frame the marker writes into
         */
        public DataflowMarker(IFeedMarker iFeedMarker, VSizeFrame vSizeFrame) {
            this.marker = iFeedMarker;
            this.mark = vSizeFrame;
        }

        /** Requests termination and wakes the marker thread if it is waiting. */
        public synchronized void stop() {
            this.stopped = true;
            notify();
        }

        /**
         * Loops until stopped: waits up to two seconds, then (under the controller's mutex) asks
         * the marker to mark the frame and, if it did, flushes and resets the frame.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    synchronized (this) {
                        if (stopped) {
                            return;
                        }
                        wait(TimeUnit.SECONDS.toMillis(2L));
                        // stop() wakes this thread via notify(); without re-checking here the
                        // marker would perform one more mark/flush after being told to stop,
                        // racing with the controller closing the tuple forwarder.
                        if (stopped) {
                            return;
                        }
                    }
                    synchronized (mutex) {
                        if (marker.mark(mark)) {
                            tupleForwarder.flush();
                            // Reset the frame to a single byte of value 1.
                            // NOTE(review): the meaning of this byte is defined by the consumer of
                            // the message frame, which is not visible in this file.
                            mark.getBuffer().clear();
                            mark.getBuffer().put((byte) 1);
                            mark.getBuffer().flip();
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Marker stopped", e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.warn("Marker stopped", e);
            }
        }
    }

    /**
     * Creates the controller and registers it (and the feed log manager) with the record reader.
     *
     * @param iHyracksTaskContext task context, passed to the superclass
     * @param feedTupleForwarder tuple forwarder, passed to the superclass
     * @param feedLogManager feed log manager, passed to the superclass and to the reader
     * @param i passed through unchanged to the superclass constructor
     * @param iRecordDataParser parser for the raw records
     * @param iRecordReader reader that supplies the raw records
     * @param z whether to run the background dataflow marker
     * @throws HyracksDataException declared by the original signature
     */
    public FeedRecordDataFlowController(IHyracksTaskContext iHyracksTaskContext, FeedTupleForwarder feedTupleForwarder, FeedLogManager feedLogManager, int i, IRecordDataParser<T> iRecordDataParser, IRecordReader<T> iRecordReader, boolean z) throws HyracksDataException {
        super(iHyracksTaskContext, feedTupleForwarder, feedLogManager, i);
        this.dataParser = iRecordDataParser;
        this.recordReader = iRecordReader;
        this.sendMarker = z;
        iRecordReader.setFeedLogManager(feedLogManager);
        iRecordReader.setController(this);
    }

    /**
     * Runs the feed: reads records until the reader reports no more, parsing and forwarding each,
     * then closes the forwarder and the reader and signals completion to {@link #stop()}.
     *
     * <p>An interrupt ends the loop and proceeds to the normal shutdown. Any other exception marks
     * the controller as failed, flushes the forwarder and is rethrown wrapped; in that case the
     * forwarder and reader are NOT closed here and completion is NOT signalled (the failed branch
     * of {@link #stop()} performs the close instead).
     *
     * @param iFrameWriter the writer the tuple forwarder is initialized with
     * @throws HyracksDataException if the feed fails, or if closing the forwarder or reader fails
     */
    @Override
    public void start(IFrameWriter iFrameWriter) throws HyracksDataException {
        startDataflowMarker();
        HyracksDataException closeFailure = null;
        try {
            failed = false;
            tupleForwarder.initialize(ctx, iFrameWriter);
            while (recordReader.hasNext()) {
                // Hold the mutex across next() and the forward so the marker thread cannot emit
                // a marker between reading a record and forwarding it.
                synchronized (mutex) {
                    IRawRecord<? extends T> rawRecord = recordReader.next();
                    if (rawRecord == null) {
                        // Nothing available right now: push out what we have and back off.
                        // wait() releases the mutex, letting the marker thread run meanwhile.
                        flush();
                        mutex.wait(INTERVAL);
                        continue;
                    }
                    tb.reset();
                    parseAndForward(rawRecord);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Feed has been interrupted. Closing the feed", e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            failed = true;
            tupleForwarder.flush();
            LOGGER.warn("Failure while operating a feed source", e);
            throw new HyracksDataException(e);
        }
        stopDataflowMarker();
        try {
            tupleForwarder.close();
        } catch (Throwable th) {
            closeFailure = ExceptionUtils.suppressIntoHyracksDataException(closeFailure, th);
        }
        try {
            recordReader.close();
        } catch (Throwable th) {
            LOGGER.warn("Failure while closing the record reader of a feed source", th);
            closeFailure = ExceptionUtils.suppressIntoHyracksDataException(closeFailure, th);
        } finally {
            // Always release a stop() caller blocked in waitForSignal(), and make sure the marker
            // task is cancelled even if it has not yet observed the stop request.
            closeSignal();
            if (sendMarker && dataflowMarkerResult != null) {
                dataflowMarkerResult.cancel(true);
            }
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Parses one raw record into the tuple builder, appends the meta part and primary keys, and
     * forwards the tuple. Any exception is logged, the record is written to the feed log as a
     * parse error, and processing continues with the next record.
     *
     * <p>NOTE(review): the catch-all also covers failures from {@code addMetaPart},
     * {@code addPrimaryKeys} and {@code tupleForwarder.addTuple}, which are then reported as
     * parse errors and swallowed. Confirm this is intended before narrowing it.
     *
     * @param iRawRecord the record to parse and forward
     * @throws IOException if logging the failed record fails
     */
    private void parseAndForward(IRawRecord<? extends T> iRawRecord) throws IOException {
        synchronized (dataParser) {
            try {
                dataParser.parse(iRawRecord, tb.getDataOutput());
                tb.addFieldEndOffset();
                addMetaPart(tb, iRawRecord);
                addPrimaryKeys(tb, iRawRecord);
                tupleForwarder.addTuple(tb);
            } catch (Exception e) {
                LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
                feedLogManager.logRecord(iRawRecord.toString(), ExternalDataConstants.ERROR_PARSE_RECORD);
            }
        }
    }

    /**
     * Hook for subclasses to append a meta part to the tuple; does nothing here.
     *
     * @param arrayTupleBuilder the tuple under construction
     * @param iRawRecord the record being processed
     * @throws IOException if a subclass fails to write the meta part
     */
    protected void addMetaPart(ArrayTupleBuilder arrayTupleBuilder, IRawRecord<? extends T> iRawRecord) throws IOException {
    }

    /**
     * Hook for subclasses to append primary-key fields to the tuple; does nothing here.
     *
     * @param arrayTupleBuilder the tuple under construction
     * @param iRawRecord the record being processed
     * @throws IOException if a subclass fails to write the keys
     */
    protected void addPrimaryKeys(ArrayTupleBuilder arrayTupleBuilder, IRawRecord<? extends T> iRawRecord) throws IOException {
    }

    /**
     * Starts the background marker task if markers are enabled and none has been created yet.
     * The executor is created only when a task is actually submitted and is shut down right
     * after submission: the submitted task keeps running, but the worker thread is released once
     * the task ends instead of lingering for the life of the JVM.
     */
    private void startDataflowMarker() {
        if (!sendMarker || dataflowMarker != null) {
            return;
        }
        DataflowMarker newMarker = new DataflowMarker(recordReader.getProgressReporter(), (VSizeFrame) TaskUtils.get("HYX:MSG", ctx));
        dataflowMarker = newMarker;
        ExecutorService markerExecutor = Executors.newSingleThreadExecutor();
        try {
            dataflowMarkerResult = markerExecutor.submit(newMarker);
        } finally {
            markerExecutor.shutdown();
        }
    }

    /** Asks the marker task, if one was created, to stop. */
    private void stopDataflowMarker() {
        DataflowMarker current = dataflowMarker;
        if (current != null) {
            current.stop();
        }
    }

    /** Marks the controller as closed and wakes every thread blocked in {@link #waitForSignal()}. */
    private void closeSignal() {
        synchronized (closed) {
            closed.set(true);
            closed.notifyAll();
        }
    }

    /**
     * Blocks until {@link #closeSignal()} has been called.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void waitForSignal() throws InterruptedException {
        synchronized (closed) {
            while (!closed.get()) {
                closed.wait();
            }
        }
    }

    /**
     * Stops the feed. Stops the marker, asks the reader to stop and, if the reader agrees, either
     * waits for {@code start()} to finish its shutdown (normal case) or, if {@code start()} had
     * failed, closes the forwarder and the reader here.
     *
     * @return {@code false} if the reader refused to stop, {@code true} otherwise
     * @throws HyracksDataException if the wait is interrupted or closing a resource fails
     */
    @Override
    public boolean stop() throws HyracksDataException {
        stopDataflowMarker();
        if (!recordReader.stop()) {
            return false;
        }
        if (!failed) {
            try {
                waitForSignal();
                return true;
            } catch (InterruptedException e) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new HyracksDataException(e);
            }
        }
        // start() failed and therefore skipped its own shutdown; close the resources here.
        HyracksDataException closeFailure = null;
        try {
            tupleForwarder.close();
        } catch (Throwable th) {
            closeFailure = ExceptionUtils.suppressIntoHyracksDataException(closeFailure, th);
        }
        try {
            recordReader.close();
        } catch (Throwable th) {
            closeFailure = ExceptionUtils.suppressIntoHyracksDataException(closeFailure, th);
        }
        if (closeFailure != null) {
            throw closeFailure;
        }
        return true;
    }

    /**
     * Delegates exception handling to the record reader.
     *
     * @param th the failure to handle
     * @return whatever the reader's {@code handleException} returns
     */
    @Override
    public boolean handleException(Throwable th) {
        return recordReader.handleException(th);
    }
}
