package org.apache.flink.runtime.state.changelog.inmemory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.class */
class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) InMemoryStateChangelogWriter.class);
    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap();
    private long sqn = 0;
    private boolean closed;

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void append(int i, byte[] bArr) {
        Preconditions.checkState(!this.closed, "LogWriter is closed");
        LOG.trace("append, keyGroup={}, {} bytes", Integer.valueOf(i), Integer.valueOf(bArr.length));
        NavigableMap<SequenceNumber, byte[]> computeIfAbsent = this.changesByKeyGroup.computeIfAbsent(Integer.valueOf(i), num -> {
            return new TreeMap();
        });
        long j = this.sqn + 1;
        this.sqn = j;
        computeIfAbsent.put(SequenceNumber.of(j), bArr);
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public SequenceNumber lastAppendedSequenceNumber() {
        return SequenceNumber.of(this.sqn);
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber sequenceNumber) {
        LOG.debug("Persist after {}", sequenceNumber);
        Preconditions.checkNotNull(sequenceNumber);
        return CompletableFuture.completedFuture(new InMemoryStateChangelogHandle(collectChanges(sequenceNumber)));
    }

    private Map<Integer, List<byte[]>> collectChanges(SequenceNumber sequenceNumber) {
        return (Map) this.changesByKeyGroup.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new ArrayList(((NavigableMap) entry.getValue()).tailMap(sequenceNumber, true).values());
        }));
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter, java.lang.AutoCloseable
    public void close() {
        Preconditions.checkState(!this.closed);
        this.closed = true;
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void truncate(SequenceNumber sequenceNumber) {
        this.changesByKeyGroup.forEach((num, navigableMap) -> {
        });
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void confirm(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void reset(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2) {
        throw new UnsupportedOperationException();
    }
}
