I have finally figured out the right way to integrate a streamable-http FastMCP server, but the crew fails after successfully executing a single tool call. It is most probably a session problem, but how do I handle that, given that CrewAI manages the session internally?
This is the code:
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
import os
from crewai import Agent, Crew, Process, Task,LLM
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List, Optional,Dict,Any,Literal
from pydantic import BaseModel
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai_tools import MCPServerAdapter
from typing import List,Union
from dotenv import load_dotenv
# Load variables from a local .env file before any of them are read.
load_dotenv()

# NOTE(review): this bearer token was hard-coded in the script. Read it from
# the environment instead (same default kept for backward compatibility) and
# move the secret into .env so it is not committed to source control.
token = os.getenv("MCP_TOKEN", "ZuCHImGIwA")

# Fail fast with a clear message when a required Azure setting is missing.
# The previous `os.environ[k] = os.getenv(k)` raised a confusing
# `TypeError: str expected, not NoneType` when the variable was absent,
# because os.environ values must be strings.
for _var in ("AZURE_API_KEY", "AZURE_API_BASE"):
    _val = os.getenv(_var)
    if _val is None:
        raise RuntimeError(f"Missing required environment variable: {_var}")
    os.environ[_var] = _val
os.environ["AZURE_API_VERSION"] = "2024-02-01"

# Initialize the LLM; credentials are picked up from the environment variables
# set above (LiteLLM's azure/ provider convention).
llm = LLM(
    model="azure/gpt-4o",
)

# Base URL of the local streamable-http FastMCP server.
local_url = "http://localhost:8000/mcp/"
@CrewBase
class SnowCrew():
    """ServiceNow crew whose tools come from an MCP server over streamable-http."""

    # Paths to the YAML configuration files consumed by CrewBase.
    agents_config = "config/agents.yaml"
    # BUG FIX: this attribute was misspelled `task_config`. CrewBase expects
    # `tasks_config` — and `incident_task` below reads `self.tasks_config` —
    # so the custom path was silently ignored in favor of the default.
    tasks_config = "config/tasks.yaml"

    # MCP connection parameters; CrewBase creates and manages the
    # MCPServerAdapter lifecycle internally when this attribute is present.
    mcp_server_params: Union[list[MCPServerAdapter | dict[str, str]], MCPServerAdapter, dict[str, str]] = {
        "url": local_url,
        "transport": "streamable-http",
        "headers": {
            "Authorization": f"Bearer {token}"
        }
    }

    @agent
    def snow_manager(self) -> Agent:
        """Agent that handles ServiceNow work using the MCP-provided tools."""
        return Agent(
            config=self.agents_config['snow_manager'],  # type: ignore[index]
            verbose=True,
            llm=llm,
            # Tools are fetched from the MCP server configured above.
            tools=self.get_mcp_tools()
        )

    @task
    def incident_task(self) -> Task:
        """Task definition loaded from config/tasks.yaml."""
        return Task(
            config=self.tasks_config['incident_task'],
        )

    @crew
    def crew(self) -> Crew:
        """Creates the SnowCrew crew (sequential process over the agents/tasks
        collected by the @agent/@task decorators)."""
        return Crew(
            agents=self.agents,  # Automatically created by the @agent decorator
            tasks=self.tasks,    # Automatically created by the @task decorator
            process=Process.sequential,
            verbose=True,
            chat_llm=llm
        )
def run():
    """Run the crew in an interactive loop.

    Prompts the user for a query and kicks off the crew for each one.
    Exits cleanly on Ctrl+C or EOF instead of dumping a KeyboardInterrupt
    traceback.

    Raises:
        Exception: wraps any error raised during kickoff, chained with
            ``from e`` so the original traceback is preserved.
    """
    print("Hi, Please enter the problem you are facing/or incident you to want get information")
    while True:
        try:
            query = input("Enter Your Query")
        except (EOFError, KeyboardInterrupt):
            # Graceful exit on Ctrl+C / Ctrl+D.
            break
        inputs = {'input': query}
        try:
            SnowCrew().crew().kickoff(inputs=inputs)
        except Exception as e:
            # Chain the cause so the real traceback is not lost — the bare
            # re-raise previously hid where the failure actually occurred.
            raise Exception(f"An error occurred while running the crew: {e}") from e
# Script entry point: only start the interactive loop when executed directly,
# not when this module is imported.
if __name__=="__main__":
    run()
The Error:
Exception in thread Thread-8 (_run_loop):
+ Exception Group Traceback (most recent call last):
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1045, in _bootstrap_inner
| self.run()
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 982, in run
| self._target(*self._args, **self._kwargs)
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcpadapt\core.py", line 234, in _run_loop
| self.loop.run_until_complete(self.task)
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 654, in run_until_complete
| return future.result()
| ^^^^^^^^^^^^^^^
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcpadapt\core.py", line 222, in setup
| connections = [
| ^
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcpadapt\core.py", line 223, in <listcomp>
| await stack.enter_async_context(
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 650, in enter_async_context
| result = await _enter(cm)
| ^^^^^^^^^^^^^^^^
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 210, in __aenter__
| return await anext(self.gen)
| ^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcpadapt\core.py", line 119, in mcptools
| async with client as (read, write, *_):
| File "C:\Users\850081290\AppData\Local\Programs\Python\Python311\Lib\contextlib.py", line 210, in __aenter__
| return await anext(self.gen)
| ^^^^^^^^^^^^^^^^^^^^^
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcp\client\sse.py", line 54, in sse_client
| async with anyio.create_task_group() as tg:
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\anyio\_backends\_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\mcp\client\sse.py", line 65, in sse_client
| event_source.response.raise_for_status()
| File "C:\Users\850081290\Documents\New folder\gen1\Lib\site-packages\httpx\_models.py", line 829, in raise_for_status
| raise HTTPStatusError(message, request=request, response=self)
| httpx.HTTPStatusError: Client error '400 Bad Request' for url 'http://127.0.0.1:8050/mcp/servicenow/'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/400
+------------------------------------
This is connected locally and works fine with SSE as the transport; I only face this issue with streamable-http. (Note: the traceback shows the client going through `mcp/client/sse.py` and hitting `http://127.0.0.1:8050/mcp/servicenow/`, which differs from the `http://localhost:8000/mcp/` configured in the code — please confirm which configuration produced this error.)
Moreover, are we planning to introduce MCP resources, MCP prompts, and MCP sampling methods to CrewAI? MCP is revolutionizing the AI industry, and supporting it fully would make keeping up easier, since we would no longer have to create separate tools for each agent.
Directed towards @maxmoura @João @tonykipkemboi. Please help me out!!!