I found a way to use hosted backends for long-term memory (LTM), short-term memory (STM), and entity memory (EM). The official documentation seems a bit misleading here, because there is in fact an option to override the LTM, STM, and EM storages with a custom database implementation. Below is an example crew with custom storages: MySQL for long-term memory, and a persistent Chroma server (accessed through its HTTP client) for short-term and entity memory.
Storage class for long-term memory (MySQL):
import json
import mysql.connector
from mysql.connector import Error
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from crewai.utilities import Printer
from crewai.utilities.paths import db_storage_path
class LTMMySQLStorage:
    """A custom MySQL storage class for crewAI long-term memory (LTM) data.

    Rows live in a ``long_term_memories`` table that is created on
    construction if it does not already exist. Every public method opens its
    own short-lived connection, runs one statement, and releases the cursor
    and connection afterwards. MySQL errors (``mysql.connector.Error``) are
    reported through ``Printer`` in red instead of being raised.
    """

    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        database: str,  # the database must already exist on the server
        port: int = 3306,
    ) -> None:
        """Store the connection settings and ensure the LTM table exists.

        Args:
            host: MySQL server hostname.
            user: MySQL user name.
            password: Password for ``user``.
            database: Name of an existing database that will hold the table.
            port: MySQL server port (defaults to the standard 3306).
        """
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.port = port
        self._printer: Printer = Printer()
        self._initialize_db()

    def _connect(self) -> Any:
        """Open a new connection using the stored settings."""
        return mysql.connector.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )

    def _execute(
        self,
        query: str,
        params: tuple = (),
        *,
        error_message: str,
        fetch: bool = False,
    ) -> Optional[List[Any]]:
        """Run one statement on a fresh connection and always clean up.

        Args:
            query: SQL text using ``%s`` placeholders.
            params: Values bound to the placeholders (never interpolated).
            error_message: Prefix printed, followed by ``": <error>"``, when
                MySQL raises ``Error``.
            fetch: When true, return ``cursor.fetchall()``; otherwise commit.

        Returns:
            The fetched rows when ``fetch`` is true and the statement
            succeeded; otherwise ``None``.
        """
        # Pre-bind so the ``finally`` clause is safe even when connect() or
        # cursor() raises before these names would otherwise be assigned.
        conn = None
        cursor = None
        try:
            conn = self._connect()
            if not conn.is_connected():
                return None
            cursor = conn.cursor()
            cursor.execute(query, params)
            if fetch:
                return cursor.fetchall()
            conn.commit()
        except Error as e:
            self._printer.print(content=f"{error_message}: {e}", color="red")
        finally:
            if conn is not None and conn.is_connected():
                if cursor is not None:
                    cursor.close()
                conn.close()
        return None

    def _initialize_db(self):
        """Create the ``long_term_memories`` table if it does not exist."""
        self._execute(
            """
            CREATE TABLE IF NOT EXISTS long_term_memories (
                id INT AUTO_INCREMENT PRIMARY KEY,
                task_description TEXT,
                metadata TEXT,
                datetime TEXT,
                score FLOAT
            )
            """,
            error_message="MEMORY ERROR: An error occurred during database initialization",
        )

    def save(
        self,
        task_description: str,
        metadata: Dict[str, Any],
        datetime: str,
        score: Union[int, float],
    ) -> None:
        """Insert one LTM row; ``metadata`` is stored as a JSON string."""
        self._execute(
            """
            INSERT INTO long_term_memories (task_description, metadata, datetime, score)
            VALUES (%s, %s, %s, %s)
            """,
            (task_description, json.dumps(metadata), datetime, score),
            error_message="MEMORY ERROR: An error occurred while saving to LTM",
        )

    def load(
        self, task_description: str, latest_n: int
    ) -> Optional[List[Dict[str, Any]]]:
        """Return up to ``latest_n`` rows whose task description matches exactly.

        Rows are ordered by ``datetime`` descending, then ``score`` ascending.
        Returns ``None`` when nothing matches or when a database error occurs.
        """
        # NOTE(review): ``datetime`` is a TEXT column, so ordering is
        # lexicographic; this is only chronological if callers store values
        # in a sortable fixed-width format. Confirm what callers pass.
        rows = self._execute(
            """
            SELECT metadata, datetime, score
            FROM long_term_memories
            WHERE task_description = %s
            ORDER BY datetime DESC, score ASC
            LIMIT %s
            """,
            (task_description, latest_n),
            error_message="MEMORY ERROR: An error occurred while querying LTM",
            fetch=True,
        )
        if not rows:
            return None
        return [
            {"metadata": json.loads(meta), "datetime": stamp, "score": score}
            for meta, stamp, score in rows
        ]

    def reset(self) -> None:
        """Delete every row from the LTM table (the table itself is kept)."""
        self._execute(
            "DELETE FROM long_term_memories",
            error_message="MEMORY ERROR: An error occurred while deleting all rows in LTM",
        )
