/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.channel;

import java.io.File;
import java.util.function.Consumer;
import java.util.function.Predicate;
import net.openhft.affinity.AffinityLock;
import net.openhft.chronicle.bytes.SyncMode;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIORuntimeException;
import net.openhft.chronicle.core.io.SimpleCloseable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.channel.PublishHandler;
import net.openhft.chronicle.queue.channel.SubscribeHandler;
import net.openhft.chronicle.queue.channel.impl.QueuesChannel;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.threads.TimingPauser;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.channel.AbstractHandler;
import net.openhft.chronicle.wire.channel.ChronicleChannel;
import net.openhft.chronicle.wire.channel.ChronicleChannelCfg;
import net.openhft.chronicle.wire.channel.ChronicleContext;
import net.openhft.chronicle.wire.channel.EventPoller;
import net.openhft.chronicle.wire.channel.impl.BufferedChronicleChannel;

/**
 * Channel handler that connects a {@link ChronicleChannel} to two Chronicle queues:
 * messages read from the channel are copied into the {@code publish} queue, and
 * messages found in the {@code subscribe} queue are copied out to the channel.
 *
 * <p>All setters return {@code this} so the handler can be configured fluently.
 */
public class PipeHandler extends AbstractHandler<PipeHandler> {
    /** Queue block size used when the OS supports sparse files: 512 GiB. */
    private static final long SPARSE_BLOCK_SIZE = 512L << 30;
    /** Queue block size used when sparse files are not supported: 64 MiB. */
    private static final long NON_SPARSE_BLOCK_SIZE = 64L << 20;

    /** Name of the queue that receives messages read from the channel. */
    private String publish;
    /** Name of the queue whose messages are sent to the channel. */
    private String subscribe;
    /** Sync mode applied to every queue this handler opens. */
    private SyncMode syncMode;
    /** Background tailer thread; only set by {@link #run} for non-buffered channels. */
    private transient Thread tailerThread;
    /** Optional message filter handed to the subscribe-side copy routines; may be {@code null}. */
    private Predicate<Wire> filter;
    /** Source id given to the publish queue (defaults to 0). */
    private int publishSourceId;
    /** Source id given to the subscribe queue (defaults to 0). */
    private int subscribeSourceId;
    /** Callback applied to the subscribe tailer before it is used; never {@code null} when set via the setter. */
    private Consumer<ExcerptTailer> subscriptionIndexController = SubscribeHandler.NO_OP;

    /**
     * Opens (or creates) a single Chronicle queue under the context's directory.
     *
     * @param context   used to resolve {@code queueName} to a file
     * @param queueName name of the queue, resolved through {@link ChronicleContext#toFile}
     * @param syncMode  sync mode passed to the queue builder
     * @param sourceId  source id passed to the queue builder
     * @return the newly built queue; the caller is responsible for closing it
     */
    static ChronicleQueue newQueue(ChronicleContext context, String queueName, SyncMode syncMode, int sourceId) {
        File path = context.toFile(queueName);
        long blockSize = OS.isSparseFileSupported() ? SPARSE_BLOCK_SIZE : NON_SPARSE_BLOCK_SIZE;
        return ChronicleQueue.singleBuilder(path)
                .blockSize(blockSize)
                .sourceId(sourceId)
                .syncMode(syncMode)
                .build();
    }

    /** @return the name of the queue messages from the channel are written to */
    public String publish() {
        return publish;
    }

    /**
     * @param publish name of the queue messages from the channel are written to
     * @return this handler
     */
    public PipeHandler publish(String publish) {
        this.publish = publish;
        return this;
    }

    /** @return the name of the queue whose messages are sent to the channel */
    public String subscribe() {
        return subscribe;
    }

    /**
     * @param subscribe name of the queue whose messages are sent to the channel
     * @return this handler
     */
    public PipeHandler subscribe(String subscribe) {
        this.subscribe = subscribe;
        return this;
    }

    /** @return the sync mode used for the queues opened by this handler */
    public SyncMode syncMode() {
        return syncMode;
    }

    /**
     * @param syncMode sync mode used for the queues opened by this handler
     * @return this handler
     */
    public PipeHandler syncMode(SyncMode syncMode) {
        this.syncMode = syncMode;
        return this;
    }

    /** @return the filter passed to the subscribe-side copy routines, or {@code null} if none was set */
    public Predicate<Wire> filter() {
        return filter;
    }

    /**
     * @param filter filter passed to the subscribe-side copy routines; may be {@code null}
     * @return this handler
     */
    public PipeHandler filter(Predicate<Wire> filter) {
        this.filter = filter;
        return this;
    }

    /** @return the source id used when opening the publish queue */
    public int publishSourceId() {
        return publishSourceId;
    }

    /**
     * @param publishSourceId source id used when opening the publish queue
     * @return this handler
     */
    public PipeHandler publishSourceId(int publishSourceId) {
        this.publishSourceId = publishSourceId;
        return this;
    }

    /** @return the source id used when opening the subscribe queue */
    public int subscribeSourceId() {
        return subscribeSourceId;
    }

    /**
     * @param subscribeSourceId source id used when opening the subscribe queue
     * @return this handler
     */
    public PipeHandler subscribeSourceId(int subscribeSourceId) {
        this.subscribeSourceId = subscribeSourceId;
        return this;
    }

