BigQuery supports transactions since last year (Presented at Google Cloud Next'21) : it is now possible to perform mutating operations over one or several tables and then commit or rollback the result atomically, by wrapping the script between
BEGIN TRANSACTION;
and
COMMIT TRANSACTION;
or
ROLLBACK TRANSACTION;
Easy enough ! It's all explained in the official documentation
Yet transactions come with a limitation:
A transaction cannot span multiple scripts.
While this is not an issue most of the time, it can be a problem when the scripts enclosed in the transaction become too complex or have too many query parameters, or break any other quota of BigQuery jobs. This can happen when query scripts are auto-generated from a request payload, for example.
There is a way around it, with BigQuery sessions. Let's see how it works
BigQuery Sessions
Sessions are a way to link jobs and persist transient data, like temporary tables, between them.
One common use case for sessions is exactly what we want:
Create multi-statement transactions over multiple queries. Within a session, you can begin a transaction, make changes, and view the temporary result before deciding to commit or rollback. You can do this over several queries in the session. If you do not use a session, a multi-statement transaction needs to be completed in a single query.
The idea is to stack the transaction queries inside the same session, beginning with BEGIN TRANSACTION;
and ending with COMMIT TRANSACTION;
.
In between, you can call put as many queries as necessary and the whole session will have atomic behavior.
A session is closed automatically after 24 hours of inactivity. However, when mixed with transactions, it can happen that the targeted table gets "locked" in the session and becomes unusable until the end of the session. That's why I recommend to force the session ending at the end of the script. It is done by invoking the following query:
CALL BQ.ABORT_SESSION();
Python implementation
We are dealing with the notion of session that we need to open and always close at the end of the processing : a context manager is naturally indicated for this.
"""ContextManager wrapping a bigquery session."""
from google.cloud import bigquery
class BigquerySession:
"""ContextManager wrapping a bigquerySession."""
def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
"""Construct instance."""
self._bigquery_client = bqclient
self._location = bqlocation
self._session_id = None
def __enter__(self) -> str:
"""Initiate a Bigquery session and return the session_id."""
job = self._bigquery_client.query(
"SELECT 1;", # a query can't fail
job_config=bigquery.QueryJobConfig(create_session=True),
location=self._location,
)
self._session_id = job.session_info.session_id
job.result() # wait job completion
return self._session_id
def __exit__(self, exc_type, exc_value, traceback):
"""Abort the opened session."""
if self._session_id:
# abort the session in any case to have a clean state at the end
# (sometimes in case of script failure, the table is locked in
# the session)
job = self._bigquery_client.query(
"CALL BQ.ABORT_SESSION();",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=self._session_id
)
],
),
location=self._location,
)
job.result()
It then become really easy to use this context to stack jobs into a single session, thus to create a multistatement, multiscripts bigquery transaction:
with BigquerySession(self.bigquery_client, BIGQUERY_LOCATION) as session_id:
# open transaction
job = self.bigquery_client.query(
"BEGIN TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# stack queries
for queryscript in scripts:
job = self.bigquery_client.query(
queryscript,
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# end transaction
job = self.bigquery_client.query(
"COMMIT TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
Notice how all jobs are run with the same session_id (i.e. within the same session) and in the same location (this is a requirement for sessions).
Hope this helps !
Thanks for reading! I’m Matthieu, data engineer at Stack Labs.
If you want to discover the Stack Labs Data Platform or join an enthousiast Data Engineering team, please contact us.
Photo by Caroline Selfors on Unsplash