/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2.translation.wrappers;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.BaseSourceFunc;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.beam.runners.twister2.Twister2TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p21p0.io.opencensus.internal.Utils;
import org.joda.time.Instant;

/**
 * Twister2 source function that reads the elements of a Beam {@link BoundedSource}.
 *
 * <p>In {@link #prepare(TSetContext)} the source is split into partitions and the partitions are
 * assigned to this worker by its index; {@link #hasNext()} and {@link #next()} then iterate over
 * the elements of the assigned partitions, each wrapped as a timestamped value in the global
 * window.
 *
 * @param <T> element type produced by the wrapped source
 */
public class Twister2BoundedSource<T> extends BaseSourceFunc<WindowedValue<T>> {
    private static final Logger LOG = Logger.getLogger(Twister2BoundedSource.class.getName());

    /** Bundle size in bytes (64 MiB) used when no usable size estimate is available. */
    private static final long DEFAULT_BUNDLE_SIZE = 64L * 1024L * 1024L;

    private final BoundedSource<T> source;
    private int numPartitions;
    private long splitSize = DEFAULT_BUNDLE_SIZE;
    private transient Config twister2Config;
    private List<? extends Source<T>> partitionedSources;
    /** First partition assigned to this worker, or {@code null} when it has none. */
    private Source<T> localPartition;
    // NOTE(review): this field is transient, so it would be null after Java deserialization of
    // this function; prepare() passes it to the source as-is. Confirm how options reach workers.
    private final transient PipelineOptions options;
    private transient Iterator<WindowedValue<T>> readerIterator;

    /**
     * Creates a source function for the given bounded source.
     *
     * @param boundedSource the Beam source to read from
     * @param context translation context; not used by this class
     * @param options pipeline options handed to the source when estimating, splitting and reading
     */
    public Twister2BoundedSource(BoundedSource<T> boundedSource, Twister2TranslationContext context, PipelineOptions options) {
        this.source = boundedSource;
        this.options = options;
    }

