Actors and Workflows: Building a Customer Loyalty Program with Temporal

Fitz - Aug 3 '23 - Dev Community

This post is technically a follow-up to another post. You don't need to read that one to make sense of this one, but it might give some useful background.

That post talked through how the Actor Model can be implemented using "Workflows" on Temporal (https://github.com/temporalio/temporal), even though these two concepts don't immediately appear compatible.

Here, I dive into a concrete example: a Workflow representing a customer's loyalty status.

If you want to skip the prose and just jump right into the code, you can find it all in this GitHub repository, with implementations in Go, Java, and Python.

Actor Model Refresher

As formally defined, Actors must be able to do three things:

  1. Send and receive messages
  2. Create new Actors
  3. Maintain state

Exact implementation details vary depending on what framework, library, or tools you're using, but the biggest challenge is having some kind of software artifact running somewhere that can handle these things.

That's where most Actor frameworks come in to help: providing both the programming model and the runtime environment for being able to build an Actor-based application in a highly distributed, concurrent, and scalable way.

Temporal differs here in that it’s general-purpose, rather than specific to one model or system design pattern. With Workflows, you define a function that Temporal will ensure runs to completion (or reliably runs forever, if the function doesn’t return).

I recognize that statement is both rather bold and also so generic as to be hard to disprove. So, let's look at a concrete example.

Loyal Customers

Many consumer businesses have some kind of loyalty program. Buy 10 items, get the 11th free! Fly 10,000 miles, get free access to the airport lounge! Earn one million points over the lifetime of your account, earn a gold star!

At the highest level, the application's logic isn't complex: Each customer has an integer counter that's incremented after the customer does certain things (e.g., buy something, or take a trip). When that counter crosses different thresholds, new rewards are unlocked. And, although we may not like it, customers can always close their accounts.
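As a minimal sketch of that counter-and-thresholds logic in Go (the language used later in this post): the reward names and point thresholds below are hypothetical, chosen only for illustration, and are not taken from the example repository.

```go
package main

import "fmt"

// reward pairs a hypothetical points threshold with the reward it unlocks.
type reward struct {
	MinPoints int
	Name      string
}

// rewards is illustrative only; the values are assumptions, sorted by MinPoints.
var rewards = []reward{
	{MinPoints: 100, Name: "free item"},
	{MinPoints: 1000, Name: "invite a guest"},
	{MinPoints: 10000, Name: "lounge access"},
}

// unlockedRewards returns every reward whose threshold the counter has reached.
func unlockedRewards(points int) []string {
	var unlocked []string
	for _, r := range rewards {
		if points >= r.MinPoints {
			unlocked = append(unlocked, r.Name)
		}
	}
	return unlocked
}

func main() {
	points := 0
	// Each purchase or trip increments the counter; rewards follow from its value.
	for _, earned := range []int{60, 50, 900} {
		points += earned
		fmt.Println(points, unlockedRewards(points))
	}
}
```

Deriving the rewards from the counter, rather than storing them separately, means the counter is the only state that has to be kept.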

When we create the diagram for the app, it might look like this:

[Figure: customer loyalty diagram, version 1]

In terms of the Actor Model, two of the three requirements are on display:

  1. Send and receive messages: A customer can send either an "earn points" message or a "try to use reward" message.
  2. Create new Actors: ??? (This is the Actor requirement not apparent in this application, but we'll see later how it can be incorporated.)
  3. Maintain state: A customer loyalty account needs to maintain the points counter and which rewards are unlocked (or be able to look up this information based on the points value).

Requirement #2, the ability to create other Actors, isn't immediately obvious here, but it isn't too far out of reach. We could define in this example application that one of the rewards for earning enough points is the ability to gift status to someone else, inviting them (i.e., creating their account) to the program if they aren't already a member.

If our goal is to create a demo application for the Actor Model (as it is in this post), then there's actually one other thing missing: the ability for a customer (or rather, their loyalty account) to send messages. For that, we could also declare that customers with enough points can gift points or status levels (i.e., which rewards are unlocked) to their guests. Then they can send messages, too!

Reworking the previous diagram to be more befitting of a full "Actor," we'd get the following:

[Figure: customer loyalty diagram, version 2]

And, as for the exact implementation details, read on!

Loyal (Temporal!) Customers

Imagine being able to write the customer loyalty program above in just a function or two. Conceptually, that's not hard. In pseudocode, that might look like the following:

INVITE_REWARD_MINIMUM_POINTS = 1000

function CustomerLoyaltyAccount:
    account_canceled = false
    points = 0

    while !account_canceled:
        message = receive_message()
        switch message.type:
            case 'cancel':
                account_canceled = true
            case 'add_points':
                fallthrough
            case 'gift_points':
                points += message.value
            case 'invite_guest':
                if points >= INVITE_REWARD_MINIMUM_POINTS:
                    spawn(new CustomerLoyaltyAccount())

But there are a few crucial details that are, well, rather undefined in this pseudo-function. Specifically:

  1. What's receive_message() doing? How is it receiving messages?
  2. Similarly, what's spawn(new CustomerLoyaltyAccount()) doing?
  3. And most importantly, where is this function running? What happens if that runtime crashes or the function otherwise stops running?

Each of these maps to core Temporal features that we can implement in an example Workflow:

  1. Data can be sent to Workflows via Signals
  2. Workflows can create new Workflow instances
  3. As long as there are Workers running somewhere that can communicate with the Temporal Server, then if the Worker running the function dies, the function will continue running on another (you know, kind of Temporal's main benefit)
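To make the gap concrete, here is a rough sketch of the pseudocode in plain Go, with no Temporal involved. It assumes a buffered channel as the mailbox and a goroutine per account; the names (account, message, runDemo) are made up for this illustration. It answers the first two questions, but not the third: if the process dies, every account and its points are gone.

```go
package main

import (
	"fmt"
	"sync"
)

const inviteRewardMinimumPoints = 1000

// message mirrors the pseudocode's message: a type plus an optional value.
type message struct {
	Type  string
	Value int
}

// account is one in-memory actor: private state plus a mailbox channel.
type account struct {
	Name    string
	Points  int
	Mailbox chan message
	Guests  []*account
}

func newAccount(name string) *account {
	return &account{Name: name, Mailbox: make(chan message, 16)}
}

// run handles messages until "cancel" arrives. Only this goroutine touches the state.
func (a *account) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for msg := range a.Mailbox {
		switch msg.Type {
		case "cancel":
			return
		case "add_points", "gift_points":
			a.Points += msg.Value
		case "invite_guest":
			if a.Points >= inviteRewardMinimumPoints {
				// Spawn a new actor and send it a message.
				guest := newAccount(a.Name + "-guest")
				a.Guests = append(a.Guests, guest)
				wg.Add(1)
				go guest.run(wg)
				guest.Mailbox <- message{Type: "gift_points", Value: 100}
				// Demo only: cancel the guest right away so the program can exit.
				guest.Mailbox <- message{Type: "cancel"}
			}
		}
	}
}

// runDemo drives one account through earning points, inviting a guest, and canceling.
func runDemo() *account {
	var wg sync.WaitGroup
	alice := newAccount("alice")
	wg.Add(1)
	go alice.run(&wg)
	alice.Mailbox <- message{Type: "add_points", Value: 1200}
	alice.Mailbox <- message{Type: "invite_guest"}
	alice.Mailbox <- message{Type: "cancel"}
	wg.Wait()
	return alice
}

func main() {
	alice := runDemo()
	fmt.Println(alice.Points, len(alice.Guests), alice.Guests[0].Points)
}
```

Everything here lives in one process's memory, which is exactly the part that Workers and the Temporal Server take over in the Workflow version.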

Customers Go Loyal

Let's build this up in Go. If you are more comfortable with other languages, I've also written the same Workflow in Python and Java. While the languages are different, most of the same concepts and patterns should carry over.

(For brevity in the body of this blog post, I'll in most cases omit error handling but include it when non-trivial and relevant.)

First, we write the skeleton of a Workflow and an Activity. For some of the milestones in a customer's lifecycle, it'd be nice to send them some kind of notification. In a real application, you'd call out to SendGrid, Mailchimp, Constant Contact, or some other email provider, but for simplicity's sake, I'm just logging out the details. This initial Workflow does just that: if it's a new customer, send a welcome email, but otherwise move on.

func CustomerLoyaltyWorkflow(ctx workflow.Context, customer CustomerInfo, newCustomer bool) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("Loyalty workflow started.", "CustomerInfo", customer)

    var activities Activities

    if newCustomer {
        logger.Info("New customer workflow; sending welcome email.")
        err := workflow.ExecuteActivity(ctx, activities.SendEmail,
            fmt.Sprintf("Welcome, %v, to our loyalty program!", customer.Name)).
            Get(ctx, nil)
        if err != nil {
            logger.Error("Error running SendEmail activity for welcome email.", "Error", err)
        }
    } else {
        logger.Info("Skipping welcome email for non-new customer.")
    }

    // ... [to be added later] ... //

    return nil
}

type Activities struct {
    Client client.Client
}

func (*Activities) SendEmail(ctx context.Context, body string) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Sending email.", "Contents", body)
    return nil
}

Next up, we need to be able to handle messages. This is the primary thing the Workflow (i.e., customer loyalty Actor) does: sit around waiting for new messages to come in.

The following code replaces the // ... [to be added later] ... // line from the previous snippet:

    selector := workflow.NewSelector(ctx)

    // Signal handler for adding points
    selector.AddReceive(workflow.GetSignalChannel(ctx, "addPoints"),
        func(c workflow.ReceiveChannel, _ bool) {
            signalAddPoints(ctx, c, &customer)
        })

    // Signal handler for canceling account
    selector.AddReceive(workflow.GetSignalChannel(ctx, "cancelAccount"),
        func(c workflow.ReceiveChannel, _ bool) {
            signalCancelAccount(ctx, c, &customer)
        })

    // ... [register other Signal handlers here] ... //

    logger.Info("Waiting for new messages")
    for customer.AccountActive {
        selector.Select(ctx)
    }

The Signal handler function for adding points does very little, adding in the given points to the customer's state and then sending an email to the customer with the new value.

As you might imagine, the cancel account handler is very similar, setting the customer.AccountActive flag used above to false and then notifying the customer.

func signalAddPoints(ctx workflow.Context, c workflow.ReceiveChannel, customer *CustomerInfo) {
    logger := workflow.GetLogger(ctx)
    var activities Activities

    var pointsToAdd int
    c.Receive(ctx, &pointsToAdd)

    logger.Info("Adding points to customer account.", "PointsAdded", pointsToAdd)
    customer.LoyaltyPoints += pointsToAdd

    err := workflow.ExecuteActivity(ctx, activities.SendEmail,
        fmt.Sprintf("You've earned more points! You now have %v.", customer.LoyaltyPoints)).
        Get(ctx, nil)
    if err != nil {
        logger.Error("Error running SendEmail activity for added points.", "Error", err)
    }

    // ... [insert logic for unlocking status levels or rewards] ... //
}

All combined, the code so far does three things:

  1. First, it registers the signalAddPoints and signalCancelAccount functions as the handlers for the "addPoints" and "cancelAccount" Signals, respectively.
  2. Then, it blocks forward progress on the Workflow, via selector.Select(ctx), until a registered Signal comes in. Unless that Signal is "cancelAccount," the Workflow will keep looping on this select.
  3. I've chosen for this application to not fail the Workflow when an email fails to send. This keeps the Workflow representing the customer's loyalty account active and running even in spite of external system failure.
    • For that, you'll want to set an appropriate retry policy to ensure that the Workflow doesn't completely block on email failures, for example by setting the MaximumAttempts to a reasonably low number like 10.

Already this gives us most of the application. We have a function that runs perpetually, thanks to Temporal, and can receive two different kinds of messages. Both modify the state of the Workflow, and one of them (cancelAccount) also results in the Workflow finishing.

What remains is a couple more Temporal-specific considerations.

Long-Lived Customers

In my last post, I spilled many words on the topic of "Continue-As-New." If you didn't (or don't want to!) read those words, the gist is this: at some point, a Workflow's history may get unwieldy; Continue-As-New resets it.

For this customer loyalty example Workflow, the limit the Event History will hit first is, far and away, the number of events rather than their size. With the addPoints Signal only taking a single integer argument and the cancelAccount Signal taking none, the combined contribution to the size of the history is minimal.

A Signal with only a single integer parameter will, by itself, contribute one Event and about 500 bytes to the History, even with very large values. And so, how many of these Signals would be required to hit either the size or length limits?

If nothing else happened but addPoints Signals, it'd take 51,200 of them to reach the length limit, but 50 * 1024 * 1024 / 500 or 104,857.6 to reach the size limit. Knowing that many of these Signals will result in the SendEmail Activity running, and each Activity contributes a handful of (small) events to the history, this Workflow will hit the History length limit well before the size limit.
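The arithmetic above can be checked with a few lines of Go. The limits and the 500-byte figure are the ones quoted in this post; the 7-events-per-Signal figure in the last line is a hypothetical stand-in for "a Signal plus the handful of events from its SendEmail Activity," not a measured value.

```go
package main

import "fmt"

const (
	historyLengthLimit = 51_200           // events, as quoted above
	historySizeLimit   = 50 * 1024 * 1024 // bytes (50 MB), as quoted above
	bytesPerSignal     = 500              // approximate size of one addPoints Signal
)

// signalsUntilLengthLimit returns how many Signals fit under the length limit
// when each Signal adds eventsPerSignal events to the History.
func signalsUntilLengthLimit(eventsPerSignal int) int {
	return historyLengthLimit / eventsPerSignal
}

// signalsUntilSizeLimit returns how many Signals fit under the size limit
// when each Signal adds bytesPerSignal bytes and nothing else is recorded.
func signalsUntilSizeLimit() float64 {
	return float64(historySizeLimit) / bytesPerSignal
}

func main() {
	fmt.Println("length limit, Signals only:", signalsUntilLengthLimit(1))
	fmt.Println("size limit, Signals only:", signalsUntilSizeLimit())
	// Hypothetical: each Signal plus its SendEmail Activity costs 7 events.
	fmt.Println("length limit, 7 events per Signal:", signalsUntilLengthLimit(7))
}
```

Any extra events per Signal shrink the first number while leaving the byte-based estimate almost unchanged, which is why the length limit is the one to watch.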

So, let's add a check for that into our Workflow loop:

    const eventsThreshold = 10_000
    // ... snip ...

    info := workflow.GetInfo(ctx)

    logger.Info("Waiting for new messages")
    for customer.AccountActive && info.GetCurrentHistoryLength() < eventsThreshold {
        selector.Select(ctx)
    }

Finally, trigger Continue-As-New as needed, draining any pending signals:

    if customer.AccountActive {
        logger.Info("Account still active, but hit continue-as-new threshold.")
        // Drain signals before continuing-as-new
        for selector.HasPending() {
            selector.Select(ctx)
        }
        return workflow.NewContinueAsNewError(ctx, CustomerLoyaltyWorkflow, customer, false)
    }

My previous post on this topic explained in a little more detail why it's necessary to drain signals before continuing-as-new. To briefly recap, Continue-As-New finishes the current Workflow run and starts a new instance of the Workflow regardless of any pending Signals. If we don't drain (and handle!) Signals before calling workflow.NewContinueAsNewError (or workflow.continue_as_new in Python, or Workflow.continueAsNew in Java), those pending Signals will be forever lost.
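As a loose analogy in plain Go (this is not how Temporal implements it), treat each run as owning its own buffered channel. The sketch below assumes a hypothetical runOnce function that stops after a fixed number of messages and hands its state to the next run; whatever is still buffered at that point only counts if it is drained first.

```go
package main

import "fmt"

// state is what gets carried from one run to the next.
type state struct {
	Points int
}

// filledMailbox returns a mailbox already holding n messages worth pts points each.
func filledMailbox(n, pts int) chan int {
	mailbox := make(chan int, n)
	for i := 0; i < n; i++ {
		mailbox <- pts
	}
	return mailbox
}

// runOnce handles maxHandled messages, then returns the state for the next run.
// With drain set, it first handles everything still buffered, without blocking.
func runOnce(s state, mailbox chan int, maxHandled int, drain bool) state {
	for handled := 0; handled < maxHandled; handled++ {
		s.Points += <-mailbox
	}
	if !drain {
		return s // anything left in this run's mailbox is never applied
	}
	for {
		select {
		case pts := <-mailbox:
			s.Points += pts
		default:
			return s // mailbox is empty; safe to hand off
		}
	}
}

func main() {
	lost := runOnce(state{}, filledMailbox(5, 10), 3, false)
	kept := runOnce(state{}, filledMailbox(5, 10), 3, true)
	fmt.Println("without drain:", lost.Points, "with drain:", kept.Points)
}
```

The select with a default case plays the role of the selector.HasPending() loop: keep handling messages only while something is already waiting.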

The last major thing this Workflow needs to make it a true, stage-worthy Actor is the ability to create others.

Spawning New Customers

While Temporal has support for Parent/Child relationships between Workflows, in this customer loyalty application, the only thing we need is the ability to send a message from one to the other in the case of gifting status or points.

Temporal provides an API in the Client that can do this and create other Workflows all in one call, called Signal-with-Start. Since this is only available in the Client, not from a Workflow, we'll need to do this in an Activity.

First, I'm setting the ID Reuse Policy to REJECT (WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE in the Go SDK). This is in some ways a "business logic" kind of decision, where I'm declaring that once a customer's account is closed, that customer can't be re-invited. (Note that after a namespace's retention period has passed, IDs from closed Workflows can be reused regardless of this policy, and so in a real-life production version of this app, you'd want this to check an external source for customer account statuses.)

func (a *Activities) StartGuestWorkflow(ctx context.Context, guest CustomerInfo) (GuestInviteResult, error) {
    // ...
    workflowOptions := client.StartWorkflowOptions{
        TaskQueue:             TaskQueue,
        WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
    }

Then, we can call Client.SignalWithStartWorkflow:

    logger.Info("Starting and signaling guest workflow.", "GuestID", guest.CustomerID)
    _, err := a.Client.SignalWithStartWorkflow(ctx, CustomerWorkflowID(guest.CustomerID),
        SignalEnsureMinimumStatus, guest.StatusLevel.Ordinal,
        workflowOptions, CustomerLoyaltyWorkflow, guest, true)

Note the use of the Client from the Activities receiver struct! I'm making use of something in the way Temporal works in Go: if, when we instantiate and register the Activities in the Worker, we also set this Client, then the same connection will be available within the Activities. This way, we don't have to worry about re-creating the Client.

I'm also ignoring the first return value from SignalWithStartWorkflow (a handle to the started Workflow run) via the Go convention of assigning to _; because this "guest" Workflow is expected to run indefinitely, blocking on its result would prevent the original Workflow from doing anything else. Since that handle is only useful for waiting for the Workflow to finish or for getting its IDs (which we already know from the CustomerWorkflowID(guest.CustomerID) call), we can safely ignore it.

But, it's still necessary to handle the error. With the ID Reuse Policy set to REJECT, retrying after the error that results from trying to start an already-closed Workflow will get us nowhere, and so we should instead send some useful information back to the Workflow:

    target := &serviceerror.WorkflowExecutionAlreadyStarted{}
    if errors.As(err, &target) {
        return GuestAlreadyCanceled, nil
    } else if err != nil {
        return -1, err
    }

    return GuestInvited, nil
}

// ... [Defined at top] ...
type GuestInviteResult int

const (
    GuestInvited GuestInviteResult = iota
    GuestAlreadyCanceled
)

Back in the Workflow, after running this Activity I can then check for that error and notify the customer as appropriate. As before, I'm allowing the Workflow to continue if sending the email failed. But if that SignalWithStartWorkflow call failed for any reason other than the guest's account already existing, I want to make some noise and fail the Workflow—something unusual is likely happening.

var inviteResult GuestInviteResult
err := workflow.ExecuteActivity(ctx, activities.StartGuestWorkflow, guest).
    Get(ctx, &inviteResult)
if err != nil {
    return fmt.Errorf("could not signal-with-start guest/child workflow for guest ID '%v': %w", guestID, err)
}

var emailToSend string
if inviteResult == GuestAlreadyCanceled {
    emailToSend = "Your guest has canceled!"
} else {
    emailToSend = "Your guest has been invited!"
}

// Reuse err with = rather than :=, since it was already declared above.
err = workflow.ExecuteActivity(ctx, activities.SendEmail, emailToSend).Get(ctx, nil)

This snippet of code would end up in a Signal handler for something like an "invite guest" Signal. The handler would also include, as discussed at the top of this post, a check for whether the current customer is even allowed to do this action.

Summing it all up

There are a few other things to explore in this app, like catching a cancellation request or looking through the tests, but this post has gotten long enough as it is. 🙂

Hopefully this post serves as a nice "close-to-real-world" example for you of how to build something that looks like an "Actor"—aka, a really, really long running Workflow that can send and receive messages and maintain state without a database—using Temporal.

The best way to learn Temporal is with our free courses.

Cover image from John Jennings on Unsplash
