Recognising that flows are event-based, is there any way to avoid race conditions in a Flow?
For context: I am processing a list of results and want to take each individual result and process it further with additional crews. I am exploring whether I can do this by looping through a flow. This is possible, but because the flow is event-based I run into race conditions, as I cannot control the ‘await’. I have tried making the state thread-safe, but that does not resolve the issue.
Other options are possible, either by dynamically creating the tasks for the crews or by using .kickoff_for_each, but the looping approach allows the task definitions to be more dynamic.
Sample code below for reference:
from collections import deque
from threading import Lock
from typing import Optional

from crewai.flow.flow import Flow, listen, start, router, or_
from pydantic import BaseModel, RootModel, Field, PrivateAttr, PydanticUserError
class safe_stack(BaseModel):
    """Lock-guarded queue of pending items that can be stored on a pydantic model.

    Despite the name, items are consumed first-in, first-out: ``push_item``
    appends on the right and ``pop_item`` removes from the left.

    Each individual call holds the internal lock while it touches ``checks``.
    A sequence of calls (for example ``is_empty()`` followed by ``pop_item()``)
    is NOT atomic as a whole, so callers that need check-then-pop semantics
    must serialise that sequence themselves.
    """

    # Annotated as a deque because that is what __init__ stores; the items
    # themselves are not restricted to str.
    checks: deque = Field(default_factory=deque)
    # Declared as a pydantic private attribute so the lock is created with the
    # model instead of being attached ad hoc as an undeclared attribute.
    _lock = PrivateAttr(default_factory=Lock)

    def __init__(self, items: list) -> None:
        """Create the queue pre-filled with ``items`` (kept in the given order)."""
        super().__init__()
        # Copy into a fresh deque so later mutation of ``items`` by the caller
        # does not affect the queue.
        self.checks = deque(items)

    def pop_item(self):
        """Remove and return the oldest item.

        Raises:
            IndexError: if the queue is empty.
        """
        with self._lock:
            return self.checks.popleft()

    def push_item(self, item) -> None:
        """Append ``item`` at the back of the queue."""
        with self._lock:
            self.checks.append(item)

    def is_empty(self) -> bool:
        """Return True when no items are left."""
        with self._lock:
            return not self.checks
class ExampleState(BaseModel):
    """State object carried by the flow and mutated by its steps."""

    # Plain integer counter, starting at 0.
    counter: int = 0
    # Free-form status text. The original default was written with
    # typographic quotes, which is a syntax error; it is an empty string.
    message: str = ""
    # Queue of outer-loop items; None until a step assigns it.
    checks: Optional[safe_stack] = None
    # Queue of inner-loop items; None until a step assigns it.
    decomposed_checks: Optional[safe_stack] = None
class ConditionalFlow(Flow[ExampleState]):
    """Nested loop expressed as a flow: an outer queue of checks, each of
    which is expanded into an inner queue of decomposed checks.

    The loops are driven by the string labels the two routers return
    ("inner_loop", "inner_loop_end", "outer_loop", "end") and by the
    listeners registered for those labels.
    """

    @start()
    def first_method(self):
        """Entry point: seed the message, the outer queue and the counter."""
        state = self.state
        state.message = "Hello from first_method"
        state.checks = safe_stack(['a', 'b', 'c', 'd', ' e'])
        state.counter += 1

    @listen(first_method)
    def second_method(self):
        """Runs after ``first_method``: extend the message, bump the counter
        and drop the first item of the outer queue (its value is not used)."""
        state = self.state
        state.message += " - updated by second_method"
        state.counter += 1
        state.checks.pop_item()

    @listen(or_(second_method, "outer_loop"))
    def outer_loop_start(self):
        """Start one outer iteration by creating a fresh inner queue."""
        print("outer_loop_start")
        # Each outer iteration gets its own two-element inner queue.
        self.state.decomposed_checks = safe_stack([1, 2])
        return self.state

    @listen(or_(outer_loop_start, "inner_loop"))
    def inner_loop_start(self):
        """Take the next item off the inner queue and report it."""
        sub_item = self.state.decomposed_checks.pop_item()
        print(f"inner_loop_start decomposed_item: {sub_item}")
        return self.state

    @listen("inner_loop_end")
    def inner_loop_end(self):
        """Listener for the "inner_loop_end" label; only logs and returns the state."""
        print("inner_loop_end")
        return self.state

    @router(inner_loop_start)
    def inner_loop_processing(self):
        """Route after each inner step: repeat while inner items remain."""
        print("inner_loop_processing")
        inner_done = self.state.decomposed_checks.is_empty()
        return "inner_loop_end" if inner_done else "inner_loop"

    @router(inner_loop_end)
    def outer_loop(self):
        """Route after the inner loop: consume one outer item, bump the
        counter, then either start another outer iteration or finish."""
        current = self.state.checks.pop_item()
        print(f"Outer router: {current}")
        self.state.counter += 1
        return "end" if self.state.checks.is_empty() else "outer_loop"

    @listen("end")
    def end_method(self):
        """Listener for the "end" label; returns the final state."""
        return self.state