It’s the 23rd day of the Makuake Advent Calendar 2022.
As we know, OpenTelemetry is an observability framework for generating and exporting telemetry data. As more companies adopt microservices and SLIs/SLOs, we need it to answer new, never-before-seen (and complex) questions.
In that context, observability of systems that communicate through asynchronous messaging is as important as that of systems that communicate synchronously over HTTP or gRPC.
Take a notification system as an example:
- How long does it take for the end user to receive a notification after the notification event is fired?
- What’s the error rate across the notification lifecycle?
Answering those questions might seem to require expensive, purpose-built infrastructure.
Instead, in this article, I’ll demonstrate an easy way to answer them with OpenTelemetry.
Example Case
All source code is here:
https://github.com/ymtdzzz/batch-tracing-sample
Processing flow:
- Enqueue the notification contents as messages into a queue (RabbitMQ) in a batch process
- A worker (consumer) asynchronously receives each message and sends a request to the notification server (/email or /push)
- The notification server responds with 200 or 500
It’s assumed that each component has already been instrumented, and that HTTP communication is instrumented with the net/http auto-instrumentation library.
Problem
In the current state, the batch process and the subsequent processing in the worker cannot be followed as one trace: the trace context does not travel through the queue.
Moreover, there doesn’t seem to be an instrumentation library for RabbitMQ in Go.
https://opentelemetry.io/registry/?s=rabbitmq&component=&language=
What Should We Do?
To propagate context, we can use an OpenTelemetry Propagator, which works for both synchronous and asynchronous messaging!
Implement a Custom Propagator for RabbitMQ
What’s a Propagator?
https://opentelemetry.io/docs/reference/specification/context/api-propagators/
The Propagators API defines interfaces for propagating context across process boundaries: how the sender *Inject*s the context into a message and how the receiver *Extract*s it from the message. A Propagator works through a Carrier, which is responsible for the actual injection into and extraction from a specific type of message.
Fortunately, RabbitMQ lets us put key-value Headers in messages (docs), so we can use a TextMapPropagator.
Propagator Implementation
Actually, since it is the Carrier rather than the Propagator that manipulates the TextMap, all we have to do is implement a struct that satisfies the TextMapCarrier interface!
TextMapCarrier interface (doc):
type TextMapCarrier interface {
	// Get returns the value associated with the passed key.
	Get(key string) string
	// Set stores the key-value pair.
	Set(key string, value string)
	// Keys lists the keys stored in this carrier.
	Keys() []string
}
Carrier implementation for this interface (source code):
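package internal

import amqp "github.com/rabbitmq/amqp091-go"

// AMQPCarrier adapts AMQP message headers (amqp.Table) to propagation.TextMapCarrier.
type AMQPCarrier struct {
	headers amqp.Table
}
// NewAMQPCarrier wraps headers, which must be non-nil for Set to work.
func NewAMQPCarrier(headers amqp.Table) *AMQPCarrier {
	return &AMQPCarrier{headers: headers}
}
// Get returns "" when the key is missing or its value is not a string.
func (c *AMQPCarrier) Get(key string) string {
	value, _ := c.headers[key].(string)
	return value
}
func (c *AMQPCarrier) Set(key string, value string) {
	c.headers[key] = value
}
func (c *AMQPCarrier) Keys() []string {
	// Zero length, full capacity: make([]string, n) followed by append
	// would leave n empty strings at the front of the result.
	keys := make([]string, 0, len(c.headers))
	for k := range c.headers {
		keys = append(keys, k)
	}
	return keys
}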
amqp.Table is just map[string]interface{}, so Get() has to convert an interface{} value back into a string. The implementation here is a little rough, but it’s enough for this example… ;)
Sender Side Implementation
On the sender side, we inject the context into the headers and then send the message as usual (source code).
// Create an empty amqp.Table for the message headers
// (amqp.NewConnectionProperties() is not empty: it pre-fills connection properties)
headers := make(amqp.Table)
// Wrap it in the custom Carrier
carrier := internal.NewAMQPCarrier(headers)
// Inject the context
otel.GetTextMapPropagator().Inject(ctx, carrier)
err = ch.PublishWithContext(
	ctx,
	"",     // exchange (default)
	q.Name, // routing key
	false,  // mandatory
	false,  // immediate
	amqp.Publishing{
		ContentType: "application/octet-stream",
		Body:        msg,
		Headers:     headers, // Assign the context-injected headers
	},
)
if err != nil {
	panic(err)
}
log.Println("Message has been sent")
Receiver Side Implementation
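The receiver side mirrors it: wrap the received headers in the custom Carrier, extract the context, and start a child span from it.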
// Wrap the received headers in the custom Carrier
carrier := internal.NewAMQPCarrier(d.Headers)
// Extract the context
ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
// Start a child Span with the extracted context as its parent
ctx, span := otel.Tracer("notification").Start(ctx, "consume")
msg, err := internal.DecodeNotificationMessage(d.Body)
if err != nil {
	panic(err)
}
log.Printf("received msg: %v\n", msg)
internal.CallServer(ctx, &client, msg)
span.End()
We’re All Set 🎉
Now, let’s start the apps and check the Jaeger UI endpoint (http://localhost:16686/).
By connecting the traces, we can now investigate any errors throughout the notification lifecycle easily.
Moreover, since we can measure the duration of the entire trace, we can analyze performance bottlenecks and spot notifications that are slow from the user’s point of view.
I hope you learned something new from this post. Please let me know if you have any feedback in the comments or on Twitter!