package org.apache.beam.runners.spark.stateful;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.HashBasedTable;
import org.apache.beam.repackaged.beam_runners_spark.com.google.common.collect.Table;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.SetState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateContext;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals.class */
public class SparkStateInternals<K> implements StateInternals {
    private final K key;
    private final Table<String, String, byte[]> stateTable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$AbstractState.class */
    /**
     * Common base for state cells stored in the enclosing instance's state table.
     *
     * <p>A cell is addressed by the pair (namespace string key, state tag id) and its value is
     * kept in the table as bytes produced by {@link #coder}.
     *
     * @param <T> type of the value encoded into the table cell
     */
    public class AbstractState<T> {
        final StateNamespace namespace;
        final StateTag<? extends State> address;
        final Coder<T> coder;

        private AbstractState(StateNamespace namespace, StateTag<? extends State> address, Coder<T> coder) {
            this.namespace = namespace;
            this.address = address;
            this.coder = coder;
        }

        /**
         * Looks up this cell in the state table and decodes it with {@link #coder}.
         *
         * @return the decoded value, or {@code null} when the table holds nothing for this cell
         */
        T readValue() {
            byte[] encoded = SparkStateInternals.this.stateTable.get(namespace.stringKey(), address.getId());
            if (encoded == null) {
                return null;
            }
            return CoderHelpers.fromByteArray(encoded, coder);
        }

        /**
         * Encodes {@code t} with {@link #coder} and stores the bytes in this cell, replacing any
         * previous content.
         */
        void writeValue(T t) {
            byte[] encoded = CoderHelpers.toByteArray(t, coder);
            SparkStateInternals.this.stateTable.put(namespace.stringKey(), address.getId(), encoded);
        }

        /** Removes this cell from the state table. */
        public void clear() {
            SparkStateInternals.this.stateTable.remove(namespace.stringKey(), address.getId());
        }

        /** Two cells are equal when they have the same runtime class, namespace and address. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            AbstractState<?> that = (AbstractState<?>) obj;
            return namespace.equals(that.namespace) && address.equals(that.address);
        }

        /** Hash derived from the namespace and the address, matching {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            int namespaceHash = namespace.hashCode();
            return 31 * namespaceHash + address.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkBagState.class */
    /**
     * {@link BagState} whose whole content is stored as one list, encoded with a {@link ListCoder}
     * into a single state-table cell.
     *
     * @param <T> element type of the bag
     */
    public final class SparkBagState<T> extends SparkStateInternals<K>.AbstractState<List<T>> implements BagState<T> {
        private SparkBagState(StateNamespace namespace, StateTag<BagState<T>> address, Coder<T> elementCoder) {
            super(namespace, address, ListCoder.of(elementCoder));
        }

        /** No prefetching is performed; returns this state unchanged. */
        @Override // org.apache.beam.sdk.state.BagState, org.apache.beam.sdk.state.GroupingState, org.apache.beam.sdk.state.ReadableState
        public SparkStateInternals<K>.SparkBagState<T> readLater() {
            return this;
        }

        /**
         * Decodes the stored list.
         *
         * @return the stored elements, or a fresh empty list when nothing is stored
         */
        @Override // org.apache.beam.sdk.state.BagState, org.apache.beam.sdk.state.ReadableState
        public List<T> read() {
            List<T> stored = readValue();
            return stored != null ? stored : new ArrayList<T>();
        }

        /** Appends {@code t} by reading the whole list, adding to it and writing it back. */
        @Override // org.apache.beam.sdk.state.GroupingState
        public void add(T t) {
            List<T> elements = read();
            elements.add(t);
            writeValue(elements);
        }

