package io.druid.segment.realtime.plumber;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.query.ReportTimelineMissingSegmentQueryRunner;
import io.druid.query.SegmentDescriptor;
import io.druid.query.spec.SpecificSegmentQueryRunner;
import io.druid.query.spec.SpecificSegmentSpec;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.SingleElementPartitionChunk;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;

/* loaded from: input_file:io/druid/segment/realtime/plumber/RealtimePlumber.class */
/**
 * {@link Plumber} implementation that buffers incoming rows in per-interval {@link Sink}s,
 * persists their hydrants to local disk, merges and pushes finished intervals through the
 * configured {@link DataSegmentPusher} / {@link SegmentPublisher}, and abandons a sink once the
 * server view reports a segment for the same interval with an equal or newer version.
 */
public class RealtimePlumber implements Plumber {
    private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
    // Delay threshold in milliseconds; matches the 1000 ms back-pressure warning threshold applied in persist().
    private static final int WARN_DELAY = 1000;
    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    // Created in the constructor from the tuning config's rejection policy factory and window period.
    private final RejectionPolicy rejectionPolicy;
    private final FireDepartmentMetrics metrics;
    private final ServiceEmitter emitter;
    private final QueryRunnerFactoryConglomerate conglomerate;
    private final DataSegmentAnnouncer segmentAnnouncer;
    // Executor handed to the query runner factory when merging per-sink runners in getQueryRunner().
    private final ExecutorService queryExecutorService;
    private final DataSegmentPusher dataSegmentPusher;
    private final SegmentPublisher segmentPublisher;
    private final FilteredServerView serverView;
    // Monitor that finishJob() waits on and abandonSegment() notifies after removing a sink.
    private final Object handoffCondition = new Object();
    // Live sinks keyed by the start (epoch millis) of their segment-granularity bucket.
    private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
    // Timeline view of the same sinks, consulted by getQueryRunner() to find sinks for query intervals.
    private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(String.CASE_INSENSITIVE_ORDER);
    // Set by finishJob(); makes a failed persist-n-merge abandon its sink instead of leaving it in place.
    private volatile boolean shuttingDown = false;
    // Set at the end of finishJob(); observed by the overseer task and the server view callback.
    private volatile boolean stopped = false;
    // Cleared when a persist-n-merge fails while shutting down; finishJob() throws if it is false.
    private volatile boolean cleanShutdown = true;
    // Executors are created lazily by initializeExecutors().
    private volatile ExecutorService persistExecutor = null;
    private volatile ExecutorService mergeExecutor = null;
    private volatile ScheduledExecutorService scheduledExecutor = null;

    /**
     * Creates a plumber wired to the given collaborators.
     *
     * <p>The rejection policy is built immediately from the tuning config's rejection policy
     * factory and window period; executors are not created until {@link #startJob()} runs.
     *
     * @param dataSchema schema of the data source being ingested
     * @param realtimeTuningConfig tuning parameters (window period, versioning policy, persist settings)
     * @param fireDepartmentMetrics counters updated while persisting and handing off
     * @param serviceEmitter emitter used for query metrics
     * @param queryRunnerFactoryConglomerate source of query runner factories
     * @param dataSegmentAnnouncer announces and unannounces the sinks' segments
     * @param executorService executor used to merge per-sink query runners
     * @param dataSegmentPusher pushes merged segments
     * @param segmentPublisher publishes pushed segments
     * @param filteredServerView server view on which the handoff callback is registered
     */
    public RealtimePlumber(DataSchema dataSchema, RealtimeTuningConfig realtimeTuningConfig, FireDepartmentMetrics fireDepartmentMetrics, ServiceEmitter serviceEmitter, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, DataSegmentAnnouncer dataSegmentAnnouncer, ExecutorService executorService, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, FilteredServerView filteredServerView) {
        schema = dataSchema;
        config = realtimeTuningConfig;
        rejectionPolicy = realtimeTuningConfig
            .getRejectionPolicyFactory()
            .create(realtimeTuningConfig.getWindowPeriod());
        metrics = fireDepartmentMetrics;
        emitter = serviceEmitter;
        conglomerate = queryRunnerFactoryConglomerate;
        segmentAnnouncer = dataSegmentAnnouncer;
        queryExecutorService = executorService;
        this.dataSegmentPusher = dataSegmentPusher;
        this.segmentPublisher = segmentPublisher;
        serverView = filteredServerView;

        log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
    }

