package org.eclipse.ditto.services.thingsearch.persistence.write.streaming;

import akka.NotUsed;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.Attributes;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Zip;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.stream.Collectors;
import org.bson.Document;
import org.eclipse.ditto.services.thingsearch.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.services.thingsearch.persistence.PersistenceConstants;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.services.thingsearch.persistence.write.model.WriteResultAndErrors;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;

/* loaded from: input_file:org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow.class */
final class MongoSearchUpdaterFlow {
    private static final String TRACE_THING_BULK_UPDATE = "things_search_thing_bulkUpdate";
    private static final String COUNT_THING_BULK_UPDATES_PER_BULK = "things_search_thing_bulkUpdate_updates_per_bulk";
    private static final String UPDATE_TYPE_TAG = "update_type";
    private final MongoCollection<Document> collection;
    private final MongoCollection<Document> collectionWithAcknowledgements;

    private MongoSearchUpdaterFlow(MongoCollection<Document> mongoCollection, PersistenceStreamConfig persistenceStreamConfig) {
        this.collection = mongoCollection;
        this.collectionWithAcknowledgements = mongoCollection.withWriteConcern(persistenceStreamConfig.getWithAcknowledgementsWriteConcern());
    }

    public static MongoSearchUpdaterFlow of(MongoDatabase mongoDatabase, PersistenceStreamConfig persistenceStreamConfig) {
        return new MongoSearchUpdaterFlow(mongoDatabase.getCollection(PersistenceConstants.THINGS_COLLECTION_NAME), persistenceStreamConfig);
    }

    public Flow<Source<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed> start(boolean z, int i, int i2) {
        return Flow.fromGraph(assembleFlows(Flow.create().flatMapConcat(source -> {
            return source.grouped(i2);
        }), Flow.create().flatMapMerge(i, list -> {
            return executeBulkWrite(z, list);
        }).withAttributes(Attributes.inputBuffer(i, i)), createStartTimerFlow(), createStopTimerFlow()));
    }

    private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(boolean z, List<AbstractWriteModel> list) {
        return Source.fromPublisher((z ? this.collection : this.collectionWithAcknowledgements).bulkWrite((List) list.stream().map(abstractWriteModel -> {
            ConsistencyLag.startS5MongoBulkWrite(abstractWriteModel.getMetadata());
            return abstractWriteModel.toMongo();
        }).collect(Collectors.toList()), new BulkWriteOptions().ordered(false))).map(bulkWriteResult -> {
            return WriteResultAndErrors.success(list, bulkWriteResult);
        }).recoverWithRetries(1, new PFBuilder().match(MongoBulkWriteException.class, mongoBulkWriteException -> {
            return Source.single(WriteResultAndErrors.failure(list, mongoBulkWriteException));
        }).matchAny(th -> {
            return Source.single(WriteResultAndErrors.unexpectedError(list, th));
        }).build()).map(writeResultAndErrors -> {
            list.forEach(abstractWriteModel2 -> {
                ConsistencyLag.startS6Acknowledge(abstractWriteModel2.getMetadata());
            });
            return writeResultAndErrors;
        });
    }

    private static <T> Flow<List<T>, StartedTimer, NotUsed> createStartTimerFlow() {
        return Flow.fromFunction(list -> {
            DittoMetrics.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(Long.valueOf(list.size()));
            return DittoMetrics.timer(TRACE_THING_BULK_UPDATE).tag(UPDATE_TYPE_TAG, "bulkUpdate").start();
        });
    }

    private static <T> Flow<Pair<T, StartedTimer>, T, NotUsed> createStopTimerFlow() {
        return Flow.fromFunction(pair -> {
            try {
                ((StartedTimer) pair.second()).stop();
            } catch (IllegalStateException e) {
            }
            return pair.first();
        });
    }

