Hi @Max_Moura, 
Below, I have combined the Task and Tool event-listener-based code examples you provided in the threads mentioned above. Thank you for sharing them! I will build on your examples using the following documentation:
As I note in my in-line comments below, the two native observability approaches are:
# =========================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers.
# =========================================================
# =========================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# =========================================================
I have a follow-up question:
You will notice below that I subclass langchain’s BaseCallbackHandler. That is because I could not find a suitable class to subclass within the CrewAI library. I am sure I overlooked it in the documentation and/or in the GitHub sources. Could you point me to the CrewAI package and class(es) I can use to implement callbacks?
./utils/event_handlers.py
import json, pathlib
from pprint import pprint
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Union
#
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages.base import BaseMessage
from langchain_core.outputs.llm_result import LLMResult
from langchain_core.callbacks.base import BaseCallbackHandler
#
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.tool_usage_events import (
ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent,
ToolExecutionErrorEvent, ToolSelectionErrorEvent, ToolValidateInputErrorEvent)
from crewai.utilities.events.task_events import (TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,)
base_directory = pathlib.Path(__file__).parent.parent # Define basedir to be one level up (not two): this file lives in ./utils/.
@dataclass
class Event:
    """One observability record, serialized to a single JSONL log line."""

    event: str      # event label, e.g. "llm_start" or "on_chain_end::<agent>"
    timestamp: str  # ISO-8601 timestamp string (see _current_time)
    text: str       # free-form payload describing the event
