Data on Kubernetes: Part 4 - Argo Workflows: Simplify parallel jobs : Container-native workflow engine for Kubernetes 🔮

saifeddine Rajhi - Jul 29 - - Dev Community

🐙 Introduction:

Welcome to the fourth part of the "Data on Kubernetes" series! 
In this part, we'll explore Argo Workflows - an open-source, container-native workflow engine designed specifically for orchestrating parallel jobs on Kubernetes. 

Argo Workflows seamlessly integrates with Kubernetes services like volumes, secrets, and RBAC, making it a powerful tool for managing complex data workflows.

Introduction to Argo Workflows:

In our previous blog post, we explored the world of workflow orchestrators, with a specific focus on Apache Airflow. Now, let's check another tool: Argo Workflows

As an open-source, container-native workflow engine, Argo Workflows is purpose-built for orchestrating parallel jobs within Kubernetes environments. 

Let's take a closer look at why Argo Workflows stands out:

1. What is Argo Workflows? 🤔

Argo Workflows empowers you to define workflows where each step corresponds to a container. This flexibility allows for the creation of complex data processing pipelines. 

Think of it as assembling a series of building blocks, where each block represents a specific task or operation.

Additionally, Argo Workflows is implemented as a Kubernetes Custom Resource Definition CRD, seamlessly integrating into Kubernetes ecosystem without disrupting the existing setup.

2. Main features and benefits:

Container-Native Approach:

Argo Workflows fully embraces the containerization paradigm. Each step in your workflow runs within its own container, ensuring consistency and compatibility across different tasks. Whether you're dealing with data transformations, model training, or any other computational task, Argo Workflows has you covered.

DAG Modeling (Directed Acyclic Graphs):

Imagine your workflow as a flowchart with interconnected boxes. Argo Workflows allows you to model multi-step workflows using directed acyclic graphs DAGs. These graphs capture dependencies between tasks, ensuring that steps execute in the correct order. It's like choreographing a dance where each move follows a logical sequence.

Efficient compute handling:

Argo Workflows shines when it comes to compute-intensive workloads. Whether you're crunching numbers, analyzing data, or training machine learning models, Argo efficiently manages the computational resources needed for each step.

3. Integration with the Kubernetes Ecosystem:

Argo Workflows seamlessly integrates with other Kubernetes services:

  • Volumes: Need to read or write data? Argo Workflows plays well with Kubernetes volumes, allowing you to handle data storage efficiently.
  • Secrets: Security matters! Argo Workflows integrates with Kubernetes secrets, ensuring that sensitive information remains protected.
  • RBAC (Role-Based Access Control): Argo Workflows respects your access policies. It works harmoniously with Kubernetes RBAC, allowing you to control who can orchestrate workflows and who can't.

4. Why Choose Argo Workflows?

Argo Workflows stands out for several reasons:

Kubernetes Integration:

Argo Workflows seamlessly integrates with Kubernetes, simplifying usage and deployment.

Scalability and Complexity Handling:

It efficiently manages complex workflows, even those with thousands of steps.

Flexible Workflow Patterns:

Argo supports various workflow structures, adapting to your needs.

Active Open Source Community:

Benefit from a thriving community that actively contributes to Argo's development.

Exploring Argo Workflows on Amazon EKS:

In this section, we'll dive into creating and deploying a data processing platform on Amazon Elastic Kubernetes Service Amazon EKS.
 
The solution includes essential Kubernetes add-ons: Argo Workflows, Argo Events, Spark Operator for managing Spark jobs, Fluent Bit for logging, and Prometheus for metrics.

Here's an overview of the setup used:

  1. Cluster Setup: We'll set up an Amazon EKS cluster with a managed node group. We'll install Argo Workflows and Argo Events in their own dedicated namespaces (argo-workflows and argo-events).
  2. Event-Driven Workflow: An Amazon SQS queue will receive user requests. In the Argo Events namespace, an SQS event source object will fetch messages from this external queue. When a new message arrives, a sensor in Argo Workflow will trigger the specified workflow.
  3. Spark Job Execution: The triggered workflow will create a Spark application using Spark Operator in the data-ops namespace. This application consists of one Spark driver pod and two executor pods.

