package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.GrpcFnServer;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.ServerFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.SdkHarnessClient;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.FnDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerator;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.IdGenerators;
import org.apache.beam.repackaged.beam_runners_direct_java.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.net.HostAndPort;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DockerJobBundleFactory.class */
public class DockerJobBundleFactory implements JobBundleFactory {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DockerJobBundleFactory.class);
    private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
    private final IdGenerator stageIdGenerator;
    private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
    private final GrpcFnServer<GrpcLoggingService> loggingServer;
    private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
    private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
    private final LoadingCache<RunnerApi.Environment, WrappedSdkHarnessClient> environmentCache;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DockerJobBundleFactory$Platform.class */
    public enum Platform {
        MAC,
        LINUX,
        OTHER
    }

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DockerJobBundleFactory$SimpleStageBundleFactory.class */
    private static class SimpleStageBundleFactory<InputT> implements StageBundleFactory<InputT> {
        private final SdkHarnessClient.BundleProcessor<InputT> processor;
        private final ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor;
        private WrappedSdkHarnessClient wrappedClient;

        static <InputT> SimpleStageBundleFactory<InputT> create(WrappedSdkHarnessClient wrappedSdkHarnessClient, ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor) {
            return new SimpleStageBundleFactory<>(executableProcessBundleDescriptor, wrappedSdkHarnessClient.getClient().getProcessor(executableProcessBundleDescriptor.getProcessBundleDescriptor(), executableProcessBundleDescriptor.getRemoteInputDestination(), wrappedSdkHarnessClient.getStateServer().getService()), wrappedSdkHarnessClient);
        }

        SimpleStageBundleFactory(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor executableProcessBundleDescriptor, SdkHarnessClient.BundleProcessor<InputT> bundleProcessor, WrappedSdkHarnessClient wrappedSdkHarnessClient) {
            this.processBundleDescriptor = executableProcessBundleDescriptor;
            this.processor = bundleProcessor;
            this.wrappedClient = wrappedSdkHarnessClient;
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.StageBundleFactory
        public RemoteBundle<InputT> getBundle(OutputReceiverFactory outputReceiverFactory, StateRequestHandler stateRequestHandler, BundleProgressHandler bundleProgressHandler) throws Exception {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (Map.Entry<BeamFnApi.Target, Coder<WindowedValue<?>>> entry : this.processBundleDescriptor.getOutputTargetCoders().entrySet()) {
                BeamFnApi.Target key = entry.getKey();
                builder.put(key, RemoteOutputReceiver.of(entry.getValue(), outputReceiverFactory.create((String) Iterables.getOnlyElement(this.processBundleDescriptor.getProcessBundleDescriptor().getTransformsOrThrow(key.getPrimitiveTransformReference()).getInputsMap().values()))));
            }
            return this.processor.newBundle(builder.build(), stateRequestHandler, bundleProgressHandler);
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.wrappedClient = null;
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DockerJobBundleFactory$UnimplementedArtifactRetrievalService.class */
    private static class UnimplementedArtifactRetrievalService extends ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceImplBase implements ArtifactRetrievalService {
        private UnimplementedArtifactRetrievalService() {
        }

        @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.FnService, java.lang.AutoCloseable
        public void close() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/repackaged/beam_runners_direct_java/runners/fnexecution/control/DockerJobBundleFactory$WrappedSdkHarnessClient.class */
    public static class WrappedSdkHarnessClient implements AutoCloseable {
        private final RemoteEnvironment environment;
        private final ExecutorService executor;
        private final GrpcFnServer<GrpcDataService> dataServer;
        private final GrpcFnServer<GrpcStateService> stateServer;
        private final SdkHarnessClient client;

        static WrappedSdkHarnessClient wrapping(RemoteEnvironment remoteEnvironment, ServerFactory serverFactory) throws Exception {
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(newCachedThreadPool, OutboundObserverFactory.serverDirect()), serverFactory);
            return new WrappedSdkHarnessClient(remoteEnvironment, newCachedThreadPool, allocatePortAndCreateFor, GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory), SdkHarnessClient.usingFnApiClient(remoteEnvironment.getInstructionRequestHandler(), (FnDataService) allocatePortAndCreateFor.getService()));
        }

        private WrappedSdkHarnessClient(RemoteEnvironment remoteEnvironment, ExecutorService executorService, GrpcFnServer<GrpcDataService> grpcFnServer, GrpcFnServer<GrpcStateService> grpcFnServer2, SdkHarnessClient sdkHarnessClient) {
            this.executor = executorService;
            this.environment = remoteEnvironment;
            this.dataServer = grpcFnServer;
            this.stateServer = grpcFnServer2;
            this.client = sdkHarnessClient;
        }

        SdkHarnessClient getClient() {
            return this.client;
        }

        GrpcFnServer<GrpcStateService> getStateServer() {
            return this.stateServer;
        }

        GrpcFnServer<GrpcDataService> getDataServer() {
            return this.dataServer;
        }

        /* JADX WARN: Failed to calculate best type for var: r6v1 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Failed to calculate best type for var: r7v0 ??
        java.lang.NullPointerException
         */
        /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException
         */
        /* JADX WARN: Not initialized variable reg: 6, insn: 0x0078: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r6 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:38:0x0078 */
        /* JADX WARN: Not initialized variable reg: 7, insn: 0x007c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r7 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:40:0x007c */
        /* JADX WARN: Type inference failed for: r6v1, types: [java.lang.AutoCloseable] */
        /* JADX WARN: Type inference failed for: r7v0, types: [java.lang.Throwable] */
        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            ?? r6;
            ?? r7;
            GrpcFnServer<GrpcStateService> grpcFnServer = this.stateServer;
            try {
                try {
                    GrpcFnServer<GrpcDataService> grpcFnServer2 = this.dataServer;
                    RemoteEnvironment remoteEnvironment = this.environment;
                    Throwable th = null;
                    try {
                        try {
                            ExecutorService executorService = this.executor;
                            Objects.requireNonNull(executorService);
                            AutoCloseable autoCloseable = executorService::shutdown;
                            if (autoCloseable != null) {
                                $closeResource(null, autoCloseable);
                            }
                            if (remoteEnvironment != null) {
                                $closeResource(null, remoteEnvironment);
                            }
                            if (grpcFnServer2 != null) {
                                $closeResource(null, grpcFnServer2);
                            }
                        } catch (Throwable th2) {
                            th = th2;
                            throw th2;
                        }
                    } catch (Throwable th3) {
                        if (remoteEnvironment != null) {
                            $closeResource(th, remoteEnvironment);
                        }
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (r6 != 0) {
                        $closeResource(r7, r6);
                    }
                    throw th4;
                }
            } finally {
                if (grpcFnServer != null) {
                    $closeResource(null, grpcFnServer);
                }
            }
        }

        private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
            if (th == null) {
                autoCloseable.close();
                return;
            }
            try {
                autoCloseable.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
        }
    }

    public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
        ServerFactory serverFactory = getServerFactory();
        IdGenerator incrementingLongs = IdGenerators.incrementingLongs();
        MapControlClientPool create = MapControlClientPool.create();
        GrpcFnServer allocatePortAndCreateFor = GrpcFnServer.allocatePortAndCreateFor(FnApiControlClientPoolService.offeringClientsToPool(create.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()), serverFactory);
        GrpcFnServer allocatePortAndCreateFor2 = GrpcFnServer.allocatePortAndCreateFor(GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
        GrpcFnServer allocatePortAndCreateFor3 = GrpcFnServer.allocatePortAndCreateFor(BeamFileSystemArtifactRetrievalService.create(), serverFactory);
        GrpcFnServer allocatePortAndCreateFor4 = GrpcFnServer.allocatePortAndCreateFor(StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
        return new DockerJobBundleFactory(DockerEnvironmentFactory.forServices(allocatePortAndCreateFor, allocatePortAndCreateFor2, allocatePortAndCreateFor3, allocatePortAndCreateFor4, create.getSource(), IdGenerators.incrementingLongs()), serverFactory, incrementingLongs, allocatePortAndCreateFor, allocatePortAndCreateFor2, allocatePortAndCreateFor3, allocatePortAndCreateFor4);
    }

    @VisibleForTesting
    DockerJobBundleFactory(final DockerEnvironmentFactory dockerEnvironmentFactory, final ServerFactory serverFactory, IdGenerator idGenerator, GrpcFnServer<FnApiControlClientPoolService> grpcFnServer, GrpcFnServer<GrpcLoggingService> grpcFnServer2, GrpcFnServer<ArtifactRetrievalService> grpcFnServer3, GrpcFnServer<StaticGrpcProvisionService> grpcFnServer4) {
        this.stageIdGenerator = idGenerator;
        this.controlServer = grpcFnServer;
        this.loggingServer = grpcFnServer2;
        this.retrievalServer = grpcFnServer3;
        this.provisioningServer = grpcFnServer4;
        this.environmentCache = CacheBuilder.newBuilder().removalListener(removalNotification -> {
            LOG.debug("Cleaning up for environment {}", ((RunnerApi.Environment) removalNotification.getKey()).getUrl());
            try {
                ((WrappedSdkHarnessClient) removalNotification.getValue()).close();
            } catch (Exception e) {
                LOG.warn(String.format("Error cleaning up environment %s", removalNotification.getKey()), (Throwable) e);
            }
        }).build(new CacheLoader<RunnerApi.Environment, WrappedSdkHarnessClient>() { // from class: org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.DockerJobBundleFactory.1
            @Override // org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.cache.CacheLoader
            public WrappedSdkHarnessClient load(RunnerApi.Environment environment) throws Exception {
                return WrappedSdkHarnessClient.wrapping(dockerEnvironmentFactory.createEnvironment(environment), serverFactory);
            }
        });
    }

    @Override // org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.control.JobBundleFactory
    public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
        WrappedSdkHarnessClient unchecked = this.environmentCache.getUnchecked(executableStage.getEnvironment());
        try {
            return SimpleStageBundleFactory.create(unchecked, ProcessBundleDescriptors.fromExecutableStage(this.stageIdGenerator.getId(), executableStage, unchecked.getDataServer().getApiServiceDescriptor(), unchecked.getStateServer().getApiServiceDescriptor()));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.environmentCache.invalidateAll();
        this.environmentCache.cleanUp();
        this.controlServer.close();
        this.loggingServer.close();
        this.retrievalServer.close();
        this.provisioningServer.close();
    }

    private static ServerFactory getServerFactory() {
        switch (getPlatform()) {
            case LINUX:
                return ServerFactory.createDefault();
            case MAC:
                return ServerFactory.createWithUrlFactory((str, i) -> {
                    return HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, i).toString();
                });
            default:
                LOG.warn("Unknown Docker platform. Falling back to default server factory");
                return ServerFactory.createDefault();
        }
    }

    private static Platform getPlatform() {
        String lowerCase = System.getProperty("os.name").toLowerCase();
        return lowerCase.startsWith("mac") ? Platform.MAC : lowerCase.startsWith("linux") ? Platform.LINUX : Platform.OTHER;
    }
}
