This past week I had a really weird bug in my kedro pipeline. For some reason the data coming out of my pipeline made no sense at all, but if I manually requested the raw data outside of the pipeline it matched expectations.
NOTE While this story is about a kedro pipeline, the lesson applies anywhere closures are put into an iterable.
After a few days of looking at it off and on, I pinpointed the problem all the way down in the raw layer, right as data comes off of the database. For this I already had existing sql files stored and a read_sql function to get the data, so I opted to set up the pipeline to utilize the existing code as much as possible, leaning on the kedro framework a bit less.
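To be clear, read_sql is a little helper of my own, not pandas.read_sql itself: it looks up the stored .sql file for a dataset and runs it against the database. Roughly something along these lines, where the sql/ directory, the connection string, and the use of SQLAlchemy are just placeholders for illustration, not the actual project code.

# 🗄️ Rough sketch of a read_sql helper (paths and connection are placeholders)
from pathlib import Path

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:password@host:5432/warehouse")  # placeholder connection

def read_sql(dataset: str) -> pd.DataFrame:
    "Load the stored query for this dataset and return the result as a DataFrame."
    query = Path("sql", f"{dataset}.sql").read_text()
    return pd.read_sql(query, engine)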
I have dynamically created lists of pipeline nodes many times in the past, but typically I take data from a kedro input and use it in the lambda. I prefer the simplicity of using lambdas over functools.partial. It typically looks something like this.
# 👍 I do this all the time
from kedro.pipeline import node
from my_generic_project_lib import clean

datasets_to_clean = ['sales', 'production', 'inventory']

nodes = []
for dataset in datasets_to_clean:
    nodes.append(
        node(
            func=lambda x: clean(x),
            inputs=f'raw_{dataset}',
            outputs=f'int_{dataset}',
            tags=['int', dataset],
            name=f'create_int_{dataset}',
        )
    )
What was different this time was that I needed to pass the name of the dataset into my read_sql function, not the data loaded by the framework.
# ❌ This does not work
from kedro.pipeline import node
from my_generic_project_lib import read_sql

datasets_to_read = ['sales', 'production', 'inventory']

nodes = []
for dataset in datasets_to_read:
    nodes.append(
        node(
            func=lambda: read_sql(dataset),  # 💥 The major issue
            inputs=None,
            outputs=f'int_{dataset}',
            tags=['int', dataset],
            name=f'create_int_{dataset}',
        )
    )
Still oblivious to what had happened, I popped in a breakpoint() and quickly saw that during the first run the dataset passed into read_sql was 'inventory'; in fact, every single one was 'inventory'. The lambda just uses the latest value of dataset from the enclosing scope and has no local dataset bound to it.
# 👍 Much Better
from kedro.pipeline import node
from my_generic_project_lib import read_sql

datasets_to_read = ['sales', 'production', 'inventory']

nodes = []
for dataset in datasets_to_read:
    nodes.append(
        node(
            func=lambda dataset=dataset: read_sql(dataset),  # dataset is now bound to the lambda ✨
            inputs=None,
            outputs=f'int_{dataset}',
            tags=['int', dataset],
            name=f'create_int_{dataset}',
        )
    )
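functools.partial, which I said I usually avoid for simplicity, fixes the same problem in the same way: the current value of dataset is captured when the callable is created, rather than looked up when it finally runs. A rough sketch of that variant would look like this.

# 👍 Equivalent fix with functools.partial
from functools import partial

from kedro.pipeline import node
from my_generic_project_lib import read_sql

datasets_to_read = ['sales', 'production', 'inventory']

nodes = []
for dataset in datasets_to_read:
    nodes.append(
        node(
            func=partial(read_sql, dataset),  # dataset's current value is frozen into the partial
            inputs=None,
            outputs=f'int_{dataset}',
            tags=['int', dataset],
            name=f'create_int_{dataset}',
        )
    )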
I made a slightly simpler example so that you can try it and play with it yourself, edit it, share it with your friends, laugh at my mistake, whatever you like.
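Stripped of kedro entirely, the whole thing boils down to something like this: a list of lambdas built in a loop, first with the late-binding bug and then with the default-argument fix.

# 🧪 Stripped-down demo: closures in a loop, no kedro required
datasets = ['sales', 'production', 'inventory']

# ❌ Late binding: each lambda looks up `dataset` when it is *called*,
# and by then the loop has finished, so every one of them sees 'inventory'
broken = []
for dataset in datasets:
    broken.append(lambda: dataset)
print([f() for f in broken])   # ['inventory', 'inventory', 'inventory']

# 👍 A default argument is evaluated when the lambda is *defined*,
# so each lambda keeps its own copy of the current value
fixed = []
for dataset in datasets:
    fixed.append(lambda dataset=dataset: dataset)
print([f() for f in fixed])    # ['sales', 'production', 'inventory']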