One of the advantages gRPC offers over REST-based services is bi-directional streaming. Traditional implementations that depend on REST APIs often add WebSockets to enable real bi-directional streaming of data. I say real because it is still possible to simulate streaming behavior over REST, but that performs poorly and makes little sense.
gRPC supports four communication patterns: Unary, Server streaming, Client streaming, and Bi-directional streaming. Between them, these patterns cover most of the communication scenarios you are likely to code in a distributed system.
In this post, we will understand these patterns with the help of examples and diagrams. If you are not familiar with gRPC, check out the post "Intro to gRPC and Protocol Buffers using Go", which provides a quick overview of gRPC with step-by-step instructions on how to set up gRPC in Golang-based microservices. The example discussed in that post involves a calculator server and a client that consumes the arithmetic functions exposed by the calculator server. This post extends that example to demonstrate the gRPC communication patterns.
Access the complete code example here. The code also implements SSL based auth to secure the communication between client and server.
The Protobuf implementation
To use the various gRPC communication patterns, you first need to declare them in the proto file. Every pattern except Unary deals with a data stream in the request, the response, or both. Thankfully, implementing the gRPC logic for such streaming services is not very complex, because the protoc compiler generates most of it. The file below describes a Calculator service that exposes one method for each of the 4 communication patterns.
    syntax = "proto3";

    package calc;

    option go_package = "ldtgrpc03/proto";

    service Calculator {
      // Unary RPC
      rpc Add(AddRequest) returns (AddResponse) {}
      // Server Streaming RPC
      rpc GenerateNumbers(GenerateRequest) returns (stream NumberResponse) {}
      // Client Streaming RPC
      rpc ComputeAverage(stream NumberRequest) returns (AverageResponse) {}
      // Bidirectional Streaming RPC
      rpc ProcessNumbers(stream NumberRequest) returns (stream NumberResponse) {}
    }

    message AddRequest {
      int64 num1 = 1;
      int64 num2 = 2;
    }

    message AddResponse {
      int64 result = 1;
    }

    message GenerateRequest {
      int64 limit = 1;
    }

    message NumberResponse {
      int64 number = 1;
    }

    message NumberRequest {
      int64 number = 1;
    }

    message AverageResponse {
      double result = 1;
    }
The code above is mostly self-explanatory, but a few points are worth noting. The protoc compiler uses this file to generate the native Go code, including the underlying plumbing for both normal and streaming RPC communication. It leaves only the server implementation of these methods up to us, which makes sense.
It is crucial to configure this proto file so that it correctly defines the inbound and outbound streams for the functions exposed by the server and consumed by the clients. When the native Go code is generated from the proto file above, it automatically uses the appropriate grpc types, as seen in the snippet below from the calc_grpc.pb.go file.
    func (UnimplementedCalculatorServer) Add(context.Context, *AddRequest) (*AddResponse, error) {
        return nil, status.Errorf(codes.Unimplemented, "method Add not implemented")
    }

    func (UnimplementedCalculatorServer) GenerateNumbers(*GenerateRequest, grpc.ServerStreamingServer[NumberResponse]) error {
        return status.Errorf(codes.Unimplemented, "method GenerateNumbers not implemented")
    }

    func (UnimplementedCalculatorServer) ComputeAverage(grpc.ClientStreamingServer[NumberRequest, AverageResponse]) error {
        return status.Errorf(codes.Unimplemented, "method ComputeAverage not implemented")
    }

    func (UnimplementedCalculatorServer) ProcessNumbers(grpc.BidiStreamingServer[NumberRequest, NumberResponse]) error {
        return status.Errorf(codes.Unimplemented, "method ProcessNumbers not implemented")
    }
From this point onward, it is assumed that the native Go code has been generated. For more details, you can refer to this blog post, or for a full code example refer to this.
Unary
Unary communication is the simplest form of the gRPC communication patterns between the client and server. The example in the introductory post actually implements the unary pattern between client and server to fetch the result of the Add() function from the server. Let us understand it here for the sake of completeness.
The diagram above represents how a gRPC client and server communicate using the unary pattern. It resembles a REST API request, although gRPC is typically more performant because it runs over HTTP/2 and encodes messages with Protocol Buffers. The client calls the Add() function through the interface exposed by gRPC, which is equivalent to the client sending a single request to the server. The server processes the request and replies with a single response. This is the Unary communication pattern in gRPC.
The server implements the Add() function as shown below. It simply returns a response containing the sum of the 2 numbers sent to it as parameters.
    // Unary RPC - Server
    func (s *server) Add(ctx context.Context, req *pb.AddRequest) (*pb.AddResponse, error) {
        result := req.Num1 + req.Num2
        return &pb.AddResponse{Result: result}, nil
    }
Below is the corresponding client code that calls the Add() function and passes the numbers 10 and 20. To the caller, this looks just like calling Add() locally. In reality, the calculator server implements the Add() function, and the code below simply sends a request to the server.
    func unaryExample(client pb.CalculatorClient) {
        ctx := context.Background()
        resp, err := client.Add(ctx, &pb.AddRequest{Num1: 10, Num2: 20})
        if err != nil {
            log.Fatalf("could not add: %v", err)
        }
        log.Printf("Sum: %d", resp.Result)
    }
This is the Unary communication pattern, perhaps the most basic use of gRPC. Below is the output of the Unary communication pattern from the above example.
Server Streaming
The server streaming pattern caters to the scenarios where the server is required to stream data back to the clients. In the example below, the calculator server exposes a function to generate numbers – GenerateNumbers() – which generates a set of numbers when the client requests it. As a client developer, you just have to call the GenerateNumbers() function on the gRPC interface to call this procedure. The server then responds with a stream of numbers as depicted in the diagram below.
The code below shows the server implementation of the GenerateNumbers() function. It accepts the “Limit” parameter from the calling client to cap how many numbers are generated. Note that the "= 1" next to limit in the calc.proto file above is the field number, not a default value; in proto3 an unset int64 field defaults to 0, in which case the loop below sends nothing. The function writes each generated number to the stream, simulating a server streaming response.
    // Server Streaming RPC
    func (s *server) GenerateNumbers(req *pb.GenerateRequest, stream pb.Calculator_GenerateNumbersServer) error {
        for i := int64(0); i < req.Limit; i++ {
            if err := stream.Send(&pb.NumberResponse{Number: i}); err != nil {
                return err
            }
            println("Sent number: ", i)
            time.Sleep(500 * time.Millisecond)
        }
        return nil
    }
On the client side, shown below, the code simply calls GenerateNumbers() through the gRPC client. As the server streams responses back, the client prints each number to the console inside a for loop that calls stream.Recv() repeatedly and exits when it returns io.EOF, which signals that the server has finished sending.
    func serverStreamingExample(client pb.CalculatorClient) {
        ctx := context.Background()
        stream, err := client.GenerateNumbers(ctx, &pb.GenerateRequest{Limit: 5})
        if err != nil {
            log.Fatalf("error calling GenerateNumbers: %v", err)
        }
        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatalf("error receiving: %v", err)
            }
            log.Printf("Received number: %d", resp.Number)
        }
    }
Below is the output of the server streaming communication pattern.
Client Streaming
The client streaming pattern is used where clients need to stream data to the server for processing. Like the server streaming pattern discussed in the previous section, it is still uni-directional streaming, only in the opposite direction. In the example being discussed, the calculator server exposes a function that calculates the average of all the numbers the client sends as a stream. The diagram below shows an overview of client streaming gRPC communication.
The code below shows the server implementation of the ComputeAverage() function, which accepts a stream of input from the client and responds with the average of all the numbers received.
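    // Client Streaming RPC (requires the "io" import)
    func (s *server) ComputeAverage(stream pb.Calculator_ComputeAverageServer) error {
        var sum, count int64
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                // The client has finished sending: reply once and close the stream.
                var avg float64
                if count > 0 { // avoid 0/0 (NaN) on an empty stream
                    avg = float64(sum) / float64(count)
                }
                return stream.SendAndClose(&pb.AverageResponse{Result: avg})
            }
            if err != nil {
                // A real failure, not a clean end of stream.
                return err
            }
            println("Received number: ", req.Number)
            sum += req.Number
            count++
        }
    }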
The client function utilizes a stream object provisioned by the gRPC framework to send the numbers. Note that this stream object is not provided by the server implementation of the ComputeAverage() function above. Instead the gRPC framework provisions it – i.e. when we use protoc compiler to compile the calc.proto file. Refer to the first section of this blog to know more.
    func clientStreamingExample(client pb.CalculatorClient) {
        ctx := context.Background()
        stream, err := client.ComputeAverage(ctx)
        if err != nil {
            log.Fatalf("error calling ComputeAverage: %v", err)
        }
        numbers := []int64{1, 2, 3, 4, 5}
        for _, num := range numbers {
            if err := stream.Send(&pb.NumberRequest{Number: num}); err != nil {
                log.Fatalf("error sending: %v", err)
            }
            time.Sleep(500 * time.Millisecond)
        }
        resp, err := stream.CloseAndRecv()
        if err != nil {
            log.Fatalf("error receiving response: %v", err)
        }
        log.Printf("Average: %.2f", resp.Result)
    }
Below is the output of client streaming communication pattern.
Bidirectional Streaming
By now, you should be familiar with how client and server code use stream objects to send and receive streaming data. Bidirectional streaming, as the name suggests, caters to scenarios where servers and clients need to stream data simultaneously, without either side waiting for the other to finish.
In the example being discussed, the server implements a bidirectional streaming function that accepts a stream of numbers and responds with a stream of numbers, each twice the corresponding incoming number. The client sends a stream of numbers, and the server responds without waiting for the client's stream to complete. The diagram below represents the same.
gRPC takes care of most of the native Go code required, which greatly simplifies the implementation of streaming logic on both server and client. The ProcessNumbers() function below shows how the server receives numbers and sends the processed data back to the client on the same stream.
    // Bidirectional Streaming RPC (requires the "io" import)
    func (s *server) ProcessNumbers(stream pb.Calculator_ProcessNumbersServer) error {
        for {
            req, err := stream.Recv()
            if err == io.EOF {
                return nil // the client closed its side of the stream
            }
            if err != nil {
                return err // report real failures instead of hiding them
            }
            // Process the number (multiply by 2) and send it back
            result := req.Number * 2
            if err := stream.Send(&pb.NumberResponse{Number: result}); err != nil {
                return err
            }
            println("Received number: ", req.Number)
        }
    }
On the client side, the code below initiates the stream by sending numbers to the server for processing. At the same time, it listens on the same stream to receive the processed responses from the server. The client code relies on a few Go-specific constructs (goroutines, and a channel used to wait for completion); if you are not familiar with Go, you can ignore them.
    func bidirectionalStreamingExample(client pb.CalculatorClient) {
        ctx := context.Background()
        stream, err := client.ProcessNumbers(ctx)
        if err != nil {
            log.Fatalf("error calling ProcessNumbers: %v", err)
        }
        waitc := make(chan struct{})

        // Send numbers
        go func() {
            numbers := []int64{1, 2, 3, 4, 5}
            for _, num := range numbers {
                if err := stream.Send(&pb.NumberRequest{Number: num}); err != nil {
                    log.Fatalf("error sending: %v", err)
                }
                time.Sleep(500 * time.Millisecond)
            }
            stream.CloseSend()
        }()

        // Receive processed numbers
        go func() {
            for {
                resp, err := stream.Recv()
                if err == io.EOF {
                    close(waitc)
                    return
                }
                if err != nil {
                    log.Fatalf("error receiving: %v", err)
                }
                log.Printf("Received processed number: %d", resp.Number)
            }
        }()

        <-waitc
    }
Below is the output.
That’s it for this blog post. If you enjoy reading all of this, then I would encourage you to subscribe to my newsletter. I write about CNCF tools, system design, architecture, product dev and AI.