/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.coinbasepro;

import info.bitrich.xchangestream.coinbasepro.CoinbaseProStreamingAdapters;
import info.bitrich.xchangestream.coinbasepro.CoinbaseProStreamingService;
import info.bitrich.xchangestream.coinbasepro.dto.CoinbaseProWebSocketTransaction;
import info.bitrich.xchangestream.core.StreamingTradeService;
import io.reactivex.Observable;
import java.util.Collections;
import java.util.List;
import org.knowm.xchange.coinbasepro.CoinbaseProAdapters;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.UserTrade;
import org.knowm.xchange.exceptions.ExchangeSecurityException;
import org.knowm.xchange.instrument.Instrument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoinbaseProStreamingTradeService
implements StreamingTradeService {
    private static final Logger LOG = LoggerFactory.getLogger(CoinbaseProStreamingTradeService.class);
    private static final String MATCH = "match";
    private final CoinbaseProStreamingService service;
    private boolean orderChangesWarningLogged;

    CoinbaseProStreamingTradeService(CoinbaseProStreamingService service) {
        this.service = service;
    }

    private boolean containsPair(List<Instrument> pairs, CurrencyPair pair) {
        for (Instrument item : pairs) {
            if (pair.compareTo((CurrencyPair)item) != 0) continue;
            return true;
        }
        return false;
    }

    public Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object ... args) {
        if (!this.containsPair(this.service.getProduct().getUserTrades(), currencyPair)) {
            throw new UnsupportedOperationException(String.format("The currency pair %s is not subscribed for user trades", currencyPair));
        }
        if (!this.service.isAuthenticated()) {
            throw new ExchangeSecurityException("Not authenticated");
        }
        return this.service.getRawWebSocketTransactions(currencyPair, true).filter(message -> message.getType().equals(MATCH)).filter(s -> s.getUserId() != null).map(s -> s.toCoinbaseProFill()).map(f -> CoinbaseProAdapters.adaptTradeHistory(Collections.singletonList(f))).map(h -> (UserTrade)h.getUserTrades().get(0));
    }

    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object ... args) {
        if (!this.containsPair(this.service.getProduct().getOrders(), currencyPair)) {
            throw new UnsupportedOperationException(String.format("The currency pair %s is not subscribed for orders", currencyPair));
        }
        if (!this.service.isAuthenticated()) {
            throw new ExchangeSecurityException("Not authenticated");
        }
        if (!this.orderChangesWarningLogged) {
            LOG.warn("The order change stream is not yet fully implemented for Coinbase Pro. Orders are not fully populated, containing only the values changed since the last update. Other values will be null.");
            this.orderChangesWarningLogged = true;
        }
        return this.service.getRawWebSocketTransactions(currencyPair, true).filter(s -> s.getUserId() != null).map(CoinbaseProStreamingAdapters::adaptOrder);
    }

    public Observable<CoinbaseProWebSocketTransaction> getRawWebSocketTransactions(CurrencyPair currencyPair, boolean filterChannelName) {
        return this.service.getRawWebSocketTransactions(currencyPair, filterChannelName);
    }
}

