package org.pragmaticminds.crunch.execution;

import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import org.pragmaticminds.crunch.api.records.MRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/pragmaticminds/crunch/execution/MRecordSourceWrapper.class */
class MRecordSourceWrapper extends GraphStage<SourceShape<MRecord>> {
    private static final Logger logger = LoggerFactory.getLogger(MRecordSourceWrapper.class);
    public final Outlet<MRecord> out = Outlet.create("MRecordSource.out");
    private final SourceShape<MRecord> shape = SourceShape.of(this.out);
    private final MRecordSource source;

    public MRecordSourceWrapper(MRecordSource mRecordSource) {
        this.source = mRecordSource;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    public SourceShape<MRecord> m2shape() {
        return this.shape;
    }

    public GraphStageLogic createLogic(Attributes attributes) {
        return new GraphStageLogic(m2shape()) { // from class: org.pragmaticminds.crunch.execution.MRecordSourceWrapper.1
            {
                setHandler(MRecordSourceWrapper.this.out, new AbstractOutHandler() { // from class: org.pragmaticminds.crunch.execution.MRecordSourceWrapper.1.1
                    public void onPull() {
                        MRecord mRecord;
                        if (!MRecordSourceWrapper.this.source.hasRemaining()) {
                            complete(MRecordSourceWrapper.this.out);
                            return;
                        }
                        while (true) {
                            mRecord = MRecordSourceWrapper.this.source.get();
                            if (mRecord != null) {
                                break;
                            } else if (MRecordSourceWrapper.logger.isTraceEnabled()) {
                                MRecordSourceWrapper.logger.trace("Skipped Null Record from source, fetching next record");
                            }
                        }
                        if (MRecordSourceWrapper.logger.isTraceEnabled()) {
                            MRecordSourceWrapper.logger.trace("Record {} from source {} processed ... pushing.", mRecord, MRecordSourceWrapper.this.source);
                        }
                        push(MRecordSourceWrapper.this.out, mRecord);
                    }
                });
            }

            public void preStart() {
                MRecordSourceWrapper.logger.info("Initializing stream source {} with Cardinality {}", MRecordSourceWrapper.this.source, MRecordSourceWrapper.this.source.getKind());
                MRecordSourceWrapper.this.source.init();
            }

            public void postStop() {
                MRecordSourceWrapper.logger.info("Closing stream source {}", MRecordSourceWrapper.this.source);
                MRecordSourceWrapper.this.source.close();
            }
        };
    }
}
