Introduction
Imagine if uploading data to BigQuery were as simple as dropping a file into a bucket. For intermediate data engineers, this can be a game-changer. In this tutorial, we'll delve into a practical architecture, built in Python, tailored to that need.
Before we proceed with the practical aspects, let's begin by understanding what BigQuery is and why loading data into this platform is significant. We'll then explore the overall architecture flow, providing a holistic view, before delving into the practical implementation details. Finally, we'll also look at pricing to evaluate this solution.
Why load data into BigQuery tables?
BigQuery is Google’s managed data warehouse, processing SQL queries rapidly. It excels in analyzing large datasets through serverless computing and efficient replication. Here are some key features:
- Ease of Use and Integration:
Integrates with Google Workspace, Looker Studio, and other Google Cloud services.
Features serverless tools like Datastream for CDC, Dataform for SQL pipelines, Dataplex for automated governance, etc.
- Query Performance and Storage Optimization:
Reduces redundant storage and separates data processing from storage for efficiency.
Utilizes Dremel as the query execution engine and Colossus as the distributed file system for optimized performance.
- Governance:
Implements fine-grained access control for rigorous governance.
Harnessing Events: Eventarc Trigger + Cloud Run + BigQuery SDK
Event-Driven Architecture Flow:
In crafting an event-driven architecture that integrates storage finalized events, an Eventarc trigger, Cloud Run, and the BigQuery SDK, we establish a dynamic workflow for near real-time data processing.
- Storage finalized event:
This event-driven solution begins with the detection of storage finalized events within a designated Cloud Storage bucket. When an object is finalized, meaning it is fully written and available for further processing, such an event is emitted.
- Eventarc trigger:
The Eventarc trigger captures the storage finalized event and forwards it as an HTTP request to the desired path on the Cloud Run service.
- Cloud Run service:
Utilizing the BigQuery SDK within Cloud Run, we initiate a load job to ingest the newly available data, enabling near real-time loading into BigQuery for further analysis.
Required resources:
This foundational setup serves as a starting point, allowing you to customize and enhance the workflow based on your specific requirements across Google Cloud services.
1- Service Account with Appropriate Roles:
Granting only the necessary permissions to comply with the principle of least privilege:
- storage.objectViewer: Allows viewing objects in Cloud Storage.
- run.invoker: Permits triggering Cloud Run services.
- eventarc.eventReceiver: Enables receiving events from Eventarc.
- bigquery.jobUser: Grants permissions to create and manage BigQuery jobs.
- bigquery.dataEditor: Provides editing access to BigQuery datasets and tables.
2- Bucket:
A storage container in Cloud Storage to hold and manage your data files.
3- Eventarc Trigger:
Configured to respond to specified events and trigger subsequent actions.
4- Cloud Run:
A serverless container platform designed for deploying and managing your application. Consider alternatives such as Cloud Functions, Workflows, or other Google Cloud serverless services based on your specific use case and preferences. For this particular use case, serverless services present a cost-effective solution: with Cloud Run, you only pay for the compute time your application actually consumes, eliminating the need to provision and pay for idle resources. This flexibility ensures optimal cost efficiency.
5- BigQuery Dataset:
A logical container for organizing and controlling access to your tables.
6- BigQuery Table:
The destination for your ingested data, organized within the dataset.
Practical part
I suggest keeping all the resources in a multi-region (EU or US) if you have no political or technical constraints, since on-demand compute pricing in BigQuery is at least 20% cheaper for multi-regions.
Cloud Run isn't available in multi-regions; however, you can deploy it in one of the regions belonging to the chosen multi-region.
Step 1: Set up required resources
- Create the source bucket
- Create a service account dedicated to this workflow
- Grant it the previously discussed roles
- Create a BigQuery dataset
Note that BigQuery table creation is not mandatory, as load jobs automatically create tables if they don't exist.
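If you prefer scripting the setup over clicking through the console, here is a minimal sketch using the Python client libraries; the project, bucket, and dataset names below are placeholders, and the service account with its role bindings is typically created with gcloud or in the console instead.
# Minimal setup sketch (hypothetical names): create the source bucket and the
# BigQuery dataset in the EU multi-region. Requires google-cloud-storage and
# google-cloud-bigquery to be installed, with default credentials configured.
from google.cloud import bigquery, storage

PROJECT_ID = "your-project-id"      # placeholder
BUCKET_NAME = "your-source-bucket"  # placeholder
DATASET_ID = "your_dataset"         # placeholder

# Source bucket in the EU multi-region
storage.Client(project=PROJECT_ID).create_bucket(BUCKET_NAME, location="EU")

# BigQuery dataset in the same multi-region
dataset = bigquery.Dataset(f"{PROJECT_ID}.{DATASET_ID}")
dataset.location = "EU"
bigquery.Client(project=PROJECT_ID).create_dataset(dataset, exists_ok=True)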
Step 2: Creation of a Cloud Run service
Let's write a basic Dockerfile that starts the application with uvicorn on the specified port and host; this Dockerfile is used to build the container image deployed to Cloud Run.
FROM python:3.8-slim-buster
ENV PORT=8080
EXPOSE $PORT
COPY ./src/requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY ./src/main.py .
CMD ["sh", "-c", "uvicorn --port ${PORT} --host 0.0.0.0 main:app"]
We also need a src folder with a requirements.txt file containing these lines:
fastapi==0.92.0
pydantic==1.10.5
uvicorn==0.20.0
google-cloud-bigquery==3.7.0
and a main.py containing the load script, which uses the FastAPI and Pydantic libraries. If you're not familiar with these libraries, you can take a look at this documentation: Request Body with FastAPI. Using FastAPI and Pydantic in conjunction with Cloud Run is convenient for handling HTTP request bodies.
In our case, we want to retrieve a Cloud Storage event: Documentation Cloud Storage Event.
We will only read the bucket and name fields of the Cloud Storage event, since those are the only ones required to construct our URI.
It's also important to understand that Eventarc sends events using HTTP POST requests. When setting up an Eventarc trigger to direct events to your Cloud Run service, the service is expected to handle incoming HTTP POST requests, hence the @app.post decorator in the FastAPI application.
import os
import random
import string

import uvicorn
from fastapi import FastAPI
from google.cloud import bigquery
from pydantic import BaseModel

PROJECT_ID = os.getenv("PROJECT_ID")


class CloudStorageEvent(BaseModel):
    """Subset of the Cloud Storage event fields needed to build the URI."""
    bucket: str
    name: str


bigquery_client = bigquery.Client(project=PROJECT_ID)
app = FastAPI()


@app.post("/bucket_to_bq")
def bucket_to_bq(data: CloudStorageEvent):
    bucket = data.bucket
    name = data.name
    uri = f"gs://{bucket}/{name}"
    file_name = os.path.basename(name)
    job_config = bigquery.LoadJobConfig()

    # Files that match a specific format: add as many "if" branches as there
    # are formats in the source files.
    if file_name.startswith("FORMAT_1") and file_name.endswith(".csv"):
        dataset_id = "[DATASET_NAME_SCHEMA_1]"
        dest_table = "[TABLE_NAME_SCHEMA_1]"
        job_config.schema = [
            bigquery.SchemaField("COLUMN_1", "INTEGER"),
            bigquery.SchemaField("COLUMN_2", "STRING"),
            bigquery.SchemaField("COLUMN_3", "DATE"),
            bigquery.SchemaField("COLUMN_4", "BOOL"),
        ]
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
    # If the file doesn't adhere to a specific format, you have the option to either
    # insert the data into a randomly named table within a designated dataset
    # or choose to do nothing.
    elif file_name.endswith(".csv"):
        dataset_id = "[DATASET_NAME_UNKNOWN_SCHEMA]"
        characters = string.ascii_letters + string.digits
        dest_table = "".join(random.choice(characters) for _ in range(25))
        job_config.autodetect = True
        job_config.skip_leading_rows = 1
        job_config.source_format = bigquery.SourceFormat.CSV
    else:
        # Unsupported file: acknowledge the event and do nothing.
        return {"status": 200, "message": f"Ignored {uri}"}

    # Call the BigQuery load job using an API request.
    table_id = f"{PROJECT_ID}.{dataset_id}.{dest_table}"
    load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)

    # Wait for the table load job to complete.
    load_job.result()
    print(f"Loaded {uri} to {table_id} successfully.")
    return {"status": 200, "job_id": load_job.job_id}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
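As a quick sanity check of the Pydantic model, you can verify locally that it extracts only the fields we care about from a storage object payload. The snippet below mirrors the CloudStorageEvent model defined in main.py; the sample values are made up, and extra fields are ignored by Pydantic v1 by default.
# Hypothetical payload resembling the storage object metadata sent by Eventarc;
# only bucket and name are read, the other fields are silently ignored.
from pydantic import BaseModel


class CloudStorageEvent(BaseModel):
    bucket: str
    name: str


sample_event = {
    "bucket": "your-source-bucket",
    "name": "FORMAT_1_2024_01_01.csv",
    "contentType": "text/csv",
    "size": "2048",
}

event = CloudStorageEvent.parse_obj(sample_event)
print(f"gs://{event.bucket}/{event.name}")  # gs://your-source-bucket/FORMAT_1_2024_01_01.csv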
To handle the management of diverse file formats, schemas, and other considerations based on known information about files uploaded in the bucket, you might explore refining and parameterizing the underlying logic.
For instance, an approach could involve utilizing a BigQuery mapping table configured with CRUD operations, featuring columns such as file_name and table_path to map file names with corresponding formats and schemas.
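As a sketch of that idea, the Cloud Run service could resolve the destination from such a mapping table instead of hard-coding the if/elif branches. The config.file_mapping table, its columns (file_prefix, table_path), and the helper below are hypothetical; the snippet assumes it lives in main.py next to the existing bigquery_client and PROJECT_ID.
# Hypothetical helper: look up the destination table for a given file name
# in a BigQuery mapping table instead of hard-coded branching logic.
def lookup_destination(file_name: str):
    query = f"""
        SELECT table_path
        FROM `{PROJECT_ID}.config.file_mapping`
        WHERE STARTS_WITH(@file_name, file_prefix)
        LIMIT 1
    """
    query_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("file_name", "STRING", file_name)
        ]
    )
    rows = list(bigquery_client.query(query, job_config=query_config).result())
    return rows[0].table_path if rows else None  # None means no known format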
Finally, deploy the service, for example with gcloud run deploy or through your usual CI/CD pipeline.
Step 3: Creation of the Eventarc trigger
As described in the Google documentation, you can create an Eventarc trigger by running the following gcloud CLI command:
gcloud eventarc triggers create TRIGGER_NAME \
--location=LOCATION \
--destination-run-service=DESTINATION_RUN_SERVICE \
--destination-run-region=DESTINATION_RUN_REGION \
--destination-run-path=bucket_to_bq \
--event-filters="type=google.cloud.storage.object.v1.finalized" \
--event-filters="bucket=BUCKET" \
--service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
Simply select a TRIGGER_NAME and replace the LOCATION and BUCKET variables with the location and name of the bucket you created in Step 1. Adjust the other variables in the same way, using the Cloud Run service and service account created previously. Note that bucket_to_bq refers to the path we chose in main.py with @app.post("/bucket_to_bq").
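Once the trigger is in place, a simple way to validate the whole chain is to drop a test file into the bucket and check the destination table. Here is a sketch, assuming google-cloud-storage is installed locally and the placeholder project, bucket, dataset, and table names are replaced with the ones you created.
# Hypothetical end-to-end smoke test: upload a small CSV matching FORMAT_1,
# wait a bit, then check that the destination table received rows.
import time

from google.cloud import bigquery, storage

PROJECT_ID = "your-project-id"      # placeholder
BUCKET = "your-source-bucket"       # bucket created in Step 1
TABLE = f"{PROJECT_ID}.DATASET_NAME_SCHEMA_1.TABLE_NAME_SCHEMA_1"  # placeholder

csv_content = "COLUMN_1,COLUMN_2,COLUMN_3,COLUMN_4\n1,hello,2024-01-01,true\n"

storage.Client(project=PROJECT_ID).bucket(BUCKET).blob(
    "FORMAT_1_smoke_test.csv"
).upload_from_string(csv_content, content_type="text/csv")

time.sleep(60)  # give Eventarc, Cloud Run, and the load job time to run

rows = bigquery.Client(project=PROJECT_ID).query(
    f"SELECT COUNT(*) AS n FROM `{TABLE}`"
).result()
print(f"Rows in destination table: {next(iter(rows)).n}")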
Pricing
Cloud Storage pricing: you'll be billed primarily for storing your files in your buckets. If you choose to keep your data in the bucket, for example for historical purposes where you won't perform any further operations on it, you may opt for an Archive storage class bucket: about 10 times cheaper on the storage side than a Standard bucket, but 10 times more expensive per operation.
Otherwise, remember to delete the files from the bucket once they have been loaded; this requires granting the Storage Object User IAM role to the dedicated service account beforehand.
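For instance, the Cloud Run service could remove the source object right after the load job succeeds. Here is a sketch, assuming google-cloud-storage is added to requirements.txt and the helper is placed in main.py, where PROJECT_ID is already defined.
# Hypothetical cleanup step, to call after load_job.result() succeeds:
# delete the source object so it no longer incurs storage costs.
from google.cloud import storage

storage_client = storage.Client(project=PROJECT_ID)


def delete_source_object(bucket: str, name: str) -> None:
    """Remove the loaded file from Cloud Storage."""
    storage_client.bucket(bucket).blob(name).delete()
    print(f"Deleted gs://{bucket}/{name}")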
Depending on the format of your files, the cost of storage in the bucket can be much higher than that of storage on the BigQuery side, as the data is not compressed in the same way.
Eventarc pricing: events from Google sources are free.
BigQuery pricing: batch loading is free using the shared slot pool. As the documentation puts it: “By default, you are not charged for batch loading data from Cloud Storage or from local files into BigQuery. Load jobs by default use a shared pool of slots.”
Cloud Run pricing: for our specific use case, the minimal configuration (1 vCPU, 128 MiB of memory) suffices, and it comes at a cost close to zero since we don't perform heavy computations. Depending on your needs, you may set the minimum number of instances to 1 for faster data processing and fewer cold start delays, even if this means additional costs.
To estimate pricing accurately, you can use the official GCP Calculator.
Conclusion
In summary, our solution ensures a cost-effective approach for loading data from Cloud Storage to BigQuery, with almost no expenses incurred during the actual loading process.
However, for scenarios demanding a more sophisticated orchestration of serverless workflows, the upcoming article will introduce Cloud Workflows as an equally economical alternative. The context will differ to showcase some of the service's interesting features. Stay tuned for more insights and practical implementations!
Thanks for reading! I'm Adam, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthusiastic Data Engineering team, please contact us.