package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateHandle;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.class */
/**
 * Snapshot strategy that deep-copies all registered operator (list) states and broadcast states
 * on the calling thread and then writes the copies to a checkpoint stream from an
 * {@link AsyncSnapshotCallable}.
 *
 * <p>When {@code asynchronousSnapshots} is {@code false} the write is executed on the calling
 * thread before {@link #snapshot} returns; otherwise the returned future is handed back without
 * being run.
 */
public class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
    /** Installed as the thread context class loader while the registered states are deep-copied. */
    private final ClassLoader userClassLoader;
    /** If {@code false}, the snapshot task is run inside {@link #snapshot} before it is returned. */
    private final boolean asynchronousSnapshots;
    /** Registered list states by name; read (never modified) when a snapshot is taken. */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    /** Registered broadcast states by name; read (never modified) when a snapshot is taken. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    /** Registry handed to {@code toAsyncSnapshotFutureTask} when the snapshot task is created. */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /**
     * Creates the strategy. All arguments are stored by reference; the maps are not copied.
     *
     * <p>NOTE(review): the decompiler reports that the original visibility of this constructor
     * was {@code protected}; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param classLoader class loader set as thread context class loader during deep copying
     * @param z whether snapshots are asynchronous (see {@link #snapshot})
     * @param map registered operator list states, keyed by state name
     * @param map2 registered broadcast states, keyed by state name
     * @param closeableRegistry registry passed to the snapshot task on creation
     */
    public DefaultOperatorStateBackendSnapshotStrategy(ClassLoader classLoader, boolean z, Map<String, PartitionableListState<?>> map, Map<String, BackendWritableBroadcastState<?, ?>> map2, CloseableRegistry closeableRegistry) {
        super("DefaultOperatorStateBackend snapshot");
        this.userClassLoader = classLoader;
        this.asynchronousSnapshots = z;
        this.registeredOperatorStates = map;
        this.registeredBroadcastStates = map2;
        this.closeStreamOnCancelRegistry = closeableRegistry;
    }

    /**
     * Takes a snapshot of all registered operator and broadcast states.
     *
     * <p>If nothing is registered, an already-completed future holding an empty result is
     * returned. Otherwise every registered state is deep-copied on the calling thread (with the
     * user class loader installed as context class loader), and a task is created that writes
     * the state meta information followed by the state contents to a stream obtained from
     * {@code checkpointStreamFactory}.
     *
     * @param j not used by this implementation
     * @param j2 not used by this implementation
     * @param checkpointStreamFactory factory for the stream the snapshot is written to
     * @param checkpointOptions not used by this implementation
     * @return the snapshot task; already executed when snapshots are synchronous
     * @throws IOException declared by the overridden method; in this body only creating the
     *     future task is expected to raise it
     */
    @Override // org.apache.flink.runtime.state.SnapshotStrategy
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull final CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
        if (this.registeredOperatorStates.isEmpty() && this.registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }

        final Map<String, PartitionableListState<?>> operatorStateCopies =
                new HashMap<>(this.registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> broadcastStateCopies =
                new HashMap<>(this.registeredBroadcastStates.size());

        // Only the deep-copy phase runs with the user class loader, so the restore is scoped to
        // exactly that phase and is guaranteed by finally on both the normal and the failure path.
        final Thread currentThread = Thread.currentThread();
        final ClassLoader previousClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(this.userClassLoader);
        try {
            // A null state is carried over as a null entry rather than skipped. Note that
            // callInternal() below dereferences every value without a null check.
            for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredOperatorStates.entrySet()) {
                final PartitionableListState<?> state = entry.getValue();
                operatorStateCopies.put(entry.getKey(), state == null ? null : state.deepCopy());
            }
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : this.registeredBroadcastStates.entrySet()) {
                final BackendWritableBroadcastState<?, ?> state = entry.getValue();
                broadcastStateCopies.put(entry.getKey(), state == null ? null : state.deepCopy());
            }
        } finally {
            currentThread.setContextClassLoader(previousClassLoader);
        }

        final AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable = new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {
            /**
             * Writes the meta information of all copied states, then the states themselves, to
             * a freshly created checkpoint stream and wraps the resulting handle.
             *
             * @return a result wrapping the operator state handle, or wrapping {@code null} if
             *     closing the stream yielded no handle
             * @throws IOException if the stream had already been unregistered from the
             *     snapshot's closeable registry by the time writing finished
             */
            // The two meta-info snapshot lists stay raw because their element type is not
            // imported in this file; the suppression exists only for those two lists.
            @SuppressWarnings({"rawtypes", "unchecked"})
            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            public SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                final CheckpointStreamFactory.CheckpointStateOutputStream out =
                        checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                // Registered for the duration of the write; unregistered again before closing.
                this.snapshotCloseableRegistry.registerCloseable(out);

                // 1) Collect the meta information snapshots of both kinds of state ...
                final ArrayList operatorMetaInfoSnapshots = new ArrayList(operatorStateCopies.size());
                for (PartitionableListState<?> state : operatorStateCopies.values()) {
                    operatorMetaInfoSnapshots.add(state.getStateMetaInfo().snapshot());
                }
                final ArrayList broadcastMetaInfoSnapshots = new ArrayList(broadcastStateCopies.size());
                for (BackendWritableBroadcastState<?, ?> state : broadcastStateCopies.values()) {
                    broadcastMetaInfoSnapshots.add(state.getStateMetaInfo().snapshot());
                }

                // 2) ... and write them at the head of the stream.
                new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots)
                        .write(new DataOutputViewStreamWrapper(out));

                // 3) Write the state contents. List and broadcast states share one map, so a
                //    broadcast state overwrites a list-state entry with the same name.
                final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<>(operatorStateCopies.size() + broadcastStateCopies.size());
                for (Map.Entry<String, PartitionableListState<?>> entry : operatorStateCopies.entrySet()) {
                    final PartitionableListState<?> state = entry.getValue();
                    final long[] offsets = state.write(out);
                    writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(offsets, state.getStateMetaInfo().getAssignmentMode()));
                }
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : broadcastStateCopies.entrySet()) {
                    final BackendWritableBroadcastState<?, ?> state = entry.getValue();
                    // A broadcast state reports a single value from write(); wrap it in an array.
                    final long[] offsets = {state.write(out)};
                    writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(offsets, state.getStateMetaInfo().getAssignmentMode()));
                }

                // 4) Only close the stream ourselves if it is still registered.
                if (!this.snapshotCloseableRegistry.unregisterCloseable(out)) {
                    throw new IOException("Stream was already unregistered.");
                }
                final StreamStateHandle streamHandle = out.closeAndGetHandle();
                OperatorStateHandle operatorStateHandle = null;
                if (streamHandle != null) {
                    operatorStateHandle = new OperatorStreamStateHandle(writtenStatesMetaData, streamHandle);
                }
                return SnapshotResult.of(operatorStateHandle);
            }

            /** Nothing to release: this callable was not handed any resources to clean up. */
            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void cleanupProvidedResources() {
            }

            /**
             * Forwards to the enclosing strategy's {@code logAsyncCompleted}, but only when
             * snapshots are asynchronous.
             *
             * @param startTime value passed through to {@code logAsyncCompleted}
             *     (NOTE(review): presumably the snapshot start time - confirm against
             *     AsyncSnapshotCallable)
             */
            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void logAsyncSnapshotComplete(long startTime) {
                if (DefaultOperatorStateBackendSnapshotStrategy.this.asynchronousSnapshots) {
                    DefaultOperatorStateBackendSnapshotStrategy.this.logAsyncCompleted(checkpointStreamFactory, startTime);
                }
            }
        };

        final RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotTask =
                snapshotCallable.toAsyncSnapshotFutureTask(this.closeStreamOnCancelRegistry);
        if (!this.asynchronousSnapshots) {
            // Synchronous mode: do the write right here, on the caller's thread.
            snapshotTask.run();
        }
        return snapshotTask;
    }
}