Storage class for short-term and entity memory (Chroma over HTTP):
import contextlib
import io
import logging
import os
import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
@contextlib.contextmanager
def suppress_logging(
    logger_name="chromadb.segment.impl.vector.local_persistent_hnsw",
    level=logging.ERROR,
):
    """Temporarily silence a noisy logger together with stdout/stderr.

    While the ``with`` body runs:

    - ``logger_name`` is set to ``level``;
    - anything written to ``sys.stdout``/``sys.stderr`` is discarded;
    - emitted ``UserWarning`` warnings are ignored;
    - a ``UserWarning`` raised as an exception is swallowed.

    The logger's own configured level is restored afterwards, even if the
    body raises.

    Args:
        logger_name: Name of the logger to quiet.
        level: Level applied to that logger for the duration of the block.
    """
    import warnings

    logger = logging.getLogger(logger_name)
    # Save the logger's *own* level (possibly NOTSET), not the effective level
    # inherited from ancestors, so restoring does not pin an explicit level.
    original_level = logger.level
    logger.setLevel(level)
    try:
        with (
            contextlib.redirect_stdout(io.StringIO()),
            contextlib.redirect_stderr(io.StringIO()),
            warnings.catch_warnings(),
            contextlib.suppress(UserWarning),
        ):
            # suppress(UserWarning) only catches a *raised* UserWarning; the
            # filter below is what actually silences emitted warnings.
            warnings.simplefilter("ignore", UserWarning)
            yield
    finally:
        logger.setLevel(original_level)