    /**
     * Splits the source and sets up the element iterator for this worker.
     *
     * <p>Partition {@code i} is read by the worker whose index is {@code i % parallelism}, so no
     * partition is dropped when the source yields more partitions than workers, and a worker
     * without any partition simply produces no elements.
     *
     * @param context supplies the parallelism, the configuration and this worker's index
     * @throws RuntimeException if the source cannot be split or the first reader cannot be created
     */
    public void prepare(TSetContext context) {
        this.numPartitions = context.getParallelism();
        this.splitSize = this.computeSplitSize();
        this.twister2Config = context.getConfig();
        final int index = context.getIndex();
        try {
            this.partitionedSources = this.source.split(this.splitSize, this.options);
            final int partitionCount = this.partitionedSources.size();
            if (partitionCount > this.numPartitions) {
                LOG.info(String.format("Source was split into %d partitions for a parallelism of %d; workers will read several partitions each.", partitionCount, this.numPartitions));
            }
            this.localPartition = index < partitionCount ? this.partitionedSources.get(index) : null;
            // The stride is clamped to 1 so a reported parallelism of 0 cannot stall the iteration.
            this.readerIterator = new AssignedPartitionsIterator(index, Math.max(1, this.numPartitions));
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e);
        }
    }

    public boolean hasNext() {
        return this.readerIterator.hasNext();
    }

    public WindowedValue<T> next() {
        return this.readerIterator.next();
    }

    /**
     * Derives the desired bundle size from the source's size estimate and the parallelism.
     *
     * @return the estimate divided by the parallelism (at least 1), or {@link #DEFAULT_BUNDLE_SIZE}
     *     when the estimate fails, is not positive, or the parallelism is not positive
     */
    private long computeSplitSize() {
        try {
            final long estimatedBytes = this.source.getEstimatedSizeBytes(this.options);
            if (estimatedBytes > 0L && this.numPartitions > 0) {
                return Math.max(1L, estimatedBytes / (long)this.numPartitions);
            }
            LOG.warning(String.format("Source %s reported an estimated size of %d bytes for a parallelism of %d, using default bundle size of %d bytes.", this.source.toString(), estimatedBytes, this.numPartitions, DEFAULT_BUNDLE_SIZE));
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, String.format("Failed to get estimated bundle size for source %s, using default bundle size of %d bytes.", this.source.toString(), DEFAULT_BUNDLE_SIZE), e);
        }
        return DEFAULT_BUNDLE_SIZE;
    }

    /**
     * Opens a reader on one partition.
     *
     * @param partition a partition returned by {@code BoundedSource.split}
     * @return a reader that has not been started yet
     * @throws RuntimeException wrapping the {@link IOException} if the reader cannot be created
     */
    private BoundedSource.BoundedReader<T> createReader(Source<T> partition) {
        try {
            return ((BoundedSource<T>)partition).createReader(this.options);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
        }
    }

    /**
     * Iterates over every partition assigned to this worker, one reader at a time.
     *
     * <p>Instances are only held in a transient field of the enclosing function, so the implicit
     * reference to the enclosing instance is never serialized.
     */
    private final class AssignedPartitionsIterator
    implements Iterator<WindowedValue<T>> {
        private final int stride;
        private int nextPartition;
        private Iterator<WindowedValue<T>> current;

        AssignedPartitionsIterator(int firstPartition, int stride) {
            this.stride = stride;
            this.nextPartition = firstPartition;
            // Open the first reader right away so creation failures surface in prepare().
            this.current = this.openNextPartition();
        }

        /** Returns an iterator over the next assigned partition, or {@code null} if none is left. */
        private Iterator<WindowedValue<T>> openNextPartition() {
            if (this.nextPartition >= Twister2BoundedSource.this.partitionedSources.size()) {
                return null;
            }
            Source<T> partition = Twister2BoundedSource.this.partitionedSources.get(this.nextPartition);
            this.nextPartition += this.stride;
            return new ReaderToIteratorAdapter<T>(Twister2BoundedSource.this.createReader(partition));
        }

        @Override
        public boolean hasNext() {
            // An exhausted adapter has already closed its reader, so just move to the next one.
            while (this.current != null) {
                if (this.current.hasNext()) {
                    return true;
                }
                this.current = this.openNextPartition();
            }
            return false;
        }

        @Override
        public WindowedValue<T> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.current.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Adapts a {@link Source.Reader} to an {@link Iterator} of values in the global window.
     *
     * <p>The reader is started on first use and closed exactly once: when it reports no more
     * elements, or when reading fails.
     */
    static class ReaderToIteratorAdapter<T>
    implements Iterator<WindowedValue<T>> {
        private static final boolean FAILED_TO_OBTAIN_NEXT = false;
        private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true;
        private final Source.Reader<T> reader;
        private boolean started = false;
        private boolean closed = false;
        private WindowedValue<T> next = null;

        ReaderToIteratorAdapter(Source.Reader<T> reader) {
            this.reader = reader;
        }

        /**
         * Reads the next element into {@code next}, closing the reader when it is exhausted.
         *
         * @return whether an element was obtained
         * @throws RuntimeException if the reader fails; the reader is closed before rethrowing
         */
        private boolean tryProduceNext() {
            try {
                Utils.checkState(this.next == null, "unexpected non-null value for next");
                if (this.seekNext()) {
                    this.next = WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
                    return SUCCESSFULLY_OBTAINED_NEXT;
                }
                this.close();
                return FAILED_TO_OBTAIN_NEXT;
            }
            catch (Exception e) {
                this.closeAfterFailure(e);
                throw new RuntimeException("Failed to read data.", e);
            }
        }

        /** Closes the reader once; later calls are no-ops. */
        private void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                this.reader.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /** Releases the reader after a failure, recording any close error on {@code failure}. */
        private void closeAfterFailure(Exception failure) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            try {
                this.reader.close();
            }
            catch (IOException closeError) {
                failure.addSuppressed(closeError);
            }
        }

        /** Starts the reader on the first call and advances it afterwards. */
        private boolean seekNext() throws IOException {
            if (this.closed) {
                return false;
            }
            if (!this.started) {
                this.started = true;
                return this.reader.start();
            }
            return this.reader.advance();
        }

        private WindowedValue<T> consumeCurrent() {
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            WindowedValue<T> current = this.next;
            this.next = null;
            return current;
        }

        private WindowedValue<T> consumeNext() {
            if (this.next == null) {
                this.tryProduceNext();
            }
            return this.consumeCurrent();
        }

        @Override
        public boolean hasNext() {
            return this.next != null || this.tryProduceNext();
        }

        @Override
        public WindowedValue<T> next() {
            return this.consumeNext();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