    /**
     * Services the channel until {@link PublishHandler#copyFromChannelToQueue} returns or throws.
     *
     * <p>The subscribe side is served either by an {@link EventPoller} installed on a
     * {@link BufferedChronicleChannel}, or by a daemon thread named {@code pipe~tailer} for any
     * other channel type. The calling thread is renamed to {@code pipe~reader} and performs the
     * channel-to-publish-queue copy under an affinity lock. When that copy ends, the tailer
     * thread (if one was started) is interrupted and the subscribe queue is closed.
     *
     * @param context supplies queue locations and affinity locks
     * @param channel the channel being served
     */
    @Override
    public void run(ChronicleContext context, ChronicleChannel channel) {
        TimingPauser pauser = Pauser.balanced();
        try (ChronicleQueue subscribeQ = newQueue(context, subscribe, syncMode, subscribeSourceId)) {
            if (channel instanceof BufferedChronicleChannel) {
                installEventPoller((BufferedChronicleChannel) channel, subscribeQ);
            } else {
                startTailerThread(context, channel, pauser, subscribeQ);
            }

            Thread.currentThread().setName("pipe~reader");
            try (AffinityLock lock = context.affinityLock()) {
                // NOTE(review): the publish queue is handed over and not closed here; presumably
                // copyFromChannelToQueue takes ownership and closes it - confirm.
                PublishHandler.copyFromChannelToQueue(
                        channel, pauser, newQueue(context, publish, syncMode, publishSourceId), syncMode);
            } finally {
                if (tailerThread != null) {
                    tailerThread.interrupt();
                }
            }
        }
    }

    /** Creates a tailer on the subscribe queue and installs it as the buffered channel's event poller. */
    private void installEventPoller(BufferedChronicleChannel bufferedChannel, ChronicleQueue subscribeQ) {
        ExcerptTailer tailer = subscribeQ.createTailer();
        // The tailer is created on this thread but used from onPoll; presumably the poller runs
        // on a different thread, hence the disabled single-threaded check - confirm.
        tailer.singleThreadedCheckDisabled(true);
        subscriptionIndexController.accept(tailer);
        bufferedChannel.eventPoller(new PHEventPoller(tailer, filter));
    }

    /** Starts the daemon {@code pipe~tailer} thread that copies the subscribe queue to the channel. */
    private void startTailerThread(ChronicleContext context, ChronicleChannel channel,
                                   TimingPauser pauser, ChronicleQueue subscribeQ) {
        tailerThread = new Thread(() -> {
            try (AffinityLock lock = context.affinityLock()) {
                SubscribeHandler.queueTailer(pauser, channel, subscribeQ, filter, subscriptionIndexController);
            } catch (ClosedIORuntimeException e) {
                // Logged as a message only, without the stack trace.
                Jvm.warn().on(PipeHandler.class, e.toString());
            } catch (Throwable t) {
                Jvm.warn().on(PipeHandler.class, t);
            }
        }, "pipe~tailer");
        tailerThread.setDaemon(true);
        tailerThread.start();
    }

    /**
     * Builds an in-process channel backed directly by the publish and subscribe queues.
     *
     * <p>Both queues are opened with their configured source ids, matching what {@link #run}
     * does; previously the subscribe queue was always opened with source id 0 here.
     *
     * @param context    used to resolve the queue locations
     * @param channelCfg configuration passed through to the created channel
     * @return a {@link QueuesChannel} over the publish and subscribe queues
     */
    @Override
    public ChronicleChannel asInternalChannel(ChronicleContext context, ChronicleChannelCfg<?> channelCfg) {
        ChronicleQueue publishQ = newQueue(context, publish, syncMode, publishSourceId);
        ChronicleQueue subscribeQ = newQueue(context, subscribe, syncMode, subscribeSourceId);
        return new QueuesChannel(channelCfg, this, publishQ, subscribeQ);
    }

    /**
     * Sets the callback applied to the subscribe tailer before it is used.
     *
     * @param subscriptionIndexController the callback; {@code null} restores the no-op default
     *                                    so that {@link #run} never dereferences a null controller
     * @return this handler
     */
    public PipeHandler subscriptionIndexController(Consumer<ExcerptTailer> subscriptionIndexController) {
        this.subscriptionIndexController = subscriptionIndexController != null
                ? subscriptionIndexController
                : SubscribeHandler.NO_OP;
        return this;
    }

    /**
     * Event poller that copies messages from a subscribe-queue tailer to the channel
     * each time it is polled.
     */
    static class PHEventPoller extends SimpleCloseable implements EventPoller {
        private final ExcerptTailer tailer;
        private final Predicate<Wire> filter;

        /**
         * @param tailer tailer to read from; closed together with its queue when this poller closes
         * @param filter filter passed to {@link SubscribeHandler#copyOneMessage}; may be {@code null}
         */
        public PHEventPoller(ExcerptTailer tailer, Predicate<Wire> filter) {
            this.tailer = tailer;
            this.filter = filter;
        }

        /**
         * Calls {@link SubscribeHandler#copyOneMessage} repeatedly until it returns {@code false}.
         *
         * @param conn the channel to copy messages to
         * @return {@code true} if at least one call to {@code copyOneMessage} returned {@code true}
         */
        @Override
        public boolean onPoll(ChronicleChannel conn) {
            boolean copiedAny = false;
            while (SubscribeHandler.copyOneMessage(conn, tailer, filter)) {
                copiedAny = true;
            }
            return copiedAny;
        }

        /** Quietly closes the tailer and the queue it belongs to, then completes the normal close. */
        @Override
        protected void performClose() {
            Closeable.closeQuietly(tailer, tailer.queue());
            super.performClose();
        }
    }
}

