package org.opennms.smoketest.telemetry;

import com.google.gson.Gson;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.HttpClientConfig;
import io.searchbox.core.Search;
import io.searchbox.core.SearchResult;
import io.searchbox.indices.template.GetTemplate;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.opennms.features.jest.client.SearchResultUtils;
import org.opennms.netmgt.flows.elastic.NetflowVersion;
import org.opennms.smoketest.utils.RestClient;
import org.opennms.smoketest.utils.SshClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/smoketest/telemetry/FlowTester.class */
/**
 * Smoke-test helper that sends flow packets and then verifies, through the
 * Elasticsearch REST endpoint, that the expected number of flow documents and
 * the {@code netflow} index template show up.
 *
 * <p>Additional checks can be hooked in before and after the verification via
 * {@link #setRunBefore(List)} and {@link #setRunAfter(List)}. When a second
 * (REST) address is handed to the constructor, a pre-check asserting that no
 * flows exist yet and a post-check awaiting the total flow count are
 * registered automatically.
 */
public class FlowTester {
    private static final String TEMPLATE_NAME = "netflow";
    private static final Logger LOG = LoggerFactory.getLogger(FlowTester.class);
    private static final Gson gson = new Gson();
    private final List<Delivery> deliveries;
    private final List<Consumer<FlowTester>> runBefore = new ArrayList<>();
    private final List<Consumer<FlowTester>> runAfter = new ArrayList<>();
    private final InetSocketAddress elasticRestAddress;
    private final int totalFlowCount;
    // Only assigned inside verifyFlows(); stays null until the first verification run.
    private JestClient client;

    /**
     * A condition that is polled by {@link FlowTester#verify(Block)} until it
     * reports {@code true}. Unlike {@code java.util.function.BooleanSupplier}
     * it may throw checked exceptions.
     */
    public interface Block {
        boolean test() throws Exception;
    }

    /** Pairs a flow packet with the sender that is used to transmit it. */
    public static class Delivery {
        private final FlowPacket packet;
        private final Sender sender;

        /**
         * @param flowPacket the packet to send, must not be {@code null}
         * @param sender     the sender to send the packet with, must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        public Delivery(FlowPacket flowPacket, Sender sender) {
            this.packet = Objects.requireNonNull(flowPacket);
            this.sender = Objects.requireNonNull(sender);
        }

        /**
         * Sends the packet using the configured sender.
         *
         * @throws IOException if sending fails
         */
        public void send() throws IOException {
            this.packet.send(this.sender);
        }
    }

    /**
     * Creates a tester for the given deliveries.
     *
     * @param inetSocketAddress  address of the Elasticsearch REST endpoint, must not be {@code null}
     * @param inetSocketAddress2 address handed to {@link RestClient}; may be {@code null},
     *                           in which case no REST based pre/post checks are registered
     * @param list               the deliveries to send, must not be {@code null} and must
     *                           contain at least one flow in total
     * @throws NullPointerException  if {@code inetSocketAddress} or {@code list} is {@code null}
     * @throws IllegalStateException if the deliveries contain no flows at all
     */
    public FlowTester(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, List<Delivery> list) {
        this.elasticRestAddress = Objects.requireNonNull(inetSocketAddress);
        this.deliveries = Objects.requireNonNull(list);
        this.totalFlowCount = list.stream().mapToInt(delivery -> delivery.packet.getFlowCount()).sum();
        if (this.totalFlowCount <= 0) {
            throw new IllegalStateException("Cannot verify flow creation/procession, as total flow count is <= 0, but must be > 0");
        }
        // The REST checks are registered exactly once; registering them a second
        // time would only repeat the very same assertion and the same wait.
        if (inetSocketAddress2 != null) {
            final RestClient restClient = new RestClient(inetSocketAddress2);

            // Before sending anything: no flows must be reported.
            this.runBefore.add(flowTester -> {
                Assert.assertEquals(0L, restClient.getFlowCount(0L, System.currentTimeMillis()));
            });

            // After verification: wait until exactly the expected number of flows is reported.
            this.runAfter.add(flowTester -> {
                Awaitility.with().pollInterval(15L, TimeUnit.SECONDS).await().atMost(1L, TimeUnit.MINUTES).until(() -> {
                    return restClient.getFlowCount(0L, System.currentTimeMillis());
                }, IsEqual.equalTo(Long.valueOf(this.totalFlowCount)));
            });
        }
    }

