Dataflow pipelines rarely run on their own. Most of the time, they are part of a larger process. For example: one pipeline collects events from the source into Bigtable, then a second pipeline computes aggregated data from Bigtable and stores it in BigQuery.
Of course, each pipeline could be scheduled independently with Cloud Scheduler.
But if these pipelines need to be linked somehow, such as launching the second pipeline when the first is done, then orchestration is required.
Until recently, GCP had one tool in the box for this kind of purpose: Cloud Composer, a (slightly) managed Apache Airflow. Despite its rich feature set and broad community, this service has several drawbacks for the kind of simple orchestration I was after:
- it's not fully integrated: you need to manage costly resources such as a GKE cluster and a Cloud SQL instance
- it forces Python into your codebase; there is no other choice
- any change in the setup (like adding an environment variable) is painfully slow to propagate
- the wide variety of operators in the ecosystem can lead to a poor separation of concerns between orchestration and business processes
And I won't even talk about the Airflow UI... (I've heard that some people like it)
Because of all this, orchestrating with Composer felt overly difficult. Yet, as is often the case with GCP, if you face too many difficulties doing something that should be simple, you're probably not doing it right. This proved true once again: Cloud Composer wasn't the right product for my need...
Enter GCP Workflows!
Workflows is a new service: it was promoted out of beta very recently. Luckily, it already offers most of the functionality needed to orchestrate GCP services' jobs, and to do it simply:
- it is fully managed and serverless, which means you don't pay when you don't use it
- it does only one job and does it well: orchestrating HTTP calls
- everything is configured in YAML files, whose syntax is concise and easy to learn
- the UI is neatly integrated and feels more "part of GCP" than Composer's (although there are still quite a few display bugs at the moment)
With this new product, it becomes really easy to write a workflow that chains multiple Dataflow jobs like the ones described above.
A sample workflow for Dataflow jobs
Workflow files are written in YAML. The definition is simple and straightforward:
main:
  steps:
    - init:
        assign:
          - project: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "europe-west1"
          - topic: "myTopic"
    - firstPipeline:
        call: LaunchDataflow
        args:
          project: ${project}
          region: ${region}
          template: "first"
        result: firstJobId
    - waitFirstDone:
        call: DataflowWaitUntilStatus
        args:
          project: ${project}
          region: ${region}
          jobId: ${firstJobId}
          status: "JOB_STATE_DONE"
    - secondPipeline:
        call: LaunchDataflow
        args:
          project: ${project}
          region: ${region}
          template: "second"
        result: secondJobId
    - waitSecondDone:
        call: DataflowWaitUntilStatus
        args:
          project: ${project}
          region: ${region}
          jobId: ${secondJobId}
          status: "JOB_STATE_DONE"
    - publish:
        call: googleapis.pubsub.v1.projects.topics.publish
        args:
          topic: ${"projects/" + project + "/topics/" + topic}
          body:
            messages:
              - data: ${base64.encode(text.encode("{\"message\":\"workflow done\"}"))}
Let's break it down. The sample workflow has the following steps:
- init: preprocessing stage, where workflow variables are initialized
- firstPipeline: Launch the first Dataflow job
- waitFirstDone: Wait until the first Dataflow job is completed
- secondPipeline: Launch the second Dataflow job
- waitSecondDone: Wait until the second Dataflow job is completed
- publish: Push a sample Pub/Sub notification at the end of the workflow
As you may have noticed, firstPipeline and secondPipeline call a custom routine, a subworkflow, which is defined in the same file:
LaunchDataflow:
  params: [project, region, template]
  steps:
    - launch:
        call: http.post
        args:
          url: ${"https://dataflow.googleapis.com/v1b3/projects/" + project + "/locations/" + region + "/flexTemplates:launch"}
          auth:
            type: OAuth2
          body:
            launchParameter:
              jobName: ${"workflow-" + template}
              environment:
                numWorkers: 1
                maxWorkers: 8
              containerSpecGcsPath: ${template}
        result: dataflowResponse
        next: jobCreated
    - jobCreated:
        return: ${dataflowResponse.body.job.id}
This subworkflow calls the Dataflow REST API to launch a job (here, a Flex Template). With Workflows you can easily call any Google service's API or any external HTTP endpoint.
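For instance, calling an arbitrary external endpoint from a step could look like the following sketch (the step name, URL, and payload are made up for illustration):

    - notifyExternalService:
        call: http.post
        args:
          url: "https://example.com/hooks/dataflow"  # hypothetical endpoint
          body:
            event: "pipeline-started"
        result: notifyResponse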
Similarly, waitFirstDone and waitSecondDone call another subworkflow:
DataflowWaitUntilStatus:
  params: [project, region, jobId, status]
  steps:
    - init:
        assign:
          - currentStatus: ""
          - failureStatuses: ["JOB_STATE_FAILED", "JOB_STATE_CANCELLED", "JOB_STATE_UPDATED", "JOB_STATE_DRAINED"]
    - check_condition:
        switch:
          - condition: ${currentStatus in failureStatuses}
            next: exit_fail
          - condition: ${currentStatus != status}
            next: iterate
        next: exit_success
    - iterate:
        steps:
          - sleep30s:
              call: sys.sleep
              args:
                seconds: 30
          - getJob:
              call: http.get
              args:
                url: ${"https://dataflow.googleapis.com/v1b3/projects/" + project + "/locations/" + region + "/jobs/" + jobId}
                auth:
                  type: OAuth2
              result: getJobResponse
          - getStatus:
              assign:
                - currentStatus: ${getJobResponse.body.currentState}
          - log:
              call: sys.log
              args:
                text: ${"Current job status=" + currentStatus}
                severity: "INFO"
        next: check_condition
    - exit_success:
        return: ${currentStatus}
    - exit_fail:
        raise: ${"Job in unexpected terminal status " + currentStatus}
This subworkflow also calls the Dataflow REST API, this time polling in a loop until the job reaches a terminal status. If an unexpected state is encountered, an exception is raised and the workflow stops and is marked as failed. Otherwise, it proceeds to the next step.
Finally, deploy the workflow, via the UI or with gcloud for example:
#! /bin/bash
localDir=$(dirname "$0")
WORKFLOW="sample"
DESCRIPTION="Sample workflow"
SOURCE="sample.yaml"
PROJECT="my-gcp-project"
REGION="europe-west4"
SERVICE_ACCOUNT="sa-workflows@my-gcp-project.iam.gserviceaccount.com"
gcloud beta workflows deploy "${WORKFLOW}" \
  --location="${REGION}" \
  --service-account="${SERVICE_ACCOUNT}" \
  --source="${localDir}/${SOURCE}" \
  --description="${DESCRIPTION}"
Breaking news: Workflows resources are now available in Terraform, for you IaC freaks.
Once deployed, the workflow can be launched, for example from Cloud Scheduler, by POSTing to this endpoint: https://workflowexecutions.googleapis.com/v1/projects/${PROJECT}/locations/${REGION}/workflows/${WORKFLOW}/executions
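As a sketch, a Cloud Scheduler job triggering it every morning could look like this (the job name, schedule, and time zone are illustrative, and the service account needs permission to invoke workflows, e.g. the workflows.invoker role):

# Illustrative: trigger the workflow every day at 06:00, reusing the variables from the deploy script above
gcloud scheduler jobs create http "${WORKFLOW}-trigger" \
  --schedule="0 6 * * *" \
  --time-zone="Europe/Paris" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/${PROJECT}/locations/${REGION}/workflows/${WORKFLOW}/executions" \
  --http-method=POST \
  --oauth-service-account-email="${SERVICE_ACCOUNT}"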
Conclusion
Thanks to Workflows, with just a relatively small YAML file we were able to chain two Dataflow jobs the easy way: serverlessly.
Thanks for reading! I’m Matthieu, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthusiastic Data Engineering team, please contact us.