Temporal provides helpful primitives called Workflows and Activities for orchestrating processes. A common pattern I’ve found useful is the ability to run multiple “child workflows” in parallel from a single “parent” workflow.
Let’s say we have the following activity and workflow.
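For reference, the snippets below assume roughly the following imports and a shared task-queue constant (the TASK_QUEUE value here is an arbitrary placeholder; adjust it to your setup):

import asyncio
import random
import uuid
from dataclasses import dataclass
from datetime import timedelta
from typing import List

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy

# Assumed constant: the task queue that the workflows and the worker share.
TASK_QUEUE = "my-task-queue"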
Activity code
@dataclass
class MyGoodActivityArgs:
    arg1: str
    arg2: str


@dataclass
class MyGoodActivityResult:
    arg1: str
    arg2: str
    random_val: float


@activity.defn
async def my_good_activity(args: MyGoodActivityArgs) -> MyGoodActivityResult:
    activity.logger.info("Running my good activity")
    return MyGoodActivityResult(
        arg1=args.arg1,
        arg2=args.arg2,
        random_val=random.random(),
    )
Workflow code
@dataclass
class MyGoodWorkflowArgs:
    arg1: str
    arg2: str


@dataclass
class MyGoodWorkflowResult:
    result: MyGoodActivityResult


@workflow.defn
class MyGoodWorkflow:
    @workflow.run
    async def run(self, args: MyGoodWorkflowArgs) -> MyGoodWorkflowResult:
        result: MyGoodActivityResult = await workflow.execute_activity(
            my_good_activity,
            MyGoodActivityArgs(
                arg1=f"activity arg1: {args.arg1}",
                arg2=f"activity arg2: {args.arg2}",
            ),
            schedule_to_close_timeout=timedelta(seconds=60),
            task_queue=TASK_QUEUE,
        )
        return MyGoodWorkflowResult(result=result)
We can now write another Temporal workflow that starts multiple instances of MyGoodWorkflow.
@dataclass
class BatchWorkflowArgs:
    inputs: List[MyGoodWorkflowArgs]


@dataclass
class BatchWorkflowResult:
    results: List[MyGoodWorkflowResult]


@workflow.defn
class MyBatchWorkflow:
    @workflow.run
    async def run(self, args: BatchWorkflowArgs) -> BatchWorkflowResult:
        # Create a list to store the child workflow handles (awaitable futures)
        workflow_futures = []

        # Start a child workflow for each set of args
        for i, workflow_args in enumerate(args.inputs):
            future = await workflow.start_child_workflow(
                MyGoodWorkflow,
                workflow_args,
                id=f"my_good_workflow_{i}",
                task_queue=TASK_QUEUE,
                retry_policy=RetryPolicy(maximum_attempts=3),
            )
            workflow_futures.append(future)

        # Wait for all child workflows to complete and collect results
        results: List[MyGoodWorkflowResult] = await asyncio.gather(*workflow_futures)
        workflow.logger.info(
            f"Completed {len(workflow_futures)} MyGoodWorkflow workflows"
        )
        return BatchWorkflowResult(results)
The main parts to focus on are await workflow.start_child_workflow, which starts each child and returns a future-like handle we can await to get the workflow result, and await asyncio.gather(*workflow_futures), which, instead of awaiting each handle explicitly, awaits them all at once and collects the results together.
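For contrast, a minimal sketch of the sequential alternative (the class name here is mine, not part of the original example): the SDK's workflow.execute_child_workflow combines starting and awaiting a child in one call, so each child finishes before the next begins, whereas start_child_workflow plus gather runs them concurrently.

@workflow.defn
class MySequentialBatchWorkflow:
    @workflow.run
    async def run(self, args: BatchWorkflowArgs) -> BatchWorkflowResult:
        results: List[MyGoodWorkflowResult] = []
        for i, workflow_args in enumerate(args.inputs):
            # execute_child_workflow = start_child_workflow + await, in one call
            result = await workflow.execute_child_workflow(
                MyGoodWorkflow,
                workflow_args,
                id=f"my_good_workflow_seq_{i}",
                task_queue=TASK_QUEUE,
            )
            results.append(result)
        return BatchWorkflowResult(results)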
In this example, we use dataclasses as the inputs and outputs to all activities and workflows, which lets us evolve those inputs and outputs without breaking the data contract between the workflow components. For example, if an activity returned a bare str, we would struggle to add an additional return value without changing the return type. We can get in front of this issue by always returning classes.
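To make that concrete, here is a sketch of how the result type might evolve (the attempt field below is a hypothetical addition, not part of the original example): adding a field with a default value stays backward compatible with already-serialized payloads, while a bare str return would have forced a signature change.

@dataclass
class MyGoodActivityResult:
    arg1: str
    arg2: str
    random_val: float
    # Hypothetical new field; the default keeps older payloads deserializable.
    attempt: int = 1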
Finally, we can run MyBatchWorkflow.
async def main() -> BatchWorkflowResult:
    client = await Client.connect("localhost:7233")
    batch_args = BatchWorkflowArgs(
        inputs=[
            MyGoodWorkflowArgs(arg1="workflow arg1", arg2="workflow arg2"),
            MyGoodWorkflowArgs(arg1="workflow arg3", arg2="workflow arg4"),
        ]
    )
    result: BatchWorkflowResult = await client.execute_workflow(
        MyBatchWorkflow.run,
        batch_args,
        id=str(uuid.uuid4()),
        task_queue=TASK_QUEUE,
    )
    print(f"Batch workflow completed with results: {result}")
    return result


if __name__ == "__main__":
    asyncio.run(main())
When we run this script, we get:
python -m run_workflow
Batch workflow completed with results: BatchWorkflowResult(results=[MyGoodWorkflowResult(result=MyGoodActivityResult(arg1='activity arg1: workflow arg1', arg2='activity arg2: workflow arg2', random_val=0.8471340083778467)), MyGoodWorkflowResult(result=MyGoodActivityResult(arg1='activity arg1: workflow arg3', arg2='activity arg2: workflow arg4', random_val=0.21755659662944782))])
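Note that for the script to make progress, a worker process must be polling the task queue with the workflows and activity registered. A minimal sketch, run as a separate process:

from temporalio.worker import Worker


async def run_worker() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        workflows=[MyGoodWorkflow, MyBatchWorkflow],
        activities=[my_good_activity],
    )
    # Polls the task queue and executes workflow and activity tasks until stopped.
    await worker.run()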
In this specific example, we collect all the results of the child workflows after they complete. Keep in mind that this could lead to a large workflow history if run on a very large list of inputs with big payloads. If you hit that limit with this approach, a possible workaround is to have each child workflow write its results to a blob store and return only a reference instead of the payload, which keeps the results out of the workflow history.
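As a sketch of that idea (the MyGoodWorkflowRef type, blob_store_put helper, and upload_result activity are all hypothetical illustrations, not part of the linked example), the child workflow could call an activity that uploads the payload and returns only a small key:

import json
from dataclasses import asdict


@dataclass
class MyGoodWorkflowRef:
    blob_key: str  # pointer to the full result, kept out of the workflow history


async def blob_store_put(key: str, payload: str) -> None:
    # Hypothetical placeholder for a real blob-store write (S3, GCS, ...).
    raise NotImplementedError


@activity.defn
async def upload_result(result: MyGoodActivityResult) -> str:
    # Serialize the payload, persist it externally, and return only its key.
    key = f"results/{uuid.uuid4()}"
    await blob_store_put(key, json.dumps(asdict(result)))
    return key

The batch workflow would then gather MyGoodWorkflowRef values, and downstream consumers would fetch the full payloads from the blob store on demand.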
You can find working code for this toy example here.