Nowadays, in a lot of Data Stacks, most Data Engineering work consists of writing SQL.
SQL is a powerful language that is well suited for building most batch data pipelines: it's universal, great for structured data and (most of the time) easy to write. On the other hand, one of the complexities is orchestrating the series of SQL statements in the correct dependency order: if a SQL query references the result of another one, the referenced query must run first.
For a personal project, I set out to automatically generate the dependency graph of BigQuery transformations using a small Python script. I wanted to use graph theory and a SQL syntax parser. Let’s see how.
What are the dependencies in BigQuery SQL transformations?
In BigQuery (or any other data warehouse, actually), transformations are typically chained to form a more or less complex DAG (Directed Acyclic Graph). In this DAG, each transformation reads from one or more tables and writes its result to a single table, which can in turn be used by other transformations, and so on. In this graph, the nodes are tables (sources or destinations) and the edges are dependencies, meaning that the transformation references the source tables in a FROM or JOIN clause to produce the target.
Here is an example:
Here we can see that table D is produced from tables A & B, table E from tables D & C, and table F from tables D & E. From this diagram we can easily see that transformation D should be run first, followed by transformation E and finally transformation F.
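As a quick sanity check of that ordering, here is a minimal sketch that encodes the example graph as a plain dictionary and asks Python's standard-library graphlib module for a valid run order. It is only an illustration: the rest of this post uses networkx, and the single-letter table names are the ones from the example above.

```python
from graphlib import TopologicalSorter

# Each key is a destination table; its value is the set of tables
# the transformation reads from (the example from this post).
dependencies = {
    "D": {"A", "B"},
    "E": {"D", "C"},
    "F": {"D", "E"},
}

# static_order() yields every node after all of its predecessors
run_order = list(TopologicalSorter(dependencies).static_order())

# Keep only the tables that are produced by a transformation
transformation_order = [t for t in run_order if t in dependencies]
print(transformation_order)  # ['D', 'E', 'F']
```

The source tables A, B and C can come out in any relative order, but D, E and F can only be scheduled one way because E needs D and F needs both.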
Automatically infer the graph with Python
In the project, we used the Python library networkx and its DiGraph object (directed graph). To detect table references in a query, we use sqlglot, a SQL parser (among other things) that works well with BigQuery.
Here is the first part of the Python script to create the graph, simplified for this blog post.
import networkx as nx

# All the transformations are stored in a structure.
# Here, let's assume a Transformation object only contains the
# query and the destination table (project_id, dataset_id,
# table_id).
transformations: list[Transformation]

graph = nx.DiGraph()
for transfo in transformations:
    dependencies = find_query_dependencies(transfo.query)
    # Add nodes; the transfo info is stored as node metadata
    graph.add_node(transfo.destination_table, transfo=transfo)
    # Add edges
    for src in dependencies:
        # Note that if src does not exist yet as a node,
        # add_edge creates it
        graph.add_edge(src, transfo.destination_table)
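The snippet above relies on a Transformation type that is not shown in this post. A minimal sketch of what it could look like is given below; the field names query, destination_table and tmp_table match how the object is used in the code of this post, but the dataclass itself and the sample values are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Transformation:
    """Hypothetical container for one SQL transformation."""

    # SQL text of the transformation
    query: str
    # Fully qualified destination: "project_id.dataset_id.table_id"
    destination_table: str
    # Temporary table name, set later during schema inference
    tmp_table: Optional[str] = None


# Illustrative values only; the project and dataset names are made up
transformations: list[Transformation] = [
    Transformation(
        query="SELECT * FROM `my-project.raw.A` JOIN `my-project.raw.B` USING (id)",
        destination_table="my-project.staging.D",
    ),
]
```

Storing the destination as a single dotted string keeps it directly usable as a node identifier in the graph.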
Now let’s see how to find the dependencies of a SQL query by using the sqlglot SQL parser:
from sqlglot import parse_one, exp

def find_query_dependencies(query: str) -> set[str]:
    """Find all the tables in the query"""
    return {
        f"{t.catalog}.{t.db}.{t.name}"
        for t in parse_one(
            query, read="bigquery"
        ).find_all(exp.Table)
    }
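In the piece of code above, we use the parse_one function from sqlglot to parse the transformation query, using the BigQuery dialect, into a syntax tree that can be searched. Calling find_all(exp.Table) on that tree yields every table reference found in the query.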
Automatically infer the schema of the output tables
Another cool feature we can add to our script is the ability to auto-detect, with high precision, the expected output schema of all our tables (D, E and F in the example), even if they haven't been created yet. This can be very helpful if we want to create the tables using an Infrastructure as Code tool like Terraform before the transformations even run.
For this feature, I used the following BigQuery trick: we can run a query with a LIMIT 0 at the end of the SELECT statement. The great thing about this is that BigQuery won't charge anything (0 bytes billed) but will still create a temporary output table with the expected schema (including the NULLABLE / REQUIRED column modes)!
To generate the query with LIMIT 0, we need to add it to all the SELECT statements of a query (including all the subqueries). Let’s use sqlglot again by defining a SQL transformer:
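def limit_transformer(node):
    """This sqlglot transformer function adds a LIMIT 0 to
    every SELECT statement"""
    if isinstance(node, exp.Select):
        # copy=False mutates the node in place instead of returning
        # a copy, which lets transform() keep walking into nested
        # SELECTs
        return node.limit(0, copy=False)
    return node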
query = """
WITH source_A as (
    SELECT "HelloWorld" as hello
), source_B as (
    SELECT CURRENT_DATE() as date
)
SELECT *
FROM source_A, source_B
"""

sample_query = (
    parse_one(query, read="bigquery")
    .transform(limit_transformer)
    .sql(dialect="bigquery")
)
print(sample_query)
# =====================
# Output (reformatted here for readability; sqlglot may
# normalize keywords and quotes):
# WITH source_A as (
#     SELECT "HelloWorld" as hello LIMIT 0
# ), source_B as (
#     SELECT CURRENT_DATE() as date LIMIT 0
# )
# SELECT *
# FROM source_A, source_B
# LIMIT 0
Once we have our new query with all the LIMIT 0 clauses, we need to create a BQ job that runs this query, for free!
from google.cloud import bigquery

def fetch_destination_schema(query: str):
    bq_client = bigquery.Client()
    query_job = bq_client.query(query=query)
    # Wait for the job to complete
    query_job.result()
    # Fetch the schema of the temporary table created by BigQuery
    tmp_dest_table = bq_client.get_table(query_job.destination)
    destination_schema = tmp_dest_table.schema
    return destination_schema, str(query_job.destination)
Now, to generate the output schema for all the tables in the graph, even the last one, which depends on tables that have not been created yet (here, tables D and E will only exist as temporary tables, not real tables), we need to apply our schema generation method to each node in the DiGraph in the "correct" order.
In graph theory, this order is called the topological order: in this example, first table D, then E, then F. For each node, we need to replace the references to the real tables in the body of the transformation with references to the temporary tables previously created. This way, even if neither table D nor table E exists, we can still deduce the final schema for table F!
This process can be illustrated like this:
for node_id in nx.topological_sort(graph):
    # Here the "transfo" key is where the transformation
    # metadata has been stored in the node
    query_to_run = graph.nodes[node_id]["transfo"].query
    ancestors = nx.ancestors(graph, node_id)
    # We exclude all the ancestors that don't have transfo
    # metadata, meaning all the nodes that are not an
    # intermediate output table
    ancestors = filter(
        lambda x: "transfo" in graph.nodes[x], ancestors
    )
    for ancestor_table in ancestors:
        query_to_run = query_to_run.replace(
            ancestor_table,
            graph.nodes[ancestor_table]["transfo"].tmp_table
        )
    schema, tmp_table = fetch_destination_schema(query_to_run)
    graph.nodes[node_id]["transfo"].tmp_table = tmp_table

# This will be the exact schema of the last transformation
print(schema)
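To make the substitution step concrete without calling BigQuery, here is a small self-contained sketch of the string replacement performed in the loop above. The table names and the temporary table identifiers are invented for the example, and the helper name replace_references is hypothetical.

```python
def replace_references(query: str, tmp_tables: dict[str, str]) -> str:
    """Swap each real table reference for its temporary table."""
    for real_table, tmp_table in tmp_tables.items():
        query = query.replace(real_table, tmp_table)
    return query


# Hypothetical mapping built while walking the graph in topological order
tmp_tables = {
    "my-project.staging.D": "my-project._tmp.anon_d",
    "my-project.staging.E": "my-project._tmp.anon_e",
}

query_f = (
    "SELECT * FROM `my-project.staging.D` "
    "JOIN `my-project.staging.E` USING (id)"
)
rewritten = replace_references(query_f, tmp_tables)
print(rewritten)
```

Plain string replacement is simple, but it would also rewrite a longer table name that merely starts with the same text (for example my-project.staging.D_backup), so a stricter version might rewrite the table nodes of the sqlglot tree instead.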
And voilà! With this graph generation technique and a few BigQuery tricks, we were able to automatically infer the dependency graph of complex SQL transformations, as well as the exact final schema of all tables, without any tables being created and at zero cost!
In the next blog post, we will see how I've applied and improved this technique to build an experimental, Ops-oriented data orchestration tool!