Error passing outputs between agents

I have a crew with 3 agents. The first agent defines a query to search with Serper (via SerperDevTool), the second agent takes the query, runs the search, and extracts the content, and the last agent analyses the articles found.

The problem I am facing is that agent 3 (article_analyst) is not receiving or considering the articles found by agent 2 (article_researcher); sometimes it invents the results (fake articles) and sometimes it only takes one article.

This is the code used:

  • Crew.py:

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai_tools import SerperDevTool
from datetime import datetime, timedelta
import logging
import os

from nexago_agents_project.src.nexago_agents_project.crews.mrk.search_articles.tools.article_analysis import ArticleAnalysis
from nexago_agents_project.src.nexago_agents_project.crews.mrk.search_articles.tools.article_validator_extractor import ArticleValidatorExtractor

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

from pydantic import BaseModel
from typing import Optional, List
from langchain_openai import ChatOpenAI

class ArticleOutput(BaseModel):
    title: str
    date: str
    link: str
    snippet: str
    content: str
    imageUrl: Optional[str]
    relevance_score: int
    sentiment: str

@CrewBase
class ArticleResearchCrew:
    """NexagoAgentsProject crew"""

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    def prepare_data(self, data):
        start_date = (datetime.today() - timedelta(days=7)).strftime('%Y-%m-%d')
        end_date = datetime.today().strftime('%Y-%m-%d')
        keywords = " and ".join(data.get("keywords", []))
        return {
            "keywords": keywords,
            "startDate": start_date,
            "endDate": end_date,
            "numQueries": data.get("numQueries"),
            "userQuery": data.get("userQuery")
        }

    def kickoff(self, data):
        try:
            prepared_data = self.prepare_data(data)
            crew_instance = self.crew()

            result = crew_instance.kickoff(inputs=prepared_data)
            logger.debug("Kickoff result: %s", result)
            return result

        except Exception as e:
            logger.error(f"Error in kickoff: {e}")
            return {"error": str(e), "status": "kickoff_failed"}

    @agent
    def query_generator(self) -> Agent:
        return Agent(
            config=self.agents_config['query_generator'],
            verbose=True,
            allow_delegation=False
        )

    @agent
    def article_researcher(self) -> Agent:
        serper_tool = SerperDevTool(n_results=15, tbs="qdr:w")

        return Agent(
            config=self.agents_config['article_researcher'],
            tools=[serper_tool, ArticleValidatorExtractor(result_as_answer=True)],
            verbose=True,
            memory=True,
            allow_delegation=False
        )

    @agent
    def article_analyst(self) -> Agent:
        return Agent(
            config=self.agents_config['article_analyst'],
            verbose=True,
            max_iter=1,
            memory=True,
            allow_delegation=False
        )

    @task
    def query_generator_task(self) -> Task:
        return Task(
            config=self.tasks_config['query_generator_task'],
            expected_output=self.tasks_config['query_generator_task'].get('expected_output'),
            agent=self.query_generator()
        )

    @task
    def article_research_task(self) -> Task:
        return Task(
            config=self.tasks_config['article_research_task'],
            expected_output=self.tasks_config['article_research_task'].get('expected_output'),
            agent=self.article_researcher(),
            context=[self.query_generator_task()]
        )

    @task
    def article_analyst_task(self) -> Task:
        return Task(
            config=self.tasks_config['article_analyst_task'],
            expected_output=self.tasks_config['article_analyst_task'].get('expected_output'),
            agent=self.article_analyst(),
            context=[self.article_research_task()],  # Ensure it gets the output from article_research_task
            output_type='json'
        )

    @crew
    def crew(self) -> Crew:
        """Creates the NexagoAgentsProject crew"""
        logger.debug("Initializing agents and tasks in crew")
        return Crew(
            agents=self.agents,  # Automatically created by the @agent decorator
            tasks=self.tasks,    # Automatically created by the @task decorator
            process=Process.sequential,
            verbose=True,
            memory=True
        )
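One way I've found to sanity-check what the analyst actually receives is to validate the researcher's raw output against the `ArticleOutput` model before any analysis happens, so malformed or hallucinated data fails loudly. This is a framework-free sketch, not part of the crewai API; the `parse_articles` helper and the sample data are mine:

```python
import json
from typing import List, Optional

from pydantic import BaseModel


class ArticleOutput(BaseModel):
    title: str
    date: str
    link: str
    snippet: str
    content: str
    imageUrl: Optional[str]
    relevance_score: int
    sentiment: str


def parse_articles(raw: str) -> List[ArticleOutput]:
    # The researcher's task output arrives as a JSON string;
    # validating here surfaces missing/invented fields immediately
    # instead of letting the analyst LLM paper over them.
    data = json.loads(raw)
    return [ArticleOutput(**item) for item in data]


raw = json.dumps([{
    "title": "Example",
    "date": "2024-01-01",
    "link": "https://example.com",
    "snippet": "…",
    "content": "Body text",
    "imageUrl": None,
    "relevance_score": 8,
    "sentiment": "positive",
}])
articles = parse_articles(raw)
```

If `json.loads` or the model validation raises here, the problem is in what the researcher emits, not in the analyst.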

Following other posts, I have activated memory, and I have also tested creating a manager agent, but it doesn't work.

I am facing a similar issue and the approach I am taking is based on a few suggestions made by Claude while referencing the crewai source code as context in Cursor.

I will let you know how it goes on my end.

For what it's worth, here is the (Claude + Cursor) reply to your original question, which is similar to my scenario:

I can help explain the behavior you’re experiencing and propose a solution. The core issue appears to be that while your prepare_data function handles initial data preparation, there’s no mechanism ensuring your analyst agent properly processes all articles from the researcher agent.

The solution involves implementing a custom tool for your analyst agent. Instead of relying solely on instructions to the agent, a tool enforces a structured processing pipeline. Consider this implementation:

class ArticleAnalysisTool(BaseTool):
    name: str = "ArticleAnalysis"
    description: str = "Analyzes every article in a list and returns structured results"

    def _run(self, articles: list) -> dict:
        results = []
        # Enforces processing of each article
        for article in articles:
            analysis = self._analyze_single_article(article)
            results.append(analysis)

        return {
            "total_analyzed": len(articles),
            "analyses": results
        }

@agent
def article_analyst(self) -> Agent:
    return Agent(
        config=self.agents_config['article_analyst'],
        tools=[ArticleAnalysisTool()]
    )

This approach provides several key benefits:

  • Ensures comprehensive processing of all articles
  • Prevents hallucination of non-existent articles
  • Maintains consistent output structure
  • Provides validation of inputs and outputs

The tool effectively creates a structured pipeline that the agent must follow, rather than just relying on instructions that the agent might interpret loosely or ignore.

After more experimenting I can confirm that adding simple tools to handle and validate the data being passed between agents does allow the task results to be passed reliably between agents. Here's an example custom tool:

class ArticleAnalysisTool(BaseTool):
    name: str = "ArticleAnalysis"
    description: str = "Analyzes article content and extracts key information"

    def _run(self, article_data: str) -> str:
        try:
            # Handle both JSON strings and raw/pre-parsed data
            if isinstance(article_data, str):
                try:
                    articles = json.loads(article_data)
                except json.JSONDecodeError:
                    articles = article_data
            else:
                articles = article_data

            # Process and return structured analysis
            return self._analyze_articles(articles)

        except Exception as e:
            return f"Error analyzing articles: {str(e)}\nRaw data: {article_data}"

This implementation helps ensure your Article Analyst actually works with the articles found by the Researcher, rather than inventing content or only considering a single article.
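If it helps, the input-coercion step at the top of `_run` can be pulled out and exercised on its own, outside of crewai entirely (the function name is mine, not part of any library):

```python
import json


def coerce_articles(article_data):
    # Accept either a JSON string or an already-parsed Python object,
    # mirroring the defensive handling inside the tool's _run method.
    if isinstance(article_data, str):
        try:
            return json.loads(article_data)
        except json.JSONDecodeError:
            # Not valid JSON: hand the raw string through unchanged.
            return article_data
    return article_data


# A JSON string becomes a list of dicts; a list passes through as-is.
parsed = coerce_articles('[{"title": "A"}, {"title": "B"}]')
passthrough = coerce_articles([{"title": "C"}])
raw_text = coerce_articles("plain text, not JSON")
```

This makes it easy to unit-test the boundary between agents without spinning up a crew.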

Other considerations that can help: proper task context chaining using the context parameter, as well as managing the volume of search results to prevent truncation. (I think you are already handling context chaining here…search results of 15 might be pushing the context window boundary? That one I’m not so sure about.)
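On the truncation point: one pragmatic guard is to cap how much article text reaches the analyst at all, so 15 full articles can't silently overflow the context window. A rough sketch; the limits are made-up numbers to tune against your model, not crewai settings:

```python
def trim_articles(articles, max_chars_per_article=2000, max_total_chars=20000):
    """Cap per-article and total content size before passing to the analyst.

    The character budgets are arbitrary placeholders; tune them to your
    model's context window.
    """
    trimmed, total = [], 0
    for article in articles:
        content = article.get("content", "")[:max_chars_per_article]
        if total + len(content) > max_total_chars:
            break  # Stop rather than silently drop the budget mid-article
        total += len(content)
        trimmed.append({**article, "content": content})
    return trimmed


# 15 oversized articles, tight budget: each article is cut to 1000 chars,
# and only as many articles as fit in 5000 total chars are kept.
sample = [{"title": f"Article {i}", "content": "x" * 5000} for i in range(15)]
kept = trim_articles(sample, max_chars_per_article=1000, max_total_chars=5000)
```

Truncating deterministically at least makes the failure visible (fewer articles) instead of an LLM quietly ignoring the tail of its prompt.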

I don’t think enabling memory is necessary for this to work the way you expect.

Thank you, @olaservo. I've implemented your recommendations, but the error persists. The analysis is still only processing the last article, not the entire list. :frowning: I have tried activating and deactivating memory, and I have also checked that the context is set up at task level.

I’m curious what version of crewai you’re using?

I am using version 0.80.0.

Finally, my last agent is now taking the output of the previous agent. I had to redefine the agents and tasks iteratively until I achieved the desired outcome.

This topic was automatically closed 24 hours after the last reply. New replies are no longer allowed.