package org.apache.storm.daemon.drpc;

import java.security.Principal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.security.auth.Subject;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExceptionType;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.DefaultPrincipalToLocal;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.security.auth.SingleUserPrincipal;
import org.apache.storm.security.auth.authorizer.DRPCSimpleACLAuthorizer;
import org.apache.storm.security.auth.authorizer.DenyAuthorizer;
import org.apache.storm.utils.Time;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/storm/daemon/drpc/DRPCTest.class */
public class DRPCTest {
    private static final ExecutorService exec = Executors.newCachedThreadPool();

    /* loaded from: input_file:org/apache/storm/daemon/drpc/DRPCTest$ThrowStuff.class */
    public interface ThrowStuff {
        void run() throws Exception;
    }

    private static void assertThrows(ThrowStuff throwStuff, Class<? extends Exception> cls) {
        try {
            throwStuff.run();
            Assert.fail("Expected " + throwStuff + " to throw " + cls + " didn't throw at all...");
        } catch (Exception e) {
            Assert.assertTrue("Expected " + throwStuff + " to throw " + cls + " but threw " + e, cls.isInstance(e));
        }
    }

    @AfterClass
    public static void close() {
        exec.shutdownNow();
    }

    public static DRPCRequest getNextAvailableRequest(DRPC drpc, String str) throws Exception {
        DRPCRequest dRPCRequest = null;
        long currentTimeMillis = System.currentTimeMillis() + 5000;
        while (System.currentTimeMillis() < currentTimeMillis) {
            dRPCRequest = drpc.fetchRequest(str);
            if (dRPCRequest != null && dRPCRequest.get_request_id() != null && !dRPCRequest.get_request_id().isEmpty()) {
                return dRPCRequest;
            }
            Thread.sleep(1L);
        }
        Assert.fail("Test timed out waiting for a request on " + str);
        return dRPCRequest;
    }

