🌐 Golang gRPC with Auth Interceptor, Streaming and Gateway in Practice 🐹

Truong Phung - Nov 2 - - Dev Community

First in first, let's briefly talk about common features of gRPC

1. gRPC Common Features

Common features in Golang gRPC include:

  1. Unary RPC: Basic request-response call where a client sends a single request to the server and receives a single response.

  2. Streaming RPC: gRPC supports client-streaming, server-streaming, and bidirectional streaming, allowing data to flow in both directions in real-time.

  3. Protocol Buffers (Protobuf): A highly efficient serialization format that defines data structures and services in .proto files, enabling language-agnostic code generation.

  4. Multiplexing: gRPC uses HTTP/2, allowing multiple requests on a single TCP connection, improving efficiency and resource management.

  5. Built-in Authentication: gRPC includes mechanisms for SSL/TLS and token-based authentication, enhancing secure communication.

  6. Error Handling: Standardized error codes (e.g., NOT_FOUND, PERMISSION_DENIED) provide a consistent method for handling errors across different services.

  7. Interceptors: Middleware support for interceptors allows for logging, monitoring, and authentication by intercepting RPC calls.

  8. Load Balancing & Retries: Built-in load-balancing and automatic retries help distribute traffic and manage failures gracefully in microservices architectures.

These features make gRPC a powerful choice for building robust and efficient microservices in Go.

2. Golang gRPC example

Here is a comprehensive Golang gRPC example that demonstrates the key gRPC features, including unary and streaming RPCs, metadata, interceptors, error handling, and HTTP gateway support using grpc-gateway.

We'll create a ProductService with the following functionality:

  1. Unary RPC: Get product by ID.
  2. Server Streaming RPC: List all products.
  3. gRPC-Gateway: Mapping gRPC to REST endpoints

Project structure

test-grpc/
β”œβ”€β”€ auth/ 
β”‚   β”œβ”€β”€ auth.go  # authentication interceptor for client & gateway
β”œβ”€β”€ client/
β”‚   β”œβ”€β”€ main.go  # gRPC client implementation
β”œβ”€β”€ gateway/
β”‚   β”œβ”€β”€ main.go  # gRPC gateway implementation
β”œβ”€β”€ models/
β”‚   β”œβ”€β”€ product.go
β”œβ”€β”€ protocol/
β”‚   β”œβ”€β”€ gen/      # folder for storing gRPC auto generated files
β”‚   β”œβ”€β”€ product.proto
β”œβ”€β”€ server/
β”‚   β”œβ”€β”€ main.go
β”‚   β”œβ”€β”€ server.go  # gRPC server implementation
β”œβ”€β”€ go.mod
β”œβ”€β”€ go.sum

Enter fullscreen mode Exit fullscreen mode

1. Define Protobuf (protocol/product.proto)

syntax = "proto3";

// Defines the protocol buffer's package as productpb. This helps organize and prevent naming conflicts in large projects.
package productpb;

// Specifies the Go package path for the generated code, so Go files generated from this .proto file will belong to the pb package.
option go_package = "pb/";


// Imports the empty.proto file from the Protocol Buffers library, which includes an Empty message. This Empty type is useful for RPC methods that don’t require input or output, allowing a clean interface.
import "google/protobuf/empty.proto";

// The import "google/api/annotations.proto"; line is used to enable HTTP/REST mappings for gRPC services in Protocol Buffers. 
// This allows you to add annotations (like option (google.api.http)) to your gRPC methods, which map them to specific HTTP endpoints. 
// By doing so, gRPC services can be exposed as RESTful APIs, making them accessible over HTTP and compatible with standard RESTful client applications or tools like gRPC-Gateway.
import "google/api/annotations.proto";

