gRPC

gRPC adalah framework RPC (Remote Procedure Call) modern yang dikembangkan oleh Google. gRPC menggunakan Protocol Buffers sebagai Interface Definition Language (IDL).

Contoh Masalah

Bagaimana cara:

  1. Setup gRPC server
  2. Define services
  3. Implement client
  4. Handle streaming

Penyelesaian

Pertama, buat file proto:

user.proto:

syntax = "proto3";

package user;
option go_package = "pkg/user";

import "google/protobuf/timestamp.proto";

// UserService exposes one RPC of each gRPC call type.
service UserService {
    // Unary: one request, one response.
    rpc GetUser (GetUserRequest) returns (User);

    // Server streaming: one request, a stream of User responses.
    rpc ListUsers (ListUsersRequest) returns (stream User);

    // Client streaming: a stream of requests, one summary response.
    rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse);

    // Bidirectional streaming: both sides send a stream of ChatMessage.
    rpc ChatStream (stream ChatMessage) returns (stream ChatMessage);
}

// Message definitions

// User is the record returned by GetUser and streamed by ListUsers.
message User {
    // Identifier of the user; GetUserRequest.id is matched against it.
    string id = 1;
    string name = 2;
    string email = 3;
    // Creation time as a protobuf well-known Timestamp.
    google.protobuf.Timestamp created_at = 4;
    // Zero or more role strings.
    // NOTE(review): the example server in this document never sets roles.
    repeated string roles = 5;
}

// GetUserRequest is the input of the unary GetUser RPC.
message GetUserRequest {
    // ID of the user to fetch.
    string id = 1;
}

// ListUsersRequest is the input of the server-streaming ListUsers RPC.
// NOTE(review): the example ListUsers handler in this document streams every
// stored user and reads neither field below; the paging semantics are
// presumably "max items" and "opaque continuation token" - confirm before
// relying on them.
message ListUsersRequest {
    int32 page_size = 1;
    string page_token = 2;
}

// CreateUserRequest describes one user to create; the client sends a stream
// of these to CreateUsers.
message CreateUserRequest {
    // Copied into User.name of the created user.
    string name = 1;
    // Copied into User.email of the created user.
    string email = 2;
}

// CreateUsersResponse is the single reply sent once the CreateUsers request
// stream has been closed by the client.
message CreateUsersResponse {
    // IDs assigned to the created users, in the order the requests arrived.
    repeated string user_ids = 1;
}

// ChatMessage is exchanged in both directions on ChatStream.
message ChatMessage {
    // ID of the sending user; the example server ends the stream with
    // NOT_FOUND when this ID is not a stored user.
    string user_id = 1;
    // Message text; the example server echoes it back unchanged.
    string content = 2;
    // Set by the example server to the current time on each echoed message.
    google.protobuf.Timestamp timestamp = 3;
}

Server implementation:

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "sync"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"

    // Code generated from user.proto; the path mirrors its go_package option.
    // Adjust it to your module path if the generated package lives elsewhere.
    pb "pkg/user"
)

// 1. Server structure

// userServer is an in-memory implementation of the UserService gRPC service.
type userServer struct {
    // Generated base type, embedded as protoc-gen-go-grpc expects.
    pb.UnimplementedUserServiceServer

    mu    sync.RWMutex        // guards users
    users map[string]*pb.User // stored users, keyed by User.Id
}

// newUserServer returns a userServer with an empty, ready-to-write user map.
func newUserServer() *userServer {
    srv := new(userServer)
    srv.users = map[string]*pb.User{}
    return srv
}

// 2. Unary RPC implementation

// GetUser looks up the user whose ID equals req.Id. It returns the stored
// user, or a NotFound status error when no such user exists.
func (s *userServer) GetUser(ctx context.Context,
    req *pb.GetUserRequest) (*pb.User, error) {
    id := req.Id

    // The map read is the only part that needs the lock.
    s.mu.RLock()
    found, ok := s.users[id]
    s.mu.RUnlock()

    if !ok {
        return nil, status.Errorf(codes.NotFound, "user not found: %s", id)
    }
    return found, nil
}

// 3. Server streaming implementation

// ListUsers streams every stored user to the client, one message per user,
// in unspecified (map iteration) order. It returns the first send error, or
// nil once all users have been sent.
//
// req is currently not consulted: PageSize and PageToken are ignored.
func (s *userServer) ListUsers(req *pb.ListUsersRequest,
    stream pb.UserService_ListUsersServer) error {
    // Snapshot the users under the read lock and release it before sending.
    // stream.Send can block on a slow client; holding the lock across it
    // would stall every writer (CreateUsers) for as long as the client stalls.
    s.mu.RLock()
    users := make([]*pb.User, 0, len(s.users))
    for _, user := range s.users {
        users = append(users, user)
    }
    s.mu.RUnlock()

    for _, user := range users {
        if err := stream.Send(user); err != nil {
            return err
        }
    }

    return nil
}

