package org.web3j.protocol.websocket;

import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.web3j.protocol.core.Request;
import org.web3j.protocol.core.Response;
import org.web3j.protocol.core.methods.response.EthSubscribe;
import org.web3j.protocol.core.methods.response.Web3ClientVersion;
import org.web3j.protocol.websocket.events.NewHead;
import org.web3j.protocol.websocket.events.NewHeadsNotification;

/* loaded from: input_file:org/web3j/protocol/websocket/WebSocketServiceTest.class */
/**
 * Unit tests for {@link WebSocketService}.
 *
 * <p>The service under test is wired to a mocked {@link WebSocketClient} and a mocked
 * {@link ScheduledExecutorService}, so no network traffic or real timers are involved. Replies
 * from the remote node are simulated by feeding JSON strings directly into
 * {@link WebSocketService#onWebSocketMessage(String)}.
 */
public class WebSocketServiceTest {
    /** Id assigned to every request created by these tests. */
    private static final int REQUEST_ID = 1;

    /** Upper bound for {@link #waitForRequestSent()} so a broken service cannot hang the build. */
    private static final long REQUEST_SENT_TIMEOUT_SECONDS = 5L;

    /** Listener captured from the mocked client in {@link #testAddedWebSocketListener()}. */
    private WebSocketListener listener;

    /** Subscription request created by {@link #subscribeToEvents()}. */
    private Request<Object, EthSubscribe> subscribeRequest;

    private final WebSocketClient webSocketClient = Mockito.mock(WebSocketClient.class);
    private final ScheduledExecutorService executorService =
            Mockito.mock(ScheduledExecutorService.class);
    private final WebSocketService service =
            new WebSocketService(this.webSocketClient, this.executorService, true);
    private final Request<?, Web3ClientVersion> request =
            new Request<>(
                    "web3_clientVersion",
                    Collections.emptyList(),
                    this.service,
                    Web3ClientVersion.class);

    /** Makes the mocked client report successful (re)connections and fixes the request id. */
    @BeforeEach
    public void before() throws InterruptedException {
        Mockito.when(this.webSocketClient.connectBlocking()).thenReturn(true);
        Mockito.when(this.webSocketClient.reconnectBlocking()).thenReturn(true);
        this.request.setId(REQUEST_ID);
    }

    @Test
    public void testThrowExceptionIfServerUrlIsInvalid() {
        Assertions.assertThrows(
                RuntimeException.class, () -> new WebSocketService("invalid\\url", true));
    }

    @Test
    public void testConnectViaWebSocketClient() throws Exception {
        this.service.connect();

        Mockito.verify(this.webSocketClient).connectBlocking();
    }

    @Test
    public void testReConnectAfterConnected() throws Exception {
        this.service.connect();
        this.service.close();
        this.service.connect();

        Mockito.verify(this.webSocketClient, Mockito.atMostOnce()).connectBlocking();
        Mockito.verify(this.webSocketClient, Mockito.atMostOnce()).reconnectBlocking();
    }

    @Test
    public void testInterruptCurrentThreadIfConnectionIsInterrupted() throws Exception {
        Mockito.when(this.webSocketClient.connectBlocking())
                .thenThrow(new InterruptedException());

        this.service.connect();

        // Thread.interrupted() both checks AND clears the flag, so the interrupted state does
        // not leak into whatever test the runner executes next on this thread.
        Assertions.assertTrue(Thread.interrupted(), "Interrupted flag was not set properly");
    }

    @Test
    public void testThrowExceptionIfConnectionFailed() throws Exception {
        Mockito.when(this.webSocketClient.connectBlocking()).thenReturn(false);

        Assertions.assertThrows(ConnectException.class, () -> this.service.connect());
    }

    /** Checks that the callbacks passed to {@code connect} are driven by the client listener. */
    @Test
    public void testAddedWebSocketListener() throws Exception {
        // Capture the listener the service registers on the client so it can be driven manually.
        Mockito.doAnswer(
                        invocation -> {
                            this.listener = invocation.getArgument(0, WebSocketListener.class);
                            return null;
                        })
                .when(this.webSocketClient)
                .setListener(ArgumentMatchers.any());

        final CountDownLatch messageLatch = new CountDownLatch(1);
        final CountDownLatch closeLatch = new CountDownLatch(1);
        final CountDownLatch errorLatch = new CountDownLatch(1);
        final Consumer<String> onMessage = message -> messageLatch.countDown();
        final Consumer<Throwable> onError = error -> errorLatch.countDown();

        this.service.connect(onMessage, onError, closeLatch::countDown);
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        this.listener.onMessage(
                "{\"jsonrpc\":\"2.0\",\"method\":\"web3_clientVersion\",\"params\":[],\"id\":1}");
        Assertions.assertTrue(messageLatch.await(2L, TimeUnit.SECONDS));

        this.listener.onError(new Exception());
        Assertions.assertTrue(errorLatch.await(2L, TimeUnit.SECONDS));

        this.listener.onClose();
        Assertions.assertTrue(closeLatch.await(2L, TimeUnit.SECONDS));
    }

