package io.activej.csp.queue;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.MemSize;
import io.activej.common.tuple.Tuple2;
import io.activej.csp.file.ChannelFileReader;
import io.activej.csp.file.ChannelFileWriter;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/csp/queue/ChannelFileBuffer.class */
public final class ChannelFileBuffer implements ChannelQueue<ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(ChannelFileBuffer.class);
    private final ChannelFileReader reader;
    private final ChannelFileWriter writer;
    private final Executor executor;
    private final Path path;

    @Nullable
    private SettablePromise<ByteBuf> take;
    private boolean finished = false;

    @Nullable
    private Exception exception;

    private ChannelFileBuffer(ChannelFileReader channelFileReader, ChannelFileWriter channelFileWriter, Executor executor, Path path) {
        this.reader = channelFileReader;
        this.writer = channelFileWriter;
        this.executor = executor;
        this.path = path;
    }

    public static Promise<ChannelFileBuffer> create(Executor executor, Path path) {
        return create(executor, path, null);
    }

    public static Promise<ChannelFileBuffer> create(Executor executor, Path path, @Nullable MemSize memSize) {
        return Promise.ofBlockingCallable(executor, () -> {
            Files.createDirectories(path.getParent(), new FileAttribute[0]);
            return new Tuple2(FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE), FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.READ));
        }).map(tuple2 -> {
            ChannelFileWriter create = ChannelFileWriter.create(executor, (FileChannel) tuple2.getValue1());
            ChannelFileReader create2 = ChannelFileReader.create(executor, (FileChannel) tuple2.getValue2());
            if (memSize != null) {
                create2.withLimit(memSize.toLong());
            }
            return new ChannelFileBuffer(create2, create, executor, path);
        });
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public Promise<Void> put(@Nullable ByteBuf byteBuf) {
        if (this.exception != null) {
            return Promise.ofException(this.exception);
        }
        if (byteBuf == null) {
            this.finished = true;
        }
        if (this.take == null) {
            return this.writer.accept(byteBuf);
        }
        SettablePromise<ByteBuf> settablePromise = this.take;
        this.take = null;
        settablePromise.set(byteBuf);
        return byteBuf == null ? this.writer.accept(null) : Promise.complete();
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public Promise<ByteBuf> take() {
        if (this.exception != null) {
            return Promise.ofException(this.exception);
        }
        if (!isExhausted()) {
            return this.reader.get();
        }
        if (this.finished) {
            return Promise.of((Object) null);
        }
        SettablePromise<ByteBuf> settablePromise = new SettablePromise<>();
        this.take = settablePromise;
        return settablePromise;
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public boolean isSaturated() {
        return false;
    }

    @Override // io.activej.csp.queue.ChannelQueue
    public boolean isExhausted() {
        return this.reader.getPosition() >= this.writer.getPosition();
    }

    public void closeEx(@NotNull Throwable th) {
        if (this.exception != null) {
            return;
        }
        this.exception = th instanceof Exception ? (Exception) th : new RuntimeException(th);
        this.writer.closeEx(th);
        this.reader.closeEx(th);
        if (this.take != null) {
            this.take.setException(th);
            this.take = null;
        }
        this.executor.execute(() -> {
            try {
                Files.deleteIfExists(this.path);
            } catch (IOException e) {
                logger.error("failed to cleanup channel buffer file " + this.path, e);
            }
        });
    }

    @Nullable
    public Exception getException() {
        return this.exception;
    }
}