service ProductService {
    // Only Use this when we'd like to expose this function to gRPC-Gateway
    //rpc CreateProduct(ProductRequest) returns (ProductResponse) {
    //    option (google.api.http) = {
    //       post: "/api/v1/products"
    //       body: "*"
    //    };
    //}

    // This case we don't want to expose CreateProduct to gRPC-Gateway, so it can only be called by gRPC common method
    rpc CreateProduct(ProductRequest) returns (ProductResponse);

    rpc GetProduct(ProductID) returns (ProductResponse) {
        option (google.api.http) = {
            get: "/api/v1/products/{id}"
        };
    }
    rpc GetAllProducts(google.protobuf.Empty) returns (ProductList) {
        option (google.api.http) = {
            get: "/api/v1/products/all"
        };
    }
    rpc ListProducts(google.protobuf.Empty) returns (stream Product) {
        option (google.api.http) = {
            get: "/api/v1/products"
        };
    }

}

message Product {
    string id = 1;
    string name = 2;
    float price = 3;
}

message ProductList {
    repeated Product products = 1; // repeated for defining array of Product
}

message ProductRequest {
    Product product = 1;
}

message ProductResponse {
    Product product = 1;
}

message ProductID {
    string id = 1;
}

/*
protoc -I . \
-I /path/to/googleapis \
--go_out gen --go_opt paths=source_relative \
--go-grpc_out gen --go-grpc_opt paths=source_relative,require_unimplemented_servers=false \
--grpc-gateway_out gen --grpc-gateway_opt paths=source_relative \
product.proto
*/
Enter fullscreen mode Exit fullscreen mode

2. Generate gRPC Code

  1. Install Necessary Plugins:
    Ensure the protoc-gen-go, protoc-gen-go-grpc, and protoc-gen-grpc-gateway plugins are installed:

    go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
    go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
    go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@latest
    go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2@latest
    
  2. Clone the googleapis Repository: Download the required .proto files by cloning the googleapis repo to a known location. (These are for gRPC Gateway)

    git clone https://github.com/googleapis/googleapis.git
    
  3. Create output dir to store auto generated gRPC files

    mkdir gen
    

    So gen folder will be use to store generate gRPC files in later steps

  4. Run the protoc Command: Navigate to your project folder and use this command to generate both gRPC and REST gateway code:

    protoc -I . \
    -I /path/to/googleapis \
    --go_out gen --go_opt paths=source_relative \
    --go-grpc_out gen --go-grpc_opt paths=source_relative,require_unimplemented_servers=false \
    --grpc-gateway_out gen --grpc-gateway_opt paths=source_relative \
    product.proto
    

    Explanation of Flags:

    • -I . and -I /path/to/googleapis: Set the import paths for locating .proto files, including the Google API library for annotations.proto.
    • --go_out gen --go_opt paths=source_relative: Generates Go code for the message types in the gen directory, keeping file paths relative to the source.
    • --go-grpc_out gen --go-grpc_opt paths=source_relative: Generates Go code for the gRPC service definitions in the gen directory, also with source-relative paths.
    • --grpc-gateway_out gen --grpc-gateway_opt paths=source_relative: Generates a reverse-proxy HTTP server with gRPC-Gateway, allowing RESTful HTTP calls to interact with the gRPC server, outputting code to the gen directory.
    • The option require_unimplemented_servers=false in the --go-grpc_out flag:

      • Suppresses generation of unimplemented server code, meaning that only explicitly defined RPC methods will be included in the generated service code.
      • This can be useful for reducing boilerplate, especially when you want a leaner service definition or don’t intend to implement all methods immediately.

      Without this flag, the generated gRPC code includes placeholder "unimplemented" methods for all RPCs defined in the.proto file, which are required to be implemented or intentionally left as is if they aren’t needed.

    Replace ./path/to/googleapis with the actual path where the googleapis repository is located on your system (Ex: on Mac it could be something like /Users/yourusername/Downloads/googleapis ).

2. Define Product Struct and Conversion Functions (models/product.go)

package models

import (
    productpb "test-grpc/protocol/gen"
)