The diagram illustrates how Argo Events sources drive the execution of Spark jobs.

Image description

Remember to meet the prerequisites, including AWS cli, kubectl, terraform and Argo Workflow CLI.

The Below diagram shows the detailed technical design that will be setup in this demo:

Image description

Now let us start implementing the solution:

git clone https://github.com/awslabs/data-on-eks.git
cd data-on-eks/schedulers/terraform/argo-workflow

terraform init
terraform apply -var region=eu-west-1 --auto-approve
Enter fullscreen mode Exit fullscreen mode

This will create:

EKS and networking infrastructure:

  • VPC with Private and Public Subnets.
  • Internet Gateway (Public) and NAT Gateway (Private).
  • EKS Cluster Control Plane with managed node group.

EKS Managed Add-ons:

  • VPC_CNI, CoreDNS, Kube_Proxy, EBS_CSI_Driver.

Additional Kubernetes Components:

  • Argo Workflows, Argo Events, AWS for FluentBit, Karpenter.
  • Metrics Server, CoreDNS Autoscaler, Cluster Autoscaler.
  • Kube Prometheus Stack, Spark Operator.
  • Roles for Argo Workflows and Argo Events.

Next step is create that queue in your account and capture the QueueUrl output for the next steps.

queue_url=$(aws sqs create-queue --queue-name demo-argo-workflows --region eu-west-1 --output text)
Enter fullscreen mode Exit fullscreen mode

Then retrieve the queue arn:

sqs_queue_arn=$(aws sqs get-queue-attributes --queue-url $queue_url --attribute-names QueueArn --region eu-west-1 --query "Attributes.QueueArn" --output text)

template=`cat argo-events-manifests/sqs-accesspolicy.json | sed -e "s|<sqs_queue_arn>|$sqs_queue_arn|g;s|<your_event_irsa_arn>|$your_event_irsa_arn|g"`

aws sqs set-queue-attributes --queue-url $queue_url --attributes $template --region eu-west-1
Enter fullscreen mode Exit fullscreen mode

Access Argo Workflows web user interface:

To monitor the progress of an Argo Workflow job, you can use the web interface provided by Argo Workflows. 

To access the web UI, follow these steps:

  • Execute the following command to extract the bearer token.
$ argo auth token # get login token
# example of output:
Bearer k8s-aws-v1.aHR0cHM6Ly9cccccxxxxxxxxxxqqqqqosoososnccddcnOA
Enter fullscreen mode Exit fullscreen mode
  • Get the load balancer url:
kubectl -n argo-workflows get service argo-workflows-server -o jsonpath="{.status.loadBalancer.ingress[*].hostname}
Enter fullscreen mode Exit fullscreen mode

Open browser and enter the URL and paste the token (whole text including Bearer) into the yellow circle as shown in the following details.

Image description

Now let's explore how to create a Spark job within an Argo Workflow by listening to SQS messages.

The EventBus is like a special tool in Kubernetes. Its job is to help messages travel from EventSources to Sensors

Think of it as a messenger that carries information between different parts of your system.

Right now, the EventBus uses something called NATS (which stands for Networked Approach To Technology Services). 
NATS is like a super-fast, lightweight messaging system. It's often used for microservices and works really well in cloud environments.

In the YAML file you have, you'll create a NATS service with three copies (we call them replicas)

To install eventbus runt the below command:

kubectl apply -f argo-events-manifests/eventbus.yaml
Enter fullscreen mode Exit fullscreen mode

Setting Up Amazon SQS as an Event Source:

The Argo Events system requires an eventsource object to monitor external events and trigger subsequent operations. 

We'll configure an Amazon SQS eventsource in Argo Events and connect it to our SQS queue.

The eventsource is able to monitoring events across regions, so the EKS cluster and Amazon SQS queue don't have to be in the same region.

queue_name=demo-argo-workflows
region_sqs=eu-west-1

cat argo-events-manifests/eventsource-sqs.yaml | sed "s/<region_sqs>/$region_sqs/g;s/<queue_name>/$queue_name/g" | kubectl apply -f -
Enter fullscreen mode Exit fullscreen mode