    /** @return the data schema this plumber was constructed with */
    public DataSchema getSchema() {
        return schema;
    }

    /** @return the tuning config this plumber was constructed with */
    public RealtimeTuningConfig getConfig() {
        return config;
    }

    /** @return the rejection policy created in the constructor */
    public RejectionPolicy getRejectionPolicy() {
        return rejectionPolicy;
    }

    /**
     * @return the live (not copied) map of sinks, keyed by the start millis of each sink's
     *     segment-granularity bucket
     */
    public Map<Long, Sink> getSinks() {
        return sinks;
    }

    /**
     * Prepares the plumber for ingestion: creates the base persist directory, initializes the
     * executors, reloads previously persisted sinks, registers the server view callback, schedules
     * the periodic merge-and-push task and finally runs one merge-and-push pass immediately.
     */
    @Override
    public void startJob() {
        final File baseDir = computeBaseDir(schema);
        baseDir.mkdirs();

        initializeExecutors();
        bootstrapSinksFromDisk();
        registerServerViewCallback();
        startPersistThread();
        mergeAndPush();
    }

    /**
     * Adds a row to the sink covering the row's timestamp.
     *
     * @param inputRow row to ingest
     * @return the value returned by {@link Sink#add}, or {@code -1} when no sink is available for
     *     the row's timestamp
     * @throws IndexSizeExceededException propagated from the sink
     */
    @Override
    public int add(InputRow inputRow) throws IndexSizeExceededException {
        final Sink target = getSink(inputRow.getTimestampFromEpoch());
        return target == null ? -1 : target.add(inputRow);
    }

    /**
     * Returns the sink for the segment-granularity bucket containing the given timestamp, creating
     * and announcing a new one if none exists yet.
     *
     * @param j timestamp in epoch milliseconds
     * @return the sink for the timestamp's bucket, or {@code null} if the rejection policy does not
     *     accept the timestamp
     */
    @Override
    public Sink getSink(long j) {
        if (!rejectionPolicy.accept(j)) {
            return null;
        }

        final Granularity granularity = schema.getGranularitySpec().getSegmentGranularity();
        final VersioningPolicy versioning = config.getVersioningPolicy();
        final long bucketStart = granularity.truncate(new DateTime(j)).getMillis();

        final Sink existing = sinks.get(bucketStart);
        if (existing != null) {
            return existing;
        }

        final DateTime bucketStartTime = new DateTime(bucketStart);
        final Interval bucket = new Interval(bucketStartTime, granularity.increment(bucketStartTime));
        final Sink created = new Sink(bucket, schema, config, versioning.getVersion(bucket));
        try {
            // The sink is registered only after it has been announced successfully.
            segmentAnnouncer.announceSegment(created.getSegment());
            sinks.put(bucketStart, created);
            sinkTimeline.add(created.getInterval(), created.getVersion(), new SingleElementPartitionChunk<Sink>(created));
        } catch (IOException e) {
            log.makeAlert(e, "Failed to announce new segment[%s]", schema.getDataSource())
               .addData("interval", created.getInterval())
               .emit();
        }
        // The freshly created sink is returned even when the announcement failed.
        return created;
    }

