Hi team — I’m trying to understand the intended semantics for shared state mutation in Flows when multiple listeners are triggered in parallel from the same start event.
The docs describe state management as the backbone of Flows, and Flows themselves as a way to manage state and control execution paths precisely.
However, if multiple listeners run in parallel and both perform read-modify-write updates against the same state field, the final result appears to depend on execution order.
Minimal Repro
import asyncio
import random
from typing import Optional
from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, start, listen, and_
class MyState(BaseModel):
counter: int = 0
execution_order: list[str] = Field(default_factory=list)
final_result: Optional[str] = None
class TestFlow(Flow[MyState]):
@start()
def start_event(self):
print("--- Parallel tasks triggered ---")
return {"trigger": True}
@listen(start_event)
async def update_a(self):
await asyncio.sleep(random.uniform(0.05, 0.2))
current = self.state.counter
self.state.counter = current * 2 + 10
self.state.execution_order.append("A")
print(f"[A] {current} * 2 + 10 = {self.state.counter}")
return self.state.counter
@listen(start_event)
async def update_b(self):
await asyncio.sleep(random.uniform(0.05, 0.2))
current = self.state.counter
self.state.counter = current + 100
self.state.execution_order.append("B")
print(f"[B] {current} + 100 = {self.state.counter}")
return self.state.counter
@listen(and_(update_a, update_b))
def finalize(self):
self.state.final_result = (
f"Final Counter Value: {self.state.counter}, "
f"Order: {self.state.execution_order}"
)
print(f"✅ Finalize: {self.state.final_result}")
return self.state.final_result
if __name__ == "__main__":
for i in range(5):
print(f"\n>>> Run #{i + 1}")
flow = TestFlow()
flow.kickoff()
Observed behavior
Across repeated runs, the final counter value flips depending on which listener updates shared state first:
• If update_a happens before update_b:
(0 * 2 + 10) + 100 = 110
• If update_b happens before update_a:
(0 + 100) * 2 + 10 = 210
So the same Flow definition can produce two different business outcomes depending only on listener scheduling.
My question
Is this considered expected behavior in Flows?
If yes, would it make sense to explicitly document that:
• parallel listeners should not perform unsynchronized read-modify-write updates on the same state field
• shared-state fan-out/fan-in patterns should use merge-safe logic or explicit serialization
Possible improvements
A few possible ways this could be made easier / safer for developers:
- A docs warning around shared mutable state in parallel listeners
- An ordered / sequential listener mode for state-sensitive handlers
- A recommended pattern for deterministic state merging after parallel branches
I’m mainly looking for clarification on intended semantics here. If this is expected, I’d be happy to help draft a documentation warning or a small example showing the safe pattern.