class ChromaRAGStorage:
    """Embedding-backed memory storage on a remote Chroma server.

    Intended as a ``storage=`` implementation for crewAI ``ShortTermMemory``
    and ``EntityMemory``. Connects through ``chromadb.HttpClient`` and keeps
    one collection per memory ``type`` (for example ``"short_term"`` and
    ``"entity"``), so several memories can share a single server. The client
    and collection are re-created lazily after :meth:`reset`.
    """

    app: ClientAPI | None = None

    def __init__(
        self, host, port, type="short_term", allow_reset=True, embedder_config=None,  # crew=None
    ):
        """Connect to Chroma and open (or create) this memory's collection.

        Args:
            host: Hostname of the Chroma server.
            port: Port of the Chroma server.
            type: Memory type; also used as the collection name.
            allow_reset: Forwarded to the client ``Settings(allow_reset=...)``.
            embedder_config: crewAI embedder configuration, resolved through
                ``EmbeddingConfigurator.configure_embedder``.
        """
        self.type = type
        self.host = host
        self.port = port
        self.embedder_config = embedder_config
        self.allow_reset = allow_reset
        self.collection = None
        self._embedder_ready = False
        self._initialize_app()

    def _set_embedder_config(self):
        """Replace ``self.embedder_config`` with the configured embedding function."""
        configurator = EmbeddingConfigurator()
        self.embedder_config = configurator.configure_embedder(self.embedder_config)

    def _initialize_app(self):
        """Create the HTTP client and get-or-create this memory's collection.

        Raises:
            Exception: If the HTTP client cannot be constructed.
        """
        import chromadb
        from chromadb.config import Settings

        # Resolve the embedder only once. After the first call,
        # ``self.embedder_config`` holds the built embedding function instead
        # of the user's config, and feeding that back into
        # ``configure_embedder`` on a reconnect is presumably unsupported
        # (TODO confirm against the installed crewAI version).
        if not getattr(self, "_embedder_ready", False):
            self._set_embedder_config()
            self._embedder_ready = True
        logging.debug("Initializing Chroma HttpClient for %s:%s", self.host, self.port)
        try:
            chroma_client = chromadb.HttpClient(
                host=self.host,
                port=self.port,
                settings=Settings(allow_reset=self.allow_reset),
            )
        except Exception as e:
            raise Exception(
                f"An error occurred while trying to connect to Chromadb Instance at {self.host} : {self.port}: {e}"
            ) from e
        self.app = chroma_client
        self.collection = self.app.get_or_create_collection(
            name=self.type, embedding_function=self.embedder_config
        )

    def _ensure_collection(self):
        """(Re)initialize lazily when there is no live client or collection."""
        # ``app`` has a class-level default of None, so hasattr() is always
        # true; test the values instead (reset() sets both to None).
        if self.app is None or getattr(self, "collection", None) is None:
            self._initialize_app()

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        """Store ``value`` with ``metadata``; storage errors are logged, not raised."""
        self._ensure_collection()
        try:
            self._generate_embedding(value, metadata)
        except Exception as e:
            logging.error(f"Error during {self.type} save: {str(e)}")

    def search(
        self,
        query: str,
        limit: int = 3,
        filter: Optional[dict] = None,
        score_threshold: float = 0.35,
    ) -> List[Any]:
        """Query the collection and return matches as dicts.

        Args:
            query: Text to search for.
            limit: Maximum number of results requested from Chroma.
            filter: Optional Chroma metadata ``where`` filter.
            score_threshold: Results whose ``score`` is below this are dropped.

        Returns:
            A list of ``{"id", "metadata", "context", "score"}`` dicts, or
            ``[]`` if the query fails (the error is logged).
        """
        self._ensure_collection()
        try:
            with suppress_logging():
                response = self.collection.query(
                    query_texts=query, n_results=limit, where=filter or None
                )
            # NOTE(review): ``score`` is Chroma's *distance* (smaller means
            # more similar), so ``>=`` keeps the farther hits. Preserved as-is;
            # confirm this is the intended threshold direction.
            return [
                {"id": doc_id, "metadata": meta, "context": doc, "score": dist}
                for doc_id, meta, doc, dist in zip(
                    response["ids"][0],
                    response["metadatas"][0],
                    response["documents"][0],
                    response["distances"][0],
                )
                if dist >= score_threshold
            ]
        except Exception as e:
            logging.error(f"Error during {self.type} search: {str(e)}")
            return []

    def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None:  # type: ignore
        """Add ``text`` as a document with a fresh UUID; Chroma embeds it."""
        self._ensure_collection()
        self.collection.add(
            documents=[text],
            metadatas=[metadata or {}],
            ids=[str(uuid.uuid4())],
        )

    def reset(self) -> None:
        """Delete only this memory's collection on the Chroma server.

        Unlike ``client.reset()``, which wipes the entire server, this leaves
        other collections untouched (e.g. the entity memory when resetting
        short-term memory). The client is dropped and re-created lazily on
        the next save/search.

        Raises:
            Exception: If deleting the collection fails.
        """
        try:
            if self.app:
                self.app.delete_collection(name=self.type)
            self.app = None
            self.collection = None
        except Exception as e:
            if "attempt to write a readonly database" in str(e):
                # Ignore this specific error
                pass
            else:
                raise Exception(
                    f"An error occurred while resetting the {self.type} memory: {e}"
                ) from e

    def _create_default_embedding_function(self):
        """Build an OpenAI ``text-embedding-3-small`` function keyed from ``OPENAI_API_KEY``."""
        from chromadb.utils.embedding_functions.openai_embedding_function import (
            OpenAIEmbeddingFunction,
        )

        return OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
        )
Now, the main crew setup:
from crewai import Crew, Process
from crewai.memory import EntityMemory, LongTermMemory, ShortTermMemory

# Example wiring of the custom storages into a crew.
# NOTE: father_agent, father_task, embedder_config and question come from
# your own application code; they are not defined in this example.

long_term_memory = LongTermMemory(
    storage=LTMMySQLStorage(
        host="localhost",
        user="root",
        password="yourpassword",  # placeholder: load real credentials from env/secrets
        database="long_term_memories",  # must already exist on the MySQL server
    )
)
short_term_memory = ShortTermMemory(
    storage=ChromaRAGStorage(
        type="short_term", host="localhost", port=8000, embedder_config=embedder_config,
    )
)
entity_memory = EntityMemory(
    storage=ChromaRAGStorage(
        type="entity", host="localhost", port=8000, embedder_config=embedder_config,
    )
)
crew = Crew(
    agents=[father_agent],
    tasks=[father_task],
    process=Process.sequential,
    memory=True,
    verbose=True,
    embedder=embedder_config,
    long_term_memory=long_term_memory,
    short_term_memory=short_term_memory,
    entity_memory=entity_memory,
)
crew.kickoff({"question": question})
With this configuration I was able to accomplish my task. Let me know if anything needs more clarification; the official documentation may need to be updated to cover this.