OpenAI streaming?

Welcome, Ondřej! And thanks for your valuable contribution!

You’re absolutely right to point out that the documentation could use a more complete example, especially regarding thread-safe practices like the Queue you used. So I took the liberty of refining your excellent example a bit: the event handler now just enqueues tokens quickly, and a separate consumer thread processes them.

To sum up, when we set LLM.stream=True, an event of type LLMStreamChunkEvent is triggered for each token received from the LLM. Event listeners have access to each received token and can handle it however your application needs.
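
For reference, here is the most bare-bones handler first, before the thread-safe refinement below. It just registers a function on the bus (the same crewai_event_bus.on(...) call used inside setup_listeners further down) and stores each event.chunk. The handler name and the collected_chunks list are purely illustrative, and since the handler runs on the LLM's thread you would still want the Queue-based version for anything non-trivial:

from crewai.utilities.events import LLMStreamChunkEvent, crewai_event_bus

collected_chunks = []  # illustrative: lets you reassemble the full response later

@crewai_event_bus.on(LLMStreamChunkEvent)
def collect_chunk(source, event):
    # Called once for every streamed token; keep this fast and non-blocking
    collected_chunks.append(event.chunk)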

One important note: I said “event listenerS” because there can be other listeners attached to the same crewai_event_bus. That’s why, in the example below, I highlighted the tokens received in our Queue: the listener in crewai/utilities/events/event_listener.py is also listening to this same event and printing out the tokens, so it’s totally normal to see the same token printed twice on screen.

import threading
import queue
import os
from typing import Any

from crewai import LLM
from crewai.utilities.events import LLMStreamChunkEvent, crewai_event_bus
from crewai.utilities.events.base_event_listener import BaseEventListener

# Thread-safe Queue for communication
token_queue = queue.Queue()

# Event Handler (Producer): just puts tokens into the queue
class MyLLMStreamListener(BaseEventListener):
    def __init__(self):
        super().__init__()

    def setup_listeners(self, crewai_event_bus):
        @crewai_event_bus.on(LLMStreamChunkEvent)
        def on_llm_stream_chunk(source: Any, event: LLMStreamChunkEvent):
            # Do NOT print or block here. Just enqueue quickly!
            token = event.chunk
            token_queue.put(token)

# Register the Event Handler (instantiating a BaseEventListener subclass hooks its handlers into crewai_event_bus)
llm_stream_listener = MyLLMStreamListener()

# Consumer Thread to process tokens as they arrive
def token_consumer():
    while True:
        try:
            # Adjust timeout as needed for your application
            token = token_queue.get(timeout=5)
        except queue.Empty:
            print("\n----- No new tokens. Exiting consumer.")
            break
        # Here, process your token: print, save, send to API, etc.
        print("\n----- Consumer received LLM token -----")
        print(token)
        print("---------------------------------------")
        token_queue.task_done()

# Start the consumer thread
consumer_thread = threading.Thread(target=token_consumer, daemon=True)
consumer_thread.start()

# Prepare your LLM with streaming
os.environ["GEMINI_API_KEY"] = "YOUR_KEY"

gemini_llm = LLM(
    model="gemini/gemini-2.0-flash",
    temperature=0.7,
    stream=True
)

# Call the LLM to produce streaming output
response = gemini_llm.call("Say something interesting about Python Queues.")

# Optional: Wait for the consumer to process all items before shutting down
token_queue.join()
print("\n----- All tokens processed.")

# Optional: Ensure clean exit (if main exits immediately)
consumer_thread.join(timeout=1)
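
By the way, the same listener should keep working unchanged if you stream through a full crew instead of calling the LLM directly, since LLMStreamChunkEvent is emitted on the same crewai_event_bus during kickoff. A minimal sketch, assuming the standard Agent/Task/Crew setup (the role, goal, backstory and task texts below are just placeholders):

from crewai import Agent, Crew, Task

writer = Agent(
    role="Python enthusiast",
    goal="Explain Python concepts in a memorable way",
    backstory="You love sharing small, surprising facts about the language.",
    llm=gemini_llm,  # reuse the streaming LLM defined above
)

explain_task = Task(
    description="Say something interesting about Python Queues.",
    expected_output="A short paragraph about Python Queues.",
    agent=writer,
)

crew = Crew(agents=[writer], tasks=[explain_task])
result = crew.kickoff()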