Issue Overview
I’m running into intermittent LLM failures that I would like to handle properly in my Flow execution. Everything works (and fails) as expected when running tasks synchronously, but since my flow really benefits from running them asynchronously, this is an issue I would rather fix than work around. Unfortunately, when the LLM call throws an exception in an async task, the flow freezes instead of failing gracefully.
Context
I have a crew which summarizes a bunch of input documents and then finds similar documentation in a private database. Since the input can sometimes be very large, I split it into smaller groups so that I don’t run into context window limits. My crew execution is composed of three tasks:
- Summarize documents: Executed with async_execution=True, this task may be executed n times, depending on the input.
- Combine summaries: Has all previous summarize tasks in its context. Combines all summaries into one concise summary.
- Find relevant documentation: Searches private documentation based on the previous summary.
The code for this crew creation is as follows:
def crew(self, total_executions) -> Crew:
    summarization_tasks = []
    for i in range(total_executions):
        summarize_task = self.summarize_code()  # Creates a new task based on task.yaml
        summarize_task.async_execution = True
        # For each task description, change the input placeholder to receive a different group of documents
        summarize_task.description = summarize_task.description.replace('input', f'input_{i}')
        summarization_tasks.append(summarize_task)

    combine_task = self.combine_summaries()
    combine_task.context = summarization_tasks  # Define all previous summarize tasks as context

    return Crew(
        agents=[
            self.summarizer(),  # Used in the summarize tasks and the combine summaries task
            self.evaluator()    # Used in the task that finds similar documentation
        ],
        tasks=[
            *summarization_tasks,
            combine_task,
            self.evaluate()
        ]
    )
Each agent has its LLM defined with this function:
def _build_llm(self, model) -> LLM:
    base_url = os.getenv("BASE_URL")
    return LLM(
        model=model,
        base_url=base_url,
        api_key=os.getenv("SECRET"),
        temperature=0.6
    )
The crew is called inside a flow step as usual:
summary_crew = SummaryCrew()
crew = summary_crew.crew(total_executions=5)
result = crew.kickoff(inputs=inputs)
The flow is called asynchronously, as it runs in a uvicorn app:
flow = MyFlow()
result = await flow.kickoff_async(flow_input)
Error information
During the asynchronous summarize tasks, I sometimes get the following error:
litellm.APIError: APIError: OpenAIException
Although this occurs due to an issue in my company’s proxy, what I’m really trying to solve is how to handle this and potentially other errors that may happen.
The full trace is the following:
Exception in thread Thread-3 (_execute_task_async):
Traceback (most recent call last):
  File "/project/.venv/lib/python3.12/site-packages/litellm/llms/openai/openai.py", line 745, in completion
    raise e
  File "/project/.venv/lib/python3.12/site-packages/litellm/llms/openai/openai.py", line 673, in completion
    ) = self.make_sync_openai_chat_completion_request(
  File "/project/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/logging_utils.py", line 237, in sync_wrapper
    result = func(*args, **kwargs)
  File "/project/.venv/lib/python3.12/site-packages/litellm/llms/openai/openai.py", line 489, in make_sync_openai_chat_completion_request
    raise e
  File "/project/.venv/lib/python3.12/site-packages/litellm/llms/openai/openai.py", line 471, in make_sync_openai_chat_completion_request
    raw_response = openai_client.chat.completions.with_raw_response.create(
  File "/project/.venv/lib/python3.12/site-packages/openai/_legacy_response.py", line 364, in wrapped
    return cast(LegacyAPIResponse[R], func(*args, **kwargs))
  File "/project/.venv/lib/python3.12/site-packages/openai/_utils/_utils.py", line 286, in wrapper
    return func(*args, **kwargs)
  File "/project/.venv/lib/python3.12/site-packages/openai/resources/chat/completions/completions.py", line 1189, in create
    return self._post(
  File "/project/.venv/lib/python3.12/site-packages/openai/_base_client.py", line 1259, in post
    return cast(ResponseT, self.request(cast_to, opts, stream=stream, stream_cls=stream_cls))
  File "/project/.venv/lib/python3.12/site-packages/openai/_base_client.py", line 1047, in request
    raise self._make_status_error_from_response(err.response) from None
openai.PermissionDeniedError: redacted

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/project/.venv/lib/python3.12/site-packages/litellm/main.py", line 2158, in completion
    raise e
  File "/project/.venv/lib/python3.12/site-packages/litellm/main.py", line 2130, in completion
    response = openai_chat_completions.completion(
  File "/project/.venv/lib/python3.12/site-packages/litellm/llms/openai/openai.py", line 756, in completion
    raise OpenAIError(
litellm.llms.openai.common_utils.OpenAIError: redacted

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "/project/.venv/lib/python3.12/site-packages/crewai/task.py", line 497, in _execute_task_async
    result = self._execute_core(agent, context, tools)
  File "/project/.venv/lib/python3.12/site-packages/crewai/task.py", line 591, in _execute_core
    raise e  # Re-raise the exception after emitting the event
  File "/project/.venv/lib/python3.12/site-packages/crewai/task.py", line 522, in _execute_core
    result = agent.execute_task(
  File "/project/.venv/lib/python3.12/site-packages/crewai/agent/core.py", line 514, in execute_task
    raise e
  File "/project/.venv/lib/python3.12/site-packages/crewai/agent/core.py", line 490, in execute_task
    result = self._execute_without_timeout(task_prompt, task)
  File "/project/.venv/lib/python3.12/site-packages/crewai/agent/core.py", line 598, in _execute_without_timeout
    return self.agent_executor.invoke(
  File "/project/.venv/lib/python3.12/site-packages/crewai/agents/crew_agent_executor.py", line 188, in invoke
    formatted_answer = self._invoke_loop()
  File "/project/.venv/lib/python3.12/site-packages/crewai/agents/crew_agent_executor.py", line 287, in _invoke_loop
    raise e
  File "/project/.venv/lib/python3.12/site-packages/crewai/agents/crew_agent_executor.py", line 229, in _invoke_loop
    answer = get_llm_response(
  File "/project/.venv/lib/python3.12/site-packages/crewai/utilities/agent_utils.py", line 276, in get_llm_response
    raise e
  File "/project/.venv/lib/python3.12/site-packages/crewai/utilities/agent_utils.py", line 268, in get_llm_response
    answer = llm.call(
  File "/project/.venv/lib/python3.12/site-packages/crewai/llm.py", line 1321, in call
    return self._handle_non_streaming_response(
  File "/project/.venv/lib/python3.12/site-packages/crewai/llm.py", line 1081, in _handle_non_streaming_response
    response = litellm.completion(**params)
  File "/project/.venv/lib/python3.12/site-packages/litellm/utils.py", line 1381, in wrapper
    raise e
  File "/project/.venv/lib/python3.12/site-packages/litellm/utils.py", line 1250, in wrapper
    result = original_function(*args, **kwargs)
  File "/project/.venv/lib/python3.12/site-packages/litellm/main.py", line 3772, in completion
    raise exception_type(
  File "/project/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/exception_mapping_utils.py", line 2328, in exception_type
    raise e
  File "/project/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/exception_mapping_utils.py", line 563, in exception_type
    raise APIError(
litellm.exceptions.APIError: litellm.APIError: APIError: OpenAIException
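The "Exception in thread Thread-3 (_execute_task_async)" header suggests the failing task runs in a plain background thread. In standard Python, an exception raised inside a threading.Thread target is reported via threading.excepthook but never propagates to the code that joins the thread, which would explain why the flow keeps waiting instead of failing. A minimal stdlib-only reproduction of that behavior (no CrewAI involved):

```python
import threading

def worker():
    # Stand-in for the failing LLM call inside _execute_task_async
    raise RuntimeError("simulated LLM failure")

captured = []
# The default excepthook only prints the traceback; record the exception instead
threading.excepthook = lambda args: captured.append(args.exc_value)

t = threading.Thread(target=worker)
t.start()
t.join()  # returns normally -- the joining code never sees the exception

print(type(captured[0]).__name__)  # RuntimeError
```

The join returns as if nothing went wrong, so anything downstream waiting on the task's result would wait indefinitely.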
After this exception is thrown, my flow freezes like this:
🌊 Flow: MyFlow
ID: uuid
├── ✨ Created
├── ✅ Initialization Complete
├── ✅ Completed: process_input_step
├── ✅ Completed: input_router
└── 🔄 Running: iterative_evaluation_step # step that called the crew
When running tasks synchronously, the flow fails as expected:
🌊 Flow: MyFlow
ID: uuid
├── ✨ Created
├── ✅ Initialization Complete
├── ✅ Completed: process_input_step
├── ✅ Completed: input_router
└── ❌ Failed: iterative_evaluation_step
Environment
- Python 3.12.3
- CrewAI 1.6.0
What I want is for asynchronous task execution to fail exactly like synchronous execution does. Ideally, I would handle the exception inside either the crew execution or the flow execution. If there is no workaround, properly propagating the exception and stopping the flow execution is enough for me.
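For reference, this is the kind of handling I'm after, as a stdlib-only sketch where blocking_kickoff is a hypothetical stand-in for crew.kickoff (not CrewAI API): an exception raised inside asyncio.to_thread does propagate to the awaiting coroutine, which is the behavior I'd like for the async tasks:

```python
import asyncio

def blocking_kickoff():
    # Hypothetical stand-in for crew.kickoff(inputs=...); here it always fails
    raise RuntimeError("LLM call failed")

async def flow_step():
    try:
        # asyncio.to_thread runs the blocking call in a worker thread, and
        # any exception it raises is re-raised here in the awaiting coroutine
        return await asyncio.to_thread(blocking_kickoff)
    except RuntimeError as exc:
        return f"handled: {exc}"

result = asyncio.run(flow_step())
print(result)  # handled: LLM call failed
```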
Heads up
- Both LLMCallFailedEvent and TaskFailedEvent are being raised correctly.
- Flow execution freezes whether or not the async tasks are included in the context of the following task (combine summaries).