package io.druid.segment.realtime.appenderator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.SegmentWithState;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.joda.time.DateTime;

/* loaded from: input_file:io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.class */
public abstract class BaseAppenderatorDriver implements Closeable {
    private static final Logger log = new Logger(BaseAppenderatorDriver.class);

    /** Asked for a new segment in {@code getSegment} when no appendable segment covers a row. */
    private final SegmentAllocator segmentAllocator;

    /** Consulted in {@code publishInBackground} after a failed publish, to see whether the segments already exist. */
    private final UsedSegmentChecker usedSegmentChecker;

    /** The appenderator that rows are added to and that segments are pushed to / dropped from. */
    protected final Appenderator appenderator;

    /**
     * Tracked segments, keyed by sequence name. Every access inside this class is made while
     * holding the monitor of this map itself ({@code synchronized (segments)}).
     */
    protected final Map<String, SegmentsForSequence> segments = new TreeMap<>();

    /**
     * Executor created by {@code Execs.singleThreaded("publish-%d")}; runs the publish task and the
     * post-push verification callback, and is shut down by {@link #close()}.
     */
    protected final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-%d"));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/druid/segment/realtime/appenderator/BaseAppenderatorDriver$SegmentsForSequence.class */
    /**
     * The segments tracked for one sequence, grouped by the start (in millis) of each segment's
     * interval, plus the identifier of the segment most recently added.
     *
     * <p>This class does no locking of its own; within this file it is only mutated while the
     * caller holds the lock on the driver's {@code segments} map.
     */
    public static class SegmentsForSequence {
        /** Interval start millis -> segment states for that interval; {@link #add} inserts at the head. */
        private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;

        /** Identifier string of the last segment passed to {@link #add}, or the value given at construction. */
        private String lastSegmentId;

        /** Creates an empty holder with no last segment id. */
        SegmentsForSequence() {
            this.intervalToSegmentStates = new TreeMap<>();
        }

        /**
         * Creates a holder backed by the given map (the map is used directly, not copied).
         *
         * @param navigableMap interval start millis -> segment states
         * @param str          identifier of the last segment of the sequence
         */
        public SegmentsForSequence(NavigableMap<Long, LinkedList<SegmentWithState>> navigableMap, String str) {
            this.intervalToSegmentStates = navigableMap;
            this.lastSegmentId = str;
        }

        /**
         * Registers a new segment at the head of the list for its interval start and records it
         * as the last segment of this sequence.
         */
        void add(SegmentIdentifier segmentIdentifier) {
            this.intervalToSegmentStates
                .computeIfAbsent(segmentIdentifier.getInterval().getStartMillis(), startMillis -> new LinkedList<>())
                .addFirst(SegmentWithState.newSegment(segmentIdentifier));
            this.lastSegmentId = segmentIdentifier.getIdentifierAsString();
        }

        /**
         * Returns the entry with the greatest interval start that is less than or equal to
         * {@code j}, or {@code null} if there is none.
         */
        Map.Entry<Long, LinkedList<SegmentWithState>> floor(long j) {
            return this.intervalToSegmentStates.floorEntry(j);
        }

        /** Returns the segment states whose interval starts exactly at {@code j}, or {@code null}. */
        public LinkedList<SegmentWithState> get(long j) {
            return this.intervalToSegmentStates.get(j);
        }

