package io.trino.operator;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.memory.QueryContextVisitor;
import io.trino.memory.context.AggregatedMemoryContext;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.operator.OperationTimer;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.Page;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

/* loaded from: input_file:io/trino/operator/OperatorContext.class */
public class OperatorContext {
    private final int operatorId;
    private final PlanNodeId planNodeId;
    private final String operatorType;
    private final DriverContext driverContext;
    private final Executor executor;
    private final AtomicReference<SettableFuture<Void>> memoryFuture;
    private final AtomicReference<SettableFuture<Void>> revocableMemoryFuture;
    private final OperatorSpillContext spillContext;

    @GuardedBy("this")
    private boolean memoryRevokingRequested;

    @GuardedBy("this")
    @Nullable
    private Runnable memoryRevocationRequestListener;
    private final MemoryTrackingContext operatorMemoryContext;
    private final CounterStat physicalInputDataSize = new CounterStat();
    private final CounterStat physicalInputPositions = new CounterStat();
    private final AtomicLong physicalInputReadTimeNanos = new AtomicLong();
    private final CounterStat internalNetworkInputDataSize = new CounterStat();
    private final CounterStat internalNetworkPositions = new CounterStat();
    private final OperationTimer.OperationTiming addInputTiming = new OperationTimer.OperationTiming();
    private final CounterStat inputDataSize = new CounterStat();
    private final CounterStat inputPositions = new CounterStat();
    private final OperationTimer.OperationTiming getOutputTiming = new OperationTimer.OperationTiming();
    private final CounterStat outputDataSize = new CounterStat();
    private final CounterStat outputPositions = new CounterStat();
    private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();
    private final AtomicReference<Metrics> metrics = new AtomicReference<>(Metrics.EMPTY);
    private final AtomicReference<Metrics> connectorMetrics = new AtomicReference<>(Metrics.EMPTY);
    private final AtomicLong physicalWrittenDataSize = new AtomicLong();
    private final AtomicReference<BlockedMonitor> blockedMonitor = new AtomicReference<>();
    private final AtomicReference<ListenableFuture<Void>> finishedFuture = new AtomicReference<>();
    private final AtomicLong blockedWallNanos = new AtomicLong();
    private final OperationTimer.OperationTiming finishTiming = new OperationTimer.OperationTiming();
    private final AtomicReference<Supplier<? extends OperatorInfo>> infoSupplier = new AtomicReference<>();
    private final AtomicReference<Supplier<List<OperatorStats>>> nestedOperatorStatsSupplier = new AtomicReference<>();
    private final AtomicLong peakUserMemoryReservation = new AtomicLong();
    private final AtomicLong peakRevocableMemoryReservation = new AtomicLong();
    private final AtomicLong peakTotalMemoryReservation = new AtomicLong();

    /* loaded from: input_file:io/trino/operator/OperatorContext$BlockedMonitor.class */
    private class BlockedMonitor implements Runnable {
        private final long start = System.nanoTime();
        private boolean finished;

        private BlockedMonitor() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            OperatorContext.this.blockedMonitor.compareAndSet(this, null);
            OperatorContext.this.blockedWallNanos.getAndAdd(getBlockedTime());
        }

