package org.apache.flink.streaming.api.datastream;

import java.util.ArrayList;
import java.util.Stack;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.util.Preconditions;

@Public
/* loaded from: input_file:org/apache/flink/streaming/api/datastream/KeyedStream.class */
public class KeyedStream<T, KEY> extends DataStream<T> {
    private final KeySelector<T, KEY> keySelector;
    private final TypeInformation<KEY> keyType;

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/datastream/KeyedStream$IntervalJoin.class */
    public static class IntervalJoin<T1, T2, KEY> {
        private final KeyedStream<T1, KEY> streamOne;
        private final KeyedStream<T2, KEY> streamTwo;

        IntervalJoin(KeyedStream<T1, KEY> keyedStream, KeyedStream<T2, KEY> keyedStream2) {
            this.streamOne = (KeyedStream) Preconditions.checkNotNull(keyedStream);
            this.streamTwo = (KeyedStream) Preconditions.checkNotNull(keyedStream2);
        }

        @PublicEvolving
        public IntervalJoined<T1, T2, KEY> between(Time time, Time time2) {
            if (this.streamOne.getExecutionEnvironment().getStreamTimeCharacteristic() != TimeCharacteristic.EventTime) {
                throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
            }
            Preconditions.checkNotNull(time, "A lower bound needs to be provided for a time-bounded join");
            Preconditions.checkNotNull(time2, "An upper bound needs to be provided for a time-bounded join");
            return new IntervalJoined<>(this.streamOne, this.streamTwo, time.toMilliseconds(), time2.toMilliseconds(), true, true);
        }
    }

    @PublicEvolving
    /* loaded from: input_file:org/apache/flink/streaming/api/datastream/KeyedStream$IntervalJoined.class */
    public static class IntervalJoined<IN1, IN2, KEY> {
        private final KeyedStream<IN1, KEY> left;
        private final KeyedStream<IN2, KEY> right;
        private final long lowerBound;
        private final long upperBound;
        private final KeySelector<IN1, KEY> keySelector1;
        private final KeySelector<IN2, KEY> keySelector2;
        private boolean lowerBoundInclusive;
        private boolean upperBoundInclusive;

        public IntervalJoined(KeyedStream<IN1, KEY> keyedStream, KeyedStream<IN2, KEY> keyedStream2, long j, long j2, boolean z, boolean z2) {
            this.left = (KeyedStream) Preconditions.checkNotNull(keyedStream);
            this.right = (KeyedStream) Preconditions.checkNotNull(keyedStream2);
            this.lowerBound = j;
            this.upperBound = j2;
            this.lowerBoundInclusive = z;
            this.upperBoundInclusive = z2;
            this.keySelector1 = keyedStream.getKeySelector();
            this.keySelector2 = keyedStream2.getKeySelector();
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
            this.upperBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
            this.lowerBoundInclusive = false;
            return this;
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
            Preconditions.checkNotNull(processJoinFunction);
            return process(processJoinFunction, TypeExtractor.getBinaryOperatorReturnType(processJoinFunction, ProcessJoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, this.left.getType(), this.right.getType(), Utils.getCallLocationName(), true));
        }