// 4. Client streaming implementation

// CreateUsers stores one user per request received on the stream. When the
// client closes its side (io.EOF) it replies with the IDs of all users
// created, in arrival order. Any other receive error aborts the RPC; users
// stored before the error remain stored.
func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
    var userIDs []string

    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return stream.SendAndClose(&pb.CreateUsersResponse{
                UserIds: userIDs,
            })
        }
        if err != nil {
            return err
        }

        createdAt := timestamppb.Now()

        // Choose the ID while holding the write lock. A timestamp alone is
        // not unique: two creations in the same clock tick (coarse clocks,
        // concurrent streams) would produce the same key and overwrite a
        // user, so bump the number until the key is unused.
        s.mu.Lock()
        seq := time.Now().UnixNano()
        id := fmt.Sprintf("user_%d", seq)
        for _, taken := s.users[id]; taken; _, taken = s.users[id] {
            seq++
            id = fmt.Sprintf("user_%d", seq)
        }
        s.users[id] = &pb.User{
            Id:        id,
            Name:      req.Name,
            Email:     req.Email,
            CreatedAt: createdAt,
        }
        s.mu.Unlock()

        userIDs = append(userIDs, id)
    }
}

// 5. Bidirectional streaming implementation

// ChatStream echoes each incoming message back to the sender with a server
// timestamp. The stream ends cleanly when the client closes its side, and
// with a NotFound status as soon as a message names an unknown user.
func (s *userServer) ChatStream(stream pb.UserService_ChatStreamServer) error {
    for {
        msg, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            return nil
        case recvErr != nil:
            return recvErr
        }

        // The sender must be a stored user.
        sender := msg.UserId
        s.mu.RLock()
        _, known := s.users[sender]
        s.mu.RUnlock()
        if !known {
            return status.Errorf(codes.NotFound,
                "user not found: %s", sender)
        }

        reply := &pb.ChatMessage{
            UserId:    sender,
            Content:   msg.Content,
            Timestamp: timestamppb.Now(),
        }
        if sendErr := stream.Send(reply); sendErr != nil {
            return sendErr
        }
    }
}

// 6. Server setup

// main listens on TCP port 50051, installs the unary and stream
// interceptors, registers the user service and serves until Serve fails.
func main() {
    listener, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // Both interceptors are installed as server options.
    opts := []grpc.ServerOption{
        grpc.UnaryInterceptor(unaryInterceptor),
        grpc.StreamInterceptor(streamInterceptor),
    }
    srv := grpc.NewServer(opts...)
    pb.RegisterUserServiceServer(srv, newUserServer())

    log.Println("Server starting on :50051")
    if serveErr := srv.Serve(listener); serveErr != nil {
        log.Fatalf("failed to serve: %v", serveErr)
    }
}

// 7. Interceptors

// unaryInterceptor authenticates a unary call, runs the handler, and logs the
// method name, elapsed time and resulting error. Calls that fail
// authentication return immediately and are not logged.
func unaryInterceptor(ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler) (interface{}, error) {
    began := time.Now()

    if authErr := authenticate(ctx); authErr != nil {
        return nil, authErr
    }

    result, callErr := handler(ctx, req)

    elapsed := time.Since(began)
    log.Printf("Method: %s Duration: %v Error: %v\n",
        info.FullMethod, elapsed, callErr)

    return result, callErr
}

// streamInterceptor authenticates a streaming call, runs the handler, and
// logs the method name, elapsed time and resulting error.
//
// The token is checked once, when the stream is opened, against the metadata
// carried by the stream's context. This is the same check unaryInterceptor
// applies, so streaming RPCs cannot bypass authentication. Calls that fail
// authentication return immediately and are not logged.
func streamInterceptor(srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler) error {
    start := time.Now()

    // Reject unauthenticated callers before the handler sees the stream.
    if err := authenticate(ss.Context()); err != nil {
        return err
    }

    // Handle stream
    err := handler(srv, ss)

    // Log request
    log.Printf("Method: %s Duration: %v Error: %v\n",
        info.FullMethod,
        time.Since(start),
        err)

    return err
}

// 8. Authentication

// authenticate checks the "authorization" entry of the incoming metadata in
// ctx. It returns an Unauthenticated status error when the metadata is
// absent, the entry is missing, or its first value is not the expected
// token; otherwise it returns nil.
func authenticate(ctx context.Context) error {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return status.Error(codes.Unauthenticated,
            "missing metadata")
    }

    values := md.Get("authorization")
    switch {
    case len(values) == 0:
        return status.Error(codes.Unauthenticated,
            "missing token")
    case values[0] != "valid-token":
        // Simplified validation: a single hard-coded token.
        return status.Error(codes.Unauthenticated,
            "invalid token")
    default:
        return nil
    }
}

Client implementation:

package main

import (
    "context"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
)

