I am working on a multi-agent CrewAI system that extracts information from files uploaded through our backend. The data extraction agent keeps running into tool-argument validation errors, even though I have set the query in the task settings. I am new to this and don't have much experience yet. Did I miss something in the configuration, or is there another area I should check to fix this issue? Any guidance would be greatly appreciated.
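Concretely, the query I mean is defined per task in tasks.yaml (the full file is included further down) and then handed to each search tool when it is built, e.g. for the data extraction task:

  data_extraction_task:
    query: >
      Extract equipment and session costs from the uploaded files, and notes from the clinical notes, additional notes, and measure outcomes.

— This is my CrewAI service that builds the tools, agents, and crew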
import asyncio
import logging
import os
import sys
from tempfile import NamedTemporaryFile

import requests

# Make the project root importable before pulling in app.* so the script can also be run directly.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../../")))

from crewai import Agent, Task, Crew
from crewai_tools import (
    FileReadTool,
    PDFSearchTool,
    DOCXSearchTool,
    TXTSearchTool,
    CSVSearchTool,
    JSONSearchTool,
)
from app.config.config import load_agent_configs, load_task_configs

class CrewAiService:
    def __init__(self, clinical_notes, measure_outcomes, additional_notes, uploaded_files_paths, tasks_config):
        self.clinical_note_path = self.create_temp_file(clinical_notes, "clinical_notes.txt")
        self.outcome_measure_path = self.create_temp_file(measure_outcomes, "measure_outcomes.txt")
        self.additional_notes_path = self.create_temp_file(additional_notes, "additional_notes.txt")
        self.tasks_config = tasks_config
        self.uploaded_files = uploaded_files_paths
        self.tools = self.initialize_tools()
        self.crew = self.initialize_crew()

    def create_temp_file(self, content, filename):
        # Create a temporary file with the given content
        temp_file = NamedTemporaryFile(delete=False, suffix=f"_{filename}")
        temp_file.write(content.encode("utf-8"))  # Write string content as bytes
        temp_file.close()
        return temp_file.name
    def initialize_tools(self):
        tools = [
            FileReadTool(file_path=self.clinical_note_path, name="clinical_note_tool"),
            FileReadTool(file_path=self.outcome_measure_path, name="outcome_measure_tool"),
            FileReadTool(file_path=self.additional_notes_path, name="additional_notes_tool"),
        ]
        query = self.tasks_config['tasks']['data_extraction_task']['query']
        for file_path, content_type in self.uploaded_files:
            logging.debug(f"Processing file: {file_path}, Content Type: {content_type}")
            if content_type == "application/pdf":
                tools.append(PDFSearchTool(pdf=file_path, name="pdf_tool", query=query))
            elif content_type == "text/csv":
                tools.append(CSVSearchTool(csv=file_path, name="csv_tool", query=query))
            elif content_type == "application/json":
                tools.append(JSONSearchTool(json=file_path, name="json_tool", query=query))
            elif content_type == "text/plain":
                tools.append(TXTSearchTool(txt=file_path, name="txt_tool", query=query))
            elif content_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
                tools.append(DOCXSearchTool(docx=file_path, name="docx_tool", query=query))
            else:
                logging.warning(f"Unsupported file type: {content_type}")
        logging.debug(f"Initializing tools with uploaded files: {self.uploaded_files}")
        logging.debug(f"Tools initialized with files: {tools}")
        return tools
    def initialize_crew(self):
        agents_config = load_agent_configs()
        tasks_config = load_task_configs()

        # Initialize agents
        data_extraction_agent = Agent(
            config=agents_config['agents']['data_extraction_agent'],
            tools=self.tools
        )
        cost_calculation_agent = Agent(
            config=agents_config['agents']['cost_calculation_agent'],
            tools=[]  # Explicitly pass an empty list
        )
        insight_recommendation_agent = Agent(
            config=agents_config['agents']['insight_recommendation_agent'],
            tools=[]  # Explicitly pass an empty list
        )
        reporting_agent = Agent(
            config=agents_config['agents']['reporting_agent'],
            tools=[]  # Explicitly pass an empty list
        )

        # Initialize tasks
        data_extraction_task = Task(config=tasks_config['tasks']['data_extraction_task'], agent=data_extraction_agent)
        cost_calculation_task = Task(config=tasks_config['tasks']['cost_calculation_task'], agent=cost_calculation_agent)
        insight_recommendation_task = Task(config=tasks_config['tasks']['insight_recommendation_task'], agent=insight_recommendation_agent)
        report_compilation_task = Task(
            config=tasks_config['tasks']['report_compilation_task'],
            agent=reporting_agent,
            context=[data_extraction_task, cost_calculation_task, insight_recommendation_task]
        )

        return Crew(
            agents=[data_extraction_agent, cost_calculation_agent, insight_recommendation_agent, reporting_agent],
            tasks=[data_extraction_task, cost_calculation_task, insight_recommendation_task, report_compilation_task],
            verbose=True
        )
    async def run_analysis(self, openai_model_name="gpt-4o"):
        try:
            self.crew.test(n_iterations=1, openai_model_name=openai_model_name)
            self.crew.train(n_iterations=1, filename='training.pkl')
            # Execute tools with specific queries for PDF
            for tool in self.tools:
                await tool.run()
            result = self.crew.kickoff()
            processed_result = self.process_result(result)
            self.store_results_in_backend(processed_result)
            return processed_result
        finally:
            self.cleanup_temp_files()

    def cleanup_temp_files(self):
        # Remove temporary files to prevent clutter
        temp_files = [self.clinical_note_path, self.outcome_measure_path, self.additional_notes_path]
        for file_path in temp_files:
            if os.path.exists(file_path):
                os.remove(file_path)
    def process_result(self, result):
        # Extract data for each task
        data_extraction_output = result.get("data_extraction_task", {}).get("output", {})
        cost_calculation_output = result.get("cost_calculation_task", {}).get("output", {})
        insight_recommendation_output = result.get("insight_recommendation_task", {}).get("output", {})

        # Parse meaningful fields from outputs
        parsed_result = {
            "data_extraction": {
                "equipment": data_extraction_output.get("equipment", []),
                "sessions": data_extraction_output.get("sessions", []),
                "notes": data_extraction_output.get("notes", "No additional notes."),
            },
            "cost_calculation": {
                "equipment_summary": cost_calculation_output.get("equipment_summary", {}),
                "session_summary": cost_calculation_output.get("session_summary", {}),
                "total_summary": cost_calculation_output.get("total_summary", {}),
            },
            "insights": {
                "recommendations": insight_recommendation_output.get("recommendations", []),
                "justifications": insight_recommendation_output.get("justifications", "No justifications provided."),
            }
        }
        return parsed_result
    def store_results_in_backend(self, data):
        backend_url = "http://localhost:3000/services/api/v1/agent_results"
        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(backend_url, json=data, headers=headers)
            response.raise_for_status()  # Raise an exception for HTTP errors
            logging.info("Data successfully stored in backend.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to store data in backend: {e}")

if __name__ == "__main__":
    # Example test
    tasks_config = load_task_configs()
    test_service = CrewAiService(
        clinical_notes="Patient has low back pain",
        measure_outcomes="GSA",
        additional_notes="Continue physiotherapy",
        uploaded_files_paths=[("bills_sample.pdf", "application/pdf")],
        tasks_config=tasks_config
    )
    print("Running analysis...")
    result = asyncio.run(test_service.run_analysis())
    print("Final result is:", result)
— This is the pdf_search_tool setup (the PDFSearchTool source from crewai_tools, included for reference)
from typing import Any, Optional, Type
from embedchain.models.data_type import DataType
from pydantic import BaseModel, Field, model_validator
from ..rag.rag_tool import RagTool
class FixedPDFSearchToolSchema(BaseModel):
    """Input for PDFSearchTool."""

    query: str = Field(
        ..., description="Mandatory query you want to use to search the PDF's content"
    )


class PDFSearchToolSchema(FixedPDFSearchToolSchema):
    """Input for PDFSearchTool."""

    pdf: str = Field(..., description="Mandatory pdf path you want to search")


class PDFSearchTool(RagTool):
    name: str = "Search a PDF's content"
    description: str = (
        "A tool that can be used to semantic search a query from a PDF's content."
    )
    args_schema: Type[BaseModel] = PDFSearchToolSchema

    def __init__(self, pdf: Optional[str] = None, **kwargs):
        super().__init__(**kwargs)
        if pdf is not None:
            kwargs["data_type"] = DataType.PDF_FILE
            self.add(pdf)
            self.description = f"A tool that can be used to semantic search a query the {pdf} PDF's content."
            self.args_schema = FixedPDFSearchToolSchema
            self._generate_description()

    @model_validator(mode="after")
    def _set_default_adapter(self):
        if isinstance(self.adapter, RagTool._AdapterPlaceholder):
            from embedchain import App
            from crewai_tools.adapters.pdf_embedchain_adapter import (
                PDFEmbedchainAdapter,
            )

            app = App.from_config(config=self.config) if self.config else App()
            self.adapter = PDFEmbedchainAdapter(
                embedchain_app=app, summarize=self.summarize
            )
        return self

    def add(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().add(*args, **kwargs)

    def _before_run(
        self,
        query: str,
        **kwargs: Any,
    ) -> Any:
        if "pdf" in kwargs:
            self.add(kwargs["pdf"])
        return query
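If I read FixedPDFSearchToolSchema correctly, once a pdf is bound at construction time the tool only accepts a single query string. So I would expect a direct call shaped roughly like this to work (untested sketch, file and tool names taken from my service above):

# Minimal sketch of how I expect pdf_tool to be invoked (not verified)
tool = PDFSearchTool(pdf="bills_sample.pdf", name="pdf_tool")
print(tool.run(query="cost of sessions and equipment"))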
— This is the tasks.yaml
tasks:
  data_extraction_task:
    description: >
      Extract detailed data from clinical notes, additional notes, measure outcomes, and uploaded files.
      Identify:
      - Equipment details (name, quantity, rate, category, total) and categorize into `low`, `mid`, or `high`.
      - Session details (name, duration, rate, total) and categorize into `low`, `mid`, or `high`.
      - Notes or additional information relevant for cost calculations.
      Provide structured JSON output for downstream tasks.
    query: >
      Extract equipment and session costs from the uploaded files, and notes from the clinical notes, additional notes, and measure outcomes.
    agent: data_extraction_agent
    expected_output: >
      A JSON object containing:
      - equipment: List of equipment items with:
        - name: The name of the equipment.
        - category: Categorized as `low`, `mid`, or `high`.
        - quantity: Quantity of the equipment.
        - rate: Cost per unit of equipment.
        - total: Total cost for the equipment.
      - sessions: List of session items with:
        - name: The session type or activity.
        - duration: Duration of the session.
        - rate: Cost per session or hourly rate.
        - total: Total cost for the session.
      - notes: Any additional contextual notes extracted from the inputs.
      - missing_data: Fields that could not be extracted or validated.
  cost_calculation_task:
    description: >
      Calculate the total costs for sessions and equipment using the data from the Data Extraction Task.
      Categorize items into `low`, `mid`, or `high` based on NDIS classifications and prepare a summary.
    agent: cost_calculation_agent
    dependencies:
      - data_extraction_task
    expected_output: >
      A JSON object containing:
      - equipment_summary: Aggregated costs by categories (low, mid, high).
      - session_summary: Aggregated costs by categories (low, mid, high).
      - total_summary: Grand totals for equipment and sessions.
  insight_recommendation_task:
    description: >
      Generate recommendations for cost optimization. Include:
      - Equipment recommendations based on cost-effectiveness.
      - Session adjustments to meet budget thresholds.
      - Market-based suggestions for pricing or scheduling.
    agent: insight_recommendation_agent
    dependencies:
      - cost_calculation_task
      - data_extraction_task
    expected_output: >
      A JSON object containing:
      - recommendations: A list of suggestions for optimization.
      - justifications: Reasons for each recommendation.
  report_compilation_task:
    description: >
      Compile a structured report detailing the total costs, itemized breakdown by session and equipment,
      and categorization levels. Incorporate insights and recommendations from the previous tasks.
      Present data in tables and include summaries for stakeholder review.
    agent: reporting_agent
    dependencies:
      - insight_recommendation_task
      - cost_calculation_task
      - data_extraction_task
    expected_output: >
      A JSON object containing:
      - equipment_cost_summary: Detailed breakdown of equipment costs.
      - session_cost_summary: Detailed breakdown of session costs.
      - total_summary: Grand totals.
      - recommendations: Optimizations and justifications for stakeholders.
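For clarity, the output shape I am expecting from data_extraction_task is something like this (illustrative values only, field names as in the expected_output above):

{
  "equipment": [
    {"name": "Wheelchair cushion", "category": "mid", "quantity": 1, "rate": 250, "total": 250}
  ],
  "sessions": [
    {"name": "Physiotherapy session", "duration": "1 hour", "rate": 190, "total": 190}
  ],
  "notes": "Continue physiotherapy",
  "missing_data": []
}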
— This is the error I got —
# Agent: Extract and organize structured data from clinical notes, additional notes, measure outcomes, and uploaded files. Categorize items according to NDIS guidelines: - Low: Cost less than $100 per unit/session. - Mid: Cost between $100 and $500 per unit/session. - High: Cost above $500 per unit/session. Provide complete, structured outputs with placeholders for missing data (e.g., "Not Available").
## Thought: Thought: I will specify my query to search the PDF for any relevant information on equipment and session costs since my previous attempt to access it did not go through. I will search for general terms related to cost.
## Using tool: pdf_tool
## Tool Input:
“{"description": "cost of sessions and equipment", "type": "str"}”
## Tool Output:
I encountered an error while trying to use the tool. This was the error: 1 validation error for FixedPDFSearchToolSchema
query
Field required [type=missing, input_value={'description': 'cost of ...uipment', 'type': 'str'}, input_type=dict]
For further information visit https://errors.pydantic.dev/2.10/v/missing.
Tool pdf_tool accepts these inputs: Tool Name: pdf_tool
Tool Arguments: {'query': {'description': "Mandatory query you want to use to search the PDF's content", 'type': 'str'}}
Tool Description: A tool that can be used to semantic search a query the bills_sample.pdf PDF's content..
Moving on then. I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. To Use the following format:
Thought: you should always think about what to do
Action: the action to take, should be one of [clinical_note_tool, outcome_measure_tool, additional_notes_tool, pdf_tool]
Action Input: the input to the action, dictionary enclosed in curly braces
Observation: the result of the action
... (this Thought/Action/Action Input/Result can repeat N times)
Thought: I now can give a great answer
Final Answer: Your final answer must be the great and the most complete as possible, it must be outcome described
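From the traceback, it looks like the agent passed the tool-argument description rather than an actual query, so pydantic reports the required query field as missing. If I am reading the log correctly, the mismatch is:

What the agent sent as Tool Input:    {"description": "cost of sessions and equipment", "type": "str"}
What FixedPDFSearchToolSchema wants:  {"query": "cost of sessions and equipment"}

Is this caused by how I build the tools (for example, passing name and query into the search tool constructors), by the query entry in tasks.yaml, or by something else I should be looking at?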