package io.zeebe.broker.job;

import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.test.EmbeddedBrokerRule;
import io.zeebe.exporter.record.Record;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.protocol.intent.Intent;
import io.zeebe.protocol.intent.JobBatchIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.test.broker.protocol.clientapi.ClientApiRule;
import io.zeebe.test.broker.protocol.clientapi.ExecuteCommandRequestBuilder;
import io.zeebe.test.broker.protocol.clientapi.PartitionTestClient;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

/* loaded from: input_file:io/zeebe/broker/job/JobTimeOutTest.class */
public class JobTimeOutTest {
    public EmbeddedBrokerRule brokerRule = new EmbeddedBrokerRule(new Consumer[0]);
    public ClientApiRule apiRule;

    @Rule
    public RuleChain ruleChain;
    private PartitionTestClient client;

    public JobTimeOutTest() {
        EmbeddedBrokerRule embeddedBrokerRule = this.brokerRule;
        embeddedBrokerRule.getClass();
        this.apiRule = new ClientApiRule(embeddedBrokerRule::getClientAddress);
        this.ruleChain = RuleChain.outerRule(this.brokerRule).around(this.apiRule);
    }

    @Before
    public void setup() {
        this.client = this.apiRule.partitionClient();
    }

    @Test
    public void shouldNotTimeOutIfDeadlineNotExceeded() {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        this.client.receiveFirstJobEvent(JobIntent.ACTIVATED);
        this.brokerRule.getClock().addTime(ofSeconds.minus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived(JobIntent.ACTIVATED);
    }

    @Test
    public void shouldNotTimeOutIfJobCompleted() {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        this.client.completeJob(this.client.receiveFirstJobEvent(JobIntent.ACTIVATED).getKey(), "{}");
        this.brokerRule.getClock().addTime(ofSeconds.plus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived(JobIntent.COMPLETED);
    }

    @Test
    public void shouldNotTimeOutIfJobFailed() {
        this.brokerRule.getClock().pinCurrentTime();
        Duration ofSeconds = Duration.ofSeconds(60L);
        createJob("jobType");
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), "jobType", ofSeconds.toMillis()).await();
        this.client.failJob(this.client.receiveFirstJobEvent(JobIntent.ACTIVATED).getKey(), 0);
        this.brokerRule.getClock().addTime(ofSeconds.plus(Duration.ofSeconds(1L)));
        assertNoMoreJobsReceived(JobIntent.FAILED);
    }

    @Test
    public void shouldTimeOutJob() {
        long createJob = createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10L);
        this.client.receiveFirstJobEvent(JobIntent.ACTIVATED);
        this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        this.client.receiveFirstJobEvent(JobIntent.TIME_OUT);
        this.apiRule.activateJobs(TypedStreamProcessorTest.STREAM_NAME);
        List list = (List) this.client.receiveJobs().limit(8L).collect(Collectors.toList());
        Assertions.assertThat(list).extracting(record -> {
            return Long.valueOf(record.getKey());
        }).contains(new Long[]{Long.valueOf(createJob)});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.getMetadata();
        }).extracting((v0) -> {
            return v0.getIntent();
        }).containsExactly(new Intent[]{JobIntent.CREATE, JobIntent.CREATED, JobIntent.ACTIVATED, JobIntent.TIME_OUT, JobIntent.TIMED_OUT, JobIntent.ACTIVATED});
    }

    @Test
    public void shouldSetCorrectSourcePositionAfterJobTimeOut() {
        createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10L);
        this.client.receiveFirstJobEvent(JobIntent.ACTIVATED);
        this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        this.client.receiveFirstJobEvent(JobIntent.TIME_OUT);
        this.apiRule.activateJobs(TypedStreamProcessorTest.STREAM_NAME);
        Record record = (Record) this.client.receiveJobs().skipUntil(record2 -> {
            return record2.getMetadata().getIntent() == JobIntent.TIME_OUT;
        }).withIntent(JobIntent.ACTIVATED).getFirst();
        Record record3 = (Record) this.client.receiveFirstJobBatchCommands().withIntent(JobBatchIntent.ACTIVATE).getFirst();
        Assertions.assertThat(record.getSourceRecordPosition()).isNotEqualTo(record3.getPosition());
        Assertions.assertThat(record.getSourceRecordPosition()).isEqualTo(((Record) this.client.receiveFirstJobBatchCommands().withIntent(JobBatchIntent.ACTIVATE).skipUntil(record4 -> {
            return record4.getPosition() > record3.getPosition();
        }).findFirst().get()).getPosition());
    }

    @Test
    public void shouldExpireMultipleActivatedJobsAtOnce() {
        long createJob = createJob(TypedStreamProcessorTest.STREAM_NAME);
        long createJob2 = createJob(TypedStreamProcessorTest.STREAM_NAME);
        this.apiRule.activateJobs(this.apiRule.getDefaultPartitionId(), TypedStreamProcessorTest.STREAM_NAME, 10L);
        this.client.receiveJobs().withIntent(JobIntent.ACTIVATED).limit(2L).count();
        this.brokerRule.getClock().addTime(JobStreamProcessor.TIME_OUT_POLLING_INTERVAL);
        this.client.receiveFirstJobEvent(JobIntent.TIMED_OUT);
        this.apiRule.activateJobs(TypedStreamProcessorTest.STREAM_NAME);
        List list = (List) this.client.receiveJobs().limit(16L).collect(Collectors.toList());
        Assertions.assertThat(list).filteredOn(record -> {
            return record.getMetadata().getIntent() == JobIntent.ACTIVATED;
        }).hasSize(4).extracting(record2 -> {
            return Long.valueOf(record2.getKey());
        }).containsExactlyInAnyOrder(new Long[]{Long.valueOf(createJob), Long.valueOf(createJob2), Long.valueOf(createJob), Long.valueOf(createJob2)});
        Assertions.assertThat(list).filteredOn(record3 -> {
            return record3.getMetadata().getIntent() == JobIntent.TIMED_OUT;
        }).extracting(record4 -> {
            return Long.valueOf(record4.getKey());
        }).containsExactlyInAnyOrder(new Long[]{Long.valueOf(createJob), Long.valueOf(createJob2)});
    }

    private long createJob(String str) {
        return ((ExecuteCommandRequestBuilder) this.apiRule.createCmdRequest().type(ValueType.JOB, JobIntent.CREATE).command().put("type", str).put("retries", 3).done()).sendAndAwait().getKey();
    }

    private void assertNoMoreJobsReceived(Intent intent) {
        Assertions.assertThat(this.client.receiveJobs().skip(this.client.receiveJobs().limit(record -> {
            return record.getMetadata().getIntent() == intent;
        }).count()).exists()).isFalse();
    }
}
