I have been working for a very large French cosmetics company for a few months now, and here is some feedback on a common problem we had with BigQuery when working in a global context: how do you query data located all around the world?
Here is our use case, and you might recognize yourself, or your company, in it: imagine that you have many subsidiaries all over the world, and each of these entities produces a lot of data. Nowadays data is everywhere: financial documents, website sessions, online advertising, in-store sales… and the volume is growing exponentially! In our case, each locality (basically at country level) is responsible for collecting and managing the data it produces, and for storing it in BigQuery in the GCP region closest to it.
The problem is that BigQuery is actually two distinct products: the query engine (based on Google’s Dremel) and the storage (based on Capacitor, Google’s columnar storage format), and you cannot use the query engine in a given location to process data stored in another location! This is a big issue when your company is distributed globally.
To get around this issue, we need to periodically transfer the data from the remote locations to the main location of analysis (the one closest to the users; in our case, the EU region). Ideally this transfer is incremental: we only want to transfer the data produced since the last transfer, in order to save cost and improve performance.
How to efficiently transfer data between BigQuery regions?
There are two main ways to do this:
1. The "legacy" method: use Google Cloud Storage
For this kind of problem, the historical solution is the following process:
- export the data from BigQuery storage to Parquet files (or any other format, but Parquet is great for most use cases) in a GCS bucket located in the same region as the source dataset,
- transfer the created objects (we cannot know how many in advance, because BigQuery decides how to split the export into files) to another bucket located in the same region as the destination dataset,
- finally, load the Parquet files into the destination table.
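To make the three steps above more concrete, here is a minimal sketch that only builds the request payloads, without sending anything. The `extract` and `load` dictionaries follow the shape of the BigQuery REST job configuration as far as I know it; the function names, bucket names, project names and the `exports/...` prefix are all made up for the illustration, and the object copy in the middle is only described, since it is not a BigQuery job.

```python
import json


def table_ref(project, dataset, table):
    """Table reference in the shape used by the BigQuery REST API."""
    return {"projectId": project, "datasetId": dataset, "tableId": table}


def legacy_transfer_plan(source, destination, source_bucket, dest_bucket,
                         source_location, dest_location, prefix):
    """Describe the export / copy / load steps as plain dictionaries."""
    # A wildcard URI is used because BigQuery decides how many files it writes.
    pattern = f"{prefix}/part-*.parquet"
    extract_job = {
        # The export runs where the source data lives.
        "jobReference": {"projectId": source["projectId"], "location": source_location},
        "configuration": {"extract": {
            "sourceTable": source,
            "destinationUris": [f"gs://{source_bucket}/{pattern}"],
            "destinationFormat": "PARQUET",
        }},
    }
    # Hypothetical description of the bucket-to-bucket copy (done with any GCS tool).
    object_copy = {
        "source": f"gs://{source_bucket}/{prefix}/",
        "destination": f"gs://{dest_bucket}/{prefix}/",
    }
    load_job = {
        # The load runs where the destination table lives.
        "jobReference": {"projectId": destination["projectId"], "location": dest_location},
        "configuration": {"load": {
            "sourceUris": [f"gs://{dest_bucket}/{pattern}"],
            "destinationTable": destination,
            "sourceFormat": "PARQUET",
            "writeDisposition": "WRITE_APPEND",
        }},
    }
    return [extract_job, object_copy, load_job]


if __name__ == "__main__":
    plan = legacy_transfer_plan(
        table_ref("project-us", "sales", "orders"),
        table_ref("project-eu", "sales", "orders"),
        "bucket-us", "bucket-eu", "US", "EU", "exports/orders",
    )
    print(json.dumps(plan, indent=2))
```

Each payload would still have to be submitted by a client or an orchestrator, and the exported files cleaned up afterwards, which is a good part of why this method feels heavy.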
2. The "new" method: use the dedicated service, BigQuery Data Transfer Service
But there is a more straightforward way to solve this: use the new Data Transfer Service (DTS) feature for copying full datasets! It does the same thing as the first method, without extracting the data out of BigQuery storage 😎. Under the hood, DTS runs a table copy for each table in the source dataset. If a table was already transferred in a previous run, and no row has been modified or added since then, the table is skipped.
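As an illustration, a dataset copy is declared as a transfer configuration. The sketch below builds such a configuration as a plain dictionary; to the best of my knowledge the data source is identified by `cross_region_copy` and takes `source_project_id` and `source_dataset_id` as parameters, but please check the current DTS documentation before relying on it. The function name and the project and dataset names are invented for the example.

```python
import json


def dataset_copy_config(source_project, source_dataset, destination_dataset, display_name):
    """Build the body of a DTS transfer configuration for a dataset copy."""
    return {
        "displayName": display_name,
        # Assumed identifier of the dataset copy data source.
        "dataSourceId": "cross_region_copy",
        "destinationDatasetId": destination_dataset,
        "params": {
            "source_project_id": source_project,
            "source_dataset_id": source_dataset,
        },
    }


if __name__ == "__main__":
    config = dataset_copy_config("project-us", "sales_us", "sales_us_copy", "Copy sales_us to EU")
    print(json.dumps(config, indent=2))
```

The destination dataset has to exist beforehand, in the region where you want the data to land; the copy itself is what moves the data across regions.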
The service was still in beta at the time of writing, though, and we found some drawbacks:
- Incremental loads are managed automatically, based on hidden internal metadata in BQ tables, but this is not yet supported for partitioned tables (no append mode).
- You cannot choose which tables of the source dataset are transferred: it’s all or nothing!
- A GCP project can only transfer 2,000 tables a day across regions, and some of our sources hosted sharded tables (one table per day, with years of history), so we reached the quota very quickly.
These limitations were a real pain for us: we didn’t want to transfer every table from the source, and our destination tables were partitioned 😭.
Our use case was as follows: the data from each country lived in a dedicated dataset, in a country-dedicated project, located in a different GCP region. Each dataset contained many sharded tables sharing the same schema. But due to the DTS limitations we couldn’t just use the service as it is: since append mode on partitioned tables isn’t supported yet, every transfer would have erased the data previously transferred from another country…
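To get a feel for how fast sharded tables eat the quota, here is a small back-of-the-envelope calculation. The 2,000 tables per day figure is the one quoted above; the date range and the number of sharded table families are hypothetical values chosen for the example, not our actual numbers.

```python
import math
from datetime import date

# Quota quoted in this article for cross-region copies, per project and per day.
DAILY_CROSS_REGION_TABLE_QUOTA = 2000


def shard_count(first_day, last_day):
    """Number of daily shards between two dates, both included."""
    return (last_day - first_day).days + 1


def days_needed(total_tables, quota=DAILY_CROSS_REGION_TABLE_QUOTA):
    """Minimum number of days required to copy `total_tables` under the quota."""
    return math.ceil(total_tables / quota)


if __name__ == "__main__":
    # Hypothetical source: 2 sharded table families with 3 years of history each.
    per_family = shard_count(date(2019, 1, 1), date(2021, 12, 31))
    total = 2 * per_family
    print(per_family, total, days_needed(total))
```

With three years of daily shards, a single family is already 1,096 tables, so two families in one dataset are enough to go over the daily quota on a full copy.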
Cloud Workflows as a wrapper for Data Transfer Service
Finally, we managed to design an architecture that fits our situation: instead of transferring the data directly from the source to the destination, we go through dedicated temporary datasets:
Stage 1: For each table to transfer, we create a temporary table, in a temporary dataset, that contains only the partitions (or shards) added since the last transfer (selected by custom business logic).
Stage 2: Once the temporary tables are built, we trigger the Data Transfer Service to copy the whole temporary dataset to a temporary destination dataset (in the same region as the final tables).
Stage 3: We merge the transferred partitions into the final tables with a custom BQ job.
And voilà!
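Here is a rough sketch of what the custom logic of stages 1 and 3 could look like; it is not our actual implementation. It assumes shards are named `<prefix>YYYYMMDD`, that the final table has a date column used for partitioning and a column identifying the country, and that the country is passed as the `@country` query parameter when the job runs. All function, table and column names are hypothetical.

```python
from datetime import date, datetime


def shards_to_transfer(table_ids, prefix, last_transferred):
    """Stage 1: keep the shards named <prefix>YYYYMMDD dated after the last transfer."""
    selected = []
    for table_id in table_ids:
        suffix = table_id[len(prefix):]
        if not table_id.startswith(prefix) or len(suffix) != 8 or not suffix.isdigit():
            continue  # not a daily shard of this table family
        try:
            shard_day = datetime.strptime(suffix, "%Y%m%d").date()
        except ValueError:
            continue  # eight digits, but not a valid calendar date
        if shard_day > last_transferred:
            selected.append(table_id)
    return sorted(selected)


def merge_statements(final_table, staging_table, partition_column, country_column):
    """Stage 3: replace one country's partitions in the final table with the staged ones."""
    return [
        # Only this country's rows are deleted, so other countries are left untouched.
        f"DELETE FROM `{final_table}` "
        f"WHERE {country_column} = @country AND {partition_column} IN "
        f"(SELECT DISTINCT {partition_column} FROM `{staging_table}`)",
        f"INSERT INTO `{final_table}` SELECT * FROM `{staging_table}`",
    ]


if __name__ == "__main__":
    tables = ["events_20210101", "events_20210102", "events_20210103", "users"]
    print(shards_to_transfer(tables, "events_", date(2021, 1, 1)))
    for statement in merge_statements(
        "project-eu.final.events", "project-eu.tmp_fr.events", "event_date", "country"
    ):
        print(statement)
```

Deleting before inserting makes the merge safe to replay: running the same transfer twice leaves the final table in the same state, which is handy when a run fails halfway.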
The whole thing is orchestrated by a monstrous Workflow (more than 130 actual steps) designed using DRY principles: everything is generic and driven by input parameters from the user, and as detached as possible from the current use case, so that other teams can reuse it for other use cases.
If you don’t know Cloud Workflows, you should definitely give it a try: it’s a new, minimalist, serverless orchestration service from Google Cloud that is explicitly designed to make API calls. Indeed, everything is an API in the cloud, and calling APIs is the only thing Cloud Workflows needs to do (well, some additional features would sometimes be nice to have; Google, if you’re reading this, let’s talk).
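The real thing is a Cloud Workflows definition, but the idea of a generic, parameter-driven orchestration can be sketched in a few lines of Python. In this simplified illustration, the three stages are passed in as functions and the list of sources is the only input; the names `run_transfers`, `build_staging`, `copy_dataset` and `merge_into_final`, as well as the keys of each source, are made up for the example.

```python
def run_transfers(sources, build_staging, copy_dataset, merge_into_final):
    """Run the three stages for every source described in `sources`."""
    report = []
    for source in sources:
        staged = build_staging(source)        # stage 1: returns the staged tables
        if not staged:
            # Nothing new since the last transfer: skip the copy and the merge.
            report.append((source["country"], "skipped"))
            continue
        copy_dataset(source)                  # stage 2: cross-region dataset copy
        merge_into_final(source, staged)      # stage 3: merge into the final tables
        report.append((source["country"], "transferred"))
    return report


if __name__ == "__main__":
    # Fake stages, only there to show how the parameters drive the run.
    sources = [
        {"country": "FR", "new_tables": ["events_20210102"]},
        {"country": "US", "new_tables": []},
    ]
    result = run_transfers(
        sources,
        build_staging=lambda source: source["new_tables"],
        copy_dataset=lambda source: None,
        merge_into_final=lambda source, staged: None,
    )
    print(result)
```

Adding a country or a new table family then only means adding an entry to the input parameters, not touching the orchestration code.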
Conclusion
It works like a charm! Thanks to these DRY principles, we have been able to make the workflow as generic as possible and to use the same code to transfer different sources with different tables and structures. Performance is satisfying: it takes from a few minutes to less than an hour to transfer GBs to TBs of data across more than 60 countries.
What's next?
In an upcoming article, we will discuss the daily transfer of Google Analytics data with this method, and the issues and solutions we found regarding schema drift, a common problem in the data engineering world. Stay tuned.