package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.util.Disposable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/BackendRestorerProcedure.class */
public class BackendRestorerProcedure<T extends Closeable & Disposable, S extends StateObject> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BackendRestorerProcedure.class);
    private final FunctionWithException<Collection<S>, T, Exception> instanceSupplier;
    private final CloseableRegistry backendCloseableRegistry;
    private final String logDescription;

    public BackendRestorerProcedure(@Nonnull FunctionWithException<Collection<S>, T, Exception> functionWithException, @Nonnull CloseableRegistry closeableRegistry, @Nonnull String str) {
        this.instanceSupplier = (FunctionWithException) Preconditions.checkNotNull(functionWithException);
        this.backendCloseableRegistry = (CloseableRegistry) Preconditions.checkNotNull(closeableRegistry);
        this.logDescription = str;
    }

    @Nonnull
    public T createAndRestore(@Nonnull List<? extends Collection<S>> list) throws Exception {
        if (list.isEmpty()) {
            list = Collections.singletonList(Collections.emptyList());
        }
        int i = 0;
        Exception exc = null;
        while (i < list.size()) {
            Collection<S> collection = list.get(i);
            i++;
            if (collection.isEmpty()) {
                LOG.debug("Creating {} with empty state.", this.logDescription);
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("Creating {} and restoring with state {} from alternative ({}/{}).", this.logDescription, collection, Integer.valueOf(i), Integer.valueOf(list.size()));
            } else {
                LOG.debug("Creating {} and restoring with state from alternative ({}/{}).", this.logDescription, Integer.valueOf(i), Integer.valueOf(list.size()));
            }
            try {
                return attemptCreateAndRestore(collection);
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
                LOG.warn("Exception while restoring {} from alternative ({}/{}), will retry while more alternatives are available.", this.logDescription, Integer.valueOf(i), Integer.valueOf(list.size()), e);
                if (this.backendCloseableRegistry.isClosed()) {
                    throw new FlinkException("Stopping restore attempts for already cancelled task.", exc);
                }
            }
        }
        throw new FlinkException("Could not restore " + this.logDescription + " from any of the " + list.size() + " provided restore options.", exc);
    }

    private T attemptCreateAndRestore(Collection<S> collection) throws Exception {
        T apply = this.instanceSupplier.apply(collection);
        try {
            this.backendCloseableRegistry.registerCloseable(apply);
            return apply;
        } catch (Exception e) {
            e = e;
            try {
                apply.dispose();
            } catch (Exception e2) {
                e = (Exception) ExceptionUtils.firstOrSuppressed(e2, e);
            }
            throw e;
        }
    }
}
