package io.zeebe.broker.transport.commandapi;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.transport.backpressure.BackpressureMetrics;
import io.zeebe.broker.transport.backpressure.RequestLimiter;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.incident.IncidentRecord;
import io.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.zeebe.protocol.impl.record.value.job.JobRecord;
import io.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceCreationRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.record.ExecuteCommandRequestDecoder;
import io.zeebe.protocol.record.MessageHeaderDecoder;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import io.zeebe.transport.RequestHandler;
import io.zeebe.transport.ServerOutput;
import java.util.EnumMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/transport/commandapi/CommandApiRequestHandler.class */
/**
 * Handles incoming command API requests: validates the message header, decodes the execute-command
 * request, applies per-partition back-pressure and appends the command to the log stream writer
 * registered for the target partition.
 *
 * <p>State changes ({@link #addPartition}, {@link #removePartition}, {@link
 * #onDiskSpaceNotAvailable}, {@link #onDiskSpaceAvailable}) are not applied immediately. They are
 * enqueued as {@link Runnable}s and executed at the beginning of the next {@link #onRequest} call,
 * i.e. on whichever thread invokes {@code onRequest}. The decoders, the shared {@link
 * RecordMetadata} and the record instances in {@code recordsByType} are reused across requests.
 */
final class CommandApiRequestHandler implements RequestHandler {
    private static final Logger LOG = Loggers.TRANSPORT_LOGGER;

    /** Highest header version accepted by {@link #onRequest}; newer versions are rejected. */
    private static final int MAX_SUPPORTED_PROTOCOL_VERSION = 3;

    /** The only header template id that {@link #onRequest} dispatches to command handling. */
    private static final int EXECUTE_COMMAND_REQUEST_TEMPLATE_ID = 20;

    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final ExecuteCommandRequestDecoder executeCommandRequestDecoder = new ExecuteCommandRequestDecoder();

    /** Pending state changes; filled by the mutator methods and drained in {@link #onRequest}. */
    private final Queue<Runnable> cmdQueue = new ManyToOneConcurrentLinkedQueue<>();
    private final Consumer<Runnable> cmdConsumer = Runnable::run;

    /** Log stream writers keyed by partition id; a missing entry yields a leader-mismatch error. */
    private final Int2ObjectHashMap<LogStreamRecordWriter> leadingStreams = new Int2ObjectHashMap<>();
    /** Back-pressure limiters keyed by partition id; always registered together with the writer. */
    private final Int2ObjectHashMap<RequestLimiter<Intent>> partitionLimiters = new Int2ObjectHashMap<>();

    private final RecordMetadata eventMetadata = new RecordMetadata();
    private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter();

    /** Reusable record instances per supported value type; used to validate the command payload. */
    private final Map<ValueType, UnpackedObject> recordsByType = new EnumMap<>(ValueType.class);
    private boolean isDiskSpaceAvailable = true;
    private final BackpressureMetrics metrics = new BackpressureMetrics();

    /** Creates the handler and registers the record type for every supported {@link ValueType}. */
    public CommandApiRequestHandler() {
        initEventTypeMap();
    }

    /** Populates {@code recordsByType}; value types absent from this map are rejected as unsupported. */
    private void initEventTypeMap() {
        this.recordsByType.put(ValueType.DEPLOYMENT, new DeploymentRecord());
        this.recordsByType.put(ValueType.JOB, new JobRecord());
        this.recordsByType.put(ValueType.WORKFLOW_INSTANCE, new WorkflowInstanceRecord());
        this.recordsByType.put(ValueType.MESSAGE, new MessageRecord());
        this.recordsByType.put(ValueType.JOB_BATCH, new JobBatchRecord());
        this.recordsByType.put(ValueType.INCIDENT, new IncidentRecord());
        this.recordsByType.put(ValueType.VARIABLE_DOCUMENT, new VariableDocumentRecord());
        this.recordsByType.put(ValueType.WORKFLOW_INSTANCE_CREATION, new WorkflowInstanceCreationRecord());
    }