        @PublicEvolving
        public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> typeInformation) {
            Preconditions.checkNotNull(processJoinFunction);
            Preconditions.checkNotNull(typeInformation);
            return this.left.connect(this.right).keyBy((KeySelector) this.keySelector1, (KeySelector<R, ?>) this.keySelector2).transform("Interval Join", typeInformation, new IntervalJoinOperator(this.lowerBound, this.upperBound, this.lowerBoundInclusive, this.upperBoundInclusive, this.left.getType().createSerializer(this.left.getExecutionConfig()), this.right.getType().createSerializer(this.right.getExecutionConfig()), (ProcessJoinFunction) this.left.getExecutionEnvironment().clean(processJoinFunction)));
        }
    }

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
        this(dataStream, keySelector, TypeExtractor.getKeySelectorTypes(keySelector, dataStream.getType()));
    }

    public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> typeInformation) {
        this(dataStream, new PartitionTransformation(dataStream.getTransformation(), new KeyGroupStreamPartitioner(keySelector, 128)), keySelector, typeInformation);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    public KeyedStream(DataStream<T> dataStream, PartitionTransformation<T> partitionTransformation, KeySelector<T, KEY> keySelector, TypeInformation<KEY> typeInformation) {
        super(dataStream.getExecutionEnvironment(), partitionTransformation);
        this.keySelector = (KeySelector) clean(keySelector);
        this.keyType = validateKeyType(typeInformation);
    }

    private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> typeInformation) {
        Stack stack = new Stack();
        stack.push(typeInformation);
        ArrayList arrayList = new ArrayList();
        while (!stack.isEmpty()) {
            TypeInformation<?> typeInformation2 = (TypeInformation) stack.pop();
            if (!validateKeyTypeIsHashable(typeInformation2)) {
                arrayList.add(typeInformation2);
            }
            if (typeInformation2 instanceof TupleTypeInfoBase) {
                for (int i = 0; i < typeInformation2.getArity(); i++) {
                    stack.push(((TupleTypeInfoBase) typeInformation2).getTypeAt(i));
                }
            }
        }
        if (arrayList.isEmpty()) {
            return typeInformation;
        }
        throw new InvalidProgramException("Type " + typeInformation + " cannot be used as key. Contained UNSUPPORTED key types: " + StringUtils.join(arrayList, ", ") + ". Look at the keyBy() documentation for the conditions a type has to satisfy in order to be eligible for a key.");
    }

    private boolean validateKeyTypeIsHashable(TypeInformation<?> typeInformation) {
        try {
            return typeInformation instanceof PojoTypeInfo ? !typeInformation.getTypeClass().getMethod("hashCode", new Class[0]).getDeclaringClass().equals(Object.class) : ((typeInformation instanceof PrimitiveArrayTypeInfo) || (typeInformation instanceof BasicArrayTypeInfo) || (typeInformation instanceof ObjectArrayTypeInfo)) ? false : true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    @Internal
    public KeySelector<T, KEY> getKeySelector() {
        return this.keySelector;
    }

    @Internal
    public TypeInformation<KEY> getKeyType() {
        return this.keyType;
    }

    @Override // org.apache.flink.streaming.api.datastream.DataStream
    protected DataStream<T> setConnectionType(StreamPartitioner<T> streamPartitioner) {
        throw new UnsupportedOperationException("Cannot override partitioning for KeyedStream.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.datastream.DataStream
    public <R> SingleOutputStreamOperator<R> doTransform(String str, TypeInformation<R> typeInformation, StreamOperatorFactory<R> streamOperatorFactory) {
        SingleOutputStreamOperator<R> doTransform = super.doTransform(str, typeInformation, streamOperatorFactory);
        OneInputTransformation oneInputTransformation = (OneInputTransformation) doTransform.getTransformation();
        oneInputTransformation.setStateKeySelector(this.keySelector);
        oneInputTransformation.setStateKeyType(this.keyType);
        return doTransform;
    }

    @Override // org.apache.flink.streaming.api.datastream.DataStream
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
        DataStreamSink<T> addSink = super.addSink(sinkFunction);
        addSink.getTransformation().setStateKeySelector(this.keySelector);
        addSink.getTransformation().setStateKeyType(this.keyType);
        return addSink;
    }

    @Override // org.apache.flink.streaming.api.datastream.DataStream
    @PublicEvolving
    @Deprecated
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
        return process(processFunction, TypeExtractor.getUnaryOperatorReturnType(processFunction, ProcessFunction.class, 0, 1, TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true));
    }

    @Override // org.apache.flink.streaming.api.datastream.DataStream
    @Internal
    @Deprecated
    public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction, TypeInformation<R> typeInformation) {
        return transform("Process", typeInformation, new LegacyKeyedProcessOperator((ProcessFunction) clean(processFunction)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction) {
        return process(keyedProcessFunction, TypeExtractor.getUnaryOperatorReturnType(keyedProcessFunction, KeyedProcessFunction.class, 1, 2, TypeExtractor.NO_INDEX, getType(), Utils.getCallLocationName(), true));
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<KEY, T, R> keyedProcessFunction, TypeInformation<R> typeInformation) {
        return transform("KeyedProcess", typeInformation, new KeyedProcessOperator((KeyedProcessFunction) clean(keyedProcessFunction)));
    }

    @PublicEvolving
    public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> keyedStream) {
        return new IntervalJoin<>(this, keyedStream);
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time time) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? (WindowedStream<T, KEY, TimeWindow>) window(TumblingProcessingTimeWindows.of(time)) : (WindowedStream<T, KEY, TimeWindow>) window(TumblingEventTimeWindows.of(time));
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time time, Time time2) {
        return this.environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime ? (WindowedStream<T, KEY, TimeWindow>) window(SlidingProcessingTimeWindows.of(time, time2)) : (WindowedStream<T, KEY, TimeWindow>) window(SlidingEventTimeWindows.of(time, time2));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long j) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(j)));
    }

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long j, long j2) {
        return window(GlobalWindows.create()).evictor(CountEvictor.of(j)).trigger(CountTrigger.of(j2));
    }

    @PublicEvolving
    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> windowAssigner) {
        return new WindowedStream<>(this, windowAssigner);
    }

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reduceFunction) {
        return (SingleOutputStreamOperator<T>) transform("Keyed Reduce", getType(), new StreamGroupedReduce((ReduceFunction) clean(reduceFunction), getType().createSerializer(getExecutionConfig())));
    }

    @Deprecated
    public <R> SingleOutputStreamOperator<R> fold(R r, FoldFunction<T, R> foldFunction) {
        return transform("Keyed Fold", TypeExtractor.getFoldReturnTypes((FoldFunction) clean(foldFunction), getType(), Utils.getCallLocationName(), true), new StreamGroupedFold((FoldFunction) clean(foldFunction), r));
    }

    public SingleOutputStreamOperator<T> sum(int i) {
        return aggregate(new SumAggregator(i, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String str) {
        return aggregate(new SumAggregator(str, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int i) {
        return aggregate(new ComparableAggregator(i, getType(), AggregationFunction.AggregationType.MIN, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String str) {
        return aggregate(new ComparableAggregator(str, (TypeInformation) getType(), AggregationFunction.AggregationType.MIN, false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int i) {
        return aggregate(new ComparableAggregator(i, getType(), AggregationFunction.AggregationType.MAX, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String str) {
        return aggregate(new ComparableAggregator(str, (TypeInformation) getType(), AggregationFunction.AggregationType.MAX, false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String str, boolean z) {
        return aggregate(new ComparableAggregator(str, getType(), AggregationFunction.AggregationType.MINBY, z, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String str, boolean z) {
        return aggregate(new ComparableAggregator(str, getType(), AggregationFunction.AggregationType.MAXBY, z, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int i) {
        return minBy(i, true);
    }

    public SingleOutputStreamOperator<T> minBy(String str) {
        return minBy(str, true);
    }

    public SingleOutputStreamOperator<T> minBy(int i, boolean z) {
        return aggregate(new ComparableAggregator(i, getType(), AggregationFunction.AggregationType.MINBY, z, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int i) {
        return maxBy(i, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String str) {
        return maxBy(str, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int i, boolean z) {
        return aggregate(new ComparableAggregator(i, getType(), AggregationFunction.AggregationType.MAXBY, z, getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregationFunction) {
        return (SingleOutputStreamOperator<T>) transform("Keyed Aggregation", getType(), new StreamGroupedReduce((ReduceFunction) clean(aggregationFunction), getType().createSerializer(getExecutionConfig())));
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String str) {
        return asQueryableState(str, new ValueStateDescriptor<>(UUID.randomUUID().toString(), getType()));
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String str, ValueStateDescriptor<T> valueStateDescriptor) {
        transform("Queryable state: " + str, getType(), new QueryableValueStateOperator(str, valueStateDescriptor));
        valueStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        return new QueryableStateStream<>(str, valueStateDescriptor, getKeyType().createSerializer(getExecutionConfig()));
    }

    @PublicEvolving
    @Deprecated
    public <ACC> QueryableStateStream<KEY, ACC> asQueryableState(String str, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) {
        transform("Queryable state: " + str, getType(), new QueryableAppendingStateOperator(str, foldingStateDescriptor));
        foldingStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        return new QueryableStateStream<>(str, foldingStateDescriptor, getKeyType().createSerializer(getExecutionConfig()));
    }

    @PublicEvolving
    public QueryableStateStream<KEY, T> asQueryableState(String str, ReducingStateDescriptor<T> reducingStateDescriptor) {
        transform("Queryable state: " + str, getType(), new QueryableAppendingStateOperator(str, reducingStateDescriptor));
        reducingStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        return new QueryableStateStream<>(str, reducingStateDescriptor, getKeyType().createSerializer(getExecutionConfig()));
    }
}
