Debugging State Persistence: When Does @persist Save Flow State?

Hi everyone,
I was studying the use of the @persist decorator when applied at the class level versus the method level, but I’m still unclear on how state persistence actually works.

Let’s start with the class-level application.
After running the following example code — written purely to observe the effects of the decorator applied at different levels — it seems that applying @persist at the class level has the following effect:

:backhand_index_pointing_right: It saves the state of the flow every time a step (method) in the flow is successfully executed.

from crewai.flow.flow import Flow, listen, start, router
from crewai.flow.persistence import persist
from pydantic import BaseModel

class State(BaseModel):
    counter: int = 0
@persist(verbose=True)
class FlowClassPersistance(Flow[State]):
    
    @start()
    def first(self):
        print(f"Initial counter: {self.state.counter}")
        print("Executing first method")
        self.state.counter += 1
        print(f"Counter: {self.state.counter}")
        return

    @listen(first)
    def increase(self):
        print("Executing increase method")
        self.state.counter *= 2
        print(f"Counter: {self.state.counter}")
        if self.state.counter <= 2:
            raise Exception("Stopping the flow due to error")
        return "next"
        
    @listen(increase)
    def increase_next(self):
        print("Executing increase_next method")
        self.state.counter *= 2
        print(f"Counter: {self.state.counter}")
        return "next"

def main():
    try:
        
        flow = FlowClassPersistance()
        flow1 = flow.kickoff()
        print(f"Esecuzione 1, counter: {flow.state.counter}")
        
        
    except Exception as e:
        flow2 = flow.kickoff(inputs={
            "id": flow.state.id,  # Use the same ID to load the persisted state
        })
        print(f"Esecuzione 2, counter: {flow.state.counter}")    
        
if __name__ == "__main__":
    main()

Here’s the summary of the output I get:

🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
└── 🧠 Starting Flow...

 Flow started with ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── 🧠 Starting Flow...
└── 🔄 Running: first

Initial counter: 0
Executing first method
Counter: 1

 ⚠️ #HERE, AFTER THE 'FIRST' METHOD HAS BEEN SUCCESSFULLY COMPLETED, THE STATE IS SAVED.

 Saving flow state to memory for ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
└── ✅ Completed: first

🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
└── 🔄 Running: increase

Executing increase method
Counter: 2

⚠️# HERE, THE 'INCREASE' METHOD FAILED, SO THE STATE IS NOT SAVED.
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
└── ❌ Failed: increase


[Flow._execute_single_listener] Error in method increase: Stopping the flow due to error

⚠️#FLOW RE-EXECTUED PASSING THE STATE ID OF THE PREVIOUS FLOW
Loading flow state from memory for UUID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
└── 🧠 Starting Flow...

 Flow started with ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── 🧠 Starting Flow...
└── 🔄 Running: first

Initial counter: 1 ⚠️# STATE LOADED AS EXPECTED
Executing first method
Counter: 2
 Saving flow state to memory for ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
└── ✅ Completed: first

🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
└── 🔄 Running: increase

Executing increase method
Counter: 4
 Saving flow state to memory for ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
└── ✅ Completed: increase

🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
├── ✅ Completed: increase
└── 🔄 Running: increase_next

Executing increase_next method
Counter: 8
 Saving flow state to memory for ID: 2054d94e-3164-409e-a5a4-821703d1297c
🌊 Flow: FlowClassPersistance
    ID: 2054d94e-3164-409e-a5a4-821703d1297c
├── Flow Method Step
├── ✅ Completed: first
├── ✅ Completed: increase
└── ✅ Completed: increase_next

Moving on to method-level persistence, it’s not clear to me when the state is actually saved.
Here is the example code:

from crewai.flow.flow import Flow, listen, start, router  
from crewai.flow.persistence import persist
from pydantic import BaseModel

class State(BaseModel):
    counter: int = 0

class FlowMethodPersistence(Flow[State]):
    
    @start()
    def first(self):
        print(f"Initial counter: {self.state.counter}")
        print("Executing first method")
        self.state.counter += 1
        print(f"Counter: {self.state.counter}")
        return

    @listen(first)
    def increase(self):
        print("Executing increase method")
        self.state.counter *= 2
        print(f"Counter: {self.state.counter}")
        if self.state.counter <= 2:
            raise Exception("Stopping the flow due to error")
        return "next"
    
    @persist(verbose=True)  # ⚠️I EXPECT THE STATE TO BE SAVED ONLY AFTER THIS METHOD IS EXECUTED SUCCESSFULLY.
    @listen(increase)
    def increase_next(self):
        print("Executing increase_next method")
        self.state.counter *= 2
        print(f"Counter: {self.state.counter}")
        return "next"

def main():
    try:
        
        flow = FlowMethodPersistence()

        flow1 = flow.kickoff()
        print(f"Esecuzione 1, valore del counter {flow.state.counter}")
        
                
    except Exception as e:
        flow2 = flow.kickoff(inputs={
            "id": flow.state.id,  # Use the same ID to load the persisted state
        })
        print(f"Esecuzione 2, valore del counter {flow.state.counter}")

    