    /**
     * Decodes an execute-command request and tries to append the command to the partition's log.
     *
     * <p>The request is answered with an error response (and nothing is written) when the broker is
     * out of disk space, no writer is registered for the partition, the value type is unsupported,
     * the payload cannot be deserialized, or the partition's limiter refuses the request.
     *
     * @param serverOutput output used to send error responses
     * @param partitionId id of the partition the request targets
     * @param requestId id of the request, used for responses and limiter bookkeeping
     * @param recordMetadata metadata to complete with record type, intent and value type
     * @param buffer buffer containing the message; the header decoder must already be wrapped on it
     * @param messageOffset offset of the message header within {@code buffer}
     * @param messageLength not read by this method
     */
    private void handleExecuteCommandRequest(ServerOutput serverOutput, int partitionId, long requestId, RecordMetadata recordMetadata, DirectBuffer buffer, int messageOffset, int messageLength) {
        if (!this.isDiskSpaceAvailable) {
            this.errorResponseWriter
                .resourceExhausted(String.format("Cannot accept requests for partition %d. Broker is out of disk space", partitionId))
                .tryWriteResponse(serverOutput, partitionId, requestId);
            return;
        }

        // The request body starts right after the message header.
        this.executeCommandRequestDecoder.wrap(
            buffer,
            messageOffset + this.messageHeaderDecoder.encodedLength(),
            this.messageHeaderDecoder.blockLength(),
            this.messageHeaderDecoder.version());
        final long key = this.executeCommandRequestDecoder.key();

        final LogStreamRecordWriter logStreamWriter = this.leadingStreams.get(partitionId);
        if (logStreamWriter == null) {
            this.errorResponseWriter.partitionLeaderMismatch(partitionId).tryWriteResponseOrLogFailure(serverOutput, partitionId, requestId);
            return;
        }

        final ValueType valueType = this.executeCommandRequestDecoder.valueType();
        final short intentValue = this.executeCommandRequestDecoder.intent();
        final UnpackedObject record = this.recordsByType.get(valueType);
        if (record == null) {
            this.errorResponseWriter
                .unsupportedMessage(valueType.name(), this.recordsByType.keySet().toArray())
                .tryWriteResponseOrLogFailure(serverOutput, partitionId, requestId);
            return;
        }

        final int valueOffset = this.executeCommandRequestDecoder.limit() + ExecuteCommandRequestDecoder.valueHeaderLength();
        final int valueLength = this.executeCommandRequestDecoder.valueLength();

        record.reset();
        try {
            // Only the payload validation is guarded here: a failure in any later step must not be
            // reported to the client as a malformed request.
            record.wrap(buffer, valueOffset, valueLength);
        } catch (RuntimeException e) {
            LOG.error("Failed to deserialize message of type {} in client API", valueType.name(), e);
            this.errorResponseWriter.malformedRequest(e).tryWriteResponseOrLogFailure(serverOutput, partitionId, requestId);
            return;
        }

        recordMetadata.recordType(RecordType.COMMAND);
        final Intent intent = Intent.fromProtocolValue(valueType, intentValue);
        recordMetadata.intent(intent);
        recordMetadata.valueType(valueType);

        this.metrics.receivedRequest(partitionId);
        final RequestLimiter<Intent> limiter = this.partitionLimiters.get(partitionId);
        if (!limiter.tryAcquire(partitionId, requestId, intent)) {
            this.metrics.dropped(partitionId);
            LOG.trace(
                "Partition-{} receiving too many requests. Current limit {} inflight {}, dropping request {} from gateway",
                partitionId,
                limiter.getLimit(),
                limiter.getInflightCount(),
                requestId);
            this.errorResponseWriter.resourceExhausted().tryWriteResponse(serverOutput, partitionId, requestId);
            return;
        }

        boolean written = false;
        try {
            written = writeCommand(recordMetadata, buffer, key, logStreamWriter, valueOffset, valueLength);
        } catch (Exception e) {
            LOG.error("Unexpected error on writing {} command", intent, e);
        } finally {
            // The limiter slot was acquired above; tell the limiter whenever the command did not
            // make it into the log, including when an Error is propagating.
            if (!written) {
                limiter.onIgnore(partitionId, requestId);
            }
        }
    }

