gRPC Client
Developing the gRPC Client and Implementing Endpoints.
Creating the gRPC Client
Creating the client is straightforward because the client stub mirrors the server. You work with a pb.TaskServiceClient instance, and calling a method on it invokes the method of the same name on the server. For example, calling GetTask on the client invokes the GetTask function on the server.
Calling the CreateTask Endpoint
Create a directory named client and a client.go file inside it. Start by importing the pb package from src/go, then create a function to call the CreateTask endpoint:
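package main
import (
	"context"
	"io"
	"log"
	"math/rand"
	"strconv"
	"time"
	"google.golang.org/protobuf/types/known/timestamppb"
	pb "go-grpc-demo/src/go"
)
// Imports other than context and pb are used by the functions added later in this file.
var createdTasks []string
func createTask(client pb.TaskServiceClient, createTaskRequest *pb.CreateTaskRequest) {
}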
A createdTasks slice is also defined. It stores the IDs of the created tasks so that you can call the other endpoints on existing tasks.
The method is straightforward: call the CreateTask method on the client and pass it a Context and a CreateTaskRequest:
func createTask(client pb.TaskServiceClient, createTaskRequest *pb.CreateTaskRequest) {
	log.Printf("Creating Task: { description: '%s', user_id: '%s', deadline: '%s' }", createTaskRequest.Description, createTaskRequest.UserId, createTaskRequest.Deadline)
	createdTask, err := client.CreateTask(context.Background(), createTaskRequest)
	if err != nil {
		// Report the failure instead of silently dropping it.
		log.Printf("CreateTask failed: %v", err)
		return
	}
	log.Println(createdTask)
	// Remember the ID so that later calls can target an existing task.
	createdTasks = append(createdTasks, createdTask.Id)
}
Here, context.Background() provides an empty Context. It is never canceled and carries no deadline, so the client does not impose a timeout on this request.
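To make the difference between an empty context and one with a timeout concrete, here is a minimal sketch that uses only the standard library. The slowCall function is a hypothetical stand-in for a slow RPC and is not part of the tutorial's client; the 50 ms timeout is likewise an arbitrary value chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// hasDeadline reports whether ctx carries a deadline.
func hasDeadline(ctx context.Context) bool {
	_, ok := ctx.Deadline()
	return ok
}

// slowCall is a hypothetical stand-in for an RPC that needs `work` to
// complete. It returns ctx.Err() if the context ends first.
func slowCall(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// compareContexts returns whether context.Background() has a deadline,
// whether a 50 ms timeout context has one, and whether a one-second
// slowCall under that timeout ends with context.DeadlineExceeded.
func compareContexts() (backgroundHasDeadline, timeoutHasDeadline, timedOut bool) {
	backgroundHasDeadline = hasDeadline(context.Background())

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	timeoutHasDeadline = hasDeadline(ctx)

	err := slowCall(ctx, time.Second)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return
}

func main() {
	fmt.Println(compareContexts()) // prints "false true true"
}
```

For a short unary call, the empty context is often acceptable in a demo. A call that may block for a long time is usually better served by a context with a timeout.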
Calling the GetTask Endpoint
Next, create a getTask function that invokes the GetTask endpoint by calling the GetTask method of client:
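func getTask(client pb.TaskServiceClient, getTaskRequest *pb.GetTaskRequest) {
	task, err := client.GetTask(context.Background(), getTaskRequest)
	if err != nil {
		// Report the failure instead of silently dropping it.
		log.Printf("GetTask failed: %v", err)
		return
	}
	log.Println(task)
}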
Calling the RecordTasks Endpoint
You'll now tackle your first streaming endpoint: RecordTasks, a client-side streaming endpoint. Because the client sends its requests over a stream, the client.RecordTasks method takes only a Context and returns a stream that you use to send data to the server. Start by defining a runRecordTasks function that creates between two and ten tasks (chosen randomly). For each task, the deadline is chosen randomly within the next hundred days. The corresponding CreateTaskRequest entries are stored in a slice:
func runRecordTasks(client pb.TaskServiceClient) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Int31n(9) returns 0..8, so tasksCount is between 2 and 10 inclusive.
	tasksCount := int(r.Int31n(9)) + 2
	var createTaskRequests []*pb.CreateTaskRequest
	for i := 0; i < tasksCount; i++ {
		// Deadline offset in days: 0..99.
		deadlineDaysAfter := int(r.Int31n(100))
		createTaskRequests = append(createTaskRequests, &pb.CreateTaskRequest{
			Description: "Task " + strconv.Itoa(i),
			UserId:      "1",
			Deadline:    timestamppb.New(time.Now().AddDate(0, 0, deadlineDaysAfter)),
		})
	}
}
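Off-by-one mistakes are easy to make with random ranges: Int31n(n) returns a value in [0, n), so an inclusive range of two to ten needs Int31n(9) + 2, whereas Int31n(10) + 2 can return eleven. The following sketch is one way to sanity-check such bounds. The taskCount and observedRange helpers, the seed, and the sample size are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"math/rand"
)

// taskCount returns a count between 2 and 10 inclusive:
// Int31n(9) yields 0..8, and adding 2 shifts that to 2..10.
func taskCount(r *rand.Rand) int {
	return int(r.Int31n(9)) + 2
}

// observedRange draws `samples` counts (samples must be at least 1)
// from a generator seeded with `seed` and returns the smallest and
// largest values seen.
func observedRange(seed int64, samples int) (lo, hi int) {
	r := rand.New(rand.NewSource(seed))
	first := taskCount(r)
	lo, hi = first, first
	for i := 1; i < samples; i++ {
		n := taskCount(r)
		if n < lo {
			lo = n
		}
		if n > hi {
			hi = n
		}
	}
	return lo, hi
}

func main() {
	lo, hi := observedRange(1, 10000)
	fmt.Printf("min=%d max=%d\n", lo, hi)
}
```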
Since this is a streaming endpoint, it's a good idea to use a context with a timeout so that the requests can be canceled in case they are not finished within a certain time:
log.Printf("Recording %d tasks", tasksCount)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Cancel after 10 seconds
defer cancel()
Call the RecordTasks method on client to receive a stream:
stream, err := client.RecordTasks(ctx)
if err != nil {
	log.Fatalf("error %v", err)
}
Iterate over createTaskRequests and send each CreateTaskRequest to the stream. Finally, call CloseAndRecv to close the stream and receive the TaskSummary response from the server:
for _, createTaskRequest := range createTaskRequests {
	if err := stream.Send(createTaskRequest); err != nil {
		log.Fatalf("stream.Send(%v) failed: %v", createTaskRequest, err)
	}
}
reply, err := stream.CloseAndRecv()
if err != nil {
	log.Fatalf("error %v", err)
}
// %v prints the count correctly whether the field is numeric or a string.
log.Printf("Tasks summary: %v tasks created", reply.NoOfTasksCreated)
Here's the full function:
func runRecordTasks(client pb.TaskServiceClient) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	// Int31n(9) returns 0..8, so tasksCount is between 2 and 10 inclusive.
	tasksCount := int(r.Int31n(9)) + 2
	var createTaskRequests []*pb.CreateTaskRequest
	for i := 0; i < tasksCount; i++ {
		deadlineDaysAfter := int(r.Int31n(100))
		createTaskRequests = append(createTaskRequests, &pb.CreateTaskRequest{
			Description: "Task " + strconv.Itoa(i),
			UserId:      "1",
			Deadline:    timestamppb.New(time.Now().AddDate(0, 0, deadlineDaysAfter)),
		})
	}
	log.Printf("Recording %d tasks", tasksCount)
	// Cancel the stream if it has not finished after 10 seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.RecordTasks(ctx)
	if err != nil {
		log.Fatalf("error %v", err)
	}
	for _, createTaskRequest := range createTaskRequests {
		if err := stream.Send(createTaskRequest); err != nil {
			log.Fatalf("stream.Send(%v) failed: %v", createTaskRequest, err)
		}
	}
	// Close the sending side and wait for the server's summary.
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("error %v", err)
	}
	// %v prints the count correctly whether the field is numeric or a string.
	log.Printf("Tasks summary: %v tasks created", reply.NoOfTasksCreated)
}
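The send-then-close pattern of a client-side stream can be tried out without a running server. The sketch below is only an illustration under stated assumptions: recordStream and memoryRecorder are hypothetical in-memory stand-ins, not the generated gRPC types, and they model messages as plain strings and the summary as a count.

```go
package main

import (
	"errors"
	"fmt"
)

// recordStream is a hypothetical, simplified model of a client-side
// stream: many Send calls followed by one CloseAndRecv.
type recordStream interface {
	Send(description string) error
	CloseAndRecv() (int, error)
}

// memoryRecorder counts what it was sent and reports the total as the
// "summary" once the stream is closed.
type memoryRecorder struct {
	count  int
	closed bool
}

func (m *memoryRecorder) Send(description string) error {
	if m.closed {
		return errors.New("send on closed stream")
	}
	m.count++
	return nil
}

func (m *memoryRecorder) CloseAndRecv() (int, error) {
	m.closed = true
	return m.count, nil
}

// recordAll sends every description, then closes the stream and
// returns the number of items the receiver reports.
func recordAll(s recordStream, descriptions []string) (int, error) {
	for _, d := range descriptions {
		if err := s.Send(d); err != nil {
			return 0, fmt.Errorf("send %q: %w", d, err)
		}
	}
	return s.CloseAndRecv()
}

func main() {
	n, err := recordAll(&memoryRecorder{}, []string{"Task 0", "Task 1", "Task 2"})
	fmt.Println(n, err) // prints "3 <nil>"
}
```

Returning errors to the caller, as recordAll does, is one alternative to calling log.Fatalf inside the helper. It keeps the decision about whether to exit with the caller.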
Calling the ListTasks Endpoint
The next streaming endpoint you'll call is ListTasks, a server-side streaming endpoint. This time, calling client.ListTasks returns a stream whose Recv method you call repeatedly to read the tasks until it returns io.EOF:
func listTasks(client pb.TaskServiceClient, userId string, deadline string) {
	log.Printf("Listing all tasks of User %s with deadline within %s", userId, deadline)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	// The deadline must be an RFC 3339 timestamp.
	deadlineTime, err := time.Parse(time.RFC3339, deadline)
	if err != nil {
		log.Fatalf("error parsing time: %v", err)
	}
	stream, err := client.ListTasks(ctx, &pb.ListTasksRequest{
		UserId:   userId,
		Deadline: timestamppb.New(deadlineTime),
	})
	// Check the error before using the stream.
	if err != nil {
		log.Fatalf("error %v", err)
	}
	for {
		// Recv returns io.EOF once the server has sent every task.
		task, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("error %v", err)
		}
		log.Printf(
			"Task: { Id: %s, Description: '%s', Status: '%s', Deadline: '%s' }",
			task.Id, task.Description, task.Status, task.Deadline.AsTime().Format(time.RFC3339))
	}
}
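The receive loop is the core of any server-side streaming call: keep calling Recv, treat io.EOF as a normal end of stream, and treat any other error as a failure. Here is a minimal sketch of that loop against an in-memory stream. The taskStream interface and sliceStream type are hypothetical stand-ins and model tasks as plain strings, so they are not the generated gRPC types.

```go
package main

import (
	"fmt"
	"io"
)

// taskStream is a hypothetical, simplified model of a server-side
// stream; only Recv is modeled.
type taskStream interface {
	Recv() (string, error)
}

// sliceStream serves items from a slice and then reports io.EOF.
type sliceStream struct {
	items []string
	next  int
}

func (s *sliceStream) Recv() (string, error) {
	if s.next >= len(s.items) {
		return "", io.EOF
	}
	item := s.items[s.next]
	s.next++
	return item, nil
}

// drain reads from the stream until io.EOF and returns everything it
// received. Any other error is passed back to the caller.
func drain(s taskStream) ([]string, error) {
	var received []string
	for {
		item, err := s.Recv()
		if err == io.EOF {
			return received, nil // normal end of stream
		}
		if err != nil {
			return received, err
		}
		received = append(received, item)
	}
}

func main() {
	tasks, err := drain(&sliceStream{items: []string{"Task 0", "Task 1"}})
	fmt.Println(tasks, err) // prints "[Task 0 Task 1] <nil>"
}
```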
Calling the TaskChat Endpoint
The last call is to the TaskChat endpoint, which both sends and receives on the same stream. Create a runTaskChat function and prepare a handful of TaskComment messages:
func runTaskChat(client pb.TaskServiceClient) {
	// createdTasks[0] would panic on an empty slice, so bail out early.
	if len(createdTasks) == 0 {
		log.Println("no tasks created yet; skipping TaskChat")
		return
	}
	comments := []*pb.TaskComment{
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 1", CreatedAt: timestamppb.Now()},
		{UserId: "2", TaskId: createdTasks[0], Comment: "Comment 2", CreatedAt: timestamppb.Now()},
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 3", CreatedAt: timestamppb.Now()},
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 4", CreatedAt: timestamppb.Now()},
		{UserId: "3", TaskId: createdTasks[0], Comment: "Comment 5", CreatedAt: timestamppb.Now()},
		{UserId: "2", TaskId: createdTasks[0], Comment: "Comment 6", CreatedAt: timestamppb.Now()},
		{UserId: "3", TaskId: createdTasks[0], Comment: "Comment 7", CreatedAt: timestamppb.Now()},
	}
}
As with the other streaming endpoints, calling client.TaskChat gives you a stream:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.TaskChat(ctx)
if err != nil {
	log.Fatalf("error: %v", err)
}
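To read the comments, start a goroutine that calls the Recv method of the stream in a loop and prints each received comment. When the server closes its side, Recv returns io.EOF and the goroutine closes the wc channel to signal that it is done:
wc := make(chan struct{})
go func() {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// The server has finished sending; signal the waiting sender.
			close(wc)
			return
		}
		if err != nil {
			log.Fatalf("error: %v", err)
		}
		log.Printf("Got comment at task %s by user %s", in.TaskId, in.UserId)
	}
}()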
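In the main goroutine, use stream.Send to send the comments to the server. Once finished, call the CloseSend method to close the sending side of the stream, then wait on wc until the receiving goroutine has finished:
for _, comment := range comments {
	if err := stream.Send(comment); err != nil {
		log.Fatalf("Send failed %v", err)
	}
}
if err := stream.CloseSend(); err != nil {
	log.Fatalf("CloseSend failed %v", err)
}
<-wc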
The whole function looks like this:
func runTaskChat(client pb.TaskServiceClient) {
	// createdTasks[0] would panic on an empty slice, so bail out early.
	if len(createdTasks) == 0 {
		log.Println("no tasks created yet; skipping TaskChat")
		return
	}
	comments := []*pb.TaskComment{
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 1", CreatedAt: timestamppb.Now()},
		{UserId: "2", TaskId: createdTasks[0], Comment: "Comment 2", CreatedAt: timestamppb.Now()},
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 3", CreatedAt: timestamppb.Now()},
		{UserId: "1", TaskId: createdTasks[0], Comment: "Comment 4", CreatedAt: timestamppb.Now()},
		{UserId: "3", TaskId: createdTasks[0], Comment: "Comment 5", CreatedAt: timestamppb.Now()},
		{UserId: "2", TaskId: createdTasks[0], Comment: "Comment 6", CreatedAt: timestamppb.Now()},
		{UserId: "3", TaskId: createdTasks[0], Comment: "Comment 7", CreatedAt: timestamppb.Now()},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := client.TaskChat(ctx)
	if err != nil {
		log.Fatalf("error: %v", err)
	}
	// Receive comments concurrently while the main goroutine sends.
	wc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// The server has finished sending; signal the waiting sender.
				close(wc)
				return
			}
			if err != nil {
				log.Fatalf("error: %v", err)
			}
			log.Printf("Got comment at task %s by user %s", in.TaskId, in.UserId)
		}
	}()
	for _, comment := range comments {
		if err := stream.Send(comment); err != nil {
			log.Fatalf("Send failed %v", err)
		}
	}
	// Close the sending side, then wait for the receiver to finish.
	if err := stream.CloseSend(); err != nil {
		log.Fatalf("CloseSend failed %v", err)
	}
	<-wc
}
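The concurrency pattern used here (one goroutine receiving, the calling goroutine sending, and a channel to signal completion) can be exercised in isolation. The sketch below assumes a hypothetical echoStream, an in-memory stand-in that echoes every sent message back and models comments as plain strings. It is not the generated gRPC stream type.

```go
package main

import (
	"fmt"
	"io"
)

// echoStream is a hypothetical in-memory model of a bidirectional
// stream: each message sent is echoed back, and Recv reports io.EOF
// after CloseSend once every echo has been read.
type echoStream struct {
	ch chan string
}

func newEchoStream() *echoStream {
	return &echoStream{ch: make(chan string)}
}

func (s *echoStream) Send(msg string) error {
	s.ch <- msg
	return nil
}

func (s *echoStream) CloseSend() error {
	close(s.ch)
	return nil
}

func (s *echoStream) Recv() (string, error) {
	msg, ok := <-s.ch
	if !ok {
		return "", io.EOF
	}
	return msg, nil
}

// chat sends msgs while a separate goroutine receives, then waits for
// the receiver to see io.EOF before returning what was received.
func chat(s *echoStream, msgs []string) ([]string, error) {
	var received []string
	var recvErr error
	done := make(chan struct{})
	go func() {
		defer close(done) // signal completion on every exit path
		for {
			msg, err := s.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				recvErr = err
				return
			}
			received = append(received, msg)
		}
	}()
	for _, m := range msgs {
		if err := s.Send(m); err != nil {
			return nil, err
		}
	}
	if err := s.CloseSend(); err != nil {
		return nil, err
	}
	<-done // closing done happens before this receive, so the reads below are safe
	return received, recvErr
}

func main() {
	got, err := chat(newEchoStream(), []string{"Comment 1", "Comment 2"})
	fmt.Println(got, err) // prints "[Comment 1 Comment 2] <nil>"
}
```

Closing the done channel in a defer means the sender is released even when the receiver stops on an error, which avoids the sender blocking forever on the final wait.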