package org.reaktivity.reaktor.internal;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.broadcast.BroadcastReceiver;
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.Controller;
import org.reaktivity.nukleus.ControllerBuilder;
import org.reaktivity.nukleus.ControllerSpi;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
import org.reaktivity.reaktor.internal.types.control.ErrorFW;
import org.reaktivity.reaktor.internal.types.control.FrameFW;
import org.reaktivity.reaktor.internal.types.control.RoutedFW;
import org.reaktivity.reaktor.internal.types.control.UnroutedFW;

/* loaded from: input_file:org/reaktivity/reaktor/internal/ControllerBuilderImpl.class */
public final class ControllerBuilderImpl<T extends Controller> implements ControllerBuilder<T> {
    private final Configuration config;
    private final Class<T> kind;
    private Function<ControllerSpi, T> factory;
    private String name;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/reaktor/internal/ControllerBuilderImpl$ControllerSpiImpl.class */
    public final class ControllerSpiImpl implements ControllerSpi {
        private final FrameFW frameRO;
        private final RoutedFW routedRO;
        private final UnroutedFW unroutedRO;
        private final ErrorFW errorRO;
        private final Context context;
        private final RingBuffer conductorCommands;
        private final CopyBroadcastReceiver conductorResponses;
        private final Long2ObjectHashMap<CompletableFuture<?>> promisesByCorrelationId;
        private final MessageHandler readHandler;
        private final Map<String, StreamsLayout> sourcesByName;
        private final Map<String, StreamsLayout> targetsByName;
        static final /* synthetic */ boolean $assertionsDisabled;