        /**
         * Returns a view that reports {@code true} while the state table has no cell for this bag.
         * The table is consulted on every {@code read()} of the returned object.
         */
        @Override // org.apache.beam.sdk.state.GroupingState
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override // org.apache.beam.sdk.state.ReadableState
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                @Override // org.apache.beam.sdk.state.ReadableState
                public Boolean read() {
                    String rowKey = SparkBagState.this.namespace.stringKey();
                    String columnKey = SparkBagState.this.address.getId();
                    byte[] cell = SparkStateInternals.this.stateTable.get(rowKey, columnKey);
                    return cell == null;
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkCombiningState.class */
    /**
     * {@link CombiningState} that keeps a single accumulator, encoded with the supplied accumulator
     * coder, in one state-table cell.
     *
     * @param <K> declared by this class and not otherwise used in its body
     * @param <InputT> type of values added to the state
     * @param <AccumT> accumulator type stored in the table
     * @param <OutputT> type produced by {@link #read()}
     */
    public class SparkCombiningState<K, InputT, AccumT, OutputT> extends SparkStateInternals<K>.AbstractState<AccumT> implements CombiningState<InputT, AccumT, OutputT> {
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;

        private SparkCombiningState(StateNamespace stateNamespace, StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            super(stateNamespace, stateTag, coder);
            this.combineFn = combineFn;
        }

        /** No prefetching is performed; returns this state unchanged. */
        @Override // org.apache.beam.sdk.state.CombiningState, org.apache.beam.sdk.state.GroupingState, org.apache.beam.sdk.state.ReadableState
        public SparkStateInternals<K>.SparkCombiningState<K, InputT, AccumT, OutputT> readLater() {
            return this;
        }

        /** Extracts the output from the current accumulator (a fresh one when nothing is stored). */
        @Override // org.apache.beam.sdk.state.CombiningState, org.apache.beam.sdk.state.ReadableState
        public OutputT read() {
            return this.combineFn.extractOutput(getAccum());
        }

        /**
         * Folds {@code inputt} into the current accumulator and stores the result.
         *
         * <p>The accumulator returned by {@code addInput} is the one persisted: a combine function
         * may return a new accumulator instead of mutating the one it was given, in which case
         * writing back the original instance would drop the input.
         */
        @Override // org.apache.beam.sdk.state.GroupingState
        public void add(InputT inputt) {
            AccumT updated = this.combineFn.addInput(getAccum(), inputt);
            writeValue(updated);
        }

        /**
         * Returns the stored accumulator, or a newly created one when the cell is absent. A newly
         * created accumulator is not written to the table by this method.
         */
        @Override // org.apache.beam.sdk.state.CombiningState
        public AccumT getAccum() {
            AccumT stored = readValue();
            if (stored == null) {
                return this.combineFn.createAccumulator();
            }
            return stored;
        }

        /**
         * Returns a view that reports {@code true} while the state table has no cell for this state.
         * The table is consulted on every {@code read()} of the returned object.
         */
        @Override // org.apache.beam.sdk.state.GroupingState
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() { // from class: org.apache.beam.runners.spark.stateful.SparkStateInternals.SparkCombiningState.1
                @Override // org.apache.beam.sdk.state.ReadableState
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                @Override // org.apache.beam.sdk.state.ReadableState
                public Boolean read() {
                    return Boolean.valueOf(SparkStateInternals.this.stateTable.get(SparkCombiningState.this.namespace.stringKey(), SparkCombiningState.this.address.getId()) == null);
                }
            };
        }

        /** Merges {@code accumt} with the current accumulator and stores the merged result. */
        @Override // org.apache.beam.sdk.state.CombiningState
        public void addAccum(AccumT accumt) {
            writeValue(this.combineFn.mergeAccumulators(Arrays.asList(getAccum(), accumt)));
        }

        /** Delegates to the combine function; does not touch the state table. */
        @Override // org.apache.beam.sdk.state.CombiningState
        public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
            return this.combineFn.mergeAccumulators(iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkStateBinder.class */
    /**
     * Binder that creates table-backed state objects for one namespace.
     *
     * <p>Value, bag, combining and watermark-hold state are supported; set and map state are
     * rejected with {@link UnsupportedOperationException}.
     */
    private class SparkStateBinder implements StateTag.StateBinder {
        private final StateNamespace namespace;
        private final StateContext<?> stateContext;

        private SparkStateBinder(StateNamespace namespace, StateContext<?> stateContext) {
            this.namespace = namespace;
            this.stateContext = stateContext;
        }

        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <T> ValueState<T> bindValue(StateTag<ValueState<T>> stateTag, Coder<T> coder) {
            return new SparkValueState<>(namespace, stateTag, coder);
        }

        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <T> BagState<T> bindBag(StateTag<BagState<T>> stateTag, Coder<T> coder) {
            return new SparkBagState<>(namespace, stateTag, coder);
        }

        /** Always throws: set state is not supported. */
        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <T> SetState<T> bindSet(StateTag<SetState<T>> stateTag, Coder<T> coder) {
            String message = String.format("%s is not supported", SetState.class.getSimpleName());
            throw new UnsupportedOperationException(message);
        }

        /** Always throws: map state is not supported. */
        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(StateTag<MapState<KeyT, ValueT>> stateTag, Coder<KeyT> coder, Coder<ValueT> coder2) {
            String message = String.format("%s is not supported", MapState.class.getSimpleName());
            throw new UnsupportedOperationException(message);
        }

        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
            return new SparkCombiningState<>(namespace, stateTag, coder, combineFn);
        }

        /** Binds the context-aware combine function to this binder's state context first. */
        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValueWithContext(StateTag<CombiningState<InputT, AccumT, OutputT>> stateTag, Coder<AccumT> coder, CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
            Combine.CombineFn<InputT, AccumT, OutputT> boundFn = CombineFnUtil.bindContext(combineFnWithContext, stateContext);
            return new SparkCombiningState<>(namespace, stateTag, coder, boundFn);
        }

        @Override // org.apache.beam.runners.core.StateTag.StateBinder
        public WatermarkHoldState bindWatermark(StateTag<WatermarkHoldState> stateTag, TimestampCombiner timestampCombiner) {
            return new SparkWatermarkHoldState(namespace, stateTag, timestampCombiner);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkValueState.class */
    /**
     * {@link ValueState} stored as a single encoded value in one state-table cell.
     *
     * @param <T> type of the stored value
     */
    private class SparkValueState<T> extends SparkStateInternals<K>.AbstractState<T> implements ValueState<T> {
        private SparkValueState(StateNamespace namespace, StateTag<ValueState<T>> address, Coder<T> valueCoder) {
            super(namespace, address, valueCoder);
        }

        /** No prefetching is performed; returns this state unchanged. */
        @Override // org.apache.beam.sdk.state.ValueState, org.apache.beam.sdk.state.ReadableState
        public SparkStateInternals<K>.SparkValueState<T> readLater() {
            return this;
        }

        /** Returns the decoded value, or {@code null} when nothing is stored. */
        @Override // org.apache.beam.sdk.state.ReadableState
        public T read() {
            T current = readValue();
            return current;
        }

        /** Encodes {@code t} and stores it, replacing any previous value. */
        @Override // org.apache.beam.sdk.state.ValueState
        public void write(T t) {
            writeValue(t);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/stateful/SparkStateInternals$SparkWatermarkHoldState.class */
    /**
     * {@link WatermarkHoldState} that stores one {@link Instant}, encoded with {@link InstantCoder},
     * in a state-table cell.
     */
    public class SparkWatermarkHoldState extends SparkStateInternals<K>.AbstractState<Instant> implements WatermarkHoldState {
        private final TimestampCombiner timestampCombiner;

        public SparkWatermarkHoldState(StateNamespace stateNamespace, StateTag<WatermarkHoldState> stateTag, TimestampCombiner timestampCombiner) {
            super(stateNamespace, stateTag, InstantCoder.of());
            this.timestampCombiner = timestampCombiner;
        }

        /** No prefetching is performed; returns this state unchanged. */
        @Override // org.apache.beam.sdk.state.WatermarkHoldState, org.apache.beam.sdk.state.GroupingState, org.apache.beam.sdk.state.ReadableState
        public SparkStateInternals<K>.SparkWatermarkHoldState readLater() {
            return this;
        }

        /** Returns the stored hold, or {@code null} when nothing is stored. */
        @Override // org.apache.beam.sdk.state.ReadableState
        public Instant read() {
            return readValue();
        }

        /**
         * Stores {@code instant} as the hold when none exists; otherwise stores the result of
         * combining the existing hold with {@code instant} through the timestamp combiner.
         */
        @Override // org.apache.beam.sdk.state.GroupingState
        public void add(Instant instant) {
            Instant existing = read();
            Instant updated;
            if (existing == null) {
                updated = instant;
            } else {
                updated = getTimestampCombiner().combine(existing, instant);
            }
            writeValue(updated);
        }

        /**
         * Returns a view that reports {@code true} while the state table has no cell for this hold.
         * The table is consulted on every {@code read()} of the returned object.
         */
        @Override // org.apache.beam.sdk.state.GroupingState
        public ReadableState<Boolean> isEmpty() {
            return new ReadableState<Boolean>() {
                @Override // org.apache.beam.sdk.state.ReadableState
                public ReadableState<Boolean> readLater() {
                    return this;
                }

                @Override // org.apache.beam.sdk.state.ReadableState
                public Boolean read() {
                    String rowKey = SparkWatermarkHoldState.this.namespace.stringKey();
                    String columnKey = SparkWatermarkHoldState.this.address.getId();
                    byte[] cell = SparkStateInternals.this.stateTable.get(rowKey, columnKey);
                    return cell == null;
                }
            };
        }

        /** Returns the combiner supplied at construction. */
        @Override // org.apache.beam.sdk.state.WatermarkHoldState
        public TimestampCombiner getTimestampCombiner() {
            return timestampCombiner;
        }
    }

    /** Creates state internals for {@code k} backed by a new, empty hash-based table. */
    private SparkStateInternals(K k) {
        this(k, HashBasedTable.<String, String, byte[]>create());
    }

    /**
     * Creates state internals for {@code k} that read from and write to {@code table} directly.
     * The table is stored by reference and is not copied.
     */
    private SparkStateInternals(K k, Table<String, String, byte[]> table) {
        this.stateTable = table;
        this.key = k;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates state internals for {@code k} that start with an empty state table.
     *
     * @param k the key the state belongs to
     * @return a new instance with no stored state
     */
    public static <K> SparkStateInternals<K> forKey(K k) {
        SparkStateInternals<K> internals = new SparkStateInternals<K>(k);
        return internals;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates state internals for {@code k} on top of an existing state table.
     *
     * @param k the key the state belongs to
     * @param table the table to use as backing store; it is used as-is, not copied
     * @return a new instance backed by {@code table}
     */
    public static <K> SparkStateInternals<K> forKeyAndState(K k, Table<String, String, byte[]> table) {
        SparkStateInternals<K> internals = new SparkStateInternals<K>(k, table);
        return internals;
    }

    /**
     * Returns the backing state table itself (not a copy). Rows are namespace string keys,
     * columns are state tag ids, and values are the encoded state bytes.
     */
    public Table<String, String, byte[]> getState() {
        return stateTable;
    }

    /** Returns the key this instance was created for. */
    @Override // org.apache.beam.runners.core.StateInternals
    public K getKey() {
        return key;
    }

    /**
     * Resolves {@code stateTag} in {@code stateNamespace} by binding it through a new binder
     * created for this call.
     *
     * @param stateNamespace namespace whose string key selects the table row
     * @param stateTag tag identifying the state cell and its kind
     * @param stateContext context handed to context-aware combine functions
     * @return the state object produced by the tag's {@code bind} call
     */
    @Override // org.apache.beam.runners.core.StateInternals
    public <T extends State> T state(StateNamespace stateNamespace, StateTag<T> stateTag, StateContext<?> stateContext) {
        StateTag.StateBinder binder = new SparkStateBinder(stateNamespace, stateContext);
        return stateTag.bind(binder);
    }
}