    private static <A, B, C, D> Graph<FlowShape<A, C>, NotUsed> assembleFlows(Flow<A, B, NotUsed> flow, Flow<B, C, NotUsed> flow2, Flow<B, D, NotUsed> flow3, Flow<Pair<C, D>, C, NotUsed> flow4) {
        return GraphDSL.create(builder -> {
            FlowShape add = builder.add(flow);
            FlowShape add2 = builder.add(flow2);
            FlowShape add3 = builder.add(flow3);
            FlowShape add4 = builder.add(flow4);
            UniformFanOutShape add5 = builder.add(Broadcast.create(2));
            FanInShape2 add6 = builder.add(Zip.create());
            builder.from(add.out()).toInlet(add5.in());
            builder.from(add5.out(0)).toInlet(add3.in());
            builder.from(add5.out(1)).toInlet(add2.in());
            builder.from(add2.out()).toInlet(add6.in0());
            builder.from(add3.out()).toInlet(add6.in1());
            builder.from(add6.out()).toInlet(add4.in());
            return FlowShape.of(add.in(), add4.out());
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1295597202:
                if (implMethodName.equals("lambda$createStartTimerFlow$a1f2755$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1273026456:
                if (implMethodName.equals("lambda$assembleFlows$daef8a17$1")) {
                    z = 2;
                    break;
                }
                break;
            case -912142495:
                if (implMethodName.equals("lambda$start$a6318b91$1")) {
                    z = false;
                    break;
                }
                break;
            case -66631367:
                if (implMethodName.equals("lambda$start$c4a51ee9$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1536480294:
                if (implMethodName.equals("lambda$createStopTimerFlow$e4b3c660$1")) {
                    z = true;
                    break;
                }
                break;
            case 1609800989:
                if (implMethodName.equals("lambda$executeBulkWrite$48ce8c4d$1")) {
                    z = 6;
                    break;
                }
                break;
            case 1609800990:
                if (implMethodName.equals("lambda$executeBulkWrite$48ce8c4d$2")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(ILakka/stream/javadsl/Source;)Lakka/stream/Graph;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return source -> {
                        return source.grouped(intValue);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/Pair;)Ljava/lang/Object;")) {
                    return pair -> {
                        try {
                            ((StartedTimer) pair.second()).stop();
                        } catch (IllegalStateException e) {
                        }
                        return pair.first();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Flow;Lakka/stream/javadsl/Flow;Lakka/stream/javadsl/Flow;Lakka/stream/javadsl/Flow;Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/FlowShape;")) {
                    Flow flow = (Flow) serializedLambda.getCapturedArg(0);
                    Flow flow2 = (Flow) serializedLambda.getCapturedArg(1);
                    Flow flow3 = (Flow) serializedLambda.getCapturedArg(2);
                    Flow flow4 = (Flow) serializedLambda.getCapturedArg(3);
                    return builder -> {
                        FlowShape add = builder.add(flow);
                        FlowShape add2 = builder.add(flow2);
                        FlowShape add3 = builder.add(flow3);
                        FlowShape add4 = builder.add(flow4);
                        UniformFanOutShape add5 = builder.add(Broadcast.create(2));
                        FanInShape2 add6 = builder.add(Zip.create());
                        builder.from(add.out()).toInlet(add5.in());
                        builder.from(add5.out(0)).toInlet(add3.in());
                        builder.from(add5.out(1)).toInlet(add2.in());
                        builder.from(add2.out()).toInlet(add6.in0());
                        builder.from(add3.out()).toInlet(add6.in1());
                        builder.from(add6.out()).toInlet(add4.in());
                        return FlowShape.of(add.in(), add4.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Lorg/eclipse/ditto/services/utils/metrics/instruments/timer/StartedTimer;")) {
                    return list -> {
                        DittoMetrics.histogram(COUNT_THING_BULK_UPDATES_PER_BULK).record(Long.valueOf(list.size()));
                        return DittoMetrics.timer(TRACE_THING_BULK_UPDATE).tag(UPDATE_TYPE_TAG, "bulkUpdate").start();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/WriteResultAndErrors;)Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/WriteResultAndErrors;")) {
                    List list2 = (List) serializedLambda.getCapturedArg(0);
                    return writeResultAndErrors -> {
                        list2.forEach(abstractWriteModel2 -> {
                            ConsistencyLag.startS6Acknowledge(abstractWriteModel2.getMetadata());
                        });
                        return writeResultAndErrors;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(ZLjava/util/List;)Lakka/stream/Graph;")) {
                    MongoSearchUpdaterFlow mongoSearchUpdaterFlow = (MongoSearchUpdaterFlow) serializedLambda.getCapturedArg(0);
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(1)).booleanValue();
                    return list3 -> {
                        return executeBulkWrite(booleanValue, list3);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/thingsearch/persistence/write/streaming/MongoSearchUpdaterFlow") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lcom/mongodb/bulk/BulkWriteResult;)Lorg/eclipse/ditto/services/thingsearch/persistence/write/model/WriteResultAndErrors;")) {
                    List list4 = (List) serializedLambda.getCapturedArg(0);
                    return bulkWriteResult -> {
                        return WriteResultAndErrors.success(list4, bulkWriteResult);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
