package com.barchart.feed.ddf.datalink.provider;

import com.barchart.feed.client.api.FeedStateListener;
import com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase;
import com.barchart.feed.ddf.datalink.api.DDF_MessageListener;
import com.barchart.feed.ddf.datalink.api.DummyFuture;
import com.barchart.feed.ddf.datalink.api.EventPolicy;
import com.barchart.feed.ddf.datalink.api.FailedFuture;
import com.barchart.feed.ddf.datalink.api.Subscription;
import com.barchart.feed.ddf.datalink.enums.DDF_FeedEvent;
import com.barchart.feed.ddf.instrument.enums.DDF_InstrumentField;
import com.barchart.feed.ddf.message.api.DDF_BaseMessage;
import com.barchart.feed.ddf.message.api.DDF_MarketBase;
import com.barchart.feed.ddf.message.enums.DDF_MessageType;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/barchart/feed/ddf/datalink/provider/HistoricReplayListenerClientDDF.class */
/**
 * Feed client that binds a connectionless (datagram) Netty channel on a local
 * port and hands decoded {@link DDF_BaseMessage}s to a bound
 * {@link DDF_MessageListener}.
 *
 * <p>Messages received on the channel are placed on an in-memory queue and
 * delivered by a separate task submitted to the supplied {@link Executor}.
 * Only market messages whose real-time symbol has a registered
 * {@link Subscription} are delivered; everything else taken from the queue is
 * dropped by {@link #filter(DDF_BaseMessage)}.
 *
 * <p>Subscriptions are tracked locally only; no request is sent anywhere when
 * subscribing or unsubscribing.
 */
public class HistoricReplayListenerClientDDF extends SimpleChannelHandler implements DDF_FeedClientBase {

    private static final Logger log;

    /** Fixed size handed to the datagram receive-buffer predictor. */
    private static final int RECEIVE_BUFFER_SIZE = 2048;

    private final ConnectionlessBootstrap boot;

    /** Channel returned by the last bind; {@code null} until started and after shutdown. */
    private volatile DatagramChannel channel;

    private final Executor runner;

    /** Local port the datagram channel is bound to. */
    private final int socketAddress;

    private volatile DDF_MessageListener msgListener = null;

    /**
     * Active subscriptions keyed by instrument. Declared with the concrete
     * concurrent type so that the atomic {@code putIfAbsent} is available.
     */
    private final ConcurrentHashMap<String, Subscription> subscriptions =
            new ConcurrentHashMap<String, Subscription>();

    private final BlockingQueue<DDF_BaseMessage> messageQueue =
            new LinkedBlockingQueue<DDF_BaseMessage>();

    /** Delivery loop: takes queued messages and forwards the accepted ones to the listener. */
    private final RunnerDDF messageTask = new RunnerDDF() {
        @Override
        protected void runCore() {
            while (true) {
                try {
                    final DDF_BaseMessage message = HistoricReplayListenerClientDDF.this.messageQueue.take();
                    // Read the volatile field once so the listener cannot
                    // become null between the check and the call.
                    final DDF_MessageListener listener = HistoricReplayListenerClientDDF.this.msgListener;
                    if (listener != null && HistoricReplayListenerClientDDF.this.filter(message)) {
                        listener.handleMessage(message);
                    }
                } catch (InterruptedException e) {
                    HistoricReplayListenerClientDDF.log.trace("terminated");
                    return;
                } catch (Throwable th) {
                    // A failing message must not stop delivery of later ones.
                    HistoricReplayListenerClientDDF.log.error("message delivery failed", th);
                }
            }
        }
    };

    /**
     * Creates the client and configures, but does not bind, the datagram bootstrap.
     *
     * @param i        local port to bind on {@link #startup()}
     * @param executor executor used for the Netty channel factory and for the
     *                 message delivery task
     */
    HistoricReplayListenerClientDDF(int i, Executor executor) {
        this.socketAddress = i;
        this.runner = executor;
        this.boot = new ConnectionlessBootstrap(new NioDatagramChannelFactory(this.runner));
        this.boot.setPipelineFactory(new PipelineFactoryDDF(this));
        this.boot.setOption("broadcast", "false");
        this.boot.setOption("receiveBufferSizePredictorFactory",
                new FixedReceiveBufferSizePredictorFactory(RECEIVE_BUFFER_SIZE));
    }

    /**
     * Decides whether a queued message should be delivered to the listener.
     *
     * @param dDF_BaseMessage message taken from the queue
     * @return {@code true} only for market messages whose instrument's
     *         real-time symbol has a registered subscription
     */
    public boolean filter(DDF_BaseMessage dDF_BaseMessage) {
        if (!dDF_BaseMessage.getMessageType().isMarketMessage) {
            return false;
        }
        final String symbol = ((DDF_MarketBase) dDF_BaseMessage).getInstrument()
                .get(DDF_InstrumentField.DDF_SYMBOL_REALTIME).toString();
        return this.subscriptions.containsKey(symbol);
    }

