/*
 * Decompiled with CFR 0.152.
 */
package org.fabric3.binding.zeromq.runtime.message;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.binding.zeromq.runtime.message.SocketHelper;
import org.fabric3.binding.zeromq.runtime.message.Subscriber;
import org.fabric3.spi.container.channel.EventStreamHandler;
import org.fabric3.spi.federation.addressing.AddressListener;
import org.fabric3.spi.federation.addressing.SocketAddress;
import org.zeromq.ZMQ;

@Management
public class NonReliableSubscriber
implements Subscriber,
AddressListener {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private String id;
    private String socketId = this.getClass().getName() + ":" + UUID.randomUUID();
    private ContextManager manager;
    private List<SocketAddress> addresses;
    private EventStreamHandler handler;
    private ZeroMQMetadata metadata;
    private ExecutorService executorService;
    private AtomicInteger connectionCount = new AtomicInteger();
    private SocketReceiver receiver;
    private long timeout;

    public NonReliableSubscriber(String id, ContextManager manager, List<SocketAddress> addresses, EventStreamHandler head, ZeroMQMetadata metadata, ExecutorService executorService) {
        this.id = id;
        this.manager = manager;
        this.addresses = addresses;
        this.handler = head;
        this.metadata = metadata;
        this.executorService = executorService;
        long specifiedTimeout = metadata.getTimeout();
        this.timeout = specifiedTimeout < 0L ? specifiedTimeout : TimeUnit.MILLISECONDS.toMicros(specifiedTimeout);
    }

    @Override
    @ManagementOperation(type=OperationType.POST)
    public void start() {
        if (this.receiver == null) {
            this.receiver = new SocketReceiver();
            this.executorService.submit(this.receiver);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @ManagementOperation(type=OperationType.POST)
    public void stop() {
        try {
            this.receiver.stop();
        }
        finally {
            this.receiver = null;
        }
    }

    @ManagementOperation
    public List<String> getAddresses() {
        ArrayList<String> list = new ArrayList<String>();
        for (SocketAddress address : this.addresses) {
            list.add(address.toString());
        }
        return list;
    }

    @Override
    public void incrementConnectionCount() {
        this.connectionCount.incrementAndGet();
    }

    @Override
    public void decrementConnectionCount() {
        this.connectionCount.decrementAndGet();
    }

    @Override
    public boolean hasConnections() {
        return this.connectionCount.get() > 0;
    }

    public String getId() {
        return this.id;
    }

    public void onUpdate(List<SocketAddress> addresses) {
        this.addresses = addresses;
        if (this.receiver != null) {
            this.receiver.refresh();
        }
    }

    class SocketReceiver
    implements Runnable {
        private ZMQ.Socket socket;
        private ZMQ.Socket controlSocket;
        private ZMQ.Poller poller;
        private AtomicBoolean active = new AtomicBoolean(true);
        private AtomicBoolean doRefresh = new AtomicBoolean(true);

        SocketReceiver() {
        }

        public void refresh() {
            this.doRefresh.set(true);
        }

        public synchronized void stop() {
            this.active.set(false);
        }

        @Override
        public void run() {
            try {
                while (this.active.get()) {
                    this.reconnect();
                    long val = this.poller.poll(NonReliableSubscriber.this.timeout);
                    if (val <= 0L) continue;
                    byte[] controlPayload = this.controlSocket.recv(1);
                    if (controlPayload != null) {
                        this.closeSocket();
                        return;
                    }
                    Object frames = null;
                    byte[] payload = this.socket.recv(0);
                    int index = 1;
                    while (this.socket.hasReceiveMore()) {
                        if (frames == null) {
                            frames = new byte[2][];
                            frames[0] = payload;
                        } else {
                            byte[][] newArray = new byte[((byte[][])frames).length + 1][];
                            System.arraycopy(frames, 0, newArray, 0, ((byte[][])frames).length);
                            frames = newArray;
                        }
                        frames[index] = this.socket.recv(0);
                        ++index;
                    }
                    if (frames == null) {
                        NonReliableSubscriber.this.handler.handle((Object)payload, true);
                        continue;
                    }
                    NonReliableSubscriber.this.handler.handle(frames, true);
                }
                this.closeSocket();
            }
            catch (RuntimeException e) {
                NonReliableSubscriber.this.executorService.submit(this);
                throw e;
            }
        }

        private synchronized void reconnect() {
            if (!this.doRefresh.getAndSet(false)) {
                return;
            }
            this.closeSocket();
            NonReliableSubscriber.this.manager.reserve(NonReliableSubscriber.this.socketId);
            ZMQ.Context context = NonReliableSubscriber.this.manager.getContext();
            this.socket = context.socket(2);
            SocketHelper.configure(this.socket, NonReliableSubscriber.this.metadata);
            this.socket.subscribe(EMPTY_BYTES);
            for (SocketAddress address : NonReliableSubscriber.this.addresses) {
                this.socket.connect(address.toProtocolString());
            }
            this.controlSocket = NonReliableSubscriber.this.manager.createControlSocket();
            this.poller = context.poller();
            this.poller.register(this.controlSocket, 1);
            this.poller.register(this.socket, 1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void closeSocket() {
            if (this.socket != null) {
                try {
                    this.socket.close();
                }
                finally {
                    NonReliableSubscriber.this.manager.release(NonReliableSubscriber.this.socketId);
                }
            }
            if (this.controlSocket != null) {
                this.controlSocket.close();
            }
        }
    }
}

