/*
 * Decompiled with CFR 0.152.
 */
package info.bitrich.xchangestream.bitmex;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.google.common.collect.ImmutableSet;
import info.bitrich.xchangestream.bitmex.BitmexAuthenticator;
import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketSubscriptionMessage;
import info.bitrich.xchangestream.bitmex.dto.BitmexWebSocketTransaction;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import org.knowm.xchange.bitmex.service.BitmexDigest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming (WebSocket) service for BitMEX built on top of {@link JsonNettyStreamingService}.
 *
 * <p>In addition to the plain channel subscription handling inherited from the parent, this
 * class:
 * <ul>
 *   <li>signs the WebSocket handshake and sends an {@code authKey} command when API credentials
 *       are supplied;</li>
 *   <li>reports the difference between local time and the {@code timestamp} of incoming data
 *       to registered delay emitters;</li>
 *   <li>implements the "dead man's switch" ({@code cancelAllAfter}) keep-alive.</li>
 * </ul>
 */
public class BitmexStreamingService
extends JsonNettyStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(BitmexStreamingService.class);

    /** Tables whose channel name is the bare table name (no {@code :symbol} suffix). */
    private static final Set<String> SIMPLE_TABLES = ImmutableSet.of("order", "funding", "settlement", "position", "wallet", "margin");

    /** Timestamp layout used when parsing {@code timestamp} and {@code cancelTime} fields. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    private final ObjectMapper mapper = new ObjectMapper();

    // NOTE(review): this list is appended to by addDelayEmitter and iterated in handleMessage;
    // if those run on different threads a concurrent collection would be needed - confirm.
    private final List<ObservableEmitter<Long>> delayEmitters = new LinkedList<ObservableEmitter<Long>>();
    private final String apiKey;
    private final String secretKey;

    // Not referenced inside this class; presumably millisecond values meant to be passed to
    // enableDeadMansSwitch(rate, timeout) by callers - confirm against usages.
    public static final int DMS_CANCEL_ALL_IN = 60000;
    public static final int DMS_RESUBSCRIBE = 15000;

    /** Epoch millis of the last confirmed cancel time, or 0 when no switch is active. */
    private volatile long dmsCancelTime;
    /** Handle of the periodic {@code cancelAllAfter} task, or {@code null} when not scheduled. */
    private volatile Disposable dmsDisposable;

    /**
     * Creates a service with the parent's defaults and an unbounded frame payload length.
     *
     * @param apiUrl WebSocket endpoint URL
     * @param apiKey API key, or {@code null} for an unauthenticated connection
     * @param secretKey API secret, or {@code null} for an unauthenticated connection
     */
    public BitmexStreamingService(String apiUrl, String apiKey, String secretKey) {
        super(apiUrl, Integer.MAX_VALUE);
        this.apiKey = apiKey;
        this.secretKey = secretKey;
    }

    /**
     * Creates a service with explicit connection tuning; the extra arguments are forwarded
     * unchanged to the parent constructor.
     *
     * @param apiUrl WebSocket endpoint URL
     * @param apiKey API key, or {@code null} for an unauthenticated connection
     * @param secretKey API secret, or {@code null} for an unauthenticated connection
     * @param maxFramePayloadLength forwarded to the parent
     * @param connectionTimeout forwarded to the parent
     * @param retryDuration forwarded to the parent
     * @param idleTimeoutSeconds forwarded to the parent
     */
    public BitmexStreamingService(String apiUrl, String apiKey, String secretKey, int maxFramePayloadLength, Duration connectionTimeout, Duration retryDuration, int idleTimeoutSeconds) {
        super(apiUrl, maxFramePayloadLength, connectionTimeout, retryDuration, idleTimeoutSeconds);
        this.apiKey = apiKey;
        this.secretKey = secretKey;
    }

    /**
     * Sends the {@code authKey} command signed with the configured credentials.
     *
     * @throws JsonProcessingException if the command cannot be serialized
     */
    private void login() throws JsonProcessingException {
        // NOTE(review): this value is in milliseconds (+30), whereas getCustomHeaders() uses
        // seconds (+5). Confirm which unit the server expects for authKey before changing it.
        long expires = System.currentTimeMillis() + 30L;
        String path = "/realtime";
        String signature = BitmexAuthenticator.generateSignature(this.secretKey, "GET", path, String.valueOf(expires), "");
        HashMap<String, Object> cmd = new HashMap<String, Object>();
        cmd.put("op", "authKey");
        cmd.put("args", Arrays.asList(this.apiKey, expires, signature));
        this.sendMessage(this.mapper.writeValueAsString(cmd));
    }

    /**
     * Connects via the parent and, when both credentials are configured, sends the
     * {@code authKey} login command afterwards.
     *
     * @return a completable that completes once the connection (and login message, if any)
     *         has been issued, or fails with the {@link IOException} raised while logging in
     */
    public Completable connect() {
        Completable conn = super.connect();
        // Same credential guard as getCustomHeaders(): signing needs both key and secret.
        if (this.apiKey == null || this.secretKey == null) {
            return conn;
        }
        return conn.andThen(observer -> {
            try {
                this.login();
                observer.onComplete();
            }
            catch (IOException e) {
                observer.onError(e);
            }
        });
    }

    /**
     * Dispatches an incoming message: feeds the delay emitters, swallows {@code info} /
     * {@code success} acknowledgements, logs {@code error} responses, handles dead man's
     * switch confirmations, and hands everything else to the parent.
     *
     * @param message the parsed incoming JSON message
     */
    protected void handleMessage(JsonNode message) {
        if (!this.delayEmitters.isEmpty() && message.has("data")) {
            this.emitDelay(message);
        }
        if (message.has("info") || message.has("success")) {
            return;
        }
        if (message.has("error")) {
            String error = message.get("error").asText();
            LOG.error("Error with message: {}", error);
            return;
        }
        if (message.has("now") && message.has("cancelTime")) {
            this.handleDeadMansSwitchMessage(message);
            return;
        }
        super.handleMessage(message);
    }

    /**
     * Publishes {@code now - data[0].timestamp} (milliseconds) to every registered delay emitter.
     *
     * <p>Only array payloads are considered; for the {@code order} table only entries whose
     * {@code ordStatus} is {@code NEW} are measured.
     */
    private void emitDelay(JsonNode message) {
        String table = message.has("table") ? message.get("table").asText() : "";
        JsonNode data = message.get("data");
        if (!data.getNodeType().equals(JsonNodeType.ARRAY)) {
            return;
        }
        long current = System.currentTimeMillis();
        JsonNode first = data.get(0);
        if (first == null || !first.has("timestamp")) {
            return;
        }
        boolean isNewOrder = first.has("ordStatus") && "NEW".equals(first.get("ordStatus").asText());
        if ("order".equals(table) && !isNewOrder) {
            return;
        }
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
            SimpleDateFormat formatter = new SimpleDateFormat(TIMESTAMP_PATTERN);
            formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
            Date date = formatter.parse(first.get("timestamp").asText());
            long delay = current - date.getTime();
            for (ObservableEmitter<Long> emitter : this.delayEmitters) {
                emitter.onNext(delay);
            }
        }
        catch (ParseException e) {
            LOG.error("Parsing timestamp error: ", e);
        }
    }

    /**
     * Processes a dead man's switch confirmation: a {@code cancelTime} of {@code "0"} stops the
     * periodic task and clears the state, any other value is parsed and stored as the new
     * cancel time.
     */
    private void handleDeadMansSwitchMessage(JsonNode message) {
        String cancelTime = message.get("cancelTime").asText();
        if ("0".equals(cancelTime)) {
            LOG.info("Dead man's switch disabled");
            // The periodic task may never have been scheduled (e.g. disable without enable),
            // so guard against null instead of failing inside the message handler.
            Disposable scheduled = this.dmsDisposable;
            if (scheduled != null) {
                scheduled.dispose();
            }
            this.dmsDisposable = null;
            this.dmsCancelTime = 0L;
            return;
        }
        try {
            SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_PATTERN);
            sdf.setTimeZone(TimeZone.getTimeZone(ZoneOffset.UTC));
            this.dmsCancelTime = sdf.parse(cancelTime).getTime();
        }
        catch (ParseException e) {
            LOG.error("Error parsing deadman's confirmation '{}'", cancelTime, e);
        }
    }

    /**
     * @return always {@code null}, i.e. this service supplies no WebSocket extension handler
     */
    protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
        return null;
    }

    /**
     * Subscribes to a channel and maps each JSON message to a {@link BitmexWebSocketTransaction}.
     *
     * @param channelName channel to subscribe to
     * @return a shared observable of decoded transactions
     */
    public Observable<BitmexWebSocketTransaction> subscribeBitmexChannel(String channelName) {
        return this.subscribeChannel(channelName, new Object[0])
                .map(node -> this.objectMapper.treeToValue(node, BitmexWebSocketTransaction.class))
                .share();
    }

    /**
     * Adds the authentication headers to the handshake when both credentials are configured.
     *
     * @return the parent's headers, extended with {@code api-expires}, {@code api-key} and
     *         {@code api-signature} when authenticated
     */
    protected DefaultHttpHeaders getCustomHeaders() {
        DefaultHttpHeaders customHeaders = super.getCustomHeaders();
        if (this.secretKey == null || this.apiKey == null) {
            return customHeaders;
        }
        long expires = System.currentTimeMillis() / 1000L + 5L;
        BitmexDigest bitmexDigester = BitmexDigest.createInstance(this.secretKey, this.apiKey);
        String stringToDigest = "GET/realtime" + expires;
        String signature = bitmexDigester.digestString(stringToDigest);
        // The signature covers `expires`, so the same value has to be sent along with it;
        // otherwise the receiver has nothing to verify the signature against.
        customHeaders.add("api-expires", String.valueOf(expires));
        customHeaders.add("api-key", this.apiKey);
        customHeaders.add("api-signature", signature);
        return customHeaders;
    }

    /**
     * Derives the channel name of a message: the bare table name for the simple tables,
     * otherwise {@code table:symbol} where the symbol comes from the first data entry or, when
     * there is no data entry, from {@code filter.symbol}.
     *
     * @param message the parsed incoming JSON message
     * @return the channel name
     * @throws IOException if the message lacks the {@code table} field or a symbol
     */
    protected String getChannelNameFromMessage(JsonNode message) throws IOException {
        JsonNode tableNode = message.get("table");
        if (tableNode == null) {
            throw new IOException("Message has no 'table' field");
        }
        String table = tableNode.asText();
        if (SIMPLE_TABLES.contains(table)) {
            return table;
        }
        JsonNode data = message.get("data");
        JsonNode symbolHolder = data != null && data.size() > 0 ? data.get(0) : message.get("filter");
        JsonNode symbol = symbolHolder == null ? null : symbolHolder.get("symbol");
        if (symbol == null) {
            throw new IOException("Message for table '" + table + "' has no symbol in 'data' or 'filter'");
        }
        return String.format("%s:%s", table, symbol.asText());
    }

    /**
     * Builds the JSON {@code subscribe} command for a channel.
     *
     * @param channelName channel to subscribe to
     * @param args unused
     * @return the serialized command
     * @throws IOException if serialization fails
     */
    public String getSubscribeMessage(String channelName, Object ... args) throws IOException {
        BitmexWebSocketSubscriptionMessage subscribeMessage = new BitmexWebSocketSubscriptionMessage("subscribe", new String[]{channelName});
        return this.objectMapper.writeValueAsString(subscribeMessage);
    }

    /**
     * Builds the JSON {@code unsubscribe} command for a channel.
     *
     * @param channelName channel to unsubscribe from
     * @param args unused
     * @return the serialized command
     * @throws IOException if serialization fails
     */
    public String getUnsubscribeMessage(String channelName, Object ... args) throws IOException {
        BitmexWebSocketSubscriptionMessage unsubscribeMessage = new BitmexWebSocketSubscriptionMessage("unsubscribe", new String[]{channelName});
        return this.objectMapper.writeValueAsString(unsubscribeMessage);
    }

    /**
     * Starts sending {@code cancelAllAfter(timeout)} immediately and then every {@code rate}
     * milliseconds. Does nothing (besides logging a warning) when a periodic task is already
     * scheduled.
     *
     * @param rate resend period in milliseconds
     * @param timeout value passed as the {@code cancelAllAfter} argument
     * @throws IOException if the command cannot be serialized
     */
    public void enableDeadMansSwitch(long rate, long timeout) throws IOException {
        if (this.dmsDisposable != null) {
            LOG.warn("You already have Dead Man's switch enabled. Doing nothing");
            return;
        }
        BitmexWebSocketSubscriptionMessage subscriptionMessage = new BitmexWebSocketSubscriptionMessage("cancelAllAfter", new Object[]{timeout});
        String message = this.objectMapper.writeValueAsString(subscriptionMessage);
        this.dmsDisposable = Schedulers.single().schedulePeriodicallyDirect(() -> this.sendMessage(message), 0L, rate, TimeUnit.MILLISECONDS);
        Schedulers.single().start();
    }

    /**
     * Sends {@code cancelAllAfter(0)}. The periodic task itself is stopped when the
     * confirmation with {@code cancelTime} {@code 0} is received in the message handler.
     *
     * @throws IOException if the command cannot be serialized
     */
    public void disableDeadMansSwitch() throws IOException {
        BitmexWebSocketSubscriptionMessage subscriptionMessage = new BitmexWebSocketSubscriptionMessage("cancelAllAfter", new Object[]{0});
        String message = this.objectMapper.writeValueAsString(subscriptionMessage);
        this.sendMessage(message);
    }

    /**
     * @return {@code true} when a cancel time has been confirmed and lies in the future
     */
    public boolean isDeadMansSwitchEnabled() {
        return this.dmsCancelTime > 0L && System.currentTimeMillis() < this.dmsCancelTime;
    }

    /**
     * Registers an emitter that will receive the measured message delay in milliseconds.
     *
     * @param delayEmitter emitter to notify
     */
    public void addDelayEmitter(ObservableEmitter<Long> delayEmitter) {
        this.delayEmitters.add(delayEmitter);
    }
}

