package org.coos.messaging.transport;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.coos.messaging.Channel;
import org.coos.messaging.Endpoint;
import org.coos.messaging.Message;
import org.coos.messaging.Processor;
import org.coos.messaging.ProcessorException;
import org.coos.messaging.Service;
import org.coos.messaging.Transport;
import org.coos.messaging.impl.DefaultMessage;
import org.coos.messaging.impl.DefaultProcessor;
import org.coos.messaging.util.Log;
import org.coos.messaging.util.LogFactory;

/* loaded from: input_file:org/coos/messaging/transport/TCPTransport.class */
/**
 * {@link Transport} that exchanges {@link Message}s over a TCP socket.
 *
 * <p>Outgoing messages handed to {@link #processMessage(Message)} are queued in
 * {@link #mailbox} and written to the socket by a {@link Writer} thread.
 * Incoming messages are deserialized by a {@link Reader} thread and passed to
 * the chained {@link #transportProcessor}.
 *
 * <p>The transport either wraps an already connected socket (see
 * {@link #TCPTransport(Socket, TCPTransportManager)}) or connects to
 * {@link #hostName}:{@link #hostPort} itself when {@link #start()} is called.
 *
 * <p>Threading: the writer thread waits on this transport's monitor; producers
 * notify that monitor after queueing a message. Compound operations on
 * {@link #mailbox} lock the list itself. The lock order is always
 * "transport monitor, then mailbox" and never the reverse.
 */
public class TCPTransport extends DefaultProcessor implements Transport, Service {
    static final String PROPERTY_HOST = "host";
    static final String PROPERTY_PORT = "port";
    static final String PROPERTY_RETRY = "retry";
    static final String PROPERTY_RETRY_TIME = "retryTime";
    protected static final Log logger = LogFactory.getLog(TCPTransport.class.getName());

    /** Port used when neither a constructor argument nor the "port" property supplies one. */
    private static final int DEFAULT_PORT = 15656;

    /** Remote host to connect to; taken from the "host" property when not set explicitly. */
    protected String hostName;
    /** Remote port to connect to. */
    protected int hostPort = DEFAULT_PORT;
    /** Delay in milliseconds between connection attempts and before a reconnect. */
    protected int retryTime = Endpoint.DEFAULT_TIMEOUT;
    /** Whether a failed or lost connection is re-established; written by several threads. */
    protected volatile boolean retry = false;
    /** Current socket, or {@code null} when not connected; written by several threads. */
    protected volatile Socket socket;
    private volatile Reader reader;
    private volatile Writer writer;
    /** Queue of outgoing messages; a synchronized list shared with the writer thread. */
    protected List<Message> mailbox = Collections.synchronizedList(new LinkedList<Message>());
    /** Processor that receives every message read from the socket. */
    protected Processor transportProcessor;
    /** {@code true} between {@link #start()} and {@link #stop()}; read by worker threads. */
    protected volatile boolean running = true;
    /** Channel this transport belongs to; used by the reader to disconnect/reconnect. */
    protected Channel channel;
    /** Manager to inform when this transport is stopped; may be {@code null}. */
    TCPTransportManager tm;

    /**
     * Reads messages from the socket on its own thread and forwards them to the
     * chained transport processor. The thread is started by the constructor.
     */
    public class Reader implements Runnable {
        InputStream is;
        /** Set when the read loop ended because of an error or a closed connection. */
        volatile boolean failed;
        volatile boolean running = true;
        Thread readerThread = new Thread(this);

        Reader() {
            // 'failed' deliberately keeps its default (false) and is NOT assigned here:
            // the thread is already running and may have recorded a failure.
            this.readerThread.start();
            TCPTransport.logger.info("Reader started on :" + TCPTransport.this.socket.getLocalSocketAddress());
        }

        /**
         * Asks the read loop to end. When the reader has not failed, this is treated
         * as a deliberate shutdown and automatic reconnection is switched off.
         */
        public void stop() {
            this.running = false;
            if (this.failed) {
                return;
            }
            TCPTransport.this.retry = false;
        }

        /**
         * Read loop: deserializes one message per iteration and hands it to the
         * transport processor. After the loop ends the input stream is closed and,
         * if a channel is set, the channel is disconnected and (when retry is on)
         * reconnected after {@code retryTime} milliseconds.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                this.is = TCPTransport.this.socket.getInputStream();
                DataInputStream in = new DataInputStream(this.is);
                while (this.running) {
                    try {
                        TCPTransport.this.transportProcessor.processMessage(new DefaultMessage(in));
                    } catch (EOFException e) {
                        TCPTransport.logger.info("Connection closing EOF");
                        this.running = false;
                        this.failed = true;
                    } catch (SocketException e) {
                        // Must be listed before the generic handler, otherwise it is unreachable.
                        TCPTransport.logger.info("Connection closing");
                        this.running = false;
                        this.failed = true;
                    } catch (ProcessorException e) {
                        // A processing error concerns one message only; keep reading.
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                        TCPTransport.logger.error("Error in Message deserialization. Aborting");
                        this.failed = true;
                        this.running = false;
                    }
                }
                this.is.close();
                Channel owner = TCPTransport.this.channel;
                if (owner != null) {
                    owner.disconnect();
                    if (TCPTransport.this.retry) {
                        Thread.sleep(TCPTransport.this.retryTime);
                        owner.connect(owner.getLinkManager());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Drains the mailbox on its own thread, writing each serialized message to
     * the socket. The thread is started by the constructor.
     */
    public class Writer implements Runnable {
        OutputStream os;
        volatile boolean running = true;
        Thread writerThread = new Thread(this);