// main dials the UserService server on localhost:50051 and runs one example
// of each RPC kind: unary, server streaming, client streaming and
// bidirectional streaming. Every call carries the "authorization" metadata
// entry that the server's authenticate helper expects.
func main() {
    // Connect to server
    conn, err := grpc.Dial("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    // Create client
    client := pb.NewUserServiceClient(conn)

    // Create context with metadata. The deadline bounds the whole run so an
    // unresponsive server cannot block the client forever, and cancel
    // releases any stream still open when main returns.
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    ctx = metadata.NewOutgoingContext(ctx,
        metadata.Pairs("authorization", "valid-token"))

    // Example 1: Unary RPC
    // "user_1" is a placeholder ID; against a freshly started server it does
    // not exist, so this demonstrates the NotFound error path.
    user, err := client.GetUser(ctx, &pb.GetUserRequest{
        Id: "user_1",
    })
    if err != nil {
        log.Printf("GetUser error: %v", err)
    } else {
        log.Printf("Got user: %v", user)
    }

    // Example 2: Server streaming
    stream, err := client.ListUsers(ctx, &pb.ListUsersRequest{
        PageSize: 10,
    })
    if err != nil {
        log.Fatalf("ListUsers error: %v", err)
    }

    for {
        user, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("ListUsers stream error: %v", err)
            break
        }
        log.Printf("Received user: %v", user)
    }

    // Example 3: Client streaming
    createStream, err := client.CreateUsers(ctx)
    if err != nil {
        log.Fatalf("CreateUsers error: %v", err)
    }

    users := []struct{ name, email string }{
        {"Alice", "alice@example.com"},
        {"Bob", "bob@example.com"},
        {"Charlie", "charlie@example.com"},
    }

    for _, u := range users {
        if err := createStream.Send(&pb.CreateUserRequest{
            Name:  u.name,
            Email: u.email,
        }); err != nil {
            log.Printf("CreateUsers send error: %v", err)
            break
        }
    }

    // The server generates user IDs itself, so chat as a user that was just
    // created. A hard-coded ID would be rejected with NotFound. The
    // placeholder is kept only as a fallback when nothing was created.
    chatUserID := "user_1"

    resp, err := createStream.CloseAndRecv()
    if err != nil {
        log.Printf("CreateUsers close error: %v", err)
    } else {
        log.Printf("Created users: %v", resp.UserIds)
        if len(resp.UserIds) > 0 {
            chatUserID = resp.UserIds[0]
        }
    }

    // Example 4: Bidirectional streaming
    chatStream, err := client.ChatStream(ctx)
    if err != nil {
        log.Fatalf("ChatStream error: %v", err)
    }

    // Send messages. The goroutine ends after the last message or the first
    // send error; the receive loop below ends when the server closes the
    // stream or the context deadline expires.
    go func() {
        messages := []string{
            "Hello!",
            "How are you?",
            "Goodbye!",
        }

        for _, msg := range messages {
            if err := chatStream.Send(&pb.ChatMessage{
                UserId:  chatUserID,
                Content: msg,
            }); err != nil {
                log.Printf("ChatStream send error: %v", err)
                return
            }
            time.Sleep(time.Second)
        }
        if err := chatStream.CloseSend(); err != nil {
            log.Printf("ChatStream close error: %v", err)
        }
    }()

    // Receive messages
    for {
        msg, err := chatStream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            log.Printf("ChatStream receive error: %v", err)
            break
        }
        log.Printf("Received message: %v", msg)
    }
}

Penjelasan Kode

  1. gRPC Components

    • Protocol Buffers
    • Service definition
    • Server implementation
    • Client implementation
  2. Features

    • Unary RPC
    • Server streaming
    • Client streaming
    • Bidirectional streaming
  3. Best Practices

    • Error handling
    • Authentication
    • Logging
    • Interceptors

Setup

  1. Install Protocol Buffers compiler:
brew install protobuf
  2. Install Go plugins:
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
  3. Generate code:
protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    user.proto

Output

2024/01/21 11:57:22 Server starting on :50051
2024/01/21 11:57:23 Method: /user.UserService/GetUser Duration: 123.45µs Error: <nil>
2024/01/21 11:57:24 Got user: {id:"user_1" name:"Alice" email:"alice@example.com"}
2024/01/21 11:57:25 Method: /user.UserService/ListUsers Duration: 234.56µs Error: <nil>
2024/01/21 11:57:26 Received user: {id:"user_1" name:"Alice" email:"alice@example.com"}
2024/01/21 11:57:27 Created users: [user_1234567890 user_1234567891 user_1234567892]
2024/01/21 11:57:28 Received message: {user_id:"user_1" content:"Hello!" timestamp:{seconds:1234567890}}

Tips

  • Generate code dengan protoc
  • Handle errors dengan status codes
  • Implement interceptors
  • Use metadata for auth
  • Monitor performance