I keep getting this error when running crew ai flow from inside of a venv:
Using tool: Delegate work to coworker
Tool Input:
“{"task": "SOME TASK HERE.", "coworker": "Writer Agent"}”
Tool Output:
Error executing tool. coworker mentioned not found, it must be one of the following options:
- project manager agent
It’s not seeing the other agents that are part of the same crew from inside the venv. I have a few more agents in the same crew for writing tasks. But when I run this from outside of the venv, it runs fine!
Code for reference:
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from pydantic import BaseModel, Field
from typing import List
from src.content_creator.llms import gpt4o, gpt4o_mini, llama3_1, mistral
Uncomment the following line to use an example of a custom tool
from writers_crew.tools.custom_tool import MyCustomTool
Check our tools documentations for more information on how to use them
from crewai_tools import SerperDevTool
class Post(BaseModel):
title: str # e.g., “ShortContent”, “MediumContent”
description: str # e.g., “DataContent”, “Humor”
anchor: str # Anchor for the content
location: str # Specific location for the content. Location within US Only for now.
image: str # Broader context (e.g., “OneAct is a platform…”)
class AllPosts(BaseModel):
posts: List[Post] = Field(…, description=“Planned posts to be created”)
@CrewBase
class WritersCrew:
“”“WritersCrew crew”“”
agents_config = "config/agents.yaml"
tasks_config = "config/tasks.yaml"
@agent
def project_manager_agent(self) -> Agent:
return Agent(
config=self.agents_config["project_manager_agent"],
allow_delegation=True,
cache=True,
# llm=gpt4o,
)
@agent
def short_form_post_agent(self) -> Agent:
return Agent(
config=self.agents_config["short_form_post_agent"],
verbose=True,
allow_delegation=True,
cache=True,
max_retry_limit=3,
)
@agent
def medium_form_post_agent(self) -> Agent:
return Agent(
config=self.agents_config["medium_form_post_agent"],
verbose=True,
allow_delegation=True,
cache=True,
max_retry_limit=3,
)
@agent
def funny_post_agent(self) -> Agent:
return Agent(
config=self.agents_config["funny_post_agent"],
verbose=True,
allow_delegation=True,
cache=True,
max_retry_limit=3,
# llm=gpt4o_mini,
)
@agent
def cta_post_agent(self) -> Agent:
return Agent(
config=self.agents_config["cta_post_agent"],
verbose=True,
allow_delegation=True,
cache=True,
max_retry_limit=3,
# llm=gpt4o_mini,
)
@agent
def data_post_agent(self) -> Agent:
return Agent(
config=self.agents_config["data_post_agent"],
verbose=True,
allow_delegation=True,
cache=True,
max_retry_limit=3,
# llm=gpt4o_mini,
)
@task
def posts_writing_task(self) -> Task:
return Task(
description=self.tasks_config["posts_writing_task"]["description"],
expected_output=self.tasks_config["posts_writing_task"]["expected_output"],
output_json=AllPosts,
)
@crew
def crew(self) -> Crew:
"""Creates the WritersCrew crew"""
return Crew(
agents=[
self.short_form_post_agent(),
self.medium_form_post_agent(),
self.cta_post_agent(),
self.funny_post_agent(),
self.data_post_agent(),
],
manager_agent=self.project_manager_agent(),
tasks=[
self.posts_writing_task()
],
verbose=True,
process=Process.hierarchical,
memory=True,
planning=True,
)