How to save Flow state and restart from checkpoint?

I’m currently developing a flow in CrewAI composed of 4 sequential crews, each performing significant work that takes considerable time and tokens. To speed up my development, I’m trying to persist the state at certain checkpoints, allowing me to restart execution from these saved states instead of rerunning the entire flow every time I make a change.

I’m aware from the documentation (Flow State Lifecycle) that each state gets a unique ID, and there’s a @persist decorator available for saving states. However, I’m unclear about:

  1. How can I access or retrieve these specific flow state IDs after the flow has executed? (it seems there’s a CLI command ‘crewai log-tasks-outputs’ that shows IDs but only works for crews, not for flows).
  2. What’s the correct way to use the @persist decorator to store and later restore the flow state from the database? This documentation section ‘Mastering Flow State Management - CrewAI’ details how to persist, but not how to load back.

Any guidance, examples, or best practices on managing persisted states would be greatly appreciated! I’m guessing restoring a state is something we will need to implement independently for now (or am I missing something?)

If you try to run the example code that’s right here with version 0.105.0 of CrewAI, it won’t even work. I feel like the CrewAI dev team is so dedicated to implementing improvements that things can change very quickly, and the documentation doesn’t keep up with the pace of the changes. In this case, it’s best to dive into the source. Be brave! :flexed_biceps:

Looking through crewai/flow (version 0.105.0), I understood that we need to pin the identity of the flow we want to keep track of. In particular, your state must define the id field itself (if it doesn’t, the library assigns a new id on every run), and you must also pass that same id in the inputs dictionary of your flow. With both in place, your state is automatically saved and restored each time the flow runs.

Here’s a fully functional code snippet. I hope it helps:

from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel

from crewai.utilities.paths import db_storage_path
from pathlib import Path

print(
    '! DB default location is: '
    f'{str(Path(db_storage_path()) / "flow_states.db")}'
)

class CounterState(BaseModel):
    id: str = 'my-unique-id'
    value: int = 0

@persist(verbose=True)
class PersistentCounterFlow(Flow[CounterState]):
    @start()
    def increment(self):
        self.state.value += 1
        print(f"+ Incremented to {self.state.value}")
        return self.state.value

    @listen(increment)
    def double(self, value):
        self.state.value = value * 2
        print(f"x Doubled to {self.state.value}")
        return self.state.value

flow = PersistentCounterFlow()
result = flow.kickoff(
    inputs={
        'id': 'my-unique-id'
    }
)
print(f"= This run result: {result}")
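On the first question (finding the IDs after a run): the snippet above prints where flow_states.db lives, so one option is to open that SQLite file directly. This is only a minimal sketch. The table name flow_states and the column name flow_uuid are my assumptions and are not confirmed anywhere in this thread, so run list_tables first and adjust the names to whatever your database actually contains.

```python
import sqlite3


def list_tables(db_path):
    """Return the table names in the SQLite file, to verify the assumed schema."""
    con = sqlite3.connect(str(db_path))
    try:
        rows = con.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        ).fetchall()
    finally:
        con.close()
    return [row[0] for row in rows]


def list_flow_state_ids(db_path, table="flow_states", id_column="flow_uuid"):
    """Return the distinct flow IDs found in the persistence database.

    ASSUMPTION: the default table and column names are guesses.
    """
    # SQL identifiers cannot be bound as parameters, so validate them first.
    for name in (table, id_column):
        if not name.isidentifier():
            raise ValueError(f"unsafe SQL identifier: {name!r}")
    con = sqlite3.connect(str(db_path))
    try:
        rows = con.execute(
            f"SELECT DISTINCT {id_column} FROM {table} ORDER BY {id_column}"
        ).fetchall()
    finally:
        con.close()
    return [row[0] for row in rows]
```

You would call it with the same path printed above, i.e. Path(db_storage_path()) / "flow_states.db".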

As @Max_Moura has clearly shown in his example code above, you can control the flow state id by passing one in your inputs.

This gives you control over how you start a flow with the same state again. I usually like to save the whole state, so I use the class-level @persist decorator, which saves everything.

To resume your flow at a different stage depending on what has already run, you could add checks to each flow method: before executing, verify that the method’s result hasn’t already been stored in the state. If it has, skip the work and move on to the next method. If done correctly, this should work.
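The check described above could look roughly like this. It is a plain-Python sketch rather than a real Flow: PipelineState and run_step are hypothetical names, and in an actual flow the same guard would sit at the top of each @start/@listen method, reading from self.state.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineState:
    # Hypothetical stand-in for a persisted flow state: one result slot per step.
    research: Optional[str] = None
    draft: Optional[str] = None


def run_step(state, field, work):
    """Run `work` only if `field` has no saved result; return True if it ran."""
    if getattr(state, field) is not None:
        return False  # result was restored from a checkpoint, skip the expensive call
    setattr(state, field, work())
    return True


# Simulate a restart where the first step's result was already restored.
state = PipelineState(research="cached research")
ran_research = run_step(state, "research", lambda: "fresh research")
ran_draft = run_step(state, "draft", lambda: f"draft based on {state.research}")
```

Here the first step is skipped because its slot is already filled, and only the second step does any work.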


According to the documentation, it is possible to apply persistence at the method level, but this seems to stop the flow before the method decorated with @persist is actually executed.

This is the example code, edited to make persistence work:

from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist

class SelectivePersistFlow(Flow):
    @start()
    def first_step(self):
        self.state["count"] = 1
        return self.state["count"]

    @persist  # Only persist after this method
    @listen(first_step)
    def important_step(self, prev_result):
        self.state["count"] += 1
        self.state["important_data"] = "This will be persisted"
        return "Important step completed"

    @listen(important_step)
    def final_step(self, prev_result):
        self.state["count"] += 1
        return self.state["count"]
    
# First run
flow1 = SelectivePersistFlow()
result1 = flow1.kickoff()
flow1.plot()
print(f"First run result: {result1}")

# Second run - state is automatically loaded
flow2 = SelectivePersistFlow()
result2 = flow2.kickoff(
    inputs={
        "id": flow1.state["id"],  # Use the same ID to load the persisted state
    }
)
print(f"Second run result: {result2}")  # Will be higher due to persisted state

As can be seen from the log below, during the first execution the first_step function is executed, but then the flow stops without executing the other two functions.

Flow Execution
Starting Flow Execution
Name: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5

:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
└── :brain: Starting Flow…

Flow started with ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
├── :brain: Starting Flow…
└── :counterclockwise_arrows_button: Running: first_step

:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
├── Flow Method Step
└── :white_check_mark: Completed: first_step

:white_check_mark: Flow Finished: SelectivePersistFlow
├── Flow Method Step
└── :white_check_mark: Completed: first_step

Flow Completion
Flow Execution Completed
Name: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5

First run result: First step

Flow Execution
Starting Flow Execution
Name: SelectivePersistFlow
ID: 47899c18-f6f7-4ef5-a4dd-e0a65a834a7e

:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
└── :brain: Starting Flow…

Flow started with ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
├── :brain: Starting Flow…
└── :counterclockwise_arrows_button: Running: first_step

:ocean: Flow: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5
├── Flow Method Step
└── :white_check_mark: Completed: first_step

:white_check_mark: Flow Finished: SelectivePersistFlow
├── Flow Method Step
└── :white_check_mark: Completed: first_step

Flow Completion
Flow Execution Completed
Name: SelectivePersistFlow
ID: e940f8fa-5687-48f4-abf1-a32c38b051f5

Second run result: First step

Edit:
Using @persist(verbose=False) or @persist(verbose=True) instead of the bare @persist seems to solve the problem.
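One plausible explanation, which I have not verified against the CrewAI source: persist accepts arguments such as verbose, so it behaves like a decorator factory that has to be called to produce the real decorator. With a factory, leaving out the parentheses silently replaces the method with the inner decorator. The sketch below shows that general Python behaviour with a hypothetical factory named tag; it is not CrewAI code.

```python
def tag(label=None):
    """Hypothetical decorator factory: call it to obtain the real decorator."""
    def decorator(func):
        func.label = label  # attach metadata, then hand the function back
        return func
    return decorator


@tag()  # the factory is called, so `decorator` receives the function
def with_parens():
    return "ok"


@tag  # the factory receives the function as `label` and returns `decorator`
def without_parens():
    return "ok"


# with_parens is still the original function, now carrying the metadata.
still_callable = with_parens()
# without_parens is no longer the original function, it is the inner decorator.
replaced_name = without_parens.__name__
```

If the same thing happens with the method-level @persist, the listener metadata on important_step would be lost, which would match the flow ending right after first_step.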
