package org.noear.socketd.transport.core.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.noear.socketd.transport.core.Asserts;
import org.noear.socketd.transport.core.ChannelAssistant;
import org.noear.socketd.transport.core.ChannelInternal;
import org.noear.socketd.transport.core.ChannelSupporter;
import org.noear.socketd.transport.core.EntityMetas;
import org.noear.socketd.transport.core.Frame;
import org.noear.socketd.transport.core.MessageInternal;
import org.noear.socketd.transport.core.Processor;
import org.noear.socketd.transport.core.Session;
import org.noear.socketd.transport.core.entity.MessageBuilder;
import org.noear.socketd.transport.stream.StreamInternal;
import org.noear.socketd.transport.stream.StreamManger;
import org.noear.socketd.utils.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/noear/socketd/transport/core/impl/ChannelDefault.class */
public class ChannelDefault<S> extends ChannelBase implements ChannelInternal {
    private static Logger log = LoggerFactory.getLogger(ChannelDefault.class);
    private final S source;
    private final Processor processor;
    private final ChannelAssistant<S> assistant;
    private final StreamManger streamManger;
    private final ReentrantLock sendInFairLock;
    private final ReentrantLock sendNoFairLock;
    private Session session;
    private long liveTime;
    private BiConsumer<Boolean, Throwable> onOpenFuture;
    private int closeCode;

    public ChannelDefault(S s, ChannelSupporter<S> channelSupporter) {
        super(channelSupporter.getConfig());
        this.source = s;
        this.processor = channelSupporter.getProcessor();
        this.assistant = channelSupporter.getAssistant();
        this.streamManger = channelSupporter.getConfig().getStreamManger();
        this.sendInFairLock = new ReentrantLock(true);
        this.sendNoFairLock = new ReentrantLock(false);
    }

    @Override // org.noear.socketd.transport.core.Channel
    public boolean isValid() {
        return isClosed() == 0 && this.assistant.isValid(this.source);
    }

    @Override // org.noear.socketd.transport.core.Channel
    public boolean isClosing() {
        return this.closeCode == 1000;
    }

    @Override // org.noear.socketd.transport.core.Channel
    public int isClosed() {
        if (this.closeCode > 1000) {
            return this.closeCode;
        }
        return 0;
    }

    @Override // org.noear.socketd.transport.core.Channel
    public long getLiveTime() {
        return this.liveTime;
    }

    @Override // org.noear.socketd.transport.core.ChannelInternal
    public void setLiveTimeAsNow() {
        this.liveTime = System.currentTimeMillis();
    }

    @Override // org.noear.socketd.transport.core.Channel
    public InetSocketAddress getRemoteAddress() throws IOException {
        return this.assistant.getRemoteAddress(this.source);
    }

    @Override // org.noear.socketd.transport.core.Channel
    public InetSocketAddress getLocalAddress() throws IOException {
        return this.assistant.getLocalAddress(this.source);
    }

    @Override // org.noear.socketd.transport.core.Channel
    public void send(Frame frame, StreamInternal streamInternal) throws IOException {
        Asserts.assertClosed(this);
        if (log.isDebugEnabled()) {
            if (getConfig().clientMode()) {
                log.debug("C-SEN:{}", frame);
            } else {
                log.debug("S-SEN:{}", frame);
            }
        }
        if (getConfig().isNolockSend()) {
            sendDo(frame, streamInternal);
            return;
        }
        boolean isSequenceSend = getConfig().isSequenceSend();
        if (!isSequenceSend && frame.message() != null) {
            String atName = frame.message().atName();
            if (StrUtils.isNotEmpty(atName) && atName.charAt(atName.length() - 1) == '!') {
                isSequenceSend = true;
            }
        }
        if (isSequenceSend) {
            this.sendInFairLock.lock();
            try {
                sendDo(frame, streamInternal);
                this.sendInFairLock.unlock();
                return;
            } catch (Throwable th) {
                this.sendInFairLock.unlock();
                throw th;
            }
        }
        this.sendNoFairLock.lock();
        try {
            sendDo(frame, streamInternal);
            this.sendNoFairLock.unlock();
        } catch (Throwable th2) {
            this.sendNoFairLock.unlock();
            throw th2;
        }
    }

