package org.mbari.vcr4j.decorators;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.mbari.vcr4j.VideoCommand;
import org.mbari.vcr4j.VideoError;
import org.mbari.vcr4j.VideoIO;
import org.mbari.vcr4j.VideoIndex;
import org.mbari.vcr4j.VideoState;

/* loaded from: input_file:org/mbari/vcr4j/decorators/SchedulerVideoIO.class */
/**
 * {@link VideoIO} decorator that moves the wrapped IO's traffic onto other threads.
 *
 * <p>The error, state and index observables of the wrapped {@code VideoIO} are re-exposed
 * through {@code observeOn(scheduler)}. Commands pushed into this decorator (via
 * {@link #send(VideoCommand)} or {@link #getCommandSubject()}) are placed on an internal
 * queue and forwarded, one at a time, to the wrapped IO's command subject from a dedicated
 * daemon thread.
 *
 * @param <S> state type emitted by the wrapped {@code VideoIO}
 * @param <E> error type emitted by the wrapped {@code VideoIO}
 */
public class SchedulerVideoIO<S extends VideoState, E extends VideoError> implements VideoIO<S, E>, Decorator {
    /**
     * Marker object enqueued by {@link CommandQueue#kill()} to wake the dispatch thread.
     * It is compared by identity and never forwarded to the wrapped IO.
     */
    private static final Object WAKE_UP = new Object();

    private final VideoIO<S, E> io;
    private final Observable<E> errorObservable;
    private final Observable<S> stateObservable;
    private final Observable<VideoIndex> indexObservable;
    private final SchedulerVideoIO<S, E>.CommandQueue commandQueue;
    private final Subject<VideoCommand<?>> commandSubject;
    private final Scheduler scheduler;
    private final Observer<VideoCommand<?>> commandObserver;
    private final List<Disposable> disposables;

    /**
     * Queue plus daemon thread that forwards queued commands to the wrapped IO's command
     * subject in the order they were offered.
     */
    public class CommandQueue {
        final Thread thread;
        // Holds VideoCommand instances and, after kill(), the WAKE_UP marker.
        final BlockingQueue<Object> pendingQueue = new LinkedBlockingQueue<>();
        final AtomicBoolean isRunning = new AtomicBoolean(true);
        final Runnable runnable = () -> {
            while (this.isRunning.get()) {
                Object next;
                try {
                    next = this.pendingQueue.take();
                } catch (InterruptedException ignored) {
                    // Shutdown is signalled through isRunning, not through interruption, so
                    // just re-evaluate the loop condition. The interrupt flag is deliberately
                    // not restored here: doing so would make take() throw again immediately.
                    continue;
                }
                if (next != WAKE_UP) {
                    SchedulerVideoIO.this.io.getCommandSubject().onNext((VideoCommand<?>) next);
                }
            }
        };

        /**
         * Stops the dispatch loop. Commands still waiting in the queue are not forwarded.
         * The wake-up marker releases the thread if it is blocked waiting for a command;
         * a command that is currently being dispatched is left to finish undisturbed.
         */
        void kill() {
            this.isRunning.set(false);
            this.pendingQueue.offer(WAKE_UP);
        }

        /**
         * Queues a command for forwarding to the wrapped IO.
         *
         * @param videoCommand the command to forward
         */
        void send(VideoCommand videoCommand) {
            this.pendingQueue.offer(videoCommand);
        }

        /** Creates the queue and starts its daemon dispatch thread. */
        public CommandQueue() {
            this.thread = new Thread(this.runnable, SchedulerVideoIO.this.getClass().getSimpleName());
            this.thread.setDaemon(true);
            this.thread.start();
        }
    }

    /**
     * Decorates {@code videoIO} using a scheduler built from {@code executor}.
     *
     * @param videoIO  the IO to decorate
     * @param executor executor wrapped with {@code Schedulers.from(executor)}
     */
    public SchedulerVideoIO(VideoIO<S, E> videoIO, Executor executor) {
        this(videoIO, Schedulers.from(executor));
    }

