package org.apache.beam.runners.flink.translation.wrappers.streaming;

import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/KeyedPushedBackElementsHandler.class */
/**
 * A {@link PushedBackElementsHandler} that keeps pushed-back elements in a keyed Flink
 * {@link ListState}, stored under {@link VoidNamespace}.
 *
 * <p>Every operation repositions the backend via {@link KeyedStateBackend#setCurrentKey} before
 * touching the state, so callers must not rely on the backend's current key being preserved
 * across calls to this handler.
 *
 * @param <K> type of the key extracted from each element
 * @param <T> type of the pushed-back elements
 */
class KeyedPushedBackElementsHandler<K, T> implements PushedBackElementsHandler<T> {
    private final KeySelector<T, K> keySelector;
    private final KeyedStateBackend<K> backend;
    private final String stateName;
    private final ListState<T> state;

    /**
     * Creates a handler backed by the given keyed state backend.
     *
     * @param keySelector extracts the state key from an element; must not be null
     * @param keyedStateBackend backend that owns the list state; must not be null
     * @param listStateDescriptor descriptor of the list state that holds the elements; must not be null
     * @return a new handler
     * @throws NullPointerException if any argument is null
     * @throws Exception if the backend fails to provide the partitioned state
     */
    public static <K, T> KeyedPushedBackElementsHandler<K, T> create(KeySelector<T, K> keySelector, KeyedStateBackend<K> keyedStateBackend, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        return new KeyedPushedBackElementsHandler<>(keySelector, keyedStateBackend, listStateDescriptor);
    }

    private KeyedPushedBackElementsHandler(KeySelector<T, K> keySelector, KeyedStateBackend<K> keyedStateBackend, ListStateDescriptor<T> listStateDescriptor) throws Exception {
        this.keySelector = Objects.requireNonNull(keySelector, "keySelector");
        this.backend = Objects.requireNonNull(keyedStateBackend, "keyedStateBackend");
        Objects.requireNonNull(listStateDescriptor, "listStateDescriptor");
        this.stateName = listStateDescriptor.getName();
        // The state handle is obtained once, up front, and reused for every key; which key it
        // reads or writes is decided by the backend's current key at the time of each access.
        this.state = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
    }

    /**
     * Returns all pushed-back elements across every key the backend reports for this state.
     *
     * <p>The returned stream is lazy: the backend's current key is switched as the stream is
     * consumed, one key at a time.
     *
     * @return a sequential stream of the stored elements
     * @throws RuntimeException (when the stream is consumed) if reading the state of a key fails
     */
    @Override
    public Stream<T> getElements() {
        return this.backend.getKeys(this.stateName, VoidNamespace.INSTANCE).flatMap(this::readElementsForKey);
    }

    /** Switches the backend to {@code key} and streams the elements stored under it. */
    private Stream<T> readElementsForKey(K key) {
        try {
            this.backend.setCurrentKey(key);
            Iterable<T> elements = this.state.get();
            // Flink's appending-state contract allows get() to return null for empty state;
            // treat that as "no elements" rather than failing with a NullPointerException.
            if (elements == null) {
                return Stream.empty();
            }
            return StreamSupport.stream(elements.spliterator(), false);
        } catch (Exception e) {
            throw new RuntimeException("Error reading keyed state.", e);
        }
    }

    /**
     * Clears the stored elements of every key the backend reports for this state.
     *
     * @throws Exception if the backend fails while switching keys or clearing state
     */
    @Override
    public void clear() throws Exception {
        // Materialize the keys before clearing so the state is not modified while the backend's
        // key stream is still being iterated.
        // NOTE(review): presumably this avoids concurrent-modification failures in the backend - confirm.
        List<K> keys;
        // Closing the stream runs any close handlers the backend may have registered on it.
        try (Stream<K> keyStream = this.backend.getKeys(this.stateName, VoidNamespace.INSTANCE)) {
            keys = keyStream.collect(Collectors.toList());
        }
        for (K key : keys) {
            this.backend.setCurrentKey(key);
            this.state.clear();
        }
    }

    /**
     * Appends a single element to the list state of the key selected for it.
     *
     * @param t the element to store
     * @throws Exception if key extraction or the state update fails
     */
    @Override
    public void pushBack(T t) throws Exception {
        this.backend.setCurrentKey(this.keySelector.getKey(t));
        this.state.add(t);
    }

    /**
     * Appends every element of {@code iterable}, each under its own selected key.
     *
     * @param iterable the elements to store
     * @throws Exception if storing any element fails; elements stored before the failure remain stored
     */
    @Override
    public void pushBackAll(Iterable<T> iterable) throws Exception {
        for (T element : iterable) {
            pushBack(element);
        }
    }
}
