package org.forgerock.audit.handlers.elasticsearch;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.forgerock.audit.Audit;
import org.forgerock.audit.events.AccessAuditEventBuilder;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
import org.forgerock.audit.events.handlers.buffering.BatchException;
import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
import org.forgerock.audit.events.handlers.buffering.BufferedBatchPublisher;
import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration;
import org.forgerock.audit.util.ElasticsearchUtil;
import org.forgerock.http.Client;
import org.forgerock.http.HttpApplicationException;
import org.forgerock.http.apache.async.AsyncHttpClientProvider;
import org.forgerock.http.handler.HttpClientHandler;
import org.forgerock.http.protocol.Entity;
import org.forgerock.http.protocol.Request;
import org.forgerock.http.protocol.Response;
import org.forgerock.http.spi.Loader;
import org.forgerock.http.swagger.SwaggerApiProducer;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.CountPolicy;
import org.forgerock.json.resource.InternalServerErrorException;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.Responses;
import org.forgerock.json.resource.ServiceUnavailableException;
import org.forgerock.services.context.Context;
import org.forgerock.util.CloseSilentlyFunction;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.encode.Base64;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.Promises;
import org.forgerock.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/handler-elasticsearch-2.0.12.jar:org/forgerock/audit/handlers/elasticsearch/ElasticsearchAuditEventHandler.class */
public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements BatchConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ElasticsearchAuditEventHandler.class);
    private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR = new ElasticsearchQueryFilterVisitor();
    private static final String QUERY = "query";
    private static final String GET = "GET";
    private static final String SEARCH = "/_search";
    private static final String BULK = "/_bulk";
    private static final String HITS = "hits";
    private static final String SOURCE = "_source";
    private static final String DOC = "_doc/";
    private static final int DEFAULT_PAGE_SIZE = 10;
    private static final String TOTAL = "total";
    private static final String PUT = "PUT";
    private static final String POST = "POST";
    private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
    private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
    private static final int DEFAULT_OFFSET = 0;
    private final String indexName;
    private final String basicAuthHeaderValue;
    private final String baseUri;
    private final String bulkUri;
    private final ElasticsearchAuditEventHandlerConfiguration configuration;
    private final Client client;
    private final BatchPublisher batchIndexer;
    private final HttpClientHandler defaultHttpClientHandler;

    public ElasticsearchAuditEventHandler(ElasticsearchAuditEventHandlerConfiguration elasticsearchAuditEventHandlerConfiguration, EventTopicsMetaData eventTopicsMetaData, @Audit Client client) {
        super(elasticsearchAuditEventHandlerConfiguration.getName(), eventTopicsMetaData, elasticsearchAuditEventHandlerConfiguration.getTopics(), elasticsearchAuditEventHandlerConfiguration.isEnabled());
        this.configuration = (ElasticsearchAuditEventHandlerConfiguration) Reject.checkNotNull(elasticsearchAuditEventHandlerConfiguration);
        if (client == null) {
            this.defaultHttpClientHandler = defaultHttpClientHandler();
            this.client = new Client(this.defaultHttpClientHandler);
        } else {
            this.defaultHttpClientHandler = null;
            this.client = client;
        }
        this.indexName = elasticsearchAuditEventHandlerConfiguration.getIndexMapping().getIndexName();
        this.basicAuthHeaderValue = buildBasicAuthHeaderValue();
        this.baseUri = buildBaseUri();
        this.bulkUri = buildBulkUri();
        ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration buffering = elasticsearchAuditEventHandlerConfiguration.getBuffering();
        if (!buffering.isEnabled()) {
            this.batchIndexer = null;
        } else {
            this.batchIndexer = BufferedBatchPublisher.newBuilder(this).capacity(buffering.getMaxSize()).writeInterval((buffering.getWriteInterval() == null || buffering.getWriteInterval().isEmpty()) ? null : Duration.duration(buffering.getWriteInterval())).maxBatchEvents(buffering.getMaxBatchedEvents()).averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE).autoFlush(true).build();
        }
    }

    @Override // org.forgerock.audit.events.handlers.AuditEventHandler
    public void startup() throws ResourceException {
        if (this.batchIndexer != null) {
            this.batchIndexer.startup();
        }
    }

    @Override // org.forgerock.audit.events.handlers.AuditEventHandler
    public void shutdown() throws ResourceException {
        if (this.batchIndexer != null) {
            this.batchIndexer.shutdown();
        }
        if (this.defaultHttpClientHandler != null) {
            try {
                this.defaultHttpClientHandler.close();
            } catch (IOException e) {
                throw ResourceException.newResourceException(500, "An error occurred while closing the default HTTP client handler", e);
            }
        }
    }

    @Override // org.forgerock.audit.events.handlers.AuditEventHandler
    public Promise<QueryResponse, ResourceException> queryEvents(Context context, final String str, QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
        final int pageSize = queryRequest.getPageSize() <= 0 ? 10 : queryRequest.getPageSize();
        int pagedResultsOffset = queryRequest.getPagedResultsOffset() != 0 ? queryRequest.getPagedResultsOffset() : queryRequest.getPagedResultsCookie() != null ? Integer.valueOf(queryRequest.getPagedResultsCookie()).intValue() : 0;
        try {
            final int i = pagedResultsOffset;
            return this.client.send(createRequest("GET", buildSearchUri(str, pageSize, pagedResultsOffset), JsonValue.json(JsonValue.object((Map.Entry<String, Object>[]) new Map.Entry[]{JsonValue.field(QUERY, ((JsonValue) queryRequest.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null)).getObject())})).getObject())).then(CloseSilentlyFunction.closeSilently(new Function<Response, QueryResponse, ResourceException>() { // from class: org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandler.1
                @Override // org.forgerock.util.Function
                public QueryResponse apply(Response response) throws ResourceException {
                    if (!response.getStatus().isSuccessful()) {
                        throw ResourceException.newResourceException(response.getStatus().getCode(), "Elasticsearch response (" + ElasticsearchAuditEventHandler.this.indexName + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str + ElasticsearchAuditEventHandler.SEARCH + "): " + response.getEntity());
                    }
                    try {
                        JsonValue json = JsonValue.json(response.getEntity().getJson());
                        Iterator<JsonValue> it = json.get(ElasticsearchAuditEventHandler.HITS).get(ElasticsearchAuditEventHandler.HITS).iterator();
                        while (it.hasNext()) {
                            JsonValue next = it.next();
                            queryResourceHandler.handleResource(Responses.newResourceResponse(next.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, ElasticsearchUtil.denormalizeJson(next.get(ElasticsearchAuditEventHandler.SOURCE))));
                        }
                        int intValue = json.get(ElasticsearchAuditEventHandler.HITS).get(ElasticsearchAuditEventHandler.TOTAL).asInteger().intValue();
                        return Responses.newQueryResponse(pageSize + i >= intValue ? null : Integer.toString(pageSize + i), CountPolicy.EXACT, intValue);
                    } catch (IOException e) {
                        throw new InternalServerErrorException(e.getMessage(), e);
                    }
                }
            }), org.forgerock.http.protocol.Responses.noopExceptionFunction());
        } catch (URISyntaxException e) {
            return new InternalServerErrorException(e.getMessage(), e).asPromise();
        }
    }

    @Override // org.forgerock.audit.events.handlers.AuditEventHandler
    public Promise<ResourceResponse, ResourceException> readEvent(Context context, final String str, final String str2) {
        try {
            return this.client.send(createRequest("GET", buildEventUri(str, str2), null)).then(CloseSilentlyFunction.closeSilently(new Function<Response, ResourceResponse, ResourceException>() { // from class: org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandler.2
                @Override // org.forgerock.util.Function
                public ResourceResponse apply(Response response) throws ResourceException {
                    if (!response.getStatus().isSuccessful()) {
                        throw ElasticsearchAuditEventHandler.resourceException(ElasticsearchAuditEventHandler.this.indexName, str, str2, response);
                    }
                    try {
                        JsonValue denormalizeJson = ElasticsearchUtil.denormalizeJson(JsonValue.json(response.getEntity().getJson()).get(ElasticsearchAuditEventHandler.SOURCE));
                        denormalizeJson.put(ResourceResponse.FIELD_CONTENT_ID, str2);
                        return Responses.newResourceResponse(str2, null, denormalizeJson);
                    } catch (IOException e) {
                        throw new InternalServerErrorException(e.getMessage(), e);
                    }
                }
            }), org.forgerock.http.protocol.Responses.noopExceptionFunction());
        } catch (Exception e) {
            String format = String.format("Unable to read audit entry for topic=%s, _id=%s", str, str2);
            LOGGER.error(format, (Throwable) e);
            return new InternalServerErrorException(format, e).asPromise();
        }
    }

    @Override // org.forgerock.audit.events.handlers.AuditEventHandler
    public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String str, JsonValue jsonValue) {
        return this.batchIndexer == null ? publishSingleEvent(str, jsonValue) : !this.batchIndexer.offer(str, jsonValue) ? new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event " + this.indexName + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str + "/" + jsonValue.get(ResourceResponse.FIELD_CONTENT_ID).asString()).asPromise() : Responses.newResourceResponse(jsonValue.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, jsonValue).asPromise();
    }

    protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String str, final JsonValue jsonValue) {
        final String asString = jsonValue.get(ResourceResponse.FIELD_CONTENT_ID).asString();
        jsonValue.remove(ResourceResponse.FIELD_CONTENT_ID);
        try {
            String normalizeJson = ElasticsearchUtil.normalizeJson(jsonValue);
            jsonValue.put(ResourceResponse.FIELD_CONTENT_ID, asString);
            return this.client.send(createRequest("POST", buildEventUri(str, DOC + asString), normalizeJson)).then(CloseSilentlyFunction.closeSilently(new Function<Response, ResourceResponse, ResourceException>() { // from class: org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandler.3
                @Override // org.forgerock.util.Function
                public ResourceResponse apply(Response response) throws ResourceException {
                    if (response.getStatus().isSuccessful()) {
                        return Responses.newResourceResponse(jsonValue.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, jsonValue);
                    }
                    throw ElasticsearchAuditEventHandler.resourceException(ElasticsearchAuditEventHandler.this.indexName, str, asString, response);
                }
            }), org.forgerock.http.protocol.Responses.noopExceptionFunction());
        } catch (Exception e) {
            String format = String.format("Unable to create audit entry for topic=%s, _id=%s", str, asString);
            LOGGER.error(format, (Throwable) e);
            return new InternalServerErrorException(format, e).asPromise();
        }
    }

    @Override // org.forgerock.audit.events.handlers.buffering.BatchConsumer
    public void addToBatch(String str, JsonValue jsonValue, StringBuilder sb) throws BatchException {
        try {
            String str2 = this.indexName + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str;
            String asString = jsonValue.get(ResourceResponse.FIELD_CONTENT_ID).asString();
            jsonValue.remove(ResourceResponse.FIELD_CONTENT_ID);
            String normalizeJson = ElasticsearchUtil.normalizeJson(jsonValue);
            jsonValue.put(ResourceResponse.FIELD_CONTENT_ID, asString);
            sb.append("{ \"index\" : { \"_index\" : ").append(ElasticsearchUtil.OBJECT_MAPPER.writeValueAsString(str2)).append(", \"_id\" : ").append(ElasticsearchUtil.OBJECT_MAPPER.writeValueAsString(asString)).append(" } }\n").append(normalizeJson).append('\n');
        } catch (IOException e) {
            throw new BatchException("Unexpected error while adding to batch", e);
        }
    }

    @Override // org.forgerock.audit.events.handlers.buffering.BatchConsumer
    public Promise<Void, BatchException> publishBatch(String str) {
        try {
            return this.client.send(createRequest("POST", buildBulkUri(), str)).then(CloseSilentlyFunction.closeSilently(processBatchResponse()), org.forgerock.http.protocol.Responses.noopExceptionFunction());
        } catch (URISyntaxException e) {
            return Promises.newExceptionPromise(new BatchException("Incorrect URI", e));
        }
    }

    private Function<Response, Void, BatchException> processBatchResponse() {
        return new Function<Response, Void, BatchException>() { // from class: org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandler.4
            @Override // org.forgerock.util.Function
            public Void apply(Response response) throws BatchException {
                try {
                    if (!response.getStatus().isSuccessful()) {
                        throw new BatchException("Elasticsearch batch index failed: " + response.getEntity());
                    }
                    JsonValue json = JsonValue.json(response.getEntity().getJson());
                    if (!json.get("errors").asBoolean().booleanValue()) {
                        return null;
                    }
                    JsonValue jsonValue = json.get("items");
                    int size = jsonValue.size();
                    ArrayList arrayList = new ArrayList(size);
                    for (int i = 0; i < size; i++) {
                        JsonValue jsonValue2 = jsonValue.get(i).get("index");
                        if (jsonValue2.get(AccessAuditEventBuilder.STATUS).asInteger().intValue() >= 400) {
                            arrayList.add(jsonValue2);
                        }
                    }
                    throw new BatchException("One or more Elasticsearch batch index entries failed: " + ElasticsearchUtil.OBJECT_MAPPER.writeValueAsString(arrayList));
                } catch (IOException e) {
                    throw new BatchException("Unexpected error while publishing batch", e);
                }
            }
        };
    }

    protected String buildBasicAuthHeaderValue() {
        if (this.basicAuthHeaderValue != null) {
            return this.basicAuthHeaderValue;
        }
        ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration connection = this.configuration.getConnection();
        if (connection.getUsername() == null || connection.getUsername().isEmpty() || connection.getPassword() == null || connection.getPassword().isEmpty()) {
            return null;
        }
        return "Basic " + Base64.encode((connection.getUsername() + ":" + connection.getPassword()).getBytes());
    }

    protected String buildEventUri(String str, String str2) {
        return buildBaseUri() + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str + "/" + str2;
    }

    protected String buildBulkUri() {
        if (this.bulkUri != null) {
            return this.bulkUri;
        }
        ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration connection = this.configuration.getConnection();
        return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort() + BULK;
    }

    protected String buildSearchUri(String str, int i, int i2) {
        return buildBaseUri() + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str + SEARCH + "?size=" + i + "&from=" + i2;
    }

    protected String buildBaseUri() {
        if (this.baseUri != null) {
            return this.baseUri;
        }
        ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration connection = this.configuration.getConnection();
        return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort() + "/" + this.indexName;
    }

    protected static ResourceException resourceException(String str, String str2, String str3, Response response) {
        if (response.getStatus().getCode() == 404) {
            return new NotFoundException("Object " + str3 + " not found in " + str + "-" + str2);
        }
        return ResourceException.newResourceException(response.getStatus().getCode(), "Elasticsearch response (" + str + SwaggerApiProducer.VersionTransformer.PATH_FRAGMENT_COMPONENT_SEPARATOR + str2 + "/" + str3 + "): " + response.getEntity());
    }

    private Request createRequest(String str, String str2, Object obj) throws URISyntaxException {
        Request request = new Request();
        request.setMethod(str);
        request.setUri(str2);
        if (obj != null) {
            request.getHeaders().put("Content-Type", (Object) Entity.APPLICATION_JSON_CHARSET_UTF_8);
            request.setEntity(obj);
        }
        if (this.basicAuthHeaderValue != null) {
            request.getHeaders().put("Authorization", (Object) this.basicAuthHeaderValue);
        }
        return request;
    }

    private HttpClientHandler defaultHttpClientHandler() {
        try {
            return new HttpClientHandler(Options.defaultOptions().set(HttpClientHandler.OPTION_LOADER, new Loader() { // from class: org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandler.5
                @Override // org.forgerock.http.spi.Loader
                public <S> S load(Class<S> cls, Options options) {
                    return cls.cast(new AsyncHttpClientProvider());
                }
            }));
        } catch (HttpApplicationException e) {
            throw new RuntimeException("Error while building default HTTP Client", e);
        }
    }
}
