package org.ikasan.rest.module.sse;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:BOOT-INF/lib/ikasan-rest-module-3.3.2.jar:org/ikasan/rest/module/sse/MonitoringFileServiceThread.class */
public class MonitoringFileServiceThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) MonitoringFileServiceThread.class);
    private final SseEmitter sseEmitter;
    private final Path filePath;
    private File file;
    private final Path monitoringDirectory;
    private final WatchKey key;
    private final int streamThreadWaitTime;
    private long inactiveTimeForFileInMillis;
    private AtomicLong counter = new AtomicLong(0);
    private long lastTimeFileChanged = System.currentTimeMillis();

    public MonitoringFileServiceThread(String str, SseEmitter sseEmitter, int i, long j) throws IOException {
        this.streamThreadWaitTime = i;
        this.sseEmitter = sseEmitter;
        this.inactiveTimeForFileInMillis = j;
        this.file = new FileSystemResource(URLDecoder.decode(str, StandardCharsets.UTF_8)).getFile();
        this.filePath = this.file.toPath();
        this.monitoringDirectory = this.file.getParentFile().toPath();
        this.key = this.monitoringDirectory.register(FileSystems.getDefault().newWatchService(), StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_CREATE);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        sendAllMessagesForTheFirstTime();
        while (true) {
            monitorForMessages();
            sendDisconnectIfNoActivity();
        }
    }

    private void monitorForMessages() {
        try {
            Thread.sleep(this.streamThreadWaitTime);
            for (WatchEvent<?> watchEvent : this.key.pollEvents()) {
                Path resolve = this.monitoringDirectory.resolve((Path) watchEvent.context());
                if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_MODIFY && resolve.equals(this.filePath)) {
                    sendMessage();
                } else if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE && resolve.equals(this.filePath)) {
                    this.counter = new AtomicLong(0L);
                }
                this.lastTimeFileChanged = System.currentTimeMillis();
            }
            if (!this.key.reset()) {
                LOG.error("Key is no longer valid: " + this.key);
                end(new IOException("Watch key is no longer valid"));
            }
        } catch (NoSuchFileException e) {
        } catch (Exception e2) {
            end(e2);
        }
    }

    private void sendDisconnectIfNoActivity() {
        if (shouldDisconnect()) {
            this.sseEmitter.complete();
            interrupt();
            throw new ThreadDeath();
        }
    }

    private boolean shouldDisconnect() {
        return System.currentTimeMillis() > this.lastTimeFileChanged + this.inactiveTimeForFileInMillis;
    }

    private void sendAllMessagesForTheFirstTime() {
        try {
            sendMessage();
        } catch (IOException e) {
            end(e);
        }
    }

    private void sendMessage() throws IOException {
        RandomAccessFile randomAccessFile = new RandomAccessFile(this.file, "r");
        randomAccessFile.seek(this.counter.get());
        while (true) {
            String readLine = randomAccessFile.readLine();
            if (readLine == null) {
                this.counter.set(randomAccessFile.getFilePointer());
                randomAccessFile.close();
                return;
            } else {
                try {
                    this.sseEmitter.send(SseEmitter.event().data(readLine));
                } catch (IOException e) {
                    end(e);
                }
            }
        }
    }

    private void end(Exception exc) {
        this.sseEmitter.completeWithError(exc);
        interrupt();
        throw new ThreadDeath();
    }
}