    /**
     * Builds a query runner over every sink whose timeline entry overlaps one of the query's
     * intervals.
     *
     * <p>For each matching sink the hydrants are turned into runners (or a
     * {@link ReportTimelineMissingSegmentQueryRunner} when a hydrant or its segment is missing),
     * merged on the calling thread, wrapped with metrics emission and pinned to the sink's segment
     * descriptor. The per-sink runners are then merged on the query executor and passed through the
     * tool chest's result merging.
     *
     * @param query the query to run
     * @param <T> result type of the query
     * @return a runner covering all matching sinks
     * @throws ISE if a timeline entry is null or carries no sink
     */
    @Override // io.druid.segment.realtime.plumber.Plumber
    public <T> QueryRunner<T> getQueryRunner(final Query<T> query) {
        final QueryRunnerFactory findFactory = this.conglomerate.findFactory(query);
        final QueryToolChest toolchest = findFactory.getToolchest();
        final Function<Query<T>, ServiceMetricEvent.Builder> function = new Function<Query<T>, ServiceMetricEvent.Builder>() {
            public ServiceMetricEvent.Builder apply(@Nullable Query<T> query2) {
                // The builder is always derived from the outer query; the argument is not consulted.
                return toolchest.makeMetricBuilder(query);
            }
        };
        final ArrayList<TimelineObjectHolder<String, Sink>> querySinks = Lists.newArrayList();
        for (Interval interval : query.getIntervals()) {
            querySinks.addAll(this.sinkTimeline.lookup(interval));
        }
        return toolchest.mergeResults(findFactory.mergeRunners(this.queryExecutorService, FunctionalIterable.create(querySinks).transform(new Function<TimelineObjectHolder<String, Sink>, QueryRunner<T>>() {
            public QueryRunner<T> apply(TimelineObjectHolder<String, Sink> timelineObjectHolder) {
                if (timelineObjectHolder == null) {
                    throw new ISE("No timeline entry at all!", new Object[0]);
                }
                Sink sink = (Sink) timelineObjectHolder.getObject().getChunk(0).getObject();
                if (sink == null) {
                    throw new ISE("Missing sink for timeline entry[%s]!", new Object[]{timelineObjectHolder});
                }
                final SegmentDescriptor segmentDescriptor = new SegmentDescriptor(timelineObjectHolder.getInterval(), sink.getSegment().getVersion(), sink.getSegment().getShardSpec().getPartitionNum());
                return new SpecificSegmentQueryRunner(new MetricsEmittingQueryRunner(RealtimePlumber.this.emitter, function, findFactory.mergeRunners(MoreExecutors.sameThreadExecutor(), Iterables.transform(sink, new Function<FireHydrant, QueryRunner<T>>() {
                    public QueryRunner<T> apply(FireHydrant fireHydrant) {
                        if (fireHydrant == null || fireHydrant.getSegment() == null) {
                            return new ReportTimelineMissingSegmentQueryRunner(segmentDescriptor);
                        }
                        // Hold a reference on the segment while the runner is being created.
                        final Closeable increment = fireHydrant.getSegment().increment();
                        try {
                            return findFactory.createRunner(fireHydrant.getSegment());
                        } finally {
                            // Release exactly once, on both the normal and the exceptional path.
                            if (increment != null) {
                                try {
                                    increment.close();
                                } catch (IOException e) {
                                    throw Throwables.propagate(e);
                                }
                            }
                        }
                    }
                }))).withWaitMeasuredFromNow(), new SpecificSegmentSpec(segmentDescriptor));
            }
        })));
    }

