OpenAI streaming?

It appears that litellm supports streaming, so is there any way to enable that functionality through CrewAI? It seems like an obvious option to include, so I assume I'm just missing something. Any advice?

Is this simply not possible with the current CrewAI? I can’t possibly be the only person looking for this, am I?

We are also looking for this and this is a blocker. It’s disappointing to see silence from CrewAI on this. We’re very close to moving to Langchain due to this.

You mean streaming responses from LLMs?

It’s not really well documented. The way it works is that when the LLM has the stream parameter set to true, the response chunks are emitted on an event bus. You register a handler on that bus, push each chunk into a queue, consume the queue in a separate thread, and send the tokens back to the user via SSE or websockets.

This is the LLM that can be assigned to an agent:

from crewai import LLM

llm = LLM(
    model="gpt-4o",
    stream=True
)
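
Attaching it to an agent then looks something like this (the role, goal, and backstory values are just placeholders):

from crewai import Agent

agent = Agent(
    role="Assistant",                 # placeholder values
    goal="Answer user questions",
    backstory="A helpful assistant",
    llm=llm,
)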

This is the way to register the event handler:

from crewai.utilities.events import LLMStreamChunkEvent, crewai_event_bus

crewai_event_bus.register_handler(LLMStreamChunkEvent, stream_token_handler)

This is the actual handler that will save the tokens to a queue:

from queue import Queue
from typing import Any

token_queue = Queue()

def stream_token_handler(sender: Any, event: LLMStreamChunkEvent):
    token = event.chunk
    token_queue.put(token)
    print(f"LLM Token received via event bus: {token}")

And then, in your endpoint handler, you write a loop that consumes the queue with a timeout until the response is fully generated. It would be nice if there were a fully working example of streaming to the user, as this scenario is probably pretty common.
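
For reference, here is a rough sketch of what such an endpoint could look like. It is only an illustration under a few assumptions: FastAPI with a StreamingResponse over Server-Sent Events, the token_queue and handler registration from above, and a crew object that uses the streaming LLM. The /stream path and the 2-second timeout are arbitrary.

import threading
from queue import Empty

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")  # endpoint path is arbitrary
def stream_answer():
    # Run the crew in a background thread so this handler can stream tokens
    # while the crew is still working (crew is assumed to use the streaming LLM)
    worker = threading.Thread(target=crew.kickoff, daemon=True)
    worker.start()

    def event_stream():
        while True:
            try:
                # Timeout is arbitrary; tune it to your model's latency
                token = token_queue.get(timeout=2)
            except Empty:
                if worker.is_alive():
                    continue  # crew still running, keep waiting
                break         # crew finished and queue drained, stop streaming
            # SSE format: a "data:" line followed by a blank line
            yield f"data: {token}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")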

Welcome, Ondřej! And thanks for your valuable contribution!

You’re absolutely right to point out that the documentation could use a more complete example, especially regarding the importance of adopting thread-safe practices, like the Queue you used in your example. So, I took the liberty of refining your excellent example a bit, making the event handler just quickly enqueue tokens, and spinning up a consumer thread to process them.

To sum up, when we set LLM.stream=True, an event of type LLMStreamChunkEvent is triggered for each token received from the LLM. The event listeners have access to each received token and can handle them however your application needs.

One important note: I said “event listenerS” because there can be other listeners attached to the same crewai_event_bus. That’s why, in the example below, I highlighted the tokens received in our Queue: the listener in crewai/utilities/events/event_listener.py is also listening to this same event and printing out the tokens. So it’s totally normal to see two prints of the same token on screen.

import threading
import queue
import os
from typing import Any

from crewai import LLM
from crewai.utilities.events import LLMStreamChunkEvent, crewai_event_bus
from crewai.utilities.events.base_event_listener import BaseEventListener

# Thread-safe Queue for communication
token_queue = queue.Queue()

# Event Handler (Producer): just puts tokens into the queue
class MyLLMStreamListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(LLMStreamChunkEvent)
        def on_llm_stream_chunk(source: Any, event: LLMStreamChunkEvent):
            # Do NOT print or block here. Just enqueue quickly!
            token = event.chunk
            token_queue.put(token)

# Register the Event Handler
llm_stream_listener = MyLLMStreamListener()

# Consumer Thread to process tokens as they arrive
def token_consumer():
    while True:
        try:
            # Adjust timeout as needed for your application
            token = token_queue.get(timeout=5)
        except queue.Empty:
            print("\n----- No new tokens. Exiting consumer.")
            break
        # Here, process your token: print, save, send to API, etc.
        print("\n----- Consumer received LLM token -----")
        print(token)
        print("---------------------------------------")
        token_queue.task_done()

# Start the consumer thread
consumer_thread = threading.Thread(target=token_consumer, daemon=True)
consumer_thread.start()

# Prepare your LLM with streaming
os.environ["GEMINI_API_KEY"] = "YOUR_KEY"

gemini_llm = LLM(
    model="gemini/gemini-2.0-flash",
    temperature=0.7,
    stream=True
)

# Call the LLM to produce streaming output
response = gemini_llm.call("Say something interesting about Python Queues.")

# Optional: Wait for the consumer to process all items before shutting down
token_queue.join()
print("\n----- All tokens processed.")

# Optional: Ensure clean exit (if main exits immediately)
consumer_thread.join(timeout=1)

Love it!

Please submit a PR with that to enhance our documentation with real usage examples.

How can I implement this using an async method? It feels like there’s a delay when the websocket is activated.

import asyncio
import logging

from crewai.utilities.events import LLMStreamChunkEvent
from crewai.utilities.events.base_event_listener import BaseEventListener

logger = logging.getLogger(__name__)

class InteractionListener(BaseEventListener):
    def __init__(self, websocket):
        super().__init__()
        self.websocket = websocket
        # Capture the running loop so the handler, which may fire from another
        # thread, can schedule coroutines back onto it
        self.loop = asyncio.get_running_loop()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(LLMStreamChunkEvent)
        def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
            logger.warning(f"LLMStreamChunkEvent: {event.chunk}")
            # Schedule the coroutine on the running loop:
            asyncio.run_coroutine_threadsafe(
                self.websocket.send_text(event.chunk),
                self.loop
            )

In the terminal the logs appear, but the websocket messages don’t arrive; they only get sent later.
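
My guess is that the event loop stays blocked while the crew runs synchronously inside the handler, so the coroutines scheduled with run_coroutine_threadsafe only execute once kickoff() returns. A rough sketch of what I’m trying instead, offloading the blocking kickoff to a worker thread (the /ws path and the crew object are placeholders):

import asyncio

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Constructing the listener registers its handler on the event bus
    listener = InteractionListener(websocket)
    # Run the blocking kickoff in a worker thread so the event loop stays free
    # to deliver the messages scheduled via run_coroutine_threadsafe
    await asyncio.to_thread(crew.kickoff)
    await websocket.close()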