package org.apache.kafka.tiered.storage.actions;

import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher;
import org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils;
import org.hamcrest.MatcherAssert;

/* loaded from: input_file:org/apache/kafka/tiered/storage/actions/ProduceAction.class */
public final class ProduceAction implements TieredStorageTestAction {
    private static final int OFFLOAD_WAIT_TIMEOUT_SEC = 10;
    private final TopicPartition topicPartition;
    private final List<OffloadedSegmentSpec> offloadedSegmentSpecs;
    private final List<ProducerRecord<String, String>> recordsToProduce;
    private final Integer batchSize;
    private final Long expectedEarliestLocalOffset;
    private final Serde<String> serde = Serdes.String();

    public ProduceAction(TopicPartition topicPartition, List<OffloadedSegmentSpec> list, List<ProducerRecord<String, String>> list2, Integer num, Long l) {
        this.topicPartition = topicPartition;
        this.offloadedSegmentSpecs = list;
        this.recordsToProduce = list2;
        this.batchSize = num;
        this.expectedEarliestLocalOffset = l;
    }

    @Override // org.apache.kafka.tiered.storage.TieredStorageTestAction
    public void doExecute(TieredStorageTestContext tieredStorageTestContext) throws InterruptedException, ExecutionException, TimeoutException {
        List<LocalTieredStorage> remoteStorageManagers = tieredStorageTestContext.remoteStorageManagers();
        List<BrokerLocalStorage> localStorages = tieredStorageTestContext.localStorages();
        List list = (List) this.offloadedSegmentSpecs.stream().map(offloadedSegmentSpec -> {
            return LocalTieredStorageCondition.expectEvent((Iterable<LocalTieredStorage>) remoteStorageManagers, LocalTieredStorageEvent.EventType.COPY_SEGMENT, offloadedSegmentSpec.getSourceBrokerId(), offloadedSegmentSpec.getTopicPartition(), Integer.valueOf(offloadedSegmentSpec.getBaseOffset()), false);
        }).collect(Collectors.toList());
        long longValue = tieredStorageTestContext.nextOffset(this.topicPartition).longValue();
        long longValue2 = tieredStorageTestContext.beginOffset(this.topicPartition).longValue();
        tieredStorageTestContext.produce(this.recordsToProduce, this.batchSize);
        if (!list.isEmpty()) {
            ((LocalTieredStorageCondition) list.stream().reduce((v0, v1) -> {
                return v0.and(v1);
            }).get()).waitUntilTrue(10L, TimeUnit.SECONDS);
        }
        long longValue3 = this.expectedEarliestLocalOffset.longValue() != -1 ? this.expectedEarliestLocalOffset.longValue() : ((longValue + this.recordsToProduce.size()) - (this.recordsToProduce.size() % tieredStorageTestContext.topicSpec(this.topicPartition.topic()).getMaxBatchCountPerSegment())) - 1;
        for (BrokerLocalStorage brokerLocalStorage : localStorages) {
            if (tieredStorageTestContext.isAssignedReplica(this.topicPartition, brokerLocalStorage.getBrokerId()) && tieredStorageTestContext.isActive(brokerLocalStorage.getBrokerId())) {
                brokerLocalStorage.waitForEarliestLocalOffset(this.topicPartition, Long.valueOf(longValue3));
            }
        }
        MatcherAssert.assertThat(tieredStorageTestContext.consume(this.topicPartition, Integer.valueOf(this.recordsToProduce.size()), Long.valueOf(longValue)), RecordsKeyValueMatcher.correspondTo(this.recordsToProduce, this.topicPartition, this.serde, this.serde));
        List<Record> tieredStorageRecords = TieredStorageTestUtils.tieredStorageRecords(tieredStorageTestContext, this.topicPartition);
        compareRecords(tieredStorageRecords.subList((int) (longValue - longValue2), tieredStorageRecords.size()), (List) this.offloadedSegmentSpecs.stream().flatMap(offloadedSegmentSpec2 -> {
            return offloadedSegmentSpec2.getRecords().stream();
        }).collect(Collectors.toList()), this.topicPartition);
    }

    @Override // org.apache.kafka.tiered.storage.TieredStorageTestAction
    public void describe(PrintStream printStream) {
        printStream.println("produce-records: " + this.topicPartition);
        this.recordsToProduce.forEach(producerRecord -> {
            printStream.println("    " + producerRecord);
        });
        this.offloadedSegmentSpecs.forEach(offloadedSegmentSpec -> {
            printStream.println("    " + offloadedSegmentSpec);
        });
    }

    private void compareRecords(List<Record> list, List<ProducerRecord<String, String>> list2, TopicPartition topicPartition) {
        MatcherAssert.assertThat(list, RecordsKeyValueMatcher.correspondTo(list2, topicPartition, this.serde, this.serde));
    }
}
