package org.jetlang.remote.acceptor;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jetlang.core.Disposable;
import org.jetlang.fibers.Fiber;
import org.jetlang.remote.core.ErrorHandler;
import org.jetlang.remote.core.JetlangRemotingProtocol;

/* loaded from: input_file:org/jetlang/remote/acceptor/JetlangStreamSession.class */
public class JetlangStreamSession extends JetlangBaseSession implements JetlangRemotingProtocol.Handler {
    private final MessageStreamWriter socket;
    private final Fiber sendFiber;
    private final ErrorHandler errorHandler;
    private final Set<String> subscriptions;
    private volatile boolean loggedOut;
    private volatile Runnable hbStopper;

    public JetlangStreamSession(Object obj, MessageStreamWriter messageStreamWriter, Fiber fiber, ErrorHandler errorHandler) {
        super(obj);
        this.subscriptions = Collections.synchronizedSet(new HashSet());
        this.hbStopper = new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.1
            @Override // java.lang.Runnable
            public void run() {
            }
        };
        this.socket = messageStreamWriter;
        this.sendFiber = fiber;
        this.errorHandler = errorHandler;
    }

    public void startHeartbeat(int i, TimeUnit timeUnit) {
        if (i > 0) {
            final Disposable scheduleWithFixedDelay = this.sendFiber.scheduleWithFixedDelay(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.2
                @Override // java.lang.Runnable
                public void run() {
                    JetlangStreamSession.this.write(1);
                }

                public String toString() {
                    return "JetlangStreamSession.writeHeartbeat()";
                }
            }, i, i, timeUnit);
            this.hbStopper = new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.3
                AtomicBoolean stopped = new AtomicBoolean(false);

                @Override // java.lang.Runnable
                public void run() {
                    if (this.stopped.compareAndSet(false, true)) {
                        scheduleWithFixedDelay.dispose();
                    }
                }
            };
        }
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession, org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onSubscriptionRequest(String str) {
        this.subscriptions.add(str);
        this.SubscriptionRequest.publish(new SessionTopic(str, this));
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession, org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onUnsubscribeRequest(String str) {
        this.subscriptions.remove(str);
        this.UnsubscribeRequest.publish(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void write(final int i) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.4
            @Override // java.lang.Runnable
            public void run() {
                try {
                    JetlangStreamSession.this.socket.writeByteAsInt(i);
                } catch (IOException e) {
                    JetlangStreamSession.this.handleDisconnect(e);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleDisconnect(IOException iOException) {
        this.socket.tryClose();
        if (this.loggedOut) {
            return;
        }
        this.errorHandler.onException(iOException);
    }

    @Override // org.jetlang.remote.acceptor.JetlangSession
    public void disconnect() {
        this.socket.tryClose();
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession, org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onLogout() {
        write(2);
        this.Logout.publish(new LogoutEvent());
        this.loggedOut = true;
        this.hbStopper.run();
    }

    @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onUnknownMessage(int i) {
        this.errorHandler.onException(new RuntimeException("Unknown message type " + i + " from " + getSessionId()));
    }

    @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onHandlerException(Exception exc) {
        this.errorHandler.onException(exc);
    }

    @Override // org.jetlang.remote.acceptor.JetlangSession
    public <T> void publish(final String str, final T t) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.5
            @Override // java.lang.Runnable
            public void run() {
                if (JetlangStreamSession.this.subscriptions.contains(str)) {
                    try {
                        JetlangStreamSession.this.socket.write(str, t);
                    } catch (IOException e) {
                        JetlangStreamSession.this.handleDisconnect(e);
                    }
                }
            }

            public String toString() {
                return "JetlangStreamSession.publish(" + str + ", " + t + ")";
            }
        });
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession, org.jetlang.remote.acceptor.JetlangSession
    public void publish(final byte[] bArr) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.6
            @Override // java.lang.Runnable
            public void run() {
                JetlangStreamSession.this.writeBytesOnSendFiberThread(bArr);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeBytesOnSendFiberThread(byte[] bArr) {
        try {
            this.socket.writeBytes(bArr);
        } catch (IOException e) {
            handleDisconnect(e);
        }
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession, org.jetlang.remote.acceptor.JetlangMessagePublisher
    public void reply(final int i, final String str, final Object obj) {
        this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.7
            @Override // java.lang.Runnable
            public void run() {
                try {
                    JetlangStreamSession.this.socket.writeReply(i, str, obj);
                } catch (IOException e) {
                    JetlangStreamSession.this.handleDisconnect(e);
                }
            }
        });
    }

    @Override // org.jetlang.remote.core.JetlangRemotingProtocol.Handler
    public void onRequestReply(int i, String str, Object obj) {
        this.errorHandler.onException(new RuntimeException("Reply is not supported: " + str + " msg: " + obj));
    }

    @Override // org.jetlang.remote.acceptor.JetlangBaseSession
    public void publishIfSubscribed(String str, final byte[] bArr) {
        if (this.subscriptions.contains(str)) {
            this.sendFiber.execute(new Runnable() { // from class: org.jetlang.remote.acceptor.JetlangStreamSession.8
                @Override // java.lang.Runnable
                public void run() {
                    JetlangStreamSession.this.writeBytesOnSendFiberThread(bArr);
                }
            });
        }
    }
}
