Crewai enterprise inputs

Hi, I’m having an issue with getting inputs to show correctly in crewai enterprise. The crew deploys correctly but does not seem to show my input variables correctly. I have tried a number of combinations. It seems to be showing my various crews as input fields rather than my actual input variables (which re stored in a config_competitor_analysis.py file)

see below:

# Configuration for company analysis research
import os
from datetime import datetime

RESEARCH_INPUTS = {
    # Target company details
    "company_name": "Futureworld",
    "ticker_symbol": "",
    "company_website": "https://futureworld.org",
    "industry": "Consulting",
    "specialisation": "CompanyAnalysis",
    "company_name_short": "FW",
    
    # Analysis parameters
    "date": datetime.now().strftime('%d %B %Y'),
    "year": datetime.now().strftime('%Y'),
    "time_period": "2022-2025",
    "focus_areas": [], # not used currently
    "competitor_names": [], # not used currently
    
    # Analysis depth
    "detail_level": "comprehensive",  # can be 'brief', 'standard', or 'comprehensive'
    "audience": "Expert Investors",
}

My main.py file then references this file and passes the input into the various crews within the flow:

I have tried using input = RESEARCH_INPUTS & inputs = RESEARCH_INPUTS

see extract below:

import os
import json
import sys
import warnings
import traceback
from datetime import datetime
from typing import Dict, Any
from dotenv import load_dotenv
load_dotenv()
# import agentops
# agentops.init()
                
from pydantic import BaseModel
from crewai.flow import Flow, start, router,listen, and_, or_
from competitor_analysis.crews.internal_analysis_crew.internal_analysis_crew import InternalAnalysisCrew
from competitor_analysis.crews.external_analysis_crew.external_analysis_crew import ExternalAnalysisCrew
from competitor_analysis.crews.integrated_analysis_crew.integrated_analysis_crew import IntegratedAnalysisCrew
from competitor_analysis.crews.formatting_crew.formatting_crew import FormattingCrew

from competitor_analysis.config_competitor_analysis import RESEARCH_INPUTS

# Models for source internal analysis crew
from competitor_analysis.competitor_analysis_models import (
    CompanyInternalAnalysis,
    CompanyExternalAnalysis,
    CompanyIntegratedAnalysis
)

class CompetitorAnalysisState(BaseModel):
    research_context: dict = None
    internal_analysis: CompanyInternalAnalysis = None
    external_analysis: CompanyExternalAnalysis = None
    integrated_analysis: CompanyIntegratedAnalysis = None
    markdown_report: str = None
    markdown_internal_analysis: str = None
    markdown_external_analysis: str = None
    markdown_integrated_analysis: str = None

