CrewAI Long Term Memory Persistence

Hi CrewAI Community,

I am exploring options for implementing long-term memory storage in a CrewAI project and have a few questions.

As per the documentation, long-term memory storage can be set using the db_path, which seems to be a local path relative to the project directory. Here’s an example setup:

long_term_memory = EnhanceLongTermMemory(
    storage=LTMSQLiteStorage(
        db_path="/my_data_dir/my_custom_storage/long_term_memory_storage.db"
    )
)

However, I am working on a multi-pod application running on a Kubernetes cluster, and there is a requirement to store the long-term memory on external persistent storage. The goal is to enable seamless saving and retrieval of data from this external storage.

My questions are:

  1. Does CrewAI provide any built-in support or implementation to store long-term memory in external persistent storage (e.g., cloud storage, external databases)?
  2. If not, are there any recommended approaches or patterns for achieving this in a Kubernetes-based setup?
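For context, one pattern I have been considering is mounting a PersistentVolumeClaim into each pod and simply pointing `db_path` at the mount. A minimal sketch of that idea (the mount path `/mnt/crew-memory` and the env var `CREW_MEMORY_DIR` are hypothetical names of my own, not CrewAI conventions):

```python
import os

# Hypothetical: a PersistentVolumeClaim mounted at /mnt/crew-memory,
# overridable via an env var so local runs fall back gracefully.
memory_dir = os.getenv("CREW_MEMORY_DIR", "/mnt/crew-memory")
db_path = os.path.join(memory_dir, "long_term_memory_storage.db")
print(db_path)
```

My concern with this approach is that multiple pods writing to one SQLite file over a network filesystem runs into file-locking issues, which is why an external database feels safer.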

I would greatly appreciate any insights, examples, or guidance from the community.

Thanks in advance!

I am facing a similar situation: an agent on GCP Cloud Run. I want the option to connect to Redis or another external store. I will dig into the relevant Python modules and try to find something; otherwise I will try to build a solution with the help of Claude lol.

I figured out a way to use hosted backends for long-term memory (LTM), short-term memory (STM), and entity memory (EM). The official documentation seems a bit misleading here, because there is in fact an option to override LTM, STM, and EM with a custom database implementation. Below is a MySQL example for long-term memory and a persistent Chroma (HTTP client) example for short-term and entity memory, wired into a crew with custom storages.

Storage class for long-term memory:

import json
from typing import Any, Dict, List, Optional, Union

import mysql.connector
from mysql.connector import Error

from crewai.utilities import Printer

class LTMMySQLStorage:
    """A custom MySQL storage class for LTM data storage."""

    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        database: str,  # the database should already exist in the instance
    ) -> None:
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self._printer: Printer = Printer()
        self._initialize_db()

    def _initialize_db(self):
        """Initializes the MySQL database and creates the LTM table."""
        conn = None
        try:
            conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            if conn.is_connected():
                cursor = conn.cursor()
                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS long_term_memories (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        task_description TEXT,
                        metadata TEXT,
                        datetime TEXT,
                        score FLOAT
                    )
                    """
                )
                conn.commit()
        except Error as e:
            self._printer.print(
                content=f"MEMORY ERROR: An error occurred during database initialization: {e}",
                color="red",
            )
        finally:
            # conn stays None if connect() itself failed, so guard before closing
            if conn is not None and conn.is_connected():
                conn.close()

    def save(
        self,
        task_description: str,
        metadata: Dict[str, Any],
        datetime: str,
        score: Union[int, float],
    ) -> None:
        """Saves data to the LTM table with error handling."""
        conn = None
        try:
            conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            if conn.is_connected():
                cursor = conn.cursor()
                cursor.execute(
                    """
                    INSERT INTO long_term_memories (task_description, metadata, datetime, score)
                    VALUES (%s, %s, %s, %s)
                    """,
                    (task_description, json.dumps(metadata), datetime, score),
                )
                conn.commit()
        except Error as e:
            self._printer.print(
                content=f"MEMORY ERROR: An error occurred while saving to LTM: {e}",
                color="red",
            )
        finally:
            # conn stays None if connect() itself failed, so guard before closing
            if conn is not None and conn.is_connected():
                conn.close()

    def load(
        self, task_description: str, latest_n: int
    ) -> Optional[List[Dict[str, Any]]]:
        """Queries the LTM table by task description with error handling."""
        conn = None
        try:
            conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            if conn.is_connected():
                cursor = conn.cursor()
                cursor.execute(
                    """
                    SELECT metadata, datetime, score
                    FROM long_term_memories
                    WHERE task_description = %s
                    ORDER BY datetime DESC, score ASC
                    LIMIT %s
                    """,
                    (task_description, latest_n),
                )
                rows = cursor.fetchall()
                if rows:
                    return [
                        {
                            "metadata": json.loads(row[0]),
                            "datetime": row[1],
                            "score": row[2],
                        }
                        for row in rows
                    ]
        except Error as e:
            self._printer.print(
                content=f"MEMORY ERROR: An error occurred while querying LTM: {e}",
                color="red",
            )
        finally:
            # conn stays None if connect() itself failed, so guard before closing
            if conn is not None and conn.is_connected():
                conn.close()
        return None

    def reset(self) -> None:
        """Resets the LTM table with error handling."""
        conn = None
        try:
            conn = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            if conn.is_connected():
                cursor = conn.cursor()
                cursor.execute("DELETE FROM long_term_memories")
                conn.commit()
        except Error as e:
            self._printer.print(
                content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
                color="red",
            )
        finally:
            # conn stays None if connect() itself failed, so guard before closing
            if conn is not None and conn.is_connected():
                conn.close()

Storage class for short-term and entity memory:

import contextlib
import io
import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional

from chromadb.api import ClientAPI

from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path


@contextlib.contextmanager
def suppress_logging(
    logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
    level=logging.ERROR,
):
    logger = logging.getLogger(logger_name)
    original_level = logger.getEffectiveLevel()
    logger.setLevel(level)
    with (
        contextlib.redirect_stdout(io.StringIO()),
        contextlib.redirect_stderr(io.StringIO()),
        contextlib.suppress(UserWarning),
    ):
        yield
    logger.setLevel(original_level)


class ChromaRAGStorage:
    """
    Handles embeddings for memory entries via a remote Chroma (HTTP)
    server, improving search efficiency.
    """

    app: ClientAPI | None = None

    def __init__(
        self, host, port, type="short_term", allow_reset=True, embedder_config=None
    ):
        self.type = type
        self.host = host
        self.port = port
        self.embedder_config = embedder_config
        self.allow_reset = allow_reset
        self._initialize_app()

    def _set_embedder_config(self):
        configurator = EmbeddingConfigurator()
        self.embedder_config = configurator.configure_embedder(self.embedder_config)

    def _initialize_app(self):
        import chromadb
        from chromadb.config import Settings

        self._set_embedder_config()
        try:
            chroma_client = chromadb.HttpClient(
                host=self.host,
                port=self.port,
                settings=Settings(allow_reset=self.allow_reset),
            )
        except Exception as e:
            raise Exception(
                f"An error occurred while trying to connect to the Chroma "
                f"instance at {self.host}:{self.port}: {e}"
            ) from e

        self.app = chroma_client
        self.collection = self.app.get_or_create_collection(
            name=self.type, embedding_function=self.embedder_config
        )

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        if not hasattr(self, "app") or not hasattr(self, "collection"):
            self._initialize_app()
        try:
            self._generate_embedding(value, metadata)
        except Exception as e:
            logging.error(f"Error during {self.type} save: {str(e)}")

    def search(
        self,
        query: str,
        limit: int = 3,
        filter: Optional[dict] = None,
        score_threshold: float = 0.35,
    ) -> List[Any]:
        if not hasattr(self, "app"):
            self._initialize_app()

        try:
            with suppress_logging():
                response = self.collection.query(query_texts=query, n_results=limit)

            results = []
            for i in range(len(response["ids"][0])):
                result = {
                    "id": response["ids"][0][i],
                    "metadata": response["metadatas"][0][i],
                    "context": response["documents"][0][i],
                    "score": response["distances"][0][i],
                }
                if result["score"] >= score_threshold:
                    results.append(result)

            return results
        except Exception as e:
            logging.error(f"Error during {self.type} search: {str(e)}")
            return []

    def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None:  # type: ignore
        if not hasattr(self, "app") or not hasattr(self, "collection"):
            self._initialize_app()

        self.collection.add(
            documents=[text],
            metadatas=[metadata or {}],
            ids=[str(uuid.uuid4())],
        )

    def reset(self) -> None:
        try:
            if self.app:
                # With an HTTP client the data lives on the Chroma server,
                # so resetting the client is enough; there is no local
                # directory to delete.
                self.app.reset()
                self.app = None
                self.collection = None
        except Exception as e:
            if "attempt to write a readonly database" in str(e):
                # Ignore this specific error
                pass
            else:
                raise Exception(
                    f"An error occurred while resetting the {self.type} memory: {e}"
                )

    def _create_default_embedding_function(self):
        from chromadb.utils.embedding_functions.openai_embedding_function import (
            OpenAIEmbeddingFunction,
        )

        return OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
        )
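For anyone unsure what the reshaping in `search` does, here is the same loop run against a hand-made sample payload in the nested shape that `collection.query` returns (all values below are made up for illustration):

```python
# Hand-made sample of the nested structure chromadb's collection.query returns:
# one inner list per query text, here a single query with two hits.
response = {
    "ids": [["a1", "a2"]],
    "metadatas": [[{"k": 1}, {"k": 2}]],
    "documents": [["first doc", "second doc"]],
    "distances": [[0.9, 0.1]],
}

score_threshold = 0.35
results = []
for i in range(len(response["ids"][0])):
    result = {
        "id": response["ids"][0][i],
        "metadata": response["metadatas"][0][i],
        "context": response["documents"][0][i],
        "score": response["distances"][0][i],
    }
    # Mirrors the class above: keeps entries whose *distance* is >= threshold.
    if result["score"] >= score_threshold:
        results.append(result)

print(results)
```

Note that `distances` are distances, so the `>=` comparison keeps the *farther* hits; that matches the class above, but it is worth double-checking against your embedder before relying on the threshold.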

Now for the main crew class:

long_term_memory = LongTermMemory(
    storage=LTMMySQLStorage(
        host="localhost", user="root", password="yourpassword", database="long_term_memories"
    )
)
short_term_memory = ShortTermMemory(
    storage=ChromaRAGStorage(
        type="short_term", host="localhost", port=8000, embedder_config=embedder_config
    )
)
entity_memory = EntityMemory(
    storage=ChromaRAGStorage(
        type="entity", host="localhost", port=8000, embedder_config=embedder_config
    )
)

crew = Crew(
    agents=[father_agent],
    tasks=[father_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
    embedder=embedder_config,
    long_term_memory=long_term_memory,
    short_term_memory=short_term_memory,
    entity_memory=entity_memory,
)
crew.kickoff({"question": question})

With this configuration I was able to achieve my task. Let me know if you need any more clarification; the official documentation might need to be updated.

That’s really cool. Thanks a lot!

I did something on my own, in case anyone would like a different perspective. However, the solution given by @koustavpramanick is simpler and aligns more closely with CrewAI's out-of-the-box DB handler.

from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.long_term.long_term_memory import LongTermMemory
from sqlalchemy import create_engine, text
from typing import Dict, List, Optional
from datetime import datetime
from crewai import Crew
import json


# Step 1: Create Base Storage Handler
class BaseStorageHandler:
    def connect(self):
        raise NotImplementedError

    def disconnect(self):
        raise NotImplementedError

    def save(self, task_description: str, metadata: dict, datetime: str, score: float):
        raise NotImplementedError

    def load(self, task_description: str, latest_n: int) -> List[Dict]:
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError
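To sanity-check the interface before wiring in a real database, a throwaway in-memory handler can stand in for the SQL one. This is my own sketch, not part of CrewAI; it duck-types the same `connect`/`save`/`load`/`reset` methods:

```python
from typing import Dict, List


class InMemoryStorageHandler:
    """Keeps memories in a plain list; same interface as
    BaseStorageHandler, purely for local testing."""

    def __init__(self):
        self._rows: List[Dict] = []

    def connect(self) -> None:
        pass  # nothing to open

    def disconnect(self) -> None:
        pass  # nothing to close

    def save(self, task_description: str, metadata: dict, datetime: str, score: float) -> None:
        self._rows.append(
            {"task": task_description, "metadata": metadata, "datetime": datetime, "score": score}
        )

    def load(self, task_description: str, latest_n: int) -> List[Dict]:
        matches = [r for r in self._rows if task_description in r["task"]]
        # Newest first, like the SQL ORDER BY datetime DESC
        matches.sort(key=lambda r: r["datetime"], reverse=True)
        return matches[:latest_n]

    def reset(self) -> None:
        self._rows.clear()


handler = InMemoryStorageHandler()
handler.connect()
handler.save("summarize report", {"quality": 8}, "2024-01-02", 8.0)
handler.save("summarize report", {"quality": 9}, "2024-01-03", 9.0)
print(handler.load("summarize", latest_n=1))
```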


# Step 2: Implement External SQL Handler
class ExternalSQLHandler(BaseStorageHandler):
    def __init__(self, connection_string: str, pool_size: int = 5):
        self.connection_string = connection_string
        self.pool_size = pool_size
        self.pool = None

    def connect(self):
        """Initialize connection pool"""
        self.engine = create_engine(self.connection_string)
        self.pool = self.engine.pool

    def save(self, task_description: str, metadata: dict, datetime: str, score: float):
        """Save memory to database"""
        with self.engine.connect() as conn:
            # Convert Unix timestamp to proper datetime
            timestamp = datetime if isinstance(datetime, str) else str(datetime)

            conn.execute(
                text(
                    """
                    INSERT INTO long_term_memories
                    (task_description, metadata, datetime, score)
                    VALUES (:task, :meta, to_timestamp(:dt), :score)
                    """
                ),
                {
                    "task": task_description,
                    "meta": json.dumps(metadata),
                    "dt": timestamp,
                    "score": score,
                },
            )
            conn.commit()

    def load(self, task_description: str, latest_n: int) -> List[Dict]:
        """Load the latest n memories related to the task description"""
        with self.engine.connect() as conn:
            result = conn.execute(
                text(
                    """
                    SELECT task_description, metadata, datetime, score
                    FROM long_term_memories
                    WHERE task_description LIKE :task
                    ORDER BY datetime DESC
                    LIMIT :n
                    """
                ),
                {"task": f"%{task_description}%", "n": latest_n},
            )

            memories = []
            for row in result:
                memories.append(
                    {
                        "task": row.task_description,
                        "metadata": json.loads(row.metadata),
                        "datetime": row.datetime,
                        "score": row.score,
                    }
                )

            return memories
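If you want to try the same handler shape without a Postgres instance, here is a stdlib-`sqlite3` variant I sketched. It is for local testing only and is not a drop-in for the Postgres handler (`to_timestamp` is Postgres-specific, so the datetime is stored as plain text here):

```python
import json
import sqlite3
from typing import Dict, List


class SQLiteStorageHandler:
    """Same interface as BaseStorageHandler, backed by stdlib sqlite3."""

    def __init__(self, db_path: str = ":memory:"):
        self.db_path = db_path
        self.conn = None

    def connect(self) -> None:
        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS long_term_memories (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_description TEXT,
                metadata TEXT,
                datetime TEXT,
                score REAL
            )
            """
        )

    def disconnect(self) -> None:
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def save(self, task_description: str, metadata: dict, datetime: str, score: float) -> None:
        self.conn.execute(
            "INSERT INTO long_term_memories (task_description, metadata, datetime, score) "
            "VALUES (?, ?, ?, ?)",
            (task_description, json.dumps(metadata), datetime, score),
        )
        self.conn.commit()

    def load(self, task_description: str, latest_n: int) -> List[Dict]:
        rows = self.conn.execute(
            "SELECT task_description, metadata, datetime, score FROM long_term_memories "
            "WHERE task_description LIKE ? ORDER BY datetime DESC LIMIT ?",
            (f"%{task_description}%", latest_n),
        ).fetchall()
        return [
            {"task": r[0], "metadata": json.loads(r[1]), "datetime": r[2], "score": r[3]}
            for r in rows
        ]

    def reset(self) -> None:
        self.conn.execute("DELETE FROM long_term_memories")
        self.conn.commit()


store = SQLiteStorageHandler()
store.connect()
store.save("draft email", {"quality": 7}, "2024-05-01", 7.0)
print(store.load("draft", latest_n=3))
```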


# Step 3: Custom Long Term Memory
class CustomLongTermMemory(LongTermMemory):
    def __init__(self, storage: BaseStorageHandler, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.storage = storage
        self.storage.connect()

    def save(self, item: LongTermMemoryItem):
        metadata = item.metadata
        metadata.update({"agent": item.agent, "expected_output": item.expected_output})

        self.storage.save(
            task_description=item.task,
            metadata=metadata,
            datetime=item.datetime,
            score=metadata["quality"],
        )

    def search(self, task: str, latest_n: int = 3) -> List[Dict]:
        return self.storage.load(task, latest_n)

It connects to an external Postgres database in my case.
