Having Agents pass crew input Pandas DataFrame to tools

Hi!
I’m trying to develop a set of agents that analyze data tables (loaded from CSVs as Pandas DataFrames) and answer questions about the data. For example, one might ask for the difference between the average values of two columns, restricted to rows where a third column is above some threshold. Answering will almost always involve calculations or other aggregations over the table, so if I understand correctly, the CSVSearchTool is not what I need.

One could imagine asking the agent to generate code to analyze the DataFrame, but let’s consider a simplified situation in which I define all of the necessary tools that operate on the DataFrame directly. The problem is that the tools are still being called with strings instead of the input DataFrame. Is there a way to pass the Pandas DataFrame to the tools without having to convert it to, say, a JSON string that gets passed through the prompt? Can this somehow be managed via the agents’ memory?
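
For what it’s worth, the only workaround I’ve come up with so far is to sidestep tool arguments entirely: give each tool a reference to the DataFrame when the tool is constructed, so the agent never has to pass the table at all. A rough, untested sketch of that idea (the class names and the empty args schema are just illustrative):

import pandas as pd

from crewai.tools import BaseTool
from pydantic import BaseModel, ConfigDict
from typing import Type

class EmptyInput(BaseModel):
    """No arguments: the tool already holds the DataFrame."""
    pass

class PreloadedSchemaTool(BaseTool):
    name: str = 'PreloadedSchemaTool'
    description: str = 'Returns the column names and dtypes of the preloaded data table as JSON.'
    args_schema: Type[BaseModel] = EmptyInput
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # The DataFrame lives on the tool instance, never in the LLM conversation.
    data_table: pd.DataFrame

    def _run(self) -> str:
        return self.data_table.dtypes.astype(str).to_json()

# Constructed once with the DataFrame, so nothing crosses the LLM boundary:
# schema_tool = PreloadedSchemaTool(data_table=pd.read_csv('iris_dataset.csv'))

This avoids the issue rather than solving it, though, so I’d still like to know whether there is an intended mechanism for handing non-string objects to tools.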

Example code:

import json
import pandas as pd

from crewai import Agent, Task, Crew, LLM
from crewai.tools import BaseTool
from pydantic import BaseModel, ConfigDict, Field
from typing import Type

llm = LLM(model='bedrock/amazon.nova-pro-v1:0')

class DataFrameInput(BaseModel):
    """Input schema for Pandas DataFrame input data tables."""
    # Needed so Pydantic accepts a non-Pydantic type (pd.DataFrame) as a field.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    data_table: pd.DataFrame = Field(..., description='Data table as Pandas DataFrame.')

class DataFrameSchemaTool(BaseTool):
    name: str = 'DataFrameSchemaTool'
    description: str = """
        Returns the schema of the input data table, which includes
        the column names and their datatypes, as JSON.
        The data table should be a Pandas Dataframe object.
    """
    args_schema: Type[BaseModel] = DataFrameInput

    def _run(self, data_table: pd.DataFrame) -> str:
        print('Running DataFrameSchemaTool...')
        if not isinstance(data_table, pd.DataFrame):
            raise TypeError('Incorrect data table type. It should be a Pandas DataFrame.')
        return json.dumps(
            {
                'schema': json.loads(
                    data_table.iloc[:3].to_json(orient='table', index=False)
                )['schema']['fields'],   # remove extraneous pandas info in schema
            },
            indent=2,
        )
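
# For reference: called directly with the iris table (and assuming the usual
# sepal/petal column names), this should return something like
#   {"schema": [{"name": "sepal_length", "type": "number"}, ...,
#               {"name": "species", "type": "string"}]}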

class DataFrameSamplesTool(BaseTool):
    name: str = 'DataFrameSamplesTool'
    description: str = """
        Returns a random sample of 3 rows of the input data table as JSON.
        The data table should be a Pandas Dataframe object.
    """
    args_schema: Type[BaseModel] = DataFrameInput

    def _run(self, data_table: pd.DataFrame) -> str:
        print('Running DataFrameSamplesTool...')
        if not isinstance(data_table, pd.DataFrame):
            raise TypeError('Incorrect data table type. It should be a Pandas DataFrame.')
        return json.dumps(
            json.loads(
                data_table.sample(n=3, axis=0).to_json(orient='table', index=False)
            )['data'],
            indent=2,
        )
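
# For reference, with purely illustrative values:
#   [{"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4,
#     "petal_width": 0.2, "species": "setosa"}, ...]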

analysis_agent = Agent(
    role='Data analyst',
    goal='Provide an overview and detailed information on each of the columns of the provided data table.',
    backstory="""
        You are a data analyst who thoroughly inspects data to determine its meaning.
        You can access tools to get information about a data table, such as
        its schema and data row examples.
        
        You then carefully organize this information providing:
         * An overview of what the table is about and the information it contains.
         * The JSON schema of the data table where the information about each
           column has been augmented with a `description` field which describes
           the data in the corresponding data table column.
    """,
    tools=[DataFrameSchemaTool(), DataFrameSamplesTool()],
    llm=llm,
    allow_delegation=False,
    verbose=True,
)

analysis_task = Task(
    description='Given a Pandas DataFrame, analyze it to determine what it is about.',
    expected_output='An overview of the data and detailed information on each of the data table columns, including a description of each column.',
    agent=analysis_agent,
)

crew = Crew(
    agents=[analysis_agent],
    tasks=[analysis_task],
    verbose=True,
    full_output=True,
)

result = crew.kickoff(inputs={
    'data_table': pd.read_csv('iris_dataset.csv'),
})

Output:

# Agent: Data analyst
## Task: Given a Pandas DataFrame, analyze it to determine what it is about.
 

I encountered an error while trying to use the tool. This was the error: Arguments validation failed: 1 validation error for DataFrameInput
data_table
  Input should be an instance of DataFrame [type=is_instance_of, input_value='data_frame', input_type=str]
    For further information visit https://errors.pydantic.dev/2.10/v/is_instance_of.
 Tool DataFrameSchemaTool accepts these inputs: Tool Name: DataFrameSchemaTool
Tool Arguments: {'data_table': {'description': 'Data table as Pandas DataFrame.', 'type': 'DataFrame'}}
Tool Description: 
        Returns the schema of the input data table, which includes
        the column names and their datatypes, as JSON.
        The data table should be a Pandas Dataframe object.
    



# Agent: Data analyst
## Thought: Thought: To analyze the DataFrame, I first need to get the schema of the data table to understand its structure and the types of data it contains. Then, I will fetch a sample of the data to get a glimpse of the actual values and better understand the content of each column.
## Using tool: DataFrameSchemaTool
## Tool Input: 
"{\"data_table\": \"data_frame\"}"
## Tool Output: 

I encountered an error while trying to use the tool. This was the error: Arguments validation failed: 1 validation error for DataFrameInput
data_table
  Input should be an instance of DataFrame [type=is_instance_of, input_value='data_frame', input_type=str]
    For further information visit https://errors.pydantic.dev/2.10/v/is_instance_of.
 Tool DataFrameSchemaTool accepts these inputs: Tool Name: DataFrameSchemaTool
Tool Arguments: {'data_table': {'description': 'Data table as Pandas DataFrame.', 'type': 'DataFrame'}}
Tool Description: 
        Returns the schema of the input data table, which includes
        the column names and their datatypes, as JSON.
        The data table should be a Pandas Dataframe object.
    .
Moving on then. I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:

```
Thought: you should always think about what to do
Action: the action to take, should be one of [DataFrameSchemaTool, DataFrameSamplesTool]
Action Input: the input to the action, dictionary enclosed in curly braces
Observation: the result of the action
```
This Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:

```
Thought: I now can give a great answer
Final Answer: Your final answer must be the great and the most complete as possible, it must be outcome described

```
 

I encountered an error while trying to use the tool. This was the error: Arguments validation failed: 1 validation error for DataFrameInput
data_table
  Input should be an instance of DataFrame [type=is_instance_of, input_value='data_frame', input_type=str]
    For further information visit https://errors.pydantic.dev/2.10/v/is_instance_of.
 Tool DataFrameSchemaTool accepts these inputs: Tool Name: DataFrameSchemaTool
Tool Arguments: {'data_table': {'description': 'Data table as Pandas DataFrame.', 'type': 'DataFrame'}}
Tool Description: 
        Returns the schema of the input data table, which includes
        the column names and their datatypes, as JSON.
        The data table should be a Pandas Dataframe object.
    



# Agent: Data analyst
## Using tool: DataFrameSchemaTool
## Tool Input: 
"{\"data_table\": \"data_frame\"}"
## Tool Output: 

I encountered an error while trying to use the tool. This was the error: Arguments validation failed: 1 validation error for DataFrameInput
data_table
  Input should be an instance of DataFrame [type=is_instance_of, input_value='data_frame', input_type=str]
    For further information visit https://errors.pydantic.dev/2.10/v/is_instance_of.
 Tool DataFrameSchemaTool accepts these inputs: Tool Name: DataFrameSchemaTool
Tool Arguments: {'data_table': {'description': 'Data table as Pandas DataFrame.', 'type': 'DataFrame'}}
Tool Description: 
        Returns the schema of the input data table, which includes
        the column names and their datatypes, as JSON.
        The data table should be a Pandas Dataframe object.
    .
Moving on then. I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:

```
Thought: you should always think about what to do
Action: the action to take, should be one of [DataFrameSchemaTool, DataFrameSamplesTool]
Action Input: the input to the action, dictionary enclosed in curly braces
Observation: the result of the action
```
This Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:

```
Thought: I now can give a great answer
Final Answer: Your final answer must be the great and the most complete as possible, it must be outcome described

```


# Agent: Data analyst
## Final Answer: 
Overview: The data table appears to contain information about [subject]. Each row represents [entity], and the columns provide various attributes and metrics related to these entities.

JSON Schema:
{
  "columns": [
    {
      "name": "column1",
      "data_type": "type1",
      "description": "A brief description of what this column represents and its significance in the dataset."
    },
    {
      "name": "column2",
      "data_type": "type2",
      "description": "A brief description of what this column represents and its significance in the dataset."
    },
    ...
  ]
}

Please note that without access to the actual data, this is a hypothetical structure and content based on common data analysis practices.


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[17], line 3
      1 df = pd.read_csv('iris_dataset.csv')
----> 3 result = crew.kickoff(inputs={
      4     'data_table': df,
      5 })

File /opt/conda/lib/python3.11/site-packages/crewai/crew.py:551, in Crew.kickoff(self, inputs)
    548 metrics: List[UsageMetrics] = []
    550 if self.process == Process.sequential:
--> 551     result = self._run_sequential_process()
    552 elif self.process == Process.hierarchical:
    553     result = self._run_hierarchical_process()

File /opt/conda/lib/python3.11/site-packages/crewai/crew.py:658, in Crew._run_sequential_process(self)
    656 def _run_sequential_process(self) -> CrewOutput:
    657     """Executes tasks sequentially and returns the final output."""
--> 658     return self._execute_tasks(self.tasks)

File /opt/conda/lib/python3.11/site-packages/crewai/crew.py:767, in Crew._execute_tasks(self, tasks, start_index, was_replayed)
    765         task_outputs = [task_output]
    766         self._process_task_result(task, task_output)
--> 767         self._store_execution_log(task, task_output, task_index, was_replayed)
    769 if futures:
    770     task_outputs = self._process_async_tasks(futures, was_replayed)

File /opt/conda/lib/python3.11/site-packages/crewai/crew.py:654, in Crew._store_execution_log(self, task, output, task_index, was_replayed)
    637     inputs = {}
    639 log = {
    640     "task": task,
    641     "output": {
   (...)
    652     "was_replayed": was_replayed,
    653 }
--> 654 self._task_output_handler.update(task_index, log)

File /opt/conda/lib/python3.11/site-packages/crewai/utilities/task_output_storage_handler.py:51, in TaskOutputStorageHandler.update(self, task_index, log)
     46     self.storage.update(
     47         task_index,
     48         **replayed,
     49     )
     50 else:
---> 51     self.storage.add(**log)

File /opt/conda/lib/python3.11/site-packages/crewai/memory/storage/kickoff_task_outputs_storage.py:99, in KickoffTaskOutputsSQLiteStorage.add(self, task, output, task_index, was_replayed, inputs)
     86         conn.execute("BEGIN TRANSACTION")
     87         cursor = conn.cursor()
     88         cursor.execute(
     89             """
     90         INSERT OR REPLACE INTO latest_kickoff_task_outputs
     91         (task_id, expected_output, output, task_index, inputs, was_replayed)
     92         VALUES (?, ?, ?, ?, ?, ?)
     93     """,
     94             (
     95                 str(task.id),
     96                 task.expected_output,
     97                 json.dumps(output, cls=CrewJSONEncoder),
     98                 task_index,
---> 99                 json.dumps(inputs, cls=CrewJSONEncoder),
    100                 was_replayed,
    101             ),
    102         )
    103         conn.commit()
    104 except sqlite3.Error as e:

File /opt/conda/lib/python3.11/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File /opt/conda/lib/python3.11/json/encoder.py:200, in JSONEncoder.encode(self, o)
    196         return encode_basestring(o)
    197 # This doesn't pass the iterator directly to ''.join() because the
    198 # exceptions aren't as detailed.  The list call should be roughly
    199 # equivalent to the PySequence_Fast that ''.join() would do.
--> 200 chunks = self.iterencode(o, _one_shot=True)
    201 if not isinstance(chunks, (list, tuple)):
    202     chunks = list(chunks)

File /opt/conda/lib/python3.11/json/encoder.py:258, in JSONEncoder.iterencode(self, o, _one_shot)
    253 else:
    254     _iterencode = _make_iterencode(
    255         markers, self.default, _encoder, self.indent, floatstr,
    256         self.key_separator, self.item_separator, self.sort_keys,
    257         self.skipkeys, _one_shot)
--> 258 return _iterencode(o, 0)

File /opt/conda/lib/python3.11/site-packages/crewai/utilities/crew_json_encoder.py:23, in CrewJSONEncoder.default(self, obj)
     20 elif isinstance(obj, datetime) or isinstance(obj, date):
     21     return obj.isoformat()
---> 23 return super().default(obj)

File /opt/conda/lib/python3.11/json/encoder.py:180, in JSONEncoder.default(self, o)
    161 def default(self, o):
    162     """Implement this method in a subclass such that it returns
    163     a serializable object for ``o``, or calls the base implementation
    164     (to raise a ``TypeError``).
   (...)
    178 
    179     """
--> 180     raise TypeError(f'Object of type {o.__class__.__name__} '
    181                     f'is not JSON serializable')

TypeError: Object of type DataFrame is not JSON serializable
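
So there appear to be two separate failures here. First, the LLM can only emit tool arguments as text, so Pydantic’s is_instance_of check on pd.DataFrame can never pass no matter what the model writes (it literally sends the string 'data_frame'). Second, even after the agent gives up, kickoff() itself crashes because the inputs dict is persisted with CrewJSONEncoder, which has no handler for DataFrames; and as far as I can tell, kickoff inputs are only string-interpolated into task descriptions anyway, so they would never reach the tools directly. The obvious fallback is to serialize the frame myself and rebuild it inside the tool, which is exactly the round trip I was hoping to avoid. A sketch of that fallback, just to make the question concrete (names are illustrative):

from io import StringIO

import pandas as pd

from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type

class SerializedDataFrameInput(BaseModel):
    """A data table serialized with DataFrame.to_json(orient='table')."""
    data_table_json: str = Field(..., description='Data table as a JSON string.')

class SerializedSchemaTool(BaseTool):
    name: str = 'SerializedSchemaTool'
    description: str = 'Returns the column dtypes of a JSON-serialized data table.'
    args_schema: Type[BaseModel] = SerializedDataFrameInput

    def _run(self, data_table_json: str) -> str:
        # Rebuild the DataFrame from the JSON string the agent passed through the prompt.
        data_table = pd.read_json(StringIO(data_table_json), orient='table')
        return data_table.dtypes.astype(str).to_json()

# The task description would then need a placeholder such as
# 'Analyze this table: {data_table_json}', and the kickoff input becomes a string:
# crew.kickoff(inputs={'data_table_json': df.to_json(orient='table')})

Is there a cleaner mechanism for this that I’m missing?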