My Flow isn't waiting for crews to finish

My CrewAI Flow isn’t behaving as I’d expect. I have the following Flow class defined:

class SPINFlow(EveAICrewAIFlow[SPINFlowState]):
    def __init__(self, specialist_executor, rag_crew, spin_crew, identification_crew, rag_consolidation_crew, **kwargs):
        super().__init__(specialist_executor, "SPIN Specialist Flow", **kwargs)
        self.specialist_executor = specialist_executor
        self.rag_crew = rag_crew
        self.spin_crew = spin_crew
        self.identification_crew = identification_crew
        self.rag_consolidation_crew = rag_consolidation_crew

    @start()
    def process_inputs(self):
        current_app.logger.debug("Processing inputs")


    @listen(process_inputs)
    def execute_rag(self):
        current_app.logger.debug("Executing RAG")
        inputs = self.state.input.model_dump()
        crew_output = self.rag_crew.kickoff(inputs=inputs)
        self.state.rag_output = crew_output
        current_app.logger.debug("Executing RAG finished")
        self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
        return crew_output

    @listen(process_inputs)
    def execute_spin(self):
        current_app.logger.debug("Executing spin")
        crew_output = self.spin_crew.kickoff(inputs=flatten_pydantic_model(self.state))
        current_app.logger.debug("Executing spin finished")
        # spin_output = SPINOutput.model_validate(crew_output["spin_detect_task"])
        self.state.spin = crew_output
        return crew_output

    @listen(process_inputs)
    def execute_identification(self):
        current_app.logger.debug("Executing identification")
        crew_output = self.identification_crew.kickoff(inputs=flatten_pydantic_model(self.state))
        current_app.logger.debug("Executing identification finished")
        self.state.identification = crew_output
        self.specialist_executor.log_tuning("Identification Crew Output", crew_output.model_dump())
        return crew_output

    @listen(and_(execute_rag, execute_spin, execute_identification))
    def consolidate(self):
        current_app.logger.debug("Executing consolidate")
        inputs = {
            "language": self.state.input.language,
            "prepared_answers": self.state.rag_output.answer,
            "additional_questions": self.state.lead_identification_questions.questions +
                                    self.state.spin_questions.questions,
        }
        crew_output = self.rag_consolidation_crew.kickoff(inputs=inputs)
        current_app.logger.debug("Executing consolidate finished")
        self.specialist_executor.log_tuning("RAG Consolidation Crew Output", crew_output.model_dump())

    def kickoff(self, inputs=None):
        with current_event.create_span("SPIN Specialist Execution"):
            self.specialist_executor.log_tuning("Inputs retrieved", inputs)
            self.state.input = SPINSpecialistInput.model_validate(inputs)
            self.specialist.update_progress("EveAI Flow Start", {"name": "SPIN"})
            super().kickoff(inputs=inputs)

            self.specialist.update_progress("EveAI Flow End", {"name": "SPIN"})

            return self.state

This is the log when kicking off the flow:

2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:188]: Processing inputs
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:193]: Executing RAG
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:203]: Executing spin
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:212]: Executing identification
2025-02-18 20:47:42 [DEBUG] eveai_chat_workers (eveai_chat_workers) [1_0:131]: Flow Returned: {'id': '7e8ff963-c945-469f-a172-0cf84630a510', 'input': ...
2025-02-18 20:47:42 [ERROR] eveai_chat_workers (eveai_chat_workers) [tasks:326]: execute_specialist: Error

As you can see, the @start step and the first three @listen steps are started. But then, for some reason, the flow ends before those steps (crews) have finished.
I thought ‘kickoff’ was a synchronous call, not an asynchronous one?
I cannot use ‘await’ because I’m not in an asynchronous function (it gives an error in the IDE as well as at runtime).

What am I missing? Why does execution end while the crews have not yet finished? I get no additional errors in my log, by the way (except at the end, because the output of the flow has no value).

I have been digging deeper into the problem. I think it is caused by exceptions raised during crew execution (e.g. when the first crew fails) not being propagated to the flow execution.
When I start the first crew in execute_rag, it raises an exception, and this exception is not propagated. When I kick off the crew directly, outside of the context of the flow, the exception is raised (and I can act on it).

@listen(process_inputs)
def execute_rag(self):
    current_app.logger.debug("Executing RAG")
    inputs = self.state.input.model_dump()
    try:
        crew_output = self.rag_crew.kickoff(inputs=inputs)
    except Exception as e:
        current_app.logger.error(f"CREW rag_crew Kickoff Error: {e}")
        raise

    self.state.rag_output = crew_output
    current_app.logger.debug("Executing RAG finished")
    self.specialist_executor.log_tuning("RAG Crew Output", crew_output.model_dump())
    return crew_output

I suppose this is a bug?
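
To make the suspected behaviour concrete, here is a minimal sketch that does not use CrewAI at all. ToyFlow, failing_crew_kickoff and the swallowed list are hypothetical names, and the sketch only assumes that a flow runner might catch errors per listener; it is not a claim about how CrewAI is actually implemented. It shows how a synchronous kickoff can return normally even though one step raised, while calling the same function directly does raise.

```python
import asyncio


def failing_crew_kickoff():
    """Hypothetical stand-in for rag_crew.kickoff() when the crew fails."""
    raise RuntimeError("crew failed")


class ToyFlow:
    """Hypothetical runner used for illustration only, not CrewAI's code."""

    def __init__(self):
        self.finished = []   # names of steps that ran to completion
        self.swallowed = []  # exceptions the runner caught and dropped

    def execute_rag(self):
        failing_crew_kickoff()
        self.finished.append("execute_rag")  # never reached

    def execute_spin(self):
        self.finished.append("execute_spin")

    async def _run_listener(self, method):
        try:
            method()
        except Exception as exc:
            # Assumption: the runner catches listener errors here,
            # so they never reach the caller of kickoff().
            self.swallowed.append(exc)

    async def _run(self):
        await asyncio.gather(
            self._run_listener(self.execute_rag),
            self._run_listener(self.execute_spin),
        )

    def kickoff(self):
        # Synchronous entry point that drives the async runner.
        asyncio.run(self._run())
        return self.finished


if __name__ == "__main__":
    flow = ToyFlow()
    print(flow.kickoff())   # returns normally, no exception reaches the caller
    print(flow.swallowed)   # the error only exists inside the runner
```

If the real runner behaves anything like this, the caller of kickoff only sees a flow that ended early, which would match the log above.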

As a workaround, I’m now catching potential exceptions within the @listen methods, to at least see what’s going on. I still need to find a mechanism to propagate them out of the flow kickoff into my calling app, as raising the exception to inform methods higher in the stack doesn’t work as expected.
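
One possible shape for such a mechanism, as a minimal sketch: record each exception on the flow instance inside the step, then re-raise from the overridden kickoff once the parent kickoff has returned. Everything here is hypothetical (capture_errors, FlowStepError, step_errors, and the SwallowingFlow base class that stands in for a runner that loses step errors). With a real flow I would assume the capture_errors decorator sits below @listen so that the original method name is preserved, but I have not verified that.

```python
import functools


class FlowStepError(Exception):
    """Hypothetical error raised after the flow returns, wrapping step failures."""

    def __init__(self, errors):
        self.errors = errors  # list of (step_name, exception) tuples
        names = ", ".join(name for name, _ in errors)
        super().__init__(f"flow steps failed: {names}")


def capture_errors(step):
    """Record any exception raised by a step on the instance, then re-raise it."""

    @functools.wraps(step)
    def wrapper(self, *args, **kwargs):
        try:
            return step(self, *args, **kwargs)
        except Exception as exc:
            self.step_errors.append((step.__name__, exc))
            raise

    return wrapper


class SwallowingFlow:
    """Hypothetical stand-in for a runner that drops step exceptions."""

    steps = ()

    def kickoff(self):
        for name in self.steps:
            try:
                getattr(self, name)()
            except Exception:
                pass  # assumption: the error is lost at this point


class MyFlow(SwallowingFlow):
    steps = ("execute_rag", "execute_spin")

    def __init__(self):
        self.step_errors = []
        self.done = []

    @capture_errors
    def execute_rag(self):
        raise RuntimeError("rag crew failed")

    @capture_errors
    def execute_spin(self):
        self.done.append("spin")

    def kickoff(self):
        super().kickoff()
        # Surface anything the steps recorded, so the calling app can react.
        if self.step_errors:
            raise FlowStepError(self.step_errors)
        return self.done
```

The calling app can then wrap flow.kickoff() in an ordinary try/except and inspect err.errors to see which steps failed and why.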