    @Test
    public void testNotWaitingForReplyWithUnknownId() {
        Assertions.assertFalse(this.service.isWaitingForReply(123L));
    }

    @Test
    public void testWaitingForReplyToSentRequest() throws Exception {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertTrue(this.service.isWaitingForReply(this.request.getId()));
    }

    @Test
    public void testNoLongerWaitingForResponseAfterReply() throws Exception {
        this.service.sendAsync(this.request, Web3ClientVersion.class);
        sendGethVersionReply();

        Assertions.assertFalse(this.service.isWaitingForReply(REQUEST_ID));
    }

    @Test
    public void testSendWebSocketRequest() throws Exception {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Mockito.verify(this.webSocketClient)
                .send(
                        "{\"jsonrpc\":\"2.0\",\"method\":\"web3_clientVersion\",\"params\":[],\"id\":1}");
    }

    @Test
    public void testIgnoreInvalidReplies() {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertThrows(IOException.class, () -> this.service.onWebSocketMessage("{"));
    }

    @Test
    public void testThrowExceptionIfIdHasInvalidType() {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertThrows(
                IOException.class, () -> this.service.onWebSocketMessage("{\"id\":true}"));
    }

    @Test
    public void testThrowExceptionIfIdIsMissing() {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertThrows(IOException.class, () -> this.service.onWebSocketMessage("{}"));
    }

    @Test
    public void testThrowExceptionIfUnexpectedIdIsReceived() {
        this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertThrows(
                IOException.class,
                () ->
                        this.service.onWebSocketMessage(
                                "{\"jsonrpc\":\"2.0\",\"id\":12345,\"result\":\"geth-version\"}"));
    }

    @Test
    public void testReceiveReply() throws Exception {
        final CompletableFuture<Web3ClientVersion> reply =
                this.service.sendAsync(this.request, Web3ClientVersion.class);
        sendGethVersionReply();

        Assertions.assertTrue(reply.isDone());
        Assertions.assertEquals("geth-version", reply.get().getWeb3ClientVersion());
    }

    @Test
    public void testReceiveError() throws Exception {
        final CompletableFuture<Web3ClientVersion> reply =
                this.service.sendAsync(this.request, Web3ClientVersion.class);
        sendErrorReply();

        Assertions.assertTrue(reply.isDone());
        final Web3ClientVersion version = reply.get();
        Assertions.assertTrue(version.hasError());
        Assertions.assertEquals(new Response.Error(-1, "Error message"), version.getError());
    }

    @Test
    public void testCloseRequestWhenConnectionIsClosed() {
        final CompletableFuture<Web3ClientVersion> reply =
                this.service.sendAsync(this.request, Web3ClientVersion.class);
        this.service.onWebSocketClose();

        Assertions.assertTrue(reply.isDone());
        // The future must have completed exceptionally; get() surfaces that as
        // ExecutionException. An empty lambda here would make the assertion unsatisfiable.
        Assertions.assertThrows(ExecutionException.class, reply::get);
    }

    @Test
    public void testCancelRequestAfterTimeout() {
        // Run the scheduled timeout task immediately instead of after 60 seconds.
        Mockito.when(
                        this.executorService.schedule(
                                ArgumentMatchers.any(Runnable.class),
                                ArgumentMatchers.eq(60L),
                                ArgumentMatchers.eq(TimeUnit.SECONDS)))
                .then(
                        invocation -> {
                            invocation.getArgument(0, Runnable.class).run();
                            return null;
                        });

        final CompletableFuture<Web3ClientVersion> reply =
                this.service.sendAsync(this.request, Web3ClientVersion.class);

        Assertions.assertTrue(reply.isDone());
        // See testCloseRequestWhenConnectionIsClosed: get() is what actually throws.
        Assertions.assertThrows(ExecutionException.class, reply::get);
    }

