/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.okex;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.okex.OkexStreamingService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.FundingRate;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.instrument.Instrument;
import org.knowm.xchange.okex.OkexAdapters;
import org.knowm.xchange.okex.dto.marketdata.OkexFundingRate;
import org.knowm.xchange.okex.dto.marketdata.OkexOrderbook;
import org.knowm.xchange.okex.dto.marketdata.OkexPublicOrder;
import org.knowm.xchange.okex.dto.marketdata.OkexTicker;
import org.knowm.xchange.okex.dto.marketdata.OkexTrade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming market-data service backed by an {@link OkexStreamingService}.
 *
 * <p>Each public method subscribes to a channel whose unique id is a channel prefix
 * ({@code "tickers"}, {@code "trades"}, {@code "funding-rate"}, {@code "books"}) concatenated
 * with the instrument id produced by {@link OkexAdapters#adaptInstrument}, deserializes the
 * {@code "data"} node of every received message with Jackson and adapts the result to the
 * corresponding XChange DTO.
 */
public class OkexStreamingMarketDataService
implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingMarketDataService.class);

    private final OkexStreamingService service;
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    /** Subjects handed out by {@link #getOrderBookUpdates(Instrument)}, keyed by instrument. */
    private final Map<Instrument, PublishSubject<List<OrderBookUpdate>>> orderBookUpdatesSubscriptions;
    /** Last known order book per instrument id; replaced on snapshots, updated on deltas. */
    private final Map<String, OrderBook> orderBookMap = new HashMap<>();

    /**
     * @param service streaming service used to subscribe to the exchange channels
     */
    public OkexStreamingMarketDataService(OkexStreamingService service) {
        this.service = service;
        this.orderBookUpdatesSubscriptions = new ConcurrentHashMap<>();
    }

    /**
     * Subscribes to the {@code "tickers"} channel of the given instrument.
     *
     * @param instrument instrument to subscribe to
     * @param args unused
     * @return an observable emitting one {@link Ticker} per entry of each message's
     *     {@code "data"} array; messages without a {@code "data"} field are skipped
     */
    public Observable<Ticker> getTicker(Instrument instrument, Object ... args) {
        String channelUniqueId = "tickers" + OkexAdapters.adaptInstrument(instrument);
        return this.service.subscribeChannel(channelUniqueId)
            .filter(message -> message.has("data"))
            .flatMap(jsonNode -> {
                List<OkexTicker> okexTickers =
                    this.mapper.treeToValue(jsonNode.get("data"), listTypeOf(OkexTicker.class));
                return Observable.fromIterable(okexTickers).map(OkexAdapters::adaptTicker);
            });
    }

    /**
     * Subscribes to the {@code "trades"} channel of the given instrument.
     *
     * @param instrument instrument to subscribe to
     * @param args unused
     * @return an observable emitting every {@link Trade} adapted from each message's
     *     {@code "data"} array; messages without a {@code "data"} field are skipped
     */
    public Observable<Trade> getTrades(Instrument instrument, Object ... args) {
        String channelUniqueId = "trades" + OkexAdapters.adaptInstrument(instrument);
        return this.service.subscribeChannel(channelUniqueId)
            .filter(message -> message.has("data"))
            .flatMap(jsonNode -> {
                List<OkexTrade> okexTrades =
                    this.mapper.treeToValue(jsonNode.get("data"), listTypeOf(OkexTrade.class));
                return Observable.fromIterable(OkexAdapters.adaptTrades(okexTrades, instrument).getTrades());
            });
    }

    /**
     * Subscribes to the {@code "funding-rate"} channel of the given instrument.
     *
     * @param instrument instrument to subscribe to
     * @param args unused
     * @return an observable emitting one {@link FundingRate} per message, adapted from the whole
     *     {@code "data"} array; messages without a {@code "data"} field are skipped
     */
    public Observable<FundingRate> getFundingRate(Instrument instrument, Object ... args) {
        String channelUniqueId = "funding-rate" + OkexAdapters.adaptInstrument(instrument);
        return this.service.subscribeChannel(channelUniqueId)
            .filter(message -> message.has("data"))
            .map(jsonNode -> {
                List<OkexFundingRate> okexFundingRates =
                    this.mapper.treeToValue(jsonNode.get("data"), listTypeOf(OkexFundingRate.class));
                return OkexAdapters.adaptFundingRate(okexFundingRates);
            });
    }

    /**
     * Subscribes to the order book of the given instrument and keeps a local copy of it.
     *
     * <p>Only messages carrying an {@code "action"} field are processed. A {@code "snapshot"}
     * replaces the cached book; an {@code "update"} applies the message's asks and bids to the
     * cached book and, if {@link #getOrderBookUpdates(Instrument)} has been called for this
     * instrument, also publishes the raw deltas there. An update received before any snapshot,
     * or an unknown action, is logged and produces no emission.
     *
     * @param instrument instrument to subscribe to
     * @param args optional; {@code args[0].toString()} is taken as the channel name and defaults
     *     to {@code "books"}. When it equals {@code "books5"} every message is handled as a
     *     snapshot
     * @return an observable emitting the cached {@link OrderBook} after each snapshot or update
     */
    public Observable<OrderBook> getOrderBook(Instrument instrument, Object ... args) {
        String instId = OkexAdapters.adaptInstrument(instrument);
        String channelName = args.length >= 1 ? args[0].toString() : "books";
        // NOTE(review): the subscription id always uses the "books" prefix; channelName only
        // influences how messages are interpreted below. Confirm this is intended.
        String channelUniqueId = "books" + instId;
        return this.service.subscribeChannel(channelUniqueId)
            .filter(message -> message.has("action"))
            .flatMap(jsonNode -> {
                String action = channelName.equals("books5") ? "snapshot" : jsonNode.get("action").asText();
                if ("snapshot".equalsIgnoreCase(action)) {
                    List<OkexOrderbook> okexOrderbooks =
                        this.mapper.treeToValue(jsonNode.get("data"), listTypeOf(OkexOrderbook.class));
                    OrderBook snapshot = OkexAdapters.adaptOrderBook(okexOrderbooks, instrument);
                    this.orderBookMap.put(instId, snapshot);
                    return Observable.just(snapshot);
                }
                if ("update".equalsIgnoreCase(action)) {
                    OrderBook orderBook = this.orderBookMap.get(instId);
                    if (orderBook == null) {
                        // A delta cannot be applied without a previously received snapshot.
                        LOG.error("Failed to get orderBook, instId={}.", instId);
                        return Observable.<OrderBook>empty();
                    }
                    List<OkexPublicOrder> asks =
                        this.mapper.treeToValue(jsonNode.get("data").get(0).get("asks"), listTypeOf(OkexPublicOrder.class));
                    asks.forEach(ask ->
                        orderBook.update(OkexAdapters.adaptLimitOrder(ask, instrument, Order.OrderType.ASK)));
                    List<OkexPublicOrder> bids =
                        this.mapper.treeToValue(jsonNode.get("data").get(0).get("bids"), listTypeOf(OkexPublicOrder.class));
                    bids.forEach(bid ->
                        orderBook.update(OkexAdapters.adaptLimitOrder(bid, instrument, Order.OrderType.BID)));

                    // Look the subject up once so the null check and the publish use the same reference.
                    PublishSubject<List<OrderBookUpdate>> updatesSubject =
                        this.orderBookUpdatesSubscriptions.get(instrument);
                    if (updatesSubject != null) {
                        Date timestamp =
                            new Timestamp(Long.parseLong(jsonNode.get("data").get(0).get("ts").asText()));
                        updatesSubject.onNext(toOrderBookUpdates(instrument, asks, bids, timestamp));
                    }
                    return Observable.just(orderBook);
                }
                LOG.error("Unexpected books action={}, message={}", action, jsonNode);
                return Observable.<OrderBook>empty();
            });
    }

    /**
     * Returns the stream of raw order book deltas for the given instrument, creating it on first
     * use. Deltas are only published while a {@link #getOrderBook(Instrument, Object...)}
     * subscription for the same instrument is processing {@code "update"} messages.
     *
     * @param instrument instrument whose deltas are requested
     * @return the subject registered for {@code instrument}
     */
    public Observable<List<OrderBookUpdate>> getOrderBookUpdates(Instrument instrument) {
        return this.orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create());
    }

    /**
     * Converts the asks and bids of one update message into {@link OrderBookUpdate}s, asks first.
     * Each order's volume is passed both as the update volume and as the total volume.
     */
    private List<OrderBookUpdate> toOrderBookUpdates(Instrument instrument, List<OkexPublicOrder> asks, List<OkexPublicOrder> bids, Date date) {
        List<OrderBookUpdate> orderBookUpdates = new ArrayList<>(asks.size() + bids.size());
        for (OkexPublicOrder ask : asks) {
            orderBookUpdates.add(new OrderBookUpdate(Order.OrderType.ASK, ask.getVolume(), instrument, ask.getPrice(), date, ask.getVolume()));
        }
        for (OkexPublicOrder bid : bids) {
            orderBookUpdates.add(new OrderBookUpdate(Order.OrderType.BID, bid.getVolume(), instrument, bid.getPrice(), date, bid.getVolume()));
        }
        return orderBookUpdates;
    }

    /** Builds the Jackson type describing a {@code List} of {@code elementType}. */
    private JavaType listTypeOf(Class<?> elementType) {
        return this.mapper.getTypeFactory().constructCollectionType(List.class, elementType);
    }
}