Next we can deploy sensor-rbac.yaml and sensor-sqs-sparkjos.yaml for triggering workflow:

kubectl apply -f argo-events-manifests/sensor-rbac.yaml
Enter fullscreen mode Exit fullscreen mode

Now, open taxi-trip.sh and update and
with the S3 bucket created by this blueprint(Check Terraform outputs)

if [ $# -ne 2 ]; then
  echo "Usage: $0 <s3_bucket> <region>"
  exit 1
fi

s3_bucket="$1"
region="$2"

INPUT_DATA_S3_PATH="s3://${s3_bucket}/taxi-trip/input/"

# Create a local input folder
mkdir input

# Copy PySpark Script to S3 bucket
aws s3 cp pyspark-taxi-trip.py s3://${s3_bucket}/taxi-trip/scripts/ --region ${region}

# Copy Test Input data to S3 bucket
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet -O "input/yellow_tripdata_2022-0.parquet"

# Making duplicate copies to increase the size of the data.
max=5
for (( i=1; i <= $max; ++i ))
do
   cp -rf "input/yellow_tripdata_2022-0.parquet" "input/yellow_tripdata_2022-${i}.parquet"
done

aws s3 sync "input/" ${INPUT_DATA_S3_PATH}

# Delete a local input folder
rm -rf input
Enter fullscreen mode Exit fullscreen mode

After updating the variables the shell script, execute it:

./taxi-trip.sh
Enter fullscreen mode Exit fullscreen mode

Next, update YAML file and run the below command:

kubectl apply -f sensor-sqs-sparkjobs.yaml
Enter fullscreen mode Exit fullscreen mode

Now we can test if our setup is fully functionning.
To do so, send a message from SQS: {"message": "hello data on k8s"}

aws sqs send-message --queue-url $queue_url --message-body '{"message": "hello data on k8s"}' --region $region_sqs
Enter fullscreen mode Exit fullscreen mode

Argo Events would capture the message and trigger Argo Workflows to create a workflow for spark jobs.

$ kubectl get workflows -n argo-workflows
NAMESPACE        NAME                           STATUS    AGE   MESSAGE
argo-workflows   aws-sqs-spark-workflow-hh79p   Running   20s
Enter fullscreen mode Exit fullscreen mode

You can also see the SQS workflow status in web UI:

Image description

Image description

Key takeaways:

In this post, we explored how Argo Workflows and Argo Events can help us better manage and scale Spark applications on Amazon EKS. These tools have several advantages, such as allowing us to create and automate complex pipelines using Argo Workflows, and triggering actions based on events using Argo Events.

Stay tuned for next blogs in this series🎉

Thank you for Reading !! 🙌🏻😁📃, see you in the next blog.🤘

🚀 Thank you for sticking up till the end. If you have any questions/feedback regarding this blog feel free to connect with me:

♻️ LinkedIn: https://www.linkedin.com/in/rajhi-saif/
♻️ Twitter: https://twitter.com/rajhisaifeddine
The end ✌🏻
🔰 Keep Learning !! Keep Sharing !! 🔰 

References:

https://spacelift.io/blog/argo-workflows

https://codefresh.io/learn/argo-workflows/

https://codefresh.io/learn/argo-workflows/learn-argo-workflows-with-8-simple-examples/

https://argo-workflows.readthedocs.io/en/latest/

https://awslabs.github.io/data-on-eks/docs/blueprints/job-schedulers/argo-workflows-eks

https://aws.amazon.com/blogs/containers/dynamic-spark-scaling-on-amazon-eks-with-argo-workflows-and-events/

https://mkdev.me/posts/argo-ecosystem-argo-cd-argo-workflows-argo-events-argo-rollouts-argo-everything

https://dok.community/blog/scheduled-scaling-with-dask-and-argo-workflows/

https://medium.datadriveninvestor.com/trigger-argo-workflow-using-argo-event-based-on-azure-service-bus-queue-095015c2b950

https://www.youtube.com/watch?v=FBRMURQYbgw

https://blog.zelarsoft.com/argo-workflows-and-argo-events-6fb3e3e5b367

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .