CrewAI Knowledge Reset Not Working

Hello All,

I am trying to reset knowledge with the following command from the terminal:

crewai reset-memories --knowledge

It is erroring out with “An unexpected error occurred: No crew found.”

I have tried with upgrading to latest CrewAI version but with no help.

Note: My crew files are not named crew.py, if that could be a reason…

When you run the crewai reset-memories --knowledge command, it executes the reset_memories_command() function, which is defined here. This, in turn, calls the get_crews() function, defined here, to fetch the crews to be reset. Just by looking at the function’s signature:


def get_crews(crew_path: str = "crew.py", require: bool = False) -> list[Crew]:

you can see that, yes, your crew files must be named crew.py for the command to work correctly.

This is really interesting… From now on I am going to keep crew.py as my main crew.

Not necessarily. You can use any filename you want, and then perform the reset manually:

my_crew.reset_memories(command_type="all")

command_type must be one of: long, short, entity, knowledge, agent_knowledge, kickoff_outputs, or all.

Thanks for your response. However, when I try to run it from the python process after my crew is defined and before the crew is kicked-off, I get the following error:

RuntimeError: Failed to reset knowledge memory: [Crew (crew)] Failed to reset Crew Knowledge and Agent Knowledge memory: [WinError 32] The process cannot access the file because it is being used by another process: ‘<path_to_knowledge>\knowledge\chroma.sqlite3’

Any suggestion on how to resolve this?

You’re right, that completely slipped my mind when I suggested that approach.

As soon as you instantiate crewai.Crew, its internal memory mechanisms are automatically initialized. This means, for example, that the chroma.sqlite3 file is locked for the lifetime of the Crew object. Therefore, you can’t safely reset the Crew’s memory within the same process.

And since the CrewAI CLI doesn’t let you specify a file other than crew.py, here’s a simplified version of the original CLI that allows you to reset all memories for Crews in a specific file, like main.py or any other. Save it as reset_crew_memories.py and then use it like this:

python reset_crew_memories.py main.py crew1.py my_file.py ...
import os
import sys
import importlib.util
from inspect import isfunction, ismethod
from pathlib import Path
from typing import Any, List, Optional, get_type_hints

from crewai.crew import Crew
from crewai.flow import Flow

def get_crew_instance(module_attr: Any) -> Optional[Crew]:
    if isinstance(module_attr, Crew):
        return module_attr

    if (
        callable(module_attr)
        and hasattr(module_attr, "is_crew_class")
        and module_attr.is_crew_class
    ):
        return module_attr().crew() # type: ignore

    try:
        if (
            ismethod(module_attr) or isfunction(module_attr)
        ) and get_type_hints(module_attr).get("return") is Crew:
            return module_attr()
    except Exception:
        pass

    return None

def fetch_crews_from_module_attr(module_attr: Any) -> List[Crew]:
    crew_instances: List[Crew] = []

    if crew_instance := get_crew_instance(module_attr):
        crew_instances.append(crew_instance)

    if isinstance(module_attr, type) and issubclass(module_attr, Flow):
        try:
            instance = module_attr()
            for attr_name in dir(instance):
                if not attr_name.startswith("_"):
                    attr = getattr(instance, attr_name)
                    if crew_instance := get_crew_instance(attr):
                        crew_instances.append(crew_instance)
        except Exception:
            pass

    return crew_instances

def find_crews_in_file(filepath: str) -> List[Crew]:
    if not os.path.exists(filepath):
        print(f"Error: File not found at '{filepath}'", file=sys.stderr)
        return []

    absolute_path = os.path.abspath(filepath)
    module_name = Path(absolute_path).stem

    file_dir = os.path.dirname(absolute_path)
    if file_dir not in sys.path:
        sys.path.insert(0, file_dir)

    try:
        spec = importlib.util.spec_from_file_location(
            module_name, absolute_path
        )
        if not spec or not spec.loader:
            print(
                f"Error: Could not create module spec for '{filepath}'",
                file=sys.stderr,
            )
            if file_dir in sys.path:
                sys.path.remove(file_dir)
            return []

        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)

        found_crews = []
        for attr_name in dir(module):
            if not attr_name.startswith("_"):
                module_attr = getattr(module, attr_name)
                found_crews.extend(fetch_crews_from_module_attr(module_attr))
        return found_crews
    except Exception as e:
        print(f"Error processing file '{filepath}': {e}", file=sys.stderr)
        return []
    finally:
        if file_dir in sys.path:
            sys.path.remove(file_dir)
        if module_name in sys.modules:
            del sys.modules[module_name]

def main():
    if len(sys.argv) < 2:
        print("Usage: python reset_all_memories.py <file1.py> <file2.py> ...")
        sys.exit(1)

    filenames = sys.argv[1:]
    total_crews_reset = 0

    for filename in filenames:
        print(f"\nProcessing '{filename}'...")

        crews = find_crews_in_file(filename)
        if not crews:
            print(f"No crews found in '{filename}'.")
            continue

        for crew in crews:
            try:
                crew_id = crew.name if crew.name else crew.id
                print(f"  - Resetting all memories for Crew '{crew_id}'...")
                # command_type = long, short, entity, knowledge, agent_knowledge, kickoff_outputs or all
                crew.reset_memories(
                    command_type="all"
                ) 
                total_crews_reset += 1
                print(f"  - Memories for crew '{crew_id}' have been reset.")
            except Exception as e:
                crew_id = crew.name if crew.name else crew.id
                print(
                    f"  - Error resetting memories for crew '{crew_id}': {e}",
                    file=sys.stderr,
                )

    print(
        f"\nOperation complete. Reset memories for {total_crews_reset} Crew(s)."
    )

if __name__ == "__main__":
    main()

(Full disclosure, I haven’t tested this exhaustively, so good luck! :grinning_face_with_smiling_eyes:)