        private ControllerSpiImpl(Context context) {
            this.frameRO = new FrameFW();
            this.routedRO = new RoutedFW();
            this.unroutedRO = new UnroutedFW();
            this.errorRO = new ErrorFW();
            this.context = context;
            this.conductorCommands = context.conductorCommands();
            this.conductorResponses = new CopyBroadcastReceiver(new BroadcastReceiver(context.conductorResponseBuffer()));
            this.promisesByCorrelationId = new Long2ObjectHashMap<>();
            this.sourcesByName = new HashMap();
            this.targetsByName = new HashMap();
            this.readHandler = (v1, v2, v3, v4) -> {
                handleResponse(v1, v2, v3, v4);
            };
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public long nextCorrelationId() {
            return this.conductorCommands.nextCorrelationId();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public int doProcess() {
            return this.conductorResponses.receive(this.readHandler);
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public void doClose() {
            this.sourcesByName.values().forEach((v0) -> {
                CloseHelper.close(v0);
            });
            this.sourcesByName.clear();
            this.targetsByName.values().forEach((v0) -> {
                CloseHelper.close(v0);
            });
            this.targetsByName.clear();
            CloseHelper.close(this.context);
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Long> doRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 1) {
                return handleCommand(Long.class, i, directBuffer, i2, i3);
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Void> doUnroute(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 2) {
                return handleCommand(Void.class, i, directBuffer, i2, i3);
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public <R> R doSupplySource(String str, BiFunction<MessagePredicate, ToIntFunction<MessageConsumer>, R> biFunction) {
            StreamsLayout computeIfAbsent = this.sourcesByName.computeIfAbsent(str, this::newSource);
            RingBuffer streamsBuffer = computeIfAbsent.streamsBuffer();
            streamsBuffer.getClass();
            MessagePredicate messagePredicate = streamsBuffer::write;
            RingBuffer throttleBuffer = computeIfAbsent.throttleBuffer();
            throttleBuffer.getClass();
            return biFunction.apply(messagePredicate, (v1) -> {
                return r0.read(v1);
            });
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public <R> R doSupplyTarget(String str, BiFunction<ToIntFunction<MessageConsumer>, MessagePredicate, R> biFunction) {
            StreamsLayout computeIfAbsent = this.targetsByName.computeIfAbsent(str, this::newTarget);
            RingBuffer streamsBuffer = computeIfAbsent.streamsBuffer();
            streamsBuffer.getClass();
            ToIntFunction<MessageConsumer> toIntFunction = (v1) -> {
                return r0.read(v1);
            };
            RingBuffer throttleBuffer = computeIfAbsent.throttleBuffer();
            throttleBuffer.getClass();
            return biFunction.apply(toIntFunction, throttleBuffer::write);
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public long doCount(String str) {
            return this.context.counters().readonlyCounter(str).getAsLong();
        }

        private StreamsLayout newSource(String str) {
            return new StreamsLayout.Builder().path(this.context.sourceStreamsPath().apply(str)).streamsCapacity(this.context.streamsBufferCapacity()).throttleCapacity(this.context.throttleBufferCapacity()).readonly(true).build();
        }

        private StreamsLayout newTarget(String str) {
            return new StreamsLayout.Builder().path(this.context.targetStreamsPath().apply(str)).streamsCapacity(this.context.streamsBufferCapacity()).throttleCapacity(this.context.throttleBufferCapacity()).readonly(false).build();
        }

        private <R> CompletableFuture<R> handleCommand(Class<R> cls, int i, DirectBuffer directBuffer, int i2, int i3) {
            CompletableFuture<R> completableFuture = new CompletableFuture<>();
            FrameFW wrap = this.frameRO.wrap(directBuffer, i2, i2 + i3);
            if (this.conductorCommands.write(i, directBuffer, i2, i3)) {
                commandSent(wrap.correlationId(), completableFuture);
            } else {
                commandSendFailed(completableFuture);
            }
            return completableFuture;
        }

        private int handleResponse(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741824:
                    handleErrorResponse(directBuffer, i2, i3);
                    return 1;
                case 1073741825:
                    handleRoutedResponse(directBuffer, i2, i3);
                    return 1;
                case 1073741826:
                    handleUnroutedResponse(directBuffer, i2, i3);
                    return 1;
                default:
                    return 1;
            }
        }

        private void handleErrorResponse(DirectBuffer directBuffer, int i, int i2) {
            this.errorRO.wrap(directBuffer, i, i2);
            CompletableFuture<?> remove = this.promisesByCorrelationId.remove(this.errorRO.correlationId());
            if (remove != null) {
                commandFailed(remove, "command failed");
            }
        }

        private void handleRoutedResponse(DirectBuffer directBuffer, int i, int i2) {
            RoutedFW wrap = this.routedRO.wrap(directBuffer, i, i2);
            long correlationId = wrap.correlationId();
            long sourceRef = wrap.sourceRef();
            CompletableFuture<?> remove = this.promisesByCorrelationId.remove(correlationId);
            if (remove != null) {
                commandSucceeded(remove, Long.valueOf(sourceRef));
            }
        }

        private void handleUnroutedResponse(DirectBuffer directBuffer, int i, int i2) {
            CompletableFuture<?> remove = this.promisesByCorrelationId.remove(this.unroutedRO.wrap(directBuffer, i, i2).correlationId());
            if (remove != null) {
                commandSucceeded(remove);
            }
        }

        private void commandSent(long j, CompletableFuture<?> completableFuture) {
            this.promisesByCorrelationId.put(j, (long) completableFuture);
        }

        private <R> boolean commandSucceeded(CompletableFuture<R> completableFuture) {
            return commandSucceeded(completableFuture, null);
        }

        private <R> boolean commandSucceeded(CompletableFuture<R> completableFuture, R r) {
            return completableFuture.complete(r);
        }

        private boolean commandSendFailed(CompletableFuture<?> completableFuture) {
            return commandFailed(completableFuture, "unable to offer command");
        }

        private boolean commandFailed(CompletableFuture<?> completableFuture, String str) {
            return completableFuture.completeExceptionally(new IllegalStateException(str));
        }

        static {
            $assertionsDisabled = !ControllerBuilderImpl.class.desiredAssertionStatus();
        }
    }

    public ControllerBuilderImpl(Configuration configuration, Class<T> cls) {
        this.config = configuration;
        this.kind = cls;
    }

    @Override // org.reaktivity.nukleus.ControllerBuilder
    public Class<T> kind() {
        return this.kind;
    }

    @Override // org.reaktivity.nukleus.ControllerBuilder
    public ControllerBuilder<T> setName(String str) {
        this.name = str;
        return this;
    }

    @Override // org.reaktivity.nukleus.ControllerBuilder
    public ControllerBuilder<T> setFactory(Function<ControllerSpi, T> function) {
        this.factory = function;
        return this;
    }

    @Override // org.reaktivity.nukleus.ControllerBuilder
    public T build() {
        Objects.requireNonNull(this.factory, "factory");
        Objects.requireNonNull(this.name, "name");
        Context context = new Context();
        context.name(this.name).readonly(true).conclude(this.config);
        return this.factory.apply(new ControllerSpiImpl(context));
    }
}