        Writer() {
            this.writerThread.start();
            TCPTransport.logger.info("Writer started on :" + TCPTransport.this.socket.getLocalSocketAddress());
        }

        /** Asks the write loop to end and interrupts the thread in case it is waiting. */
        public void stop() {
            this.running = false;
            this.writerThread.interrupt();
        }

        /**
         * Write loop: waits on the transport's monitor while the mailbox is empty,
         * otherwise removes the head message and writes it. When the loop ends
         * normally the output stream is closed and any remaining messages are
         * discarded (with a warning).
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                this.os = TCPTransport.this.socket.getOutputStream();
                while (this.running) {
                    // The emptiness check and the wait share one monitor section so a
                    // notify() sent right after a message is queued cannot be missed.
                    synchronized (TCPTransport.this) {
                        if (TCPTransport.this.mailbox.isEmpty()) {
                            try {
                                TCPTransport.this.wait();
                            } catch (InterruptedException e) {
                                if (!this.running) {
                                    return;
                                }
                            }
                            continue;
                        }
                    }
                    // Blocking I/O happens outside the monitor so producers are never stalled.
                    try {
                        this.os.write(TCPTransport.this.mailbox.remove(0).serialize());
                        this.os.flush();
                    } catch (SocketException e) {
                        // Any socket error makes the stream unusable; stop instead of
                        // feeding the remaining messages into a dead connection.
                        TCPTransport.logger.info("Connection closing");
                        this.running = false;
                    } catch (Exception e) {
                        e.printStackTrace();
                        TCPTransport.logger.error("Error in Message writing. Aborting");
                        this.running = false;
                    }
                }
                this.os.close();
                if (!TCPTransport.this.mailbox.isEmpty()) {
                    TCPTransport.logger.warn("Discarding " + TCPTransport.this.mailbox.size() + " messages");
                }
                TCPTransport.this.mailbox.clear();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates a transport whose host, port and retry settings come from its properties. */
    public TCPTransport() {
    }

    /**
     * Creates a transport that connects to the given host and port.
     *
     * @param str remote host name
     * @param i   remote port
     */
    public TCPTransport(String str, int i) {
        this();
        this.hostName = str;
        this.hostPort = i;
    }

    /**
     * Creates a transport around an existing socket.
     *
     * @param socket              socket to read from and write to
     * @param tCPTransportManager manager notified when the transport is stopped
     */
    TCPTransport(Socket socket, TCPTransportManager tCPTransportManager) {
        this();
        this.socket = socket;
        this.tm = tCPTransportManager;
    }

    /** @return the processor that receives incoming messages */
    public Processor getTransportProcessor() {
        return this.transportProcessor;
    }

    /**
     * Sets the processor that receives every message read from the socket.
     *
     * @param processor next processor in the chain
     */
    @Override // org.coos.messaging.ChannelProcessor
    public void setChainedProcessor(Processor processor) {
        this.transportProcessor = processor;
    }

    /** @return the current reader, or {@code null} if none has been started */
    public Reader getReader() {
        return this.reader;
    }

    /** @return the current writer, or {@code null} if none has been started */
    public Writer getWriter() {
        return this.writer;
    }

    /**
     * Queues a message for sending and wakes the writer thread.
     *
     * <p>A message without a {@code Message.PRIORITY} header is appended to the
     * queue. A message with that header is inserted in front of the first queued
     * message whose priority value is numerically greater; if there is no such
     * message it is appended. Messages are ignored while the transport is not
     * running.
     *
     * @param message message to send
     * @throws ProcessorException declared by the {@link Processor} contract
     * @throws NumberFormatException if a priority header is not a valid integer
     */
    @Override // org.coos.messaging.Processor
    public void processMessage(Message message) throws ProcessorException {
        if (!this.running) {
            return;
        }
        String priorityHeader = message.getHeader(Message.PRIORITY);
        if (priorityHeader == null) {
            this.mailbox.add(message);
        } else {
            int priority = Integer.parseInt(priorityHeader);
            // Iterating a synchronized list requires holding its lock; this also makes
            // "find position, then insert" atomic with respect to the writer thread.
            synchronized (this.mailbox) {
                int position = 0;
                for (Message queued : this.mailbox) {
                    String queuedPriority = queued.getHeader(Message.PRIORITY);
                    if (queuedPriority != null && priority < Integer.parseInt(queuedPriority)) {
                        break;
                    }
                    position++;
                }
                // position == size() when no insertion point was found: append.
                this.mailbox.add(position, message);
            }
        }
        // Notify only after the mailbox lock is released to keep a single lock order.
        synchronized (this) {
            notify();
        }
    }