class CompetitorAnalysisFlow(Flow[CompetitorAnalysisState]):

    input = RESEARCH_INPUTS


    # @start()
    # def competitor_analysis_flow(self):
    #     """Initial method to check for report"""
    #     print("starting new flow...")
    #     return "start_flow"

    # @router(competitor_analysis_flow)
    # def entry_router(self):
    #     # Assuming a flow has already run, add some additional user urls against which to extract market forces
    #     url_path = os.path.join("Resume_files", "user_urls.json")
    #     if os.path.exists(url_path):
    #         # Read the file content
    #         with open(url_path, "r") as f:
    #             json_content = f.read()
                
    #         # Convert the JSON string directly to a SourceApprovedResults object
    #         from scan_sources.models import SourceApprovedResults
    #         sources_result = SourceApprovedResults.model_validate_json(json_content)
    #         print(f"Loaded user_urls.json as SourceApprovedResults")
            
    #         self.state.source_approved_results = sources_result
    #         return "user_urls_found"

    #     # If no user urls found, check for a saved report to move directly to formatting
    #     else:
    #         report_path = os.path.join("Resume_files", "saved_report.json")
    #         aggregated_market_forces_path = os.path.join("Resume_files", "aggregated_market_forces.json")
    #         implications_report_path = os.path.join("Resume_files", "implications_report.json")
    #         if os.path.exists(report_path):
    #             with open(report_path, "r") as f:
    #                 report_final_content_dict_saved = json.load(f)
    #             with open(implications_report_path, "r") as f:
    #                 implications_report_dict_saved = json.load(f)
    #             print(report_final_content_dict_saved)          
    #             self.state.saved_report = report_final_content_dict_saved
    #             self.state.saved_implications_report = implications_report_dict_saved
    #             self.state.saved_research_context = self.research_inputs
    #             return "report_found"
    #         elif os.path.exists(aggregated_market_forces_path):
    #             with open(aggregated_market_forces_path, "r") as f:
    #                 aggregated_market_forces_content_dict_saved = json.load(f)
    #                 print(aggregated_market_forces_content_dict_saved)          
    #             self.state.aggregated_market_forces = aggregated_market_forces_content_dict_saved
    #             self.state.saved_research_context = self.research_inputs
    #             return "aggregated_market_forces_found"
    #         else:
    #             return "report_not_found"

    # Start the flow from scratch searching for sources
    @start()
    def competitor_analysis_flow(self):
        """Initial method to check for report"""
        print("starting new flow...")
        return "start_flow"

    @listen(competitor_analysis_flow)
    def internal_analysis(self):
        self.state.research_context = self.input
        internal_analysis_inputs = self.input.copy()
        internal_analysis_result = InternalAnalysisCrew().crew().kickoff(inputs=internal_analysis_inputs).pydantic
        self.state.internal_analysis = internal_analysis_result

        return internal_analysis_result

    @listen(competitor_analysis_flow)
    def external_analysis(self):
        self.state.research_context = self.input
        external_analysis_inputs = self.input.copy()
        external_analysis_result = ExternalAnalysisCrew().crew().kickoff(inputs=external_analysis_inputs).pydantic
        self.state.external_analysis = external_analysis_result
        return external_analysis_result
        # pass

    @listen(and_(internal_analysis))
    def integrated_analysis(self):
        self.state.research_context = self.input
        
        # Creates a list of the integrated analysis results
        integrated_analysis_content=[]
        integrated_analysis_content_json=[]
        integrated_analysis_content_dict=[]
        
        # preping the inputs to the crew
        integrated_analysis_inputs = self.input.copy()
        integrated_analysis_inputs["internal_analysis"] = self.state.internal_analysis.model_dump()
        integrated_analysis_inputs["external_analysis"] = self.state.external_analysis.model_dump()
        integrated_analysis_result = IntegratedAnalysisCrew().crew().kickoff(inputs=integrated_analysis_inputs).pydantic
        
        # Appends the results to the list - captures the outputs of the crew
        integrated_analysis_content.append(integrated_analysis_result)
        integrated_analysis_content_json.append(integrated_analysis_result.model_dump_json())
        integrated_analysis_content_dict.append(integrated_analysis_result.model_dump())
        
        # Sets the state to the integrated analysis result
        self.state.integrated_analysis = integrated_analysis_result
        return integrated_analysis_result
        # pass

    # Format the Internal Analysis into markdown from within the normal flow
    @listen(internal_analysis)
    def format_internal_analysis(self):
        self.state.research_context = self.input
        formatting_inputs = self.input.copy()
        formatting_inputs['report_final_content'] = self.state.internal_analysis.model_dump()
        formatting_internal_analysis_result = FormattingCrew().crew().kickoff(inputs=formatting_inputs).raw
        self.state.markdown_internal_analysis = formatting_internal_analysis_result
        print("Final Markdown Internal Analysis:\n")
        print(formatting_internal_analysis_result)
        return formatting_internal_analysis_result

    # Format the External Analysis into markdown from within the normal flow
    @listen(external_analysis)
    def format_external_analysis(self):
        self.state.research_context = self.input
        formatting_inputs = self.input.copy()
        formatting_inputs['report_final_content'] = self.state.external_analysis.model_dump()
        formatting_external_analysis_result = FormattingCrew().crew().kickoff(inputs=formatting_inputs).raw
        self.state.markdown_external_analysis = formatting_external_analysis_result
        print("Final Markdown External Analysis:\n")
        print(formatting_external_analysis_result)
        return formatting_external_analysis_result

    # Format the report into markdown from within the normal flow
    @listen(integrated_analysis)
    def format_report(self):
        self.state.research_context = self.input
        formatting_inputs = self.input.copy()
        formatting_inputs['report_final_content'] = self.state.integrated_analysis.model_dump()
        formatting_result = FormattingCrew().crew().kickoff(inputs=formatting_inputs).raw
        self.state.markdown_report = formatting_result
        print("Final Markdown Report:\n")
        print(formatting_result)
        return formatting_result
        # pass