// Product struct for GORM and SQLite database
type Product struct {
    ID    string  `json:"id" gorm:"primaryKey"`
    Name  string  `json:"name"`
    Price float32 `json:"price"`
}

// ToProto converts a Product struct to a protobuf Product message
func (p *Product) ToProto() *productpb.Product {
    return &productpb.Product{
        Id:    p.ID,
        Name:  p.Name,
        Price: p.Price,
    }
}

// ProductFromProto converts a protobuf Product message to a Product struct
func ProductFromProto(proto *productpb.Product) *Product {
    return &Product{
        ID:    proto.Id,
        Name:  proto.Name,
        Price: proto.Price,
    }
}
Enter fullscreen mode Exit fullscreen mode

These functions handle conversions between the GORM model and protobuf Product message.

3. Server Implementation with Streaming and GORM for SQLite (server/server.go)

package main

import (
    "context"
    "log"

    "test-grpc/models"
    productpb "test-grpc/protocol/gen"

    "github.com/google/uuid"
    emptypb "google.golang.org/protobuf/types/known/emptypb"
    "gorm.io/driver/sqlite"
    "gorm.io/gorm"
)

type server struct {
    db *gorm.DB
}

func NewServer() *server {
    db, err := gorm.Open(sqlite.Open("products.db"), &gorm.Config{})
    if err != nil {
        log.Fatalf("Failed to connect database: %v", err)
    }
    db.AutoMigrate(&models.Product{})
    return &server{db: db}
}

func (s *server) CreateProduct(ctx context.Context, req *productpb.ProductRequest) (*productpb.ProductResponse, error) {

    product := models.ProductFromProto(req.Product)
    product.ID = uuid.New().String()
    if err := s.db.Create(&product).Error; err != nil {
        return nil, err
    }
    return &productpb.ProductResponse{Product: product.ToProto()}, nil
}

func (s *server) GetProduct(ctx context.Context, req *productpb.ProductID) (*productpb.ProductResponse, error) {

    var product models.Product
    if err := s.db.First(&product, "id = ?", req.Id).Error; err != nil {
        return nil, err
    }
    return &productpb.ProductResponse{Product: product.ToProto()}, nil
}

func (s *server) GetAllProducts(ctx context.Context, req *emptypb.Empty) (*productpb.ProductList, error) {
    var products []models.Product
    if err := s.db.Find(&products).Error; err != nil {
        return nil, err
    }

    var productList []*productpb.Product
    for _, product := range products {
        productList = append(productList, product.ToProto())
    }

    return &productpb.ProductList{Products: productList}, nil
}

// Streaming method to list products
func (s *server) ListProducts(req *emptypb.Empty, stream productpb.ProductService_ListProductsServer) error {
    var products []models.Product
    if err := s.db.Find(&products).Error; err != nil {
        return err
    }
    for _, product := range products {
        if err := stream.Send(product.ToProto()); err != nil {
            return err
        }
    }
    return nil
}

// Additional CRUD methods...
Enter fullscreen mode Exit fullscreen mode

4. Server Middleware Interceptor (server/main.go)

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"

    productpb "test-grpc/protocol/gen"
)

func ServerAuthInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok || len(md["authorization"]) == 0 {
        return nil, status.Errorf(codes.Unauthenticated, "no auth token")
    }

    authToken := md["authorization"][0]
    if authToken != "unary-token" {
        return nil, status.Errorf(codes.Unauthenticated, "invalid token")
    }

    return handler(ctx, req)
}

func ServerStreamAuthInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    // Extract metadata from stream context
    md, ok := metadata.FromIncomingContext(ss.Context())
    if !ok || len(md["authorization"]) == 0 {
        return status.Errorf(codes.Unauthenticated, "no auth token")
    }

    // Validate the authorization token
    authToken := md["authorization"][0]
    if authToken != "stream-token" {
        return status.Errorf(codes.Unauthenticated, "invalid token")
    }

    // Continue to the handler if authenticated
    return handler(srv, ss)
}
Enter fullscreen mode Exit fullscreen mode