    private void sendDo(Frame frame, StreamInternal streamInternal) throws IOException {
        if (frame.message() != null) {
            MessageInternal message = frame.message();
            if (streamInternal != null) {
                this.streamManger.addStream(message.sid(), streamInternal);
            }
            if (message.entity() != null) {
                if (message.dataSize() > getConfig().getFragmentSize()) {
                    message.putMeta(EntityMetas.META_DATA_LENGTH, String.valueOf(message.dataSize()));
                }
                getConfig().getFragmentHandler().spliFragment(this, streamInternal, message, entity -> {
                    this.assistant.write(this.source, entity instanceof MessageInternal ? new Frame(frame.flag(), (MessageInternal) entity) : new Frame(frame.flag(), new MessageBuilder().flag(frame.flag()).sid(message.sid()).event(message.event()).entity(entity).build()));
                });
                return;
            }
        }
        this.assistant.write(this.source, frame);
        if (streamInternal != null) {
            streamInternal.onProgress(true, 1, 1);
        }
    }

    @Override // org.noear.socketd.transport.core.Channel
    public void retrieve(Frame frame, StreamInternal streamInternal) {
        if (streamInternal == null) {
            if (log.isDebugEnabled()) {
                log.debug("{} stream not found, sid={}, sessionId={}", new Object[]{getConfig().getRoleName(), frame.message().sid(), getSession().sessionId()});
                return;
            }
            return;
        }
        if (streamInternal.demands() < 2 || frame.flag() == 49) {
            this.streamManger.removeStream(frame.message().sid());
        }
        if (streamInternal.demands() < 2) {
            streamInternal.onReply(frame.message());
        } else {
            getConfig().getExchangeExecutor().submit(() -> {
                streamInternal.onReply(frame.message());
            });
        }
    }

    @Override // org.noear.socketd.transport.core.Channel
    public void reconnect() throws IOException {
    }

    @Override // org.noear.socketd.transport.core.Channel
    public void onError(Throwable th) {
        this.processor.onError(this, th);
    }

    @Override // org.noear.socketd.transport.core.Channel
    public Session getSession() {
        if (this.session == null) {
            this.session = new SessionDefault(this);
        }
        return this.session;
    }

    @Override // org.noear.socketd.transport.core.ChannelInternal
    public void setSession(Session session) {
        this.session = session;
    }

    @Override // org.noear.socketd.transport.core.ChannelInternal
    public StreamInternal getStream(String str) {
        return this.streamManger.getStream(str);
    }

    @Override // org.noear.socketd.transport.core.ChannelInternal
    public void onOpenFuture(BiConsumer<Boolean, Throwable> biConsumer) {
        this.onOpenFuture = biConsumer;
    }

    @Override // org.noear.socketd.transport.core.ChannelInternal
    public void doOpenFuture(boolean z, Throwable th) {
        if (this.onOpenFuture != null) {
            this.onOpenFuture.accept(Boolean.valueOf(z), th);
        }
    }

    @Override // org.noear.socketd.transport.core.impl.ChannelBase, org.noear.socketd.transport.core.Channel
    public void close(int i) {
        try {
            this.closeCode = i;
            super.close(i);
            if (i > 1000) {
                this.assistant.close(this.source);
                if (log.isDebugEnabled()) {
                    log.debug("{} channel closed, sessionId={}", getConfig().getRoleName(), getSession().sessionId());
                }
            }
        } catch (Throwable th) {
            if (log.isWarnEnabled()) {
                log.warn("{} channel close error, sessionId={}", new Object[]{getConfig().getRoleName(), getSession().sessionId(), th});
            }
        }
    }
}
