In the last blog post, I demonstrated how to decompose the sign-up and send-mail functions into two separate workers.
One reason was to use Cloudflare Queues so that the two Workers could scale independently of each other.
I recently learned from Harshil at Cloudflare that it's possible to scale up different functions within a single worker using a Queue.
This means you can put the code for your Producer and your Consumer in the same worker, and they still execute separately.
Coming from a service-oriented mindset, this was mind-blowing and new to me, and I'm still quite amazed by it. So let's break it down.
How does this work
Every worker in Cloudflare is single-threaded.
When a worker produces a message to a Queue and the same worker also implements a consumer to handle the message, Cloudflare handles concurrency and ensures each message gets processed by scaling up the workers automatically.
This is what lets you scale different functions within a single worker: the code path that produces a message and the consumer that handles it live in the same codebase but run as separate invocations.
Example
Let's move the code back into a single worker and implement newsletter functionality for all signed-up users. This functionality will retrieve all existing users from the database and send them newsletters using a handleNewsletterMessage function. Instead of calling the function directly from the index.tsx file, we send a message for each database entry to the Queue.
In addition, we implement a consumer in the same worker that handles the messages from the Queue in batches.
When a worker produces a message to a Queue and implements a consumer, Cloudflare will automatically scale up the worker when needed. This means our Worker can scale horizontally based on the Queue's workload.
It is also important to note that by using a consumer handler, the life cycle of the worker will be controlled by Cloudflare's Queue. There is a detailed post about the internals of Queues and how they work.
Implementation
Make sure to include the Queue bindings in your wrangler.toml file for both consumer and producer, and add the Binding to your index.tsx file.
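As a rough sketch of what that configuration could look like: the binding name NEWSLETTER_QUEUE and the queue name newsletter-queue are taken from the code in this post, while the batch settings are illustrative values only and should be tuned for your workload.

```toml
# Producer: exposes the queue to the Worker as env.NEWSLETTER_QUEUE
[[queues.producers]]
queue = "newsletter-queue"
binding = "NEWSLETTER_QUEUE"

# Consumer: the same Worker's queue() handler receives batches from this queue
[[queues.consumers]]
queue = "newsletter-queue"
max_batch_size = 10    # illustrative value
max_batch_timeout = 5  # seconds, illustrative value
```

Because both entries live in the same wrangler.toml, the same Worker is registered as producer and consumer of the queue.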
Now let's add an endpoint to send a newsletter. It fetches the database entries and produces one message per entry to the Queue.
app.post("/api/send-newsletter", async (c) => {
  const { newsletterText, subject } = await c.req.json();

  // Connect to the database
  const sql = neon(c.env.DATABASE_URL);
  const db = drizzle(sql);

  // Fetch all registered users
  const allUsers = await db.select().from(runners);

  // Queue one newsletter message per user; the consumer sends the actual email
  for (const user of allUsers) {
    try {
      console.log(
        `Sending message to queue for user: ${user.firstName} (${user.email})`
      );
      await c.env.NEWSLETTER_QUEUE.send({
        email: user.email,
        firstName: user.firstName,
        newsletterText,
        subject,
        type: "newsletter"
      });
    } catch (error) {
      // A failure for one user is logged and does not stop the loop
      console.error(`Failed to queue newsletter for ${user.email}:`, error);
    }
  }

  return c.json({
    message: `Newsletter queued for ${allUsers.length} recipients`,
    status: "success"
  });
});
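Sending one message per request works, but for a large user table it means one round trip to the Queue per user. The Queue binding also offers sendBatch, so a possible variation is to enqueue users in chunks. The sketch below is only an illustration: QueueLike is a local stand-in for the real Queue type, enqueueNewsletter and chunk are hypothetical helpers, and the limit of 100 messages per call is my understanding of the current Queues limits and worth checking against the docs.

```typescript
// Minimal local stand-in for the Queue binding so the sketch type-checks on its own.
// In a Worker you would use the Queue type from @cloudflare/workers-types instead.
interface QueueLike<Body> {
  sendBatch(messages: Iterable<{ body: Body }>): Promise<void>;
}

interface NewsletterMessage {
  email: string;
  firstName: string;
  newsletterText: string;
  subject: string;
  type: "newsletter";
}

// Assumed per-call limit for sendBatch; verify against the current Queues docs.
const MAX_BATCH = 100;

// Split an array into consecutive slices of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Enqueue one message per user, using one sendBatch call per chunk.
// Returns the number of messages handed to the queue.
async function enqueueNewsletter(
  queue: QueueLike<NewsletterMessage>,
  users: { email: string; firstName: string }[],
  newsletterText: string,
  subject: string
): Promise<number> {
  let queued = 0;
  for (const group of chunk(users, MAX_BATCH)) {
    await queue.sendBatch(
      group.map((user) => ({
        body: {
          email: user.email,
          firstName: user.firstName,
          newsletterText,
          subject,
          type: "newsletter" as const
        }
      }))
    );
    queued += group.length;
  }
  return queued;
}
```

Inside the route this could be called as enqueueNewsletter(c.env.NEWSLETTER_QUEUE, allUsers, newsletterText, subject). The trade-off is that a failed sendBatch call affects a whole chunk rather than a single user.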
Next, we need to implement a queue handler within the same worker to send out the emails.
export default {
  fetch: instrument(app).fetch,
  async queue(
    batch: MessageBatch<NewsletterMessage>,
    env: Bindings
  ): Promise<void> {
    // Handle every message in the batch, one after the other
    for (const message of batch.messages) {
      await handleNewsletterMessage(message, env);
    }
  }
};
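One thing to keep in mind with batches: if the queue handler throws, the whole batch is retried by default, including messages that were already handled. Messages expose ack() and retry() so that each one can be settled individually. The following is a minimal sketch of that idea, not the implementation used in this project: MessageLike and BatchLike are local stand-ins for the runtime's Message and MessageBatch types, and processBatch and its handle callback are hypothetical names.

```typescript
// Local stand-ins for the runtime's Message and MessageBatch types (assumption:
// only the members used below are modelled).
interface MessageLike<Body> {
  readonly id: string;
  readonly body: Body;
  ack(): void;
  retry(): void;
}

interface BatchLike<Body> {
  readonly queue: string;
  readonly messages: readonly MessageLike<Body>[];
}

// Process messages sequentially. Acknowledge each success and mark each failure
// for retry, so one bad message does not cause the whole batch to be redelivered.
async function processBatch<Body>(
  batch: BatchLike<Body>,
  handle: (body: Body) => Promise<void>
): Promise<{ acked: number; retried: number }> {
  let acked = 0;
  let retried = 0;
  for (const message of batch.messages) {
    try {
      await handle(message.body);
      message.ack();
      acked++;
    } catch (error) {
      console.error(`Message ${message.id} failed, scheduling retry:`, error);
      message.retry();
      retried++;
    }
  }
  return { acked, retried };
}
```

For emails this matters because a retried batch without per-message acknowledgement could send the same newsletter twice to users whose message had already succeeded.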
You can configure the batch size and other Queue parameters in the wrangler.toml file. For more details about Queue configuration, check out the previous blog post in this series.
A second queue handler
It is also possible to have two different consumers in the same worker, each receiving message batches from a different queue. For example, you might want to move the logic for sending the registration email after a Marathon sign-up into its own queue handler.
This approach can be useful if you expect a high load during the registration opening. For example, assume this worker handles signups for the New York Marathon. As a passionate runner, I know how busy the site gets during the first few hours after registration opens.
Instead of sending the email within the request that starts when someone hits the api/marathon-sign-up route, you can publish a message to a different queue.
// delete this line
c.executionCtx.waitUntil(sendMail(email, c.env.RESEND_API, firstName));

// add these lines instead
c.executionCtx.waitUntil(
  c.env.SIGN_UP_QUEUE.send({
    email,
    firstName
  })
);
Now you can implement two consumers in your worker. The single queue handler receives batches from both queues and dispatches on the queue name:
export default {
  fetch: instrument(app).fetch,
  async queue(
    batch: MessageBatch<NewsletterMessage | RunnerData>,
    env: Bindings
  ): Promise<void> {
    // Use for...of rather than forEach(async ...): forEach does not wait for
    // async callbacks, so the handler would return before the work is done.
    for (const message of batch.messages) {
      switch (batch.queue) {
        case "sign-up-queue":
          await handleSignUpMessage(message as Message<RunnerData>, env);
          break;
        case "newsletter-queue":
          await handleNewsletterMessage(
            message as Message<NewsletterMessage>,
            env
          );
          break;
      }
    }
  }
};
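If more queues are added over time, the switch statement can be replaced by a lookup table keyed by queue name. Here is a small sketch of that design under stated assumptions: createQueueRouter, QueueMessage and QueueBatch are hypothetical names introduced for illustration, the two types are local stand-ins for the runtime's Message and MessageBatch, and throwing on an unknown queue name is just one possible policy.

```typescript
// Local stand-ins for the runtime's Message and MessageBatch types.
interface QueueMessage {
  readonly body: unknown;
}

interface QueueBatch {
  readonly queue: string;
  readonly messages: readonly QueueMessage[];
}

type Handler = (message: QueueMessage) => Promise<void>;

// Build a queue handler that picks the message handler by queue name.
function createQueueRouter(handlers: Map<string, Handler>) {
  return async function route(batch: QueueBatch): Promise<void> {
    const handler = handlers.get(batch.queue);
    if (!handler) {
      // Fail loudly instead of silently dropping messages from an unknown queue
      throw new Error(`No handler registered for queue "${batch.queue}"`);
    }
    // Await each message in turn so the batch is finished before returning
    for (const message of batch.messages) {
      await handler(message);
    }
  };
}
```

In the worker, the map entries would wrap handleSignUpMessage and handleNewsletterMessage for "sign-up-queue" and "newsletter-queue", and the returned function would be called from queue().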
Make sure to include the sign-up-queue as a producer and consumer in your wrangler.toml.
The image below shows the architecture of a Worker with two Queue consumers.
Conclusion
Cloudflare Queues provide a powerful way to scale different functions within a single worker. By producing messages to a Queue and implementing a consumer in the same worker, you can achieve horizontal scaling while maintaining your code in a monolithic codebase.
So while it might seem counterintuitive at first, combining Producer and Consumer code in one Worker is a legitimate pattern when using Cloudflare Queues, because the platform separates the two at runtime: the producer and the consumer run as separate invocations that Cloudflare creates as needed.
However, there are still valid reasons to split your code into multiple workers:
- Independent deployability of services
- Separation of concerns
- Team organization, especially when different teams are responsible for different services
The choice between a single worker with Queues or multiple workers depends on your specific needs and organizational structure.