Introduction
I'm finding it fun to explore the realm of microservices and cloud-native applications. In recent years, microservices have revolutionized web application architecture, offering modular, independently deployable services that have reshaped the way we build and scale applications. Furthermore, transport protocols like REST and gRPC have evolved to enhance efficiency and speed, making them ideal choices for microservices.
Our Mission: Building a Cloud-Native Application Using Go
In this blog, I'll guide you through the process of building a cloud-native application using Go and the microservices architecture. The journey is more straightforward than you might think, and we'll break it down into several manageable steps.
Step 1: Building Microservices
1.1 Creating Microservices and Containerized Services
In this step, we'll create microservices and containerized services, each with specific, independent tasks closely related to their logical components.
1.2 Leveraging Go-Kit
We'll utilize Go-Kit, an excellent framework for structuring and developing the components of each service.
1.3 Developing APIs
We'll develop APIs using HTTP (REST) and Protobuf (gRPC) as transport mechanisms. We'll also utilize PostgreSQL for databases and explore deployment on Azure for API management and CI/CD.
Note: While we'll discuss deployment and CI/CD, the specifics of configuring them on Azure or any other cloud platform fall beyond the scope of this blog.
Prerequisites
Before we dive in, let's ensure you have the necessary prerequisites:
- A basic understanding of web services, Rest APIs, and gRPC.
- GoLand or VS Code as your development environment.
- A properly installed and configured Go environment. If not, follow this link for guidance.
- Setting up a new project directory under the GOPATH.
- Familiarity with the standard Golang project structure, which you can find here.
- A PostgreSQL client installed.
- We'll be using Go Kit to simplify microservices development.
Problem Statement
Our goal is to develop a web application to address the following problem statement:
Imagine a global publishing company that specializes in books and journals. They require a service to watermark their documents. These documents, whether books or journals, consist of a title, an author, and a watermark property. The watermark can be in one of three states: "Started," "InProgress," or "Finished." Furthermore, only specific users should have the authority to apply watermarks to documents, and once a watermark is applied, it's a one-time operation; the document can never be re-marked.
For a more detailed understanding of this requirement, please refer to this link.
Architecture
In this project, we'll establish three microservices: Authentication Service, Database Service, and the Watermark Service. These services will interact with a PostgreSQL database server, and an API Gateway will serve as the entry point.
Now, let's delve into the details of each service:
Authentication Service
This service is responsible for user-based and role-based access control. It will authenticate users based on their roles and return appropriate HTTP status codes - 200 for authorized users and 401 for unauthorized ones.
APIs:
- /user/access: A GET request to this endpoint, secured for user authentication, takes a user's name as input and returns their roles and associated privileges.
- /authenticate: Another secured GET request, this time with user and operation parameters, authenticates the user for the specified operation.
- /healthz: A secured GET request to check the service's status.
Database Service
This service manages the application's databases, storing user information, their roles, access privileges, and documents without watermarks. Documents can't have watermarks when created; success is achieved only when data input is valid, and the database service returns success.
We'll use two separate databases for the two services, adhering to the "Single Database per Service" principle of microservices architecture.
APIs:
- /get: A secured GET request that retrieves documents based on specific filters.
- /update: A secured POST request that updates a document.
- /add: A secured POST request that adds a new document and returns its title ID.
- /remove: A secured POST request to remove a document based on its title ID.
- /healthz: A secured GET request to check the service's status.
Watermark Service
This core service handles requests to watermark documents. When a user needs to watermark a document, they must provide a Ticket ID along with the desired watermark. The service will internally call the database's Update API, returning the watermark process status, initially "Started," transitioning to "InProgress," and finally "Finished" (or "Error" if the request is invalid).
APIs:
- /get: A secured GET request for retrieving documents based on specific filters.
- /status: A secured GET request to check the watermark status for a specific ticket ID.
- /addDocument: A secured POST request to add a document and return the title ID.
- /watermark: The main watermark operation, a secured POST request accepting a title and watermark string.
- /healthz: A secured GET request to check the service's status.
Operations and Flow
The Watermark Service APIs are the user's gateway to request watermarking or add documents. Authentication and Database Service APIs are private and only called by other services internally. Users interact solely with the API Gateway URL.
Here's how it works:
- A user accesses the API Gateway URL, providing their username, ticket ID, and watermark.
- Users don't need to concern themselves with the intricacies of authentication or database services.
- The API Gateway validates the user's request and payload.
- A well-defined API forwarding rule in the gateway routes the request to the relevant service.
- For watermark requests, the rule forwards it to the authentication service for user authentication. The authentication service checks the user's authorization, returning the appropriate status code.
- The authorization service checks the user's roles and permissions, and, once authorized, forwards the request to the watermark service.
- The watermark service performs the necessary operations, including adding watermarks or documents, and forwards the result to the user.
- If the request is to add a document, the service returns the Ticket ID. If it's for watermarking, it returns the operation status.
Please note that each user has specific roles based on document types, not specific book or journal names.
Getting Started
Let's kick things off by creating a folder for our application in the $GOPATH. This will serve as the root directory for our collection of services.
Project Layout
Our project will follow the standard Golang project layout.
- api: This directory stores versions of API swagger files and protobuf files for the gRPC interface.
- cmd: It contains entry point (main.go) files for all services, as well as any other container images.
- docs: This holds documentation for the project.
- config: Store sample and specific configuration files here.
- deploy: Contains deployment files used for deploying the application.
- internal: This conventional internal package is recognized by the Go compiler. It houses packages shared across the project. Directory Structure Explanation:
- pkg: Within this directory, you will find the fully functional code for all services, each organized into distinct packages.
- tests: This directory houses all integration and end-to-end (E2E) tests for the services.
- vendor: Here, you'll discover a repository of third-party dependencies stored locally to prevent version inconsistencies in the future.
Watermark Service:
- In the Go kit framework, a service should always be defined through an interface.
Create a package called "watermark" in the "pkg" directory. Inside that package, create a new file named "service.go" that serves as the blueprint for our service.
package watermark
import (
"context"
"github.com/velotiotech/watermark-service/internal"
)
type Service interface {
// Get the list of all documents
Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error)
Status(ctx context.Context, ticketID string) (internal.Status, error)
Watermark(ctx context.Context, ticketID, mark string) (int, error)
AddDocument(ctx context.Context, doc *internal.Document) (string, error)
ServiceStatus(ctx context.Context) (int, error)
}
We'll need five endpoints to handle requests for the methods defined in the service interface. To handle multiple concurrent requests effectively, we are using the context package, although we may not use it extensively in this blog. It's still the recommended way to work with it.
Implementing our service:
package watermark
import (
"context"
"net/http"
"os"
"github.com/velotiotech/watermark-service/internal"
"github.com/go-kit/kit/log"
"github.com/lithammer/shortuuid/v3"
)
type watermarkService struct{}
func NewService() Service {
return &watermarkService{}
}
func (w *watermarkService) Get(_ context.Context, filters ...internal.Filter) ([]internal.Document, error) {
// Query the database using the filters and return the list of documents.
// Return an error if the filter (key) is invalid or if no item is found.
doc := internal.Document{
Content: "book",
Title: "Harry Potter and the Half-Blood Prince",
Author: "J.K. Rowling",
Topic: "Fiction and Magic",
}
return []internal.Document{doc}, nil
}
func (w *watermarkService) Status(_ context.Context, ticketID string) (internal.Status, error) {
// Query the database using the ticketID and return the document info.
// Return an error if the ticketID is invalid or no document exists for that ticketID.
return internal.InProgress, nil
}
func (w *watermarkService) Watermark(_ context.Context, ticketID, mark string) (int, error) {
// Update the database entry with the watermark field as non-empty.
// First, check if the watermark status is not already in InProgress, Started, or Finished state.
// If yes, then return an invalid request.
// Return an error if no item is found using the ticketID.
return http.StatusOK, nil
}
func (w *watermarkService) AddDocument(_ context.Context, doc *internal.Document) (string, error) {
// Add the document entry in the database by calling the database service.
// Return an error if the doc is invalid and/or if there's a database entry error.
newTicketID := shortuuid.New()
return newTicketID, nil
}
func (w *watermarkService) ServiceStatus(_ context.Context) (int, error) {
logger.Log("Checking the Service health...")
return http.StatusOK, nil
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
We have defined the "watermarkService" type, an empty struct that implements the service interface we defined. This struct's implementation is hidden from the rest of the world.
"NewService" is created as the constructor of our service "object." This is the only function available outside this package to instantiate the service.
- Now, we will create the "endpoints" package, which will contain two files: one for storing all types of requests and responses and the other for implementing the requests parsing and calling the appropriate service functions.
- Create a file named "reqJSONMap.go." In this file, define all the request and response structs with the fields. Add the necessary fields in these structs for the request input and response output.
package endpoints
import "github.com/velotiotech/watermark-service/internal"
type GetRequest struct {
Filters []internal.Filter `json:"filters,omitempty"`
}
type GetResponse struct {
Documents []internal.Document `json:"documents"`
Err string `json:"err,omitempty"`
}
type StatusRequest struct {
TicketID string `json:"ticketID"`
}
type StatusResponse struct {
Status internal.Status `json:"status"`
Err string `json:"err,omitempty"`
}
type WatermarkRequest struct {
TicketID string `json:"ticketID"`
Mark string `json:"mark"`
}
type WatermarkResponse struct {
Code int `json:"code"`
Err string `json:"err"`
}
type AddDocumentRequest struct {
Document *internal.Document `json:"document"`
}
type AddDocumentResponse struct {
TicketID string `json:"ticketID"`
Err string `json:"err,omitempty"`
}
type ServiceStatusRequest struct{}
type ServiceStatusResponse struct {
Code int `json:"status"`
Err string `json:"err,omitempty"`
}
- Create a file named "endpoints.go." In this file, you will implement the actual calling of the service's implemented functions.
package endpoints
import (
"context"
"errors"
"os"
"github.com/aayushrangwala/watermark-service/internal"
"github.com/aayushrangwala/watermark-service/pkg/watermark"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)
type Set struct {
GetEndpoint endpoint.Endpoint
AddDocumentEndpoint endpoint.Endpoint
StatusEndpoint endpoint.Endpoint
ServiceStatusEndpoint endpoint.Endpoint
WatermarkEndpoint endpoint.Endpoint
}
func NewEndpointSet(svc watermark.Service) Set {
return Set{
GetEndpoint: MakeGetEndpoint(svc),
AddDocumentEndpoint: MakeAddDocumentEndpoint(svc),
StatusEndpoint: MakeStatusEndpoint(svc),
ServiceStatusEndpoint: MakeServiceStatusEndpoint(svc),
WatermarkEndpoint: MakeWatermarkEndpoint(svc),
}
}
func MakeGetEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(GetRequest)
docs, err := svc.Get(ctx, req.Filters...)
if err != nil {
return GetResponse{docs, err.Error()}, nil
}
return GetResponse{docs, ""}, nil
}
}
func MakeStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(StatusRequest)
status, err := svc.Status(ctx, req.TicketID)
if err != nil {
return StatusResponse{Status: status, Err: err.Error()}, nil
}
return StatusResponse{Status: status, Err
: ""}, nil
}
}
func MakeAddDocumentEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(AddDocumentRequest)
ticketID, err := svc.AddDocument(ctx, req.Document)
if err != nil {
return AddDocumentResponse{TicketID: ticketID, Err: err.Error()}, nil
}
return AddDocumentResponse{TicketID: ticketID, Err: ""}, nil
}
}
func MakeWatermarkEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(WatermarkRequest)
code, err := svc.Watermark(ctx, req.TicketID, req.Mark)
if err != nil {
return WatermarkResponse{Code: code, Err: err.Error()}, nil
}
return WatermarkResponse{Code: code, Err: ""}, nil
}
}
func MakeServiceStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
_ = request.(ServiceStatusRequest)
code, err := svc.ServiceStatus(ctx)
if err != nil {
return ServiceStatusResponse{Code: code, Err: err.Error()}, nil
}
return ServiceStatusResponse{Code: code, Err: ""}, nil
}
}
func (s *Set) Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error) {
resp, err := s.GetEndpoint(ctx, GetRequest{Filters: filters})
if err != nil {
return []internal.Document{}, err
}
getResp := resp.(GetResponse)
if getResp.Err != "" {
return []internal.Document{}, errors.New(getResp.Err)
}
return getResp.Documents, nil
}
func (s *Set) ServiceStatus(ctx context.Context) (int, error) {
resp, err := s.ServiceStatusEndpoint(ctx, ServiceStatusRequest{})
svcStatusResp := resp.(ServiceStatusResponse)
if err != nil {
return svcStatusResp.Code, err
}
if svcStatusResp.Err != "" {
return svcStatusResp.Code, errors.New(svcStatusResp.Err)
}
return svcStatusResp.Code, nil
}
func (s *Set) AddDocument(ctx context.Context, doc *internal.Document) (string, error) {
resp, err := s.AddDocumentEndpoint(ctx, AddDocumentRequest{Document: doc})
if err != nil {
return "", err
}
adResp := resp.(AddDocumentResponse)
if adResp.Err != "" {
return "", errors.New(adResp.Err)
}
return adResp.TicketID, nil
}
func (s *Set) Status(ctx context.Context, ticketID string) (internal.Status, error) {
resp, err := s.StatusEndpoint(ctx, StatusRequest{TicketID: ticketID})
if err != nil {
return internal.Failed, err
}
stsResp := resp.(StatusResponse)
if stsResp.Err != "" {
return internal.Failed, errors.New(stsResp.Err)
}
return stsResp.Status, nil
}
func (s *Set) Watermark(ctx context.Context, ticketID, mark string) (int, error) {
resp, err := s.WatermarkEndpoint(ctx, WatermarkRequest{TicketID: ticketID, Mark: mark})
wmResp := resp.(WatermarkResponse)
if err != nil {
return wmResp.Code, err
}
if wmResp.Err != "" {
return wmResp.Code, errors.New(wmResp.Err)
}
return wmResp.Code, nil
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
In this file, we have the "Set" struct, which is a collection of all the endpoints. We also have a constructor for the "Set." The file defines internal constructor functions that return objects implementing the generic endpoint from the Go kit interface, such as "MakeGetEndpoint()," "MakeStatusEndpoint()," and so on.
To expose the Get, Status, Watermark, ServiceStatus, and AddDocument APIs, we create endpoints for each of them. These functions handle incoming requests and call the specific service methods.
- Add the "transports" method to expose the services. Our services will support HTTP and will be exposed using REST APIs and Protobuf with gRPC.
Create a separate package called "transport" in the "watermark" directory. This package will hold all the handlers, decoders, and encoders for a specific type of transport mechanism.
Step 6: Creating HTTP Transport Functions and Handlers
Begin by creating a file named http.go, which will contain transport functions and handlers for HTTP. These functions are responsible for defining API routes and their respective handler functions.
package transport
import (
context"
"encoding/json"
"net/http"
"os"
"github.com/velotiotech/watermark-service/internal/util"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
"github.com/go-kit/kit/log"
httptransport "github.com/go-kit/kit/transport/http"
)
// NewHTTPHandler initializes an HTTP handler for API endpoints.
func NewHTTPHandler(ep endpoints.Set) http.Handler {
m := http.NewServeMux()
// Define routes and associated handler functions.
m.Handle("/healthz", httptransport.NewServer(
ep.ServiceStatusEndpoint,
decodeHTTPServiceStatusRequest,
encodeResponse,
))
m.Handle("/status", httptransport.NewServer(
ep.StatusEndpoint,
decodeHTTPStatusRequest,
encodeResponse,
))
m.Handle("/addDocument", httptransport.NewServer(
ep.AddDocumentEndpoint,
decodeHTTPAddDocumentRequest,
encodeResponse,
))
m.Handle("/get", httptransport.NewServer(
ep.GetEndpoint,
decodeHTTPGetRequest,
encodeResponse,
))
m.Handle("/watermark", httptransport.NewServer(
ep.WatermarkEndpoint,
decodeHTTPWatermarkRequest,
encodeResponse,
))
return m
}
// Functions for decoding HTTP requests.
func decodeHTTPGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
var req endpoints.GetRequest
if r.ContentLength == 0 {
logger.Log("Get request with no body")
return req, nil
}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return nil, err
}
return req, nil
}
// ... (Similar decode functions for other endpoints)
// Function for encoding HTTP responses.
func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
if e, ok := response.(error); ok && e != nil {
encodeError(ctx, e, w)
return nil
}
return json.NewEncoder(w).Encode(response)
}
// ... (Similar functions for handling errors)
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
switch err {
case util.ErrUnknown:
w.WriteHeader(http.StatusNotFound)
case util.ErrInvalidArgument:
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusInternalServerError)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"error": err.Error(),
})
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
Step 7: Creating gRPC Transport Functions and Handlers
In a file named grpc.go within the same transport package, similar to the HTTP transport, you will create transport functions and handlers for gRPC. This file maps protobuf payload to their respective requests and responses.
package transport
import (
context"
"github.com/velotiotech/watermark-service/api/v1/pb/watermark"
"github.com/velotiotech/watermark-service/internal"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
grpctransport "github.com/go-kit/kit/transport/grpc"
)
// grpcServer is a struct responsible for creating gRPC servers and handling endpoint mappings.
type grpcServer struct {
get grpctransport.Handler
status grpctransport.Handler
addDocument grpctransport.Handler
watermark grpctransport.Handler
serviceStatus grpctransport.Handler
}
// NewGRPCServer initializes a gRPC server for the provided endpoints.
func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
return &grpcServer{
get: grpctransport.NewServer(
ep.GetEndpoint,
decodeGRPCGetRequest,
decodeGRPCGetResponse,
),
status: grpctransport.NewServer(
ep.StatusEndpoint,
decodeGRPCStatusRequest,
decodeGRPCStatusResponse,
),
addDocument: grpctransport.NewServer(
ep.AddDocumentEndpoint,
decodeGRPCAddDocumentRequest,
decodeGRPCAddDocumentResponse,
),
watermark: grpctransport.NewServer(
ep.WatermarkEndpoint,
decodeGRPCWatermarkRequest,
decodeGRPCWatermarkResponse,
),
serviceStatus: grpctransport.NewServer(
ep.ServiceStatusEndpoint,
decodeGRPCServiceStatusRequest,
decodeGRPCServiceStatusResponse,
),
}
}
// Functions for implementing gRPC service endpoints.
func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.GetReply), nil
}
func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.ServiceStatusReply), nil
}
func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
_, rep, err := g.addDocument.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.AddDocumentReply), nil
}
func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
_, rep, err := g.status.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.StatusReply), nil
}
func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
_, rep, err := g.watermark.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.WatermarkReply), nil
}
func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.GetRequest)
var filters []internal.Filter
for _, f := range req.Filters {
filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
}
return endpoints.GetRequest{Filters: filters}, nil
}
func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.StatusRequest)
return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}
func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.WatermarkRequest)
return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}
func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.AddDocumentRequest)
doc := &internal.Document{
Content: req.Document.Content,
Title: req.Document.Title,
Author: req.Document.Author,
Topic: req.Document.Topic,
Watermark: req.Document.Watermark,
}
return endpoints.AddDocumentRequest{Document: doc}, nil
}
func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
return endpoints.ServiceStatusRequest{}, nil
}
func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.GetReply)
var docs []internal.Document
for _, d := range reply.Documents {
doc := internal.Document{
Content: d.Content,
Title: d.Title,
Author: d.Author,
Topic: d.Topic,
Watermark: d.Watermark,
}
docs = append(docs, doc)
}
return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}
func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.StatusReply)
return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}
func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.WatermarkReply)
return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.AddDocumentReply)
return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}
func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.ServiceStatusReply)
return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
Step 8: Creating Proto File for gRPC Service
Create a proto file, watermarksvc.proto, in the api/v1/pb
package. This proto file serves as the definition for the service interface and the request/response structures.
syntax = "proto3";
package pb;
service Watermark {
rpc Get (GetRequest) returns (GetReply) {}
rpc Watermark (WatermarkRequest) returns (WatermarkReply) {}
rpc Status (StatusRequest) returns (StatusReply) {}
rpc AddDocument (AddDocumentRequest) returns (AddDocumentReply) {}
rpc ServiceStatus (ServiceStatusRequest) returns (ServiceStatusReply) {}
}
message Document {
string content = 1;
string title = 2;
string author = 3;
string topic = 4;
string watermark = 5;
}
message GetRequest {
message Filters {
string key = 1;
string value = 2;
}
repeated Filters filters = 1;
}
message GetReply {
repeated Document documents = 1;
string Err = 2;
}
message StatusRequest {
string ticketID = 1;
}
message StatusReply {
enum Status {
PENDING = 0;
STARTED = 1;
IN_PROGRESS = 2;
FINISHED = 3;
FAILED = 4;
}
Status status = 1;
string Err = 2;
}
message WatermarkRequest {
string ticketID = 1;
string mark = 2;
}
message WatermarkReply {
int64 code = 1;
string err = 2;
}
message AddDocumentRequest {
Document document = 1;
}
message AddDocumentReply {
string ticketID = 1;
string err = 2;
}
message ServiceStatusRequest {}
message ServiceStatusReply {
int64 code = 1;
string err = 2;
}
This proto file defines the service interface and request/response message structures for gRPC.
Finally, you can generate the corresponding .pb
files from this proto file using the protoc
tool, but the process of generating .pb
files is not covered in this document.
Implementing Request and Response Functions:
To successfully integrate the protocol buffer (protobuf) request and response structs with our endpoint request and response structs, we need to create the necessary decode and encode functions. These functions facilitate the mapping of protobuf Request/Response structs to our endpoint req/resp structs.
Let's illustrate this process with an example: consider the function decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error)
. In this function, we extract the relevant information from grpcReq
, which is asserted to be of type pb.GetRequest
. We then use the extracted fields to populate a new struct of type endpoints.GetRequest{}
. Similar decoding and encoding functions should be implemented for other requests and responses.
Here's the code in the transport
package:
package transport
import (
// Imports...
grpctransport "github.com/go-kit/kit/transport/grpc"
)
type grpcServer struct {
get grpctransport.Handler
status grpctransport.Handler
addDocument grpctransport.Handler
watermark grpctransport.Handler
serviceStatus grpctransport.Handler
}
func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
return &grpcServer{
get: grpctransport.NewServer(
ep.GetEndpoint,
decodeGRPCGetRequest,
decodeGRPCGetResponse,
),
status: grpctransport.NewServer(
ep.StatusEndpoint,
decodeGRPCStatusRequest,
decodeGRPCStatusResponse,
),
addDocument: grpctransport.NewServer(
ep.AddDocumentEndpoint,
decodeGRPCAddDocumentRequest,
decodeGRPCAddDocumentResponse,
),
watermark: grpctransport.NewServer(
ep.WatermarkEndpoint,
decodeGRPCWatermarkRequest,
decodeGRPCWatermarkResponse,
),
serviceStatus: grpctransport.NewServer(
ep.ServiceStatusEndpoint,
decodeGRPCServiceStatusRequest,
decodeGRPCServiceStatusResponse,
),
}
}
func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.GetReply), nil
}
func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.ServiceStatusReply), nil
}
func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
_, rep, err := g.addDocument.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.AddDocumentReply), nil
}
func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
_, rep, err := g.status.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.StatusReply), nil
}
func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
_, rep, err := g.watermark.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.WatermarkReply), nil
}
func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.GetRequest)
var filters []internal.Filter
for _, f := range req.Filters {
filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
}
return endpoints.GetRequest{Filters: filters}, nil
}
func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.StatusRequest)
return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}
func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.WatermarkRequest)
return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}
func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.AddDocumentRequest)
doc := &internal.Document{
Content: req.Document.Content,
Title: req.Document.Title,
Author: req.Document.Author,
Topic: req.Document.Topic,
Watermark: req.Document.Watermark,
}
return endpoints.AddDocumentRequest{Document: doc}, nil
}
func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
return endpoints.ServiceStatusRequest{}, nil
}
func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.GetReply)
var docs []internal.Document
for _, d := range reply.Documents {
doc := internal.Document{
Content: d.Content,
Title: d.Title,
Author: d.Author,
Topic: d.Topic,
Watermark: d.Watermark,
}
docs = append(docs, doc)
}
return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}
func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.StatusReply)
return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}
func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.WatermarkReply)
return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.AddDocumentReply)
return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}
func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.ServiceStatusReply)
return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
Server Entry Points:
Now, the next step involves creating entry point files (main) in the cmd
directory for each service. Having already mapped the appropriate routes to the endpoints by invoking the service functions and integrated the proto service server with the endpoints using ServeGRPC()
functions, we can focus on constructing and launching the HTTP and gRPC servers.
For this, create a watermark
package in the cmd
directory and a watermark.go
file. This file will contain the code to start and stop the HTTP and gRPC servers for the service.
Here's the code for the main
package:
package main
import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
pb "github.com/velotiotech/watermark-service/api/v1/pb/watermark"
"github.com/velotiotech/watermark-service/pkg/watermark"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
"github.com/velotiotech/watermark-service/pkg/watermark/transport"
"github.com/go-kit/kit/log"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/oklog/oklog/pkg/group"
"google.golang.org/grpc"
)
const (
defaultHTTPPort = "8081"
defaultGRPCPort = "8082"
)
func main() {
var (
logger log.Logger
httpAddr = net.JoinHostPort("localhost", envString("HTTP_PORT", defaultHTTPPort))
grpcAddr = net.JoinHostPort("localhost", envString("GRPC_PORT", defaultGRPCPort))
)
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
var (
service = watermark.NewService()
eps = endpoints.NewEndpointSet(service)
httpHandler = transport.NewHTTPHandler(eps)
grpcServer = transport.NewGRPCServer(eps)
)
var g group.Group
{
// The HTTP listener mounts the Go kit HTTP handler we created.
httpListener, err := net.Listen("tcp", httpAddr)
if err != nil {
logger.Log("transport", "HTTP", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "HTTP", "addr", httpAddr)
return http.Serve(httpListener, httpHandler)
}, func(error) {
httpListener.Close()
})
}
{
// The gRPC listener mounts the Go kit gRPC server we created.
grpcListener, err := net.Listen("tcp", grpcAddr)
if err != nil {
logger.Log("transport", "gRPC", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "gRPC", "addr", grpcAddr)
// we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
// the here demonstrated zipkin tracing middleware.
baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
pb.RegisterWatermarkServer(baseServer, grpcServer)
return baseServer.Serve(grpcListener)
}, func(error) {
grpcListener.Close()
})
}
{
// This function just sits and waits for ctrl-C.
cancelInterrupt := make(chan struct{})
g.Add(func() error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-c:
return fmt.Errorf("received signal %s", sig)
case <-cancelInterrupt:
return nil
}
}, func(error) {
close(cancelInterrupt)
})
}
logger.Log("exit", g.Run())
}
func envString(env, fallback string) string {
e := os.Getenv(env)
if e == "" {
return fallback
}
return e
}
In this section of the code, we follow these steps:
We specify fixed ports for the server to listen on: 8081 for the HTTP Server and 8082 for the gRPC Server.
We proceed to create both the HTTP and gRPC servers, along with the endpoints of the service backend and the service itself.
We utilize the
oklog.Group
to effectively manage multiple Goroutines. Three Goroutines are established: one for the HTTP server, the second for the gRPC server, and the third for monitoring cancel interrupts. This ensures efficient and concurrent operation of these components.
In summary, we've discussed how to set up decode and encode functions for protobuf structs and create entry point files for HTTP and gRPC servers, all while keeping the code structure intact. The servers, once launched, will be ready to handle incoming requests.
To run the service, execute the following command:
go run ./cmd/watermark/watermark.go
This command will start the watermark service with both HTTP and gRPC servers up and running, ready to process requests.
The server has been initiated locally. You can now test it by either using Postman or executing a curl command to interact with one of the available endpoints. Here's an example:
➜ ~ curl http://localhost:8081/healthz
{"status":200}
This command checks the status of the service, and it should return a response with a "status" of 200.
We have effectively developed a service and executed its endpoints.
In addition:
I always aim to ensure that a project is comprehensive, covering all the necessary aspects related to maintenance. This includes creating a well-structured README, setting up proper .gitignore and .dockerignore files, crafting Makefiles, Dockerfiles, golang-ci-lint configuration files, and configuring CI/CD pipelines, among other considerations.
For the three services, I have created distinct Dockerfiles located in the /images/ path. A multi-staged Dockerfile is employed to create the service's binary and run it. This involves copying the relevant code directories into the Docker image, building the image in a single step, and then creating a new image in the same file while transferring the binary from the previous one. Similar Dockerfiles are established for the other services. In each Dockerfile, the CMD
is set to go run watermark
, serving as the entry point for the container.
To streamline the process, I have also constructed a Makefile with two primary targets: build-image
and build-push
. The former is used to build the Docker image, and the latter is employed to push it to a repository.
Note: Given the scope of this blog, I'm keeping it concise. The provided code repository in the beginning covers most of the critical concepts related to services. I'm continually working on enhancements and additional features, with ongoing commits.
Now, let's delve into the deployment process:
I'll demonstrate how to deploy all these services using container orchestration tools such as Kubernetes. I assume you have prior experience with Kubernetes, at least at a beginner's level.
In the deploy directory, you can create a sample deployment that includes three containers: auth
, watermark
, and database
. Since the entry point commands for each container are already defined in the Dockerfiles, there's no need to specify additional arguments or commands in the deployment.
Additionally, you'll require a service to route external traffic from another load balancer service or a nodeport type service. To make this work, you might need to create a nodeport-type service to expose the watermark-service
and keep it running.
Another crucial and intriguing aspect is the deployment of an API Gateway. Setting up an API Gateway requires a reasonable understanding of the cloud provider's stack. In my case, I've utilized the Azure stack to deploy an API Gateway using a resource called "API-Management" in the Azure platform.
Lastly, the only pending task is establishing a robust CI/CD setup, which is one of the most critical aspects of a project after its development phase. While I'd love to delve into more detail about these deployment-related topics, it goes beyond the scope of my current blog. Perhaps, I will explore these topics in another blog post.
Similar to this, I along with other open-source loving dev folks, run a developer-centric community on Slack. Where we discuss these kinds of topics, implementations, integrations, some truth bombs, weird chats, virtual meets, contribute to open--sources and everything that will help a developer remain sane ;) Afterall, too much knowledge can be dangerous too.
I'm inviting you to join our free community (no ads, I promise, and I intend to keep it that way), take part in discussions, and share your freaking experience & expertise. You can fill out this form, and a Slack invite will ring your email in a few days. We have amazing folks from some of the great companies (Atlassian, Gong, Scaler), and you wouldn't wanna miss interacting with them. Invite Form