/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.dydx.service.v3;

import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.dydx.dto.v3.dydxInitialOrderBookMessage;
import info.bitrich.xchangestream.dydx.dto.v3.dydxUpdateOrderBookMessage;
import info.bitrich.xchangestream.dydx.dydxStreamingService;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.instrument.Instrument;

public class dydxStreamingMarketDataService
implements StreamingMarketDataService {
    private final dydxStreamingService service;
    private final Map<CurrencyPair, SortedMap<BigDecimal, BigDecimal>> bids = new ConcurrentHashMap<CurrencyPair, SortedMap<BigDecimal, BigDecimal>>();
    private final Map<CurrencyPair, SortedMap<BigDecimal, BigDecimal>> asks = new ConcurrentHashMap<CurrencyPair, SortedMap<BigDecimal, BigDecimal>>();

    public dydxStreamingMarketDataService(dydxStreamingService service) {
        this.service = service;
    }

    private boolean containsPair(List<Instrument> pairs, CurrencyPair pair) {
        return pairs.stream().anyMatch(p -> p.equals(pair));
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object ... args) {
        if (!this.containsPair(this.service.getProduct().getOrderBook(), currencyPair)) {
            throw new UnsupportedOperationException(String.format("The currency pair %s is not subscribed for orderbook", currencyPair));
        }
        int maxDepth = args.length > 0 && args[0] instanceof Number ? ((Number)args[0]).intValue() : 100;
        return this.service.getRawWebsocketTransactions(currencyPair, "v3_orderbook").map(message -> {
            this.bids.computeIfAbsent(currencyPair, k -> new TreeMap(Comparator.reverseOrder()));
            this.asks.computeIfAbsent(currencyPair, k -> new TreeMap());
            switch (message.getType()) {
                case "subscribed": {
                    return ((dydxInitialOrderBookMessage)message).toOrderBook(this.bids.get(currencyPair), this.asks.get(currencyPair), maxDepth, currencyPair);
                }
                case "channel_data": {
                    return ((dydxUpdateOrderBookMessage)message).toOrderBook(this.bids.get(currencyPair), this.asks.get(currencyPair), maxDepth, currencyPair);
                }
            }
            throw new UnsupportedOperationException(String.format("Unknown message type detected in OrderBook message: %s,", message.getType()));
        });
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object ... args) {
        throw new NotYetImplementedForExchangeException("Not yet implemented!");
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object ... args) {
        throw new NotYetImplementedForExchangeException("Not yet implemented!");
    }
}

