package io.bitcoinsv.jcl.net.network.streams;

import io.bitcoinsv.jcl.net.network.PeerAddress;
import io.bitcoinsv.jcl.tools.events.EventBus;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/* loaded from: input_file:io/bitcoinsv/jcl/net/network/streams/PeerInputStreamImpl.class */
/**
 * Base implementation of a {@link PeerInputStream} that sits on top of another
 * ("source") stream: every {@link StreamDataEvent} received from the source is
 * passed to {@link #transform(StreamDataEvent)} and each resulting event is
 * published on this stream's own {@link EventBus}.
 *
 * <p>Subscribers registered through {@link #onData}, {@link #onClose} and
 * {@link #onError} are attached to that internal bus.
 *
 * @param <I> payload type of the events delivered by the source stream
 * @param <R> payload type of the events published by this stream
 */
public abstract class PeerInputStreamImpl<I, R> implements PeerInputStream<R> {
    /** Bus on which this stream publishes its data, close and error events. */
    protected EventBus eventBus;
    /** Address returned by {@link #getPeerAddress()}. */
    protected PeerAddress peerAddress;
    /** Upstream stream this one was constructed with; may be {@code null}. */
    protected PeerInputStream<I> source;

    /**
     * Creates the stream, builds its event bus with the given executor and, when a
     * source is supplied, links to it via {@link #linkSource(PeerInputStream)}.
     *
     * <p>NOTE(review): {@code linkSource} is overridable and is invoked from this
     * constructor, so an override runs before the subclass constructor body.
     *
     * @param peerAddress     address reported by {@link #getPeerAddress()}
     * @param executorService executor handed to the {@link EventBus} builder
     * @param peerInputStream source stream to link to; no linking happens when {@code null}
     */
    public PeerInputStreamImpl(PeerAddress peerAddress, ExecutorService executorService, PeerInputStream<I> peerInputStream) {
        this.peerAddress = peerAddress;
        this.source = peerInputStream;
        this.eventBus = EventBus.builder().executor(executorService).build();
        if (this.source != null) {
            linkSource(this.source);
        }
    }

    /**
     * Convenience constructor that takes the peer address from the source stream.
     *
     * <p>Unlike the three-argument constructor, {@code peerInputStream} is
     * dereferenced here, so passing {@code null} fails with a
     * {@link NullPointerException}.
     *
     * @param executorService executor handed to the {@link EventBus} builder
     * @param peerInputStream source stream; its peer address is reused for this stream
     */
    public PeerInputStreamImpl(ExecutorService executorService, PeerInputStream<I> peerInputStream) {
        this(peerInputStream.getPeerAddress(), executorService, peerInputStream);
    }

    /**
     * Subscribes this stream to the given source: data events are routed to
     * {@link #receiveAndTransform(StreamDataEvent)} and close events are
     * republished unchanged on this stream's bus.
     *
     * <p>NOTE(review): no {@code onError} handler is registered on the source here;
     * confirm whether source errors are meant to be propagated elsewhere.
     *
     * @param peerInputStream source stream to subscribe to
     */
    protected void linkSource(PeerInputStream<I> peerInputStream) {
        peerInputStream.onData(this::receiveAndTransform);
        peerInputStream.onClose(closeEvent -> this.eventBus.publish(closeEvent));
    }

    /** Returns the peer address this stream was constructed with. */
    @Override
    public PeerAddress getPeerAddress() {
        return this.peerAddress;
    }

    /**
     * Returns the stream state; this base implementation always returns {@code null}.
     */
    @Override
    public StreamState getState() {
        return null;
    }

    /** Registers {@code consumer} for {@link StreamDataEvent}s published on this stream's bus. */
    @Override
    public void onData(Consumer<? extends StreamDataEvent<R>> consumer) {
        this.eventBus.subscribe(StreamDataEvent.class, consumer);
    }

    /** Registers {@code consumer} for {@link StreamCloseEvent}s published on this stream's bus. */
    @Override
    public void onClose(Consumer<? extends StreamCloseEvent> consumer) {
        this.eventBus.subscribe(StreamCloseEvent.class, consumer);
    }

    /** Registers {@code consumer} for {@link StreamErrorEvent}s published on this stream's bus. */
    @Override
    public void onError(Consumer<? extends StreamErrorEvent> consumer) {
        this.eventBus.subscribe(StreamErrorEvent.class, consumer);
    }

    /**
     * Publishes a close event on this stream's bus.
     *
     * <p>The {@code streamCloseEvent} argument is not used: a newly created
     * {@link StreamCloseEvent} is published instead.
     *
     * @param streamCloseEvent ignored by this implementation
     */
    @Override
    public void close(StreamCloseEvent streamCloseEvent) {
        this.eventBus.publish(new StreamCloseEvent());
    }

    /**
     * Handles one data event from the source: runs {@link #transform(StreamDataEvent)}
     * on it and publishes every event of the returned list on this stream's bus.
     * A {@code null} result publishes nothing.
     *
     * <p>Anything thrown while transforming or publishing is caught and published
     * as a {@link StreamErrorEvent} wrapping the throwable. The method is
     * {@code synchronized} on this instance, so calls do not overlap for the same stream.
     *
     * @param streamDataEvent event received from the source stream
     */
    protected synchronized void receiveAndTransform(StreamDataEvent<I> streamDataEvent) {
        try {
            List<StreamDataEvent<R>> results = transform(streamDataEvent);
            if (results == null) {
                return;
            }
            results.forEach(result -> this.eventBus.publish(result));
        } catch (Throwable failure) {
            this.eventBus.publish(new StreamErrorEvent(failure));
        }
    }

    /**
     * Converts one incoming event into the events this stream should publish.
     *
     * @param streamDataEvent event received from the source stream
     * @return events to publish, in list order; {@code null} is treated as "nothing to publish"
     */
    public abstract List<StreamDataEvent<R>> transform(StreamDataEvent<I> streamDataEvent);
}