    /** Submits the delivery task to the executor and binds the datagram channel to the configured port. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void startup() {
        this.runner.execute(this.messageTask);
        // Keep the bound channel so shutdown() can close it; discarding the
        // result would leave the socket open for the life of the process.
        this.channel = (DatagramChannel) this.boot.bind(new InetSocketAddress(this.socketAddress));
    }

    /** Interrupts the delivery task, discards queued messages and closes the bound channel, if any. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void shutdown() {
        this.messageTask.interrupt();
        this.messageQueue.clear();
        final DatagramChannel bound = this.channel;
        if (bound != null) {
            this.channel = null;
            bound.close();
        }
    }

    /** Logs an exception raised in the channel pipeline, including its cause. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        log.warn("SimpleChannelHandler caught exception", exceptionEvent.getCause());
    }

    /** Enqueues a message for the delivery task; the message is dropped if the calling thread is interrupted. */
    private void postMessage(DDF_BaseMessage dDF_BaseMessage) {
        try {
            this.messageQueue.put(dDF_BaseMessage);
        } catch (InterruptedException e) {
            log.trace("terminated");
        }
    }

    /** Handles a market message by queueing it for delivery. */
    private void doMarket(DDF_BaseMessage dDF_BaseMessage) {
        postMessage(dDF_BaseMessage);
    }

    /** Handles a control-timestamp message by queueing it for delivery. */
    private void doTimestamp(DDF_BaseMessage dDF_BaseMessage) {
        postMessage(dDF_BaseMessage);
    }

    /**
     * Queues market and control-timestamp messages for delivery. Payloads that
     * are not {@link DDF_BaseMessage}s are forwarded upstream unchanged; other
     * DDF message types are only logged.
     */
    @Override
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        final Object payload = messageEvent.getMessage();
        if (!(payload instanceof DDF_BaseMessage)) {
            channelHandlerContext.sendUpstream(messageEvent);
            return;
        }
        final DDF_BaseMessage message = (DDF_BaseMessage) payload;
        final DDF_MessageType messageType = message.getMessageType();
        if (messageType.isMarketMessage) {
            doMarket(message);
        } else if (messageType.isControlTimestamp) {
            doTimestamp(message);
        } else {
            log.debug("unknown message : {}", message);
        }
    }

    /** Sets (or, with {@code null}, clears) the listener that receives accepted messages. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void bindMessageListener(DDF_MessageListener dDF_MessageListener) {
        this.msgListener = dDF_MessageListener;
    }

    /** No-op in this client. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void bindStateListener(FeedStateListener feedStateListener) {
    }

    /** No-op in this client. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void startUpProxy() {
    }

    /**
     * Registers a subscription, or merges its interests into the subscription
     * already registered for the same instrument.
     *
     * <p>Uses a single atomic {@code putIfAbsent} so a concurrent unsubscribe
     * cannot remove the entry between a lookup and its use.
     */
    private void register(Subscription subscription) {
        final Subscription existing =
                this.subscriptions.putIfAbsent(subscription.getInstrument(), subscription);
        if (existing != null) {
            existing.addInterests(subscription.getInterests());
        }
    }

    /**
     * Registers every non-null subscription in the set.
     *
     * @param set subscriptions to register; {@code null} elements are skipped
     * @return a {@link FailedFuture} when {@code set} is {@code null},
     *         otherwise a {@link DummyFuture}
     */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public Future<Boolean> subscribe(Set<Subscription> set) {
        if (set == null) {
            log.error("Null subscribes request recieved");
            return new FailedFuture();
        }
        for (Subscription subscription : set) {
            if (subscription != null) {
                register(subscription);
            }
        }
        return new DummyFuture();
    }

    /**
     * Registers a single subscription.
     *
     * @param subscription subscription to register
     * @return a {@link FailedFuture} when {@code subscription} is
     *         {@code null}, otherwise a {@link DummyFuture}
     */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public Future<Boolean> subscribe(Subscription subscription) {
        if (subscription == null) {
            log.error("Null subscribe request recieved");
            return new FailedFuture();
        }
        register(subscription);
        return new DummyFuture();
    }

    /**
     * Removes the registered subscription for the instrument of every non-null element.
     *
     * @param set subscriptions to remove; {@code null} elements are skipped
     * @return a {@link FailedFuture} when {@code set} is {@code null},
     *         otherwise a {@link DummyFuture}
     */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public Future<Boolean> unsubscribe(Set<Subscription> set) {
        if (set == null) {
            log.error("Null subscribes request recieved");
            return new FailedFuture();
        }
        for (Subscription subscription : set) {
            if (subscription != null) {
                this.subscriptions.remove(subscription.getInstrument());
            }
        }
        return new DummyFuture();
    }

    /**
     * Removes the registered subscription for the given subscription's instrument.
     *
     * @param subscription subscription identifying the instrument to remove
     * @return a {@link FailedFuture} when {@code subscription} is
     *         {@code null}, otherwise a {@link DummyFuture}
     */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public Future<Boolean> unsubscribe(Subscription subscription) {
        if (subscription == null) {
            log.error("Null subscribe request recieved");
            return new FailedFuture();
        }
        this.subscriptions.remove(subscription.getInstrument());
        return new DummyFuture();
    }

    /** No-op in this client. */
    @Override // com.barchart.feed.ddf.datalink.api.DDF_FeedClientBase
    public void setPolicy(DDF_FeedEvent dDF_FeedEvent, EventPolicy eventPolicy) {
    }

    static {
        // Route Netty's internal logging through SLF4J before creating the class logger.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        log = LoggerFactory.getLogger(HistoricReplayListenerClientDDF.class);
    }
}
