/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.poloniex2;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEventsTransaction;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookModifiedEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.reactivex.Observable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PoloniexStreamingService
extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingService.class);
    private static final String HEARTBEAT = "1010";
    private final Map<String, String> subscribedChannels = new ConcurrentHashMap<String, String>();
    private final Map<String, Observable<JsonNode>> subscriptions = new ConcurrentHashMap<String, Observable<JsonNode>>();

    public PoloniexStreamingService(String apiUrl) {
        super(apiUrl, Integer.MAX_VALUE, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_RETRY_DURATION, 2);
    }

    protected void handleMessage(JsonNode message) {
        if (message.isArray()) {
            JsonNode event;
            JsonNode events;
            int channelId;
            if (message.size() < 3) {
                if (message.get(0).asText().equals(HEARTBEAT)) {
                    return;
                }
                if ("1002".equals(message.get(0).asText())) {
                    return;
                }
            }
            if ((channelId = Integer.parseInt(message.get(0).toString())) > 0 && channelId < 1000 && (events = message.get(2)) != null && events.isArray() && (event = events.get(0)).get(0).toString().equals("\"i\"") && event.get(1).has("orderBook")) {
                this.subscribedChannels.compute(String.valueOf(channelId), (key, oldValue) -> {
                    String currencyPair = event.get(1).get("currencyPair").asText();
                    if (oldValue != null && !oldValue.equals(currencyPair)) {
                        throw new RuntimeException("Attempted currency pair channel id reassignment");
                    }
                    if (oldValue == null) {
                        LOG.info("Register {} as {}", (Object)channelId, (Object)currencyPair);
                    } else {
                        LOG.debug("Order book reinitialization {} {}", (Object)channelId, (Object)currencyPair);
                    }
                    return currencyPair;
                });
            }
        }
        if (message.has("error")) {
            LOG.error("Error with message: " + message.get("error").asText());
            return;
        }
        super.handleMessage((Object)message);
    }

    public boolean processArrayMessageSeparately() {
        return false;
    }

    public synchronized Observable<JsonNode> subscribeChannel(String channelName, Object ... args) {
        if (!this.channels.containsKey(channelName)) {
            this.subscriptions.put(channelName, (Observable<JsonNode>)super.subscribeChannel(channelName, args));
        }
        return this.subscriptions.get(channelName);
    }

    public Observable<List<PoloniexWebSocketEvent>> subscribeCurrencyPairChannel(CurrencyPair currencyPair) {
        String channelName = currencyPair.counter.toString() + "_" + currencyPair.base.toString();
        return this.subscribeChannel(channelName, new Object[0]).map(jsonNode -> (PoloniexWebSocketEventsTransaction)this.objectMapper.treeToValue((TreeNode)jsonNode, PoloniexWebSocketEventsTransaction.class)).scan((poloniexWebSocketEventsTransactionOld, poloniexWebSocketEventsTransactionNew) -> {
            boolean sequenceContinuous;
            boolean initialSnapshot = poloniexWebSocketEventsTransactionNew.getEvents().stream().anyMatch(PoloniexWebSocketOrderbookModifiedEvent.class::isInstance);
            boolean bl = sequenceContinuous = poloniexWebSocketEventsTransactionOld.getSeqId() + 1L == poloniexWebSocketEventsTransactionNew.getSeqId();
            if (!initialSnapshot || sequenceContinuous) {
                return poloniexWebSocketEventsTransactionNew;
            }
            throw new RuntimeException(String.format("Invalid sequencing, old: %s new: %s", this.objectMapper.writeValueAsString(poloniexWebSocketEventsTransactionOld), this.objectMapper.writeValueAsString(poloniexWebSocketEventsTransactionNew)));
        }).map(PoloniexWebSocketEventsTransaction::getEvents).share();
    }

    protected String getChannelNameFromMessage(JsonNode message) {
        String strChannelId = message.get(0).asText();
        int channelId = Integer.parseInt(strChannelId);
        if (channelId >= 1000) {
            return strChannelId;
        }
        return this.subscribedChannels.get(message.get(0).asText());
    }

    public String getSubscribeMessage(String channelName, Object ... args) throws IOException {
        PoloniexWebSocketSubscriptionMessage subscribeMessage = new PoloniexWebSocketSubscriptionMessage("subscribe", channelName);
        return this.objectMapper.writeValueAsString((Object)subscribeMessage);
    }

    public String getUnsubscribeMessage(String channelName, Object ... args) throws IOException {
        PoloniexWebSocketSubscriptionMessage subscribeMessage = new PoloniexWebSocketSubscriptionMessage("unsubscribe", channelName);
        return this.objectMapper.writeValueAsString((Object)subscribeMessage);
    }
}

