package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.metrics.StreamingMetrics;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.messages.CompleteMessage;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.streaming.messages.IncomingFileMessage;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.streaming.messages.PrepareMessage;
import org.apache.cassandra.streaming.messages.ReceivedMessage;
import org.apache.cassandra.streaming.messages.SessionFailedMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamSession.class */
public class StreamSession implements IEndpointStateChangeSubscriber {
    private static final Logger logger;
    public final InetAddress peer;
    private final int index;
    public final InetAddress connecting;
    private StreamResultFuture streamResult;
    private final StreamingMetrics metrics;
    private final StreamConnectionFactory factory;
    private final boolean keepSSTableLevel;
    private final boolean isIncremental;
    static final /* synthetic */ boolean $assertionsDisabled;
    protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet();

    @VisibleForTesting
    protected final ConcurrentHashMap<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>();
    private final Map<UUID, StreamReceiveTask> receivers = new ConcurrentHashMap();
    private AtomicBoolean isAborted = new AtomicBoolean(false);
    private volatile State state = State.INITIALIZED;
    private volatile boolean completeSent = false;
    public final ConnectionHandler handler = new ConnectionHandler(this);

    /* loaded from: input_file:org/apache/cassandra/streaming/StreamSession$SSTableStreamingSections.class */
    public static class SSTableStreamingSections {
        public final Ref<SSTableReader> ref;
        public final List<Pair<Long, Long>> sections;
        public final long estimatedKeys;
        public final long repairedAt;

        public SSTableStreamingSections(Ref<SSTableReader> ref, List<Pair<Long, Long>> list, long j, long j2) {
            this.ref = ref;
            this.sections = list;
            this.estimatedKeys = j;
            this.repairedAt = j2;
        }
    }

    /* loaded from: input_file:org/apache/cassandra/streaming/StreamSession$State.class */
    public enum State {
        INITIALIZED,
        PREPARING,
        STREAMING,
        WAIT_COMPLETE,
        COMPLETE,
        FAILED
    }

    public StreamSession(InetAddress inetAddress, InetAddress inetAddress2, StreamConnectionFactory streamConnectionFactory, int i, boolean z, boolean z2) {
        this.peer = inetAddress;
        this.connecting = inetAddress2;
        this.index = i;
        this.factory = streamConnectionFactory;
        this.metrics = StreamingMetrics.get(inetAddress2);
        this.keepSSTableLevel = z;
        this.isIncremental = z2;
    }

    public UUID planId() {
        if (this.streamResult == null) {
            return null;
        }
        return this.streamResult.planId;
    }

    public int sessionIndex() {
        return this.index;
    }

    public String description() {
        if (this.streamResult == null) {
            return null;
        }
        return this.streamResult.description;
    }

    public boolean keepSSTableLevel() {
        return this.keepSSTableLevel;
    }

    public boolean isIncremental() {
        return this.isIncremental;
    }

    public LifecycleTransaction getTransaction(UUID uuid) {
        if ($assertionsDisabled || this.receivers.containsKey(uuid)) {
            return this.receivers.get(uuid).getTransaction();
        }
        throw new AssertionError();
    }

    public void init(StreamResultFuture streamResultFuture) {
        this.streamResult = streamResultFuture;
        StreamHook.instance.reportStreamFuture(this, streamResultFuture);
    }

    public void start() {
        if (this.requests.isEmpty() && this.transfers.isEmpty()) {
            logger.info("[Stream #{}] Session does not have any tasks.", planId());
            closeSession(State.COMPLETE);
            return;
        }
        try {
            Logger logger2 = logger;
            Object[] objArr = new Object[3];
            objArr[0] = planId();
            objArr[1] = this.peer;
            objArr[2] = this.peer.equals(this.connecting) ? "" : " through " + this.connecting;
            logger2.info("[Stream #{}] Starting streaming to {}{}", objArr);
            this.handler.initiate();
            onInitializationComplete();
        } catch (Exception e) {
            JVMStabilityInspector.inspectThrowable(e);
            onError(e);
        }
    }

    public Socket createConnection() throws IOException {
        if ($assertionsDisabled || this.factory != null) {
            return this.factory.createConnection(this.connecting);
        }
        throw new AssertionError();
    }

    public void addStreamRequest(String str, Collection<Range<Token>> collection, Collection<String> collection2, long j) {
        this.requests.add(new StreamRequest(str, collection, collection2, j));
    }

