Deploying CrewAI as an API service

Hi everyone,

I want to deploy CrewAI as an API service so that I can send requests and receive responses.

What’s the best way to deploy it as an API service?

I’m using FastAPI and wanted to deploy it on Heroku, but it doesn’t fit within Heroku’s 1 GB RAM limit, and paying for more than that is just unreasonable.
So I deployed it on Google Cloud Run (where it does fit within 1 GB!). But the service is unreliable: when I use it in the morning (CET), it works well and delivers a result within ~1 min 40 s. When I use it in the evening (CET, when the US is also using the service heavily), the connection cuts off after about a minute, even though the server keeps processing the input and finishes the job. It handles multiple simultaneous requests well; the connection is the only issue.
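
For reference, Cloud Run’s request timeout is configurable (the documented default is 300 s, so a cutoff after ~1 minute may also come from the client or an intermediate proxy, which is worth checking). A sketch of raising the service timeout, assuming a service named crewai-api (the name and region are illustrative):

gcloud run services update crewai-api --region europe-west1 --timeout=900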

So, I was just wondering if anyone has any recommendations for how to efficiently deploy CrewAI as a service so that it’s scalable and independent.

I’m not using memory, and every request is independent of everything else.

This is really troubling me, and I’m happy to share my approach after I solve it.
Does anyone have any experience with that?

Hi @Dragutin_Oreski,
I just wrote a response to another post, Client session memory - #4 by Dabnis, that may help your considerations.

Regards,
Dabnis

I have the same dilemma as you. Have you tried Render or DigitalOcean? I also think Cerebras.ai could help you with the speed of the model API.

No, I haven’t tried Render or DigitalOcean.

I think I’ll drop CrewAI altogether. It brings so much overhead, and issues like this are very hard to deal with.

FYI: DigitalOcean’s cheapest GPU droplet is ~$3.64/hour!

I have an outstanding query with them asking whether this is billed when in use, or just when powered up!

As yet, no response.

I take it that you have looked at the link I put in my post above. In essence, these issues will probably be the same for most cloud GPU providers, due to CrewAI being Python based.

Given how easy it is to duplicate CrewAI functionality, I am considering porting to either Golang or Rust. At least I’ll get container sizes down to < 100 MB 🙂

Yes, thank you.
I checked your comment.

But I was wondering about deploying CrewAI as an API service natively in Python.
I didn’t want to rewrite everything in another language.

Even now, when I’m quite sure I won’t use CrewAI directly, I’m still going to use Python.

Best regards,
Dragutin

Hi @Dragutin_Oreski,
Python is the issue, not CrewAI. Any AI app created with Python is going to have the same issue.
I’m not a Python dev, but I believe there are techniques to reduce the container size; even then, it’s going to be significantly larger than an equivalent Golang or Rust application. (A sketch of one common technique follows.)
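
For illustration, one common technique is a multi-stage build on a slim base image; a rough sketch, assuming a FastAPI app in main.py and a requirements.txt (both names are placeholders):

# Build stage: install dependencies into an isolated prefix
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Runtime stage: copy only the installed packages and the app code
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
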
You could ‘hack’ it by forwarding incoming API requests back to your local IP, where you would have a Python AI app running locally on your box! Not great, but it’s all I can think of at this time.
There are now Rust and Golang versions of NumPy, and scikit-learn ports are predicted for the middle of next year.

As I’ve stated, ‘Python is holding AI hostage’ 🙂

If you find a solution, would appreciate an update.

Best of luck 🤞

Hi @joseguzman,
I have created my crew, but I am getting only the output of the last task.
Did you get a similar issue?

Same issue too,
I only get the output of the last task.

Were you able to fix this?

After reading through the documentation, I understood that the crew will, by default, return only the output of the last task.
You can have a task that compiles the results from the other tasks, or
you can get the full output by using the full_output parameter in the crew:

@crew
def crew(self) -> Crew:
    """Creates the Meal Planner Crew"""
    return Crew(
        agents=self.agents,
        tasks=self.tasks,
        verbose=True,
        full_output=True
    )
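
If you need each task’s output programmatically rather than chaining a compiler task, recent CrewAI versions also expose per-task results on the object returned by kickoff(); a sketch (attribute names per the current docs, worth verifying against your installed version):

result = crew.kickoff()
# CrewOutput carries a list of per-task results alongside the final output
for task_output in result.tasks_output:
    print(task_output.raw)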

Ohhh, I will try creating a task that compiles results from the other tasks.
I’ve also tried to use the full_output parameter, but I still got only the last output.

Please can you share your GitHub repo? Maybe I’ll find what I’m doing wrong.

We can have a meeting for me to demonstrate this.

Would like to add to this question. Regarding hosting CrewAI as a standalone app, has anyone been successful with this?

I have a Flow which I integrated with my Django application, and I noticed that CrewAI’s Flow setup comes with a chroma.sqlite3 DB.

I couldn’t find anything in the CrewAI documentation about hosting, other than through CrewAI’s Enterprise offering.

Did you set verbose=True on each of the agents as well?

Hi everyone, I am interested in your experience running CrewAI as an API (even without the Enterprise version).

At the moment, reading the thread, I think that calling an agent cannot be a synchronous process, so I also agree with the callback URL approach provided by CrewAI Enterprise.
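
For anyone who wants to replicate that pattern without Enterprise, here is a minimal sketch: accept a callback URL, kick the crew off in the background, and POST the result back when it finishes. Note that build_crew() and the callback_url field are placeholders, not CrewAI APIs:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
import requests

app = FastAPI()

class KickoffRequest(BaseModel):
    topic: str
    callback_url: str  # hypothetical field: where to POST the finished result

def run_and_callback(req: KickoffRequest):
    # build_crew() stands in for however you assemble your crew
    result = build_crew().kickoff(inputs={"topic": req.topic})
    # kickoff() blocks until the crew finishes; then notify the caller
    requests.post(req.callback_url, json={"result": str(result)}, timeout=30)

@app.post("/kickoff", status_code=202)
def kickoff(req: KickoffRequest, background_tasks: BackgroundTasks):
    # Return immediately; the crew runs after the response is sent
    background_tasks.add_task(run_and_callback, req)
    return {"status": "accepted"}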

This topic is very similar to Hosting crew ai on some other platform - #7 by MJFresh. Is there a way to merge things? It would also be awesome to learn how to host CrewAI with FastAPI on Heroku or another PaaS.

Over the weekend I deployed a simple 2-agent crew (with RAG and search internet tools) to a Render free tier using FastAPI and a Docker image.

Good to know! It seems the “timeout” issue comes up. Doesn’t seem to be an issue with Render: “Render allows responses to take up to 100 minutes for HTTP requests. Heroku has a hard response timeout of 30 seconds and it can not be configured or changed.” https://render.com/docs/render-vs-heroku-comparison

Given Crew’s long-running processes, I think putting them into a queue or background job would work for Heroku? They support Redis Queue: Background Tasks in Python with RQ | Heroku Dev Center
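
A minimal sketch of that RQ approach (build_crew() is a placeholder for your own crew factory, and run_crew must live in a module the worker can import; start the worker with `rq worker crew-jobs`):

from redis import Redis
from rq import Queue

def run_crew(topic: str) -> str:
    # Runs inside the RQ worker process, so blocking here is fine
    return str(build_crew().kickoff(inputs={"topic": topic}))

# In the web process: enqueue instead of blocking the HTTP request
queue = Queue("crew-jobs", connection=Redis())
job = queue.enqueue(run_crew, "AI agents", job_timeout=1800)
print(job.id)  # poll job.result (or a status endpoint) later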

I’ve been playing with crews locally and looked for options to accomplish this task myself (which is why I have been following this thread).

I am not an expert by any means, but I have been able to build several projects with the help of AI. I use Cursor AI as my IDE, and I’ve seen lots of startups adopting it as an accelerator for writing code. Claude seems to be the de facto standard when it comes to writing code with LLMs, so I decided to ask it this question. I thought it might be useful to share the answer here.

And the response was:

"I should note that while I aim to help, my knowledge cutoff means I may not have complete information about the latest crewAI developments or deployment best practices. However, I can suggest an approach based on general principles for deploying AI systems as APIs.

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from crewai import Agent, Task, Crew
from typing import Optional
import logging
import asyncio
from functools import lru_cache
from contextlib import asynccontextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="CrewAI API Service")

class CrewRequest(BaseModel):
    objective: str
    context: Optional[str] = None
    max_iterations: Optional[int] = 3
    async_execution: Optional[bool] = True

class CrewResponse(BaseModel):
    task_id: str
    status: str
    result: Optional[str] = None

# In-memory store for task results
task_results = {}

@lru_cache()
def get_agents():
    """Cache and return agent instances to reduce initialization overhead"""
    researcher = Agent(
        role='Researcher',
        goal='Find and analyze relevant information',
        backstory='Expert at gathering and analyzing information',
        allow_delegation=False
    )
    
    writer = Agent(
        role='Writer',
        goal='Create clear and concise content',
        backstory='Expert at creating engaging content',
        allow_delegation=False
    )
    
    return [researcher, writer]

async def process_crew_task(task_id: str, crew_request: CrewRequest):
    """Execute crew task asynchronously"""
    try:
        # Initialize agents
        agents = get_agents()
        
        # Create tasks (expected_output is required by recent CrewAI versions)
        research_task = Task(
            description=f"Research: {crew_request.objective}",
            expected_output="A concise research summary",  # assumed wording
            agent=agents[0]
        )
        
        writing_task = Task(
            description=f"Write content about: {crew_request.objective}",
            expected_output="A polished draft on the topic",  # assumed wording
            agent=agents[1]
        )
        
        # Initialize crew. Note: Crew() does not accept a max_iterations
        # argument; per-agent iteration limits are set via Agent(max_iter=...)
        crew = Crew(
            agents=agents,
            tasks=[research_task, writing_task]
        )
        
        # Execute crew tasks. kickoff() is synchronous, so run it in a
        # thread to keep the event loop free (recent CrewAI versions also
        # provide kickoff_async())
        result = await asyncio.to_thread(crew.kickoff)
        
        # Store result (stringified so it matches CrewResponse.result)
        task_results[task_id] = {
            "status": "completed",
            "result": str(result)
        }
        
    except Exception as e:
        logger.error(f"Error processing task {task_id}: {str(e)}")
        task_results[task_id] = {
            "status": "failed",
            "result": str(e)
        }

@app.post("/crew/tasks", response_model=CrewResponse)
async def create_crew_task(
    crew_request: CrewRequest,
    background_tasks: BackgroundTasks
):
    """Create and initiate a new crew task"""
    task_id = str(len(task_results) + 1)  # Simple ID generation
    
    if crew_request.async_execution:
        # Initialize task status
        task_results[task_id] = {
            "status": "processing",
            "result": None
        }
        
        # Schedule task execution
        background_tasks.add_task(
            process_crew_task,
            task_id,
            crew_request
        )
        
        return CrewResponse(
            task_id=task_id,
            status="processing"
        )
    else:
        # Synchronous execution
        await process_crew_task(task_id, crew_request)
        result = task_results[task_id]
        
        return CrewResponse(
            task_id=task_id,
            status=result["status"],
            result=result["result"]
        )

@app.get("/crew/tasks/{task_id}", response_model=CrewResponse)
async def get_task_status(task_id: str):
    """Get status and result of a specific task"""
    if task_id not in task_results:
        raise HTTPException(status_code=404, detail="Task not found")
    
    result = task_results[task_id]
    return CrewResponse(
        task_id=task_id,
        status=result["status"],
        result=result["result"]
    )

# Startup and shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: Initialize any resources (e.g., connection pools)
    logger.info("Starting up CrewAI API Service")
    yield
    # Shutdown: Clean up resources
    logger.info("Shutting down CrewAI API Service")

# Note: for these hooks to run, the manager must be registered on the app,
# i.e. app = FastAPI(title="CrewAI API Service", lifespan=lifespan)

Here’s a breakdown of the key considerations and optimizations in this implementation:

  1. Cost Optimization:
  • Uses agent caching with @lru_cache to avoid reinitializing agents
  • Supports both synchronous and asynchronous execution modes
  • Implements background task processing to handle long-running operations
  • Allows configuration of max_iterations to control API usage
  2. Latency Management:
  • Asynchronous execution with FastAPI
  • In-memory result storage (can be replaced with Redis/database for production)
  • Background task processing for long-running operations
  • Agent instance caching
  3. Production-Ready Features:
  • Proper error handling and logging
  • Input validation with Pydantic models
  • Async context manager for resource management
  • API documentation with FastAPI
  • Status tracking for long-running tasks

To deploy this:

  1. Host on a scalable platform (e.g., Kubernetes, AWS ECS)
  2. Add proper authentication
  3. Replace in-memory storage with a proper database
  4. Set up monitoring and logging
  5. Configure environment variables for sensitive data

"

It looks like there are a couple of tricks here to handle the latency issues described in some of the replies.
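
To see how those tricks fit together, here is a sketch of how a client might exercise the endpoints from the code above, assuming the service runs locally on port 8000 (URL and inputs are illustrative):

import time
import requests

BASE = "http://localhost:8000"  # assumed local deployment

resp = requests.post(f"{BASE}/crew/tasks", json={"objective": "AI agents"})
task_id = resp.json()["task_id"]

# Poll until the background task leaves the "processing" state
while True:
    status = requests.get(f"{BASE}/crew/tasks/{task_id}").json()
    if status["status"] != "processing":
        print(status["status"], status["result"])
        break
    time.sleep(5)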

I linked this code here: Claude Artifact

I have not tried to implement this yet, but I am planning to experiment with it and share my learnings in a GitHub repo once I do.

If anyone gets this working before me, let me know how it went.