Handling LLM Errors in Hierarchical CrewAI Process with Callbacks

I am running a CrewAI process in a hierarchical setup, but sometimes I encounter LLM errors during execution.

I would like to know:

  1. Is there a way to add a callback functionality that triggers automatically whenever an LLM error occurs?
  2. If yes, how can I implement it within the CrewAI framework?

Any guidance, best practices, or examples would be greatly appreciated.

Thanks in advance!


What do you want to achieve with the callback?

There isn't anything like that as far as I know, but if you describe what you aim to achieve, people will have better context to advise you.

Thanks for your response!

The main issue I am facing is that I occasionally get LLM errors like:

  • “Received None or empty response from LLM call.”
  • “An unknown error occurred. Please check the details below. Error details: Invalid response from LLM call - None or empty.”

If I run the same Crew with the same task multiple times, sometimes it works fine, but other times I get the above errors.

I want to handle these errors more gracefully—perhaps with a retry mechanism, a fallback response, or a way to detect and log these errors properly within the CrewAI framework.

Is there a recommended approach for handling such intermittent LLM failures? Would implementing a callback or exception handling mechanism help in this case?
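
For context, here's the kind of retry wrapper I have in mind (a rough sketch only, assuming kickoff() either raises or returns an empty result on these failures):

import time
from crewai import Crew

def kickoff_with_retries(crew: Crew, inputs: dict, max_attempts: int = 3):
    """Retry crew.kickoff() on intermittent LLM failures (sketch)."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = crew.kickoff(inputs=inputs)
            if result:  # Guard against a None/empty response.
                return result
        except Exception as exc:  # e.g. "Invalid response from LLM call - None or empty."
            print(f"Attempt {attempt}/{max_attempts} failed: {exc}")
        time.sleep(2 ** attempt)  # Simple exponential backoff between attempts.
    raise RuntimeError("All retry attempts exhausted.")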

Any guidance or best practices would be much appreciated!


I have the same issue and requirement as you, and I want native LLM observability (i.e., before bringing in tools like Langfuse, Comet Opik, Arize AI, etc.).

So, I plan on trying the following this coming week, which supplies various LLM-related callbacks such as on_llm_start(), on_llm_end(), on_chat_model_start(),
on_chat_model_end(), and many more. You basically write a subclass of BaseCallbackHandler, then assign an instantiated object of it as the value of the Agent's callbacks parameter.
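
In other words, something roughly like this (a sketch only; I haven't verified the exact Agent signature yet, so the callbacks parameter is an assumption on my part):

from crewai import Agent
from langchain_core.callbacks.base import BaseCallbackHandler

class MyCallbackHandler(BaseCallbackHandler):
    def on_llm_start(self, serialized, prompts, **kwargs):
        print("LLM call starting...")

research_agent = Agent(
    role="Researcher",                    # Placeholder values.
    goal="Answer research questions.",
    backstory="An experienced analyst.",
    callbacks=[MyCallbackHandler()],      # The parameter under test.
)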

Unfortunately, as the OP of that GitHub issue notes, at the time of writing some of these methods were never hooked in by CrewAI and so are never invoked. That may have been rectified since that post, so it's worth trying now:

Hey @nmvega, how’s it going?

If I got you right, you’re looking for a better level of observability, correct? If that’s the case, I recommend you check out two other threads where this topic came up. In both, I suggested using custom listeners to receive CrewAI bus events in a less intrusive way:
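
For example, an LLM-failure listener along these lines (note: the llm_events module and the event's error field are from recent CrewAI versions, so double-check them against yours):

from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.llm_events import LLMCallFailedEvent

class LLMErrorListener(BaseEventListener):
    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(LLMCallFailedEvent)
        def on_llm_call_failed(source, event: LLMCallFailedEvent):
            # Log the failure; from here you could also trigger alerting or a retry.
            print(f"[LLM ERROR] {event.error}")

# Instantiating the listener registers it on the global event bus.
llm_error_listener = LLMErrorListener()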

In that last thread, @tonykipkemboi mentioned that they have more advanced tracing capabilities available on the enterprise platform.


Hi Max, thank you.

I hope you are doing well too.

If I got you right, you’re looking for a better level of observability, correct?

Yes, you are correct. Still being fairly new to CrewAI, I was unaware of the capabilities you demonstrate in the clear code you wrote in those threads. Tapping into an events bus generalizes our ability to capture and inspect executions. That’s very exciting and I will try them this week. Thank you. :smiling_face:


Hi @Max_Moura, :blush:

Below, I combined the Task and Tool event listener-based code examples you provided in the threads mentioned. Thank you for sharing them! I’ll build upon your examples using the following documentation:

As I note in my in-line comments below, the two native observability approaches are:

  • Callback handlers:
# =========================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers.
# =========================================================
  • Event Listener Pub/Sub:
# =========================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# =========================================================

I have a follow-up question:

You'll notice below that I subclass LangChain's BaseCallbackHandler. That's because I couldn't find what to subclass within the CrewAI library. I'm sure I overlooked it in the documentation and/or in the GitHub sources. Can you mention which CrewAI package and class(es) I can use to implement callbacks?

  • ./utils/event_handlers.py
import json, pathlib
from pprint import pprint
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Union
#
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages.base import BaseMessage
from langchain_core.outputs.llm_result import LLMResult
from langchain_core.callbacks.base import BaseCallbackHandler
#
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.tool_usage_events import (
    ToolUsageFinishedEvent, ToolUsageErrorEvent, ToolUsageStartedEvent,
    ToolExecutionErrorEvent, ToolSelectionErrorEvent, ToolValidateInputErrorEvent)
from crewai.utilities.events.task_events import (TaskCompletedEvent,
                                                 TaskFailedEvent,
                                                 TaskStartedEvent,)

base_directory = pathlib.Path(__file__).parent.parent # Define basedir to be one level up (not two).

@dataclass
class Event:
    event: str
    timestamp: str
    text: str

def _current_time() -> str:
    return datetime.now(timezone.utc).isoformat()

# =========================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers.
# =========================================================
class LLMCallbackHandler(BaseCallbackHandler):
    def __init__(self, log_file: str):
        self.log_path = base_directory / "outputs.d" / log_file
        self.log_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure outputs.d exists before appending.

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        """Run when LLM starts running."""
        assert len(prompts) == 1  # Expect a single prompt per call; relax if batching.
        event = Event(event="llm_start", timestamp=_current_time(), text=prompts[0])
        self._persist_event(event)
        print("LLMCallbackHandler::on_llm_start() complete.")

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Run when LLM ends running."""
        generation = response.generations[-1][-1].message.content  # Assumes a chat generation; use .text for plain Generation objects.
        event = Event(event="llm_end", timestamp=_current_time(), text=generation)
        self._persist_event(event)
        print("LLMCallbackHandler::on_llm_end() complete.")

    def _persist_event(self, event):
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================



# ========================================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers
# ========================================================================
class AgentCallbackHandler(BaseCallbackHandler):
    def __init__(self, log_file: str, agent_name: str) -> None:
        self.log_path = base_directory / "outputs.d" / log_file
        self.log_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure outputs.d exists before appending.
        self.agent_name = agent_name

    # ============================================================================================
    # As of 04/08/25, only these four functions work.
    # ============================================================================================
    def on_chain_start(self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any) -> None:
        """Execute upon entering a chain."""
        event = f"on_chain_start::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(serialized))
        self._persist_event(event)
        print("AgentCallbackHandler::on_chain_start() complete.")

    def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> None:
        """Execute upon completion of a chain."""
        event = f"on_chain_end::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(outputs))
        self._persist_event(event)
        print("AgentCallbackHandler::on_chain_end() complete.")

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        """Execute when an agent performs an action."""
        text = f"""**Action** : {action.tool}\n**Content** : {str(action.tool_input)}"""
        event = f"on_agent_action::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=text)
        self._persist_event(event)
        print("AgentCallbackHandler::on_agent_action() complete.")

    def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
        """Execute upon completion of an agent."""
        # text = finish  # repr(finish) or str(finish); not sure what this would look like.
        text = f"""{str(finish.return_values)}"""
        event = f"on_agent_finish::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=text)
        self._persist_event(event)
        print("AgentCallbackHandler::on_agent_finish() complete.")


    # ============================================================================================
    # As of 04/08/25, the functions below don't work; apparently not yet hooked in by the CrewAI team.
    # ============================================================================================
    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        """Execute upon the initiation of an LLM run."""
        event = f"on_llm_start::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(serialized))
        self._persist_event(event)
        print("AgentCallbackHandler::on_llm_start() complete.")

    def on_chat_model_start(self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any) -> Any:
        """Execute upon the initiation of a Chat Model run."""
        # messages_dict = [[message.dict() for message in message_list] for message_list in messages]
        event = f"on_chat_model_start::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(serialized))
        self._persist_event(event)
        print("AgentCallbackHandler::on_chat_model_start() complete.")

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        """Execute upon issuance of a new LLM token. Only available when streaming is enabled."""
        event = f"on_llm_new_token::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=token)
        self._persist_event(event)
        print("AgentCallbackHandler::on_llm_new_token() complete.")

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
        """Execute upon the end of an LLM run."""
        # text = response  # If json.dumps(response.dict()) raises, fall back to str() or repr().
        text = json.dumps(response.dict(), indent=4)
        event = f"on_llm_end::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=text)
        self._persist_event(event)
        print("AgentCallbackHandler::on_llm_end() complete.")

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of an LLM error."""
        event = f"on_llm_error::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(error))
        self._persist_event(event)
        print("AgentCallbackHandler::on_llm_error() complete.")

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of a chain error."""
        event = f"on_chain_error::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(error))
        self._persist_event(event)
        print("AgentCallbackHandler::on_chain_error() complete.")

    def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
        """Execute upon the initiation of a tool."""
        text = "Input string: " + str(input_str) + "\n" + str(serialized)
        event = f"on_tool_start::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=text)
        self._persist_event(event)
        print("AgentCallbackHandler::on_tool_start() complete.")

    def on_tool_end(self, output: str, **kwargs: Any) -> Any:
        """Execute upon the completion of a tool."""
        event = f"on_tool_end::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=output)
        self._persist_event(event)
        print("AgentCallbackHandler::on_tool_end() complete.")

    def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
        """Execute upon the occurrence of a tool error."""
        event = f"on_tool_error::{self.agent_name}"
        event = Event(event=event, timestamp=_current_time(), text=str(error))
        self._persist_event(event)
        print("AgentCallbackHandler::on_tool_error() complete.")

    def on_text(self, text: str, **kwargs: Any) -> Any:
        """Execute upon issuance or receipt of arbitrary text.""" # Stub b/c don't know what this means.
        print("AgentCallbackHandler::on_text() complete.")

    def _persist_event(self, event):
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================



# ========================================================================
# Uses: langchain_core.callbacks.base.BaseCallbackHandler
# Approach: Callback handlers
# ========================================================================
class TaskCallbackHandler(BaseCallbackHandler):
    def __init__(self, log_file: str, task_name: str) -> None:
        self.log_path = base_directory / "outputs.d" / log_file
        self.log_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure outputs.d exists before appending.
        self.task_name = task_name

    def __call__(self, task_output: TaskOutput):
        event = f"task_complete::{self.task_name}"
        event = Event(event=event, timestamp=_current_time(), text=repr(task_output))
        self._persist_event(event)
        print("TaskCallbackHandler::__call_() complete.")

    def _persist_event(self, event):
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(event)) + "\n")
# ========================================================================



# ========================================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# ========================================================================
class ToolEventListener(BaseEventListener):
    """
    A custom event listener that prints messages when specific tool-related events occur.
    """
    def __init__(self):
        super().__init__()
        print("ToolEventListener initialized.")

    def setup_listeners(self, crewai_event_bus):
        """Registers listeners for various tool usage events on the provided event bus."""
        print("Setting up tool event listeners...")

        @crewai_event_bus.on(ToolUsageStartedEvent)
        def on_tool_usage_started(source, event: ToolUsageStartedEvent):
            print(f"\n--> Received 'ToolUsageStartedEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolUsageFinishedEvent)
        def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
            print(f"\n<-- Received 'ToolUsageFinishedEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolUsageErrorEvent)
        def on_tool_usage_error(source, event: ToolUsageErrorEvent):
            print(f"\n<-- [ERROR] Received 'ToolUsageErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolExecutionErrorEvent)
        def on_tool_execution_error(source, event: ToolExecutionErrorEvent):
            print(f"\n<-- [ERROR] Received 'ToolExecutionErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolSelectionErrorEvent)
        def on_tool_selection_error(source, event: ToolSelectionErrorEvent):
            print(f"\n--> [ERROR] Received 'ToolSelectionErrorEvent':")
            pprint(event.__dict__)

        @crewai_event_bus.on(ToolValidateInputErrorEvent)
        def on_tool_validate_input_error(source, event: ToolValidateInputErrorEvent):
            print(f"\n--> [ERROR] Received 'ToolValidateInputErrorEvent':")
            pprint(event.__dict__)
# ========================================================================
tool_event_listener = ToolEventListener()
# ========================================================================


# ========================================================================
# Uses: crewai.utilities.events.base_event_listener.BaseEventListener
# Approach: Event Listener Pub/Sub
# ========================================================================
class TaskEventListener(BaseEventListener):
    def __init__(self):
        super().__init__()
        print("TaskEventListener initialized.")

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(TaskStartedEvent)
        def on_task_started(source, event: TaskStartedEvent):
            print(f"\nReceived 'TaskStartedEvent' with 'source': {source}\n")

        @crewai_event_bus.on(TaskCompletedEvent)
        def on_task_completed(source, event: TaskCompletedEvent):
            print(f"\nReceived 'TaskCompletedEvent' with 'source': {source}\n")

        @crewai_event_bus.on(TaskFailedEvent)
        def on_task_failed(source, event: TaskFailedEvent):
            print(f"\nReceived 'TaskFailedEvent' with 'source': {source}\n")
# ========================================================================
task_event_listener = TaskEventListener()
# ========================================================================

Well, if the callback you’re referring to is the one triggered after the task finishes, I recommend checking out CrewAI’s documentation on the topic. The mechanism is straightforward and to the point — just don’t forget that your callback receives an instance of the TaskOutput class, alright?
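
A minimal sketch (description and expected_output here are placeholders; the raw and description fields come from TaskOutput):

from crewai import Task
from crewai.tasks.task_output import TaskOutput

def log_task_result(output: TaskOutput) -> None:
    # TaskOutput carries, among other things, the task description and the raw text result.
    print(f"Task '{output.description}' finished:\n{output.raw}")

my_task = Task(
    description="Summarize the research findings.",  # Placeholder.
    expected_output="A short summary.",              # Placeholder.
    callback=log_task_result,  # Receives a TaskOutput instance when the task finishes.
)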


Yes, and thank you. I have Task and TaskOutput covered in my example code above. Hopefully this thread will assist others as it did me. : )