if __name__ == "__main__":
    main()

the output i get does not match what i expect; in fact, it seems the state is saved beforehand. here is the output i get:

🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
└── 🧠 Starting Flow...

 Flow started with ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── 🧠 Starting Flow...
└── 🔄 Running: first

Initial counter: 0
Executing first method
Counter: 1
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
└── ✅ Completed: first

🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
└── 🔄 Running: increase

Executing increase method
Counter: 2

# ⚠️HERE THE METHOD FAILED, SO I EXPECT THE STATE NOT TO BE SAVED.
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
└── ❌ Failed: increase

[Flow._execute_single_listener] Error in method increase: Stopping the flow due to error

⚠️#FLOW RE-EXECTUED PASSING THE STATE ID OF THE PREVIOUS FLOW
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
└── 🧠 Starting Flow...

 Flow started with ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── 🧠 Starting Flow...
└── 🔄 Running: first

#⚠️HERE I EXPECTED THE COUNTER TO BE 0.
Initial counter: 2
Executing first method
Counter: 3
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
└── ✅ Completed: first

🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
└── 🔄 Running: increase

Executing increase method
Counter: 6
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
└── ✅ Completed: increase

🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
├── ✅ Completed: increase
└── 🔄 Running: increase_next

Executing increase_next method
Counter: 12
 Saving flow state to memory for ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
🌊 Flow: FlowMethodPersistence
    ID: bd5ce9bf-2ec6-4caf-830a-9b2463af8efd
├── Flow Method Step
├── ✅ Completed: first
├── ✅ Completed: increase
└── ✅ Completed: increase_next

✅ Flow Finished: FlowMethodPersistence
├── Flow Method Step
├── ✅ Completed: first
├── ✅ Completed: increase
└── ✅ Completed: increase_next

Hey Simone, here we go.

It looks like your main question is about using @persist for method-level persistence, right? Like, if a method fails, any persistence tied to that method (or maybe the flow as a whole) shouldn’t commit the changes made during that failed method. Did I get that right?

Alright, let’s take a look at the key part of the code for the @persist decorator/wrapper (check it out in crewai/flow/persistence/decorators.py):

def persist(persistence: Optional[FlowPersistence] = None, verbose: bool = False):
    """Decorator to persist flow state.

    This decorator can be applied at either the class level or method level.
    When applied at the class level, it automatically persists all flow method
    states. When applied at the method level, it persists only that method's
    state.
    """
    def decorator(target: Union[Type, Callable[..., T]]) -> Union[Type, Callable[..., T]]:
        """Decorator that handles both class and method decoration."""
        actual_persistence = persistence or SQLiteFlowPersistence()

        if isinstance(target, type):
            # Class decoration
            # [Bla bla bla]
        else:
            # Method decoration
            method = target
            setattr(method, "__is_flow_method__", True)

            if asyncio.iscoroutinefunction(method):
                # [Bla bla bla]
            else:
                @functools.wraps(method)
                def method_sync_wrapper(flow_instance: Any, *args: Any, **kwargs: Any) -> T:
                    # Comment [1] below
                    result = method(flow_instance, *args, **kwargs)
                    # Comment [2] below
                    PersistenceDecorator.persist_state(flow_instance, method.__name__, actual_persistence, verbose)
                    return result
  • Comment [1]: The original method runs here.
  • Comment [2]: If the original method throws an exception, this line is NEVER reached, so nothing gets persisted.

So why does it seem like the state changes are still being persisted? Simple: because you’re reusing the same FlowMethodPersistence() object (and, as a result, the same state) that’s sitting in your computer’s RAM — not the state that would be saved to disk if persistence had actually happened!

Remember, the whole point of persistence is: you save the state now, shut down your computer, go on an 80-day vacation across South America, and when you come back, your Flow object should be able to recover the state from before your trip.

Since you’re catching the exception, on your second run you’re reusing the same flow and the same state that’s still in RAM, because you’re still working with the same in-memory state, you know? And since the state in memory was modified, you end up seeing the result of that change after you catch the exception.

To simulate your trip to South America (without actually having to pack your bags) do this:

def main():
    try:
        flow1 = FlowMethodPersistence()
        flow1.kickoff()
    except Exception as e:
        flow2 = FlowMethodPersistence()  # No cheating - create a brand new object
        flow2.kickoff(
            inputs={
                "id": flow1.state.id  # Use the same ID to load the persisted state
            }
        )

if __name__ == "__main__":
    main()

And now, you’ll always get Initial counter: 0.

So, yeah, method-level persistence seems to be holding up pretty well to the test when you simulate a true “load from persisted storage” scenario.

So even with class-level persistence, since I’m always working with the same instance of the FlowClassPersistence class, it’s kind of like I cheated.

1 Like

This topic was automatically closed 24 hours after the last reply. New replies are no longer allowed.