package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl.class */
public class CassandraRowStreamImpl implements CassandraRowStream {
    private final Context context;
    private final ResultSet resultSet;
    private final InboundBuffer<Row> internalQueue;
    private State state = State.IDLE;
    private int inFlight;
    private Handler<Row> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl$State.class */
    public enum State {
        IDLE,
        STARTED,
        EXHAUSTED,
        STOPPED
    }

    public CassandraRowStreamImpl(Context context, ResultSet resultSet) {
        this.context = context;
        this.resultSet = resultSet;
        this.internalQueue = new InboundBuffer(context).exceptionHandler(this::handleException).drainHandler(r3 -> {
            fetchRow();
        });
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    public synchronized CassandraRowStream exceptionHandler(Handler<Throwable> handler) {
        if (this.state != State.STOPPED) {
            this.exceptionHandler = handler;
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    public synchronized CassandraRowStream handler(Handler<Row> handler) {
        if (this.state == State.STOPPED) {
            return this;
        }
        if (handler == null) {
            stop();
            this.context.runOnContext(r3 -> {
                handleEnd();
            });
        } else {
            this.handler = handler;
            this.internalQueue.handler(this::handleRow);
            if (this.state == State.IDLE) {
                this.state = State.STARTED;
                this.context.runOnContext(r32 -> {
                    fetchRow();
                });
            }
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: pause */
    public synchronized CassandraRowStream mo3pause() {
        if (this.state != State.STOPPED) {
            this.internalQueue.pause();
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: resume */
    public synchronized CassandraRowStream mo2resume() {
        if (this.state != State.STOPPED) {
            this.internalQueue.resume();
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    public synchronized CassandraRowStream endHandler(Handler<Void> handler) {
        if (this.state != State.STOPPED) {
            this.endHandler = handler;
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: fetch */
    public synchronized CassandraRowStream mo1fetch(long j) {
        if (this.state != State.STOPPED) {
            this.internalQueue.fetch(j);
        }
        return this;
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    public ExecutionInfo executionInfo() {
        return this.resultSet.getExecutionInfo();
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    public ColumnDefinitions columnDefinitions() {
        return this.resultSet.getColumnDefinitions();
    }

    private synchronized void fetchRow() {
        if (this.state == State.STOPPED) {
            return;
        }
        if (this.resultSet.remaining() > 0) {
            handleFetched(this.resultSet.one());
        } else if (this.resultSet.hasMorePages()) {
            this.resultSet.fetchNextPage().map(resultSet -> {
                return this.resultSet.one();
            }).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    handleFetched((Row) asyncResult.result());
                } else {
                    handleException(asyncResult.cause());
                }
            });
        } else {
            handleFetched(null);
        }
    }

    private synchronized void handleFetched(Row row) {
        if (this.state == State.STOPPED) {
            return;
        }
        if (row != null) {
            this.inFlight++;
            if (this.internalQueue.write(row)) {
                this.context.runOnContext(r3 -> {
                    fetchRow();
                });
                return;
            }
            return;
        }
        this.state = State.EXHAUSTED;
        if (this.inFlight == 0) {
            stop();
            handleEnd();
        }
    }

    private void handleRow(Row row) {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            this.inFlight--;
            this.handler.handle(row);
            synchronized (this) {
                if (this.state == State.EXHAUSTED && this.inFlight == 0) {
                    stop();
                    handleEnd();
                }
            }
        }
    }

    private void handleException(Throwable th) {
        Handler<Throwable> handler;
        synchronized (this) {
            if (this.state != State.STOPPED) {
                stop();
                handler = this.exceptionHandler;
            } else {
                handler = null;
            }
        }
        if (handler != null) {
            handler.handle(th);
        }
    }

    private synchronized void handleEnd() {
        Handler<Void> handler;
        handler = this.endHandler;
        if (handler != null) {
            handler.handle((Object) null);
        }
    }

    private synchronized void stop() {
        this.state = State.STOPPED;
        this.internalQueue.handler((Handler) null).drainHandler((Handler) null);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo4handler(Handler handler) {
        return handler((Handler<Row>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
