package io.zeebe.logstreams.processor;

import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.DisabledLogStreamWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.snapshot.TimeBasedSnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotPolicy;
import io.zeebe.logstreams.spi.SnapshotPositionProvider;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.ActorScheduler;
import java.time.Duration;
import java.util.Objects;

/* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorBuilder.class */
public class StreamProcessorBuilder {
    protected int id;
    protected String name;
    protected StreamProcessor streamProcessor;
    protected LogStream sourceStream;
    protected LogStream targetStream;
    protected ActorScheduler actorScheduler;
    protected SnapshotPolicy snapshotPolicy;
    protected SnapshotStorage snapshotStorage;
    protected SnapshotPositionProvider snapshotPositionProvider;
    protected LogStreamReader sourceLogStreamReader;
    protected LogStreamReader targetLogStreamReader;
    protected LogStreamWriter logStreamWriter;
    protected EventFilter eventFilter;
    protected EventFilter reprocessingEventFilter;
    protected StreamProcessorErrorHandler errorHandler;
    protected DeferredCommandContext streamProcessorCmdQueue;
    protected boolean readOnly;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorBuilder$DefaultErrorHandler.class */
    public static final class DefaultErrorHandler implements StreamProcessorErrorHandler {
        private DefaultErrorHandler() {
        }

        @Override // io.zeebe.logstreams.processor.StreamProcessorErrorHandler
        public boolean canHandle(Exception exc) {
            return false;
        }

        @Override // io.zeebe.logstreams.processor.StreamProcessorErrorHandler
        public boolean onError(LoggedEvent loggedEvent, Exception exc) {
            return false;
        }
    }

    public StreamProcessorBuilder(int i, String str, StreamProcessor streamProcessor) {
        this.id = i;
        this.name = str;
        this.streamProcessor = streamProcessor;
    }

    public StreamProcessorBuilder sourceStream(LogStream logStream) {
        this.sourceStream = logStream;
        return this;
    }

    public StreamProcessorBuilder targetStream(LogStream logStream) {
        this.targetStream = logStream;
        return this;
    }

    public StreamProcessorBuilder actorScheduler(ActorScheduler actorScheduler) {
        this.actorScheduler = actorScheduler;
        return this;
    }

    public StreamProcessorBuilder snapshotPolicy(SnapshotPolicy snapshotPolicy) {
        this.snapshotPolicy = snapshotPolicy;
        return this;
    }

    public StreamProcessorBuilder snapshotStorage(SnapshotStorage snapshotStorage) {
        this.snapshotStorage = snapshotStorage;
        return this;
    }

    public StreamProcessorBuilder snapshotPositionProvider(SnapshotPositionProvider snapshotPositionProvider) {
        this.snapshotPositionProvider = snapshotPositionProvider;
        return this;
    }

    public StreamProcessorBuilder streamProcessorCmdQueue(DeferredCommandContext deferredCommandContext) {
        this.streamProcessorCmdQueue = deferredCommandContext;
        return this;
    }

    public StreamProcessorBuilder eventFilter(EventFilter eventFilter) {
        this.eventFilter = eventFilter;
        return this;
    }

    public StreamProcessorBuilder readOnly(boolean z) {
        this.readOnly = z;
        return this;
    }

    public StreamProcessorBuilder reprocessingEventFilter(EventFilter eventFilter) {
        this.reprocessingEventFilter = eventFilter;
        return this;
    }

    public StreamProcessorBuilder errorHandler(StreamProcessorErrorHandler streamProcessorErrorHandler) {
        this.errorHandler = streamProcessorErrorHandler;
        return this;
    }

    protected void initContext() {
        Objects.requireNonNull(this.streamProcessor, "No stream processor provided.");
        Objects.requireNonNull(this.sourceStream, "No source stream provided.");
        Objects.requireNonNull(this.targetStream, "No target stream provided.");
        Objects.requireNonNull(this.actorScheduler, "No task scheduler provided.");
        Objects.requireNonNull(this.snapshotStorage, "No snapshot storage provided.");
        if (this.streamProcessorCmdQueue == null) {
            this.streamProcessorCmdQueue = new DeferredCommandContext(100);
        }
        if (this.snapshotPolicy == null) {
            this.snapshotPolicy = new TimeBasedSnapshotPolicy(Duration.ofMinutes(1L));
        }
        if (this.snapshotPositionProvider == null) {
            if (this.sourceStream.getPartitionId() == this.targetStream.getPartitionId() && this.sourceStream.getTopicName().equals(this.targetStream.getTopicName())) {
                this.snapshotPositionProvider = new LastProcessedEventPositionProvider();
            } else {
                this.snapshotPositionProvider = new LastWrittenEventPositionProvider();
            }
        }
        this.sourceLogStreamReader = new BufferedLogStreamReader();
        this.targetLogStreamReader = new BufferedLogStreamReader();
        if (this.readOnly) {
            this.logStreamWriter = new DisabledLogStreamWriter();
        } else {
            this.logStreamWriter = new LogStreamWriterImpl();
        }
        if (this.errorHandler == null) {
            this.errorHandler = new DefaultErrorHandler();
        }
    }

    public StreamProcessorController build() {
        initContext();
        StreamProcessorContext streamProcessorContext = new StreamProcessorContext();
        streamProcessorContext.setId(this.id);
        streamProcessorContext.setName(this.name);
        streamProcessorContext.setStreamProcessor(this.streamProcessor);
        streamProcessorContext.setStreamProcessorCmdQueue(this.streamProcessorCmdQueue);
        streamProcessorContext.setSourceStream(this.sourceStream);
        streamProcessorContext.setTargetStream(this.targetStream);
        streamProcessorContext.setTaskScheduler(this.actorScheduler);
        streamProcessorContext.setSourceLogStreamReader(this.sourceLogStreamReader);
        streamProcessorContext.setTargetLogStreamReader(this.targetLogStreamReader);
        streamProcessorContext.setLogStreamWriter(this.logStreamWriter);
        streamProcessorContext.setSnapshotPolicy(this.snapshotPolicy);
        streamProcessorContext.setSnapshotStorage(this.snapshotStorage);
        streamProcessorContext.setSnapshotPositionProvider(this.snapshotPositionProvider);
        streamProcessorContext.setEventFilter(this.eventFilter);
        streamProcessorContext.setReprocessingEventFilter(this.reprocessingEventFilter);
        streamProcessorContext.setReadOnly(this.readOnly);
        streamProcessorContext.setErrorHandler(this.errorHandler);
        return new StreamProcessorController(streamProcessorContext);
    }
}