        /** Streams every tracked segment state, in ascending order of interval start. */
        public Stream<SegmentWithState> segmentStateStream() {
            return this.intervalToSegmentStates.values().stream().flatMap(Collection::stream);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/druid/segment/realtime/appenderator/BaseAppenderatorDriver$WrappedCommitter.class */
    public static class WrappedCommitter implements Committer {
        private final Committer delegate;
        private final AppenderatorDriverMetadata metadata;

        WrappedCommitter(Committer committer, AppenderatorDriverMetadata appenderatorDriverMetadata) {
            this.delegate = committer;
            this.metadata = appenderatorDriverMetadata;
        }

        public Object getMetadata() {
            return this.metadata;
        }

        public void run() {
            this.delegate.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a driver around the given collaborators.
     *
     * @throws NullPointerException if any argument is null; the message names the offending argument
     */
    public BaseAppenderatorDriver(Appenderator appenderator, SegmentAllocator segmentAllocator, UsedSegmentChecker usedSegmentChecker) {
        // checkNotNull returns its argument, so no cast is needed.
        this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
        this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
        this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
    }

    /**
     * Exposes the live (not copied) map of tracked segments, keyed by sequence name.
     * Intended for tests only.
     */
    @VisibleForTesting
    Map<String, SegmentsForSequence> getSegments() {
        return segments;
    }

    /**
     * Starts the job; implemented by subclasses.
     *
     * @return an implementation-defined object, possibly {@code null}.
     *         NOTE(review): the meaning of the returned value is decided by subclasses that are
     *         not visible in this file — confirm against the implementations.
     */
    @Nullable
    public abstract Object startJob();

    /**
     * Looks up a segment of the given sequence that can still accept a row with the given timestamp.
     *
     * <p>Only the head segment of the closest interval starting at or before the timestamp is
     * considered; it is returned when its interval contains the timestamp and its state is
     * {@code APPENDING}.
     *
     * @param dateTime timestamp of the row
     * @param str      sequence name
     * @return the appendable segment identifier, or {@code null} if there is none
     */
    private SegmentIdentifier getAppendableSegment(DateTime dateTime, String str) {
        synchronized (this.segments) {
            final SegmentsForSequence forSequence = this.segments.get(str);
            if (forSequence == null) {
                return null;
            }

            final Map.Entry<Long, LinkedList<SegmentWithState>> candidate = forSequence.floor(dateTime.getMillis());
            if (candidate == null) {
                return null;
            }

            final SegmentWithState head = candidate.getValue().getFirst();
            final boolean coversTimestamp = head.getSegmentIdentifier().getInterval().contains(dateTime);
            if (coversTimestamp && head.getState() == SegmentWithState.SegmentState.APPENDING) {
                return head.getSegmentIdentifier();
            }
            return null;
        }
    }

    /**
     * Returns the segment a row should be added to, allocating a new one when no appendable
     * segment of the sequence covers the row's timestamp.
     *
     * @param inputRow the row to place
     * @param str      sequence name
     * @param z        flag passed through unchanged to the segment allocator
     * @return the segment identifier, or {@code null} if the allocator could not allocate one
     * @throws IOException if the allocator fails
     * @throws ISE         if the allocator returns a segment the appenderator already holds
     */
    private SegmentIdentifier getSegment(InputRow inputRow, String str, boolean z) throws IOException {
        synchronized (this.segments) {
            final DateTime rowTime = inputRow.getTimestamp();
            final SegmentIdentifier reusable = getAppendableSegment(rowTime, str);
            if (reusable != null) {
                return reusable;
            }

            // Nothing appendable: ask the allocator, telling it the last segment of this sequence (if any).
            final SegmentsForSequence forSequence = this.segments.get(str);
            final String previousSegmentId = (forSequence == null) ? null : forSequence.lastSegmentId;
            final SegmentIdentifier allocated = this.segmentAllocator.allocate(inputRow, str, previousSegmentId, z);

            if (allocated == null) {
                log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", rowTime, str);
                return null;
            }

            // Sanity check: a freshly allocated segment must not already be known to the appenderator.
            for (SegmentIdentifier active : this.appenderator.getSegments()) {
                if (active.equals(allocated)) {
                    throw new ISE("WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", allocated, active);
                }
            }

            log.info("New segment[%s] for row[%s] sequenceName[%s].", allocated, inputRow, str);
            addSegment(str, allocated);
            return allocated;
        }
    }

    /**
     * Records a segment under the given sequence name, creating the per-sequence holder on first use.
     *
     * @param str               sequence name
     * @param segmentIdentifier segment to record
     */
    private void addSegment(String str, SegmentIdentifier segmentIdentifier) {
        synchronized (this.segments) {
            SegmentsForSequence forSequence = this.segments.get(str);
            if (forSequence == null) {
                forSequence = new SegmentsForSequence();
                this.segments.put(str, forSequence);
            }
            forSequence.add(segmentIdentifier);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Adds a row to the appenderator, in the segment chosen (or newly allocated) for its sequence.
     *
     * @param inputRow the row to add; must not be null
     * @param str      sequence name; must not be null
     * @param supplier committer supplier, or {@code null}; when present it is wrapped so that the
     *                 committers it produces carry this driver's metadata
     * @param z        flag passed through to segment allocation
     * @param z2       flag passed through to {@code Appenderator.add}
     * @return a failed result if no segment could be allocated, otherwise an ok result with the
     *         segment, its row count, the appenderator's total row count and the persist-required flag
     * @throws IOException if segment allocation fails
     * @throws ISE         if the appenderator reports the chosen segment as not writable
     */
    public AppenderatorDriverAddResult append(InputRow inputRow, String str, @Nullable Supplier<Committer> supplier, boolean z, boolean z2) throws IOException {
        Preconditions.checkNotNull(inputRow, "row");
        Preconditions.checkNotNull(str, "sequenceName");

        final SegmentIdentifier target = getSegment(inputRow, str, z);
        if (target == null) {
            return AppenderatorDriverAddResult.fail();
        }

        final Supplier<Committer> wrappedSupplier = (supplier == null) ? null : wrapCommitterSupplier(supplier);
        try {
            final Appenderator.AppenderatorAddResult outcome = this.appenderator.add(target, inputRow, wrappedSupplier, z2);
            return AppenderatorDriverAddResult.ok(
                target,
                outcome.getNumRowsInSegment(),
                this.appenderator.getTotalRowCount(),
                outcome.isPersistRequired()
            );
        } catch (SegmentNotWritableException e) {
            throw new ISE(e, "WTF?! Segment[%s] not writable when it should have been.", target);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the segment states tracked for the given sequence names; names with no tracked
     * segments are skipped.
     *
     * <p>Streams are lazy, so returning the pipeline itself would traverse {@code segments} and
     * the per-interval lists only after the lock had been released. The elements are therefore
     * collected while the lock is held, and the returned stream iterates over that snapshot.
     * The elements are the same {@link SegmentWithState} instances held by the driver, not copies.
     *
     * @param collection sequence names to look up
     * @return a stream over the matching segment states, in the order of the given names
     */
    public Stream<SegmentWithState> getSegmentWithStates(Collection<String> collection) {
        final Collection<SegmentWithState> snapshot;
        synchronized (this.segments) {
            snapshot = collection.stream()
                .map(this.segments::get)
                .filter(Objects::nonNull)
                .flatMap(SegmentsForSequence::segmentStateStream)
                .collect(Collectors.toList());
        }
        return snapshot.stream();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the appenderator to push the given segments and verifies, on this driver's executor,
     * that exactly the requested segments were pushed.
     *
     * @param wrappedCommitter committer handed to {@code Appenderator.push}; may be null
     * @param collection       identifiers of the segments to push
     * @return a future of the pushed segments and metadata; it fails with {@link ISE} if the set
     *         of pushed segments differs from the requested set
     */
    public ListenableFuture<SegmentsAndMetadata> pushInBackground(@Nullable WrappedCommitter wrappedCommitter, Collection<SegmentIdentifier> collection) {
        log.info("Pushing segments in background: [%s]", Joiner.on(", ").join(collection));
        return Futures.transform(this.appenderator.push(collection, wrappedCommitter), pushed -> {
            final Set<SegmentIdentifier> pushedIds = pushed.getSegments()
                .stream()
                .map(SegmentIdentifier::fromDataSegment)
                .collect(Collectors.toSet());
            if (!pushedIds.equals(Sets.newHashSet(collection))) {
                throw new ISE("WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].", pushedIds, collection);
            }
            return pushed;
        }, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Drops every segment in the given result from the appenderator.
     *
     * @param segmentsAndMetadata segments to drop, with their commit metadata
     * @return a future that completes once all drops have completed, yielding the same segments
     *         paired with the caller metadata extracted from the commit metadata ({@code null}
     *         when there is no commit metadata)
     */
    public ListenableFuture<SegmentsAndMetadata> dropInBackground(SegmentsAndMetadata segmentsAndMetadata) {
        log.info("Dropping segments[%s]", segmentsAndMetadata.getSegments());

        // One drop future per segment; kept as a raw Iterable exactly as the original call site did.
        final Iterable dropFutures = (Iterable) segmentsAndMetadata.getSegments()
            .stream()
            .map(dataSegment -> this.appenderator.drop(SegmentIdentifier.fromDataSegment(dataSegment)))
            .collect(Collectors.toList());

        return Futures.transform(Futures.allAsList(dropFutures), ignored -> {
            final Object commitMetadata = segmentsAndMetadata.getCommitMetadata();
            final Object callerMetadata;
            if (commitMetadata == null) {
                callerMetadata = null;
            } else {
                callerMetadata = ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata();
            }
            return new SegmentsAndMetadata(segmentsAndMetadata.getSegments(), callerMetadata);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes the given segments on this driver's executor.
     *
     * <p>If the publisher reports failure, the used-segment checker is consulted: when it finds
     * exactly our segments the publish is treated as having succeeded, otherwise the task fails.
     * An empty segment list is not published at all.
     *
     * @param segmentsAndMetadata           segments to publish, with their commit metadata
     * @param transactionalSegmentPublisher publisher to use
     * @return a future yielding {@code segmentsAndMetadata} unchanged; it fails with {@link ISE}
     *         when the segments could not be published, or with the propagated
     *         {@link IOException} raised by the publisher or the checker
     */
    public ListenableFuture<SegmentsAndMetadata> publishInBackground(SegmentsAndMetadata segmentsAndMetadata, TransactionalSegmentPublisher transactionalSegmentPublisher) {
        return this.executor.submit(() -> {
            if (segmentsAndMetadata.getSegments().isEmpty()) {
                log.info("Nothing to publish, skipping publish step.");
                return segmentsAndMetadata;
            }

            log.info(
                "Publishing segments with commitMetadata[%s]: [%s]",
                segmentsAndMetadata.getCommitMetadata(),
                Joiner.on(", ").join(segmentsAndMetadata.getSegments())
            );

            try {
                final Object commitMetadata = segmentsAndMetadata.getCommitMetadata();
                final boolean published = transactionalSegmentPublisher.publishSegments(
                    ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
                    commitMetadata == null ? null : ((AppenderatorDriverMetadata) commitMetadata).getCallerMetadata()
                );

                if (published) {
                    log.info("Published segments.");
                } else {
                    log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
                    final Set<SegmentIdentifier> ourIds = segmentsAndMetadata.getSegments()
                        .stream()
                        .map(SegmentIdentifier::fromDataSegment)
                        .collect(Collectors.toSet());
                    if (!this.usedSegmentChecker.findUsedSegments(ourIds).equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
                        throw new ISE("Failed to publish segments[%s]", segmentsAndMetadata.getSegments());
                    }
                    log.info("Our segments really do exist, awaiting handoff.");
                }
            } catch (IOException e) {
                throw Throwables.propagate(e);
            }

            return segmentsAndMetadata;
        });
    }

    /**
     * Forgets all tracked segments, then clears the appenderator.
     *
     * @throws InterruptedException declared because {@code Appenderator.clear()} is invoked here
     */
    public void clear() throws InterruptedException {
        synchronized (segments) {
            segments.clear();
        }
        // The appenderator is cleared after the driver lock has been released.
        appenderator.clear();
    }

    /** Shuts the driver's executor down immediately via {@code shutdownNow()}. */
    @Override
    public void close() {
        executor.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps a committer so that its metadata becomes an {@link AppenderatorDriverMetadata}
     * holding the current segment states per sequence, the last segment id per sequence, and the
     * committer's own metadata.
     *
     * <p>The metadata is built while holding the lock on {@code segments}. A shallow copy of the
     * map is not sufficient: the {@link SegmentsForSequence} values are mutable and are modified
     * under that same lock when segments are added, so their contents must also be read while it
     * is held.
     *
     * @param committer the committer to wrap; {@code getMetadata()} is called on it once, here
     * @return a committer that runs {@code committer} and reports the driver metadata
     */
    public WrappedCommitter wrapCommitter(Committer committer) {
        final AppenderatorDriverMetadata driverMetadata;
        synchronized (this.segments) {
            driverMetadata = new AppenderatorDriverMetadata(
                // transformValues yields a view; ImmutableMap.copyOf materializes it under the lock.
                ImmutableMap.copyOf(
                    Maps.transformValues(
                        this.segments,
                        sequenceSegments -> ImmutableList.copyOf(
                            sequenceSegments.intervalToSegmentStates
                                .values()
                                .stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList())
                        )
                    )
                ),
                this.segments.entrySet()
                    .stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().lastSegmentId)),
                committer.getMetadata()
            );
        }
        return new WrappedCommitter(committer, driverMetadata);
    }

    /**
     * Adapts a committer supplier so that each committer it produces is passed through
     * {@link #wrapCommitter(Committer)} at the moment the returned supplier is asked for a value.
     */
    private Supplier<Committer> wrapCommitterSupplier(Supplier<Committer> supplier) {
        return () -> wrapCommitter(supplier.get());
    }
}