def _current_time() -> str:
return datetime.now(timezone.utc).isoformat()
# =========================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers.
# =========================================================
class LLMCallbackHandler(BaseCallbackHandler):
    """langchain callback handler that persists LLM start/end events.

    Each hook appends one JSON line (an ``Event``) to
    ``<base_directory>/outputs.d/<log_file>``.
    """

    def __init__(self, log_file: str) -> None:
        # Log file lives in the project-level outputs.d directory.
        self.log_path = base_directory / "outputs.d" / log_file

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        """Run when LLM starts running; logs every prompt in the batch.

        The original ``assert len(prompts) == 1`` crashed on multi-prompt
        batches and is stripped under ``python -O``; joining the prompts
        logs them all instead.
        """
        event = Event(event="llm_start", timestamp=_current_time(),
                      text="\n".join(prompts))
        self._persist_event(event)
        print("LLMCallbackHandler::on_llm_start() complete.")

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running; logs the text of the last generation."""
        # ``Generation.text`` exists for both chat and completion results,
        # unlike ``.message``, which only ChatGeneration provides.
        generation = response.generations[-1][-1].text
        event = Event(event="llm_end", timestamp=_current_time(), text=generation)
        self._persist_event(event)
        print("LLMCallbackHandler::on_llm_end() complete.")

    def _persist_event(self, event: Event) -> None:
        """Append *event* as one JSON line to the log file."""
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================
# ========================================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers
# ========================================================================
class AgentCallbackHandler(BaseCallbackHandler):
    """Per-agent langchain callback handler that appends events to a JSONL file.

    Every hook writes one ``Event`` record labeled ``<hook>::<agent_name>``
    to ``<base_directory>/outputs.d/<log_file>`` and prints a completion
    notice.  The repeated build/persist/print sequence is factored into
    :meth:`_record`.
    """

    def __init__(self, log_file: str, agent_name: str) -> None:
        self.log_path = base_directory / "outputs.d" / log_file
        self.agent_name = agent_name

    def _record(self, hook: str, text: str) -> None:
        """Persist one event for *hook* and print the completion notice."""
        event = Event(event=f"{hook}::{self.agent_name}",
                      timestamp=_current_time(), text=text)
        self._persist_event(event)
        print(f"AgentCallbackHandler::{hook}() complete.")

    # ============================================================================================
    # As of 04/08/25, only these four functions work.
    # ============================================================================================
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Execute upon entering a chain."""
        self._record("on_chain_start", str(serialized))

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Execute upon completion of a chain."""
        self._record("on_chain_end", str(outputs))

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Execute when an agent performs an action."""
        text = f"""**Action** : {action.tool}\n**Content** : {str(action.tool_input)}"""
        self._record("on_agent_action", text)

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Execute upon completion of an agent."""
        self._record("on_agent_finish", str(finish.return_values))

    # ============================================================================================
    # As of 04/08/25, the functions below don't work. Apparently not yet implemented by CrewAI team.
    # ============================================================================================
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        """Execute upon the initiation of an LLM run."""
        self._record("on_llm_start", str(serialized))

    def on_chat_model_start(self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any) -> Any:
        """Execute upon the initiation of a Chat Model run."""
        self._record("on_chat_model_start", str(serialized))

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Execute upon issuance of a new LLM token. Only available when streaming is enabled."""
        self._record("on_llm_new_token", token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Execute upon the end of an LLM run."""
        self._record("on_llm_end", json.dumps(response.dict(), indent=4))

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of an LLM error."""
        self._record("on_llm_error", str(error))

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of a chain error."""
        self._record("on_chain_error", str(error))

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        """Execute upon the initiation of a tool."""
        text = "Input string: " + str(input_str) + "\n" + str(serialized)
        self._record("on_tool_start", text)

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Execute upon the completion of a tool."""
        self._record("on_tool_end", output)

    def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of a tool error."""
        self._record("on_tool_error", str(error))

    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Execute upon issuance or receipt of arbitrary text."""
        # Intentional stub (nothing is persisted), kept from the original.
        print("AgentCallbackHandler::on_text() complete.")

    def _persist_event(self, event: Event) -> None:
        """Append *event* as one JSON line to the log file."""
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================
# ========================================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers
# ========================================================================
class TaskCallbackHandler(BaseCallbackHandler):
    """Task-completion callback that logs each ``TaskOutput`` to a JSONL file.

    Instances are callable so one can be passed directly as a CrewAI task
    ``callback``.
    """

    def __init__(self, log_file: str, task_name: str) -> None:
        self.log_path = base_directory / "outputs.d" / log_file
        self.task_name = task_name

    def __call__(self, task_output: TaskOutput) -> None:
        """Record completion of the wrapped task."""
        event = Event(event=f"task_complete::{self.task_name}",
                      timestamp=_current_time(), text=repr(task_output))
        self._persist_event(event)
        # Fixed typo in the original log message: "__call_" -> "__call__".
        print("TaskCallbackHandler::__call__() complete.")

    def _persist_event(self, event: Event) -> None:
        """Append *event* as one JSON line to the log file."""
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================
# ========================================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# ========================================================================
class ToolEventListener(BaseEventListener):
    """A custom event listener that prints a diagnostic dump when specific
    tool-related CrewAI events occur."""

    def __init__(self):
        super().__init__()
        print("ToolEventListener initialized.")

    def setup_listeners(self, crewai_event_bus):
        """Registers listeners for various tool usage events on the provided event bus."""
        print("Setting up tool event listeners...")
        # NOTE: the messages below are plain string constants; the needless
        # f-prefixes from the original were removed (ruff F541).

        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event: ToolUsageStartedEvent):
            print("\n--> Received 'ToolUsageStartedEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolUsageFinishedEvent)
        def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
            print("\n<-- Received 'ToolUsageFinishedEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            print("\n<-- [ERROR] Received 'ToolUsageErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolExecutionErrorEvent)
        def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
            print("\n<-- [ERROR] Received 'ToolExecutionErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolSelectionErrorEvent)
        def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
            print("\n--> [ERROR] Received 'ToolSelectionErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolValidateInputErrorEvent)
        def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
            print("\n--> [ERROR] Received 'ToolValidateInputErrorEvent':")
            pprint(event.__dict__)
# ========================================================================
tool_event_listener = ToolEventListener()  # Module-level instance; presumably registers its listeners on construction — TODO confirm against BaseEventListener.__init__.
# ========================================================================
# ========================================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# ========================================================================
class TaskEventListener(BaseEventListener):
    """Announces task lifecycle events received from the CrewAI event bus."""

    def __init__(self):
        super().__init__()
        print("TaskEventListener initialized.")

    def setup_listeners(self, crewai_event_bus):
        """Subscribe handlers for task start, completion, and failure."""

        @crewai_event_bus.on(TaskStartedEvent)
        def _handle_task_started(source, event: TaskStartedEvent):
            print(f"\nReceived 'TaskStartedEvent' with 'source': {source}\n")

        @crewai_event_bus.on(TaskCompletedEvent)
        def _handle_task_completed(source, event: TaskCompletedEvent):
            print(f"\nReceived 'TaskCompletedEvent' with 'source': {source}\n")

        @crewai_event_bus.on(TaskFailedEvent)
        def _handle_task_failed(source, event: TaskFailedEvent):
            print(f"\nReceived 'TaskFailedEvent' with 'source': {source}\n")
# ========================================================================
task_event_listener = TaskEventListener()  # Module-level instance; presumably registers its listeners on construction — TODO confirm against BaseEventListener.__init__.
# ========================================================================