Claude-skill-registry grpc-expert
Expert-level gRPC, Protocol Buffers, microservices communication, and streaming
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/grpc-expert" ~/.claude/skills/majiayu000-claude-skill-registry-grpc-expert && rm -rf "$T"
manifest:
skills/data/grpc-expert/SKILL.mdsource content
gRPC Expert
Expert guidance for gRPC services, Protocol Buffers, microservices communication, and streaming patterns.
Core Concepts
gRPC Fundamentals
- Protocol Buffers (protobuf)
- Service definitions
- RPC patterns (unary, server streaming, client streaming, bidirectional)
- HTTP/2 transport
- Code generation
- Interceptors and middleware
Communication Patterns
- Unary RPC (request-response)
- Server streaming RPC
- Client streaming RPC
- Bidirectional streaming RPC
- Deadline/timeout handling
- Error handling and status codes
Production Features
- Load balancing
- Service discovery
- Health checking
- Authentication (TLS, tokens)
- Monitoring and tracing
- Retry policies
Protocol Buffer Definition
syntax = "proto3"; package user.v1; import "google/protobuf/timestamp.proto"; service UserService { // Unary RPC rpc GetUser(GetUserRequest) returns (GetUserResponse); // Server streaming rpc ListUsers(ListUsersRequest) returns (stream User); // Client streaming rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse); // Bidirectional streaming rpc Chat(stream ChatMessage) returns (stream ChatMessage); } message User { string id = 1; string email = 2; string name = 3; google.protobuf.Timestamp created_at = 4; UserRole role = 5; } enum UserRole { USER_ROLE_UNSPECIFIED = 0; USER_ROLE_USER = 1; USER_ROLE_ADMIN = 2; } message GetUserRequest { string id = 1; } message GetUserResponse { User user = 1; } message ListUsersRequest { int32 page_size = 1; string page_token = 2; } message CreateUserRequest { string email = 1; string name = 2; } message CreateUsersResponse { repeated string user_ids = 1; int32 created_count = 2; } message ChatMessage { string user_id = 1; string message = 2; google.protobuf.Timestamp timestamp = 3; }
Python gRPC Server
import grpc from concurrent import futures import logging from typing import Iterator import user_pb2 import user_pb2_grpc class UserService(user_pb2_grpc.UserServiceServicer): def __init__(self): self.users = {} def GetUser(self, request, context): """Unary RPC""" user_id = request.id if user_id not in self.users: context.abort(grpc.StatusCode.NOT_FOUND, f"User {user_id} not found") user = self.users[user_id] return user_pb2.GetUserResponse(user=user) def ListUsers(self, request, context): """Server streaming RPC""" page_size = request.page_size or 10 for i, user in enumerate(self.users.values()): if i >= page_size: break yield user def CreateUsers(self, request_iterator, context): """Client streaming RPC""" created_ids = [] for request in request_iterator: user_id = self._generate_id() user = user_pb2.User( id=user_id, email=request.email, name=request.name ) self.users[user_id] = user created_ids.append(user_id) return user_pb2.CreateUsersResponse( user_ids=created_ids, created_count=len(created_ids) ) def Chat(self, request_iterator, context): """Bidirectional streaming RPC""" for message in request_iterator: # Echo back with modification response = user_pb2.ChatMessage( user_id="server", message=f"Echo: {message.message}", timestamp=message.timestamp ) yield response def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) user_pb2_grpc.add_UserServiceServicer_to_server(UserService(), server) server.add_insecure_port('[::]:50051') server.start() print("Server started on port 50051") server.wait_for_termination() if __name__ == '__main__': logging.basicConfig() serve()
Go gRPC Server
package main import ( "context" "io" "log" "net" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" pb "example.com/user/v1" ) type userServer struct { pb.UnimplementedUserServiceServer users map[string]*pb.User } func (s *userServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.GetUserResponse, error) { user, exists := s.users[req.Id] if !exists { return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id) } return &pb.GetUserResponse{User: user}, nil } func (s *userServer) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error { pageSize := req.PageSize if pageSize == 0 { pageSize = 10 } count := 0 for _, user := range s.users { if count >= int(pageSize) { break } if err := stream.Send(user); err != nil { return err } count++ } return nil } func (s *userServer) CreateUsers(stream pb.UserService_CreateUsersServer) error { var userIds []string for { req, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.CreateUsersResponse{ UserIds: userIds, CreatedCount: int32(len(userIds)), }) } if err != nil { return err } userId := generateID() user := &pb.User{ Id: userId, Email: req.Email, Name: req.Name, } s.users[userId] = user userIds = append(userIds, userId) } } func (s *userServer) Chat(stream pb.UserService_ChatServer) error { for { msg, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } response := &pb.ChatMessage{ UserId: "server", Message: "Echo: " + msg.Message, Timestamp: msg.Timestamp, } if err := stream.Send(response); err != nil { return err } } } func main() { lis, err := net.Listen("tcp", ":50051") if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterUserServiceServer(s, &userServer{ users: make(map[string]*pb.User), }) log.Println("Server listening on :50051") if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }
gRPC Client with Interceptors
import grpc import user_pb2 import user_pb2_grpc from typing import Callable def logging_interceptor( method: Callable, request, call_details: grpc.ClientCallDetails ): """Unary-unary interceptor for logging""" print(f"Calling method: {call_details.method}") print(f"Request: {request}") response = method(request, call_details) print(f"Response: {response}") return response class UserClient: def __init__(self, host: str = 'localhost:50051'): # Create channel with interceptor self.channel = grpc.insecure_channel(host) self.channel = grpc.intercept_channel( self.channel, logging_interceptor ) self.stub = user_pb2_grpc.UserServiceStub(self.channel) def get_user(self, user_id: str) -> user_pb2.User: """Call unary RPC""" request = user_pb2.GetUserRequest(id=user_id) try: response = self.stub.GetUser( request, timeout=5.0 # 5 second timeout ) return response.user except grpc.RpcError as e: print(f"RPC failed: {e.code()} - {e.details()}") raise def list_users(self, page_size: int = 10): """Call server streaming RPC""" request = user_pb2.ListUsersRequest(page_size=page_size) try: for user in self.stub.ListUsers(request): yield user except grpc.RpcError as e: print(f"RPC failed: {e.code()} - {e.details()}") def create_users_batch(self, users: list): """Call client streaming RPC""" def request_generator(): for user in users: yield user_pb2.CreateUserRequest( email=user['email'], name=user['name'] ) try: response = self.stub.CreateUsers(request_generator()) return response except grpc.RpcError as e: print(f"RPC failed: {e.code()} - {e.details()}") def close(self): self.channel.close()
Best Practices
Design
- Use semantic versioning for protobuf packages
- Design backward-compatible changes
- Use proper field numbering (never reuse)
- Include metadata in messages
- Use enums with explicit UNSPECIFIED value
- Design for pagination in list operations
Performance
- Enable HTTP/2 connection pooling
- Use streaming for large data transfers
- Implement proper timeouts
- Use compression for large payloads
- Batch operations when possible
- Monitor and tune thread pool sizes
Production
- Implement health checks
- Use TLS for secure communication
- Add authentication/authorization
- Implement retry policies with backoff
- Monitor gRPC metrics (latency, errors)
- Use service discovery for dynamic endpoints
Anti-Patterns
❌ Breaking backward compatibility ❌ No timeout configuration ❌ Ignoring error status codes ❌ Not handling stream cancellation ❌ Over-sized messages ❌ No health checking ❌ Missing authentication
Resources
- gRPC: https://grpc.io/
- Protocol Buffers: https://protobuf.dev/
- gRPC-Go: https://github.com/grpc/grpc-go
- gRPC-Python: https://grpc.io/docs/languages/python/