    public synchronized void addTransferRanges(String str, Collection<Range<Token>> collection, Collection<String> collection2, boolean z, long j) {
        failIfFinished();
        Collection<ColumnFamilyStore> columnFamilyStores = getColumnFamilyStores(str, collection2);
        if (z) {
            flushSSTables(columnFamilyStores);
        }
        List<SSTableStreamingSections> sSTableSectionsForRanges = getSSTableSectionsForRanges(Range.normalize(collection), columnFamilyStores, j, this.isIncremental);
        try {
            addTransferFiles(sSTableSectionsForRanges);
            Iterator<SSTableStreamingSections> it2 = sSTableSectionsForRanges.iterator();
            while (it2.hasNext()) {
                it2.next().ref.release();
            }
        } catch (Throwable th) {
            Iterator<SSTableStreamingSections> it3 = sSTableSectionsForRanges.iterator();
            while (it3.hasNext()) {
                it3.next().ref.release();
            }
            throw th;
        }
    }

    private void failIfFinished() {
        if (state() == State.COMPLETE || state() == State.FAILED) {
            throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
        }
    }

    private Collection<ColumnFamilyStore> getColumnFamilyStores(String str, Collection<String> collection) {
        HashSet hashSet = new HashSet();
        if (collection.isEmpty()) {
            hashSet.addAll(Keyspace.open(str).getColumnFamilyStores());
        } else {
            Iterator<String> it2 = collection.iterator();
            while (it2.hasNext()) {
                hashSet.add(Keyspace.open(str).getColumnFamilyStore(it2.next()));
            }
        }
        return hashSet;
    }

