Flow execution - asyncio.run() cannot be called from a running event loop

Hi all,
I’m running into this error when trying to start a flow. This is based on the DeepLearningAI, Practical Multi Agent … class, Lesson 6 / lab3. I’m running it locally in a Python 3.10 env with crewai 0.75.0 and crewai_tools 0.13.2.

When running the emails = await flow.kickoff() cell I receive the following.

RuntimeError: asyncio.run() cannot be called from a running event loop

I get similar results if I run the Structured State Management code example from the docs.

from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from structured flow"

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")

flow = StructuredExampleFlow()
flow.kickoff()

Both code examples work when using crewai 0.67.0 and crewai_tools 0.12.1.

How do I go about implementing flows with the latest crewai packages? Any pointers would be helpful.

V/r,

Chuck

I think that’s probably because you’re running it in a Jupyter Notebook, which already has an event loop running. What worked for me was replacing this:

emails = await flow.kickoff()

With this:
emails = await flow.kickoff_async()

Hope it helps, Chuck

Hey @nugroho1234, I dug a little deeper into this issue. It happens because flow.kickoff() is implemented as follows in the library:

def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
    """
    Starts the execution of the flow synchronously.

    Args:
        inputs: Optional dictionary of inputs to initialize or update the state.

    Returns:
        The final output from the flow execution.
    """
    if inputs is not None:
        self._initialize_state(inputs)
    return asyncio.run(self.kickoff_async())

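flow.kickoff() internally calls flow.kickoff_async() by wrapping it in asyncio.run(), which tries to start a new event loop. Jupyter already has an event loop running for the cell, so asyncio.run() refuses to start another one. That’s why we get RuntimeError: asyncio.run() cannot be called from a running event loop. Awaiting flow.kickoff_async() directly avoids the problem because the coroutine runs on the loop that is already there.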
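
If it helps, here is a minimal sketch that reproduces the same behavior with plain asyncio, no crewai needed. The names kickoff_async, kickoff and notebook_cell are hypothetical stand-ins I made up for illustration: kickoff mirrors the wrapper quoted above, and notebook_cell plays the role of a Jupyter cell, i.e. code that runs while an event loop is already active.

```python
import asyncio


async def kickoff_async() -> str:
    # Hypothetical stand-in for flow.kickoff_async(): any coroutine will do.
    await asyncio.sleep(0)
    return "done"


def kickoff() -> str:
    # Same shape as the library's sync wrapper: start a fresh event loop
    # with asyncio.run() and run the coroutine on it.
    return asyncio.run(kickoff_async())


async def notebook_cell() -> tuple[str, str]:
    # We are inside a running event loop here, like a Jupyter cell.
    try:
        kickoff()
        sync_result = "ok"
    except RuntimeError as exc:
        # asyncio.run() refuses to nest inside a running loop.
        # Python may also warn that the coroutine was never awaited.
        sync_result = str(exc)

    # Awaiting the coroutine directly reuses the loop that is already running.
    async_result = await kickoff_async()
    return sync_result, async_result


if __name__ == "__main__":
    # In a plain script no loop is running yet, so asyncio.run() is fine here.
    print(asyncio.run(notebook_cell()))
```

In a plain .py script, where no loop is running, calling the sync wrapper directly works; it only fails when something like Jupyter has already started a loop, and in that case awaiting the async variant is the simpler route.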