/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.execution.streaming.state;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider;
import org.apache.spark.sql.execution.streaming.state.StateStore;
import org.apache.spark.sql.execution.streaming.state.StateStoreConf;
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef;
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.state.StateStoreId;
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.ThreadUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

public final class StateStore$
implements Logging {
    /** Singleton instance of this module; assigned at the start of the private constructor. */
    public static final StateStore$ MODULE$;
    /** Spark configuration key from which the maintenance period is read. */
    private final String MAINTENANCE_INTERVAL_CONFIG;
    /** Default maintenance period in seconds; passed (with an "s" suffix) as the fallback to getTimeAsMs. */
    private final int MAINTENANCE_INTERVAL_DEFAULT_SECS;
    /** Providers loaded so far, keyed by store id. Every access in this class synchronizes on this map. */
    private final HashMap<StateStoreId, StateStoreProvider> loadedProviders;
    /** Scheduler on which the periodic maintenance task is registered. */
    private final ScheduledExecutorService maintenanceTaskExecutor;
    /** Handle of the scheduled maintenance task, or null while no task is scheduled. */
    private volatile ScheduledFuture<?> maintenanceTask;
    /** Cached coordinator reference; null until coordinatorRef() fills it, reset to null by stop(). */
    private volatile StateStoreCoordinatorRef org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef;
    /** Backing logger slot for the Logging mixin. */
    private transient Logger org$apache$spark$internal$Logging$$log_;
    /** Backing level-flags slot for the Logging mixin. */
    private transient int org$apache$spark$internal$Logging$$levelFlags;

    // Creates the singleton when the class is initialized. The result is not
    // assigned here: the constructor itself stores the new instance in MODULE$.
    static {
        new StateStore$();
    }

    /** Returns the logger slot backing the Logging mixin (may be unset). */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores the logger used by the Logging mixin. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /** Returns the level-flags value backing the Logging mixin. */
    public int org$apache$spark$internal$Logging$$levelFlags() {
        return this.org$apache$spark$internal$Logging$$levelFlags;
    }

    /** Stores the level-flags value used by the Logging mixin. */
    public void org$apache$spark$internal$Logging$$levelFlags_$eq(int x$1) {
        this.org$apache$spark$internal$Logging$$levelFlags = x$1;
    }

    // The methods below are pure forwarders to the Logging mixin, passing this
    // instance as the receiver.
    // NOTE(review): `Logging.class.<method>(...)` is presumably the decompiler's
    // rendering of the trait's static implementation class - confirm before editing.

    /** Returns the logger name for this object, as computed by the Logging mixin. */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /** Returns the logger for this object, as provided by the Logging mixin. */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    /** Forwards the INFO-enabled query to the Logging mixin. */
    public final boolean isInfoEnabled() {
        return Logging.class.isInfoEnabled((Logging)this);
    }

    /** Forwards the DEBUG-enabled query to the Logging mixin. */
    public final boolean isDebugEnabled() {
        return Logging.class.isDebugEnabled((Logging)this);
    }

    /** Forwards the TRACE-enabled query to the Logging mixin. */
    public final boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /** Forwards an INFO message supplier to the Logging mixin. */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    /** Forwards a DEBUG message supplier to the Logging mixin. */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    /** Forwards a TRACE message supplier to the Logging mixin. */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    /** Forwards a WARN message supplier to the Logging mixin. */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    /** Forwards an ERROR message supplier to the Logging mixin. */
    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    /** Forwards an INFO message supplier and its associated throwable to the Logging mixin. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards a DEBUG message supplier and its associated throwable to the Logging mixin. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards a TRACE message supplier and its associated throwable to the Logging mixin. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards a WARN message supplier and its associated throwable to the Logging mixin. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards an ERROR message supplier and its associated throwable to the Logging mixin. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards the logger-initialization request (with the interpreter flag) to the Logging mixin. */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter);
    }

    /** Returns the configuration key used to look up the maintenance period. */
    public String MAINTENANCE_INTERVAL_CONFIG() {
        return this.MAINTENANCE_INTERVAL_CONFIG;
    }

    /** Returns the default maintenance period, in seconds. */
    public int MAINTENANCE_INTERVAL_DEFAULT_SECS() {
        return this.MAINTENANCE_INTERVAL_DEFAULT_SECS;
    }

    /** Returns the map of loaded providers; callers in this class synchronize on the returned map. */
    private HashMap<StateStoreId, StateStoreProvider> loadedProviders() {
        return this.loadedProviders;
    }

    /** Returns the scheduler used for the maintenance task. */
    private ScheduledExecutorService maintenanceTaskExecutor() {
        return this.maintenanceTaskExecutor;
    }

    /** Returns the handle of the scheduled maintenance task, or null if none is scheduled. */
    private ScheduledFuture<?> maintenanceTask() {
        return this.maintenanceTask;
    }

    /** Replaces the maintenance task handle; null marks "not scheduled". */
    private void maintenanceTask_$eq(ScheduledFuture<?> x$1) {
        this.maintenanceTask = x$1;
    }

    /** Returns the cached coordinator reference, or null if none has been obtained yet. */
    public StateStoreCoordinatorRef org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef() {
        return this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef;
    }

    /** Replaces the cached coordinator reference; null clears the cache. */
    private void org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef_$eq(StateStoreCoordinatorRef x$1) {
        this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef = x$1;
    }

    /**
     * Returns the state store identified by {@code storeId} at the given {@code version}.
     *
     * <p>The provider for {@code storeId} is looked up in (or created and added to) the
     * loaded-provider map while holding the map's monitor; in the same critical section
     * the maintenance task is started if needed and this instance is reported as active.
     * The monitor is released before {@code getStore} is invoked, so that obtaining the
     * store (whose cost is not visible from here) does not block other callers or the
     * maintenance thread that also synchronize on the map.
     *
     * @param storeId     identifier of the requested store
     * @param keySchema   key schema handed to a newly created provider
     * @param valueSchema value schema handed to a newly created provider
     * @param version     requested version; must be non-negative
     * @param storeConf   store configuration handed to a newly created provider
     * @param hadoopConf  Hadoop configuration handed to a newly created provider
     * @return the store returned by the provider for {@code version}
     */
    public StateStore get(StateStoreId storeId, StructType keySchema, StructType valueSchema, long version, StateStoreConf storeConf, Configuration hadoopConf) {
        Predef$.MODULE$.require(version >= 0L);
        StateStoreProvider storeProvider;
        synchronized (this.loadedProviders()) {
            this.startMaintenanceIfNeeded();
            // The closure below is only evaluated when no provider is registered for storeId yet.
            storeProvider = (StateStoreProvider)this.loadedProviders().getOrElseUpdate((Object)storeId, (Function0)new Serializable(storeId, keySchema, valueSchema, storeConf, hadoopConf){
                public static final long serialVersionUID = 0L;
                private final StateStoreId storeId$1;
                private final StructType keySchema$1;
                private final StructType valueSchema$1;
                private final StateStoreConf storeConf$1;
                private final Configuration hadoopConf$1;

                public final HDFSBackedStateStoreProvider apply() {
                    return new HDFSBackedStateStoreProvider(this.storeId$1, this.keySchema$1, this.valueSchema$1, this.storeConf$1, this.hadoopConf$1);
                }
                {
                    this.storeId$1 = storeId$1;
                    this.keySchema$1 = keySchema$1;
                    this.valueSchema$1 = valueSchema$1;
                    this.storeConf$1 = storeConf$1;
                    this.hadoopConf$1 = hadoopConf$1;
                }
            });
            this.reportActiveStoreInstance(storeId);
        }
        // Deliberately outside the synchronized block: only the map lookup needs the lock.
        return storeProvider.getStore(version);
    }

    /**
     * Removes the provider registered for {@code storeId}, if any, from the
     * loaded-provider map. The removal is performed while holding the map's monitor.
     *
     * @param storeId identifier of the store whose provider should be dropped
     */
    public void unload(StateStoreId storeId) {
        synchronized (this.loadedProviders()) {
            this.loadedProviders().remove((Object)storeId);
        }
    }

    /**
     * Tells whether a provider is currently registered for {@code storeId}.
     * The lookup is performed while holding the loaded-provider map's monitor.
     *
     * @param storeId identifier of the store to look up
     * @return {@code true} if the map contains an entry for {@code storeId}
     */
    public boolean isLoaded(StateStoreId storeId) {
        synchronized (this.loadedProviders()) {
            return this.loadedProviders().contains((Object)storeId);
        }
    }

    /**
     * Tells whether a maintenance task is currently scheduled, i.e. whether the
     * task handle is non-null. Checked while holding the loaded-provider map's monitor.
     *
     * @return {@code true} if a maintenance task handle is present
     */
    public boolean isMaintenanceRunning() {
        synchronized (this.loadedProviders()) {
            return this.maintenanceTask() != null;
        }
    }

    /**
     * Resets this module: forgets all loaded providers, clears the cached coordinator
     * reference and cancels the scheduled maintenance task if there is one. All of this
     * happens while holding the loaded-provider map's monitor. The scheduler itself is
     * left untouched, so a later call can schedule maintenance again.
     */
    public void stop() {
        synchronized (this.loadedProviders()) {
            this.loadedProviders().clear();
            this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef_$eq(null);
            ScheduledFuture<?> scheduled = this.maintenanceTask();
            if (scheduled != null) {
                // cancel(false): do not interrupt a maintenance run that is already executing.
                scheduled.cancel(false);
                this.maintenanceTask_$eq(null);
            }
            this.logInfo((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "StateStore stopped";
                }
            });
        }
    }

    /**
     * Schedules the periodic maintenance task unless one is already scheduled or no
     * SparkEnv is available. The period is read from the SparkEnv configuration under
     * {@code MAINTENANCE_INTERVAL_CONFIG}, with the default number of seconds (suffixed
     * with "s") as fallback, and is used both as initial delay and as repeat period.
     * Runs while holding the loaded-provider map's monitor.
     */
    private void startMaintenanceIfNeeded() {
        synchronized (this.loadedProviders()) {
            SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
            if (this.maintenanceTask() != null || sparkEnv == null) {
                return;
            }
            long intervalMs = sparkEnv.conf().getTimeAsMs(this.MAINTENANCE_INTERVAL_CONFIG(), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "s"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.MAINTENANCE_INTERVAL_DEFAULT_SECS())})));
            Runnable maintenanceRun = new Runnable(){

                public void run() {
                    StateStore$.MODULE$.org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance();
                }
            };
            ScheduledFuture<?> scheduled = this.maintenanceTaskExecutor().scheduleAtFixedRate(maintenanceRun, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
            this.maintenanceTask_$eq(scheduled);
            this.logInfo((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "State Store maintenance task started";
                }
            });
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Performs one maintenance pass; this is the body of the scheduled maintenance task.
     *
     * <p>If no SparkEnv is available the whole module is stopped. Otherwise a snapshot of
     * the loaded providers is taken under the map's monitor and each entry is processed
     * outside the monitor: a provider whose instance is verified as active gets
     * {@code doMaintenance()} called, any other provider is unloaded. A non-fatal error
     * while handling a provider is logged and triggers {@code stop()}; fatal errors are
     * rethrown.
     */
    public void org$apache$spark$sql$execution$streaming$state$StateStore$$doMaintenance() {
        Seq seq;
        this.logDebug((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Doing maintenance";
            }
        });
        // Without a SparkEnv there is nothing to maintain: reset the module and bail out.
        if (SparkEnv$.MODULE$.get() == null) {
            this.stop();
            return;
        }
        HashMap<StateStoreId, StateStoreProvider> hashMap = this.loadedProviders();
        synchronized (hashMap) {
            // Copy the entries so the per-provider work below runs without holding the monitor.
            seq = this.loadedProviders().toSeq();
        }
        ((IterableLike)seq).foreach((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            /*
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            public final void apply(Tuple2<StateStoreId, StateStoreProvider> x0$1) {
                BoxedUnit boxedUnit;
                Tuple2<StateStoreId, StateStoreProvider> tuple2 = x0$1;
                if (tuple2 == null) throw new MatchError(tuple2);
                StateStoreId id = (StateStoreId)tuple2._1();
                StateStoreProvider provider = (StateStoreProvider)tuple2._2();
                try {
                    // Only maintain providers whose instance is still verified as active.
                    if (StateStore$.MODULE$.org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(id)) {
                        provider.doMaintenance();
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        // Not verified as active: drop the provider from the loaded map.
                        StateStore$.MODULE$.unload(id);
                        StateStore$.MODULE$.logInfo((Function0<String>)new Serializable(this, provider){
                            public static final long serialVersionUID = 0L;
                            private final StateStoreProvider provider$1;

                            public final String apply() {
                                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unloaded ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.provider$1}));
                            }
                            {
                                this.provider$1 = provider$1;
                            }
                        });
                        boxedUnit = BoxedUnit.UNIT;
                    }
                }
                catch (Throwable throwable) {
                    Throwable throwable2 = throwable;
                    // NonFatal.unapply yields an empty Option for fatal throwables; those are rethrown.
                    Option option = NonFatal$.MODULE$.unapply(throwable2);
                    if (option.isEmpty()) {
                        throw throwable;
                    }
                    StateStore$.MODULE$.logWarning((Function0<String>)new Serializable(this, provider){
                        public static final long serialVersionUID = 0L;
                        private final StateStoreProvider provider$1;

                        public final String apply() {
                            return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error managing ", ", stopping management thread"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.provider$1}));
                        }
                        {
                            this.provider$1 = provider$1;
                        }
                    });
                    // stop() clears the loaded map and cancels the task; this callback then
                    // returns normally, so the foreach still visits the remaining snapshot entries.
                    StateStore$.MODULE$.stop();
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit3 = boxedUnit;
            }
        });
    }

    /**
     * Reports to the coordinator, if one is reachable, that the store {@code storeId}
     * is loaded on this executor. Host and executor id are taken from the block manager
     * id of the current SparkEnv. Does nothing when no SparkEnv is available.
     *
     * @param storeId identifier of the store that has just been loaded
     */
    private void reportActiveStoreInstance(StateStoreId storeId) {
        if (SparkEnv$.MODULE$.get() == null) {
            return;
        }
        String host = SparkEnv$.MODULE$.get().blockManager().blockManagerId().host();
        String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
        // coordinatorRef() is an Option: the report is sent only when a reference exists.
        this.coordinatorRef().foreach((Function1)new Serializable(storeId, host, executorId){
            public static final long serialVersionUID = 0L;
            private final StateStoreId storeId$2;
            private final String host$1;
            private final String executorId$1;

            public final void apply(StateStoreCoordinatorRef x$1) {
                x$1.reportActiveInstance(this.storeId$2, this.host$1, this.executorId$1);
            }
            {
                this.storeId$2 = storeId$2;
                this.host$1 = host$1;
                this.executorId$1 = executorId$1;
            }
        });
        this.logDebug((Function0<String>)new Serializable(storeId){
            public static final long serialVersionUID = 0L;
            private final StateStoreId storeId$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reported that the loaded instance ", " is active"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.storeId$2}));
            }
            {
                this.storeId$2 = storeId$2;
            }
        });
    }

    /*
     * WARNING - void declaration
     */
    public boolean org$apache$spark$sql$execution$streaming$state$StateStore$$verifyIfStoreInstanceActive(StateStoreId storeId) {
        boolean bl;
        if (SparkEnv$.MODULE$.get() == null) {
            bl = false;
        } else {
            void var3_3;
            String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
            boolean verified = BoxesRunTime.unboxToBoolean((Object)this.coordinatorRef().map((Function1)new Serializable(storeId, executorId){
                public static final long serialVersionUID = 0L;
                private final StateStoreId storeId$3;
                private final String executorId$2;

                public final boolean apply(StateStoreCoordinatorRef x$2) {
                    return x$2.verifyIfInstanceActive(this.storeId$3, this.executorId$2);
                }
                {
                    this.storeId$3 = storeId$3;
                    this.executorId$2 = executorId$2;
                }
            }).getOrElse((Function0)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final boolean apply() {
                    return this.apply$mcZ$sp();
                }

                public boolean apply$mcZ$sp() {
                    return false;
                }
            }));
            this.logDebug((Function0<String>)new Serializable(storeId, verified){
                public static final long serialVersionUID = 0L;
                private final StateStoreId storeId$3;
                private final boolean verified$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Verified whether the loaded instance ", " is active: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.storeId$3, BoxesRunTime.boxToBoolean((boolean)this.verified$1)}));
                }
                {
                    this.storeId$3 = storeId$3;
                    this.verified$1 = verified$1;
                }
            });
            bl = var3_3;
        }
        return bl;
    }

    /**
     * Returns the coordinator reference for the current SparkEnv, creating and caching
     * it on first use.
     *
     * <p>When no SparkEnv is available the cached reference is cleared and an empty
     * Option is returned. Otherwise the cached reference is created via
     * {@code StateStoreCoordinatorRef.forExecutor} if it is missing, logged at debug
     * level, and returned wrapped in {@code Some}.
     *
     * @return {@code Some(ref)} when a SparkEnv is available, otherwise an empty Option
     */
    private synchronized Option<StateStoreCoordinatorRef> coordinatorRef() {
        SparkEnv env = SparkEnv$.MODULE$.get();
        if (env == null) {
            // No environment: drop any stale reference so it is re-created later.
            this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef_$eq(null);
            return Option.<StateStoreCoordinatorRef>empty();
        }
        if (this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef() == null) {
            this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef_$eq(StateStoreCoordinatorRef$.MODULE$.forExecutor(env));
        }
        this.logDebug((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Retrieved reference to StateStoreCoordinator: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{StateStore$.MODULE$.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef()}));
            }
        });
        return new Some<StateStoreCoordinatorRef>(this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef());
    }

    /**
     * Builds the singleton. Private because the only instance is created by the
     * static initializer of this class.
     */
    private StateStore$() {
        // Publish the instance first; the statements below, and the closures defined in
        // this class, reach the singleton through MODULE$.
        MODULE$ = this;
        // Initializes the Logging mixin state for this instance.
        Logging.class.$init$((Logging)this);
        this.MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval";
        this.MAINTENANCE_INTERVAL_DEFAULT_SECS = 60;
        this.loadedProviders = new HashMap();
        // Scheduler for the maintenance task; the argument is passed to ThreadUtils as a name.
        this.maintenanceTaskExecutor = ThreadUtils$.MODULE$.newDaemonSingleThreadScheduledExecutor("state-store-maintenance-task");
        // No task scheduled and no coordinator reference cached yet.
        this.maintenanceTask = null;
        this.org$apache$spark$sql$execution$streaming$state$StateStore$$_coordRef = null;
    }
}