    /**
     * Swaps out the current hydrant of every swappable sink and submits a task to the persist
     * executor that writes those hydrants to disk and then runs the given callback.
     *
     * <p>The time spent submitting the task is recorded as persist back-pressure, and a warning is
     * logged when it exceeds {@link #WARN_DELAY} milliseconds.
     *
     * @param runnable callback run on the persist thread after all swapped hydrants were persisted
     */
    @Override // io.druid.segment.realtime.plumber.Plumber
    public void persist(final Runnable runnable) {
        final ArrayList<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
        for (Sink sink : this.sinks.values()) {
            if (sink.swappable()) {
                indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
            }
        }
        log.info("Submitting persist runnable for dataSource[%s]", new Object[]{this.schema.getDataSource()});
        final Stopwatch runExecStopwatch = Stopwatch.createStarted();
        final Stopwatch persistStopwatch = Stopwatch.createStarted();
        this.persistExecutor.execute(new ThreadRenamingRunnable(String.format("%s-incremental-persist", this.schema.getDataSource())) {
            public void doRun() {
                try {
                    for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
                        RealtimePlumber.this.metrics.incrementRowOutputCount(RealtimePlumber.this.persistHydrant(pair.lhs, RealtimePlumber.this.schema, pair.rhs));
                    }
                    runnable.run();
                } catch (Exception e) {
                    RealtimePlumber.this.metrics.incrementFailedPersists();
                    throw e;
                } finally {
                    // Bookkeeping runs exactly once per persist, whether it succeeded or failed.
                    RealtimePlumber.this.metrics.incrementNumPersists();
                    RealtimePlumber.this.metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
                    persistStopwatch.stop();
                }
            }
        });
        // Time spent in execute() above is how long this thread was held back by pending persists.
        final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
        this.metrics.incrementPersistBackPressureMillis(startDelay);
        if (startDelay > WARN_DELAY) {
            log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", new Object[]{Long.valueOf(startDelay)});
        }
        runExecStopwatch.stop();
    }

    /**
     * Submits a task to the merge executor that persists any unswapped hydrants of the sink,
     * merges all hydrants into a single index, pushes and publishes the resulting segment and
     * finally creates a marker file recording that the sink was pushed.
     *
     * <p>The task bails out early when the sink is no longer the one registered under the given
     * key, when the pushed marker already exists, or when the merge target still exists after the
     * attempt to delete it. On failure an alert is emitted; if the plumber is shutting down the
     * sink is additionally abandoned and the shutdown is flagged as unclean.
     *
     * @param j key of the sink in the sinks map
     * @param sink the sink to persist, merge and push
     */
    private void persistAndMerge(final long j, final Sink sink) {
        final String threadName = String.format("%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(j));
        mergeExecutor.execute(new ThreadRenamingRunnable(threadName) {
            public void doRun() {
                final Interval sinkInterval = sink.getInterval();

                if (sinks.get(j) != sink) {
                    log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
                    return;
                }

                final File persistDir = computePersistDir(schema, sinkInterval);
                final File mergedTarget = new File(persistDir, "merged");
                final File pushedMarker = new File(persistDir, "isPushedMarker");

                if (pushedMarker.exists()) {
                    log.info("Already pushed sink[%s]", sink);
                    return;
                }

                // Clear out any leftover merge output before starting over.
                removeSegment(sink, mergedTarget);
                if (mergedTarget.exists()) {
                    log.wtf("Merged target[%s] exists?!", mergedTarget);
                    return;
                }

                for (FireHydrant hydrant : sink) {
                    synchronized (hydrant) {
                        if (!hydrant.hasSwapped()) {
                            log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
                            final int rowsPersisted = persistHydrant(hydrant, schema, sinkInterval);
                            metrics.incrementRowOutputCount(rowsPersisted);
                        }
                    }
                }

                try {
                    final ArrayList<QueryableIndex> indexes = Lists.newArrayList();
                    for (FireHydrant hydrant : sink) {
                        final QueryableIndex hydrantIndex = hydrant.getSegment().asQueryableIndex();
                        log.info("Adding hydrant[%s]", hydrant);
                        indexes.add(hydrantIndex);
                    }

                    final File mergedFile;
                    if (config.isPersistInHeap()) {
                        mergedFile = IndexMaker.mergeQueryableIndex(indexes, schema.getAggregators(), mergedTarget);
                    } else {
                        mergedFile = IndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), mergedTarget);
                    }

                    final DataSegment pushed = dataSegmentPusher.push(
                        mergedFile,
                        sink.getSegment().withDimensions(Lists.newArrayList(IndexIO.loadIndex(mergedFile).getAvailableDimensions()))
                    );
                    segmentPublisher.publishSegment(pushed);

                    if (!pushedMarker.createNewFile()) {
                        log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
                           .addData("interval", sink.getInterval())
                           .addData("partitionNum", pushed.getShardSpec().getPartitionNum())
                           .addData("marker", pushedMarker)
                           .emit();
                    }
                } catch (Exception e) {
                    metrics.incrementFailedHandoffs();
                    log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
                       .addData("interval", sinkInterval)
                       .emit();
                    if (shuttingDown) {
                        // No retry will happen during shutdown, so give the sink up.
                        cleanShutdown = false;
                        abandonSegment(j, sink);
                    }
                }
            }
        });
    }

    /**
     * Shuts the plumber down: flags it as shutting down, schedules persist-and-merge for every
     * sink, blocks until the sinks map is empty, shuts down the executors and marks the plumber as
     * stopped.
     *
     * <p>If the waiting thread is interrupted, its interrupt status is restored before the
     * exception is propagated.
     *
     * @throws ISE if a persist-and-merge failed while shutting down
     */
    @Override // io.druid.segment.realtime.plumber.Plumber
    public void finishJob() {
        log.info("Shutting down...", new Object[0]);
        this.shuttingDown = true;
        for (Map.Entry<Long, Sink> entry : this.sinks.entrySet()) {
            persistAndMerge(entry.getKey().longValue(), entry.getValue());
        }
        while (!this.sinks.isEmpty()) {
            try {
                log.info("Cannot shut down yet! Sinks remaining: %s", new Object[]{Joiner.on(", ").join(Iterables.transform(this.sinks.values(), new Function<Sink, String>() {
                    public String apply(Sink sink) {
                        return sink.getSegment().getIdentifier();
                    }
                }))});
                synchronized (this.handoffCondition) {
                    // abandonSegment() notifies this monitor each time it removes a sink.
                    while (!this.sinks.isEmpty()) {
                        this.handoffCondition.wait();
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }
        shutdownExecutors();
        this.stopped = true;
        if (!this.cleanShutdown) {
            throw new ISE("Exception occurred during persist and merge.", new Object[0]);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Lazily creates the persist, merge and scheduled executors; an executor that already exists
     * is left untouched.
     */
    public void initializeExecutors() {
        final int pendingPersistCapacity = config.getMaxPendingPersists();

        if (persistExecutor == null) {
            persistExecutor = Execs.newBlockingSingleThreaded("plumber_persist_%d", pendingPersistCapacity);
        }

        if (mergeExecutor == null) {
            mergeExecutor = Execs.newBlockingSingleThreaded("plumber_merge_%d", 1);
        }

        if (scheduledExecutor == null) {
            final ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("plumber_scheduled_%d");
            scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory.build());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts down the scheduled and persist executors, each only if it was created.
     *
     * <p>The merge executor is deliberately left running here: the server view callback is
     * registered on it and shuts it down itself once it observes that the plumber has stopped.
     */
    public void shutdownExecutors() {
        // Each executor is initialised independently, so each gets its own null check.
        if (this.scheduledExecutor != null) {
            this.scheduledExecutor.shutdown();
        }
        if (this.persistExecutor != null) {
            this.persistExecutor.shutdown();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Rebuilds sinks from hydrants persisted under the base persist directory.
     *
     * <p>Every directory directly below the base directory is interpreted as an interval (with
     * {@code _} standing in for {@code /}); its numerically named children are loaded in ascending
     * numeric order as hydrants. Each rebuilt sink is registered in the sinks map and the timeline
     * and then announced. Entries that cannot be listed as a directory are skipped with a warning;
     * an I/O failure while loading or announcing a sink emits an alert and moves on to the next
     * entry.
     */
    public void bootstrapSinksFromDisk() {
        final VersioningPolicy versioningPolicy = this.config.getVersioningPolicy();
        final File baseDir = computeBaseDir(this.schema);
        if (baseDir == null || !baseDir.exists()) {
            return;
        }
        final File[] sinkDirs = baseDir.listFiles();
        if (sinkDirs == null) {
            return;
        }
        for (File sinkDir : sinkDirs) {
            final File[] sinkFiles = sinkDir.listFiles(new FilenameFilter() {
                @Override // java.io.FilenameFilter
                public boolean accept(File file2, String str) {
                    return Ints.tryParse(str) != null;
                }
            });
            // listFiles returns null when the entry is not a directory or cannot be read.
            if (sinkFiles == null) {
                log.warn("Skipping [%s] while bootstrapping sinks: not a readable directory", new Object[]{sinkDir});
                continue;
            }
            final Interval interval = new Interval(sinkDir.getName().replace("_", "/"));
            Arrays.sort(sinkFiles, new Comparator<File>() {
                @Override // java.util.Comparator
                public int compare(File file2, File file3) {
                    try {
                        return Ints.compare(Integer.parseInt(file2.getName()), Integer.parseInt(file3.getName()));
                    } catch (NumberFormatException e) {
                        RealtimePlumber.log.error(e, "Couldn't compare as numbers? [%s][%s]", new Object[]{file2, file3});
                        return file2.compareTo(file3);
                    }
                }
            });
            try {
                final ArrayList<FireHydrant> hydrants = Lists.newArrayList();
                for (File segmentDir : sinkFiles) {
                    log.info("Loading previously persisted segment at [%s]", new Object[]{segmentDir});
                    // Double check that only numerically named directories become hydrants.
                    if (Ints.tryParse(segmentDir.getName()) != null) {
                        hydrants.add(new FireHydrant(new QueryableIndexSegment(DataSegment.makeDataSegmentIdentifier(this.schema.getDataSource(), interval.getStart(), interval.getEnd(), versioningPolicy.getVersion(interval), this.config.getShardSpec()), IndexIO.loadIndex(segmentDir)), Integer.parseInt(segmentDir.getName())));
                    }
                }
                final Sink sink = new Sink(interval, this.schema, this.config, versioningPolicy.getVersion(interval), hydrants);
                this.sinks.put(Long.valueOf(interval.getStartMillis()), sink);
                this.sinkTimeline.add(sink.getInterval(), sink.getVersion(), new SingleElementPartitionChunk<Sink>(sink));
                this.segmentAnnouncer.announceSegment(sink.getSegment());
            } catch (IOException e) {
                log.makeAlert(e, "Problem loading sink[%s] from disk.", new Object[]{this.schema.getDataSource()}).addData("interval", interval).emit();
            }
        }
    }

    /**
     * Schedules the periodic merge-and-push overseer task on the scheduled executor.
     *
     * <p>The first run is scheduled for the end of the current segment-granularity bucket plus the
     * window period; subsequent runs repeat once per bucket length. The task stops rescheduling
     * itself once the plumber has been stopped.
     */
    protected void startPersistThread() {
        final Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        final Period windowPeriod = this.config.getWindowPeriod();
        final DateTime truncatedNow = segmentGranularity.truncate(new DateTime());
        final long windowMillis = windowPeriod.toStandardDuration().getMillis();
        final DateTime nextBucketStart = segmentGranularity.increment(truncatedNow);
        // Computed once so the logged time and the scheduled delay are based on the same clock reading.
        final Duration initialDelay = new Duration(System.currentTimeMillis(), nextBucketStart.getMillis() + windowMillis);
        log.info("Expect to run at [%s]", new Object[]{new DateTime().plus(initialDelay)});
        ScheduledExecutors.scheduleAtFixedRate(this.scheduledExecutor, initialDelay, new Duration(truncatedNow, nextBucketStart), new ThreadRenamingCallable<ScheduledExecutors.Signal>(String.format("%s-overseer-%d", this.schema.getDataSource(), Integer.valueOf(this.config.getShardSpec().getPartitionNum()))) {
            @Override
            public ScheduledExecutors.Signal doCall() {
                if (RealtimePlumber.this.stopped) {
                    RealtimePlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                RealtimePlumber.this.mergeAndPush();
                // The plumber may have been stopped while the merge-and-push pass was running.
                if (RealtimePlumber.this.stopped) {
                    RealtimePlumber.log.info("Stopping merge-n-push overseer thread", new Object[0]);
                    return ScheduledExecutors.Signal.STOP;
                }
                return ScheduledExecutors.Signal.REPEAT;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules persist-and-merge for every sink whose bucket starts before the minimum timestamp.
     *
     * <p>The minimum timestamp is the rejection policy's current max time minus the window period
     * (never below zero), truncated to the segment granularity. Sinks starting at or after it are
     * skipped and left for a later run.
     */
    public void mergeAndPush() {
        final Granularity segmentGranularity = this.schema.getGranularitySpec().getSegmentGranularity();
        final long windowMillis = this.config.getWindowPeriod().toStandardDuration().getMillis();
        log.info("Starting merge and push.", new Object[0]);
        final DateTime minTimestampAsDate = segmentGranularity.truncate(new DateTime(Math.max(windowMillis, this.rejectionPolicy.getCurrMaxTime().getMillis()) - windowMillis));
        final long minTimestamp = minTimestampAsDate.getMillis();
        log.info("Found [%,d] sinks. minTimestamp [%s]", new Object[]{Integer.valueOf(this.sinks.size()), minTimestampAsDate});
        final ArrayList<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
        for (Map.Entry<Long, Sink> entry : this.sinks.entrySet()) {
            final Long intervalStart = entry.getKey();
            if (intervalStart.longValue() < minTimestamp) {
                log.info("Adding entry[%s] for merge and push.", new Object[]{entry});
                sinksToPush.add(entry);
            } else {
                // This branch is taken when the sink starts at or after minTimestamp.
                log.warn("[%s] >= [%s] Skipping persist and merge.", new Object[]{new DateTime(intervalStart), minTimestampAsDate});
            }
        }
        log.info("Found [%,d] sinks to persist and merge", new Object[]{Integer.valueOf(sinksToPush.size())});
        for (Map.Entry<Long, Sink> entry : sinksToPush) {
            persistAndMerge(entry.getKey().longValue(), entry.getValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Gives up a sink: unannounces its segment, deletes its persist directory, removes it from the
     * sinks map and the timeline, and wakes up anything waiting on the handoff condition.
     *
     * <p>Any exception raised along the way is reported as an alert and not rethrown; the steps
     * after the failing one are skipped.
     *
     * @param j key of the sink in the sinks map
     * @param sink the sink to abandon
     */
    public void abandonSegment(long j, Sink sink) {
        try {
            segmentAnnouncer.unannounceSegment(sink.getSegment());

            final File persistDir = computePersistDir(schema, sink.getInterval());
            removeSegment(sink, persistDir);

            log.info("Removing sinkKey %d for segment %s", j, sink.getSegment().getIdentifier());
            sinks.remove(j);
            sinkTimeline.remove(sink.getInterval(), sink.getVersion(), new SingleElementPartitionChunk<Sink>(sink));

            synchronized (handoffCondition) {
                handoffCondition.notifyAll();
            }
        } catch (Exception e) {
            log.makeAlert(e, "Unable to abandon old segment for dataSource[%s]", schema.getDataSource())
               .addData("interval", sink.getInterval())
               .emit();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @param dataSchema schema whose data source names the directory
     * @return the data source's directory below the configured base persist directory
     */
    public File computeBaseDir(DataSchema dataSchema) {
        final File persistRoot = config.getBasePersistDirectory();
        return new File(persistRoot, dataSchema.getDataSource());
    }

    /**
     * @param dataSchema schema whose data source names the base directory
     * @param interval interval of the sink
     * @return the directory for the interval below the data source's base directory, named after
     *     the interval's string form with {@code /} replaced by {@code _}
     */
    protected File computePersistDir(DataSchema dataSchema, Interval interval) {
        final String dirName = interval.toString().replace("/", "_");
        return new File(computeBaseDir(dataSchema), dirName);
    }

    /**
     * Persists a hydrant's in-memory index to disk and swaps the hydrant over to the persisted,
     * queryable segment. Runs while holding the hydrant's monitor.
     *
     * @param fireHydrant hydrant to persist
     * @param dataSchema schema used to locate the persist directory and for log messages
     * @param interval interval of the sink owning the hydrant
     * @return the size of the hydrant's index before persisting, or {@code 0} if the hydrant had
     *     already been swapped
     * @throws RuntimeException wrapping the {@link IOException} when persisting or loading fails
     */
    protected int persistHydrant(FireHydrant fireHydrant, DataSchema dataSchema, Interval interval) {
        synchronized (fireHydrant) {
            if (fireHydrant.hasSwapped()) {
                log.info("DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", new Object[]{dataSchema.getDataSource(), interval, fireHydrant});
                return 0;
            }
            log.info("DataSource[%s], Interval[%s], persisting Hydrant[%s]", new Object[]{dataSchema.getDataSource(), interval, fireHydrant});
            try {
                final int numRows = fireHydrant.getIndex().size();
                final String segmentIdentifier = fireHydrant.getSegment().getIdentifier();
                // Each hydrant is written to a directory named after its count.
                final File targetDir = new File(computePersistDir(dataSchema, interval), String.valueOf(fireHydrant.getCount()));
                final File persistedFile = this.config.isPersistInHeap()
                    ? IndexMaker.persist(fireHydrant.getIndex(), targetDir)
                    : IndexMerger.persist(fireHydrant.getIndex(), targetDir);
                fireHydrant.swapSegment(new QueryableIndexSegment(segmentIdentifier, IndexIO.loadIndex(persistedFile)));
                return numRows;
            } catch (IOException e) {
                // Attach the cause to the alert, as the other alerts in this class do.
                log.makeAlert(e, "dataSource[%s] -- incremental persist failed", new Object[]{dataSchema.getDataSource()}).addData("interval", interval).addData("count", Integer.valueOf(fireHydrant.getCount())).emit();
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Registers a segment callback on the server view, executed on the merge executor, that
     * abandons a sink when an assignable server reports a segment of this data source and
     * partition whose interval contains the sink's key and whose version is not older than the
     * sink's.
     *
     * <p>Once the plumber has stopped, the next callback invocation shuts down the merge executor
     * and unregisters itself. The accompanying predicate limits callbacks to segments of this data
     * source (compared case-insensitively) and partition whose interval contains at least one
     * sink key.
     */
    private void registerServerViewCallback() {
        final ServerView.BaseSegmentCallback handoffCallback = new ServerView.BaseSegmentCallback() {
            @Override
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                if (stopped) {
                    log.info("Unregistering ServerViewCallback");
                    mergeExecutor.shutdown();
                    return ServerView.CallbackAction.UNREGISTER;
                }

                if (!druidServerMetadata.isAssignable()) {
                    return ServerView.CallbackAction.CONTINUE;
                }

                log.debug("Checking segment[%s] on server[%s]", dataSegment, druidServerMetadata);

                if (!schema.getDataSource().equals(dataSegment.getDataSource())
                    || config.getShardSpec().getPartitionNum() != dataSegment.getShardSpec().getPartitionNum()) {
                    return ServerView.CallbackAction.CONTINUE;
                }

                final Interval segmentInterval = dataSegment.getInterval();
                for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
                    final long sinkKey = entry.getKey();
                    if (!segmentInterval.contains(sinkKey)) {
                        continue;
                    }

                    final Sink sink = entry.getValue();
                    log.info("Segment[%s] matches sink[%s] on server[%s]", dataSegment, sink, druidServerMetadata);

                    final String segmentVersion = dataSegment.getVersion();
                    final String sinkVersion = sink.getSegment().getVersion();
                    if (segmentVersion.compareTo(sinkVersion) >= 0) {
                        log.info("Segment version[%s] >= sink version[%s]", segmentVersion, sinkVersion);
                        abandonSegment(sinkKey, sink);
                    }
                }
                return ServerView.CallbackAction.CONTINUE;
            }
        };

        final Predicate<DataSegment> relevantSegments = new Predicate<DataSegment>() {
            public boolean apply(final DataSegment dataSegment) {
                if (!schema.getDataSource().equalsIgnoreCase(dataSegment.getDataSource())) {
                    return false;
                }
                if (config.getShardSpec().getPartitionNum() != dataSegment.getShardSpec().getPartitionNum()) {
                    return false;
                }
                return Iterables.any(sinks.keySet(), new Predicate<Long>() {
                    public boolean apply(Long l) {
                        return dataSegment.getInterval().contains(l.longValue());
                    }
                });
            }
        };

        serverView.registerSegmentCallback(mergeExecutor, handoffCallback, relevantSegments);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Recursively deletes the given directory if it exists. A failure to delete is reported as an
     * alert and otherwise ignored.
     *
     * @param sink sink the directory belongs to; only used for the alert's interval
     * @param file directory to delete
     */
    public void removeSegment(Sink sink, File file) {
        if (!file.exists()) {
            return;
        }

        try {
            log.info("Deleting Index File[%s]", file);
            FileUtils.deleteDirectory(file);
        } catch (Exception e) {
            log.makeAlert(e, "Unable to remove file for dataSource[%s]", schema.getDataSource())
               .addData("file", file)
               .addData("interval", sink.getInterval())
               .emit();
        }
    }
}