    /**
     * Decorates {@code videoIO}, observing its error, state and index streams on
     * {@code scheduler}.
     *
     * @param videoIO   the IO to decorate
     * @param scheduler scheduler passed to {@code observeOn} for the three output streams
     */
    public SchedulerVideoIO(final VideoIO<S, E> videoIO, Scheduler scheduler) {
        this.io = videoIO;
        this.scheduler = scheduler;
        // Dereference the arguments before the dispatch thread exists, so that a failure
        // here (e.g. a null argument) cannot leave a running thread behind.
        this.errorObservable = videoIO.getErrorObservable().observeOn(scheduler);
        this.stateObservable = videoIO.getStateObservable().observeOn(scheduler);
        this.indexObservable = videoIO.getIndexObservable().observeOn(scheduler);
        this.disposables = new ArrayList<>();
        this.commandQueue = new CommandQueue();
        this.commandObserver = new Observer<VideoCommand<?>>() {
            @Override
            public void onComplete() {
                videoIO.getCommandSubject().onComplete();
            }

            @Override
            public void onError(Throwable th) {
                videoIO.getCommandSubject().onError(th);
            }

            @Override
            public void onNext(VideoCommand<?> videoCommand) {
                // Hand off to the dispatch thread instead of calling the wrapped IO directly.
                SchedulerVideoIO.this.commandQueue.send(videoCommand);
            }

            @Override
            public void onSubscribe(Disposable disposable) {
                SchedulerVideoIO.this.disposables.add(disposable);
            }
        };
        this.commandSubject = PublishSubject.<VideoCommand<?>>create().toSerialized();
        this.commandSubject.subscribe(this.commandObserver);
    }

    /**
     * Disposes the internal subscription to this decorator's command subject and stops the
     * dispatch thread. The wrapped IO is not closed.
     */
    @Override // org.mbari.vcr4j.decorators.Decorator
    public void unsubscribe() {
        // The observeOn-wrapped output observables hold no subscription owned by this
        // class, so there is nothing to dispose for them here.
        this.disposables.forEach(Disposable::dispose);
        this.commandQueue.kill();
    }

    /**
     * Publishes a command on this decorator's command subject; it is then queued and
     * forwarded to the wrapped IO by the dispatch thread.
     *
     * @param videoCommand the command to send
     */
    @Override // org.mbari.vcr4j.VideoIO
    public void send(VideoCommand videoCommand) {
        this.commandSubject.onNext(videoCommand);
    }

    /** Returns this decorator's own (serialized) command subject, not the wrapped IO's. */
    @Override // org.mbari.vcr4j.VideoIO
    public Subject<VideoCommand<?>> getCommandSubject() {
        return this.commandSubject;
    }

    /** Returns the connection ID reported by the wrapped IO. */
    @Override // org.mbari.vcr4j.VideoIO
    public String getConnectionID() {
        return this.io.getConnectionID();
    }

    /** Calls {@link #unsubscribe()} and then closes the wrapped IO. */
    @Override // org.mbari.vcr4j.VideoIO, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            unsubscribe();
        } finally {
            // Close the wrapped IO even if tearing down the decorator failed.
            this.io.close();
        }
    }

    /** Returns the wrapped IO's error stream, observed on the configured scheduler. */
    @Override // org.mbari.vcr4j.VideoIO
    public Observable<E> getErrorObservable() {
        return this.errorObservable;
    }

    /** Returns the wrapped IO's state stream, observed on the configured scheduler. */
    @Override // org.mbari.vcr4j.VideoIO
    public Observable<S> getStateObservable() {
        return this.stateObservable;
    }

    /** Returns the wrapped IO's index stream, observed on the configured scheduler. */
    @Override // org.mbari.vcr4j.VideoIO
    public Observable<VideoIndex> getIndexObservable() {
        return this.indexObservable;
    }
}
