/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.coinbasepro;

import info.bitrich.xchangestream.coinbasepro.CoinbaseProStreamingService;
import info.bitrich.xchangestream.coinbasepro.dto.CoinbaseProWebSocketTransaction;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import org.knowm.xchange.coinbasepro.CoinbaseProAdapters;
import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProProductStats;
import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProProductTicker;
import org.knowm.xchange.coinbasepro.dto.marketdata.CoinbaseProTrade;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.knowm.xchange.instrument.Instrument;

/**
 * Streaming market-data facade over a {@link CoinbaseProStreamingService}.
 *
 * <p>Each public accessor first verifies that the requested currency pair is present in the
 * matching subscription list exposed by {@code service.getProduct()}, then filters the raw
 * web-socket transaction stream by message type and converts each message.
 *
 * <p>The two sides of the order book are kept per currency pair in {@link #bids} and
 * {@link #asks}; they are reset whenever a {@code "snapshot"} message arrives and are handed to
 * {@code CoinbaseProWebSocketTransaction.toOrderBook} for every snapshot / l2update message.
 */
public class CoinbaseProStreamingMarketDataService
implements StreamingMarketDataService {
    private static final String SNAPSHOT = "snapshot";
    private static final String L2UPDATE = "l2update";
    private static final String TICKER = "ticker";
    private static final String MATCH = "match";
    /** Value passed as {@code maxDepth} when the caller does not supply a numeric first argument. */
    private static final int DEFAULT_MAX_DEPTH = 100;
    private final CoinbaseProStreamingService service;
    /** Bid side per pair; each map is ordered by descending key. */
    private final Map<CurrencyPair, SortedMap<BigDecimal, LimitOrder>> bids = new ConcurrentHashMap<>();
    /** Ask side per pair; each map is ordered by ascending (natural) key. */
    private final Map<CurrencyPair, SortedMap<BigDecimal, LimitOrder>> asks = new ConcurrentHashMap<>();

    CoinbaseProStreamingMarketDataService(CoinbaseProStreamingService service) {
        this.service = service;
    }

    /**
     * Tells whether {@code pair} occurs in {@code pairs}.
     *
     * <p>Entries that are not {@link CurrencyPair} instances (including {@code null}) are skipped
     * rather than cast blindly, so a mixed instrument list can no longer trigger a
     * {@link ClassCastException}.
     *
     * @param pairs subscribed instruments to search
     * @param pair the currency pair to look for
     * @return {@code true} if some entry is a {@code CurrencyPair} that compares equal to
     *     {@code pair}
     */
    private boolean containsPair(List<Instrument> pairs, CurrencyPair pair) {
        for (Instrument item : pairs) {
            if (item instanceof CurrencyPair && pair.compareTo((CurrencyPair) item) == 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Guards an accessor: fails unless {@code currencyPair} is in the given subscription list.
     *
     * @param subscribed instruments subscribed for the channel in question
     * @param currencyPair the pair requested by the caller
     * @param channel channel label used verbatim at the end of the error message
     * @throws UnsupportedOperationException if the pair is not in {@code subscribed}
     */
    private void requireSubscribed(List<Instrument> subscribed, CurrencyPair currencyPair, String channel) {
        if (!this.containsPair(subscribed, currencyPair)) {
            throw new UnsupportedOperationException(String.format("The currency pair %s is not subscribed for %s", currencyPair, channel));
        }
    }

    /** Creates an empty bid-side map, ordered by descending key. */
    private static SortedMap<BigDecimal, LimitOrder> newBidSide() {
        return new TreeMap<>(Collections.reverseOrder());
    }

    /** Creates an empty ask-side map, ordered by ascending (natural) key. */
    private static SortedMap<BigDecimal, LimitOrder> newAskSide() {
        return new TreeMap<>();
    }

    /**
     * Streams order books for {@code currencyPair}, built from snapshot and l2update messages.
     *
     * @param currencyPair the pair to stream; must be subscribed for the order-book channel
     * @param args optional; if the first element is a {@link Number} its {@code intValue()} is
     *     used as the max depth, otherwise (or when {@code args} is {@code null} or empty)
     *     {@value #DEFAULT_MAX_DEPTH} is used
     * @return an observable emitting one {@link OrderBook} per snapshot / l2update message
     * @throws UnsupportedOperationException if the pair is not subscribed for the order book
     */
    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object ... args) {
        this.requireSubscribed(this.service.getProduct().getOrderBook(), currencyPair, "orderbook");
        // A caller may pass an explicit null array through the varargs parameter.
        boolean hasDepthArg = args != null && args.length > 0 && args[0] instanceof Number;
        int maxDepth = hasDepthArg ? ((Number) args[0]).intValue() : DEFAULT_MAX_DEPTH;
        return this.getRawWebSocketTransactions(currencyPair, false)
                .filter(message -> SNAPSHOT.equals(message.getType()) || L2UPDATE.equals(message.getType()))
                .map(s -> {
                    if (SNAPSHOT.equals(s.getType())) {
                        // A snapshot replaces whatever state was accumulated so far.
                        this.bids.put(currencyPair, newBidSide());
                        this.asks.put(currencyPair, newAskSide());
                    } else {
                        // An update seen before any snapshot still needs maps to work on.
                        this.bids.computeIfAbsent(currencyPair, k -> newBidSide());
                        this.asks.computeIfAbsent(currencyPair, k -> newAskSide());
                    }
                    return s.toOrderBook(this.bids.get(currencyPair), this.asks.get(currencyPair), maxDepth, currencyPair);
                });
    }

    /**
     * Streams exchange-native tickers for {@code currencyPair}.
     *
     * @param currencyPair the pair to stream; must be subscribed for the ticker channel
     * @param args unused
     * @return an observable emitting one raw ticker per {@code "ticker"} message
     * @throws UnsupportedOperationException if the pair is not subscribed for ticker
     */
    public Observable<CoinbaseProProductTicker> getRawTicker(CurrencyPair currencyPair, Object ... args) {
        this.requireSubscribed(this.service.getProduct().getTicker(), currencyPair, "ticker");
        return this.getRawWebSocketTransactions(currencyPair, true)
                .filter(message -> TICKER.equals(message.getType()))
                .map(CoinbaseProWebSocketTransaction::toCoinbaseProProductTicker);
    }

    /**
     * Streams adapted {@link Ticker}s for {@code currencyPair}.
     *
     * @param currencyPair the pair to stream; must be subscribed for the ticker channel
     * @param args unused
     * @return an observable emitting one ticker per {@code "ticker"} message
     * @throws UnsupportedOperationException if the pair is not subscribed for ticker
     */
    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object ... args) {
        this.requireSubscribed(this.service.getProduct().getTicker(), currencyPair, "ticker");
        return this.getRawWebSocketTransactions(currencyPair, true)
                .filter(message -> TICKER.equals(message.getType()))
                .map(s -> CoinbaseProAdapters.adaptTicker(s.toCoinbaseProProductTicker(), s.toCoinbaseProProductStats(), currencyPair));
    }

    /**
     * Streams trades for {@code currencyPair}, one per {@code "match"} message.
     *
     * <p>Messages whose {@code getUserId()} is non-null are dropped before conversion.
     *
     * @param currencyPair the pair to stream; must be subscribed for the trades channel
     * @param args unused
     * @return an observable emitting one {@link Trade} per retained match message
     * @throws UnsupportedOperationException if the pair is not subscribed for trades
     */
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object ... args) {
        this.requireSubscribed(this.service.getProduct().getTrades(), currencyPair, "trades");
        return this.getRawWebSocketTransactions(currencyPair, true)
                .filter(message -> MATCH.equals(message.getType()))
                .filter(s -> s.getUserId() == null)
                .map(CoinbaseProWebSocketTransaction::toCoinbaseProTrade)
                // The adapter works on arrays; wrap the single trade and unwrap the single result.
                .map(t -> CoinbaseProAdapters.adaptTrades(new CoinbaseProTrade[] {t}, currencyPair))
                .map(h -> h.getTrades().get(0));
    }

    /**
     * Exposes the raw web-socket transaction stream of the underlying service.
     *
     * @param currencyPair the pair whose messages are requested
     * @param filterChannelName forwarded unchanged to the underlying service
     * @return the observable returned by {@code service.getRawWebSocketTransactions}
     */
    public Observable<CoinbaseProWebSocketTransaction> getRawWebSocketTransactions(CurrencyPair currencyPair, boolean filterChannelName) {
        return this.service.getRawWebSocketTransactions(currencyPair, filterChannelName);
    }
}

