Unable to use hierarchical process within virtual env

I keep getting this error when running crew ai flow from inside of a venv:

Using tool: Delegate work to coworker

Tool Input:

“{"task": "SOME TASK HERE.", "coworker": "Writer Agent"}”

Tool Output:

Error executing tool. coworker mentioned not found, it must be one of the following options:

  • project manager agent

It’s not seeing the other agents that are part of the same crew from inside the venv. I have a few more agents in the same crew for writing tasks. But when I run this from outside of the venv, it runs fine!

Code for reference:

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from pydantic import BaseModel, Field
from typing import List
from src.content_creator.llms import gpt4o, gpt4o_mini, llama3_1, mistral

Uncomment the following line to use an example of a custom tool

from writers_crew.tools.custom_tool import MyCustomTool

Check our tools documentations for more information on how to use them

from crewai_tools import SerperDevTool

class Post(BaseModel):
title: str # e.g., “ShortContent”, “MediumContent”
description: str # e.g., “DataContent”, “Humor”
anchor: str # Anchor for the content
location: str # Specific location for the content. Location within US Only for now.
image: str # Broader context (e.g., “OneAct is a platform…”)

class AllPosts(BaseModel):
posts: List[Post] = Field(…, description=“Planned posts to be created”)

@CrewBase
class WritersCrew:
“”“WritersCrew crew”“”

agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"

@agent
def project_manager_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["project_manager_agent"],
        allow_delegation=True,
        cache=True,
        # llm=gpt4o,
    )

@agent
def short_form_post_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["short_form_post_agent"],
        verbose=True,
        allow_delegation=True,
        cache=True,
        max_retry_limit=3,
    )

@agent
def medium_form_post_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["medium_form_post_agent"],
        verbose=True,
        allow_delegation=True,
        cache=True,
        max_retry_limit=3,
    )

@agent
def funny_post_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["funny_post_agent"],
        verbose=True,
        allow_delegation=True,
        cache=True,
        max_retry_limit=3,
        # llm=gpt4o_mini,
    )

@agent
def cta_post_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["cta_post_agent"],
        verbose=True,
        allow_delegation=True,
        cache=True,
        max_retry_limit=3,
        # llm=gpt4o_mini,
    )

@agent
def data_post_agent(self) -> Agent:
    return Agent(
        config=self.agents_config["data_post_agent"],
        verbose=True,
        allow_delegation=True,
        cache=True,
        max_retry_limit=3,
        # llm=gpt4o_mini,
    )

@task
def posts_writing_task(self) -> Task:
    return Task(
        description=self.tasks_config["posts_writing_task"]["description"],
        expected_output=self.tasks_config["posts_writing_task"]["expected_output"],
        output_json=AllPosts,
    )

@crew
def crew(self) -> Crew:
    """Creates the WritersCrew crew"""
    return Crew(
        agents=[
            self.short_form_post_agent(),
            self.medium_form_post_agent(),
            self.cta_post_agent(),
            self.funny_post_agent(),
            self.data_post_agent(),
        ],
        manager_agent=self.project_manager_agent(),
        tasks=[
            self.posts_writing_task()
        ],
        verbose=True,
        process=Process.hierarchical,
        memory=True,
        planning=True,
    )