5. Set Up gRPC Server with Interceptors (server/main.go)

func main() {
    grpcServer := grpc.NewServer(grpc.UnaryInterceptor(ServerAuthInterceptor), grpc.StreamInterceptor(ServerStreamAuthInterceptor))
    productpb.RegisterProductServiceServer(grpcServer, NewServer())

    listener, err := net.Listen("tcp", ":50052")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    log.Println("Server is running on port :50052")
    if err := grpcServer.Serve(listener); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}
Enter fullscreen mode Exit fullscreen mode

6. Setup Auth Interceptor for gRPC Client and gRPC Gateway (auth/auth.go)

package auth

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// AuthInterceptor adds authorization metadata to each outgoing gRPC request.
func AuthInterceptor(token string) grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req interface{},
        reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        // Append metadata to outgoing context
        ctx = metadata.AppendToOutgoingContext(ctx, "authorization", token)
        return invoker(ctx, method, req, reply, cc, opts...)
    }
}

// We can add a streaming interceptor similarly:
func AuthStreamInterceptor(token string) grpc.StreamClientInterceptor {
    return func(
        ctx context.Context,
        desc *grpc.StreamDesc,
        cc *grpc.ClientConn,
        method string,
        streamer grpc.Streamer,
        opts ...grpc.CallOption,
    ) (grpc.ClientStream, error) {
        // Append metadata to outgoing context
        ctx = metadata.AppendToOutgoingContext(ctx, "authorization", token)
        return streamer(ctx, desc, cc, method, opts...)
    }
}
Enter fullscreen mode Exit fullscreen mode

7. Configure gRPC Gateway (Optional) (gateway/main.go)

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "test-grpc/auth"
    "test-grpc/protocol/gen"
)

func main() {
    mux := runtime.NewServeMux()
    err := productpb.RegisterProductServiceHandlerFromEndpoint(context.Background(), mux, ":50052", []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(auth.AuthInterceptor("unary-token"))})
    if err != nil {
        log.Fatalf("Failed to start HTTP gateway: %v", err)
    }

    log.Println("HTTP Gateway running on :8080")
    http.ListenAndServe(":8080", mux)
}
Enter fullscreen mode Exit fullscreen mode

8. Run Server and Gateway

To test this setup, run the gRPC server:

go run server/server.go server/main.go
Enter fullscreen mode Exit fullscreen mode

And then the HTTP gateway: (Run this in another Terminal Window)

go run gateway/main.go
Enter fullscreen mode Exit fullscreen mode

Test the gateway with following APIs

  • [GET] localhost:8080/api/v1/products/all
  • [GET] localhost:8080/api/v1/products/123
  • [GET] localhost:8080/api/v1/products

9. Client Implementation (client/main.go)

The client will have a background goroutine to continuously stream new products created on the server, logging each as it’s received. The client also exposes RESTful APIsusing Gin to fetch and interact with the product data, converting between proto and Golang structs.

package main

import (
    "context"
    "io"
    "log"
    "test-grpc/auth"
    "test-grpc/models"
    "test-grpc/protocol/gen"
    "time"

    "github.com/gin-gonic/gin"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    emptypb "google.golang.org/protobuf/types/known/emptypb"
)

type ProductClient struct {
    client productpb.ProductServiceClient
}

func NewProductClient(cc *grpc.ClientConn) *ProductClient {
    return &ProductClient{client: productpb.NewProductServiceClient(cc)}
}

// REST API handler functions
func (c *ProductClient) createProduct(ctx *gin.Context) {
    var product models.Product
    if err := ctx.ShouldBindJSON(&product); err != nil {
        ctx.JSON(400, gin.H{"error": "Invalid product data"})
        return
    }

    protoProduct := product.ToProto()
    req := &productpb.ProductRequest{Product: protoProduct}
    res, err := c.client.CreateProduct(ctx, req)
    if err != nil {
        ctx.JSON(500, gin.H{"error": err.Error()})
        return
    }

    ctx.JSON(201, models.ProductFromProto(res.Product))
}

