/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.poloniex2;

import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.poloniex2.PoloniexStreamingService;
import info.bitrich.xchangestream.poloniex2.dto.OrderbookInsertEvent;
import info.bitrich.xchangestream.poloniex2.dto.OrderbookModifiedEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexOrderbook;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketAdapter;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookInsertEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketOrderbookModifiedEvent;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketTickerTransaction;
import info.bitrich.xchangestream.poloniex2.dto.PoloniexWebSocketTradeEvent;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.math.BigDecimal;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.poloniex.PoloniexAdapters;
import org.knowm.xchange.poloniex.dto.marketdata.PoloniexDepth;
import org.knowm.xchange.poloniex.dto.marketdata.PoloniexTicker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PoloniexStreamingMarketDataService
implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(PoloniexStreamingMarketDataService.class);
    private static final String TICKER_CHANNEL_ID = "1002";
    private final PoloniexStreamingService service;
    private final Supplier<Observable<Ticker>> streamingTickers;

    public PoloniexStreamingMarketDataService(PoloniexStreamingService service, Map<Integer, CurrencyPair> currencyIdMap) {
        this.service = service;
        ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
        this.streamingTickers = Suppliers.memoize(() -> service.subscribeChannel(TICKER_CHANNEL_ID, new Object[0]).map(s -> {
            PoloniexWebSocketTickerTransaction ticker = (PoloniexWebSocketTickerTransaction)mapper.treeToValue((TreeNode)s, PoloniexWebSocketTickerTransaction.class);
            CurrencyPair currencyPair = (CurrencyPair)currencyIdMap.get(ticker.getPairId());
            return PoloniexAdapters.adaptPoloniexTicker((PoloniexTicker)ticker.toPoloniexTicker(currencyPair), (CurrencyPair)currencyPair);
        }).share());
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object ... args) {
        Observable subscribedOrderbook = this.service.subscribeCurrencyPairChannel(currencyPair).scan(Optional.empty(), (orderbook, poloniexWebSocketEvents) -> poloniexWebSocketEvents.stream().filter(s -> s instanceof PoloniexWebSocketOrderbookInsertEvent || s instanceof PoloniexWebSocketOrderbookModifiedEvent).reduce(orderbook, (poloniexOrderbook, s) -> this.getPoloniexOrderbook((Optional<PoloniexOrderbook>)orderbook, (PoloniexWebSocketEvent)s), (o1, o2) -> {
            throw new UnsupportedOperationException("No parallel execution");
        })).filter(Optional::isPresent).map(Optional::get);
        return subscribedOrderbook.map(s -> PoloniexAdapters.adaptPoloniexDepth((PoloniexDepth)s.toPoloniexDepth(), (CurrencyPair)currencyPair));
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object ... args) {
        return ((Observable)this.streamingTickers.get()).filter(ticker -> ticker.getCurrencyPair().equals((Object)currencyPair));
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object ... args) {
        Observable subscribedTrades = this.service.subscribeCurrencyPairChannel(currencyPair).flatMapIterable(poloniexWebSocketEvents -> poloniexWebSocketEvents).filter(PoloniexWebSocketTradeEvent.class::isInstance).map(PoloniexWebSocketTradeEvent.class::cast).share();
        return subscribedTrades.map(s -> PoloniexWebSocketAdapter.convertPoloniexWebSocketTradeEventToTrade(s, currencyPair));
    }

    private Optional<PoloniexOrderbook> getPoloniexOrderbook(Optional<PoloniexOrderbook> orderbook, PoloniexWebSocketEvent s) {
        if (s.getEventType().equals("i")) {
            OrderbookInsertEvent insertEvent = ((PoloniexWebSocketOrderbookInsertEvent)s).getInsert();
            SortedMap<BigDecimal, BigDecimal> asks = insertEvent.toDepthLevels(0);
            SortedMap<BigDecimal, BigDecimal> bids = insertEvent.toDepthLevels(1);
            return Optional.of(new PoloniexOrderbook(asks, bids));
        }
        OrderbookModifiedEvent modifiedEvent = ((PoloniexWebSocketOrderbookModifiedEvent)s).getModifiedEvent();
        orderbook.orElseThrow(() -> new IllegalStateException("Orderbook update received before initial snapshot")).modify(modifiedEvent);
        return orderbook;
    }
}

