package org.apache.iotdb.db.mpp.execution.exchange.sink;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager;
import org.apache.iotdb.db.mpp.metric.DataExchangeCostMetricSet;
import org.apache.iotdb.db.mpp.metric.QueryMetricsManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle.class */
public class ShuffleSinkHandle implements ISinkHandle {
    /** Downstream channels this handle forwards to, addressed by channel index. */
    private final List<ISinkChannel> downStreamChannelList;
    /** Element i is true once this handle has forwarded setNoMoreTsBlocks() to channel i. */
    private final boolean[] hasSetNoMoreTsBlocks;
    /** Element i is true once channel i has been opened through tryOpenChannel(i). */
    private final boolean[] channelOpened;
    /** Holder of the index of the channel that send() and isFull() currently target. */
    private final DownStreamChannelIndex downStreamChannelIndex;
    /** Size of downStreamChannelList captured at construction time. */
    private final int channelNum;
    /** Strategy run by switchChannelIfNecessary() to (possibly) move to another channel. */
    private final ShuffleStrategy shuffleStrategy;
    /** Plan node id passed to the constructor. */
    private final String localPlanNodeId;
    /** Fragment instance id passed to the constructor; exposed by getLocalFragmentInstanceId(). */
    private final TFragmentInstanceId localFragmentInstanceId;
    /** Listener notified from setNoMoreTsBlocks(), abort() and close(). */
    private final MPPDataExchangeManager.SinkListener sinkListener;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ShuffleSinkHandle.class);
    private static final QueryMetricsManager QUERY_METRICS = QueryMetricsManager.getInstance();
    /** Indices of channels that isChannelClosed(int) has observed as closed. */
    private final Set<Integer> closedChannel = Sets.newConcurrentHashSet();
    /** Set to true at the end of abort(); checked by checkState(). */
    private boolean aborted = false;
    /** Set to true at the end of close(); checked by checkState(). Not consulted by isClosed(). */
    private boolean closed = false;
    /** Initialised from the configured per-fragment-instance limit; overwritten by setMaxBytesCanReserve(long). */
    private long maxBytesCanReserve = IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle$PlainShuffleStrategy.class */
    /**
     * Shuffle strategy that never changes the current channel index. Its only
     * observable effect is a debug log line reporting the index in use.
     */
    public class PlainShuffleStrategy implements ShuffleStrategy {
        PlainShuffleStrategy() {
        }

        /** Leaves the channel index untouched; logs it when debug logging is enabled. */
        @Override
        public void shuffle() {
            if (!LOGGER.isDebugEnabled()) {
                return;
            }
            int currentIndex = downStreamChannelIndex.getCurrentIndex();
            LOGGER.debug("PlainShuffleStrategy needs to do nothing, current channel index is {}", currentIndex);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Channel-selection hook of the enclosing handle. The handle calls
     * {@link #shuffle()} from switchChannelIfNecessary(), i.e. after every send attempt.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle$ShuffleStrategy.class */
    public interface ShuffleStrategy {
        /** Gives the strategy the opportunity to change the current downstream channel index. */
        void shuffle();
    }

    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle$ShuffleStrategyEnum.class */
    /** Selects which {@link ShuffleStrategy} implementation getShuffleStrategy() instantiates. */
    public enum ShuffleStrategyEnum {
        /** Mapped to PlainShuffleStrategy. */
        PLAIN,
        /** Mapped to SimpleRoundRobinStrategy. */
        SIMPLE_ROUND_ROBIN
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/exchange/sink/ShuffleSinkHandle$SimpleRoundRobinStrategy.class */
    /**
     * Shuffle strategy that, starting from the channel after the current one,
     * moves to the first other channel that passes {@link #satisfy(int)}.
     * If no other channel qualifies, the current index is left unchanged.
     */
    public class SimpleRoundRobinStrategy implements ShuffleStrategy {
        /**
         * Upper bound on a channel's retained buffer size for it to be eligible.
         * Computed once at construction from the value of maxBytesCanReserve at that
         * moment; later calls to setMaxBytesCanReserve do not change it.
         */
        private final long channelMemoryThreshold;

        SimpleRoundRobinStrategy() {
            long perChannelShare = maxBytesCanReserve / channelNum;
            this.channelMemoryThreshold = perChannelShare * 3;
        }

        /** Probes the other channels in circular order and selects the first eligible one. */
        @Override
        public void shuffle() {
            final int startIndex = downStreamChannelIndex.getCurrentIndex();
            // Offsets 1..channelNum-1 visit every channel except the current one.
            for (int offset = 1; offset < channelNum; offset++) {
                int candidate = (startIndex + offset) % channelNum;
                if (!satisfy(candidate)) {
                    continue;
                }
                downStreamChannelIndex.setCurrentIndex(candidate);
                return;
            }
        }

        /**
         * Tells whether channel {@code i} may become the current channel.
         *
         * @param i index into the downstream channel list
         * @return true when the channel still accepts blocks, is not closed, retains at
         *     most {@code channelMemoryThreshold} bytes and buffers fewer than 3 TsBlocks
         */
        private boolean satisfy(int i) {
            ISinkChannel channel = downStreamChannelList.get(i);
            if (channel.isNoMoreTsBlocks()) {
                return false;
            }
            if (channel.isClosed()) {
                return false;
            }
            if (channel.getBufferRetainedSizeInBytes() > channelMemoryThreshold) {
                return false;
            }
            return channel.getNumOfBufferedTsBlocks() < 3;
        }
    }

    /**
     * Creates a handle over the given downstream channels and opens channel 0.
     *
     * @param tFragmentInstanceId id of the local fragment instance
     * @param list downstream channels; must contain at least one channel, because
     *     channel 0 is opened here and the round-robin strategy divides by the channel count
     * @param downStreamChannelIndex holder of the index of the channel currently in use
     * @param shuffleStrategyEnum which shuffle strategy to instantiate
     * @param str id of the local plan node
     * @param sinkListener listener notified on end-of-blocks, abort and close
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if {@code list} is empty
     * @throws UnsupportedOperationException if the strategy enum value is not supported
     */
    public ShuffleSinkHandle(TFragmentInstanceId tFragmentInstanceId, List<ISinkChannel> list, DownStreamChannelIndex downStreamChannelIndex, ShuffleStrategyEnum shuffleStrategyEnum, String str, MPPDataExchangeManager.SinkListener sinkListener) {
        this.localFragmentInstanceId = Validate.notNull(tFragmentInstanceId);
        // Fail fast with a clear message instead of a later division by zero or index error.
        this.downStreamChannelList = Validate.notEmpty(list, "downStreamChannelList must not be null or empty");
        this.downStreamChannelIndex = Validate.notNull(downStreamChannelIndex);
        this.localPlanNodeId = Validate.notNull(str);
        this.sinkListener = Validate.notNull(sinkListener);
        // channelNum must be assigned before the strategy is built: SimpleRoundRobinStrategy reads it.
        this.channelNum = list.size();
        this.shuffleStrategy = getShuffleStrategy(Validate.notNull(shuffleStrategyEnum));
        this.hasSetNoMoreTsBlocks = new boolean[this.channelNum];
        this.channelOpened = new boolean[this.channelNum];
        tryOpenChannel(0);
    }

    /** Returns the fragment instance id supplied at construction. */
    @Override
    public TFragmentInstanceId getLocalFragmentInstanceId() {
        return localFragmentInstanceId;
    }

    /**
     * Returns the downstream channel stored at the given position.
     *
     * @param i index into the downstream channel list
     */
    @Override
    public ISinkChannel getChannel(int i) {
        ISinkChannel channel = downStreamChannelList.get(i);
        return channel;
    }

    /** Returns the {@code isFull()} future of the channel at the current channel index. */
    @Override
    public synchronized ListenableFuture<?> isFull() {
        int currentIndex = downStreamChannelIndex.getCurrentIndex();
        ISinkChannel currentChannel = downStreamChannelList.get(currentIndex);
        return currentChannel.isFull();
    }

    /**
     * Sends one TsBlock through the channel at the current channel index.
     *
     * <p>Whether or not the send succeeds, the shuffle strategy is then run, the
     * resulting current channel is opened if needed, and the elapsed time is recorded
     * as a data-exchange cost metric.
     *
     * @param tsBlock the block to forward to the current channel
     * @throws IllegalStateException if this handle has been aborted or closed
     */
    @Override
    public synchronized void send(TsBlock tsBlock) {
        final long startNanos = System.nanoTime();
        try {
            // Resolve the channel before the state check, as the original did.
            ISinkChannel currentChannel = this.downStreamChannelList.get(this.downStreamChannelIndex.getCurrentIndex());
            checkState();
            currentChannel.send(tsBlock);
        } finally {
            // A single finally block guarantees the switch and the metric run exactly once;
            // the duplicated try/catch form re-ran them when the post-send steps themselves failed.
            switchChannelIfNecessary();
            QUERY_METRICS.recordDataExchangeCost(DataExchangeCostMetricSet.SINK_HANDLE_SEND_TSBLOCK_REMOTE, System.nanoTime() - startNanos);
        }
    }

    /**
     * Forwards {@code setNoMoreTsBlocks()} to every channel that has not received it
     * from this handle yet, then notifies the listener via {@code onEndOfBlocks}.
     */
    @Override
    public synchronized void setNoMoreTsBlocks() {
        for (int index = 0; index < downStreamChannelList.size(); index++) {
            if (hasSetNoMoreTsBlocks[index]) {
                // Already signalled, e.g. through setNoMoreTsBlocksOfOneChannel.
                continue;
            }
            ISinkChannel channel = downStreamChannelList.get(index);
            channel.setNoMoreTsBlocks();
            hasSetNoMoreTsBlocks[index] = true;
        }
        sinkListener.onEndOfBlocks(this);
    }

    /**
     * Forwards {@code setNoMoreTsBlocks()} to a single channel, at most once per channel.
     *
     * @param i index of the channel to signal
     */
    @Override
    public synchronized void setNoMoreTsBlocksOfOneChannel(int i) {
        if (!hasSetNoMoreTsBlocks[i]) {
            downStreamChannelList.get(i).setNoMoreTsBlocks();
            hasSetNoMoreTsBlocks[i] = true;
        }
    }

    /**
     * Reports whether every channel has been recorded as closed, i.e. whether the set
     * filled by {@code isChannelClosed(int)} has as many entries as there are channels.
     */
    @Override
    public boolean isClosed() {
        int recordedClosed = closedChannel.size();
        int totalChannels = downStreamChannelList.size();
        return recordedClosed == totalChannels;
    }

    /** Returns true once {@code abort()} has completed on this handle. */
    @Override
    public synchronized boolean isAborted() {
        return aborted;
    }

    /**
     * Returns true when every downstream channel reports {@code isFinished()};
     * evaluation stops at the first channel that is not finished.
     */
    @Override
    public synchronized boolean isFinished() {
        return downStreamChannelList.stream().allMatch(ISinkChannel::isFinished);
    }

    /**
     * Aborts every downstream channel, marks this handle as aborted and notifies the
     * listener via {@code onAborted}. Does nothing if the handle is already aborted.
     *
     * <p>A failure on one channel does not stop the others from being aborted; only
     * the first exception encountered is logged, as a warning.
     */
    @Override
    public synchronized void abort() {
        if (aborted) {
            return;
        }
        LOGGER.debug("[StartAbortShuffleSinkHandle]");
        Exception firstFailure = null;
        for (ISinkChannel channel : downStreamChannelList) {
            try {
                channel.abort();
            } catch (Exception e) {
                // Keep going; remember only the first failure for the log.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            LOGGER.warn("Error occurred when try to abort channel.", firstFailure);
        }
        aborted = true;
        sinkListener.onAborted(this);
        LOGGER.debug("[EndAbortShuffleSinkHandle]");
    }

    /**
     * Closes every downstream channel, marks this handle as closed and notifies the
     * listener via {@code onFinish}. Does nothing if {@code close()} already completed.
     *
     * <p>A failure on one channel does not stop the others from being closed; only
     * the first exception encountered is logged, as a warning.
     */
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        LOGGER.debug("[StartCloseShuffleSinkHandle]");
        Exception firstFailure = null;
        for (ISinkChannel channel : downStreamChannelList) {
            try {
                channel.close();
            } catch (Exception e) {
                // Keep going; remember only the first failure for the log.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (firstFailure != null) {
            LOGGER.warn("Error occurred when try to close channel.", firstFailure);
        }
        closed = true;
        sinkListener.onFinish(this);
        LOGGER.debug("[EndCloseShuffleSinkHandle]");
    }

    /**
     * Stores the new reservation limit on this handle and passes the same value to
     * every downstream channel.
     *
     * @param j the new maximum number of bytes that can be reserved
     */
    @Override
    public void setMaxBytesCanReserve(long j) {
        maxBytesCanReserve = j;
        for (ISinkChannel channel : downStreamChannelList) {
            channel.setMaxBytesCanReserve(j);
        }
    }

    /**
     * Rejects use of a handle that is no longer live. The aborted flag is checked
     * before the closed flag.
     *
     * @throws IllegalStateException if the handle has been aborted or closed
     */
    private void checkState() {
        final String problem;
        if (aborted) {
            problem = "ShuffleSinkHandle is aborted.";
        } else if (closed) {
            problem = "ShuffleSinkHandle is closed.";
        } else {
            return;
        }
        throw new IllegalStateException(problem);
    }

    /**
     * Runs the shuffle strategy, then makes sure the channel that is current
     * afterwards has been opened.
     */
    private void switchChannelIfNecessary() {
        shuffleStrategy.shuffle();
        int selectedIndex = downStreamChannelIndex.getCurrentIndex();
        tryOpenChannel(selectedIndex);
    }

    /**
     * Opens the channel at the given index unless this handle already opened it.
     *
     * @param i index of the channel to open
     */
    @Override
    public void tryOpenChannel(int i) {
        if (!channelOpened[i]) {
            ISinkChannel channel = downStreamChannelList.get(i);
            channel.open();
            // Flag is set only after open() returns without throwing.
            channelOpened[i] = true;
        }
    }

    /**
     * Reports whether the channel at the given index is closed. A channel seen as
     * closed is recorded so that later calls answer without querying it again.
     *
     * @param i index of the channel to check
     * @return true if the channel was already recorded as closed or reports closed now
     */
    @Override
    public boolean isChannelClosed(int i) {
        final Integer key = i;
        if (closedChannel.contains(key)) {
            return true;
        }
        boolean closedNow = downStreamChannelList.get(i).isClosed();
        if (closedNow) {
            closedChannel.add(key);
        }
        return closedNow;
    }

    /**
     * Instantiates the strategy implementation that corresponds to the enum value.
     *
     * @param shuffleStrategyEnum the requested strategy kind
     * @return a new strategy instance bound to this handle
     * @throws UnsupportedOperationException if the enum value has no implementation
     */
    private ShuffleStrategy getShuffleStrategy(ShuffleStrategyEnum shuffleStrategyEnum) {
        final ShuffleStrategy strategy;
        switch (shuffleStrategyEnum) {
            case PLAIN:
                strategy = new PlainShuffleStrategy();
                break;
            case SIMPLE_ROUND_ROBIN:
                strategy = new SimpleRoundRobinStrategy();
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type of shuffle strategy");
        }
        return strategy;
    }

    /**
     * Returns the sum of the retained buffer sizes reported by all downstream
     * channels, or 0 when there are no channels.
     */
    @Override
    public long getBufferRetainedSizeInBytes() {
        return downStreamChannelList.stream()
                .mapToLong(ISinkChannel::getBufferRetainedSizeInBytes)
                .sum();
    }
}
