Hi everyone!
I’m working on an ai chatbot using CrewAI, LangChain, FastAPI, and MySQL, but I’m running into an issue when initializing Agent. The error suggests that BaseTool is missing an implementation for _run().
I’ll share my full code and the error details below. If anyone knows how to resolve this issue or can help me debug, I’d really appreciate it!
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from langchain_community.utilities import SQLDatabase
from langchain_community.tools.sql_database.tool import (
InfoSQLDatabaseTool,
ListSQLDatabaseTool,
QuerySQLCheckerTool,
QuerySQLDatabaseTool,
)
from crewai import Agent, Task, Crew, Process
from langchain_groq import ChatGroq
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from dotenv import load_dotenv
import os
# ✅ Load environment variables
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
if not GROQ_API_KEY:
raise ValueError("GROQ_API_KEY is missing! Ensure it's set in your .env file.")
# ✅ Initialize FastAPI app
app = FastAPI()
# ✅ Initialize Llama 3 with Groq API Key
llm = ChatGroq(model="Llama3-8B", api_key=GROQ_API_KEY)
# ✅ MySQL Connection
DB_URL = "mysql+mysqlconnector://username:password@localhost:3306/subway_db"
engine = create_engine(DB_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# ✅ Initialize SQLDatabase for LangChain
sql_database = SQLDatabase(engine)
# ✅ Initialize LangChain SQL Tools (Using instances, not `.dict()`)
info_tool = InfoSQLDatabaseTool(db=sql_database)
list_tool = ListSQLDatabaseTool(db=sql_database)
query_tool = QuerySQLDatabaseTool(db=sql_database)
checker_tool = QuerySQLCheckerTool(db=sql_database, llm=llm)
# ✅ Define Agents with `goal` and `backstory`
subway_data_agent = Agent(
role="Subway Database Analyst",
goal="Retrieve Subway outlet data.",
backstory="An AI system designed to analyze Subway locations, operating hours, and details.",
tools=[list_tool.dict(), info_tool.dict(), query_tool.dict()],
description="Retrieves Subway outlets data, including names, addresses, operating hours, and locations."
)
sql_validator_agent = Agent(
role="SQL Query Validator",
goal="Validate SQL queries before execution.",
backstory="A SQL validator that ensures all queries are correctly formatted and safe.",
tools=[checker_tool.dict()],
description="Validates SQL queries before execution to prevent errors."
)
query_executor_agent = Agent(
role="SQL Query Executor",
goal="Execute SQL queries safely.",
backstory="An AI that runs SQL queries and fetches database records.",
tools=[query_tool.dict()],
description="Executes validated SQL queries and returns the results."
)
# ✅ Define Tasks
task1 = Task(description="List all Subway outlets in Kuala Lumpur with their names and addresses.", agent=subway_data_agent)
task2 = Task(description="Ensure that the SQL query is correct before executing.", agent=sql_validator_agent)
task3 = Task(description="Fetch Subway outlet details based on user query.", agent=query_executor_agent)
# ✅ Define CrewAI Workflow (Do not auto-run)
crew = Crew(
agents=[subway_data_agent, sql_validator_agent, query_executor_agent],
tasks=[task1, task2, task3],
process=Process.sequential # Tasks will run one after another
)
# ✅ Define a prompt for natural language to SQL conversion
prompt = PromptTemplate(
template="Convert this question into an SQL query: {question}",
input_variables=["question"]
)
# ✅ Create an LLM Chain for SQL generation
llm_chain = LLMChain(llm=llm, prompt=prompt)
# ✅ Define API request model
class QueryRequest(BaseModel):
question: str
# ✅ API route to trigger CrewAI workflow
@app.get("/start-crew")
def start_crew():
"""Starts the CrewAI process."""
crew.kickoff()
return {"message": "CrewAI process started."}
# ✅ Define API route for querying Subway outlets
@app.post("/query")
def query_database(request: QueryRequest):
session = SessionLocal()
try:
# Generate SQL query from Llama 3
generated_sql_query = llm_chain.run(request.question)
print(f"Generated SQL Query: {generated_sql_query}") # ✅ Debugging
# Validate the SQL query before execution
validated_query = checker_tool.invoke(generated_sql_query) # ✅ Fixed `.invoke()`
print(f"Validated SQL Query: {validated_query}") # ✅ Debugging
# Execute the validated query safely
result = session.execute(text(validated_query))
# Fetch results safely
rows = result.fetchall()
column_names = result.keys() # ✅ Use `.keys()` instead of `cursor.description`
# Convert results to JSON format
response_data = [dict(zip(column_names, row)) for row in rows]
return {"query": validated_query, "results": response_data}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
finally:
session.close()
Error Message
TypeError: Can’t instantiate abstract class BaseTool without an implementation for abstract method ‘_run’