I’ve been playing with crews locally and looking for ways to accomplish this task myself (which is why I have been following this thread).
I am not an expert by any means, but I have been able to build several projects with the help of AI. I use Cursor AI as my IDE, and I’ve seen lots of startups adopt it as an accelerator for writing code. Claude seems to be the de facto standard when it comes to writing code with an LLM, so I decided to ask it this question. I thought the response might be useful to share here.
And the response was:
"I should note that while I aim to help, my knowledge cutoff means I may not have complete information about the latest crewAI developments or deployment best practices. However, I can suggest an approach based on general principles for deploying AI systems as APIs.
```python
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from crewai import Agent, Task, Crew
from typing import Optional
import logging
import asyncio
from functools import lru_cache
from contextlib import asynccontextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Startup and shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize any resources (e.g., connection pools)
    logger.info("Starting up CrewAI API Service")
    yield
    # Shutdown: clean up resources
    logger.info("Shutting down CrewAI API Service")

# Initialize FastAPI app, wired to the lifespan handler above
app = FastAPI(title="CrewAI API Service", lifespan=lifespan)

class CrewRequest(BaseModel):
    objective: str
    context: Optional[str] = None
    max_iterations: Optional[int] = 3
    async_execution: Optional[bool] = True

class CrewResponse(BaseModel):
    task_id: str
    status: str
    result: Optional[str] = None

# In-memory store for task results
task_results = {}

@lru_cache()
def get_agents():
    """Cache and return agent instances to reduce initialization overhead"""
    researcher = Agent(
        role='Researcher',
        goal='Find and analyze relevant information',
        backstory='Expert at gathering and analyzing information',
        allow_delegation=False
    )
    writer = Agent(
        role='Writer',
        goal='Create clear and concise content',
        backstory='Expert at creating engaging content',
        allow_delegation=False
    )
    return [researcher, writer]

async def process_crew_task(task_id: str, crew_request: CrewRequest):
    """Execute crew task asynchronously"""
    try:
        # Initialize agents
        agents = get_agents()

        # Create tasks
        research_task = Task(
            description=f"Research: {crew_request.objective}",
            agent=agents[0]
        )
        writing_task = Task(
            description=f"Write content about: {crew_request.objective}",
            agent=agents[1]
        )

        # Initialize crew
        crew = Crew(
            agents=agents,
            tasks=[research_task, writing_task],
            max_iterations=crew_request.max_iterations
        )

        # Execute crew tasks; kickoff() is blocking, so run it in a
        # worker thread to keep the event loop responsive
        result = await asyncio.to_thread(crew.kickoff)

        # Store result
        task_results[task_id] = {
            "status": "completed",
            "result": str(result)
        }
    except Exception as e:
        logger.error(f"Error processing task {task_id}: {str(e)}")
        task_results[task_id] = {
            "status": "failed",
            "result": str(e)
        }

@app.post("/crew/tasks", response_model=CrewResponse)
async def create_crew_task(
    crew_request: CrewRequest,
    background_tasks: BackgroundTasks
):
    """Create and initiate a new crew task"""
    task_id = str(len(task_results) + 1)  # Simple ID generation

    if crew_request.async_execution:
        # Initialize task status
        task_results[task_id] = {
            "status": "processing",
            "result": None
        }
        # Schedule task execution
        background_tasks.add_task(
            process_crew_task,
            task_id,
            crew_request
        )
        return CrewResponse(
            task_id=task_id,
            status="processing"
        )
    else:
        # Synchronous execution
        await process_crew_task(task_id, crew_request)
        result = task_results[task_id]
        return CrewResponse(
            task_id=task_id,
            status=result["status"],
            result=result["result"]
        )

@app.get("/crew/tasks/{task_id}", response_model=CrewResponse)
async def get_task_status(task_id: str):
    """Get status and result of a specific task"""
    if task_id not in task_results:
        raise HTTPException(status_code=404, detail="Task not found")
    result = task_results[task_id]
    return CrewResponse(
        task_id=task_id,
        status=result["status"],
        result=result["result"]
    )
```
Here’s a breakdown of the key considerations and optimizations in this implementation:

- Cost Optimization:
  - Uses agent caching with `@lru_cache` to avoid reinitializing agents
  - Supports both synchronous and asynchronous execution modes
  - Implements background task processing to handle long-running operations
  - Allows configuration of max_iterations to control API usage
- Latency Management:
  - Asynchronous execution with FastAPI
  - In-memory result storage (can be replaced with Redis/database for production)
  - Background task processing for long-running operations
  - Agent instance caching
- Production-Ready Features:
  - Proper error handling and logging
  - Input validation with Pydantic models
  - Async context manager for resource management
  - API documentation with FastAPI
  - Status tracking for long-running tasks
To deploy this:
- Host on a scalable platform (e.g., Kubernetes, AWS ECS)
- Add proper authentication
- Replace in-memory storage with a proper database
- Set up monitoring and logging
- Configure environment variables for sensitive data
"
It looks like there are a couple of tricks here to handle the latency issues described in some of the replies.
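The biggest one seems to be the background-task pattern with the in-memory `task_results` dict, which the response itself says should become Redis or a database in production. A rough sketch of what that swap might look like, assuming redis-py’s asyncio client (untested, and the key naming is my own):

```python
import json
from typing import Optional

import redis.asyncio as redis

# Connection URL is an assumption; point this at your actual Redis instance
redis_client = redis.from_url("redis://localhost:6379/0", decode_responses=True)

async def store_task_result(task_id: str, status: str, result: Optional[str] = None):
    """Persist task state under a namespaced key with a one-hour expiry."""
    await redis_client.set(
        f"crew:task:{task_id}",
        json.dumps({"status": status, "result": result}),
        ex=3600,
    )

async def load_task_result(task_id: str) -> Optional[dict]:
    """Return the stored task state, or None if the task is unknown."""
    raw = await redis_client.get(f"crew:task:{task_id}")
    return json.loads(raw) if raw else None
```

That would also let results survive restarts and be shared across multiple API workers, which the in-memory dict can’t do.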
I linked this code here: Claude Artifact
I have not tried to implement this yet, but I am planning to experiment with it and share my learnings in a GitHub repo once I do.
If anyone gets this working before I do, let me know how it went.
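For anyone who wants to poke at it in the meantime, calling the two endpoints from Python should look roughly like this (assuming the service is running on localhost:8000):

```python
import time

import requests

BASE = "http://localhost:8000"  # assumed local dev address

# Kick off a task in the background
task = requests.post(
    f"{BASE}/crew/tasks",
    json={"objective": "CrewAI deployment options"},
).json()
print("created:", task)  # e.g. {"task_id": "1", "status": "processing", ...}

# Poll until the background task completes or fails
while True:
    status = requests.get(f"{BASE}/crew/tasks/{task['task_id']}").json()
    if status["status"] != "processing":
        break
    time.sleep(2)

print(status["status"], status["result"])
```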