# # Develop the report from the forces
#     @listen(integrated_analysis_result)
#     def develop_report(self):
#         self.state.research_context = self.research_inputs
#         report_final_content = []
#         report_final_content_json = []
#         report_final_content_dict = []
#         reporting_inputs = self.research_inputs.copy()
#         reporting_inputs['raw_market_forces'] = forces_final_content_dict
#         reporting_result = ReportingCrew().crew().kickoff(reporting_inputs).pydantic
#         report_final_content.append(reporting_result)
#         report_final_content_json.append(reporting_result.model_dump_json())
#         report_final_content_dict.append(reporting_result.model_dump())
#         self.state.report = report_final_content
#         return report_final_content_dict

    # Identify market forces from the sources
    # @listen(or_(identify_sources, identify_sources_from_user_urls))
    # def identify_market_forces(self, sources_result):
    #     import json
    #     from datetime import datetime   

    #     self.state.research_context = self.research_inputs
    #     forces_final_content = []
    #     forces_final_content_dict = []

    #     # Loop through the URLs to extract the sources
    #     for source in sources_result.approved_sources:
    #         # forces_inputs = self.state.research_context.copy()
    #         forces_inputs = self.research_inputs.copy()
    #         forces_inputs['url'] = source.url
    #         forces_inputs['source_type'] = source.source_type
    #         forces_inputs['source_date'] = source.source_date

    #         forces_result = MarketForceExtractionCrew().crew().kickoff(forces_inputs).pydantic
    #         forces_final_content.append(forces_result)
    #         forces_final_content_dict.append(forces_result.model_dump())

    #     self.state.extraction_results = forces_final_content
    #     print(forces_final_content) # this also provides a full view of all of the analysis 
        
    #     # Create an aggregated file and Save the already aggregated results to a JSON file
    #     specialisation = self.research_inputs.get("specialisation", "general")
    #     topic_short = self.research_inputs.get("topic_short", "market_forces")
    #     timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
    #     output_dir = "outputs"
    #     os.makedirs(output_dir, exist_ok=True)
    #     output_filename = os.path.join(
    #         output_dir, 
    #         f'aggregated_market_forces_{specialisation}_{topic_short}_{timestamp}.json'
    #     )
    
    #     with open(output_filename, 'w', encoding='utf-8') as json_file:
    #         json.dump(forces_final_content_dict, json_file, indent=4, ensure_ascii=False)
    
    #     print(f"Aggregated market forces saved to: {output_filename}")
        
    #     return forces_final_content_dict

# # Develop the report from the forces
#     @listen(identify_market_forces)
#     def develop_report(self, forces_final_content_dict):
#         self.state.research_context = self.research_inputs
#         report_final_content = []
#         report_final_content_json = []
#         report_final_content_dict = []
#         reporting_inputs = self.research_inputs.copy()
#         reporting_inputs['raw_market_forces'] = forces_final_content_dict
#         reporting_result = ReportingCrew().crew().kickoff(reporting_inputs).pydantic
#         report_final_content.append(reporting_result)
#         report_final_content_json.append(reporting_result.model_dump_json())
#         report_final_content_dict.append(reporting_result.model_dump())
#         self.state.report = report_final_content
#         return report_final_content_dict

#     @listen("aggregated_market_forces_found")
#     def develop_saved_report(self):
#         self.state.research_context = self.research_inputs
#         report_final_content = []
#         report_final_content_json = []
#         report_final_content_dict = []
#         reporting_inputs = self.research_inputs.copy()
#         reporting_inputs['raw_market_forces'] = self.state.aggregated_market_forces
#         reporting_result = ReportingCrew().crew().kickoff(reporting_inputs).pydantic
#         report_final_content.append(reporting_result)
#         report_final_content_json.append(reporting_result.model_dump_json())
#         report_final_content_dict.append(reporting_result.model_dump())
#         self.state.report = report_final_content
#         return report_final_content_dict

# # Develop Implications Report
#     @listen(identify_market_forces)
#     def develop_implications_report(self, forces_final_content_dict):
#         implications_inputs = self.research_inputs.copy()
#         implications_inputs['raw_market_forces'] = forces_final_content_dict
#         implications_result = ImplicationsCrew().crew().kickoff(implications_inputs).pydantic
#         self.state.implications_report = implications_result
#         return implications_result

#     @listen("aggregated_market_forces_found")
#     def develop_saved_implications_report(self):
#         implications_inputs = self.research_inputs.copy()
#         implications_inputs['raw_market_forces'] = self.state.aggregated_market_forces
#         implications_result = ImplicationsCrew().crew().kickoff(implications_inputs).pydantic
#         self.state.implications_report = implications_result
#         return implications_result

#     @listen(and_(develop_report, identify_market_forces, identify_sources))
#     def print_outputs(self):
#         print("=== Sources Result ===\n", self.state.source_approved_results, "\n")
#         print("=== Forces Final Content ===\n", self.state.extraction_results, "\n")
#         print("=== Reporting Result ===\n", self.state.report, "\n")

#     # Format the report into markdown from within the normal flow
#     @listen(or_(develop_report, develop_saved_report))
#     def format_report(self, report_final_content_dict):
#         self.state.research_context = self.research_inputs
#         formatting_inputs = self.research_inputs.copy()
#         formatting_inputs['report_final_content'] = report_final_content_dict
#         formatting_inputs['implications_report_content'] = self.state.implications_report
#         formatting_result = FormattingCrew().crew().kickoff(inputs=formatting_inputs).raw
#         self.state.markdown_report = formatting_result
#         print("Final Markdown Report:\n")
#         print(formatting_result)
#         return formatting_result

#     # Format the report into markdown from a saved report
#     @listen("report_found")
#     def format_saved_report(self):
#         self.state.saved_research_context = self.research_inputs
#         formatting_inputs_from_saved = self.research_inputs.copy()
#         formatting_inputs_from_saved['report_final_content'] = [self.state.saved_report]
#         formatting_inputs_from_saved['implications_report_content'] = [self.state.saved_implications_report]
#         formatting_result_from_saved = FormattingCrew().crew().kickoff(inputs=formatting_inputs_from_saved).raw
#         self.state.markdown_report_from_saved = formatting_result_from_saved
#         print("Final Markdown Report:\n")
#         print(formatting_result_from_saved)
#         return formatting_result_from_saved

# def kickoff_comp():
#     competitor_analysis_flow = CompetitorAnalysisFlow()
#     competitor_analysis_flow.kickoff_comp()

# def plot_comp():
#     competitor_analysis_flow = CompetitorAnalysisFlow()
#     competitor_analysis_flow.plot_comp()

# if __name__ == "__main__":
#     kickoff_comp()

def load_frontend_config():
    """Load configuration from frontend temporary file if it exists."""
    frontend_config_path = os.path.join(os.getcwd(), "temp_competitor_config.json")
    if os.path.exists(frontend_config_path):
        try:
            with open(frontend_config_path, "r") as f:
                frontend_config = json.load(f)
                
            # Update the global RESEARCH_INPUTS with frontend values
            global RESEARCH_INPUTS
            for key, value in frontend_config.items():
                if key in RESEARCH_INPUTS:
                    RESEARCH_INPUTS[key] = value
                    
            print(f"Loaded configuration from frontend: {frontend_config_path}")
            return True
        except Exception as e:
            print(f"Error loading frontend configuration: {e}")
    return False

def kickoff():
    # Try to load frontend configuration
    load_frontend_config()
    
    competitor_analysis_flow = CompetitorAnalysisFlow()
    competitor_analysis_flow.kickoff()
    return competitor_analysis_flow

def plot():
    competitor_analysis_flow = CompetitorAnalysisFlow()
    competitor_analysis_flow.plot()

if __name__ == "__main__":
    kickoff()