package org.apache.beam.sdk.io;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.AutoValue_BoundedReadFromUnboundedSource_Shard;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.logging.log4j.core.jackson.JsonConstants;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.class */
public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> {
    /** The unbounded source this transform reads from; it is split into shards in {@code expand}. */
    private final UnboundedSource<T, ?> source;
    /** Record-count bound for the whole read; {@code SplitFn} divides it among the shards. */
    private final long maxNumRecords;
    /** Read-time bound copied into every shard; {@code ReadFn} treats {@code null} as "no time bound". */
    private final Duration maxReadTime;
    /** Backoff policy used by {@code ReadFn} while a reader has no data: 10 ms initial backoff, 10 s max backoff. */
    private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(10)).withMaxBackoff(Duration.standardSeconds(10));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$ReadFn.class */
    /**
     * Reads a single {@link Shard}: opens a reader on the shard's source and emits each record
     * wrapped in a {@link ValueWithRecordId} until one of the shard's bounds stops the read.
     */
    public static class ReadFn<T> extends DoFn<Shard<T>, ValueWithRecordId<T>> {
        private ReadFn() {
        }

        /**
         * Emits up to {@code shard.getMaxNumRecords()} records from the shard's source.
         *
         * <p>Nothing is read when the record bound is not positive or the time bound is exactly
         * zero milliseconds. Otherwise a reader is created on a clone of the source, records are
         * emitted with the reader's timestamps, the checkpoint mark is finalized once reading
         * stops, and the reader is closed on every exit path.
         *
         * @param shard the source plus the record and time bounds for this read
         * @param outputReceiver receives every record together with its record id
         * @param pipelineOptions options handed to the source when the reader is created
         * @throws Exception anything thrown by the reader, the receiver or checkpoint finalization
         */
        @DoFn.ProcessElement
        @SuppressFBWarnings(value = {"RCN_REDUNDANT_NULLCHECK_WOULD_HAVE_BEEN_A_NPE"}, justification = "https://github.com/spotbugs/spotbugs/issues/756")
        public void process(@DoFn.Element Shard<T> shard, DoFn.OutputReceiver<ValueWithRecordId<T>> outputReceiver, PipelineOptions pipelineOptions) throws Exception {
            Duration maxReadTime = shard.getMaxReadTime();
            // The deadline is fixed up front so reader creation counts against the time bound.
            Instant deadline = maxReadTime == null ? null : Instant.now().plus(maxReadTime);
            long maxNumRecords = shard.getMaxNumRecords();
            if (maxNumRecords <= 0 || (maxReadTime != null && maxReadTime.getMillis() == 0)) {
                return;
            }

            // try-with-resources closes the reader even when finalizeCheckpoint() throws; the
            // previous hand-rolled cleanup only covered failures raised inside the read loop.
            try (UnboundedSource.UnboundedReader<T> reader = SerializableUtils.clone(shard.getSource()).createReader(pipelineOptions, null)) {
                for (long emitted = 0; emitted < maxNumRecords; emitted++) {
                    // The first read must go through start(); later reads use advance().
                    boolean available = emitted == 0 ? reader.start() : reader.advance();
                    // NOTE(review): the deadline is only consulted while the reader is idle, so a
                    // source that always has data is limited by the record bound alone - confirm
                    // this is intended.
                    if (!available && !advanceWithBackoff(reader, deadline)) {
                        break;
                    }
                    outputReceiver.outputWithTimestamp(new ValueWithRecordId<>(reader.getCurrent(), reader.getCurrentRecordId()), reader.getCurrentTimestamp());
                }
                reader.getCheckpointMark().finalizeCheckpoint();
            }
        }

        /**
         * Polls the reader until it yields a record, sleeping between attempts according to
         * {@code BACKOFF_FACTORY}.
         *
         * @param unboundedReader an already started reader that currently has no record
         * @param instant deadline after which polling stops, or {@code null} for no deadline
         * @return {@code true} once {@code advance()} succeeds; {@code false} when the backoff
         *     policy is exhausted or the deadline has passed
         * @throws IOException if {@code advance()} fails
         */
        private boolean advanceWithBackoff(UnboundedSource.UnboundedReader<T> unboundedReader, Instant instant) throws IOException {
            BackOff backoff = BoundedReadFromUnboundedSource.BACKOFF_FACTORY.backoff();
            long sleepMillis = backoff.nextBackOffMillis();
            while (sleepMillis != BackOff.STOP) {
                if (instant != null && Instant.now().isAfter(instant)) {
                    return false;
                }
                if (unboundedReader.advance()) {
                    return true;
                }
                Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
                sleepMillis = backoff.nextBackOffMillis();
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Serializable value describing one unit of reading work: a source together with the
     * record-count and read-time bounds that apply to it. Declared as an AutoValue type; the
     * concrete implementation used in {@code expand} is
     * {@code AutoValue_BoundedReadFromUnboundedSource_Shard}.
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$Shard.class */
    public static abstract class Shard<T> implements Serializable {

        /** Builder for {@link Shard}; one setter per property. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @AutoValue.Builder
        /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$Shard$Builder.class */
        public static abstract class Builder<T> {
            /** Sets the source this shard reads from. */
            abstract Builder<T> setSource(UnboundedSource<T, ?> unboundedSource);

            /** Sets the maximum number of records this shard may emit. */
            abstract Builder<T> setMaxNumRecords(long j);

            /** Sets the read-time bound; {@code expand} passes the transform's value through, which callers in this file treat as possibly null. */
            abstract Builder<T> setMaxReadTime(Duration duration);

            /** Creates the shard from the properties set so far. */
            abstract Shard<T> build();
        }

        /** Returns the source this shard reads from. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract UnboundedSource<T, ?> getSource();

        /** Returns the maximum number of records to emit; {@code ReadFn} reads nothing when this is not positive. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract long getMaxNumRecords();

        /**
         * Returns the read-time bound. {@code ReadFn} treats {@code null} as "no time bound" and a
         * zero-millisecond duration as "read nothing".
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Duration getMaxReadTime();

        /** Returns a builder pre-populated with this shard's properties. */
        abstract Builder<T> toBuilder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/BoundedReadFromUnboundedSource$SplitFn.class */
    /**
     * Splits one {@link Shard} into several: the source is split and the record bound is divided
     * among the resulting sub-sources, while the time bound is copied unchanged to each.
     */
    public static class SplitFn<T> extends DoFn<Shard<T>, Shard<T>> {
        private SplitFn() {
        }

        /**
         * Divides {@code totalRecords} into {@code numShards} quotas that differ by at most one,
         * giving the extra records to the lowest indices.
         *
         * @return an array of length {@code numShards}; empty when {@code numShards} is zero, which
         *     previously failed with an {@code ArithmeticException} from the modulo by zero
         */
        private static long[] splitNumRecords(long totalRecords, int numShards) {
            long[] quotas = new long[numShards];
            if (numShards == 0) {
                return quotas;
            }
            long base = totalRecords / numShards;
            long remainder = totalRecords % numShards;
            for (int idx = 0; idx < numShards; idx++) {
                quotas[idx] = idx < remainder ? base + 1 : base;
            }
            return quotas;
        }

        /** Returns the desired split count: one per 10000 records, plus one, capped at 100. */
        private static int numInitialSplits(long totalRecords) {
            return (int) Math.min(100L, (totalRecords / 10000) + 1);
        }

        /**
         * Emits one shard per sub-source returned by the source's {@code split}.
         *
         * @param shard the shard to divide
         * @param outputReceiver receives the resulting shards; nothing is emitted when the source
         *     returns no sub-sources
         * @param pipelineOptions options passed through to {@code split}
         * @throws Exception anything thrown by the source's {@code split}
         */
        @DoFn.ProcessElement
        public void process(@DoFn.Element Shard<T> shard, DoFn.OutputReceiver<Shard<T>> outputReceiver, PipelineOptions pipelineOptions) throws Exception {
            long totalRecords = shard.getMaxNumRecords();
            List<? extends UnboundedSource<T, ?>> subSources = shard.getSource().split(numInitialSplits(totalRecords), pipelineOptions);
            // The source decides how many splits it actually produces, so quotas are computed
            // from the real count and not from the requested one.
            int count = subSources.size();
            long[] quotas = splitNumRecords(totalRecords, count);
            for (int idx = 0; idx < count; idx++) {
                outputReceiver.output(shard.toBuilder().setSource(subSources.get(idx)).setMaxNumRecords(quotas[idx]).setMaxReadTime(shard.getMaxReadTime()).build());
            }
        }
    }

    /**
     * Returns a new transform over the same source and read-time bound, with the record bound
     * replaced by {@code j}. This instance is left unchanged.
     */
    public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long j) {
        final UnboundedSource<T, ?> sameSource = this.source;
        final Duration sameReadTime = this.maxReadTime;
        return new BoundedReadFromUnboundedSource<>(sameSource, j, sameReadTime);
    }

    /**
     * Returns a new transform over the same source and record bound, with the read-time bound
     * replaced by {@code duration}. This instance is left unchanged.
     */
    public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration duration) {
        final UnboundedSource<T, ?> sameSource = this.source;
        final long sameRecordBound = this.maxNumRecords;
        return new BoundedReadFromUnboundedSource<>(sameSource, sameRecordBound, duration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the transform; all three arguments are stored as given, without validation.
     *
     * @param unboundedSource source to read from
     * @param j bound on the number of records
     * @param duration bound on the read time
     */
    public BoundedReadFromUnboundedSource(UnboundedSource<T, ?> unboundedSource, long j, Duration duration) {
        this.maxReadTime = duration;
        this.maxNumRecords = j;
        this.source = unboundedSource;
    }

    /**
     * Builds the bounded read as a chain of named steps: "Create" a single {@link Shard} holding
     * this transform's source and bounds, "Split" it with {@code SplitFn}, "Reshuffle" the
     * resulting shards via a random key, "Read" each with {@code ReadFn}, optionally de-duplicate
     * by record id, and finally "StripIds" to drop the record-id wrapper.
     *
     * @param pBegin the pipeline start the chain is applied to
     * @return the collection produced by the "StripIds" step, using the source's output coder
     */
    @Override // org.apache.beam.sdk.transforms.PTransform
    public PCollection<T> expand(PBegin pBegin) {
        // Shards are encoded with Java serialization, both when created and after splitting.
        SerializableCoder of = SerializableCoder.of(Shard.class);
        // NOTE(review): the raw types and casts below, and the PCollection<T> type given to a
        // collection whose coder is ValueWithRecordIdCoder, look like decompiler artifacts of
        // erased generics - confirm against the original source before relying on these types.
        PCollection<T> coder = ((PCollection) ((PCollection) ((PCollection) ((PCollection) pBegin.apply("Create", Create.of(new AutoValue_BoundedReadFromUnboundedSource_Shard.Builder().setSource(this.source).setMaxNumRecords(this.maxNumRecords).setMaxReadTime(this.maxReadTime).build(), new Shard[0]).withCoder(of))).apply("Split", ParDo.of(new SplitFn()))).setCoder(of).apply("Reshuffle", Reshuffle.viaRandomKey())).apply("Read", ParDo.of(new ReadFn()))).setCoder(ValueWithRecordId.ValueWithRecordIdCoder.of(this.source.getOutputCoder()));
        // Only sources that ask for de-duplication pay for the Distinct step, keyed on the
        // byte[] record id.
        if (this.source.requiresDeduping()) {
            coder = (PCollection) coder.apply(Distinct.withRepresentativeValueFn((v0) -> {
                return v0.getId();
            }).withRepresentativeType(TypeDescriptor.of(byte[].class)));
        }
        return ((PCollection) coder.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn()))).setCoder(this.source.getOutputCoder());
    }

    /** Returns {@code "Read(<name>)"}, where the name is the approximate simple name of the source. */
    @Override // org.apache.beam.sdk.transforms.PTransform
    public String getKindString() {
        final String sourceName = NameUtils.approximateSimpleName(this.source);
        return String.format("Read(%s)", sourceName);
    }

    /**
     * Registers this transform's display data: the source class, the record bound (omitted when
     * it equals {@code Long.MAX_VALUE}), the read-time bound (omitted when null), and the
     * source's own display data nested under the same "source" key.
     *
     * @param builder the display-data builder to populate
     */
    @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        // Spelled as a literal here: the key was previously borrowed from log4j-core's
        // JsonConstants.ELT_SOURCE, which only coincidentally carries the same text.
        final String sourceKey = "source";
        builder
                .add(DisplayData.item(sourceKey, this.source.getClass()).withLabel("Read Source"))
                .addIfNotDefault(DisplayData.item("maxRecords", Long.valueOf(this.maxNumRecords)).withLabel("Maximum Read Records"), Long.MAX_VALUE)
                .addIfNotNull(DisplayData.item("maxReadTime", this.maxReadTime).withLabel("Maximum Read Time"))
                .include(sourceKey, this.source);
    }

    /**
     * Recreates the {@code getId} lambda used in {@code expand} from its serialized form.
     *
     * <p>NOTE(review): this looks like a decompiler reconstruction of a compiler-generated
     * method. As written it is not valid Java ({@code boolean z = -1}, a {@code switch} over a
     * boolean, and a lambda returned as {@code Object} with no target type) - confirm whether it
     * should be kept in hand-maintained source at all.
     *
     * @param serializedLambda the serialized description of the lambda to restore
     * @return a function that calls {@code getId()} on its argument
     * @throws IllegalArgumentException if the description does not match the {@code getId} lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation method name to a case index, comparing the hash
        // first and then the exact name.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 98245393:
                if (implMethodName.equals("getId")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: check every recorded property of the lambda before recreating it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/ValueWithRecordId") && serializedLambda.getImplMethodSignature().equals("()[B")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                break;
        }
        // Anything that did not match exactly is rejected.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