    @Test
    public void testSyncRequest() throws Exception {
        final CountDownLatch requestSent = new CountDownLatch(1);
        Mockito.doAnswer(
                        invocation -> {
                            requestSent.countDown();
                            return null;
                        })
                .when(this.webSocketClient)
                .send(ArgumentMatchers.anyString());

        // send() blocks the calling thread, so the reply has to be delivered from another one,
        // and only after the request has actually gone out through the client.
        runAsync(
                () -> {
                    try {
                        requestSent.await(2L, TimeUnit.SECONDS);
                        sendGethVersionReply();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });

        final Web3ClientVersion reply = this.service.send(this.request, Web3ClientVersion.class);

        Assertions.assertEquals("geth-version", reply.getWeb3ClientVersion());
    }

    @Test
    public void testCloseWebSocketOnClose() throws Exception {
        this.service.close();

        Mockito.verify(this.webSocketClient).close();
        Mockito.verify(this.executorService).shutdown();
    }

    @Test
    public void testSendSubscriptionReply() throws Exception {
        runAsync(() -> subscribeToEvents());

        sendSubscriptionConfirmation();

        verifyStartedSubscriptionHandshake();
    }

    @Test
    public void testPropagateSubscriptionEvent() throws Exception {
        final CountDownLatch eventReceived = new CountDownLatch(1);
        final CountDownLatch disposed = new CountDownLatch(1);
        final AtomicReference<NewHeadsNotification> received = new AtomicReference<>();

        runAsync(
                () -> {
                    final Disposable subscription =
                            subscribeToEvents()
                                    .subscribe(
                                            notification -> {
                                                received.set(notification);
                                                eventReceived.countDown();
                                            });
                    try {
                        eventReceived.await(2L, TimeUnit.SECONDS);
                        subscription.dispose();
                        disposed.countDown();
                    } catch (InterruptedException e) {
                        // Restore the flag rather than swallowing the interruption.
                        Thread.currentThread().interrupt();
                    }
                });

        sendSubscriptionConfirmation();
        sendWebSocketEvent();

        Assertions.assertTrue(disposed.await(6L, TimeUnit.SECONDS));
        Assertions.assertEquals(
                "0xd9263f42a87", received.get().getParams().getResult().getDifficulty());
    }

    @Test
    public void testSendUnsubscribeRequest() throws Exception {
        final CountDownLatch unsubscribed = new CountDownLatch(1);

        runAsync(
                () -> {
                    subscribeToEvents().subscribe().dispose();
                    unsubscribed.countDown();
                });
        sendSubscriptionConfirmation();
        sendWebSocketEvent();

        Assertions.assertTrue(unsubscribed.await(2L, TimeUnit.SECONDS));
        verifyUnsubscribed();
    }

    @Test
    public void testStopWaitingForSubscriptionReplyAfterTimeout() throws Exception {
        final CountDownLatch errorSignalled = new CountDownLatch(1);
        final AtomicReference<Throwable> capturedError = new AtomicReference<>();

        runAsync(
                () ->
                        subscribeToEvents()
                                .subscribe(
                                        errorCapturingSubscriber(capturedError, errorSignalled)));
        waitForRequestSent();

        final IOException timeout = new IOException("timeout");
        this.service.closeRequest(REQUEST_ID, timeout);

        Assertions.assertTrue(errorSignalled.await(2L, TimeUnit.SECONDS));
        Assertions.assertEquals(timeout, capturedError.get());
    }

    @Test
    public void testOnErrorCalledIfConnectionClosed() throws Exception {
        final CountDownLatch errorSignalled = new CountDownLatch(1);
        final AtomicReference<Throwable> capturedError = new AtomicReference<>();

        runAsync(
                () ->
                        subscribeToEvents()
                                .subscribe(
                                        errorCapturingSubscriber(capturedError, errorSignalled)));
        waitForRequestSent();
        sendSubscriptionConfirmation();
        this.service.onWebSocketClose();

        Assertions.assertTrue(errorSignalled.await(2L, TimeUnit.SECONDS));
        final Throwable error = capturedError.get();
        Assertions.assertEquals(IOException.class, error.getClass());
        Assertions.assertEquals("Connection was closed", error.getMessage());
    }

    @Test
    public void testIfCloseObserverIfSubscriptionRequestFailed() throws Exception {
        final CountDownLatch errorSignalled = new CountDownLatch(1);
        final AtomicReference<Throwable> capturedError = new AtomicReference<>();

        runAsync(
                () ->
                        subscribeToEvents()
                                .subscribe(
                                        errorCapturingSubscriber(capturedError, errorSignalled)));
        waitForRequestSent();
        sendErrorReply();

        Assertions.assertTrue(errorSignalled.await(2L, TimeUnit.SECONDS));
        final Throwable error = capturedError.get();
        Assertions.assertEquals(IOException.class, error.getClass());
        Assertions.assertEquals(
                "Subscription request failed with error: Error message", error.getMessage());
    }

    /**
     * Builds a subscriber that ignores everything except {@code onError}, where it stores the
     * error and counts the latch down.
     *
     * @param capturedError receives the throwable passed to {@code onError}
     * @param errorSignalled counted down once after the error has been stored
     * @return the subscriber to hand to {@code Flowable.subscribe}
     */
    private static Subscriber<NewHeadsNotification> errorCapturingSubscriber(
            final AtomicReference<Throwable> capturedError, final CountDownLatch errorSignalled) {
        return new Subscriber<NewHeadsNotification>() {
            @Override
            public void onSubscribe(Subscription subscription) {}

            @Override
            public void onNext(NewHeadsNotification newHeadsNotification) {}

            @Override
            public void onError(Throwable error) {
                capturedError.set(error);
                errorSignalled.countDown();
            }

            @Override
            public void onComplete() {}
        };
    }

    /**
     * Runs {@code runnable} on a fresh background thread.
     *
     * <p>The executor is shut down right after submission; the already-submitted task still runs
     * to completion, and the worker thread is released afterwards instead of lingering.
     */
    private void runAsync(Runnable runnable) {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(runnable);
        executor.shutdown();
    }

    /** Creates the {@code eth_subscribe} request with the fixed id and subscribes through it. */
    private Flowable<NewHeadsNotification> subscribeToEvents() {
        this.subscribeRequest =
                new Request<>(
                        "eth_subscribe",
                        Arrays.asList("newHeads", Collections.emptyMap()),
                        this.service,
                        EthSubscribe.class);
        this.subscribeRequest.setId(REQUEST_ID);
        return this.service.subscribe(
                this.subscribeRequest, "eth_unsubscribe", NewHeadsNotification.class);
    }

    /** Feeds a JSON-RPC error reply for the fixed request id into the service. */
    private void sendErrorReply() throws IOException {
        this.service.onWebSocketMessage(
                "{  \"jsonrpc\":\"2.0\",  \"id\":1,  \"error\":{    \"code\":-1,    \"message\":\"Error message\",    \"data\":null  }}");
    }

    /** Feeds a successful {@code web3_clientVersion} reply into the service. */
    private void sendGethVersionReply() throws IOException {
        this.service.onWebSocketMessage(
                "{  \"jsonrpc\":\"2.0\",  \"id\":1,  \"result\":\"geth-version\"}");
    }

    private void verifyStartedSubscriptionHandshake() {
        Mockito.verify(this.webSocketClient)
                .send(
                        "{\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newHeads\",{}],\"id\":1}");
    }

    private void verifyUnsubscribed() {
        // Only the prefix is matched because the id of the unsubscribe request is not fixed here.
        Mockito.verify(this.webSocketClient)
                .send(
                        ArgumentMatchers.startsWith(
                                "{\"jsonrpc\":\"2.0\",\"method\":\"eth_unsubscribe\",\"params\":[\"0xcd0c3e8af590364c09d0fa6a1210faf5\"]"));
    }

    /** Waits for the subscribe request to be pending, then confirms it with a subscription id. */
    private void sendSubscriptionConfirmation() throws Exception {
        waitForRequestSent();
        this.service.onWebSocketMessage(
                "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}");
    }

    /**
     * Polls until the service reports that it is waiting for a reply to the fixed request id.
     *
     * <p>Fails the test after {@link #REQUEST_SENT_TIMEOUT_SECONDS} seconds instead of spinning
     * forever when the background thread never manages to send the request.
     *
     * @throws InterruptedException if the polling thread is interrupted while sleeping
     */
    private void waitForRequestSent() throws InterruptedException {
        final long deadline =
                System.nanoTime() + TimeUnit.SECONDS.toNanos(REQUEST_SENT_TIMEOUT_SECONDS);
        while (!this.service.isWaitingForReply(REQUEST_ID)) {
            if (System.nanoTime() - deadline > 0) {
                Assertions.fail(
                        "Request "
                                + REQUEST_ID
                                + " was not sent within "
                                + REQUEST_SENT_TIMEOUT_SECONDS
                                + " seconds");
            }
            Thread.sleep(50L);
        }
    }

    /** Feeds an {@code eth_subscription} notification for the confirmed subscription id. */
    private void sendWebSocketEvent() throws IOException {
        this.service.onWebSocketMessage(
                "{  \"jsonrpc\":\"2.0\",  \"method\":\"eth_subscription\",  \"params\":{    \"subscription\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\",    \"result\":{      \"difficulty\":\"0xd9263f42a87\",      \"uncles\":[]    }  }}");
    }
}