    @Test
    public void testGoodBlocking() throws Exception {
        DRPC drpc = new DRPC(new StormMetricsRegistry(), (IAuthorizer) null, 100L);
        Throwable th = null;
        try {
            Future submit = exec.submit(() -> {
                return drpc.executeBlocking("testing", "test");
            });
            DRPCRequest nextAvailableRequest = getNextAvailableRequest(drpc, "testing");
            Assert.assertNotNull(nextAvailableRequest);
            Assert.assertEquals("test", nextAvailableRequest.get_func_args());
            Assert.assertNotNull(nextAvailableRequest.get_request_id());
            drpc.returnResult(nextAvailableRequest.get_request_id(), "tested");
            Assert.assertEquals("tested", (String) submit.get(10L, TimeUnit.MILLISECONDS));
            if (drpc != null) {
                if (0 == 0) {
                    drpc.close();
                    return;
                }
                try {
                    drpc.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (drpc != null) {
                if (0 != 0) {
                    try {
                        drpc.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    drpc.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testFailedBlocking() throws Exception {
        DRPC drpc = new DRPC(new StormMetricsRegistry(), (IAuthorizer) null, 100L);
        Throwable th = null;
        try {
            Future submit = exec.submit(() -> {
                return drpc.executeBlocking("testing", "test");
            });
            DRPCRequest nextAvailableRequest = getNextAvailableRequest(drpc, "testing");
            Assert.assertNotNull(nextAvailableRequest);
            Assert.assertEquals("test", nextAvailableRequest.get_func_args());
            Assert.assertNotNull(nextAvailableRequest.get_request_id());
            drpc.failRequest(nextAvailableRequest.get_request_id(), (DRPCExecutionException) null);
            try {
                submit.get(100L, TimeUnit.MILLISECONDS);
                Assert.fail("exec did not throw an exception");
            } catch (ExecutionException e) {
                DRPCExecutionException cause = e.getCause();
                Assert.assertTrue(cause instanceof DRPCExecutionException);
                Assert.assertEquals(DRPCExceptionType.FAILED_REQUEST, cause.get_type());
            }
            if (drpc != null) {
                if (0 == 0) {
                    drpc.close();
                    return;
                }
                try {
                    drpc.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (drpc != null) {
                if (0 != 0) {
                    try {
                        drpc.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    drpc.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testDequeueAfterTimeout() throws Exception {
        DRPC drpc = new DRPC(new StormMetricsRegistry(), (IAuthorizer) null, 1000L);
        Throwable th = null;
        try {
            long currentTimeMillis = Time.currentTimeMillis();
            try {
                drpc.executeBlocking("testing", "test");
                Assert.fail("Should have timed out....");
            } catch (DRPCExecutionException e) {
                long currentTimeMillis2 = Time.currentTimeMillis() - currentTimeMillis;
                Assert.assertTrue(currentTimeMillis2 < 1000 * 2);
                Assert.assertTrue(currentTimeMillis2 >= 1000);
                Assert.assertEquals(DRPCExceptionType.SERVER_TIMEOUT, e.get_type());
            }
            DRPCRequest fetchRequest = drpc.fetchRequest("testing");
            Assert.assertNotNull(fetchRequest);
            Assert.assertEquals("", fetchRequest.get_request_id());
            Assert.assertEquals("", fetchRequest.get_func_args());
            if (drpc != null) {
                if (0 == 0) {
                    drpc.close();
                    return;
                }
                try {
                    drpc.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (drpc != null) {
                if (0 != 0) {
                    try {
                        drpc.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    drpc.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testDeny() throws Exception {
        DRPC drpc = new DRPC(new StormMetricsRegistry(), new DenyAuthorizer(), 100L);
        Throwable th = null;
        try {
            assertThrows(() -> {
                drpc.executeBlocking("testing", "test");
            }, AuthorizationException.class);
            assertThrows(() -> {
                drpc.fetchRequest("testing");
            }, AuthorizationException.class);
            if (drpc != null) {
                if (0 == 0) {
                    drpc.close();
                    return;
                }
                try {
                    drpc.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (drpc != null) {
                if (0 != 0) {
                    try {
                        drpc.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    drpc.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testStrict() throws Exception {
        ReqContext reqContext = new ReqContext(new Subject());
        Principal singleUserPrincipal = new SingleUserPrincipal("jump_topo");
        reqContext.subject().getPrincipals().add(singleUserPrincipal);
        ReqContext reqContext2 = new ReqContext(new Subject());
        Principal singleUserPrincipal2 = new SingleUserPrincipal("jump_client");
        reqContext2.subject().getPrincipals().add(singleUserPrincipal2);
        ReqContext reqContext3 = new ReqContext(new Subject());
        reqContext3.subject().getPrincipals().add(new SingleUserPrincipal("other"));
        final HashMap hashMap = new HashMap();
        hashMap.put("jump", new DRPCSimpleACLAuthorizer.AclFunctionEntry(Arrays.asList(singleUserPrincipal2.getName()), singleUserPrincipal.getName()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("drpc.authorizer.acl.strict", true);
        hashMap2.put("storm.principal.tolocal", DefaultPrincipalToLocal.class.getName());
        DRPCSimpleACLAuthorizer dRPCSimpleACLAuthorizer = new DRPCSimpleACLAuthorizer() { // from class: org.apache.storm.daemon.drpc.DRPCTest.1
            protected Map<String, DRPCSimpleACLAuthorizer.AclFunctionEntry> readAclFromConfig() {
                return hashMap;
            }
        };
        dRPCSimpleACLAuthorizer.prepare(hashMap2);
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        }, AuthorizationException.class);
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "result", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "result", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "result", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "execute", "jump");
        }, AuthorizationException.class);
        DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "execute", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "execute", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "result", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "result", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "result", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "execute", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "execute", "not_jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "execute", "not_jump");
        }, AuthorizationException.class);
    }

    @Test
    public void testNotStrict() throws Exception {
        ReqContext reqContext = new ReqContext(new Subject());
        Principal singleUserPrincipal = new SingleUserPrincipal("jump_topo");
        reqContext.subject().getPrincipals().add(singleUserPrincipal);
        ReqContext reqContext2 = new ReqContext(new Subject());
        Principal singleUserPrincipal2 = new SingleUserPrincipal("jump_client");
        reqContext2.subject().getPrincipals().add(singleUserPrincipal2);
        ReqContext reqContext3 = new ReqContext(new Subject());
        reqContext3.subject().getPrincipals().add(new SingleUserPrincipal("other"));
        final HashMap hashMap = new HashMap();
        hashMap.put("jump", new DRPCSimpleACLAuthorizer.AclFunctionEntry(Arrays.asList(singleUserPrincipal2.getName()), singleUserPrincipal.getName()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("drpc.authorizer.acl.strict", false);
        hashMap2.put("storm.principal.tolocal", DefaultPrincipalToLocal.class.getName());
        DRPCSimpleACLAuthorizer dRPCSimpleACLAuthorizer = new DRPCSimpleACLAuthorizer() { // from class: org.apache.storm.daemon.drpc.DRPCTest.2
            protected Map<String, DRPCSimpleACLAuthorizer.AclFunctionEntry> readAclFromConfig() {
                return hashMap;
            }
        };
        dRPCSimpleACLAuthorizer.prepare(hashMap2);
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "fetchRequest", "jump");
        }, AuthorizationException.class);
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "result", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "result", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "result", "jump");
        }, AuthorizationException.class);
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "execute", "jump");
        }, AuthorizationException.class);
        DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "execute", "jump");
        assertThrows(() -> {
            DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "execute", "jump");
        }, AuthorizationException.class);
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "fetchRequest", "not_jump");
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "result", "not_jump");
        DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "result", "not_jump");
        DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "result", "not_jump");
        DRPC.checkAuthorization(reqContext, dRPCSimpleACLAuthorizer, "execute", "not_jump");
        DRPC.checkAuthorization(reqContext2, dRPCSimpleACLAuthorizer, "execute", "not_jump");
        DRPC.checkAuthorization(reqContext3, dRPCSimpleACLAuthorizer, "execute", "not_jump");
    }
}