    /**
     * Appends the command to the given log stream writer.
     *
     * @param recordMetadata metadata written alongside the command value
     * @param buffer buffer holding the command value
     * @param key record key, or {@code ExecuteCommandRequestDecoder.keyNullValue()} for "no key"
     * @param logStreamWriter writer of the target partition; it is reset before use
     * @param valueOffset offset of the command value within {@code buffer}
     * @param valueLength length of the command value in bytes
     * @return {@code true} if {@code tryWrite()} returned a non-negative result
     */
    private boolean writeCommand(RecordMetadata recordMetadata, DirectBuffer buffer, long key, LogStreamRecordWriter logStreamWriter, int valueOffset, int valueLength) {
        logStreamWriter.reset();
        if (key == ExecuteCommandRequestDecoder.keyNullValue()) {
            logStreamWriter.keyNull();
        } else {
            logStreamWriter.key(key);
        }
        final long result = logStreamWriter.metadataWriter(recordMetadata).value(buffer, valueOffset, valueLength).tryWrite();
        return result >= 0;
    }

    /**
     * Schedules registration of the writer and limiter for a partition. The registration becomes
     * effective when the queue is drained at the start of the next {@link #onRequest} call.
     */
    public void addPartition(int partitionId, LogStreamRecordWriter logStreamRecordWriter, RequestLimiter<Intent> requestLimiter) {
        this.cmdQueue.add(() -> {
            this.leadingStreams.put(partitionId, logStreamRecordWriter);
            this.partitionLimiters.put(partitionId, requestLimiter);
        });
    }

    /**
     * Schedules removal of the writer and limiter of a partition. The removal becomes effective
     * when the queue is drained at the start of the next {@link #onRequest} call.
     */
    public void removePartition(int partitionId) {
        this.cmdQueue.add(() -> {
            this.leadingStreams.remove(partitionId);
            this.partitionLimiters.remove(partitionId);
        });
    }

    /** Schedules the switch to "out of disk space"; afterwards every command request is rejected. */
    public void onDiskSpaceNotAvailable() {
        this.cmdQueue.add(() -> {
            this.isDiskSpaceAvailable = false;
            LOG.debug("Broker is out of disk space. All client requests will be rejected");
        });
    }

    /** Schedules the switch back to "disk space available" so command requests are accepted again. */
    public void onDiskSpaceAvailable() {
        this.cmdQueue.add(() -> this.isDiskSpaceAvailable = true);
    }

    /**
     * Entry point for an incoming request: applies pending state changes, validates the message
     * header and dispatches execute-command requests. Requests with a header version above {@code
     * MAX_SUPPORTED_PROTOCOL_VERSION} or with any other template id are answered with an error.
     *
     * @param serverOutput output used to send error responses
     * @param partitionId id of the partition the request targets
     * @param requestId id of the request
     * @param buffer buffer containing the message
     * @param offset offset of the message header within {@code buffer}
     * @param length passed through to the command handling, which does not read it
     */
    public void onRequest(ServerOutput serverOutput, int partitionId, long requestId, DirectBuffer buffer, int offset, int length) {
        drainCommandQueue();

        this.messageHeaderDecoder.wrap(buffer, offset);
        final int templateId = this.messageHeaderDecoder.templateId();
        final int clientVersion = this.messageHeaderDecoder.version();
        if (clientVersion > MAX_SUPPORTED_PROTOCOL_VERSION) {
            this.errorResponseWriter
                .invalidClientVersion(MAX_SUPPORTED_PROTOCOL_VERSION, clientVersion)
                .tryWriteResponse(serverOutput, partitionId, requestId);
            return;
        }

        this.eventMetadata.reset();
        this.eventMetadata.protocolVersion(clientVersion);
        this.eventMetadata.requestId(requestId);
        this.eventMetadata.requestStreamId(partitionId);

        if (templateId == EXECUTE_COMMAND_REQUEST_TEMPLATE_ID) {
            handleExecuteCommandRequest(serverOutput, partitionId, requestId, this.eventMetadata, buffer, offset, length);
        } else {
            this.errorResponseWriter
                .invalidMessageTemplate(templateId, EXECUTE_COMMAND_REQUEST_TEMPLATE_ID)
                .tryWriteResponse(serverOutput, partitionId, requestId);
        }
    }

    /** Runs every queued state change on the calling thread until the queue reports empty. */
    private void drainCommandQueue() {
        // Loop on isEmpty() and tolerate a null poll() result rather than stopping at the first
        // null. NOTE(review): presumably poll() can return null while an add is still in flight;
        // confirm against the queue implementation before simplifying this loop.
        while (!this.cmdQueue.isEmpty()) {
            final Runnable pending = this.cmdQueue.poll();
            if (pending != null) {
                this.cmdConsumer.accept(pending);
            }
        }
    }
}