    /**
     * Runs the complete verification: executes the "before" hooks, sends all
     * deliveries, waits per netflow version until at least the expected number
     * of flow documents can be found, waits for the index template to exist
     * and finally executes the "after" hooks.
     *
     * <p>The client created for this run is closed when the method returns,
     * regardless of whether the verification succeeded.
     *
     * @throws IOException if sending a packet or closing the client fails
     */
    public void verifyFlows() throws IOException {
        final String elasticUrl = String.format("http://%s:%d", this.elasticRestAddress.getHostString(), this.elasticRestAddress.getPort());
        final JestClientFactory jestClientFactory = new JestClientFactory();
        // NOTE(review): the connect timeout reuses the SSH client's default constant - confirm this coupling is intended.
        jestClientFactory.setHttpClientConfig(new HttpClientConfig.Builder(elasticUrl)
                .connTimeout(SshClient.DEFAULT_TIMEOUT_MS)
                .readTimeout(10000)
                .multiThreaded(true)
                .build());
        try {
            this.client = jestClientFactory.getObject();
            this.runBefore.forEach(hook -> hook.accept(this));

            final Map<NetflowVersion, List<Delivery>> deliveriesByVersion = this.deliveries.stream()
                    .collect(Collectors.groupingBy(delivery -> delivery.packet.getNetflowVersion()));
            LOG.info("Verifying flows. Expecting to persist {} flows across protocols: {}", this.totalFlowCount, deliveriesByVersion.keySet());

            sendAll(this.deliveries);

            for (final Map.Entry<NetflowVersion, List<Delivery>> entry : deliveriesByVersion.entrySet()) {
                final NetflowVersion netflowVersion = entry.getKey();
                final List<Delivery> versionDeliveries = entry.getValue();
                final int expectedFlowCount = versionDeliveries.stream().mapToInt(delivery -> delivery.packet.getFlowCount()).sum();
                LOG.info("Verifying flows for {}", netflowVersion);
                verify(() -> flowsPersisted(netflowVersion, versionDeliveries, expectedFlowCount));
            }

            LOG.info("Ensuring that the index template was created...");
            verify(() -> {
                final JestResult result = this.client.execute(new GetTemplate.Builder(TEMPLATE_NAME).build());
                return result.isSucceeded() && result.getJsonObject().get(TEMPLATE_NAME) != null;
            });

            this.runAfter.forEach(hook -> hook.accept(this));
        } finally {
            if (this.client != null) {
                this.client.close();
            }
        }
    }

    /**
     * Replaces all hooks that are executed before the flows are sent.
     *
     * @param list the new hooks, must not be {@code null}
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public void setRunBefore(List<Consumer<FlowTester>> list) {
        // Validate first, so a null argument does not wipe the existing hooks.
        Objects.requireNonNull(list);
        this.runBefore.clear();
        this.runBefore.addAll(list);
    }

    /**
     * Replaces all hooks that are executed after the flows have been verified.
     *
     * @param list the new hooks, must not be {@code null}
     * @throws NullPointerException if {@code list} is {@code null}
     */
    public void setRunAfter(List<Consumer<FlowTester>> list) {
        // Validate first, so a null argument does not wipe the existing hooks.
        Objects.requireNonNull(list);
        this.runAfter.clear();
        this.runAfter.addAll(list);
    }

    /**
     * Returns the client of the current (or last) verification run.
     *
     * @return the client, never {@code null}
     * @throws NullPointerException if {@link #verifyFlows()} has not created a client yet
     */
    public JestClient getJestClient() {
        return Objects.requireNonNull(this.client);
    }

    /**
     * Polls the given block every 15 seconds for at most 5 minutes until it
     * returns {@code true}. An exception thrown by the block is logged and
     * treated like {@code false}, so polling continues.
     *
     * @param block the condition to wait for, must not be {@code null}
     * @throws NullPointerException if {@code block} is {@code null}
     */
    public static void verify(Block block) {
        Objects.requireNonNull(block);
        Awaitility.with().pollInterval(15L, TimeUnit.SECONDS).await().atMost(5L, TimeUnit.MINUTES).until(() -> {
            try {
                LOG.info("Querying elastic search");
                return block.test();
            } catch (Exception e) {
                LOG.error("Error while querying to elastic search", e);
                return false;
            }
        });
    }

    /** Logs and sends every given delivery in order. */
    private static void sendAll(List<Delivery> toSend) throws IOException {
        for (final Delivery delivery : toSend) {
            LOG.info("Sending packet payload from {} containing {} flows to: {}", delivery.packet.getPayload(), delivery.packet.getFlowCount(), delivery.sender);
            delivery.send();
        }
    }

    /**
     * Checks whether at least {@code expectedFlowCount} flow documents of the given
     * netflow version can be found; if not, the deliveries of that version are sent again.
     */
    private boolean flowsPersisted(NetflowVersion netflowVersion, List<Delivery> versionDeliveries, int expectedFlowCount) throws IOException {
        final String query = "{\"query\":{\"term\":{\"netflow.version\":{\"value\":" + gson.toJson(netflowVersion) + "}}}}";
        LOG.info("Executing query: {}", query);
        final SearchResult response = this.client.execute(new Search.Builder(query).addIndex("netflow-*").build());
        final long total = SearchResultUtils.getTotal(response);
        LOG.info("Response {} with {} flow documents: {}", response.isSucceeded() ? "successful" : "failed", total, response.getJsonString());

        final boolean persisted = response.isSucceeded() && total >= expectedFlowCount;
        if (!persisted) {
            // Presumably the packets were lost or not processed yet (TODO confirm), so
            // they are re-sent before the next poll. This is why ">=" is used above.
            sendAll(versionDeliveries);
        }
        return persisted;
    }
}