    @VisibleForTesting
    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> collection, Collection<ColumnFamilyStore> collection2, long j, boolean z) {
        Refs refs = new Refs();
        try {
            for (ColumnFamilyStore columnFamilyStore : collection2) {
                ArrayList arrayList = new ArrayList(collection.size());
                Iterator<Range<Token>> it2 = collection.iterator();
                while (it2.hasNext()) {
                    arrayList.add(Range.makeRowRange(it2.next()));
                }
                refs.addAll((Refs) columnFamilyStore.selectAndReference(view -> {
                    HashSet newHashSet = Sets.newHashSet();
                    SSTableIntervalTree build = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
                    Iterator it3 = arrayList.iterator();
                    while (it3.hasNext()) {
                        Range range = (Range) it3.next();
                        for (SSTableReader sSTableReader : View.sstablesInBounds((PartitionPosition) range.left, (PartitionPosition) range.right, build)) {
                            if (!z || !sSTableReader.isRepaired()) {
                                newHashSet.add(sSTableReader);
                            }
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("ViewFilter for {}/{} sstables", Integer.valueOf(newHashSet.size()), Integer.valueOf(Iterables.size(view.select(SSTableSet.CANONICAL))));
                    }
                    return newHashSet;
                }).refs);
            }
            ArrayList arrayList2 = new ArrayList(refs.size());
            Iterator it3 = refs.iterator();
            while (it3.hasNext()) {
                SSTableReader sSTableReader = (SSTableReader) it3.next();
                long j2 = j;
                if (j == 0) {
                    j2 = sSTableReader.getSSTableMetadata().repairedAt;
                }
                arrayList2.add(new SSTableStreamingSections(refs.get(sSTableReader), sSTableReader.getPositionsForRanges(collection), sSTableReader.estimatedKeysForRanges(collection), j2));
            }
            return arrayList2;
        } catch (Throwable th) {
            refs.release();
            throw th;
        }
    }

    public synchronized void addTransferFiles(Collection<SSTableStreamingSections> collection) {
        failIfFinished();
        Iterator<SSTableStreamingSections> it2 = collection.iterator();
        while (it2.hasNext()) {
            SSTableStreamingSections next = it2.next();
            if (next.sections.isEmpty()) {
                next.ref.release();
                it2.remove();
            } else {
                UUID uuid = next.ref.get().metadata.cfId;
                StreamTransferTask streamTransferTask = this.transfers.get(uuid);
                if (streamTransferTask == null) {
                    StreamTransferTask streamTransferTask2 = new StreamTransferTask(this, uuid);
                    streamTransferTask = this.transfers.putIfAbsent(uuid, streamTransferTask2);
                    if (streamTransferTask == null) {
                        streamTransferTask = streamTransferTask2;
                    }
                }
                streamTransferTask.addTransferFile(next.ref, next.estimatedKeys, next.sections, next.repairedAt);
                it2.remove();
            }
        }
    }

    private synchronized void closeSession(State state) {
        if (this.isAborted.compareAndSet(false, true)) {
            state(state);
            if (state == State.FAILED) {
                Iterator it2 = Iterables.concat(this.receivers.values(), this.transfers.values()).iterator();
                while (it2.hasNext()) {
                    ((StreamTask) it2.next()).abort();
                }
            }
            this.handler.close();
            this.streamResult.handleSessionComplete(this);
        }
    }

    public void state(State state) {
        this.state = state;
    }

    public State state() {
        return this.state;
    }

    public boolean isSuccess() {
        return this.state == State.COMPLETE;
    }

    public void messageReceived(StreamMessage streamMessage) {
        switch (streamMessage.type) {
            case PREPARE:
                PrepareMessage prepareMessage = (PrepareMessage) streamMessage;
                prepare(prepareMessage.requests, prepareMessage.summaries);
                return;
            case FILE:
                receive((IncomingFileMessage) streamMessage);
                return;
            case RECEIVED:
                ReceivedMessage receivedMessage = (ReceivedMessage) streamMessage;
                received(receivedMessage.cfId, receivedMessage.sequenceNumber);
                return;
            case COMPLETE:
                complete();
                return;
            case SESSION_FAILED:
                sessionFailed();
                return;
            default:
                return;
        }
    }

    public void onInitializationComplete() {
        state(State.PREPARING);
        PrepareMessage prepareMessage = new PrepareMessage();
        prepareMessage.requests.addAll(this.requests);
        Iterator<StreamTransferTask> it2 = this.transfers.values().iterator();
        while (it2.hasNext()) {
            prepareMessage.summaries.add(it2.next().getSummary());
        }
        this.handler.sendMessage(prepareMessage);
        if (this.requests.isEmpty()) {
            startStreamingFiles();
        }
    }

    public void onError(Throwable th) {
        if (th instanceof SocketTimeoutException) {
            logger.error("[Stream #{}] Streaming socket timed out. This means the session peer stopped responding or is still processing received data. If there is no sign of failure in the other end or a very dense table is being transferred you may want to increase streaming_socket_timeout_in_ms property. Current value is {}ms.", planId(), Integer.valueOf(DatabaseDescriptor.getStreamingSocketTimeout()), th);
        } else {
            Logger logger2 = logger;
            Object[] objArr = new Object[4];
            objArr[0] = planId();
            objArr[1] = this.peer.getHostAddress();
            objArr[2] = this.peer.equals(this.connecting) ? "" : " through " + this.connecting.getHostAddress();
            objArr[3] = th;
            logger2.error("[Stream #{}] Streaming error occurred on session with peer {}{}", objArr);
        }
        if (this.handler.isOutgoingConnected()) {
            this.handler.sendMessage(new SessionFailedMessage());
        }
        closeSession(State.FAILED);
    }

    public void prepare(Collection<StreamRequest> collection, Collection<StreamSummary> collection2) {
        state(State.PREPARING);
        for (StreamRequest streamRequest : collection) {
            addTransferRanges(streamRequest.keyspace, streamRequest.ranges, streamRequest.columnFamilies, true, streamRequest.repairedAt);
        }
        Iterator<StreamSummary> it2 = collection2.iterator();
        while (it2.hasNext()) {
            prepareReceiving(it2.next());
        }
        if (!collection.isEmpty()) {
            PrepareMessage prepareMessage = new PrepareMessage();
            Iterator<StreamTransferTask> it3 = this.transfers.values().iterator();
            while (it3.hasNext()) {
                prepareMessage.summaries.add(it3.next().getSummary());
            }
            this.handler.sendMessage(prepareMessage);
        }
        if (maybeCompleted()) {
            return;
        }
        startStreamingFiles();
    }

    public void fileSent(FileMessageHeader fileMessageHeader) {
        long size = fileMessageHeader.size();
        StreamingMetrics.totalOutgoingBytes.inc(size);
        this.metrics.outgoingBytes.inc(size);
        StreamTransferTask streamTransferTask = this.transfers.get(fileMessageHeader.cfId);
        if (streamTransferTask != null) {
            streamTransferTask.scheduleTimeout(fileMessageHeader.sequenceNumber, 12L, TimeUnit.HOURS);
        }
    }

    public void receive(IncomingFileMessage incomingFileMessage) {
        long size = incomingFileMessage.header.size();
        StreamingMetrics.totalIncomingBytes.inc(size);
        this.metrics.incomingBytes.inc(size);
        this.handler.sendMessage(new ReceivedMessage(incomingFileMessage.header.cfId, incomingFileMessage.header.sequenceNumber));
        this.receivers.get(incomingFileMessage.header.cfId).received(incomingFileMessage.sstable);
    }

    public void progress(String str, ProgressInfo.Direction direction, long j, long j2) {
        this.streamResult.handleProgress(new ProgressInfo(this.peer, this.index, str, direction, j, j2));
    }

    public void received(UUID uuid, int i) {
        this.transfers.get(uuid).complete(i);
    }

    public synchronized void complete() {
        if (this.state != State.WAIT_COMPLETE) {
            state(State.WAIT_COMPLETE);
            this.handler.closeIncoming();
        } else {
            if (!this.completeSent) {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
            }
            closeSession(State.COMPLETE);
        }
    }

    public synchronized void sessionFailed() {
        logger.error("[Stream #{}] Remote peer {} failed stream session.", planId(), this.peer.getHostAddress());
        closeSession(State.FAILED);
    }

    public SessionInfo getSessionInfo() {
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<StreamReceiveTask> it2 = this.receivers.values().iterator();
        while (it2.hasNext()) {
            newArrayList.add(it2.next().getSummary());
        }
        ArrayList newArrayList2 = Lists.newArrayList();
        Iterator<StreamTransferTask> it3 = this.transfers.values().iterator();
        while (it3.hasNext()) {
            newArrayList2.add(it3.next().getSummary());
        }
        return new SessionInfo(this.peer, this.index, this.connecting, newArrayList, newArrayList2, this.state);
    }

    public synchronized void taskCompleted(StreamReceiveTask streamReceiveTask) {
        this.receivers.remove(streamReceiveTask.cfId);
        maybeCompleted();
    }

    public synchronized void taskCompleted(StreamTransferTask streamTransferTask) {
        this.transfers.remove(streamTransferTask.cfId);
        maybeCompleted();
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onJoin(InetAddress inetAddress, EndpointState endpointState) {
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void beforeChange(InetAddress inetAddress, EndpointState endpointState, ApplicationState applicationState, VersionedValue versionedValue) {
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onChange(InetAddress inetAddress, ApplicationState applicationState, VersionedValue versionedValue) {
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onAlive(InetAddress inetAddress, EndpointState endpointState) {
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onDead(InetAddress inetAddress, EndpointState endpointState) {
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRemove(InetAddress inetAddress) {
        logger.error("[Stream #{}] Session failed because remote peer {} has left.", planId(), this.peer.getHostAddress());
        closeSession(State.FAILED);
    }

    @Override // org.apache.cassandra.gms.IEndpointStateChangeSubscriber
    public void onRestart(InetAddress inetAddress, EndpointState endpointState) {
        logger.error("[Stream #{}] Session failed because remote peer {} was restarted.", planId(), this.peer.getHostAddress());
        closeSession(State.FAILED);
    }

    private boolean maybeCompleted() {
        boolean z = this.receivers.isEmpty() && this.transfers.isEmpty();
        if (z) {
            if (this.state == State.WAIT_COMPLETE) {
                if (!this.completeSent) {
                    this.handler.sendMessage(new CompleteMessage());
                    this.completeSent = true;
                }
                closeSession(State.COMPLETE);
            } else {
                this.handler.sendMessage(new CompleteMessage());
                this.completeSent = true;
                state(State.WAIT_COMPLETE);
                this.handler.closeOutgoing();
            }
        }
        return z;
    }

    private void flushSSTables(Iterable<ColumnFamilyStore> iterable) {
        ArrayList arrayList = new ArrayList();
        Iterator<ColumnFamilyStore> it2 = iterable.iterator();
        while (it2.hasNext()) {
            arrayList.add(it2.next().forceFlush());
        }
        FBUtilities.waitOnFutures(arrayList);
    }

    private synchronized void prepareReceiving(StreamSummary streamSummary) {
        failIfFinished();
        if (streamSummary.files > 0) {
            this.receivers.put(streamSummary.cfId, new StreamReceiveTask(this, streamSummary.cfId, streamSummary.files, streamSummary.totalSize));
        }
    }

    private void startStreamingFiles() {
        this.streamResult.handleSessionPrepared(this);
        state(State.STREAMING);
        for (StreamTransferTask streamTransferTask : this.transfers.values()) {
            Collection<OutgoingFileMessage> fileMessages = streamTransferTask.getFileMessages();
            if (fileMessages.size() > 0) {
                this.handler.sendMessages(fileMessages);
            } else {
                taskCompleted(streamTransferTask);
            }
        }
    }

    static {
        $assertionsDisabled = !StreamSession.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) StreamSession.class);
    }
}
