2 July 2025 • 7 mins
I ran into one of those delightful bugs that only show up when your data pipelines generate tasks dynamically: the kind that teaches you how a leaky abstraction in your pipeline platform can leave you scratching your head.
The short version: I made a simple function signature change, assuming only future runs would care. Instead, my pipeline broke days later when an old task serialized under the previous signature collided with the new code. The fix? Classic backward compatibility tricks that saved me from babysitting all existing task runs when making changes in the future.
Here’s the story — and how to avoid learning this lesson the hard way.
I had a Dataswarm operator that would execute a Python function every day, and the output of that Python function was a list of tasks (other functions) to be executed that day.
Here is what the function looked like:
# Function that generates the tasks to be executed.
def task_generator(arg1, arg2) -> List[Task]:
    ...

# How the function is invoked
wait_for_data = SomeTaskWaitingForData(data)
tasks_to_be_executed = DynamicTasks(
    wait_for_tasks=[wait_for_data],
    task_gen_function=task_generator,
    task_gen_args={
        "arg1": a,
        "arg2": b,
    },
)
I put in a diff that looked something like this:
# Function that generates the tasks to be executed.
- def task_generator(arg1, arg2) -> List[Task]:
+ def task_generator(arg1, arg3) -> List[Task]:
    ...

# How the function is invoked
wait_for_data = SomeTaskWaitingForData(data)
tasks_to_be_executed = DynamicTasks(
    wait_for_tasks=[wait_for_data],
    task_gen_function=task_generator,
    task_gen_args={
        "arg1": a,
-         "arg2": b,
+         "arg3": c,
    },
)
You see, I just replaced arg2 with arg3 and everything looked fine. I tested the diff and landed it, expecting the next task instance to pick up the changes and move on. As you can imagine, that is not what happened :)
I soon got a bug report that said that my pipeline failed with an error: TypeError: 'arg2' is an invalid keyword argument for task_generator(). This had me completely confused. My expectation was that either the previous version of the pipeline would be executed, in which case task_generator() is defined to expect arg2 and tasks_to_be_executed passes a value for arg2, or the new version of the pipeline would run, where task_generator() expects arg3 and tasks_to_be_executed passes arg3. Neither of those two scenarios results in a TypeError: 'arg2' is an invalid keyword argument for task_generator(). So, what's going on?
After some debugging, I saw that the tasks_to_be_executed task instance that errored out started off two days ago, but was waiting for wait_for_data to complete, and the wait_for_data task didn't complete until the current day, after which the tasks_to_be_executed task instance ran and errored out. Eventually I found that DynamicTasks serializes the function name and args as a JSON blob at schedule time, waits for upstream tasks to finish, then reloads the function from HEAD and calls it with the original arguments. That's why old args collided with new code.
Because DynamicTasks persists the function name and args and then later reloads HEAD, it breaks the assumption that changing a function signature only affects new pipeline runs. I only discovered this by digging into the DynamicTasks implementation; classic leaky abstraction!
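To make the failure mode concrete, here is a minimal sketch of that schedule-then-reload pattern. The names (schedule_dynamic_tasks, run_dynamic_tasks, the in-memory tasks_db) are made up for illustration, and the real DynamicTasks internals are Dataswarm-specific, but the sequencing is the same: the arguments are frozen as a JSON blob at schedule time, while the function body is resolved from HEAD at run time.

import importlib
import json

# Hypothetical stand-in for the scheduler's persistent task store.
tasks_db = {}

def schedule_dynamic_tasks(task_id, module_name, func_name, task_gen_args):
    # Schedule time: only a *reference* to the function plus a JSON blob of
    # its arguments gets persisted. The function body is not captured.
    tasks_db[task_id] = {
        "module": module_name,
        "func": func_name,
        "args_json": json.dumps(task_gen_args),
    }

def run_dynamic_tasks(task_id):
    # Run time, possibly days later: the function is imported fresh (i.e.
    # whatever is at HEAD now), but the arguments come from the old blob.
    record = tasks_db[task_id]
    func = getattr(importlib.import_module(record["module"]), record["func"])
    args = json.loads(record["args_json"])
    # If the signature changed in between, this is where you get
    # TypeError: 'arg2' is an invalid keyword argument for task_generator()
    return func(**args)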
Changing the Dataswarm operator to stop leaking this implementation detail was a pretty heavy lift, and I needed a more scoped-down change to unblock myself. So, I needed a way to make sure that I could change the task_generator implementation without running into this combination of race conditions and leaky abstractions again. Making the task_generator implementation backward compatible accomplishes this quite nicely. But first, I need to make sure that it can be made backward compatible. That involves a few steps.
First, we need to ensure that passing in parameters from the previous version of task_generator does not throw an unexpected exception. We can do that by swallowing all unspecified parameters in **kwargs as follows.
# Function that generates the tasks to be executed.
- def task_generator(arg1, arg2) -> List[Task]:
+ def task_generator(
+     *,
+     arg1=None,
+     arg2=None,
+     **kwargs
+ ) -> List[Task]:
+     if kwargs:
+         LOG.warning(f"Found unspecified arguments {kwargs.keys()}")
      ...
The diff does three things.
1. It makes all parameters keyword-only with the * marker, so every argument must be passed by name.
2. It gives every parameter a default value of None. This ensures that leaving out any specific parameter doesn't break the call. The reasoning for this is similar to the reasoning in proto3 that made all fields optional.
3. It swallows all unrecognized parameters in **kwargs and logs a warning instead of raising an exception.
Land this change and wait for it to propagate to all your task instances.
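For reference, here is roughly what the landed function looks like outside of diff form, with a quick call showing the behavior. The logging setup and the Task alias are my own scaffolding for this sketch, and the business logic is stubbed out.

import logging
from typing import Any, List

LOG = logging.getLogger(__name__)
Task = Any  # stand-in for the real Dataswarm Task type

def task_generator(
    *,
    arg1=None,
    arg2=None,
    **kwargs
) -> List[Task]:
    if kwargs:
        LOG.warning(f"Found unspecified arguments {kwargs.keys()}")
    # ... business logic with arg1 and arg2, stubbed out here ...
    return []

# A payload carrying an argument this version does not declare only
# produces a warning instead of a TypeError.
task_generator(arg1=1, arg2=2, some_unknown_arg=3)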
Now you are ready to make changes to your function signature without breaking existing tasks. Suppose you want to remove arg2 and introduce arg3. Your diff would look like this.
def task_generator(
    *,
    arg1=None,
-     arg2=None,
+     arg3=None,
    **kwargs
) -> List[Task]:
    if kwargs:
        LOG.warning(f"Found unspecified arguments {kwargs.keys()}")
+     if not arg3:
+         arg2 = kwargs.get("arg2", None)
+         # Old business logic with arg2
          ...
+         return tasks
+     # New business logic with arg3
+     ...
+     return tasks
When you land this, you could have tasks scheduled to run that are currently persisting the old function signature. When such tasks execute your new function definition, **kwargs will swallow arg2, and arg3 is set to its default value None. The function will see that arg3 is None, so it will look for arg2 in kwargs and execute the old business logic.
However, for all new instances of your task, arg3 is set, and so the function executes the new business logic. Backward compatibility accomplished!
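To make the two paths concrete, here is a quick sketch of how the persisted payloads would hit the arg3 version of task_generator above; a, b, and c are the same placeholder values as in the original invocation.

# What an old task instance persisted before the change landed.
old_payload = {"arg1": a, "arg2": b}
# What new task instances persist after the change.
new_payload = {"arg1": a, "arg3": c}

# Old payload: arg2 lands in **kwargs (and is logged), arg3 stays None,
# so the old business logic runs.
task_generator(**old_payload)

# New payload: arg3 is set, so the new business logic runs.
task_generator(**new_payload)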
After all your old task instances have completed execution, you are now ready to remove the old business logic. This is a simple red diff.
def task_generator(
    *,
    arg1=None,
    arg3=None,
    **kwargs
) -> List[Task]:
    if kwargs:
        LOG.warning(f"Found unspecified arguments {kwargs.keys()}")
-     if not arg3:
-         arg2 = kwargs.get("arg2", None)
-         # Old business logic with arg2
-         ...
-         return tasks
    # New business logic with arg3
    ...
    return tasks
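For completeness, once that red diff lands the function ends up looking roughly like this, reusing the LOG and Task scaffolding from the earlier sketch and with the business logic still stubbed out.

def task_generator(
    *,
    arg1=None,
    arg3=None,
    **kwargs
) -> List[Task]:
    if kwargs:
        LOG.warning(f"Found unspecified arguments {kwargs.keys()}")
    # New business logic with arg3, stubbed out here.
    tasks: List[Task] = []
    return tasks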
And, you are done!
Pipeline frameworks can serialize more than you think. DynamicTasks serialized the function name and arguments days earlier, then loaded the function definition fresh from HEAD. That mismatch broke everything.
Stage changes with **kwargs and defaults. When changing function signatures that might still be called by older task payloads, always accept extra kwargs and use None defaults to gracefully detect old vs. new callers.
Expect your abstractions to leak. If your orchestration tool stores data and code separately (JSON blobs now, functions later), your assumption that “old code only calls old function signatures” is toast.
Logging unrecognized parameters is gold. Instead of crashing, you get explicit warnings when old payloads collide with new code. Debugging becomes a thousand times easier.