        public long getBlockedTime() {
            return OperatorContext.nanosBetween(this.start, System.nanoTime());
        }
    }

    /* loaded from: input_file:io/trino/operator/OperatorContext$InternalAggregatedMemoryContext.class */
    private static class InternalAggregatedMemoryContext implements AggregatedMemoryContext {
        private final AggregatedMemoryContext delegate;
        private final AtomicReference<SettableFuture<Void>> memoryFuture;
        private final Runnable allocationListener;
        private final boolean closeable;

        InternalAggregatedMemoryContext(AggregatedMemoryContext aggregatedMemoryContext, AtomicReference<SettableFuture<Void>> atomicReference, Runnable runnable, boolean z) {
            this.delegate = (AggregatedMemoryContext) Objects.requireNonNull(aggregatedMemoryContext, "delegate is null");
            this.memoryFuture = (AtomicReference) Objects.requireNonNull(atomicReference, "memoryFuture is null");
            this.allocationListener = (Runnable) Objects.requireNonNull(runnable, "allocationListener is null");
            this.closeable = z;
        }

        public AggregatedMemoryContext newAggregatedMemoryContext() {
            return new InternalAggregatedMemoryContext(this.delegate.newAggregatedMemoryContext(), this.memoryFuture, this.allocationListener, true);
        }

        public LocalMemoryContext newLocalMemoryContext(String str) {
            return new InternalLocalMemoryContext(this.delegate.newLocalMemoryContext(str), this.memoryFuture, this.allocationListener, true);
        }

        public long getBytes() {
            return this.delegate.getBytes();
        }

        public void close() {
            if (!this.closeable) {
                throw new UnsupportedOperationException("Called close on unclosable aggregated memory context");
            }
            this.delegate.close();
        }
    }

    /* loaded from: input_file:io/trino/operator/OperatorContext$InternalLocalMemoryContext.class */
    private static class InternalLocalMemoryContext implements LocalMemoryContext {
        private final LocalMemoryContext delegate;
        private final AtomicReference<SettableFuture<Void>> memoryFuture;
        private final Runnable allocationListener;
        private final boolean closeable;

        InternalLocalMemoryContext(LocalMemoryContext localMemoryContext, AtomicReference<SettableFuture<Void>> atomicReference, Runnable runnable, boolean z) {
            this.delegate = (LocalMemoryContext) Objects.requireNonNull(localMemoryContext, "delegate is null");
            this.memoryFuture = (AtomicReference) Objects.requireNonNull(atomicReference, "memoryFuture is null");
            this.allocationListener = (Runnable) Objects.requireNonNull(runnable, "allocationListener is null");
            this.closeable = z;
        }

        public long getBytes() {
            return this.delegate.getBytes();
        }

        public ListenableFuture<Void> setBytes(long j) {
            if (j == this.delegate.getBytes()) {
                return Operator.NOT_BLOCKED;
            }
            ListenableFuture<Void> bytes = this.delegate.setBytes(j);
            OperatorContext.updateMemoryFuture(bytes, this.memoryFuture);
            this.allocationListener.run();
            return bytes;
        }

        public boolean trySetBytes(long j) {
            if (!this.delegate.trySetBytes(j)) {
                return false;
            }
            this.allocationListener.run();
            return true;
        }

        public void close() {
            if (!this.closeable) {
                throw new UnsupportedOperationException("Called close on unclosable local memory context");
            }
            this.delegate.close();
            this.allocationListener.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/operator/OperatorContext$OperatorSpillContext.class */
    public static class OperatorSpillContext implements SpillContext {
        private final DriverContext driverContext;
        private final AtomicLong reservedBytes = new AtomicLong();
        private final AtomicLong spilledBytes = new AtomicLong();

        public OperatorSpillContext(DriverContext driverContext) {
            this.driverContext = driverContext;
        }

        @Override // io.trino.operator.SpillContext
        public void updateBytes(long j) {
            if (j < 0) {
                this.reservedBytes.accumulateAndGet(-j, this::decrementSpilledReservation);
                this.driverContext.freeSpill(-j);
            } else {
                this.reservedBytes.addAndGet(j);
                this.driverContext.reserveSpill(j);
                this.spilledBytes.addAndGet(j);
            }
        }

        public long getSpilledBytes() {
            return this.spilledBytes.longValue();
        }

        private long decrementSpilledReservation(long j, long j2) {
            Preconditions.checkArgument(j2 >= 0);
            Preconditions.checkArgument(j2 <= j, "tried to free %s spilled bytes from %s bytes reserved", j2, j);
            return j - j2;
        }

        @Override // io.trino.operator.SpillContext, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            throw new UnsupportedOperationException(String.format("%s should not be closed directly", getClass()));
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("usedBytes", this.reservedBytes.get()).toString();
        }
    }

    public OperatorContext(int i, PlanNodeId planNodeId, String str, DriverContext driverContext, Executor executor, MemoryTrackingContext memoryTrackingContext) {
        Preconditions.checkArgument(i >= 0, "operatorId is negative");
        this.operatorId = i;
        this.planNodeId = (PlanNodeId) Objects.requireNonNull(planNodeId, "planNodeId is null");
        this.operatorType = (String) Objects.requireNonNull(str, "operatorType is null");
        this.driverContext = (DriverContext) Objects.requireNonNull(driverContext, "driverContext is null");
        this.spillContext = new OperatorSpillContext(this.driverContext);
        this.executor = (Executor) Objects.requireNonNull(executor, "executor is null");
        this.memoryFuture = new AtomicReference<>(SettableFuture.create());
        this.memoryFuture.get().set((Object) null);
        this.revocableMemoryFuture = new AtomicReference<>(SettableFuture.create());
        this.revocableMemoryFuture.get().set((Object) null);
        this.operatorMemoryContext = (MemoryTrackingContext) Objects.requireNonNull(memoryTrackingContext, "operatorMemoryContext is null");
        memoryTrackingContext.initializeLocalMemoryContexts(str);
    }

    public int getOperatorId() {
        return this.operatorId;
    }

    public String getOperatorType() {
        return this.operatorType;
    }

    public DriverContext getDriverContext() {
        return this.driverContext;
    }

    public Session getSession() {
        return this.driverContext.getSession();
    }

    public boolean isDone() {
        return this.driverContext.isDone();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordAddInput(OperationTimer operationTimer, Page page) {
        operationTimer.recordOperationComplete(this.addInputTiming);
        if (page != null) {
            this.inputDataSize.update(page.getSizeInBytes());
            this.inputPositions.update(page.getPositionCount());
        }
    }

    public void recordPhysicalInputWithTiming(long j, long j2, long j3) {
        this.physicalInputDataSize.update(j);
        this.physicalInputPositions.update(j2);
        this.physicalInputReadTimeNanos.getAndAdd(j3);
    }

    public void recordNetworkInput(long j, long j2) {
        this.internalNetworkInputDataSize.update(j);
        this.internalNetworkPositions.update(j2);
    }

    public void recordProcessedInput(long j, long j2) {
        this.inputDataSize.update(j);
        this.inputPositions.update(j2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordGetOutput(OperationTimer operationTimer, Page page) {
        operationTimer.recordOperationComplete(this.getOutputTiming);
        if (page != null) {
            this.outputDataSize.update(page.getSizeInBytes());
            this.outputPositions.update(page.getPositionCount());
        }
    }

    public void recordOutput(long j, long j2) {
        this.outputDataSize.update(j);
        this.outputPositions.update(j2);
    }

    public void recordDynamicFilterSplitProcessed(long j) {
        this.dynamicFilterSplitsProcessed.getAndAdd(j);
    }

    public void setLatestMetrics(Metrics metrics) {
        this.metrics.set(metrics);
    }

    public void setLatestConnectorMetrics(Metrics metrics) {
        this.connectorMetrics.set(metrics);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<ListenableFuture<Void>> getFinishedFuture() {
        return Optional.ofNullable(this.finishedFuture.get());
    }

    public void setFinishedFuture(ListenableFuture<Void> listenableFuture) {
        Preconditions.checkState(this.finishedFuture.getAndSet((ListenableFuture) Objects.requireNonNull(listenableFuture, "finishedFuture is null")) == null, "finishedFuture already set");
    }

    public void recordPhysicalWrittenData(long j) {
        this.physicalWrittenDataSize.getAndAdd(j);
    }

    public void recordBlocked(ListenableFuture<Void> listenableFuture) {
        Objects.requireNonNull(listenableFuture, "blocked is null");
        BlockedMonitor blockedMonitor = new BlockedMonitor();
        BlockedMonitor andSet = this.blockedMonitor.getAndSet(blockedMonitor);
        if (andSet != null) {
            andSet.run();
        }
        listenableFuture.addListener(blockedMonitor, this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void recordFinish(OperationTimer operationTimer) {
        operationTimer.recordOperationComplete(this.finishTiming);
    }

    public ListenableFuture<Void> isWaitingForMemory() {
        return this.memoryFuture.get();
    }

    public ListenableFuture<Void> isWaitingForRevocableMemory() {
        return this.revocableMemoryFuture.get();
    }

    public LocalMemoryContext newLocalUserMemoryContext(String str) {
        return new InternalLocalMemoryContext(this.operatorMemoryContext.newUserMemoryContext(str), this.memoryFuture, this::updatePeakMemoryReservations, true);
    }

    public LocalMemoryContext localUserMemoryContext() {
        return new InternalLocalMemoryContext(this.operatorMemoryContext.localUserMemoryContext(), this.memoryFuture, this::updatePeakMemoryReservations, false);
    }

    public LocalMemoryContext localRevocableMemoryContext() {
        return new InternalLocalMemoryContext(this.operatorMemoryContext.localRevocableMemoryContext(), this.revocableMemoryFuture, this::updatePeakMemoryReservations, false);
    }

    public AggregatedMemoryContext aggregateUserMemoryContext() {
        return new InternalAggregatedMemoryContext(this.operatorMemoryContext.aggregateUserMemoryContext(), this.memoryFuture, this::updatePeakMemoryReservations, false);
    }

    public AggregatedMemoryContext aggregateRevocableMemoryContext() {
        return new InternalAggregatedMemoryContext(this.operatorMemoryContext.aggregateRevocableMemoryContext(), this.revocableMemoryFuture, this::updatePeakMemoryReservations, false);
    }

    public AggregatedMemoryContext newAggregateUserMemoryContext() {
        return new InternalAggregatedMemoryContext(this.operatorMemoryContext.newAggregateUserMemoryContext(), this.memoryFuture, this::updatePeakMemoryReservations, true);
    }

    public AggregatedMemoryContext newAggregateRevocableMemoryContext() {
        return new InternalAggregatedMemoryContext(this.operatorMemoryContext.newAggregateRevocableMemoryContext(), this.revocableMemoryFuture, this::updatePeakMemoryReservations, true);
    }

    private void updatePeakMemoryReservations() {
        long userMemory = this.operatorMemoryContext.getUserMemory();
        long revocableMemory = this.operatorMemoryContext.getRevocableMemory();
        this.peakUserMemoryReservation.accumulateAndGet(userMemory, Math::max);
        this.peakRevocableMemoryReservation.accumulateAndGet(revocableMemory, Math::max);
        this.peakTotalMemoryReservation.accumulateAndGet(userMemory, Math::max);
    }

    public long getReservedRevocableBytes() {
        return this.operatorMemoryContext.getRevocableMemory();
    }

    private static void updateMemoryFuture(ListenableFuture<Void> listenableFuture, AtomicReference<SettableFuture<Void>> atomicReference) {
        if (listenableFuture.isDone()) {
            return;
        }
        SettableFuture<Void> settableFuture = atomicReference.get();
        while (true) {
            SettableFuture<Void> settableFuture2 = settableFuture;
            if (!settableFuture2.isDone()) {
                listenableFuture.addListener(() -> {
                    settableFuture2.set((Object) null);
                }, MoreExecutors.directExecutor());
                return;
            } else {
                SettableFuture<Void> create = SettableFuture.create();
                settableFuture = atomicReference.compareAndSet(settableFuture2, create) ? create : atomicReference.get();
            }
        }
    }

    public void destroy() {
        synchronized (this) {
            this.memoryRevocationRequestListener = null;
        }
        Supplier<? extends OperatorInfo> supplier = this.infoSupplier.get();
        if (supplier != null) {
            OperatorInfo operatorInfo = supplier.get();
            this.infoSupplier.set(operatorInfo == null ? null : Suppliers.ofInstance(operatorInfo));
        }
        Supplier<List<OperatorStats>> supplier2 = this.nestedOperatorStatsSupplier.get();
        if (supplier2 != null) {
            List<OperatorStats> list = supplier2.get();
            this.nestedOperatorStatsSupplier.set(list == null ? null : Suppliers.ofInstance(ImmutableList.copyOf(list)));
        }
        this.operatorMemoryContext.close();
        if (this.operatorMemoryContext.getUserMemory() != 0) {
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Operator %s has non-zero user memory (%d bytes) after destroy()", this, Long.valueOf(this.operatorMemoryContext.getUserMemory())));
        }
        if (this.operatorMemoryContext.getRevocableMemory() != 0) {
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, String.format("Operator %s has non-zero revocable memory (%d bytes) after destroy()", this, Long.valueOf(this.operatorMemoryContext.getRevocableMemory())));
        }
    }

    public SpillContext getSpillContext() {
        return this.spillContext;
    }

    public synchronized boolean isMemoryRevokingRequested() {
        return this.memoryRevokingRequested;
    }

    public long requestMemoryRevoking() {
        long j = 0;
        Runnable runnable = null;
        synchronized (this) {
            if (!isMemoryRevokingRequested() && this.operatorMemoryContext.getRevocableMemory() > 0) {
                this.memoryRevokingRequested = true;
                j = this.operatorMemoryContext.getRevocableMemory();
                runnable = this.memoryRevocationRequestListener;
            }
        }
        if (runnable != null) {
            runListener(runnable);
        }
        return j;
    }

    public synchronized void resetMemoryRevokingRequested() {
        this.memoryRevokingRequested = false;
    }

    public void setMemoryRevocationRequestListener(Runnable runnable) {
        boolean z;
        Objects.requireNonNull(runnable, "listener is null");
        synchronized (this) {
            Preconditions.checkState(this.memoryRevocationRequestListener == null, "listener already set");
            this.memoryRevocationRequestListener = runnable;
            z = this.memoryRevokingRequested;
        }
        if (z) {
            runListener(runnable);
        }
    }

    private static void runListener(Runnable runnable) {
        Objects.requireNonNull(runnable, "listener is null");
        try {
            runnable.run();
        } catch (RuntimeException e) {
            throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Exception while running the listener", e);
        }
    }

    public void setInfoSupplier(Supplier<? extends OperatorInfo> supplier) {
        Objects.requireNonNull(supplier, "infoSupplier is null");
        this.infoSupplier.set(supplier);
    }

    public void setNestedOperatorStatsSupplier(Supplier<List<OperatorStats>> supplier) {
        Objects.requireNonNull(supplier, "nestedOperatorStatsSupplier is null");
        this.nestedOperatorStatsSupplier.set(supplier);
    }

    public CounterStat getInputDataSize() {
        return this.inputDataSize;
    }

    public CounterStat getInputPositions() {
        return this.inputPositions;
    }

    public CounterStat getOutputDataSize() {
        return this.outputDataSize;
    }

    public CounterStat getOutputPositions() {
        return this.outputPositions;
    }

    public long getPhysicalWrittenDataSize() {
        return this.physicalWrittenDataSize.get();
    }

    public String toString() {
        return String.format("%s-%s", this.operatorType, this.planNodeId);
    }

    public List<OperatorStats> getNestedOperatorStats() {
        return (List) Optional.ofNullable(this.nestedOperatorStatsSupplier.get()).map((v0) -> {
            return v0.get();
        }).orElseGet(() -> {
            return ImmutableList.of(getOperatorStats());
        });
    }

    public static Metrics getOperatorMetrics(Metrics metrics, long j, double d, double d2, double d3) {
        return metrics.mergeWith(new Metrics(ImmutableMap.of("Input rows distribution", TDigestHistogram.fromValue(j), "CPU time distribution (s)", TDigestHistogram.fromValue(d), "Scheduled time distribution (s)", TDigestHistogram.fromValue(d2), "Blocked time distribution (s)", TDigestHistogram.fromValue(d3))));
    }

    public static Metrics getConnectorMetrics(Metrics metrics, long j) {
        return j == 0 ? metrics : metrics.mergeWith(new Metrics(ImmutableMap.of("Physical input read time", new DurationTiming(new Duration(j, TimeUnit.NANOSECONDS)))));
    }

    public <C, R> R accept(QueryContextVisitor<C, R> queryContextVisitor, C c) {
        return queryContextVisitor.visitOperatorContext(this, c);
    }

    private OperatorStats getOperatorStats() {
        OperatorInfo operatorInfo = (OperatorInfo) Optional.ofNullable(this.infoSupplier.get()).map((v0) -> {
            return v0.get();
        }).orElse(null);
        long totalCount = this.inputPositions.getTotalCount();
        return new OperatorStats(this.driverContext.getTaskId().getStageId().getId(), this.driverContext.getPipelineContext().getPipelineId(), this.operatorId, this.planNodeId, this.operatorType, 1L, this.addInputTiming.getCalls(), new Duration(this.addInputTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(this.addInputTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(this.physicalInputDataSize.getTotalCount()), this.physicalInputPositions.getTotalCount(), new Duration(this.physicalInputReadTimeNanos.get(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(this.internalNetworkInputDataSize.getTotalCount()), this.internalNetworkPositions.getTotalCount(), DataSize.ofBytes(this.physicalInputDataSize.getTotalCount() + this.internalNetworkInputDataSize.getTotalCount()), DataSize.ofBytes(this.inputDataSize.getTotalCount()), totalCount, totalCount * totalCount, this.getOutputTiming.getCalls(), new Duration(this.getOutputTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(this.getOutputTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(this.outputDataSize.getTotalCount()), this.outputPositions.getTotalCount(), this.dynamicFilterSplitsProcessed.get(), getOperatorMetrics(this.metrics.get(), totalCount, new Duration(this.addInputTiming.getCpuNanos() + this.getOutputTiming.getCpuNanos() + this.finishTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(), new Duration(this.addInputTiming.getWallNanos() + this.getOutputTiming.getWallNanos() + this.finishTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue(), new Duration(this.blockedWallNanos.get(), TimeUnit.NANOSECONDS).convertTo(TimeUnit.SECONDS).getValue()), getConnectorMetrics(this.connectorMetrics.get(), this.physicalInputReadTimeNanos.get()), DataSize.ofBytes(this.physicalWrittenDataSize.get()), new Duration(this.blockedWallNanos.get(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), this.finishTiming.getCalls(), new Duration(this.finishTiming.getWallNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), new Duration(this.finishTiming.getCpuNanos(), TimeUnit.NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(this.operatorMemoryContext.getUserMemory()), DataSize.ofBytes(getReservedRevocableBytes()), DataSize.ofBytes(this.peakUserMemoryReservation.get()), DataSize.ofBytes(this.peakRevocableMemoryReservation.get()), DataSize.ofBytes(this.peakTotalMemoryReservation.get()), DataSize.ofBytes(this.spillContext.getSpilledBytes()), this.memoryFuture.get().isDone() ? Optional.empty() : Optional.of(BlockedReason.WAITING_FOR_MEMORY), operatorInfo);
    }

    private static long nanosBetween(long j, long j2) {
        return Math.max(0L, j2 - j);
    }

    @VisibleForTesting
    public MemoryTrackingContext getOperatorMemoryContext() {
        return this.operatorMemoryContext;
    }
}
