package io.hektor.actors.io;

import io.hektor.actors.SubscriptionManagementSupport;
import io.hektor.core.ActorRef;
import io.hektor.core.Props;
import io.snice.buffer.Buffer;
import io.snice.preconditions.PreConditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hektor/actors/io/InputStreamActor.class */
/**
 * Actor that reads an {@link InputStream} chunk by chunk on a supplied
 * {@link ExecutorService} and forwards every chunk to its subscribers as a
 * {@link StreamToken}.
 *
 * <p>Each read is performed by {@link #run()} on the thread pool; the outcome
 * (data, end-of-stream, or an {@link IOException}) is sent back to this actor
 * as a message and handled in {@link #onEvent(Object)}, which either schedules
 * the next read or stops the actor.
 */
public class InputStreamActor extends SubscriptionManagementSupport implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(InputStreamActor.class);

    /** Maximum number of bytes requested from the stream per read. */
    private static final int MAX_BUFFER_SIZE = 100;

    /**
     * Total number of {@link IOException}s at which the actor gives up and
     * stops. With a value of 2, the first failure is retried and the second
     * one stops the actor.
     */
    private static final int MAX_IO_EXCEPTIONS = 2;

    private final InputStreamConfig config;
    private final InputStream is;
    private final ExecutorService threadPool;

    // Captured once in the constructor and used by run(), which executes on the
    // thread pool. NOTE(review): presumably self() must not be called off the
    // actor's own thread - confirm against the actor framework.
    private final ActorRef self;

    // Number of IOExceptions reported so far. Only touched from onEvent().
    private int countExceptions;

    /** Internal marker message that {@link #run()} sends when the stream is exhausted. */
    private static final class EndOfStream {
        private static final EndOfStream EOF = new EndOfStream();

        private EndOfStream() {
        }
    }

    /**
     * Creates the {@link Props} for an {@code InputStreamActor} using a default
     * {@link InputStreamConfig}.
     *
     * @param inputStream the stream to read from; must not be null
     * @param executorService the pool on which the reads are executed; must not be null
     * @return the props describing how to create the actor
     */
    public static Props<InputStreamActor> props(InputStream inputStream, ExecutorService executorService) {
        PreConditions.assertNotNull(inputStream, "The InputStream cannot be null");
        PreConditions.assertNotNull(executorService, "The threadpool cannot be null");
        return props(inputStream, executorService, InputStreamConfig.of().build());
    }

    /**
     * Creates the {@link Props} for an {@code InputStreamActor}.
     *
     * @param inputStream the stream to read from; must not be null
     * @param executorService the pool on which the reads are executed; must not be null
     * @param inputStreamConfig the configuration for the actor; must not be null
     * @return the props describing how to create the actor
     */
    public static Props<InputStreamActor> props(InputStream inputStream, ExecutorService executorService, InputStreamConfig inputStreamConfig) {
        PreConditions.assertNotNull(inputStream, "The InputStream cannot be null");
        PreConditions.assertNotNull(executorService, "The threadpool cannot be null");
        PreConditions.assertNotNull(inputStreamConfig, "The configuration cannot be null");
        return Props.forActor(InputStreamActor.class,
                () -> new InputStreamActor(inputStreamConfig, inputStream, executorService));
    }

    private InputStreamActor(InputStreamConfig inputStreamConfig, InputStream inputStream, ExecutorService executorService) {
        super(inputStreamConfig.isParentAutoSubscribe());
        this.config = inputStreamConfig;
        this.is = inputStream;
        this.threadPool = executorService;
        this.self = self();
    }

    /** Logs the start and schedules the first read on the thread pool. */
    public void start() {
        logInfo("Starting", new Object[0]);
        this.threadPool.submit(this);
    }

    /**
     * Handles the result of a read posted by {@link #run()}.
     *
     * <ul>
     *   <li>{@link StreamToken}: forwarded to the subscribers, then the next read is scheduled.</li>
     *   <li>End-of-stream marker: the actor is stopped.</li>
     *   <li>{@link IOException}: the read is retried until {@link #MAX_IO_EXCEPTIONS}
     *       failures have been seen in total, at which point the actor is stopped.</li>
     * </ul>
     *
     * Any other message is ignored.
     *
     * @param obj the message delivered to this actor
     */
    @Override // io.hektor.actors.SubscriptionManagementSupport
    protected void onEvent(Object obj) {
        if (obj instanceof StreamToken) {
            tellSubscribers(obj);
            this.threadPool.submit(this);
            return;
        }
        if (obj == EndOfStream.EOF) {
            ctx().stop();
            return;
        }
        if (obj instanceof IOException) {
            this.countExceptions++;
            if (this.countExceptions < MAX_IO_EXCEPTIONS) {
                this.threadPool.submit(this);
            } else {
                ctx().stop();
            }
        }
    }

    /** Logs that the actor is stopping. */
    public void stop() {
        logInfo("Stopping", new Object[0]);
    }

    /** Logs that the actor has stopped. */
    public void postStop() {
        logInfo("Stopped", new Object[0]);
    }

    /**
     * Performs a single read of at most {@link #MAX_BUFFER_SIZE} bytes and
     * reports the outcome to this actor: a {@link StreamToken} with the bytes
     * read, the end-of-stream marker when {@code read} returns -1, or the
     * {@link IOException} that was thrown.
     *
     * <p>Any other exception is logged and not reported to the actor, so no
     * further read is scheduled in that case.
     */
    @Override // java.lang.Runnable
    public void run() {
        byte[] bArr = new byte[MAX_BUFFER_SIZE];
        try {
            int read = this.is.read(bArr, 0, bArr.length);
            if (read == -1) {
                this.self.tell(EndOfStream.EOF);
            } else {
                this.self.tell(StreamToken.of(Buffer.of(bArr, 0, read)));
            }
        } catch (IOException e) {
            this.self.tell(e);
        } catch (Exception e2) {
            // Route through the class logger rather than stderr so the failure
            // ends up wherever the application's logs go.
            logger.error("Unexpected exception while reading from the input stream", e2);
        }
    }

    @Override // io.hektor.actors.LoggingSupport
    public Logger getLogger() {
        return logger;
    }

    @Override // io.hektor.actors.LoggingSupport
    public Object getUUID() {
        return self();
    }
}
