package stream.io;

import java.util.ArrayList;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.SequenceID;

/* loaded from: input_file:stream/io/OrderedQueue.class */
/**
 * A queue that hands items out strictly in the order of their {@link SequenceID}.
 *
 * <p>Every item written must carry a {@link SequenceID} under {@link #sequenceKey}. The
 * item whose id equals {@link #nextOut} is parked in {@link #nextItem} and returned by
 * the next {@link #read()}; any other item is buffered in {@link #queue}, which is kept
 * ordered by {@code SequenceID.compareTo}.
 *
 * <p>{@link #write(Data)}, {@link #read()}, {@link #close()} and
 * {@link #setCapacity(Integer)} synchronize on {@link #lock}.
 *
 * <p>NOTE(review): {@link #limit} is only used to pre-size the buffer; nothing in this
 * class rejects or blocks writes once the buffer holds more than {@code limit} items.
 */
public class OrderedQueue implements Queue {
    static Logger log = LoggerFactory.getLogger(OrderedQueue.class);

    /** Identifier of this queue, as set via {@link #setId(String)}. */
    protected String id;

    /** The item whose sequence id equals {@link #nextOut}, or {@code null} if it has not arrived yet. */
    protected Data nextItem;

    /** Key under which each item's {@link SequenceID} is looked up. */
    protected String sequenceKey = "@source:item";

    /** Set by {@link #close()}; a closed queue rejects further writes. */
    protected boolean closed = false;

    /** Configured capacity; used as the initial capacity of {@link #queue}. */
    protected Integer limit = 1000;

    /** Monitor guarding all mutable state; readers wait on it for the next item. */
    protected Object lock = new Object();

    /** Sequence id of the item that the next {@link #read()} is expected to return. */
    protected SequenceID nextOut = new SequenceID();

    /** Buffer of items that arrived out of order, ordered by sequence id. */
    protected ArrayList<Data> queue = new ArrayList<>(this.limit.intValue());

    /**
     * Does nothing and always reports zero removed items.
     *
     * <p>NOTE(review): buffered items are NOT discarded here — confirm against the
     * contract of {@code stream.io.Barrel#clear()}.
     *
     * @return always {@code 0}
     */
    @Override // stream.io.Barrel
    public int clear() {
        return 0;
    }

    /**
     * @return the identifier of this queue, may be {@code null} if never set
     */
    @Override // stream.io.Sink
    public String getId() {
        return this.id;
    }

    /**
     * Inserts an item according to its sequence id.
     *
     * <p>If the item's id equals the id expected next, it becomes {@link #nextItem} and
     * all waiting readers are woken. Otherwise it is inserted into the buffer in front
     * of the first buffered item with a greater id (or appended at the end).
     *
     * @param data the item to enqueue; must provide a {@link SequenceID} under
     *     {@link #sequenceKey}
     * @return {@code true} if the item was accepted, {@code false} if the queue is closed
     * @throws Exception if the item carries no sequence id, or if reading the id fails
     *     (the failure is logged and rethrown unchanged)
     */
    @Override // stream.io.Sink
    public boolean write(Data data) throws Exception {
        synchronized (this.lock) {
            if (this.closed) {
                log.error("Failed to write to closed ordered-queue {}", getId());
                return false;
            }
            try {
                SequenceID sequenceID = (SequenceID) data.get(this.sequenceKey);
                if (sequenceID == null) {
                    throw new Exception("Item does not provide sequence-id - required for ordered queue insertion!");
                }

                // The expected item bypasses the buffer and is handed to readers directly.
                if (this.nextOut.compareTo(sequenceID) == 0) {
                    this.nextItem = data;
                    this.lock.notifyAll();
                    return true;
                }

                // Keep the buffer ordered: insert before the first larger sequence id.
                for (int i = 0; i < this.queue.size(); i++) {
                    Data buffered = this.queue.get(i);
                    if (buffered != null && ((SequenceID) buffered.get(this.sequenceKey)).compareTo(sequenceID) > 0) {
                        this.queue.add(i, data);
                        return true;
                    }
                }
                this.queue.add(data);
                return true;
            } catch (Exception e) {
                log.error("Failed to determine sequence ID from item at key '{}': {}", this.sequenceKey, e.getMessage());
                // Route the stack trace through the logger instead of printing to stderr.
                log.debug("Stack trace of failed ordered-queue write", e);
                throw e;
            }
        }
    }