    /**
     * Starts the transport.
     *
     * <p>If a socket is already present, reader and writer threads are started on
     * it. Otherwise host, port, retry flag and retry time are read from the
     * properties and a connection is opened: in a background thread that keeps
     * retrying when retry is enabled, or synchronously otherwise.
     *
     * @throws Exception the {@link IOException} of a failed synchronous connect
     */
    public void start() throws Exception {
        this.running = true;
        if (this.socket != null) {
            this.reader = new Reader();
            this.writer = new Writer();
            return;
        }
        configureFromProperties();
        logger.info("Establishing transport to " + this.hostName + ":" + this.hostPort);
        if (this.retry) {
            new Thread(new Runnable() { // from class: org.coos.messaging.transport.TCPTransport.1
                @Override // java.lang.Runnable
                public void run() {
                    TCPTransport.this.connectWithRetry();
                }
            }).start();
            return;
        }
        Socket candidate = new Socket();
        this.socket = candidate;
        try {
            candidate.connect(new InetSocketAddress(this.hostName, this.hostPort));
        } catch (IOException e) {
            this.running = false;
            // Do not keep the dead socket: a later start() must connect again.
            this.socket = null;
            closeQuietly(candidate);
            logger.warn("Establishing transport to " + this.hostName + ":" + this.hostPort + " failed.");
            throw e;
        }
        if (this.running) {
            this.reader = new Reader();
            this.writer = new Writer();
        }
    }

    /** Reads host, port, retry flag and retry time from the processor properties. */
    private void configureFromProperties() {
        if (this.hostName == null) {
            this.hostName = (String) this.properties.get(PROPERTY_HOST);
        }
        String portValue = (String) this.properties.get(PROPERTY_PORT);
        if (portValue != null) {
            this.hostPort = Integer.parseInt(portValue);
        }
        this.retry = "true".equals((String) this.properties.get(PROPERTY_RETRY));
        String retryTimeValue = (String) this.properties.get(PROPERTY_RETRY_TIME);
        if (retryTimeValue != null) {
            this.retryTime = Integer.parseInt(retryTimeValue);
        }
    }

    /** Connects repeatedly until it succeeds or the transport is stopped, then starts the workers. */
    private void connectWithRetry() {
        boolean connected = false;
        while (!connected && this.running) {
            Socket candidate = new Socket();
            // Published before connecting so that stop() can abort the attempt by closing it.
            this.socket = candidate;
            try {
                candidate.connect(new InetSocketAddress(this.hostName, this.hostPort));
                connected = true;
            } catch (IOException e) {
                closeQuietly(candidate);
                logger.warn("Establishing transport to " + this.hostName + ":" + this.hostPort + " failed. Retrying in " + this.retryTime + " millisec.");
                try {
                    Thread.sleep(this.retryTime);
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }
            if (connected) {
                if (this.running) {
                    logger.info("Transport from " + candidate.getLocalSocketAddress() + " to " + candidate.getRemoteSocketAddress() + " established.");
                } else {
                    // Stopped while the connect was in flight: do not leave the socket open.
                    closeQuietly(candidate);
                }
            }
        }
        if (this.running) {
            this.reader = new Reader();
            this.writer = new Writer();
        }
    }

    /** Closes a socket that is being abandoned, ignoring any failure to do so. */
    private static void closeQuietly(Socket abandoned) {
        try {
            abandoned.close();
        } catch (IOException ignored) {
            // The socket is being discarded; nothing useful can be done here.
        }
    }

    /**
     * Stops the transport: stops reader and writer, closes the socket and informs
     * the transport manager, if any.
     *
     * @throws Exception if closing the socket fails; the socket reference is
     *                   cleared and the manager is informed even in that case
     */
    public void stop() throws Exception {
        logger.info("Closing transport: " + this.hostName + ":" + this.hostPort);
        this.running = false;
        Reader activeReader = this.reader;
        if (activeReader != null) {
            activeReader.stop();
        }
        Writer activeWriter = this.writer;
        if (activeWriter != null) {
            activeWriter.stop();
        }
        Socket current = this.socket;
        try {
            if (current != null) {
                current.close();
            }
        } finally {
            this.socket = null;
            if (this.tm != null) {
                this.tm.disconnected(this);
            }
        }
    }

    /** @return number of messages currently waiting to be written */
    public int getQueueSize() {
        return this.mailbox.size();
    }

    /**
     * Sets the channel this transport belongs to.
     *
     * @param channel owning channel; the reader disconnects it when the connection ends
     */
    @Override // org.coos.messaging.ChannelProcessor
    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
