Need help with asynchronous tasks

Hi all! I’ve been using Crew AI for the past week and really love the simplicity. However, I do need some help figuring out how to run a task asynchronously. I have n number of tasks that are independent of each other and don’t need to run sequentially. I found these documentation pages:

but I can’t find an example of how this is done. I tried doing something like this in my crew class:

@task
def example_task(self) -> Task:
    output_path = Path("example/path"
    output_path.mkdir(parents=True, exist_ok=True)
    return Task(
        config=self.tasks_config["threads_single_task"],
        output_file=str(output_path / f"filename.txt"),
        async_execution=True,
    )

for my n number of files but I got this error:

python main.py run blog_repurpose

Traceback (most recent call last):
File “/Users/cjjohanson/Documents/GitHub/content_team/content_gen_team/main.py”, line 39, in run
crew = CrewClass().crew()
^^^^^^^^^^^^^^^^^^
File “/Users/cjjohanson/.local/share/virtualenvs/content_team-8pRxG-uY/lib/python3.12/site-packages/crewai/project/utils.py”, line 11, in memoized_func
cache[key] = func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File “/Users/cjjohanson/.local/share/virtualenvs/content_team-8pRxG-uY/lib/python3.12/site-packages/crewai/project/annotations.py”, line 112, in wrapper
crew = func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File “/Users/cjjohanson/Documents/GitHub/content_team/content_gen_team/crews/blog_repurpose_crew.py”, line 145, in crew
return Crew(
^^^^^
File “/Users/cjjohanson/.local/share/virtualenvs/content_team-8pRxG-uY/lib/python3.12/site-packages/pydantic/main.py”, line 243, in init
validated_self = self.pydantic_validator.validate_python(data, self_instance=self)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for Crew
The crew must end with at most one asynchronous task. [type=async_task_count, input_value={‘agents’: [Agent(role=Se…tial’>, ‘verbose’: True}, input_type=dict]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File “/Users/cjjohanson/Documents/GitHub/content_team/content_gen_team/main.py”, line 94, in
run(CrewClass)
File “/Users/cjjohanson/Documents/GitHub/content_team/content_gen_team/main.py”, line 43, in run
raise Exception(f"An error occurred while running the crew: {e}")
Exception: An error occurred while running the crew: 1 validation error for Crew
The crew must end with at most one asynchronous task. [type=async_task_count, input_value={‘agents’: [Agent(role=Se…tial’>, ‘verbose’: True}, input_type=dict]

When I read it, that error seems to be at odds with the asynchronous functionality but I must be reading it wrong. Can anyone help me out? :folded_hands:

The error you’re facing is due to this section of the Crew class (defined in crewai/crew.py):

@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
    """Validates that the crew ends with at most one asynchronous task."""
    final_async_task_count = 0

    # Traverse tasks backward from the end
    for task in reversed(self.tasks):
        if task.async_execution:
            final_async_task_count += 1
        else:
            # Stop traversing as soon as a non-async task is found
            break

    # Check if more than one consecutive async task exists at the end
    if final_async_task_count > 1:
        raise PydanticCustomError(
            "async_task_count",
            "The crew must end with at most one asynchronous task.",
            {}, # Context for the error (empty in this case)
        )

    return self

Basically, this validation ensures that your sequence of tasks ends with a synchronous task (the default async_execution=False). In the CrewAI documentation example, the list_ideas and list_important_history tasks run asynchronously. The write_article task then waits for the output of these two tasks to complete and uses their combined output as context. The write_article task is the last one executed, and it is not asynchronous, satisfying the validation rule.

Without further details about what you’re trying to do and how, that’s the explanation for the error you’re encountering. If you need more specific help with your implementation, I suggest you provide a clearer description and a code example of how you’re trying to build your solution.

1 Like