    /**
     * @param str the new identifier of this queue
     */
    @Override // stream.io.Sink
    public void setId(String str) {
        this.id = str;
    }

    /**
     * No initialization is required for this queue.
     */
    @Override // stream.io.Sink
    public void init() throws Exception {
    }

    /**
     * Returns the item with the expected sequence id, blocking until it is available or
     * the queue is closed.
     *
     * <p>After an item is taken, the expected id is incremented and the buffer is
     * searched for the item carrying the new expected id.
     *
     * <p>NOTE(review): if the queue is closed while the expected item is missing but
     * other items are still buffered, this call returns {@code null} and still advances
     * the expected id, so a later call may return a buffered item once the gap has been
     * stepped over. Confirm that callers do not treat the first {@code null} as
     * end-of-stream in that situation.
     *
     * @return the next item in sequence, or {@code null} if the queue is closed and the
     *     expected item is not available
     * @throws Exception in particular {@link InterruptedException} if the calling thread
     *     is interrupted while waiting
     */
    @Override // stream.io.Source
    public Data read() throws Exception {
        synchronized (this.lock) {
            log.debug("nextItem: {}", this.nextItem);
            while (!this.closed && this.nextItem == null) {
                this.lock.wait();
            }
            if (this.closed && this.nextItem == null && this.queue.isEmpty()) {
                return null;
            }
            Data data = this.nextItem;
            log.debug("Returning item: {}", data);
            this.nextOut.increment();
            log.debug("Next SequenceID is: {}", this.nextOut);
            this.nextItem = findNext();
            return data;
        }
    }

    /**
     * Takes the item matching {@link #nextOut} out of the buffer, if it is there.
     *
     * <p>Because the buffer is ordered, only its head needs to be inspected: items with
     * an id smaller than the expected one can never be delivered any more and are
     * dropped; the first item with a larger id ends the search.
     *
     * <p>Must be called while holding {@link #lock}.
     *
     * @return the buffered item with the expected id, or {@code null} if none is buffered
     */
    private Data findNext() {
        log.debug("looking for next item with ID '{}'", this.nextOut);
        while (!this.queue.isEmpty()) {
            Data head = this.queue.get(0);
            SequenceID sequenceID = (SequenceID) head.get(this.sequenceKey);
            int order = sequenceID.compareTo(this.nextOut);
            if (order > 0) {
                // Expected item has not arrived yet; keep everything buffered.
                return null;
            }
            // Always re-inspect index 0 after a removal: advancing an index here would
            // skip the element that just shifted into the freed slot.
            this.queue.remove(0);
            if (order == 0) {
                return head;
            }
        }
        return null;
    }

    /**
     * Marks the queue as closed and wakes all readers blocked in {@link #read()}.
     */
    @Override // stream.io.Sink
    public void close() throws Exception {
        synchronized (this.lock) {
            // Flip the flag under the same monitor that guards every read of it.
            this.closed = true;
            this.lock.notifyAll();
        }
    }

    /**
     * Returns the configured limit.
     *
     * <p>NOTE(review): this is the same value as {@link #getCapacity()}, not the number
     * of items currently buffered — confirm against the contract of
     * {@code stream.io.Queue#getSize()}.
     *
     * @return the configured limit
     */
    @Override // stream.io.Queue
    public Integer getSize() {
        return this.limit;
    }

    /**
     * Bulk writes are not implemented: nothing is enqueued.
     *
     * @param collection ignored
     * @return always {@code false}
     */
    @Override // stream.io.Sink
    public boolean write(Collection<Data> collection) throws Exception {
        return false;
    }

    /**
     * Sets the configured limit and makes sure the buffer can hold that many items
     * without growing. Items that are already buffered are kept.
     *
     * @param num the new limit; must not be {@code null} or negative
     * @throws NullPointerException if {@code num} is {@code null}
     * @throws IllegalArgumentException if {@code num} is negative
     */
    @Override // stream.io.Queue
    public void setCapacity(Integer num) {
        int capacity = num.intValue();
        if (capacity < 0) {
            throw new IllegalArgumentException("Illegal Capacity: " + capacity);
        }
        synchronized (this.lock) {
            this.limit = num;
            // Grow in place rather than replacing the list, which would drop buffered items.
            this.queue.ensureCapacity(capacity);
        }
    }

    /**
     * @return the configured limit
     */
    @Override // stream.io.Queue
    public Integer getCapacity() {
        return this.limit;
    }
}
