Developing your first Azure Durable Function

Saturnino Pimentel - Sep 26 '19 - Dev Community

This article is part of #ServerlessSeptember. You'll find other helpful articles, detailed tutorials, and videos in this all-things-Serverless content collection. New articles are published every day — that's right, every day — from community members and cloud advocates in the month of September.

Find out more about how Microsoft Azure enables your Serverless functions at https://docs.microsoft.com/azure/azure-functions/.

This year I have been learning about Azure Functions, the serverless offering of Microsoft Azure, and how this approach lets us handle a wide range of solutions while paying only for the resources that we use. However, sometimes we want to keep the state of a workflow while staying serverless, without the complexity of managing that state ourselves. Fortunately, Microsoft simplifies this work with the Durable Functions extension.

Azure Durable Functions is an extension of Azure Functions that permits you to write stateful functions in a serverless compute environment.
Source: https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview

Stateful functions mean that you can pass information between two or more functions and keep that information with less effort. Durable Functions takes care of the state for you, and it also lets you control the retry attempts in case of failure. If everything fails, the framework gives you access to all the steps that the Durable Function followed.

The fundamentals

Before we write our first Durable Function, we need to understand some main concepts, starting with the Orchestrator Function.
The Orchestrator Function defines the workflow inside our Durable Function. It is in charge of calling the "activity" functions, and after calling them, the Orchestrator Function sleeps until it is triggered again.
An "activity" function is just a normal function that represents a single step in the workflow handled by the Orchestrator Function. It can receive data from the Orchestrator Function and return data to it.
Durable Functions uses the Azure Storage service: Table Storage stores the state of the orchestration, and Storage Queues carry the messages that trigger the next "activity".
If you have trouble with a Durable Function, you can review all the steps it executed using tools like Azure Storage Explorer, tracking them by the instance id returned by the orchestration client.
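To make the retry control mentioned earlier more concrete, here is a minimal sketch. It assumes version 1.x of the Durable Functions extension (the version whose types appear in this post) and must live in a Functions project that references it. The function names O_RetrySketch and A_FlakyStep, the 5-second interval, and the limit of 3 attempts are all hypothetical values chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

public static class RetrySketch
{
    // Hypothetical orchestrator: calls a single activity with a retry policy.
    [FunctionName("O_RetrySketch")]
    public static async Task<string> RunOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        // Assumed policy: wait 5 seconds before the first retry,
        // and stop after 3 attempts in total.
        var retryOptions = new RetryOptions(
            firstRetryInterval: TimeSpan.FromSeconds(5),
            maxNumberOfAttempts: 3);

        string input = context.GetInput<string>();

        // If A_FlakyStep throws, the framework schedules it again according
        // to retryOptions; the failure reaches this code only after the
        // last attempt has failed.
        return await context.CallActivityWithRetryAsync<string>(
            "A_FlakyStep", retryOptions, input);
    }

    // Hypothetical activity that the orchestrator above retries on failure.
    [FunctionName("A_FlakyStep")]
    public static string FlakyStep([ActivityTrigger] string input)
    {
        return (input ?? string.Empty).ToUpperInvariant();
    }
}
```

The retry policy lives in the orchestrator, so the activity itself stays a plain function with no retry logic of its own.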

Prerequisites

There are three ways to build your first Azure Durable Function: through the Azure Portal, from the command line, or, last but not least, with Visual Studio 2017 or later.

In this post, we are going to use Visual Studio 2019 and C#, but first it is necessary to install the Azure development workload, as shown in the next image.
WorkLoad

You need to install the Cosmos DB emulator to run your functions in a development environment; you can download it from the next link.
You are also going to need the Azure Functions Core Tools, but fortunately, the first time that you run an Azure Functions project, Visual Studio is going to install them for you.

Optionally, you can install Azure Storage Explorer from the next link; we are going to use it to see the content saved in Cosmos DB.

Creating our first Durable Function

For this example, we are going to create a Durable Function with the function chaining pattern; the next image shows the project architecture.
Architecture

First of all, open Visual Studio and search for the Azure Functions project template.
CreateAProject

Next, configure your new project: set the name, location, and solution name, and click Create.
CreateAProject2

On the following screen, select Azure Functions V2 and the Empty template.
CreateAProject3

Visual Studio is going to create an empty project.
EmptyFunctionProject

Now that we have our empty project, it is necessary to add the following classes; they are going to serve as our models for the information returned by the Computer Vision service.

using Newtonsoft.Json;

public class AnalysisResult
{
    [JsonProperty("categories")]
    public Category[] Categories { get; set; }

    [JsonProperty("description")]
    public Description Description { get; set; }

    [JsonProperty("id")]
    public string Id { get => RequestId; }

    [JsonProperty("requestId")]
    public string RequestId { get; set; }

    [JsonProperty("metadata")]
    public Metadata Metadata { get; set; }
}

public class Description
{
    [JsonProperty("tags")]
    public string[] Tags { get; set; }

    [JsonProperty("captions")]
    public Caption[] Captions { get; set; }
}

public class Caption
{
    [JsonProperty("text")]
    public string Text { get; set; }

    [JsonProperty("confidence")]
    public float Confidence { get; set; }
}

public class Metadata
{
    [JsonProperty("width")]
    public int Width { get; set; }

    [JsonProperty("height")]
    public int Height { get; set; }

    [JsonProperty("format")]
    public string Format { get; set; }
}

public class Category
{
    [JsonProperty("name")]
    public string Name { get; set; }

    [JsonProperty("score")]
    public float Score { get; set; }

    [JsonProperty("detail")]
    public Detail Detail { get; set; }
}

public class Detail
{
    [JsonProperty("landmarks")]
    public object[] Landmarks { get; set; }
}

Now we need to create our "starter" function. It gets the container name from the URI received by the function's HttpTrigger and starts a new orchestration; the next code reflects those instructions.

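using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class AnalysisFunctionStarter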
{
   [FunctionName("AnalysisFunction")]
   public static async Task<HttpResponseMessage> Run(
   [HttpTrigger(AuthorizationLevel.Function,
   "get", Route = null)] HttpRequestMessage req,
   [OrchestrationClient] DurableOrchestrationClient client,
   ILogger log)
   {
      log.LogInformation("C# HTTP trigger function processed a request.");

      string container = req.RequestUri.ParseQueryString()["container"];

      if (string.IsNullOrEmpty(container))
      {
         return req.CreateResponse(HttpStatusCode.BadRequest,
                    "Please provide the name of the container");
      }

      log.LogInformation($"Orchestration started for the container {container}");

      string orchestrationId = await client.StartNewAsync("O_ProccessImage", container);
      return client.CreateCheckStatusResponse(req, orchestrationId);
   }
}

Notice the string O_ProccessImage: this is the name we give to the orchestrator. You can name the orchestrator whatever you want, but to keep things in order and make the code easy to understand, I recommend using the prefix O for orchestrators and A for activities.

Next is the orchestrator's code. Here we call the activity functions and pass information among them.

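using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public class ProcessImageOrchestrators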
{
   [FunctionName("O_ProccessImage")]
   public static async Task
   ProccessImage(
   [OrchestrationTrigger]DurableOrchestrationContext context,
   ILogger logger)
   {
      string containerName = context.GetInput<string>();

      List<string> blobList = await
                context.CallActivityAsync<List<string>>("A_GetBlockBlobStorageItemsUris", containerName);

      Task<AnalysisResult>[] analysisTasks =
                new Task<AnalysisResult>[blobList.Count];

      for (int i = 0; i < blobList.Count; i++)
      {
         ImageInformation imageInformation = new ImageInformation()
         {
            BlobName = blobList[i],
            ContainerName = containerName
         };
         analysisTasks[i] = context.CallActivityAsync<AnalysisResult>("A_AnalizeImage",
                    imageInformation);
      }
      await Task.WhenAll(analysisTasks);
      Task[] saveTasks =
             new Task[analysisTasks.Length];
      for (int i = 0; i < analysisTasks.Length; i++)
      {
         saveTasks[i] = context.CallActivityAsync("A_SaveResult",
                    analysisTasks[i].Result);
      }
      await Task.WhenAll(saveTasks);
   }
}

The name in the FunctionName attribute must match the name passed to StartNewAsync in the starter: string orchestrationId = await client.StartNewAsync("O_ProccessImage", container);
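The orchestrator builds an ImageInformation object, but that class is not listed in this post. A minimal sketch of what it could look like follows; it is inferred only from the two properties the orchestrator sets, so the property types and the absence of other members are assumptions.

```csharp
// Hypothetical model, inferred from how the orchestrator uses it.
// Public get/set properties let the framework serialize it to JSON
// when it is passed as the input of an activity.
public class ImageInformation
{
    // Name of the blob to analyze.
    public string BlobName { get; set; }

    // Name of the container that holds the blob.
    public string ContainerName { get; set; }
}
```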

Now we need to add the code for all the activity functions that the orchestrator needs to execute. The first function returns the list of blobs available in the container whose name we provided to the starter.

The second function is going to send the blob image to the Computer Vision service of Azure to get the information of the image.

The third function is going to save the results of the analysis in our Cosmos DB service.

public static class ProccessImageActivities
{
    [FunctionName("A_GetBlockBlobStorageItemsUris")]
    public static async Task<List<string>> GetBlockBlobStorageItemsNames(
        [ActivityTrigger]string containerName,
        ILogger logger)
        {
            CloudBlobContainer cloudBlobContainer = await GetCloudBlobContainer(containerName);

            int i = 0;
            BlobContinuationToken continuationToken = null;
            List<string> results = new List<string>();
            do
            {
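                // Request the next page of up to 10 blobs.
                BlobResultSegment resultSegment = await cloudBlobContainer
                    .ListBlobsSegmentedAsync("", true, BlobListingDetails.All, 10, continuationToken, null, null);
                // Store the token for the next page; it is null after the last page.
                continuationToken = resultSegment.ContinuationToken;
                if (resultSegment.Results.Any())
                {
                    logger.LogInformation("Page {Page}:", ++i);
                }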
                foreach (IListBlobItem item in resultSegment.Results)
                {
                    CloudBlockBlob blob = (CloudBlockBlob)item;
                    await blob.FetchAttributesAsync();
                    logger.LogInformation(blob.Name);
                    results.Add(blob.Name);
                }
            } while (continuationToken != null);
            return results;
        }

    [FunctionName("A_AnalizeImage")]
    public static async Task<AnalysisResult> AnalizeImageAndSaveImage(
        [ActivityTrigger] ImageInformation imageInformation,
        ILogger logger)
        {
            logger.LogInformation("About to start the image analysis");
            AnalysisResult analysisResult = null;
            CloudBlobContainer cloudBlobContainer =
                await GetCloudBlobContainer(imageInformation.ContainerName);
            CloudBlockBlob blockBlob =
                cloudBlobContainer.GetBlockBlobReference(imageInformation.BlobName);

            using (MemoryStream memoryStream = new MemoryStream())
            {
                await blockBlob.DownloadToStreamAsync(memoryStream);
                try
                {
                    // ToArray copies only the bytes written; GetBuffer may include unused capacity.
                    analysisResult = await AnalizeImageAsync(memoryStream.ToArray());
                }
                catch (Exception ex)
                {
                    logger.LogError(ex, $"Error {nameof(AnalizeImageAndSaveImage)} \n{blockBlob?.Name}");
                }

            }
            return analysisResult;
        }

    [FunctionName("A_SaveResult")]
    public static async Task SaveResult(
        [ActivityTrigger]AnalysisResult analysisResult,
        [CosmosDB(
            databaseName: "AnalysisResult",
            collectionName: "ImageAnalysis",
            ConnectionStringSetting = "CosmosDBConnection",
        CreateIfNotExists =true)]IAsyncCollector<AnalysisResult> analysisResultItems,
        ILogger logger)
        {
            logger.LogInformation("Starting to save data to Cosmos DB");
            if (analysisResult != null)
            {
                await analysisResultItems.AddAsync(analysisResult);
            }
        }

    private static async Task<AnalysisResult> AnalizeImageAsync(byte[] imageArray)
        {
            HttpClient httpClient = new HttpClient();
            string token = Environment.GetEnvironmentVariable("ComputerValidationToken");
            httpClient.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", token);
            HttpResponseMessage response;
            string computerVisionUrl = Environment.GetEnvironmentVariable("ComputerVisionUrl");
            using (StreamContent content =
                new StreamContent(new MemoryStream(imageArray)))
            {
                content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
                response =
                    await httpClient.PostAsync(computerVisionUrl, content);
            }

            response.EnsureSuccessStatusCode();
            string responseString = await
                response.Content.ReadAsStringAsync();
            AnalysisResult analysisResult = JsonConvert.DeserializeObject<AnalysisResult>(responseString);

            return analysisResult;
        }

    private static async Task<CloudBlobContainer> GetCloudBlobContainer(string containerName)
        {
            string connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
            CloudStorageAccount cloudStorageAccount = CloudStorageAccount.Parse(connectionString);
            CloudBlobClient cloudBlobClient =
                cloudStorageAccount.CreateCloudBlobClient();
            CloudBlobContainer cloudBlobContainer =
                cloudBlobClient.GetContainerReference(containerName);
            await cloudBlobContainer.CreateIfNotExistsAsync();
            return cloudBlobContainer;
        }

}

As you can see, the orchestrator can pass information from one function to the others.
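The blob listing activity reads its results page by page: each call returns one segment plus a token for the next one, and the loop has to store that token before its condition tests it. The following sketch shows only that loop shape, using an in-memory list instead of Azure Storage. The Page class, ListSegment, and ListAll are hypothetical names that are not part of any Azure SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one segment of a paged listing.
public class Page
{
    public List<string> Items { get; set; }
    public int? NextToken { get; set; }   // null means there are no more pages
}

public static class PagingSketch
{
    // Hypothetical in-memory source: returns up to pageSize items,
    // starting at the position stored in the token.
    public static Page ListSegment(IReadOnlyList<string> all, int pageSize, int? token)
    {
        if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize));

        int start = token ?? 0;
        List<string> items = all.Skip(start).Take(pageSize).ToList();
        int next = start + items.Count;
        return new Page
        {
            Items = items,
            NextToken = next < all.Count ? next : (int?)null
        };
    }

    // Collects every item by following the tokens until none is returned.
    public static List<string> ListAll(IReadOnlyList<string> all, int pageSize)
    {
        var results = new List<string>();
        int? token = null;
        do
        {
            Page page = ListSegment(all, pageSize, token);
            results.AddRange(page.Items);
            // Store the token before the loop condition reads it;
            // otherwise only the first page would ever be read.
            token = page.NextToken;
        } while (token != null);
        return results;
    }
}
```

For example, PagingSketch.ListAll(new[] { "a", "b", "c", "d", "e" }, 2) reads three pages and returns all five names in their original order.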

As the last step, it is necessary to provide the settings. Add the following keys to the local.settings.json file and provide the values that the Durable Function needs to run.

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsDashboard": "UseDevelopmentStorage=true",
    "CosmosDBConnection": "",
    "ComputerValidationToken": "YourToken",
    "ComputerVisionUrl": "https://southcentralus.api.cognitive.microsoft.com/vision/v1.0/analyze"
  }
}

The value "UseDevelopmentStorage=true" in the settings "AzureWebJobsStorage" and "AzureWebJobsDashboard" indicates that the function is going to use the Azure Storage Emulator.
In the key "CosmosDBConnection", you need to set the Cosmos DB connection string; you can get it from the Cosmos DB emulator settings.
In the key "ComputerValidationToken", you need to set the Computer Vision key; you can obtain it from the Azure Portal.
The "ComputerVisionUrl" is the URL that you are going to use for the image analysis. If you want to perform other kinds of analysis, please look at the API documentation in the following link.
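The activity code reads these settings with Environment.GetEnvironmentVariable, which returns null when a key is missing, so a forgotten value only shows up later as a less obvious error. A small helper can fail early with a clear message instead. This is a sketch: the Settings class and its Require method are hypothetical and are not part of the Functions SDK.

```csharp
using System;

public static class Settings
{
    // Hypothetical helper: returns the value of a setting, or throws a
    // descriptive error when the setting is missing or empty.
    public static string Require(string name)
    {
        string value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"The setting '{name}' is missing or empty.");
        }
        return value;
    }
}
```

With it, a line such as string token = Settings.Require("ComputerValidationToken"); would stop the activity immediately if the key was left out of the settings.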

At this point, we can run our Durable Function and call the service endpoint, passing the container name as shown in the next images, where I used Postman to make the call.
[Images: Postman call to the function endpoint]
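If you prefer to build the request URL in code rather than in Postman, the sketch below shows one way to do it. It assumes the default route prefix "api" and, because the starter sets Route = null, a route equal to the function name AnalysisFunction. The StarterUrl class is hypothetical, and the base address http://localhost:7071 in the usage note is an assumption about the local port; use whatever address the Functions host prints when it starts.

```csharp
using System;

public static class StarterUrl
{
    // Hypothetical helper: builds the URL of the starter function,
    // escaping the container name so it is safe inside a query string.
    public static Uri Build(string baseUrl, string container)
    {
        if (string.IsNullOrEmpty(container))
        {
            throw new ArgumentException(
                "Please provide the name of the container", nameof(container));
        }

        string escaped = Uri.EscapeDataString(container);
        return new Uri(
            $"{baseUrl.TrimEnd('/')}/api/AnalysisFunction?container={escaped}");
    }
}
```

For example, StarterUrl.Build("http://localhost:7071", "images").AbsoluteUri gives http://localhost:7071/api/AnalysisFunction?container=images, which you can send as a GET request.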

When the process finishes, you can check the data saved in the Cosmos DB emulator using Azure Storage Explorer.
CosmosDB

The code shown here was written in C#, but Durable Functions supports the following languages:
C#: both precompiled class libraries and C# script.
F#: precompiled class libraries and F# script. F# script is only supported for version 1.x of the Azure Functions runtime.
JavaScript: supported only for version 2.x of the Azure Functions runtime. It requires version 1.7.0 of the Durable Functions extension or a later version.

Summary

Durable Functions are useful for scenarios where you want stateful functions. In this post, we described and developed a Durable Function that uses the function chaining pattern, but you can follow any of the patterns listed below.

  • Function chaining
  • Fan-out/fan-in
  • Async HTTP APIs
  • Monitoring
  • Human interaction
  • Aggregator

More information in the next link.

With all these supported scenarios, you can improve your processes with an approach where you pay only for the resources that you use, and with less effort and complexity than before Durable Functions was released.

I want to finish this post by saying that the Azure Functions Durable Extension is an open-source project, and the source code is in the next link.

GitHub repository

@SaturPimentel
