RxJava 2 API

The Rxified API supports both RxJava 1 and RxJava 2. The following examples use RxJava 2.

Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();

// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
});

Streaming

RxJava 2 supports the Observable and Flowable types. Both are exposed through the RowStream that you can get from a PreparedQuery:

Observable<Row> observable = pool.rxBegin() // Cursors require a transaction
  .flatMapObservable(tx -> tx
    .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
    .flatMapObservable(preparedQuery -> {
      // Fetch 50 rows at a time
      RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
      return stream.toObservable();
    })
    // Commit the transaction after usage
    .doAfterTerminate(tx::commit));

// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");
});

The same example using Flowable:

Flowable<Row> flowable = pool.rxBegin() // Cursors require a transaction
  .flatMapPublisher(tx -> tx.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
    .flatMapPublisher(preparedQuery -> {
      // Fetch 50 rows at a time
      RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
      return stream.toFlowable();
    })
    // Commit the transaction after usage
    .doAfterTerminate(tx::commit));

// Then subscribe
flowable.subscribe(new Subscriber<Row>() {

  private Subscription sub;

  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Row row) {
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  }

  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("End of stream");
  }
});
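
The subscriber above drives the stream by requesting one row at a time: once in onSubscribe, then again for each row it receives. The following is a minimal sketch of that demand pattern using only the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams Subscriber and Subscription used with Flowable. It does not use the Vert.x or RxJava API; the class name OneAtATimeDemand, the consume method, and the String items standing in for rows are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical illustration class, not part of Vert.x or RxJava
class OneAtATimeDemand {

  // Consumes every item with a demand of one at a time and returns them in arrival order
  static List<String> consume(List<String> items) {
    List<String> received = new ArrayList<>();
    CountDownLatch done = new CountDownLatch(1);

    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(new Flow.Subscriber<String>() {

        private Flow.Subscription sub;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          sub = subscription;
          subscription.request(1); // Initial demand: a single item
        }

        @Override
        public void onNext(String item) {
          received.add(item);
          sub.request(1); // Ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable err) {
          done.countDown();
        }

        @Override
        public void onComplete() {
          done.countDown();
        }
      });

      // Items are buffered by the publisher and delivered as the subscriber requests them
      items.forEach(publisher::submit);
    } // Closing the publisher signals onComplete after the buffered items

    try {
      done.await(); // Wait for the terminal signal before reading the result
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for completion", e);
    }
    return received;
  }

  public static void main(String[] args) {
    System.out.println(consume(List.of("Viet", "Alblueshi")));
  }
}
```

Because the subscriber never has more than one outstanding request, the publisher cannot deliver items faster than they are handled. Requesting a larger number in onSubscribe would trade that strictness for fewer round trips.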

Transaction

The simplified transaction API lets you easily write transactional asynchronous flows:

Completable completable = pool
  .rxBegin()
  .flatMapCompletable(tx -> tx
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')").rxExecute())
    .flatMapCompletable(result -> tx.rxCommit()));

completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
});