Hi CrewAI team,
I have a question about my project, where everything for crews and flows is configured from the UI and saved to a database. I can create Agent, Task, and Crew instances dynamically, but for flows you support a decorator-based approach, which does not work when every detail is stored in the DB. Apart from decorators, is there any way to use class instances for CrewBase, listen, and start, the same way I can for Agent, Task, and Crew? I hope the question is clear. I don't want to use decorators because everything in the project is dynamic.
e.g.:
from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew, before_kickoff, after_kickoff

@CrewBase
class MyCrew:
    agents_config = {…}  # or define agents directly
    tasks_config = {…}  # or define tasks directly

    @before_kickoff
    def prepare_inputs(self, inputs):
        # Modify inputs before kickoff
        inputs["timestamp"] = "2025-02-22T21:00:00Z"
        return inputs

    @after_kickoff
    def process_output(self, output):
        # Modify output after the crew finishes
        output.raw += "\nProcessed after kickoff."
        return output

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            role="Example Agent",
            goal="Perform a specific task",
            backstory="An agent with specialized skills.",
            verbose=True
        )

    @task
    def my_task(self) -> Task:
        return Task(
            description="Collect and analyze data.",
            expected_output="Data analysis results.",
            agent=self.my_agent()
        )

    @crew
    def crew(self) -> Crew:
        # Automatically collects agents and tasks defined above
        return Crew(
            agents=self.agents,  # gathered from the @agent methods
            tasks=self.tasks,  # gathered from the @task methods
            process=Process.sequential
        )
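For context on the Python side: `@decorator` above a definition is shorthand for calling the decorator on the finished object, so a class decorator can in principle be applied to a class assembled at runtime with `type()`. The sketch below shows that general mechanism only. `register_crew`, `REGISTRY`, and `build_crew_class` are hypothetical stand-ins written for this example, not CrewAI APIs, and whether `CrewBase` itself accepts a class built this way is an assumption I have not verified.

```python
# Stand-in class decorator (hypothetical, NOT crewai.project.CrewBase).
REGISTRY = {}


def register_crew(cls):
    """Record the class by name and return it unchanged."""
    REGISTRY[cls.__name__] = cls
    return cls


def build_crew_class(name, methods):
    """Create a class at runtime and decorate it with a plain call."""
    cls = type(name, (), dict(methods))  # equivalent to a `class` statement
    return register_crew(cls)            # equivalent to `@register_crew`


def describe(self):
    return f"{type(self).__name__} built from stored config"


# In a real project `methods` would be assembled from database records.
DynamicCrew = build_crew_class("DynamicCrew", {"describe": describe})
print(DynamicCrew().describe())
```

The point is only that the `@` syntax is not required: the decorator call can happen after the class has been built from stored data.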
CrewBase is a decorator, but for dynamic initialization I need an instance of CrewBase. The flow example below also relies on decorators:
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel

from crewai.flow.flow import Flow, listen, start

from test_flow.crews.poem_crew.poem_crew import PoemCrew


class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""


class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = (
            PoemCrew()
            .crew()
            .kickoff(inputs={"sentence_count": self.state.sentence_count})
        )
        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)
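To make the question concrete, this is roughly what I would like to do with a flow: read the step names and their upstream steps from the database, apply `start` and `listen` as ordinary function calls, and build the class with `type()`. The sketch below is plain Python with stand-in decorators and a toy runner. `start`, `listen`, `make_step`, `build_flow_class`, and `run` here are all hypothetical and written for this example; they are NOT the functions from `crewai.flow.flow`, and I have not verified that the real `Flow` base class picks up methods registered this way.

```python
# Stand-in decorators (hypothetical, NOT crewai.flow.flow.start / listen).
def start():
    def mark(fn):
        fn.is_start = True
        return fn
    return mark


def listen(upstream_name):
    def mark(fn):
        fn.listens_to = upstream_name
        return fn
    return mark


def make_step(label):
    """Build a method body from a stored label."""
    def step(self):
        self.log.append(label)
    step.__name__ = label
    return step


def build_flow_class(name, rows):
    """rows: (step_name, upstream_name or None), e.g. loaded from a database."""
    namespace = {}
    for step_name, upstream in rows:
        fn = make_step(step_name)
        # Decorators applied as ordinary calls instead of `@` syntax.
        namespace[step_name] = start()(fn) if upstream is None else listen(upstream)(fn)
    return type(name, (), namespace)


def run(flow):
    """Toy runner for a linear chain: start step first, then each listener."""
    flow.log = []
    steps = [getattr(flow, n) for n in dir(flow) if not n.startswith("_")]
    steps = [s for s in steps if callable(s)]
    current = next(s for s in steps if getattr(s, "is_start", False))
    while current is not None:
        current()
        name = current.__name__
        current = next((s for s in steps if getattr(s, "listens_to", None) == name), None)
    return flow.log


rows = [
    ("generate_sentence_count", None),
    ("generate_poem", "generate_sentence_count"),
    ("save_poem", "generate_poem"),
]
PoemFlowDynamic = build_flow_class("PoemFlowDynamic", rows)
print(run(PoemFlowDynamic()))
```

The toy runner only handles a single linear chain, which is enough to mirror the three steps of `PoemFlow` above. What I need to know is whether the real `start` and `listen` can be used in the same runtime fashion, or whether there is a supported non-decorator API for this.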