/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.okex;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import info.bitrich.xchangestream.okex.dto.OkexLoginMessage;
import info.bitrich.xchangestream.okex.dto.OkexSubscribeMessage;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.NettyStreamingService;
import info.bitrich.xchangestream.service.netty.WebSocketClientHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.knowm.xchange.ExchangeSpecification;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.knowm.xchange.okex.dto.OkexInstType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming service for the OKEx WebSocket API.
 *
 * <p>On top of the JSON/Netty base service this class:
 * <ul>
 *   <li>sends a signed login message after connecting when an API key is configured;</li>
 *   <li>sends a textual {@code "ping"} every 15 seconds and ignores the textual {@code "pong"};</li>
 *   <li>translates channel names of the form {@code <channel><instId>} into subscribe and
 *       unsubscribe messages;</li>
 *   <li>notifies an optional handler when the underlying channel becomes inactive.</li>
 * </ul>
 */
public class OkexStreamingService
extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(OkexStreamingService.class);
    private static final String LOGIN_SIGN_METHOD = "GET";
    private static final String LOGIN_SIGN_REQUEST_PATH = "/users/self/verify";
    private static final String HMAC_ALGORITHM = "HmacSHA256";
    private static final String PASSPHRASE_PARAMETER = "passphrase";
    private static final String PING = "ping";
    private static final String PONG = "pong";
    private static final String SUBSCRIBE = "subscribe";
    private static final String UNSUBSCRIBE = "unsubscribe";
    public static final String TRADES = "trades";
    public static final String ORDERBOOK = "books";
    public static final String ORDERBOOK5 = "books5";
    public static final String FUNDING_RATE = "funding-rate";
    public static final String TICKERS = "tickers";
    public static final String USERTRADES = "orders";
    /**
     * Known channels in the order they are matched against a channel name. {@link #ORDERBOOK5}
     * must come before {@link #ORDERBOOK} because the latter is a prefix of the former.
     */
    private static final String[] CHANNELS_BY_MATCH_PRIORITY = {ORDERBOOK5, ORDERBOOK, TRADES, TICKERS, USERTRADES, FUNDING_RATE};

    /** Emits a tick every 15 seconds (first tick after 15 seconds) to drive the keep-alive ping. */
    private final Observable<Long> pingPongSrc = Observable.interval(15L, 15L, TimeUnit.SECONDS);
    private WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler = null;
    private Disposable pingPongSubscription;
    private final ExchangeSpecification xSpec;

    /**
     * @param apiUrl WebSocket endpoint passed to the base service
     * @param exchangeSpecification source of the API key, secret key and {@code "passphrase"}
     *     exchange-specific parameter used by {@link #login()}
     */
    public OkexStreamingService(String apiUrl, ExchangeSpecification exchangeSpecification) {
        super(apiUrl);
        this.xSpec = exchangeSpecification;
    }

    /**
     * Connects through the base service and, once that completes, logs in (when an API key is
     * configured) and (re)starts the periodic ping.
     *
     * @return a completable that completes after the post-connect steps ran, or fails with the
     *     exception any of them threw
     */
    public Completable connect() {
        // Completable.andThen is overloaded for several single-method source interfaces, so an
        // implicitly typed lambda is ambiguous there. fromAction has a single overload and routes
        // any thrown exception to onError, which is what the post-connect step needs.
        return super.connect().andThen(Completable.fromAction(this::onConnected));
    }

    /** Post-connect step: optional login followed by a restart of the keep-alive ping. */
    private void onConnected() throws JsonProcessingException {
        if (this.xSpec.getApiKey() != null) {
            this.login();
        }
        // Drop a ping loop left over from a previous connection before starting a new one.
        this.pingPongDisconnectIfConnected();
        this.pingPongSubscription = this.pingPongSrc.subscribe(
                tick -> this.sendMessage(PING),
                // Without an error consumer a failure would end up in RxJava's global error handler.
                error -> LOG.error("Ping loop terminated with an error", error));
    }

    /**
     * Sends the login message. The signature is the Base64-encoded HMAC-SHA256, keyed with the
     * secret key, of {@code <epoch seconds> + "GET" + "/users/self/verify"}.
     *
     * @throws ExchangeException if the secret key or the {@code "passphrase"} exchange-specific
     *     parameter is missing, or the HMAC cannot be initialised
     * @throws JsonProcessingException if the login message cannot be serialised
     */
    public void login() throws JsonProcessingException {
        String secret = this.xSpec.getSecretKey();
        if (secret == null) {
            throw new ExchangeException("API secret is required to log in");
        }
        Object passphraseItem = this.xSpec.getExchangeSpecificParametersItem(PASSPHRASE_PARAMETER);
        if (passphraseItem == null) {
            throw new ExchangeException("Exchange specific parameter 'passphrase' is required to log in");
        }
        Mac mac;
        try {
            mac = Mac.getInstance(HMAC_ALGORITHM);
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), HMAC_ALGORITHM));
        }
        catch (InvalidKeyException | NoSuchAlgorithmException e) {
            throw new ExchangeException("Invalid API secret", e);
        }
        String timestamp = String.valueOf(System.currentTimeMillis() / 1000L);
        String toSign = timestamp + LOGIN_SIGN_METHOD + LOGIN_SIGN_REQUEST_PATH;
        String sign = Base64.getEncoder().encodeToString(mac.doFinal(toSign.getBytes(StandardCharsets.UTF_8)));
        OkexLoginMessage message = new OkexLoginMessage();
        message.getArgs().add(new OkexLoginMessage.LoginArg(this.xSpec.getApiKey(), passphraseItem.toString(), timestamp, sign));
        this.sendMessage(this.objectMapper.writeValueAsString(message));
    }

    /**
     * Parses an incoming text frame and dispatches it. The literal {@code "pong"} is dropped
     * silently; unparsable frames are logged and dropped. Array payloads are dispatched element
     * by element when {@code processArrayMessageSeparately()} says so.
     *
     * @param message raw text received from the socket
     */
    public void messageHandler(String message) {
        LOG.debug("Received message: {}", message);
        // The keep-alive reply is plain text, not JSON; recognise it up front instead of relying
        // on a parse failure.
        if (PONG.equals(message)) {
            return;
        }
        JsonNode jsonNode;
        try {
            jsonNode = this.objectMapper.readTree(message);
        }
        catch (IOException e) {
            LOG.error("Error parsing incoming message to JSON: {}", message);
            return;
        }
        if (jsonNode == null) {
            // NOTE(review): some Jackson versions are reported to return null for empty content - confirm.
            LOG.error("Error parsing incoming message to JSON: {}", message);
            return;
        }
        if (this.processArrayMessageSeparately() && jsonNode.isArray()) {
            for (JsonNode node : jsonNode) {
                this.handleMessage(node);
            }
        } else {
            this.handleMessage(jsonNode);
        }
    }

    /**
     * Builds the channel name of a message as {@code arg.channel + arg.instId}.
     *
     * @param message parsed incoming message
     * @return the concatenated name, or an empty string when {@code arg}, {@code arg.channel} or
     *     {@code arg.instId} is absent
     */
    protected String getChannelNameFromMessage(JsonNode message) {
        JsonNode arg = message.get("arg");
        if (arg != null && arg.has("channel") && arg.has("instId")) {
            return arg.get("channel").asText() + arg.get("instId").asText();
        }
        return "";
    }

    /**
     * @param channelName name of the form {@code <channel><instId>}
     * @param args unused
     * @return the serialised subscribe message for the channel
     * @throws IOException if serialisation fails
     */
    public String getSubscribeMessage(String channelName, Object ... args) throws IOException {
        return this.objectMapper.writeValueAsString(new OkexSubscribeMessage(SUBSCRIBE, Collections.singletonList(this.getTopic(channelName))));
    }

    /**
     * @param channelName name of the form {@code <channel><instId>}
     * @param args unused
     * @return the serialised unsubscribe message for the channel
     * @throws IOException if serialisation fails
     */
    public String getUnsubscribeMessage(String channelName, Object ... args) throws IOException {
        return this.objectMapper.writeValueAsString(new OkexSubscribeMessage(UNSUBSCRIBE, Collections.singletonList(this.getTopic(channelName))));
    }

    /**
     * Splits a channel name into its known channel prefix and the remaining instrument id.
     * Matching is done on the prefix only (channel names are built as channel + instId, as in
     * {@link #getChannelNameFromMessage(JsonNode)}), so an instrument id that happens to contain
     * a channel keyword is left intact.
     *
     * @throws NotYetImplementedForExchangeException if the name starts with no known channel
     */
    private OkexSubscribeMessage.SubscriptionTopic getTopic(String channelName) {
        for (String channel : CHANNELS_BY_MATCH_PRIORITY) {
            if (channelName.startsWith(channel)) {
                // Only the "orders" channel is sent with an instrument type.
                OkexInstType instType = USERTRADES.equals(channel) ? OkexInstType.ANY : null;
                return new OkexSubscribeMessage.SubscriptionTopic(channel, instType, null, channelName.substring(channel.length()));
            }
        }
        throw new NotYetImplementedForExchangeException("ChannelName: " + channelName + " has not implemented yet on " + this.getClass().getSimpleName());
    }

    /** Supplies the handler that also reports channel inactivity to the registered listener. */
    protected WebSocketClientHandler getWebSocketClientHandler(WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
        LOG.info("Registering OkxWebSocketClientHandler");
        return new OkxWebSocketClientHandler(handshake, handler);
    }

    /**
     * @param channelInactiveHandler invoked with a fixed text when the channel becomes inactive;
     *     {@code null} disables the notification
     */
    public void setChannelInactiveHandler(WebSocketClientHandler.WebSocketMessageHandler channelInactiveHandler) {
        this.channelInactiveHandler = channelInactiveHandler;
    }

    /** Stops the periodic ping if it is currently running; does nothing otherwise. */
    public void pingPongDisconnectIfConnected() {
        if (this.pingPongSubscription != null && !this.pingPongSubscription.isDisposed()) {
            this.pingPongSubscription.dispose();
        }
    }

    /** Client handler that forwards channel inactivity to the outer service's listener. */
    class OkxWebSocketClientHandler
    extends NettyStreamingService.NettyWebSocketClientHandler {
        public OkxWebSocketClientHandler(WebSocketClientHandshaker handshake, WebSocketClientHandler.WebSocketMessageHandler handler) {
            // NOTE(review): assumes the superclass is a non-static inner class of the base service
            // with a (handshaker, handler) constructor; the enclosing instance is then supplied
            // implicitly and must not be passed as an explicit argument - confirm.
            super(handshake, handler);
        }

        public void channelActive(ChannelHandlerContext ctx) {
            super.channelActive(ctx);
        }

        public void channelInactive(ChannelHandlerContext ctx) {
            super.channelInactive(ctx);
            if (OkexStreamingService.this.channelInactiveHandler != null) {
                OkexStreamingService.this.channelInactiveHandler.onMessage("WebSocket Client disconnected!");
            }
        }
    }
}