func (c *ProductClient) getAllProducts(ctx *gin.Context) {
    deadlineCtx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()

    res, err := c.client.GetAllProducts(deadlineCtx, &emptypb.Empty{})
    if err != nil {
        ctx.JSON(500, gin.H{"error": err.Error()})
        return
    }

    var products []models.Product
    for _, protoProduct := range res.Products {
        products = append(products, *models.ProductFromProto(protoProduct))
    }

    ctx.JSON(200, products)
}

// Background job for streaming new products
func (c *ProductClient) StreamNewProducts() {
    ctx := context.Background()
    go func() {
        for {
            stream, err := c.client.ListProducts(ctx, &emptypb.Empty{})
            if err != nil {
                log.Printf("Error connecting to ListProducts: %v", err)
                time.Sleep(5 * time.Second) // Retry delay
                continue
            }

            for {
                product, err := stream.Recv()
                if err == io.EOF {
                    // The EOF error in your StreamNewProducts function likely indicates that the server has closed the stream, often because there are no new products to send, and the stream reaches the end
                    log.Println("Completed!, Stream closed by server.")
                    break // Break inner loop to reconnect
                }
                if err != nil {
                    log.Printf("Error receiving product: %v", err)
                    break
                }
                log.Printf("New Product: %v", product)
            }

            // Optional reconnect delay
            time.Sleep(5 * time.Second)
        }
    }()
}

func setupRouter(pc *ProductClient) *gin.Engine {
    r := gin.Default()
    r.POST("/products", pc.createProduct)
    r.GET("/products", pc.getAllProducts)
    return r
}

func main() {
    unaryToken := "unary-token"
    streamToken := "stream-token"
    // This approach keeps the authorization token consistent across all requests without manually adding it each time.
    conn, err := grpc.NewClient(":50052", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(auth.AuthInterceptor(unaryToken)), grpc.WithStreamInterceptor(auth.AuthStreamInterceptor(streamToken)))

    if err != nil {
        log.Fatalf("Could not connect: %v", err)
    }
    defer conn.Close()

    productClient := NewProductClient(conn)

    // Start background streaming of new products
    productClient.StreamNewProducts()

    // Setup Gin REST API server
    r := setupRouter(productClient)
    r.Run(":8081")
}
Enter fullscreen mode Exit fullscreen mode

And Run Client: (Run in a separate Terminal Window for client)

go run client/main.go
Enter fullscreen mode Exit fullscreen mode

Test Client RESTful APIs

  • [POST] localhost:8081/products -- JSON { "id":"1", "name":"product 1", "price":99 }
  • [GET] localhost:8081/products?id=productId

Explanation

  • Server: Provides gRPC methods with metadata-based authentication, manages Product data using GORM and SQLite.
  • gRPC-Gateway: Exposes REST endpoints, mapping directly to gRPC methods for seamless HTTP/2 and REST support.
  • Client: Using Auth Interceptor, invokes CreateProduct and streams ListProducts.
  • Gin REST API: getAllProducts and createProduct handlers use the gRPC client to interact with the server, convert responses to native structs, and return JSON.
  • Background Streaming: StreamNewProducts runs in a background goroutine, logging each product received through streaming.

This implementation covers a full setup for a gRPC service with an authenticated client and RESTful gateway, with embedded messages, conversion functions, GORM integration, metadata context, and streaming capabilities, showcasing comprehensive gRPC features in a Golang application.

If you found this helpful, let me know by leaving a πŸ‘ or a comment!, or if you think this post could help someone, feel free to share it! Thank you very much! πŸ˜ƒ

. . . . . . . . . . . . . . . . . . . . . . . . . . .