package de.codecentric.boot.admin.server.services;

import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import java.util.logging.Level;
import javax.annotation.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/spring-boot-admin-server-2.1.6.jar:de/codecentric/boot/admin/server/services/AbstractEventHandler.class */
public abstract class AbstractEventHandler<T extends InstanceEvent> {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final Publisher<InstanceEvent> publisher;
    private final Class<T> eventType;

    @Nullable
    private Disposable subscription;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractEventHandler(Publisher<InstanceEvent> publisher, Class<T> cls) {
        this.publisher = publisher;
        this.eventType = cls;
    }

    public void start() {
        this.subscription = Flux.from(this.publisher).log(this.log.getName(), Level.FINEST, new SignalType[0]).doOnSubscribe(subscription -> {
            this.log.debug("Subscribed to {} events", this.eventType);
        }).ofType(this.eventType).cast(this.eventType).transform(this::handle).retryWhen(Retry.any().retryMax(Long.MAX_VALUE).doOnRetry(retryContext -> {
            this.log.warn("Unexpected error", retryContext.exception());
        })).subscribe();
    }

    protected abstract Publisher<Void> handle(Flux<T> flux);

    public void stop() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }
}
