Vert.x gRPC Server

Vert.x gRPC Server is a new gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.

This server provides a gRPC request/response oriented API as well as a the generated stub approach with a service bridge.

Using Vert.x gRPC Server

To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-grpc-server</artifactId>
 <version>4.3.3</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-grpc-server:4.3.3'
}

gRPC request/response server API

The gRPC request/response server API provides an alternative way to interact with a client without the need of a generated stub.

A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler.

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();
Tip
a GrpcServer can be mounted in a Vert.x Web router

Request/response

Each service method is processed by a handler

server.callHandler(GreeterGrpc.getSayHelloMethod(), request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

Streaming request

You can set handlers to process request events

server.callHandler(StreamingGrpc.getSinkMethod(), request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

Streaming response

A streaming response involves calling write for each element of the stream and using end to end the stream

server.callHandler(StreamingGrpc.getSourceMethod(), request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response

server.callHandler(StreamingGrpc.getPipeMethod(), request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

Flow control

Request and response are back pressured Vert.x streams.

You can pause/resume/fetch a request

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

You can check the writability of a response and set a drain handler

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

Compression

You can compress response messages by setting the response encoding prior before sending any message

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());

Decompression

Decompression is done transparently by the server when the client send encoded requests.

Stub API

The Vert.x gRPC Server can bridge a gRPC service to use with a generated server stub in a more traditional fashion

GrpcServer grpcServer = GrpcServer.server(vertx);

GreeterGrpc.GreeterImplBase service = new GreeterGrpc.GreeterImplBase() {
  @Override
  public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
    responseObserver.onNext(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
    responseObserver.onCompleted();
  }
};

// Bind the service bridge in the gRPC server
GrpcServiceBridge serverStub = GrpcServiceBridge.bridge(service);
serverStub.bind(grpcServer);

// Start the HTTP/2 server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

Message level API

The server provides a message level API to interact directly with protobuf encoded gRPC messages.

Tip
the server message level API can be used with the client message level API to write a gRPC reverse proxy

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

You can also set a messageHandler to handle GrpcMessage, such messages preserve the client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case the message does not need to be decompressed and compressed again.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

The writeMessage and endMessage will handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed