Batch Processing Best Practice

Right now I’m designing a pipeline/workflow using CrewAI flows with sub-flows and crews.

My workflow takes a list of elements as its input (a list of JSON objects, for example), runs the pipeline/workflow on each element, and then returns the complete set of results.

My question is: what is the best practice for where to split the inputs and gather the results? In my immediate example, do I design the flow to act on one input and have my runner (FastAPI in this case) kick off a new flow instance per input, then gather the results of all the flow instances into one result? Or do I put the looping mechanism inside the workflow (CrewAI flows in this case), which would mean one outer flow kickoff with the whole list as input?

I can see there being an “it depends” answer based on complicated state or expensive-to-create objects, but this feels like a pretty common use case, so there’s probably some common wisdom I should draw from.

As far as I know, there’s no Flow.kickoff_for_each() or Flow.kickoff_for_each_async() method, so you’ll have to implement your own batch processing (either sequential or parallel). Personally, I’d do it with a thread pool.
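
As a minimal sketch, here’s one way to roll your own kickoff_for_each with a ThreadPoolExecutor (MyFlow stands in for your own Flow subclass, and max_workers caps how many flows run at once):

from concurrent.futures import ThreadPoolExecutor

def kickoff_for_each(flow_factory, inputs_list, max_workers=4):
    """Run one flow instance per input, with at most max_workers running at once."""
    def run_one(inputs):
        flow = flow_factory()              # fresh flow instance per input
        return flow.kickoff(inputs=inputs)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves order, so results[i] corresponds to inputs_list[i]
        return list(pool.map(run_one, inputs_list))

# usage (MyFlow is your own Flow subclass):
# results = kickoff_for_each(MyFlow, [{"event": e} for e in events], max_workers=4)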

Take a look at how CrewAI itself defines its Crew.kickoff_async() (in crewai/crew.py):

async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
    """Asynchronous kickoff method to start the crew execution."""
    return await asyncio.to_thread(self.kickoff, inputs)

And asyncio.to_thread just hands your call to the event loop’s default ThreadPoolExecutor, over which you have no control. So I think it’s much better to manage the batch processing yourself, which lets you cap how many threads are active at the same time and avoid hammering your LLM with requests. Also, call me old-fashioned, but I find the mental model of thread pools much more intelligible than all the asyncio machinery, and our home processors have more than enough power to run plenty of threads.
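
For reference, this is roughly all asyncio.to_thread does under the hood (simplified from CPython’s asyncio/threads.py; the real version also copies context variables):

import asyncio
import functools

async def to_thread(func, *args, **kwargs):
    """Simplified asyncio.to_thread: submit the call to the loop's default executor."""
    loop = asyncio.get_running_loop()
    call = functools.partial(func, *args, **kwargs)
    return await loop.run_in_executor(None, call)  # None = default ThreadPoolExecutor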

As for processing one input per Flow: that’s an optimization question and will fundamentally depend on the size (and complexity) of each input for your chosen LLM. LLMs are known to suffer from what has been dubbed Context Rot, meaning output quality degrades rapidly as the context (prompt) grows. You’ll have to experiment to find the trade-off between how many inputs you pack into a call and the quality of the processing.

If you do batch several inputs per call, my advice is to increase the number of inputs gradually. Pass a list of inputs and give each one a unique ID (so the LLM can use that identifier as a key), ask the LLM to perform the processing, and have it return a list like [{input_id: "A1B2C3", result: "I hate humans..."}, {...}, {...}]. This way you can keep track of each input-output pair while processing multiple inputs in the same call. Combined with the thread-pool approach above, this could give you some very interesting results.
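
A rough sketch of that bookkeeping, assuming your flow (or crew) hands back the model’s JSON list as a string (my_flow and the "items" input key are placeholders):

import json
import uuid

def build_batch(texts):
    """Attach a unique ID to each input so results can be matched back later."""
    return [{"input_id": uuid.uuid4().hex[:8], "text": t} for t in texts]

def merge_results(batch, llm_output):
    """Parse the model's JSON list and re-key it by input_id."""
    by_id = {item["input_id"]: item["result"] for item in json.loads(llm_output)}
    return [{"text": item["text"], "result": by_id.get(item["input_id"])} for item in batch]

# usage sketch:
# batch = build_batch(chunk_of_inputs)
# llm_output = my_flow.kickoff(inputs={"items": batch})  # assumes the flow returns the JSON string
# merged = merge_results(batch, llm_output)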

I hope these thoughts help. Good luck.

Thanks - I appreciate your response. The input will be a list of audit logs, one log event per line. They are completely isolated, and each will have its own retrieved context. I absolutely do not want the LLM to ever get more than one input at a time; I’ve never had good results doing that. I’m using CrewAI with flows because I want a lot more control over the workflow and its state, having been burned before.

You’re right, there’s no for-each kickoff for flows like there is for crews. I want to avoid creating/destroying objects that are expensive in resources or time, like a chromadb client, so having one instance serve more than one input makes sense to me; the inputs will have distinct queries but share a handful of collections.
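
Concretely, I’m picturing something like the sketch below, where the chromadb client is built once and handed to each flow run (AuditLogFlow and its chroma_client argument are just placeholders for my own flow):

from concurrent.futures import ThreadPoolExecutor

import chromadb

client = chromadb.PersistentClient(path="./chroma")  # expensive object, created once

def run_one(log_line):
    flow = AuditLogFlow(chroma_client=client)  # hypothetical flow that accepts a shared client
    return flow.kickoff(inputs={"event": log_line})

audit_log_lines = [...]  # one log event per line
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run_one, audit_log_lines))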
