/*
 * Decompiled with CFR 0.152.
 */
package org.mule.module.http.functional.listener;

import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.fluent.Executor;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.fluent.Response;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mule.api.MuleEventContext;
import org.mule.module.http.api.HttpConstants;
import org.mule.tck.functional.EventCallback;
import org.mule.tck.junit4.FunctionalTestCase;
import org.mule.tck.junit4.rule.DynamicPort;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.util.concurrent.Latch;

public class HttpListenerWorkerThreadingProfileTestCase
extends FunctionalTestCase {
    private static final int CUSTOM_MAX_THREADS_ACTIVE = 3;
    private static final int HTTP_CLIENT_MAX_CONNECTIONS = 200;
    @Rule
    public DynamicPort listenPort1 = new DynamicPort("port1");
    @Rule
    public DynamicPort listenPort2 = new DynamicPort("port2");
    @Rule
    public DynamicPort listenPort3 = new DynamicPort("port3");
    @Rule
    public SystemProperty maxThreadsActive = new SystemProperty("max.threads.active", String.valueOf(3));
    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private CountDownLatch maxActiveNumberOfRequestExecutedLatch;
    private Latch waitingLatch = new Latch();
    private AtomicInteger numberOfRequest = new AtomicInteger();
    private Executor httpClientExecutor;

    protected String getConfigFile() {
        return "http-listener-worker-threading-profile-config.xml";
    }

    @Before
    public void setup() {
        PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
        mgr.setDefaultMaxPerRoute(200);
        mgr.setMaxTotal(200);
        this.httpClientExecutor = Executor.newInstance((HttpClient)HttpClientBuilder.create().setConnectionManager((HttpClientConnectionManager)mgr).build());
    }

    @Test
    public void useMaxThreadsActiveThreadingProfile() throws Exception {
        String url = String.format("http://localhost:%s", this.listenPort1.getNumber());
        this.assertMaxThreadsActive("maxActiveThreadsConfigFlow", url, 3);
    }

    @Test
    public void useDefaultMaxThreadsActiveThreadingProfile() throws Exception {
        String url = String.format("http://localhost:%s", this.listenPort2.getNumber());
        this.assertMaxThreadsActive("defaultMaxActiveThreadsConfigFlow", url, 128);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void assertMaxThreadsActive(String flowName, String url, int maxThreadsActive) throws Exception {
        this.maxActiveNumberOfRequestExecutedLatch = new CountDownLatch(maxThreadsActive);
        this.sendRequestUntilNoMoreWorkers(flowName, url, maxThreadsActive);
        this.expectedException.expect((Matcher)CoreMatchers.anyOf((Matcher)CoreMatchers.instanceOf(NoHttpResponseException.class), (Matcher)CoreMatchers.instanceOf(SocketException.class)));
        try {
            this.httpClientExecutor.execute(Request.Get((String)url));
        }
        finally {
            this.waitingLatch.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void hitDifferentRequestConfigAndRun() throws Exception {
        String url = String.format("http://localhost:%s", this.listenPort1.getNumber());
        this.sendRequestUntilNoMoreWorkers("maxActiveThreadsConfigFlow", url, 3);
        try {
            url = String.format("http://localhost:%s", this.listenPort3.getNumber());
            Response response = this.httpClientExecutor.execute(Request.Post((String)url).bodyByteArray("Test Message".getBytes()).connectTimeout(100).socketTimeout(100));
            HttpResponse httpResponse = response.returnResponse();
            Assert.assertThat((Object)httpResponse.getStatusLine().getStatusCode(), (Matcher)Is.is((Object)HttpConstants.HttpStatus.OK.getStatusCode()));
            Assert.assertThat((Object)IOUtils.toString((InputStream)httpResponse.getEntity().getContent()), (Matcher)Is.is((Object)"Test Message"));
        }
        finally {
            this.waitingLatch.release();
        }
    }

    private void sendRequestUntilNoMoreWorkers(String flowName, String url, int maxThreadsActive) throws Exception {
        this.configureTestComponent(flowName, maxThreadsActive);
        this.maxActiveNumberOfRequestExecutedLatch = new CountDownLatch(maxThreadsActive);
        for (int i = 0; i < maxThreadsActive; ++i) {
            this.executeRequestInAnotherThread(url);
        }
        if (!this.maxActiveNumberOfRequestExecutedLatch.await(5000L, TimeUnit.MILLISECONDS)) {
            Assert.fail((String)"message processor wasn't executed the number of times required.");
        }
    }

    private void executeRequestInAnotherThread(final String url) {
        new Thread(){

            @Override
            public void run() {
                try {
                    HttpListenerWorkerThreadingProfileTestCase.this.httpClientExecutor.execute(Request.Get((String)url).connectTimeout(5000));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }.start();
    }

    private void configureTestComponent(String flowName, final int maxThreadsActive) throws Exception {
        this.getFunctionalTestComponent(flowName).setEventCallback(new EventCallback(){

            public void eventReceived(MuleEventContext context, Object component) throws Exception {
                try {
                    HttpListenerWorkerThreadingProfileTestCase.this.maxActiveNumberOfRequestExecutedLatch.countDown();
                    HttpListenerWorkerThreadingProfileTestCase.this.numberOfRequest.incrementAndGet();
                    if (HttpListenerWorkerThreadingProfileTestCase.this.numberOfRequest.get() <= maxThreadsActive) {
                        HttpListenerWorkerThreadingProfileTestCase.this.waitingLatch.await();
                    }
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }
}

