package org.shoal.ha.cache.impl.command;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.shoal.ha.cache.api.DataStoreContext;
import org.shoal.ha.cache.api.DataStoreException;

/* loaded from: input_file:org/shoal/ha/cache/impl/command/ReplicationCommandTransmitter.class */
public class ReplicationCommandTransmitter<K, V> implements Runnable {
    private DataStoreContext<K, V> dsc;
    private String targetName;
    private AtomicInteger outgoingSeqno = new AtomicInteger();
    private AtomicInteger acknowledgedSeqno = new AtomicInteger();
    private ConcurrentLinkedQueue<Command<K, V>> list = new ConcurrentLinkedQueue<>();

    public void initialize(String str, DataStoreContext<K, V> dataStoreContext) {
        this.targetName = str;
        this.dsc = dataStoreContext;
    }

    public void addCommand(Command<K, V> command) {
        this.list.add(command);
        System.out.println("ReplicationCommandTransmitter[" + this.targetName + "] just  accumulated: " + ((int) command.getOpcode()));
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                Thread.sleep(15L);
                if (this.list.peek() != null) {
                    ReplicationFrame<K, V> replicationFrame = new ReplicationFrame<>((byte) -1, this.dsc.getServiceName(), this.dsc.getInstanceName());
                    Command<K, V> poll = this.list.poll();
                    while (poll != null) {
                        replicationFrame.addCommand(poll);
                        poll = this.list.poll();
                        if (replicationFrame.getCommands().size() >= 20) {
                            transmitFramePayload(replicationFrame);
                            replicationFrame = new ReplicationFrame<>((byte) -1, this.dsc.getServiceName(), this.dsc.getInstanceName());
                        }
                    }
                    if (replicationFrame.getCommands().size() > 0) {
                        transmitFramePayload(replicationFrame);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void transmitFramePayload(ReplicationFrame<K, V> replicationFrame) throws DataStoreException {
        replicationFrame.setSeqNo(this.outgoingSeqno.incrementAndGet());
        replicationFrame.setMinOutstandingPacketNumber(this.acknowledgedSeqno.get());
        replicationFrame.setTargetInstanceName(this.targetName);
        ReplicationFramePayloadCommand replicationFramePayloadCommand = new ReplicationFramePayloadCommand();
        replicationFramePayloadCommand.setReplicationFrame(replicationFrame);
        this.dsc.getCommandManager().execute(replicationFramePayloadCommand);
    }
}
