My CrewAI Flow isn’t behaving as I’d expect. I have the following Flow class defined:
class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
    """CrewAI Flow that fans out from one @start step to three crews
    (RAG, SPIN, identification) and consolidates their results once all
    three have completed (``and_`` trigger).

    NOTE(review): indentation was lost in the paste and has been restored
    from Python syntax; no code tokens were changed.
    """

    def __init__(self, specialist_executor, rag_crew, spin_crew, identification_crew, rag_consolidation_crew, **kwargs):
        # Base class receives the executor plus a display name for the flow.
        super().__init__(specialist_executor, "SPIN Specialist Flow", **kwargs)
        self.specialist_executor = specialist_executor
        self.rag_crew = rag_crew
        self.spin_crew = spin_crew
        self.identification_crew = identification_crew
        self.rag_consolidation_crew = rag_consolidation_crew

    @start()
    def process_inputs(self):
        # Entry point of the flow; currently only logs. The three @listen
        # steps below all trigger off this method completing.
        current_app.logger.debug("Processing inputs")

    @listen(process_inputs)
    def execute_rag(self):
        # Runs the RAG crew with the validated flow input and stores the
        # raw crew output on the state.
        current_app.logger.debug("Executing RAG")
        inputs = self.state.input.model_dump()
        crew_output = self.rag_crew.kickoff(inputs=inputs)
        self.state.rag_output = crew_output
        current_app.logger.debug(f"Executing RAG finished")
        self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
        return crew_output

    @listen(process_inputs)
    def execute_spin(self):
        # Runs the SPIN crew on a flattened view of the whole flow state.
        current_app.logger.debug("Executing spin")
        crew_output = self.spin_crew.kickoff(inputs=flatten_pydantic_model(self.state))
        current_app.logger.debug(f"Executing spin finished")
        # spin_output = SPINOutput.model_validate(crew_output["spin_detect_task"])
        self.state.spin = crew_output
        return crew_output

    @listen(process_inputs)
    def execute_identification(self):
        # Runs the identification crew, also on the flattened state.
        current_app.logger.debug("Executing identification")
        crew_output = self.identification_crew.kickoff(inputs=flatten_pydantic_model(self.state))
        current_app.logger.debug(f"Executing identification finished")
        self.state.identification = crew_output
        self.specialist_executor.log_tuning("Identification Crew Output", crew_output.model_dump())
        return crew_output

    @listen(and_(execute_rag, execute_spin, execute_identification))
    def consolidate(self):
        # Fires only after all three listeners above have returned.
        # NOTE(review): reads self.state.lead_identification_questions and
        # self.state.spin_questions, which no step in this snippet assigns
        # (the listeners set state.spin / state.identification) — verify
        # these fields are populated elsewhere or this will fail.
        current_app.logger.debug("Executing consolidate")
        inputs = {
            "language": self.state.input.language,
            "prepared_answers": self.state.rag_output.answer,
            "additional_questions": self.state.lead_identification_questions.questions +
                                    self.state.spin_questions.questions,
        }
        crew_output = self.rag_consolidation_crew.kickoff(inputs=inputs)
        current_app.logger.debug(f"Executing consolidate finished")
        self.specialist_executor.log_tuning("RAG Consolidation Crew Output", crew_output.model_dump())

    def kickoff(self, inputs=None):
        # Overrides Flow.kickoff to add tracing/progress reporting around
        # the base implementation, then returns the final state.
        # NOTE(review): self.state.input is assigned here and inputs are
        # ALSO passed to super().kickoff(inputs=...) — confirm the base
        # class does not re-initialize/overwrite state from that dict.
        # NOTE(review): self.specialist is not set in __init__ here —
        # presumably provided by the base class; verify.
        with current_event.create_span("SPIN Specialist Execution"):
            self.specialist_executor.log_tuning("Inputs retrieved", inputs)
            self.state.input = SPINSpecialistInput.model_validate(inputs)
            self.specialist.update_progress("EveAI Flow Start", {"name": "SPIN"})
            super().kickoff(inputs=inputs)
            self.specialist.update_progress("EveAI Flow End", {"name": "SPIN"})
            return self.state
This is the log when kicking off the flow:
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:188]: Processing inputs
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:193]: Executing RAG
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:203]: Executing spin
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:212]: Executing identification
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:131]: Flow Returned: {'id': '7e8ff963-c945-469f-a172-0cf84630a510', 'input': ...
2025-02-18 20:47:42 [ERROR] eveai_chat_workers (eveai_chat_workers) [tasks:326]: execute_specialist: Error
As you can see, the @start step and the first three @listen steps are started. But then, for one reason or another, the flow ends before any of those steps (crews) has finished.
I thought ‘kickoff’ was a synchronous call, not an asynchronous one — is that wrong?
I cannot use ‘await’ as I’m not in an asynchronous function (gives an error in the IDE as well as at runtime).
What am I missing? Why does execution end while the different crews have not yet finished? By the way, I get no additional errors in my log (except the one at the end, which occurs because the flow's output has no value).