/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.queue.channel;

import java.io.File;
import net.openhft.affinity.AffinityLock;
import net.openhft.chronicle.bytes.SyncMode;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIORuntimeException;
import net.openhft.chronicle.core.io.SimpleCloseable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.channel.PublishHandler;
import net.openhft.chronicle.queue.channel.SubscribeHandler;
import net.openhft.chronicle.queue.channel.impl.QueuesChannel;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.threads.TimingPauser;
import net.openhft.chronicle.wire.channel.AbstractHandler;
import net.openhft.chronicle.wire.channel.ChronicleChannel;
import net.openhft.chronicle.wire.channel.ChronicleChannelCfg;
import net.openhft.chronicle.wire.channel.ChronicleContext;
import net.openhft.chronicle.wire.channel.EventPoller;
import net.openhft.chronicle.wire.channel.impl.BufferedChronicleChannel;

public class PipeHandler
extends AbstractHandler<PipeHandler> {
    private String publish;
    private String subscribe;
    private SyncMode syncMode;
    private transient Thread tailerThread;

    static ChronicleQueue newQueue(ChronicleContext context, String subscribe, SyncMode syncMode) {
        File path = context.toFile(subscribe);
        return ChronicleQueue.singleBuilder(path).blockSize(OS.isSparseFileSupported() ? 0x8000000000L : 0x4000000L).syncMode(syncMode).build();
    }

    public String publish() {
        return this.publish;
    }

    public PipeHandler publish(String publish) {
        this.publish = publish;
        return this;
    }

    public String subscribe() {
        return this.subscribe;
    }

    public PipeHandler subscribe(String subscribe) {
        this.subscribe = subscribe;
        return this;
    }

    public SyncMode syncMode() {
        return this.syncMode;
    }

    public PipeHandler syncMode(SyncMode syncMode) {
        this.syncMode = syncMode;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run(ChronicleContext context, ChronicleChannel channel) {
        TimingPauser pauser = Pauser.balanced();
        ChronicleQueue subscribeQ = PipeHandler.newQueue(context, this.subscribe, this.syncMode);
        if (channel instanceof BufferedChronicleChannel) {
            BufferedChronicleChannel bc = (BufferedChronicleChannel)channel;
            ExcerptTailer tailer = subscribeQ.createTailer();
            bc.eventPoller(new PHEventPoller(tailer));
        } else {
            this.tailerThread = new Thread(() -> {
                try (AffinityLock lock = context.affinityLock();){
                    SubscribeHandler.queueTailer(pauser, channel, subscribeQ);
                }
                catch (ClosedIORuntimeException e) {
                    Jvm.warn().on(PipeHandler.class, e.toString());
                }
                catch (Throwable t) {
                    Jvm.warn().on(PipeHandler.class, t);
                }
            }, "pipe~tailer");
            this.tailerThread.setDaemon(true);
            this.tailerThread.start();
        }
        Thread.currentThread().setName("pipe~reader");
        try (AffinityLock lock = context.affinityLock();){
            PublishHandler.copyFromChannelToQueue(channel, pauser, PipeHandler.newQueue(context, this.publish, this.syncMode), this.syncMode);
        }
        finally {
            if (this.tailerThread != null) {
                this.tailerThread.interrupt();
            }
        }
    }

    @Override
    public ChronicleChannel asInternalChannel(ChronicleContext context, ChronicleChannelCfg channelCfg) {
        return new QueuesChannel(channelCfg, this, PipeHandler.newQueue(context, this.publish, this.syncMode), PipeHandler.newQueue(context, this.subscribe, this.syncMode));
    }

    static class PHEventPoller
    extends SimpleCloseable
    implements EventPoller {
        private final ExcerptTailer tailer;

        public PHEventPoller(ExcerptTailer tailer) {
            this.tailer = tailer;
        }

        @Override
        public boolean onPoll(ChronicleChannel conn) {
            boolean wrote = false;
            while (SubscribeHandler.copyOneMessage(conn, this.tailer)) {
                wrote = true;
            }
            return wrote;
        }

        @Override
        protected void performClose() {
            Closeable.closeQuietly(this.tailer, this.tailer.queue());
            super.performClose();
        }
    }
}

