From 3140b69fbe6e2732c8a0c33ce0e5ac878f1d95f7 Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sat, 1 Feb 2025 19:32:31 -0500 Subject: [PATCH] [7.0.4] [FEAT] [ModelRouter + docs] [docs fix] [docs][env docs] --- .env.example | 48 +- .../deepseek_r1.py => deepseek_r1.py | 2 +- docs/mkdocs.yml | 3 +- docs/swarms/install/env.md | 187 +++++ docs/swarms/structs/model_router.md | 361 ++++++++++ model_router_example.py | 11 + .../hs_examples/hierarchical_swarm_example.py | 130 ++++ .../hs_examples/hs_stock_team.py | 219 ++++++ o3_mini.py | 9 + pyproject.toml | 2 +- swarms/structs/__init__.py | 2 + swarms/structs/agent.py | 4 +- swarms/structs/auto_swarm_builder.py | 4 +- swarms/structs/hiearchical_swarm.py | 636 +++++------------- .../structs/meme_agent_persona_generator.py | 139 +--- swarms/structs/model_router.py | 376 +++++++++++ swarms/structs/multi_agent_orchestrator.py | 86 +-- swarms/structs/swarm_builder.py | 103 +-- swarms/telemetry/bootup.py | 3 + swarms/utils/function_caller_model.py | 143 ++++ swarms/utils/litellm_tokenizer.py | 5 +- swarms/utils/litellm_wrapper.py | 15 +- swarms/utils/pdf_to_text.py | 4 +- 23 files changed, 1694 insertions(+), 798 deletions(-) rename new_features_examples/deepseek_r1.py => deepseek_r1.py (89%) create mode 100644 docs/swarms/install/env.md create mode 100644 docs/swarms/structs/model_router.md create mode 100644 model_router_example.py create mode 100644 new_features_examples/hs_examples/hierarchical_swarm_example.py create mode 100644 new_features_examples/hs_examples/hs_stock_team.py create mode 100644 o3_mini.py create mode 100644 swarms/structs/model_router.py create mode 100644 swarms/utils/function_caller_model.py diff --git a/.env.example b/.env.example index c6e429ec9..e933d976b 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,48 @@ +# Framework Configuration WORKSPACE_DIR="agent_workspace" +SWARMS_VERBOSE_GLOBAL="False" SWARMS_API_KEY="" -USE_TELEMETRY=True -OPENAI_API_KEY="sk-" + +# Model Provider API Keys +## OpenAI +OPENAI_API_KEY="" + +## Anthropic +ANTHROPIC_API_KEY="" + +## Google +GEMINI_API_KEY="" + +## Hugging Face +HUGGINGFACE_TOKEN="" + +## Perplexity AI +PPLX_API_KEY="" + +## AI21 +AI21_API_KEY="" + +# Tool Provider API Keys +## Search Tools +BING_BROWSER_API="" +BRAVESEARCH_API_KEY="" +TAVILY_API_KEY="" +YOU_API_KEY="" + +## Analytics & Monitoring +AGENTOPS_API_KEY="" +EXA_API_KEY="" + +## Browser Automation +MULTION_API_KEY="" + +## Other Tools +HCP_APP_ID="" + +# Cloud Provider Configuration +## Azure OpenAI +AZURE_OPENAI_ENDPOINT="" +AZURE_OPENAI_DEPLOYMENT="" +OPENAI_API_VERSION="" +AZURE_OPENAI_API_KEY="" +AZURE_OPENAI_AD_TOKEN="" diff --git a/new_features_examples/deepseek_r1.py b/deepseek_r1.py similarity index 89% rename from new_features_examples/deepseek_r1.py rename to deepseek_r1.py index a871d1b3d..0ae8cd4db 100644 --- a/new_features_examples/deepseek_r1.py +++ b/deepseek_r1.py @@ -5,5 +5,5 @@ model_name="deepseek/deepseek-reasoner", max_loops="auto", interactive=True, - streaming_on=True, + streaming_on=False, ).run("What are 5 hft algorithms") diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index d80e8f309..abd1fcc3d 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -149,6 +149,7 @@ nav: - Quickstart: "swarms/install/quickstart.md" - Swarms CLI: "swarms/cli/main.md" - Swarms Framework Architecture: "swarms/concept/framework_architecture.md" + - Environment Variables: "swarms/install/env.md" # - Prelimary: # - 80/20 Rule For Agents: "swarms/prompting/8020.md" - Agents: @@ -189,6 +190,7 @@ nav: - 
SwarmRearrange: "swarms/structs/swarm_rearrange.md" - MultiAgentRouter: "swarms/structs/multi_agent_router.md" - MatrixSwarm: "swarms/structs/matrix_swarm.md" + - ModelRouter: "swarms/structs/model_router.md" - Various Execution Methods: "swarms/structs/various_execution_methods.md" - Workflows: - ConcurrentWorkflow: "swarms/structs/concurrentworkflow.md" @@ -196,7 +198,6 @@ nav: - SequentialWorkflow: "swarms/structs/sequential_workflow.md" - Structs: - Conversation: "swarms/structs/conversation.md" - # - Task: "swarms/structs/task.md" - Full API Reference: "swarms/framework/reference.md" - Examples: - Unique Swarms: "swarms/examples/unique_swarms.md" diff --git a/docs/swarms/install/env.md b/docs/swarms/install/env.md new file mode 100644 index 000000000..689a5d933 --- /dev/null +++ b/docs/swarms/install/env.md @@ -0,0 +1,187 @@ +# Environment Variable Management & Security + +This guide provides comprehensive documentation for managing environment variables and API keys securely in the Swarms framework. + +## Overview + +Swarms uses environment variables for configuration management and secure credential storage. This approach keeps sensitive information like API keys out of your code and allows for easy configuration changes across different environments. + +## Core Environment Variables + +### Framework Configuration + +- `SWARMS_VERBOSE_GLOBAL`: Controls global logging verbosity + ```bash + SWARMS_VERBOSE_GLOBAL="True" # Enable verbose logging + SWARMS_VERBOSE_GLOBAL="False" # Disable verbose logging + ``` + +- `WORKSPACE_DIR`: Defines the workspace directory for agent operations + ```bash + WORKSPACE_DIR="agent_workspace" + ``` + +### API Keys + +#### Model Provider Keys + +1. **OpenAI** + - `OPENAI_API_KEY`: Authentication for GPT models + ```bash + OPENAI_API_KEY="your-openai-key" + ``` + +2. **Anthropic** + - `ANTHROPIC_API_KEY`: Authentication for Claude models + ```bash + ANTHROPIC_API_KEY="your-anthropic-key" + ``` + +3. **Google** + - `GEMINI_API_KEY`: Authentication for Gemini models + +4. **Hugging Face** + - `HUGGINGFACE_TOKEN`: Access to Hugging Face models + +5. **Perplexity AI** + - `PPLX_API_KEY`: Access to Perplexity models + +6. **AI21** + - `AI21_API_KEY`: Access to AI21 models + +#### Tool Provider Keys + +1. **Search Tools** + - `BING_BROWSER_API`: Bing search capabilities + - `BRAVESEARCH_API_KEY`: Brave search integration + - `TAVILY_API_KEY`: Tavily search services + - `YOU_API_KEY`: You.com search integration + +2. **Analytics & Monitoring** + - `AGENTOPS_API_KEY`: AgentOps monitoring + - `EXA_API_KEY`: Exa.ai services + +3. **Browser Automation** + - `MULTION_API_KEY`: Multi-browser automation + + +## Security Best Practices + +### 1. Environment File Management + +- Create a `.env` file in your project root +- Never commit `.env` files to version control +- Add `.env` to your `.gitignore`: + ```bash + echo ".env" >> .gitignore + ``` + +### 2. API Key Security + +- Rotate API keys regularly +- Use different API keys for development and production +- Never hardcode API keys in your code +- Limit API key permissions to only what's necessary +- Monitor API key usage for unusual patterns + +### 3. Template Configuration + +Create a `.env.example` template without actual values: + +```bash +# Required Configuration +OPENAI_API_KEY="" +ANTHROPIC_API_KEY="" +WORKSPACE_DIR="agent_workspace" + +# Optional Configuration +SWARMS_VERBOSE_GLOBAL="False" +``` + +### 4. 
Loading Environment Variables + +```python +from dotenv import load_dotenv +import os + +# Load environment variables +load_dotenv() + +# Access variables +workspace_dir = os.getenv("WORKSPACE_DIR") +openai_key = os.getenv("OPENAI_API_KEY") +``` + +## Environment Setup Guide + +1. **Install Dependencies**: + ```bash + pip install python-dotenv + ``` + +2. **Create Environment File**: + ```bash + cp .env.example .env + ``` + +3. **Configure Variables**: + - Open `.env` in your text editor + - Add your API keys and configuration + - Save the file + +4. **Verify Setup**: + ```python + import os + from dotenv import load_dotenv + + load_dotenv() + assert os.getenv("OPENAI_API_KEY") is not None, "OpenAI API key not found" + ``` + +## Environment-Specific Configuration + +### Development + +```bash +WORKSPACE_DIR="agent_workspace" +SWARMS_VERBOSE_GLOBAL="True" +``` + +### Production + +```bash +WORKSPACE_DIR="/var/swarms/workspace" +SWARMS_VERBOSE_GLOBAL="False" +``` + +### Testing + +```bash +WORKSPACE_DIR="test_workspace" +SWARMS_VERBOSE_GLOBAL="True" +``` + +## Troubleshooting + +### Common Issues + +1. **Environment Variables Not Loading** + - Verify `.env` file exists in project root + - Confirm `load_dotenv()` is called before accessing variables + - Check file permissions + +2. **API Key Issues** + - Verify key format is correct + - Ensure key has not expired + - Check for leading/trailing whitespace + +3. **Workspace Directory Problems** + - Confirm directory exists + - Verify write permissions + - Check path is absolute when required + +## Additional Resources + +- [Swarms Documentation](https://docs.swarms.world) +- [Security Best Practices](https://swarms.world/security) +- [API Documentation](https://swarms.world/docs/api) diff --git a/docs/swarms/structs/model_router.md b/docs/swarms/structs/model_router.md new file mode 100644 index 000000000..11526d8c6 --- /dev/null +++ b/docs/swarms/structs/model_router.md @@ -0,0 +1,361 @@ +# ModelRouter Docs + +The ModelRouter is an intelligent routing system that automatically selects and executes AI models based on task requirements. It leverages a function-calling architecture to analyze tasks and recommend the optimal model and provider combination for each specific use case. + + + + + +### Key Features + +- Dynamic model selection based on task complexity and requirements +- Multi-provider support (OpenAI, Anthropic, Google, etc.) +- Concurrent and asynchronous execution capabilities +- Batch processing with memory +- Automatic error handling and retries +- Provider-aware routing +- Cost optimization + +### Constructor Arguments + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| system_prompt | str | model_router_system_prompt | Custom prompt for guiding model selection behavior | +| max_tokens | int | 4000 | Maximum token limit for model outputs | +| temperature | float | 0.5 | Control parameter for response randomness (0.0-1.0) | +| max_workers | int/str | 10 | Maximum concurrent workers ("auto" for CPU count) | +| api_key | str | None | API key for model access | +| max_loops | int | 1 | Maximum number of refinement iterations | +| *args | Any | None | Additional positional arguments | +| **kwargs | Any | None | Additional keyword arguments | + +### Core Methods + +#### run(task: str) -> str + +Executes a single task through the model router with memory and refinement capabilities. + +# Installation + +1. 
Install the latest version of swarms using pip: + +```bash +pip3 install -U swarms +``` + +2. Setup your API Keys in your .env file with the following: + +```bash +OPENAI_API_KEY=your_openai_api_key +ANTHROPIC_API_KEY=your_anthropic_api_key +GOOGLE_API_KEY=your_google_api_key +# Add more API keys as needed following litellm format +``` + + +```python +from swarms import ModelRouter + +router = ModelRouter() + +# Simple text analysis +result = router.run("Analyze the sentiment and key themes in this customer feedback") + +# Complex reasoning task +complex_result = router.run(""" +Evaluate the following business proposal: +- Initial investment: $500,000 +- Projected ROI: 25% annually +- Market size: $2B +- Competition: 3 major players +Provide detailed analysis and recommendations. +""") +``` + +#### batch_run(tasks: list) -> list +Executes multiple tasks sequentially with result aggregation. + +```python +# Multiple analysis tasks +tasks = [ + "Analyze Q1 financial performance", + "Predict Q2 market trends", + "Evaluate competitor strategies", + "Generate growth recommendations" +] + +results = router.batch_run(tasks) + +# Process results +for task, result in zip(tasks, results): + print(f"Task: {task}\nResult: {result}\n") +``` + +#### concurrent_run(tasks: list) -> list +Parallel execution of multiple tasks using thread pooling. + +```python +import asyncio +from typing import List + +# Define multiple concurrent tasks +analysis_tasks = [ + "Perform technical analysis of AAPL stock", + "Analyze market sentiment from social media", + "Generate trading signals", + "Calculate risk metrics" +] + +# Execute tasks concurrently +results = router.concurrent_run(analysis_tasks) + +# Process results with error handling +for task, result in zip(analysis_tasks, results): + try: + processed_result = process_analysis(result) + save_to_database(processed_result) + except Exception as e: + log_error(f"Error processing {task}: {str(e)}") +``` + +#### async_run(task: str) -> asyncio.Task +Asynchronous task execution with coroutine support. + +```python +async def process_data_stream(): + tasks = [] + async for data in data_stream: + task = await router.async_run(f"Process data: {data}") + tasks.append(task) + + results = await asyncio.gather(*tasks) + return results + +# Usage in async context +async def main(): + router = ModelRouter() + results = await process_data_stream() +``` + +### Advanced Usage Examples + +#### Financial Analysis System + +```python +from swarms import ModelRouter +from typing import Dict, List +import pandas as pd + +class FinancialAnalysisSystem: + def __init__(self): + self.router = ModelRouter( + temperature=0.3, # Lower temperature for more deterministic outputs + max_tokens=8000, # Higher token limit for detailed analysis + max_loops=2 # Allow for refinement iteration + ) + + def analyze_company_financials(self, financial_data: Dict) -> Dict: + analysis_task = f""" + Perform comprehensive financial analysis: + + Financial Metrics: + - Revenue: ${financial_data['revenue']}M + - EBITDA: ${financial_data['ebitda']}M + - Debt/Equity: {financial_data['debt_equity']} + - Working Capital: ${financial_data['working_capital']}M + + Required Analysis: + 1. Profitability assessment + 2. Liquidity analysis + 3. Growth projections + 4. Risk evaluation + 5. Investment recommendations + + Provide detailed insights and actionable recommendations. 
+ """ + + result = self.router.run(analysis_task) + return self._parse_analysis_result(result) + + def _parse_analysis_result(self, result: str) -> Dict: + # Implementation of result parsing + pass + +# Usage +analyzer = FinancialAnalysisSystem() +company_data = { + 'revenue': 150, + 'ebitda': 45, + 'debt_equity': 0.8, + 'working_capital': 25 +} + +analysis = analyzer.analyze_company_financials(company_data) +``` + +#### Healthcare Data Processing Pipeline + +```python +from swarms import ModelRouter +import pandas as pd +from typing import List, Dict + +class MedicalDataProcessor: + def __init__(self): + self.router = ModelRouter( + max_workers="auto", # Automatic worker scaling + temperature=0.2, # Conservative temperature for medical analysis + system_prompt="""You are a specialized medical data analyzer focused on: + 1. Clinical terminology interpretation + 2. Patient data analysis + 3. Treatment recommendation review + 4. Medical research synthesis""" + ) + + async def process_patient_records(self, records: List[Dict]) -> List[Dict]: + analysis_tasks = [] + + for record in records: + task = f""" + Analyze patient record: + - Age: {record['age']} + - Symptoms: {', '.join(record['symptoms'])} + - Vital Signs: {record['vitals']} + - Medications: {', '.join(record['medications'])} + - Lab Results: {record['lab_results']} + + Provide: + 1. Symptom analysis + 2. Medication interaction check + 3. Lab results interpretation + 4. Treatment recommendations + """ + analysis_tasks.append(task) + + results = await asyncio.gather(*[ + self.router.async_run(task) for task in analysis_tasks + ]) + + return [self._parse_medical_analysis(r) for r in results] + + def _parse_medical_analysis(self, analysis: str) -> Dict: + # Implementation of medical analysis parsing + pass + +# Usage +async def main(): + processor = MedicalDataProcessor() + patient_records = [ + { + 'age': 45, + 'symptoms': ['fever', 'cough', 'fatigue'], + 'vitals': {'bp': '120/80', 'temp': '38.5C'}, + 'medications': ['lisinopril', 'metformin'], + 'lab_results': 'WBC: 11,000, CRP: 2.5' + } + # More records... + ] + + analyses = await processor.process_patient_records(patient_records) +``` + +#### Natural Language Processing Pipeline + +```python +from swarms import ModelRouter +from typing import List, Dict +import asyncio + +class NLPPipeline: + def __init__(self): + self.router = ModelRouter( + temperature=0.4, + max_loops=2 + ) + + def process_documents(self, documents: List[str]) -> List[Dict]: + tasks = [self._create_nlp_task(doc) for doc in documents] + results = self.router.concurrent_run(tasks) + return [self._parse_nlp_result(r) for r in results] + + def _create_nlp_task(self, document: str) -> str: + return f""" + Perform comprehensive NLP analysis: + + Text: {document} + + Required Analysis: + 1. Entity recognition + 2. Sentiment analysis + 3. Topic classification + 4. Key phrase extraction + 5. Intent detection + + Provide structured analysis with confidence scores. + """ + + def _parse_nlp_result(self, result: str) -> Dict: + # Implementation of NLP result parsing + pass + +# Usage +pipeline = NLPPipeline() +documents = [ + "We're extremely satisfied with the new product features!", + "The customer service response time needs improvement.", + "Looking to upgrade our subscription plan next month." 
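+    # Illustrative customer-feedback snippets; process_documents fans them out
+    # through concurrent_run and returns one analysis per document.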
+] + +analyses = pipeline.process_documents(documents) +``` + +### Available Models and Use Cases + +| Model | Provider | Optimal Use Cases | Characteristics | +|-------|----------|-------------------|-----------------| +| gpt-4-turbo | OpenAI | Complex reasoning, Code generation, Creative writing | High accuracy, Latest knowledge cutoff | +| claude-3-opus | Anthropic | Research analysis, Technical documentation, Long-form content | Strong reasoning, Detailed outputs | +| gemini-pro | Google | Multimodal tasks, Code generation, Technical analysis | Fast inference, Strong coding abilities | +| mistral-large | Mistral | General tasks, Content generation, Classification | Open source, Good price/performance | +| deepseek-reasoner | DeepSeek | Mathematical analysis, Logic problems, Scientific computing | Specialized reasoning capabilities | + +### Provider Capabilities + +| Provider | Strengths | Best For | Integration Notes | +|----------|-----------|-----------|------------------| +| OpenAI | Consistent performance, Strong reasoning | Production systems, Complex tasks | Requires API key setup | +| Anthropic | Safety features, Detailed analysis | Research, Technical writing | Claude-specific formatting | +| Google | Technical tasks, Multimodal support | Code generation, Analysis | Vertex AI integration available | +| Groq | High-speed inference | Real-time applications | Optimized for specific models | +| DeepSeek | Specialized reasoning | Scientific computing | Custom API integration | +| Mistral | Open source flexibility | General applications | Self-hosted options available | + + +### Performance Optimization Tips + +1. Token Management + - Set appropriate max_tokens based on task complexity + - Monitor token usage for cost optimization + - Use streaming for long outputs + +2. Concurrency Settings + - Adjust max_workers based on system resources + - Use "auto" workers for optimal CPU utilization + - Monitor memory usage with large batch sizes + +3. Temperature Tuning + - Lower (0.1-0.3) for factual/analytical tasks + - Higher (0.7-0.9) for creative tasks + - Mid-range (0.4-0.6) for balanced outputs + +4. System Prompts + - Customize for specific domains + - Include relevant context + - Define clear output formats + +### Dependencies + +- asyncio: Asynchronous I/O support +- concurrent.futures: Thread pool execution +- pydantic: Data validation +- litellm: LLM interface standardization diff --git a/model_router_example.py b/model_router_example.py new file mode 100644 index 000000000..644ae237c --- /dev/null +++ b/model_router_example.py @@ -0,0 +1,11 @@ +import os +from swarms.structs.model_router import ModelRouter +from dotenv import load_dotenv + +load_dotenv() + +model_router = ModelRouter(api_key=os.getenv("OPENAI_API_KEY")) + +model_router.run( + "What are the best ways to analyze macroeconomic data? 
Use openai gpt-4o models" +) diff --git a/new_features_examples/hs_examples/hierarchical_swarm_example.py b/new_features_examples/hs_examples/hierarchical_swarm_example.py new file mode 100644 index 000000000..cd9fd167e --- /dev/null +++ b/new_features_examples/hs_examples/hierarchical_swarm_example.py @@ -0,0 +1,130 @@ +import os +from dotenv import load_dotenv + +# Swarm imports +from swarms.structs.agent import Agent +from swarms.structs.hiearchical_swarm import ( + HierarchicalSwarm, + SwarmSpec, +) +from swarms.utils.function_caller_model import OpenAIFunctionCaller + +load_dotenv() + + +# ------------------------------------------------------------------------------ +# Director LLM: Responsible for orchestrating tasks among the agents +# ------------------------------------------------------------------------------ +llm = OpenAIFunctionCaller( + base_model=SwarmSpec, + api_key=os.getenv("OPENAI_API_KEY"), + system_prompt=( + "As the Director of this Hierarchical Agent Swarm, you are in charge of " + "coordinating and overseeing all tasks, ensuring that each is executed " + "efficiently and effectively by the appropriate agents. You must:\n\n" + "1. **Analyze** the user's request and **formulate** a strategic plan.\n" + "2. **Assign** tasks to the relevant agents, detailing **why** each task " + "is relevant and **what** is expected in the deliverables.\n" + "3. **Monitor** agent outputs and, if necessary, provide **constructive " + "feedback** or request **clarifications**.\n" + "4. **Iterate** this process until all tasks are completed to a high " + "standard, or until the swarm has reached the maximum feedback loops.\n\n" + "Remember:\n" + "- **Only** use the agents provided; do not invent extra roles.\n" + "- If you need additional information, request it from the user.\n" + "- Strive to produce a clear, comprehensive **final output** that addresses " + "the user's needs.\n" + "- Keep the tone **professional** and **informative**. If there's uncertainty, " + "politely request further details.\n" + "- Ensure that any steps you outline are **actionable**, **logical**, and " + "**transparent** to the user.\n\n" + "Your effectiveness hinges on clarity, structured delegation, and thoroughness. " + "Always focus on delivering the best possible outcome for the user's request." + ), + temperature=0.5, + max_tokens=8196, +) + + +def main(): + # -------------------------------------------------------------------------- + # Agent: Stock-Analysis-Agent + # -------------------------------------------------------------------------- + # This agent is responsible for: + # - Gathering and interpreting financial data + # - Identifying market trends and patterns + # - Providing clear, actionable insights or recommendations + # -------------------------------------------------------------------------- + analysis_agent = Agent( + agent_name="Stock-Analysis-Agent", + model_name="gpt-4o", + max_loops=1, + interactive=False, + streaming_on=False, + system_prompt=( + "As the Stock Analysis Agent, your primary responsibilities include:\n\n" + "1. **Market Trend Analysis**: Evaluate current and historical market data " + "to identify trends, patterns, and potential investment opportunities.\n" + "2. **Risk & Opportunity Assessment**: Pinpoint specific factors—whether " + "macroeconomic indicators, sector-specific trends, or company fundamentals—" + "that can guide informed investment decisions.\n" + "3. 
**Reporting & Recommendations**: Present your findings in a structured, " + "easy-to-understand format, offering actionable insights. Include potential " + "caveats or uncertainties in your assessment.\n\n" + "Operational Guidelines:\n" + "- If additional data or clarifications are needed, explicitly request them " + "from the Director.\n" + "- Keep your output **concise** yet **comprehensive**. Provide clear " + "rationales for each recommendation.\n" + "- Clearly state any **assumptions** or **limitations** in your analysis.\n" + "- Remember: You are not a financial advisor, and final decisions rest with " + "the user. Include necessary disclaimers.\n\n" + "Goal:\n" + "Deliver high-quality, well-substantiated stock market insights that can be " + "used to guide strategic investment decisions." + ), + ) + + # -------------------------------------------------------------------------- + # Hierarchical Swarm Setup + # -------------------------------------------------------------------------- + # - Director: llm + # - Agents: [analysis_agent] + # - max_loops: Maximum number of feedback loops between director & agents + # -------------------------------------------------------------------------- + swarm = HierarchicalSwarm( + name="HierarchicalAgentSwarm", + description=( + "A specialized swarm in which the Director delegates tasks to a Stock " + "Analysis Agent for thorough market evaluation." + ), + director=llm, + agents=[analysis_agent], + max_loops=2, # Limit on feedback iterations + ) + + # -------------------------------------------------------------------------- + # Execution + # -------------------------------------------------------------------------- + # The director receives the user's instruction: "Ask the stock analysis agent + # to analyze the stock market." The Director will then: + # 1. Formulate tasks (SwarmSpec) + # 2. Assign tasks to the Stock-Analysis-Agent + # 3. Provide feedback and/or request clarifications + # 4. Produce a final response + # -------------------------------------------------------------------------- + user_request = ( + "Please provide an in-depth analysis of the current stock market, " + "focusing on:\n" + "- Key macroeconomic factors affecting market momentum.\n" + "- Potential short-term vs. long-term opportunities.\n" + "- Sector performance trends (e.g., technology, healthcare, energy).\n" + "Highlight any risks, disclaimers, or uncertainties." + ) + + # Run the swarm with the user_request + print(swarm.run(user_request)) + + +if __name__ == "__main__": + main() diff --git a/new_features_examples/hs_examples/hs_stock_team.py b/new_features_examples/hs_examples/hs_stock_team.py new file mode 100644 index 000000000..d4cbe7635 --- /dev/null +++ b/new_features_examples/hs_examples/hs_stock_team.py @@ -0,0 +1,219 @@ +import os +from dotenv import load_dotenv + +# Swarm imports +from swarms.structs.agent import Agent +from swarms.structs.hiearchical_swarm import ( + HierarchicalSwarm, + SwarmSpec, +) +from swarms.utils.function_caller_model import OpenAIFunctionCaller + +load_dotenv() + +# ------------------------------------------------------------------------------ +# Trading Director: Responsible for orchestrating tasks among multiple stock analysts +# ------------------------------------------------------------------------------ +director_llm = OpenAIFunctionCaller( + base_model=SwarmSpec, + api_key=os.getenv("OPENAI_API_KEY"), + system_prompt=( + "You are the Trading Director in charge of coordinating a team of specialized " + "Stock Analysts. 
Your responsibilities include:\n\n" + "1. **Analyze** the user's request and **break it down** into actionable tasks.\n" + "2. **Assign** tasks to the relevant analysts, explaining **why** each task is " + "important and **what** each analyst should deliver.\n" + "3. **Review** all analyst outputs, providing **feedback** or **clarifications** " + "to ensure thoroughness and accuracy.\n" + "4. **Consolidate** final insights into a cohesive, actionable, and " + "easy-to-understand response for the user.\n\n" + "Guidelines:\n" + "- You can only delegate to the analysts assigned to this swarm.\n" + "- If essential data or clarifications are needed, request them from the user.\n" + "- Be direct, structured, and analytical. Present each key point clearly.\n" + "- Strive for a polished **final output** that addresses the user's request.\n" + "- If uncertainties remain, politely highlight them or request more info.\n\n" + "Overarching Goal:\n" + "Maximize the value of insights provided to the user by thoroughly leveraging " + "each analyst’s specialization, while maintaining a professional and " + "transparent communication style." + ), + temperature=0.5, + max_tokens=8196, +) + + +def main(): + # -------------------------------------------------------------------------- + # Agent 1: Macro-Economic-Analysis-Agent + # -------------------------------------------------------------------------- + # Focus: Assess macroeconomic factors like inflation, interest rates, GDP growth, etc. + # -------------------------------------------------------------------------- + macro_agent = Agent( + agent_name="Macro-Economic-Analysis-Agent", + model_name="gpt-4o", + max_loops=1, + interactive=False, + streaming_on=False, + system_prompt=( + "As the Macro-Economic Analysis Agent, your mission is to:\n\n" + "1. **Identify** the key macroeconomic indicators impacting the market.\n" + "2. **Interpret** how factors like inflation, interest rates, and fiscal " + "policies influence market sentiment.\n" + "3. **Connect** these insights to specific investment opportunities or " + "risks across various sectors.\n\n" + "Guidelines:\n" + "- Provide clear, data-driven rationales.\n" + "- Highlight potential global events or policy decisions that may shift " + "market conditions.\n" + "- Request further details if needed, and state any assumptions or " + "limitations.\n\n" + "Outcome:\n" + "Deliver a concise but thorough macroeconomic overview that the Trading " + "Director can combine with other analyses to inform strategy." + ), + ) + + # -------------------------------------------------------------------------- + # Agent 2: Sector-Performance-Analysis-Agent + # -------------------------------------------------------------------------- + # Focus: Drill down into sector-level trends, e.g., technology, healthcare, energy, etc. + # -------------------------------------------------------------------------- + sector_agent = Agent( + agent_name="Sector-Performance-Analysis-Agent", + model_name="gpt-4o", + max_loops=1, + interactive=False, + streaming_on=False, + system_prompt=( + "As the Sector Performance Analysis Agent, your responsibilities are:\n\n" + "1. **Evaluate** recent performance trends across key sectors—technology, " + "healthcare, energy, finance, and more.\n" + "2. **Identify** sector-specific drivers (e.g., regulatory changes, " + "consumer demand shifts, innovation trends).\n" + "3. 
**Highlight** which sectors may offer short-term or long-term " + "opportunities.\n\n" + "Guidelines:\n" + "- Focus on factual, data-backed observations.\n" + "- Cite any significant indicators or company-level news that might affect " + "the sector broadly.\n" + "- Clarify the confidence level of your sector outlook and note any " + "uncertainties.\n\n" + "Outcome:\n" + "Provide the Trading Director with actionable insights into sector-level " + "momentum and potential investment focal points." + ), + ) + + # -------------------------------------------------------------------------- + # Agent 3: Technical-Analysis-Agent + # -------------------------------------------------------------------------- + # Focus: Evaluate price action, volume, and chart patterns to guide short-term + # trading strategies. + # -------------------------------------------------------------------------- + technical_agent = Agent( + agent_name="Technical-Analysis-Agent", + model_name="gpt-4o", + max_loops=1, + interactive=False, + streaming_on=False, + system_prompt=( + "As the Technical Analysis Agent, you specialize in interpreting price " + "charts, volume trends, and indicators (e.g., RSI, MACD) to gauge short-term " + "momentum. Your tasks:\n\n" + "1. **Examine** current market charts for significant breakouts, support/resistance " + "levels, or technical signals.\n" + "2. **Identify** short-term trading opportunities or risks based on " + "technically-driven insights.\n" + "3. **Discuss** how these patterns align with or contradict fundamental " + "or macro perspectives.\n\n" + "Guidelines:\n" + "- Keep explanations accessible, avoiding excessive jargon.\n" + "- Point out levels or patterns that traders commonly monitor.\n" + "- Use disclaimers if there is insufficient data or conflicting signals.\n\n" + "Outcome:\n" + "Supply the Trading Director with technical viewpoints to complement broader " + "macro and sector analysis, supporting timely trading decisions." + ), + ) + + # -------------------------------------------------------------------------- + # Agent 4: Risk-Analysis-Agent + # -------------------------------------------------------------------------- + # Focus: Evaluate risk factors and potential uncertainties, providing disclaimers and + # suggesting mitigations. + # -------------------------------------------------------------------------- + risk_agent = Agent( + agent_name="Risk-Analysis-Agent", + model_name="gpt-4o", + max_loops=1, + interactive=False, + streaming_on=False, + system_prompt=( + "As the Risk Analysis Agent, your role is to:\n\n" + "1. **Identify** key risks and uncertainties—regulatory, geopolitical, " + "currency fluctuations, etc.\n" + "2. **Assess** how these risks could impact investor sentiment or portfolio " + "volatility.\n" + "3. **Recommend** risk mitigation strategies or cautionary steps.\n\n" + "Guidelines:\n" + "- Present both systemic (market-wide) and idiosyncratic (company/sector) risks.\n" + "- Be transparent about unknowns or data gaps.\n" + "- Provide disclaimers on market unpredictability.\n\n" + "Outcome:\n" + "Offer the Trading Director a detailed risk framework that helps balance " + "aggressive and defensive positions." 
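+            # End of the risk-agent prompt. Like the other three analysts, this
+            # agent runs gpt-4o with a single loop; only the prompts differ.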
+        ),
+    )
+
+    # --------------------------------------------------------------------------
+    # Hierarchical Swarm Setup
+    # --------------------------------------------------------------------------
+    # - Director: director_llm
+    # - Agents: [macro_agent, sector_agent, technical_agent, risk_agent]
+    # - max_loops: Up to 2 feedback loops between director and agents
+    # --------------------------------------------------------------------------
+    swarm = HierarchicalSwarm(
+        name="HierarchicalStockAnalysisSwarm",
+        description=(
+            "A specialized swarm consisting of a Trading Director overseeing four "
+            "Stock Analysts, each focusing on Macro, Sector, Technical, and Risk "
+            "perspectives."
+        ),
+        director=director_llm,
+        agents=[
+            macro_agent,
+            sector_agent,
+            technical_agent,
+            risk_agent,
+        ],
+        max_loops=2,  # Limit on feedback iterations
+    )
+
+    # --------------------------------------------------------------------------
+    # Execution
+    # --------------------------------------------------------------------------
+    # Example user request for the entire team:
+    # 1. Discuss key macroeconomic factors (inflation, interest rates, etc.)
+    # 2. Analyze sector-level performance (technology, healthcare, energy).
+    # 3. Give short-term technical signals and levels to watch.
+    # 4. Outline major risks or uncertainties.
+    # --------------------------------------------------------------------------
+    user_request = (
+        "Please provide a comprehensive analysis of the current stock market, "
+        "covering:\n"
+        "- Key macroeconomic drivers affecting market momentum.\n"
+        "- Which sectors seem likely to outperform in the near vs. long term.\n"
+        "- Any notable technical signals or price levels to monitor.\n"
+        "- Potential risks or uncertainties that might disrupt market performance.\n"
+        "Include clear disclaimers about the limitations of these analyses.\n"
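+        # The closing line below intentionally narrows the request so the
+        # director is expected to delegate it to the Risk-Analysis-Agent alone.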
+ "Call the risk analysis agent only" + ) + + # Run the swarm with the user_request + final_output = swarm.run(user_request) + print(final_output) + + +if __name__ == "__main__": + main() diff --git a/o3_mini.py b/o3_mini.py new file mode 100644 index 000000000..7f8aff8f2 --- /dev/null +++ b/o3_mini.py @@ -0,0 +1,9 @@ +from swarms import Agent + +Agent( + agent_name="Stock-Analysis-Agent", + model_name="groq/deepseek-r1-distill-llama-70b", + max_loops="auto", + interactive=True, + streaming_on=False, +).run("What are the best ways to analyze macroeconomic data?") diff --git a/pyproject.toml b/pyproject.toml index f8061da3c..5eb3b94d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.0.3" +version = "7.0.4" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index 35fbbcfa4..66ea6c238 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -84,6 +84,7 @@ from swarms.structs.meme_agent_persona_generator import ( MemeAgentGenerator, ) +from swarms.structs.model_router import ModelRouter __all__ = [ "Agent", @@ -160,4 +161,5 @@ "expertise_based", "MultiAgentRouter", "MemeAgentGenerator", + "ModelRouter", ] diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index c242c59cf..1d932dc19 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -563,7 +563,9 @@ def __init__( max_loops=self.max_loops, steps=self.short_memory.to_dict(), full_history=self.short_memory.get_str(), - total_tokens=count_tokens(text=self.short_memory.get_str()), + total_tokens=count_tokens( + text=self.short_memory.get_str() + ), stopping_token=self.stopping_token, interactive=self.interactive, dynamic_temperature_enabled=self.dynamic_temperature_enabled, diff --git a/swarms/structs/auto_swarm_builder.py b/swarms/structs/auto_swarm_builder.py index 76fcbce96..9e757c701 100644 --- a/swarms/structs/auto_swarm_builder.py +++ b/swarms/structs/auto_swarm_builder.py @@ -4,9 +4,7 @@ from pydantic import BaseModel, Field from swarms.structs.agent import Agent -from swarms.structs.meme_agent_persona_generator import ( - OpenAIFunctionCaller, -) +from swarms.utils.function_caller_model import OpenAIFunctionCaller from swarms.structs.swarm_router import SwarmRouter from swarms.utils.loguru_logger import initialize_logger from swarms.structs.agents_available import showcase_available_agents diff --git a/swarms/structs/hiearchical_swarm.py b/swarms/structs/hiearchical_swarm.py index 91c5a68d2..e91a36703 100644 --- a/swarms/structs/hiearchical_swarm.py +++ b/swarms/structs/hiearchical_swarm.py @@ -1,576 +1,292 @@ -from typing import List, Any +from typing import Any, List, Optional, Union from pydantic import BaseModel, Field -from swarms.utils.loguru_logger import initialize_logger -from swarms.structs.base_swarm import BaseSwarm + from swarms.structs.agent import Agent -from swarms.structs.concat import concat_strings -from swarms.structs.agent_registry import AgentRegistry +from swarms.structs.base_swarm import BaseSwarm from swarms.structs.conversation import Conversation +from swarms.utils.formatter import formatter -logger = initialize_logger(log_folder="hiearchical_swarm") - -# Example usage: -HIEARCHICAL_AGENT_SYSTEM_PROMPT = """ -Here's a full-fledged system prompt for a director boss agent, complete with instructions and many-shot examples: - ---- - -**System Prompt: Director Boss Agent** - -### 
Role: -You are a Director Boss Agent responsible for orchestrating a swarm of worker agents. Your primary duty is to serve the user efficiently, effectively, and skillfully. You dynamically create new agents when necessary or utilize existing agents, assigning them tasks that align with their capabilities. You must ensure that each agent receives clear, direct, and actionable instructions tailored to their role. - -### Key Responsibilities: -1. **Task Delegation:** Assign tasks to the most relevant agent. If no relevant agent exists, create a new one with an appropriate name and system prompt. -2. **Efficiency:** Ensure that tasks are completed swiftly and with minimal resource expenditure. -3. **Clarity:** Provide orders that are simple, direct, and actionable. Avoid ambiguity. -4. **Dynamic Decision Making:** Assess the situation and choose the most effective path, whether that involves using an existing agent or creating a new one. -5. **Monitoring:** Continuously monitor the progress of each agent and provide additional instructions or corrections as necessary. - -### Instructions: -- **Identify the Task:** Analyze the input task to determine its nature and requirements. -- **Agent Selection/Creation:** - - If an agent is available and suited for the task, assign the task to that agent. - - If no suitable agent exists, create a new agent with a relevant system prompt. -- **Task Assignment:** Provide the selected agent with explicit and straightforward instructions. -- **Reasoning:** Justify your decisions when selecting or creating agents, focusing on the efficiency and effectiveness of task completion. - -""" - - -class AgentSpec(BaseModel): - """ - A class representing the specifications of an agent. - - Attributes: - agent_name (str): The name of the agent. - system_prompt (str): The system prompt for the agent. - agent_description (str): The description of the agent. - max_tokens (int): The maximum number of tokens to generate in the API response. - temperature (float): A parameter that controls the randomness of the generated text. - context_window (int): The context window for the agent. - task (str): The main task for the agent. - """ - - agent_name: str = Field( - ..., - description="The name of the agent.", - ) - system_prompt: str = Field( - ..., - description="The system prompt for the agent. Write an extremely detailed system prompt for the agent.", - ) - agent_description: str = Field( - ..., - description="The description of the agent.", - ) - task: str = Field( - ..., - description="The main task for the agent.", - ) - +from swarms.utils.loguru_logger import initialize_logger -# class AgentTeam(BaseModel): -# agents: List[AgentSpec] = Field( -# ..., -# description="The list of agents in the team", -# ) -# flow: str = Field( -# ..., -# description="Agent Name -> ", -# ) +logger = initialize_logger(log_folder="hierarchical_swarm") -# Schema to send orders to the agents -class HierarchicalOrderCall(BaseModel): +class HierarchicalOrder(BaseModel): agent_name: str = Field( ..., - description="The name of the agent to assign the task to.", + description="Specifies the name of the agent to which the task is assigned. This is a crucial element in the hierarchical structure of the swarm, as it determines the specific agent responsible for the task execution.", ) task: str = Field( ..., - description="The main specific task to be assigned to the agent. Be very specific and direct.", + description="Defines the specific task to be executed by the assigned agent. 
This task is a key component of the swarm's plan and is essential for achieving the swarm's goals.", ) -# For not agent creation -class CallTeam(BaseModel): - # swarm_name: str = Field( - # ..., - # description="The name of the swarm: e.g., 'Marketing Swarm' or 'Finance Swarm'", - # ) - rules: str = Field( +class SwarmSpec(BaseModel): + goals: str = Field( ..., - description="The rules for all the agents in the swarm: e.g., All agents must return code. Be very simple and direct", + description="The goal of the swarm. This is the overarching objective that the swarm is designed to achieve. It guides the swarm's plan and the tasks assigned to the agents.", ) plan: str = Field( ..., - description="The plan for the swarm: e.g., First create the agents, then assign tasks, then monitor progress", - ) - orders: List[HierarchicalOrderCall] - - -class SwarmSpec(BaseModel): - """ - A class representing the specifications of a swarm of agents. - - Attributes: - multiple_agents (List[AgentSpec]): The list of agents in the swarm. - """ - - swarm_name: str = Field( - ..., - description="The name of the swarm: e.g., 'Marketing Swarm' or 'Finance Swarm'", + description="Outlines the sequence of actions to be taken by the swarm. This plan is a detailed roadmap that guides the swarm's behavior and decision-making.", ) - multiple_agents: List[AgentSpec] rules: str = Field( ..., - description="The rules for all the agents in the swarm: e.g., All agents must return code. Be very simple and direct", + description="Defines the governing principles for swarm behavior and decision-making. These rules are the foundation of the swarm's operations and ensure that the swarm operates in a coordinated and efficient manner.", ) - plan: str = Field( + orders: List[HierarchicalOrder] = Field( ..., - description="The plan for the swarm: e.g., First create the agents, then assign tasks, then monitor progress", + description="A collection of task assignments to specific agents within the swarm. These orders are the specific instructions that guide the agents in their task execution and are a key element in the swarm's plan.", ) -class HierarchicalAgentSwarm(BaseSwarm): +class HierarchicalSwarm(BaseSwarm): """ - A class to create and manage a hierarchical swarm of agents. - - Methods: - __init__(system_prompt, max_tokens, temperature, base_model, parallel_tool_calls): Initializes the function caller. - create_agent(agent_name, system_prompt, agent_description, max_tokens, temperature, context_window): Creates an individual agent. - parse_json_for_agents_then_create_agents(function_call): Parses a JSON function call to create multiple agents. - run(task): Runs the function caller to create and execute agents based on the provided task. + Represents a hierarchical swarm of agents, with a director that orchestrates tasks among the agents. """ def __init__( self, name: str = "HierarchicalAgentSwarm", - description: str = "A swarm of agents that can be used to distribute tasks to a team of agents.", - director: Any = None, - agents: List[Agent] = None, + description: str = "Distributed task swarm", + director: Optional[Union[Agent, Any]] = None, + agents: List[Union[Agent, Any]] = None, max_loops: int = 1, - create_agents_on: bool = False, - template_worker_agent: Agent = None, - director_planning_prompt: str = None, - template_base_worker_llm: Any = None, - swarm_history: str = None, + return_all_history: bool = False, *args, **kwargs, ): """ - Initializes the HierarchicalAgentSwarm with an OpenAIFunctionCaller. 
- - Args: - system_prompt (str): The system prompt for the function caller. - max_tokens (int): The maximum number of tokens to generate in the API response. - temperature (float): The temperature setting for text generation. - base_model (BaseModel): The base model for the function caller. - parallel_tool_calls (bool): Whether to run tool calls in parallel. + Initializes the HierarchicalSwarm with the given parameters. + + :param name: The name of the swarm. + :param description: A description of the swarm. + :param director: The director agent that orchestrates tasks. + :param agents: A list of agents within the swarm. + :param max_loops: The maximum number of feedback loops between the director and agents. + :param return_all_history: A flag indicating whether to return all conversation history. """ super().__init__( name=name, description=description, agents=agents, - *args, - **kwargs, ) - self.name = name - self.description = description self.director = director self.agents = agents self.max_loops = max_loops - self.create_agents_on = create_agents_on - self.template_worker_agent = template_worker_agent - self.director_planning_prompt = director_planning_prompt - self.template_base_worker_llm = template_base_worker_llm - self.swarm_history = swarm_history - - # Check if the agents are set - self.agents_check() - - # Agent Registry - self.agent_registry = AgentRegistry() - - # Add agents to the registry - self.add_agents_into_registry(self.agents) - - # Swarm History + self.return_all_history = return_all_history self.conversation = Conversation(time_enabled=True) - self.swarm_history = ( - self.conversation.return_history_as_string() - ) - - def agents_check(self): - if self.director is None: - raise ValueError("The director is not set.") - - if len(self.agents) == 0: - self.create_agents_on = True + self.add_name_and_description() - if len(self.agents) > 0: - self.director.base_model = CallTeam - - self.director.system_prompt = ( - HIEARCHICAL_AGENT_SYSTEM_PROMPT - ) + self.check_agents() - if self.max_loops == 0: - raise ValueError("The max_loops is not set.") + formatter.print_panel(self.list_all_agents()) - def add_agents_into_registry(self, agents: List[Agent]): + def check_agents(self): """ - add_agents_into_registry: Add agents into the agent registry. - - Args: - agents (List[Agent]): A list of agents to add into the registry. + Checks if there are any agents and a director set for the swarm. + Raises ValueError if either condition is not met. + """ + if not self.agents: + raise ValueError("No agents found") - Returns: - None + if not self.director: + raise ValueError("Director not set") + def run_director( + self, task: str, img: str = None, *args, **kwargs + ): """ - for agent in agents: - self.agent_registry.add(agent) + Runs a task through the director agent. - def create_agent( - self, - agent_name: str, - system_prompt: str, - agent_description: str, - task: str = None, - ) -> str: + :param task: The task to be executed by the director. + :param img: Optional image to be used with the task. + :return: The output of the director's task execution. """ - Creates an individual agent. - - Args: - agent_name (str): The name of the agent. - system_prompt (str): The system prompt for the agent. - agent_description (str): The description of the agent. - max_tokens (int): The maximum number of tokens to generate. - temperature (float): The temperature for text generation. - context_window (int): The context window size for the agent. 
- - Returns: - Agent: An instantiated agent object. - """ - # name = agent_name.replace(" ", "_") - logger.info(f"Creating agent: {agent_name}") - - agent_name = Agent( - agent_name=agent_name, - llm=self.template_base_worker_llm, # Switch to model router here later - system_prompt=system_prompt, - agent_description=agent_description, - retry_attempts=1, - verbose=False, - dashboard=False, - ) + logger.info(f"Running director task: {task}") - self.agents.append(agent_name) + self.conversation.add(role="User", content=f"Task: {task}") - logger.info(f"Running agent: {agent_name} on task: {task}") - output = agent_name.run(task) - - self.conversation.add(role=agent_name, content=output) - return output + function_call = self.director.run( + task=f"History: {self.conversation.get_str()} Your Task: {task}", + ) - def parse_json_for_agents_then_create_agents( - self, function_call: dict - ) -> List[Agent]: - """ - Parses a JSON function call to create a list of agents. + formatter.print_panel(f"Director Output: {function_call}") - Args: - function_call (dict): The JSON function call specifying the agents. + return function_call - Returns: - List[Agent]: A list of created agent objects. + def run(self, task: str, img: str = None, *args, **kwargs) -> str: """ - responses = [] - logger.info("Parsing JSON for agents") - - if self.create_agents_on: - for agent in function_call["multiple_agents"]: - out = self.create_agent( - agent_name=agent["agent_name"], - system_prompt=agent["system_prompt"], - agent_description=agent["agent_description"], - task=agent["task"], - ) - responses.append(out) - else: - for agent in function_call["orders"]: - out = self.run_worker_agent( - name=agent["agent_name"], - task=agent["task"], - ) - responses.append(out) - - return concat_strings(responses) + Runs a task through the swarm, involving the director and agents. - def run(self, task: str) -> str: + :param task: The task to be executed by the swarm. + :param img: Optional image to be used with the task. + :return: The output of the swarm's task execution. """ - Runs the function caller to create and execute agents based on the provided task. + self.conversation.add(role="User", content=f"Task: {task}") - Args: - task (str): The task for which the agents need to be created and executed. + try: + task = self.conversation.get_str() - Returns: - List[Agent]: A list of created agent objects. - """ - logger.info("Running the swarm") + for i in range(self.max_loops): + function_call = self.run_director(task=task) - # Run the function caller to output JSON function call - function_call = self.model.run(task) + self.parse_orders(function_call) - # Add the function call to the conversation - self.conversation.add( - role="Director", content=str(function_call) - ) - - # Logging the function call with metrics and details - self.log_director_function_call(function_call) - - # # Parse the JSON function call and create agents -> run Agents - return self.parse_json_for_agents_then_create_agents( - function_call - ) + if self.return_all_history: + return self.conversation.get_str() + else: + return self.conversation.get_str() + except Exception as e: + logger.error(f"Error running hierarchical swarm: {e}") + return "Error running hierarchical swarm" - def run_new(self, task: str): + def add_name_and_description(self): """ - Runs the function caller to create and execute agents based on the provided task. - - Args: - task (str): The task for which the agents need to be created and executed. 
- - Returns: - List[Agent]: A list of created agent objects. + Adds the swarm's name and description to the conversation. """ - logger.info("Running the swarm") - - # Run the function caller to output JSON function call - function_call = self.model.run(task) self.conversation.add( - role="Director", content=str(function_call) + role="User", + content=f"\n Swarm Name: {self.name} \n Swarm Description: {self.description}", ) - # Logging the function call with metrics and details - self.log_director_function_call(function_call) - - if self.create_agents_on: - # Create agents from the function call - self.create_agents_from_func_call(function_call) - - # Now submit orders to the agents - self.director.base_model = CallTeam - - orders_prompt = f"Now, the agents have been created. Submit orders to the agents to enable them to complete the task: {task}: {self.list_agents_available()}" - orders = self.director.run(orders_prompt) - self.conversation.add( - role="Director", content=str(orders_prompt + orders) - ) - - # Check the type of the response - orders = self.check_agent_output_type(orders) - - # Distribute the orders to the agents - return self.distribute_orders_to_agents(orders) - - def check_agent_output_type(self, response: Any): - if isinstance(response, dict): - return response - if isinstance(response, str): - return eval(response) - else: - return response - - def distribute_orders_to_agents(self, order_dict: dict) -> str: - # Now we need to parse the CallTeam object - # and distribute the orders to the agents - responses = [] - - for order in order_dict["orders"]: - agent_name = order["agent_name"] - task = order["task"] - - # Find and run the agent - response = self.run_worker_agent( - name=agent_name, task=task - ) - - log = f"Agent: {agent_name} completed task: {task} with response: {response}" - self.conversation.add( - role=agent_name, content=task + response - ) - responses.append(log) - logger.info(log) - - return concat_strings(responses) - - def create_single_agent( - self, name: str, system_prompt: str, description - ) -> Agent: + def list_all_agents(self) -> str: """ - Create a single agent from the agent specification. - - Args: - agent_spec (dict): The agent specification. - - Returns: - Agent: The created agent. + Lists all agents available in the swarm. + :return: A string representation of all agents in the swarm. """ - # Unwrap all of the agent specifications - # agent_name = agent_spec["agent_name"] - # system_prompt = agent_spec["system_prompt"] - # agent_description = agent_spec["agent_description"] - - # Create the agent - agent_name = Agent( - agent_name=name, - llm=self.template_base_worker_llm, # Switch to model router here later - system_prompt=system_prompt, - agent_description=description, - max_loops=1, - retry_attempts=1, - verbose=False, - dashboard=False, + all_agents = "\n".join( + f"Agent: {agent.agent_name}, Description: {agent.system_prompt} " + for agent in self.agents ) - # Add agents into the registry - self.agents.append(agent_name) + self.conversation.add( + role="User", + content=f"All Agents Available in the Swarm {self.name}: \n {all_agents}", + ) - return agent_name + return all_agents - def create_agents_from_func_call(self, function_call: dict): + def find_agent(self, name: str) -> Optional[Agent]: """ - Create agents from the function call. - - Args: - function_call (dict): The function call containing the agent specifications. - - Returns: - List[Agent]: A list of created agents. + Finds an agent by its name within the swarm. 
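+        Matching compares each agent's agent_name attribute for an exact match.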
+        :param name: The name of the agent to find.
+        :return: The agent if found, otherwise None.
+        """
+        for agent in self.agents:
+            if agent.agent_name == name:
+                return agent
+        return None
+
+    def run_agent(self, agent_name: str, task: str, img: str = None):
+        """
+        Runs a task through a specific agent.
+
+        :param agent_name: The name of the agent to execute the task.
+        :param task: The task to be executed by the agent.
+        :param img: Optional image to be used with the task.
+        :return: The output of the agent's task execution.
+        """
+        try:
+            agent = self.find_agent(agent_name)
+
+            if agent:
+                out = agent.run(
+                    task=f"History: {self.conversation.get_str()} Your Task: {task}",
+                    img=img,
+                )
+
+                self.conversation.add(
+                    role=agent_name,
+                    content=out,
+                )
+
+                return out
+            else:
+                logger.error(
+                    f"Agent {agent_name} not found in the swarm {self.name}"
+                )
+        except Exception as e:
+            logger.error(f"Error running agent {agent_name}: {e}")
+            return "Error running agent"
+
+    def parse_orders(self, orders: SwarmSpec) -> str:
+        """
+        Parses the orders from the SwarmSpec and executes them through the agents.
+
+        :param orders: The SwarmSpec containing the orders to be parsed.
+        :return: The output of the final executed order.
+        """
+        self.add_goal_and_more_in_conversation(orders)
+
+        orders_list = self.parse_swarm_spec(orders)
+
+        try:
+            # Execute each order sequentially through its assigned agent
+            for order in orders_list:
+                out = self.run_agent(
+                    agent_name=order.agent_name,
+                    task=order.task,
+                )
+
+            return out
+        except Exception as e:
+            logger.error(f"Error parsing orders: {e}")
+            return "Error parsing orders"
+
+    def parse_swarm_spec(self, swarm_spec: SwarmSpec) -> List[HierarchicalOrder]:
+        """
+        Parses the SwarmSpec to extract the orders. 
-        for agent in self.agents:
-            name = agent.agent_name
-            description = (
-                agent.description or "No description available."
-            )
-            logger.info(f"Agent: {name}, Description: {description}")
+        :param swarm_spec: The SwarmSpec to be parsed.
+        :return: The list of orders extracted from the SwarmSpec.
+        """
+        orders_list = swarm_spec.orders

-    def list_agents_available(self):
-        number_of_agents_available = len(self.agents)
+        logger.info(f"Parsed {len(orders_list)} orders from the swarm spec")

-        agent_list = "\n".join(
-            [
-                f"Agent {agent.agent_name}: Description {agent.description}"
-                for agent in self.agents
-            ]
-        )
+        return orders_list

-        prompt = f"""
-    There are currently {number_of_agents_available} agents available in the swarm.
+    def provide_feedback(self, agent_name: str, out: str) -> str:
+        """
+        Provides feedback to an agent based on its output.

-    Agents Available:
-    {agent_list}
+        :param agent_name: The name of the agent to provide feedback to.
+        :param out: The output of the agent to base the feedback on.
     """
+        orders = self.director.run(
+            task=f"Provide feedback to {agent_name} on their output: {out}"
+        )

-        return prompt
+        orders_list = self.parse_swarm_spec(orders)

-    def find_agent_by_name(
-        self, agent_name: str = None, *args, **kwargs
-    ):
-        """
-        Finds an agent in the swarm by name.
+        for order in orders_list:
+            out = self.run_agent(
+                agent_name=order.agent_name,
+                task=order.task,
+            )

-        Args:
-            agent_name (str): The name of the agent to find.
+        return out

-        Returns:
-            Agent: The agent with the specified name, or None if not found.
+    def add_goal_and_more_in_conversation(
+        self, swarm_spec: SwarmSpec
+    ) -> None:
+        """
+        Adds the swarm's goals, plan, and rules to the conversation.
+        :param swarm_spec: The SwarmSpec containing the goals, plan, and rules.
     """
-        for agent in self.agents:
-            if agent.name == agent_name:
-                return agent
-        return None
+        goals = swarm_spec.goals
+        plan = swarm_spec.plan
+        rules = swarm_spec.rules
+
+        self.conversation.add(
+            role="Director",
+            content=f"Goals: {goals}\nPlan: {plan}\nRules: {rules}",
+        )
diff --git a/swarms/structs/meme_agent_persona_generator.py b/swarms/structs/meme_agent_persona_generator.py
index 80f51effd..0fb19cc46 100644
--- a/swarms/structs/meme_agent_persona_generator.py
+++ b/swarms/structs/meme_agent_persona_generator.py
@@ -1,18 +1,7 @@
 import json
 import os
-import subprocess
 from typing import List

-try:
-    import openai
-except ImportError:
-    print(
-        "OpenAI is not installed. Please install it using 'pip install openai'."
-    )
-    import sys
-
-    subprocess.run([sys.executable, "-m", "pip", "install", "openai"])
-    exit(1)

 from dotenv import load_dotenv
 from loguru import logger
@@ -20,137 +9,11 @@
 from swarms.structs.agent import Agent
 from swarms.structs.swarm_router import SwarmRouter
+from swarms.utils.function_caller_model import OpenAIFunctionCaller

 load_dotenv()


-class OpenAIFunctionCaller:
-    """
-    A class that represents a caller for OpenAI chat completions.
-
-    Args:
-        system_prompt (str): The system prompt to be used in the chat completion.
-        model_name (str): The name of the OpenAI model to be used.
-        max_tokens (int): The maximum number of tokens in the generated completion.
-        temperature (float): The temperature parameter for randomness in the completion.
-        base_model (BaseModel): The base model to be used for the completion.
-        openai_api_key (str): The API key for accessing the OpenAI service.
-        parallel_tool_calls (bool): Whether to make parallel tool calls.
-        top_p (float): The top-p parameter for nucleus sampling in the completion.
- - Attributes: - system_prompt (str): The system prompt to be used in the chat completion. - model_name (str): The name of the OpenAI model to be used. - max_tokens (int): The maximum number of tokens in the generated completion. - temperature (float): The temperature parameter for randomness in the completion. - base_model (BaseModel): The base model to be used for the completion. - parallel_tool_calls (bool): Whether to make parallel tool calls. - top_p (float): The top-p parameter for nucleus sampling in the completion. - client (openai.OpenAI): The OpenAI client for making API calls. - - Methods: - check_api_key: Checks if the API key is provided and retrieves it from the environment if not. - run: Runs the chat completion with the given task and returns the generated completion. - - """ - - def __init__( - self, - system_prompt: str = None, - model_name: str = "gpt-4o-2024-08-06", - max_tokens: int = 4000, - temperature: float = 0.4, - base_model: BaseModel = None, - openai_api_key: str = None, - parallel_tool_calls: bool = False, - top_p: float = 0.9, - *args, - **kwargs, - ): - super().__init__() - self.system_prompt = system_prompt - self.model_name = model_name - self.max_tokens = max_tokens - self.temperature = temperature - self.openai_api_key = openai_api_key - self.base_model = base_model - self.parallel_tool_calls = parallel_tool_calls - self.top_p = top_p - self.client = openai.OpenAI(api_key=self.check_api_key()) - - def check_api_key(self) -> str: - """ - Checks if the API key is provided and retrieves it from the environment if not. - - Returns: - str: The API key. - - """ - self.openai_api_key = os.getenv("OPENAI_API_KEY") - - return self.openai_api_key - - def run(self, task: str, *args, **kwargs) -> dict: - """ - Runs the chat completion with the given task and returns the generated completion. - - Args: - task (str): The user's task for the chat completion. - *args: Additional positional arguments to be passed to the OpenAI API. - **kwargs: Additional keyword arguments to be passed to the OpenAI API. - - Returns: - str: The generated completion. - - """ - try: - completion = self.client.beta.chat.completions.parse( - model=self.model_name, - messages=[ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": task}, - ], - max_tokens=self.max_tokens, - temperature=self.temperature, - response_format=self.base_model, - tools=( - [openai.pydantic_function_tool(self.base_model)] - ), - *args, - **kwargs, - ) - - out = completion.choices[0].message.content - return out - except Exception as error: - logger.error( - f"Error in running OpenAI chat completion: {error}" - ) - return None - - def convert_to_dict_from_base_model( - self, base_model: BaseModel - ) -> dict: - return openai.pydantic_function_tool(base_model) - - def convert_list_of_base_models( - self, base_models: List[BaseModel] - ): - """ - Converts a list of BaseModels to a list of dictionaries. - - Args: - base_models (List[BaseModel]): A list of BaseModels to be converted. - - Returns: - List[Dict]: A list of dictionaries representing the converted BaseModels. 
- """ - return [ - self.convert_to_dict_from_base_model(base_model) - for base_model in base_models - ] - - class MemeAgentConfig(BaseModel): """Configuration for an individual meme agent in a swarm""" diff --git a/swarms/structs/model_router.py b/swarms/structs/model_router.py new file mode 100644 index 000000000..f23627531 --- /dev/null +++ b/swarms/structs/model_router.py @@ -0,0 +1,376 @@ +import asyncio +import os +from concurrent.futures import ThreadPoolExecutor +from typing import Optional +from pydantic import BaseModel, Field + +from swarms.utils.function_caller_model import OpenAIFunctionCaller +from swarms.utils.any_to_str import any_to_str +from swarms.utils.formatter import formatter +from swarms.utils.litellm_wrapper import LiteLLM + +model_recommendations = { + "gpt-4o": { + "description": "Fast and efficient for simple tasks and general queries", + "best_for": [ + "Simple queries", + "Basic text generation", + "Quick responses", + "Everyday tasks", + ], + "provider": "openai", + }, + "gpt-4-turbo": { + "description": "Latest GPT-4 model with improved capabilities and knowledge cutoff", + "best_for": [ + "Complex tasks", + "Up-to-date knowledge", + "Long context understanding", + ], + "provider": "openai", + }, + "gpt-3.5-turbo": { + "description": "Fast and cost-effective model for straightforward tasks", + "best_for": [ + "Chat applications", + "Content generation", + "Basic assistance", + ], + "provider": "openai", + }, + "o3-mini": { + "description": "Lightweight model good for basic tasks with lower compute requirements", + "best_for": [ + "Basic text completion", + "Simple classification", + "Resource-constrained environments", + ], + "provider": "groq", + }, + "deepseek-reasoner": { + "description": "Specialized for complex reasoning and analytical tasks", + "best_for": [ + "Complex problem solving", + "Logical reasoning", + "Mathematical analysis", + "High IQ tasks", + ], + "provider": "deepseek", + }, + "claude-3-5-sonnet": { + "description": "Well-rounded model with strong reasoning and creativity", + "best_for": [ + "Creative writing", + "Detailed analysis", + "Nuanced responses", + ], + "provider": "anthropic", + }, + "claude-3-opus": { + "description": "Most capable Claude model with enhanced reasoning and analysis", + "best_for": [ + "Research", + "Complex analysis", + "Technical writing", + "Code generation", + ], + "provider": "anthropic", + }, + "gemini-pro": { + "description": "Google's advanced model with strong general capabilities", + "best_for": [ + "Multimodal tasks", + "Code generation", + "Creative content", + ], + "provider": "google", + }, + "mistral-large": { + "description": "Open source model with strong performance across tasks", + "best_for": [ + "General purpose tasks", + "Code assistance", + "Content generation", + ], + "provider": "mistral", + }, +} + +providers = { + "openai": "Primary provider for GPT models", + "groq": "High-performance inference provider", + "anthropic": "Provider of Claude models", + "google": "Provider of PaLM and Gemini models", + "azure": "Cloud platform for various model deployments", + "deepseek": "Provider of specialized reasoning models", + "mistral": "Provider of open source and commercial language models", +} + + +class ModelOutput(BaseModel): + rationale: Optional[str] + model: Optional[str] + provider: Optional[str] + task: Optional[str] = Field( + description="The task to be executed for the model. It should be a clear, concise, and detailed task that the model can execute. 
It should only include details of the task, not the reasoning or the rationale, model, provider, or anything else. Do not include any other information in the task." + ) + max_tokens: Optional[int] = Field( + description="The maximum number of tokens to use for the model" + ) + temperature: Optional[float] = Field( + description="The temperature to use for the model" + ) + system_prompt: Optional[str] = Field( + description="The system prompt to use for the model. Leverge the best techniques to make the model perform the best. Make sure the prompt is clear, extensive, and detailed." + ) + + +providers = any_to_str(providers) +model_recommendations = any_to_str(model_recommendations) +data = f"Providers: {providers}\nModel Recommendations: {model_recommendations}" + +model_router_system_prompt = f""" +You are an expect model router responsible for recommending the optimal AI model for specific tasks. + +Available Models and Their Strengths: +- GPT-4: Best for complex reasoning, coding, and analysis requiring strong logical capabilities +- GPT-3.5-Turbo: Efficient for straightforward tasks, chat, and basic content generation +- Claude-3-Opus: Excels at research, technical writing, and in-depth analysis with strong reasoning +- Claude-3-Sonnet: Well-balanced for creative writing and nuanced responses +- Gemini Pro: Strong at multimodal tasks and code generation +- Mistral Large: Versatile open source model good for general tasks + +Provider Considerations: +- OpenAI: Industry standard with consistent performance +- Anthropic: Known for safety and detailed analysis +- Google: Strong technical capabilities and multimodal support +- Groq: Optimized for high-speed inference +- Mistral: Balance of open source and commercial offerings + +Data: +{data} + +When Making Recommendations Consider: +1. Task requirements (complexity, creativity, technical needs) +2. Performance characteristics needed (speed, accuracy, reliability) +3. Special capabilities required (code generation, analysis, etc) +4. Cost and efficiency tradeoffs +5. Provider-specific strengths and limitations + +Provide clear rationale for your model selection based on the specific task requirements. +""" + + +class ModelRouter: + """ + A router class that intelligently selects and executes AI models based on task requirements. + + The ModelRouter uses a function calling model to analyze tasks and recommend the optimal + model and provider combination, then executes the task using the selected model. + + Attributes: + system_prompt (str): Prompt that guides model selection behavior + max_tokens (int): Maximum tokens for model outputs + temperature (float): Temperature parameter for model randomness + max_workers (int): Maximum concurrent workers for batch processing + model_output (ModelOutput): Pydantic model for structured outputs + model_caller (OpenAIFunctionCaller): Function calling interface + """ + + def __init__( + self, + system_prompt: str = model_router_system_prompt, + max_tokens: int = 4000, + temperature: float = 0.5, + max_workers: int = 10, + api_key: str = None, + max_loops: int = 1, + *args, + **kwargs, + ): + """ + Initialize the ModelRouter. 
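+
+        Example (illustrative):
+            router = ModelRouter(temperature=0.5, max_loops=2)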
+ + Args: + system_prompt (str): Prompt for model selection guidance + max_tokens (int): Maximum output tokens + temperature (float): Model temperature parameter + max_workers (int): Max concurrent workers + *args: Additional positional arguments + **kwargs: Additional keyword arguments + """ + try: + self.system_prompt = system_prompt + self.max_tokens = max_tokens + self.temperature = temperature + self.max_workers = max_workers + self.model_output = ModelOutput + self.max_loops = max_loops + + if self.max_workers == "auto": + self.max_workers = os.cpu_count() + + self.model_caller = OpenAIFunctionCaller( + base_model=ModelOutput, + temperature=self.temperature, + system_prompt=self.system_prompt, + max_tokens=self.max_tokens, + api_key=api_key, + ) + except Exception as e: + raise RuntimeError( + f"Failed to initialize ModelRouter: {str(e)}" + ) + + def step(self, task: str): + """ + Run a single task through the model router. + + Args: + task (str): The task to be executed + + Returns: + str: The model's output for the task + + Raises: + RuntimeError: If model selection or execution fails + """ + model_router_output = self.model_caller.run(task) + + selected_model = model_router_output.model + selected_provider = model_router_output.provider + routed_task = model_router_output.task + rationale = model_router_output.rationale + max_tokens = model_router_output.max_tokens + temperature = model_router_output.temperature + system_prompt = model_router_output.system_prompt + + formatter.print_panel( + f"Model: {selected_model}\nProvider: {selected_provider}\nTask: {routed_task}\nRationale: {rationale}\nMax Tokens: {max_tokens}\nTemperature: {temperature}\nSystem Prompt: {system_prompt}", + title="Model Router Output", + ) + + litellm_wrapper = LiteLLM( + model_name=f"{selected_provider}/{selected_model}", + max_tokens=max_tokens, + temperature=temperature, + system_prompt=system_prompt, + ) + + final_output = litellm_wrapper.run(task=routed_task) + + formatter.print_panel( + f"Output: {final_output} from {selected_provider}/{selected_model}", + title="Model Output", + ) + + return final_output + + def run(self, task: str): + """ + Run a task through the model router with memory. + """ + task_output = task + previous_output = None + for _ in range(self.max_loops): + if task_output == previous_output: + break # Exit if no change in output + previous_output = task_output + task_output = self.step(task_output) + return task_output + + def batch_run(self, tasks: list): + """ + Run multiple tasks in sequence. + + Args: + tasks (list): List of tasks to execute + + Returns: + list: List of outputs for each task + + Raises: + RuntimeError: If batch execution fails + """ + try: + outputs = [] + for task in tasks: + output = self.run(task) + outputs.append(output) + return outputs + except Exception as e: + raise RuntimeError(f"Batch execution failed: {str(e)}") + + def __call__(self, task: str, *args, **kwargs): + """ + Make the class callable to directly execute tasks. + + Args: + task (str): Task to execute + + Returns: + str: Model output + """ + return self.run(task, *args, **kwargs) + + def __batch_call__(self, tasks: list): + """ + Make the class callable for batch execution. 
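+
+        Example (illustrative):
+            outputs = router.__batch_call__(["task one", "task two"])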
+
+        Args:
+            tasks (list): List of tasks
+
+        Returns:
+            list: List of outputs
+        """
+        return self.batch_run(tasks)
+
+    def __str__(self):
+        return f"ModelRouter(max_tokens={self.max_tokens}, temperature={self.temperature})"
+
+    def __repr__(self):
+        return f"ModelRouter(max_tokens={self.max_tokens}, temperature={self.temperature})"
+
+    def concurrent_run(self, tasks: list):
+        """
+        Run multiple tasks concurrently using a thread pool.
+
+        Args:
+            tasks (list): List of tasks to execute concurrently
+
+        Returns:
+            list: List of outputs from all tasks
+
+        Raises:
+            RuntimeError: If concurrent execution fails
+        """
+        try:
+            with ThreadPoolExecutor(
+                max_workers=self.max_workers
+            ) as executor:
+                outputs = list(executor.map(self.run, tasks))
+            return outputs
+        except Exception as e:
+            raise RuntimeError(
+                f"Concurrent execution failed: {str(e)}"
+            )
+
+    async def async_run(self, task: str, *args, **kwargs):
+        """
+        Run a task asynchronously.
+
+        Args:
+            task (str): Task to execute asynchronously
+
+        Returns:
+            asyncio.Task: Async task object
+
+        Raises:
+            RuntimeError: If async execution fails
+        """
+        try:
+            # self.run is synchronous, so offload it to a worker thread
+            # and wrap the resulting coroutine in a task; create_task on
+            # the plain return value of self.run would raise a TypeError.
+            return asyncio.create_task(
+                asyncio.to_thread(self.run, task, *args, **kwargs)
+            )
+        except Exception as e:
+            raise RuntimeError(f"Async execution failed: {str(e)}")
diff --git a/swarms/structs/multi_agent_orchestrator.py b/swarms/structs/multi_agent_orchestrator.py
index db845dd60..e3e53004e 100644
--- a/swarms/structs/multi_agent_orchestrator.py
+++ b/swarms/structs/multi_agent_orchestrator.py
@@ -9,15 +9,13 @@
 """

 import os
-import subprocess
 import uuid
 from datetime import datetime
 from typing import List, Literal, Optional

 from loguru import logger
 from pydantic import BaseModel, Field
-from tenacity import retry, stop_after_attempt, wait_exponential
-
+from swarms.utils.function_caller_model import OpenAIFunctionCaller
 from swarms.structs.agent import Agent


@@ -35,88 +33,6 @@ class AgentResponse(BaseModel):
     )


-class OpenAIFunctionCaller:
-    """
-    A class to interact with the OpenAI API for generating text based on a system prompt and a task.
-    """
-
-    def __init__(
-        self,
-        system_prompt: str,
-        api_key: str,
-        temperature: float,
-        max_tokens: int = 4000,
-        model_name: str = "gpt-4-0125-preview",
-    ):
-        self.system_prompt = system_prompt
-        self.api_key = api_key
-        self.temperature = temperature
-        self.max_tokens = max_tokens
-        self.model_name = model_name
-
-        try:
-            from openai import OpenAI
-        except ImportError:
-            logger.error(
-                "OpenAI library not found.
Please install it using 'pip install openai'" - ) - subprocess.run(["pip", "install", "openai"]) - raise - - try: - self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) - except Exception as e: - logger.error( - f"Error initializing OpenAI client: {str(e)}" - ) - raise - - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - ) - def get_completion(self, task: str) -> AgentResponse: - """Get completion from OpenAI with retries""" - try: - response = self.client.chat.completions.create( - model=self.model_name, - messages=[ - {"role": "system", "content": self.system_prompt}, - {"role": "user", "content": task}, - ], - response_format={"type": "json_object"}, - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - return AgentResponse.model_validate_json( - response.choices[0].message.content - ) - except Exception as e: - logger.error(f"Error getting completion: {str(e)}") - raise - - def get_agent_response( - self, system_prompt: str, task: str - ) -> str: - """Get agent response without function calling""" - try: - response = self.client.chat.completions.create( - model=self.model_name, - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": task}, - ], - temperature=self.temperature, - max_tokens=self.max_tokens, - ) - - return response.choices[0].message.content - except Exception as e: - logger.error(f"Error getting agent response: {str(e)}") - raise - - class MultiAgentRouter: """ Routes tasks to appropriate agents based on their capabilities. diff --git a/swarms/structs/swarm_builder.py b/swarms/structs/swarm_builder.py index aa1999476..71aacedd6 100644 --- a/swarms/structs/swarm_builder.py +++ b/swarms/structs/swarm_builder.py @@ -1,5 +1,4 @@ import os -import subprocess from typing import List, Optional from loguru import logger @@ -13,111 +12,11 @@ from swarms.structs.agent import Agent from swarms.structs.swarm_router import SwarmRouter, SwarmType +from swarms.utils.function_caller_model import OpenAIFunctionCaller logger.add("swarm_builder.log", rotation="10 MB", backtrace=True) -class OpenAIFunctionCaller: - """ - A class to interact with the OpenAI API for generating text based on a system prompt and a task. - - Attributes: - - system_prompt (str): The system prompt to guide the AI's response. - - api_key (str): The API key for the OpenAI service. - - temperature (float): The temperature parameter for the AI model, controlling randomness. - - base_model (BaseModel): The Pydantic model to parse the response into. - - max_tokens (int): The maximum number of tokens in the response. - - client (OpenAI): The OpenAI client instance. - """ - - def __init__( - self, - system_prompt: str, - api_key: str, - temperature: float, - base_model: BaseModel, - max_tokens: int = 5000, - ): - self.system_prompt = system_prompt - self.api_key = api_key - self.temperature = temperature - self.base_model = base_model - self.max_tokens = max_tokens - - try: - from openai import OpenAI - except ImportError: - logger.error( - "OpenAI library not found. Please install the OpenAI library by running 'pip install openai'" - ) - subprocess.run(["pip", "install", "openai"]) - from openai import OpenAI - - self.client = OpenAI(api_key=api_key) - - def run(self, task: str, *args, **kwargs) -> BaseModel: - """ - Run the OpenAI model with the system prompt and task to generate a response. - - Args: - - task (str): The task to be completed. - - *args: Additional positional arguments for the OpenAI API. 
-        - **kwargs: Additional keyword arguments for the OpenAI API.
-
-        Returns:
-        - BaseModel: The parsed response based on the base_model.
-        """
-        completion = self.client.beta.chat.completions.parse(
-            model="gpt-4o-2024-08-06",
-            messages=[
-                {"role": "system", "content": self.system_prompt},
-                {"role": "user", "content": task},
-            ],
-            response_format=self.base_model,
-            temperature=self.temperature,
-            max_tokens=self.max_tokens,
-            *args,
-            **kwargs,
-        )
-
-        return completion.choices[0].message.parsed
-
-    @retry(
-        stop=stop_after_attempt(3),
-        wait=wait_exponential(multiplier=1, min=4, max=10),
-    )
-    async def run_async(
-        self, task: str, *args, **kwargs
-    ) -> BaseModel:
-        """
-        Asynchronous version of the run method.
-
-        Args:
-        - task (str): The task to be completed.
-        - *args: Additional positional arguments for the OpenAI API.
-        - **kwargs: Additional keyword arguments for the OpenAI API.
-
-        Returns:
-        - BaseModel: The parsed response based on the base_model.
-        """
-        completion = (
-            await self.client.beta.chat.completions.parse_async(
-                model="gpt-4o-2024-08-06",
-                messages=[
-                    {"role": "system", "content": self.system_prompt},
-                    {"role": "user", "content": task},
-                ],
-                response_format=self.base_model,
-                temperature=self.temperature,
-                max_tokens=self.max_tokens,
-                *args,
-                **kwargs,
-            )
-        )
-
-        return completion.choices[0].message.parsed
-
-
 BOSS_SYSTEM_PROMPT = """
 Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
diff --git a/swarms/telemetry/bootup.py b/swarms/telemetry/bootup.py
index edb491335..16ea203e9 100644
--- a/swarms/telemetry/bootup.py
+++ b/swarms/telemetry/bootup.py
@@ -24,6 +24,9 @@ def bootup():
     ):
         logger.disable("")
         logging.disable(logging.CRITICAL)
+
+    else:
+        logger.enable("")

     # Silent wandb
     os.environ["WANDB_SILENT"] = "true"
diff --git a/swarms/utils/function_caller_model.py b/swarms/utils/function_caller_model.py
new file mode 100644
index 000000000..5931e40e3
--- /dev/null
+++ b/swarms/utils/function_caller_model.py
@@ -0,0 +1,143 @@
+import os
+import subprocess
+from concurrent.futures import ThreadPoolExecutor
+from typing import List
+
+from loguru import logger
+from pydantic import BaseModel
+
+
+try:
+    from openai import OpenAI
+except ImportError:
+    logger.error(
+        "OpenAI library not found. Please install the OpenAI library by running 'pip install openai'"
+    )
+    import sys
+
+    subprocess.run([sys.executable, "-m", "pip", "install", "openai"])
+    from openai import OpenAI
+
+
+SUPPORTED_MODELS = [
+    "o3-mini-2025-01-31",
+    "o1-2024-12-17",
+    "gpt-4o-mini-2024-07-18",
+    "gpt-4o-2024-08-06",
+]
+
+
+def check_api_key():
+    api_key = os.getenv("OPENAI_API_KEY")
+    if api_key is None:
+        raise ValueError(
+            "OPENAI_API_KEY is not set. Please set the OPENAI_API_KEY environment variable."
+        )
+    return api_key
+
+
+class OpenAIFunctionCaller:
+    """
+    A class to interact with the OpenAI API for generating text based on a system prompt and a task.
+
+    Attributes:
+    - system_prompt (str): The system prompt to guide the AI's response.
+    - api_key (str): The API key for the OpenAI service.
+    - temperature (float): The temperature parameter for the AI model, controlling randomness.
+    - base_model (BaseModel): The Pydantic model to parse the response into.
+    - max_tokens (int): The maximum number of tokens in the response.
+    - client (OpenAI): The OpenAI client instance.
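+
+    Example (illustrative; `PersonSchema` stands in for any Pydantic schema):
+        caller = OpenAIFunctionCaller(
+            system_prompt="Extract structured data about people.",
+            base_model=PersonSchema,
+        )
+        person = caller.run("Jane is a 31-year-old engineer.")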
+ """ + + def __init__( + self, + system_prompt: str, + base_model: BaseModel, + api_key: str = check_api_key(), + temperature: float = 0.1, + max_tokens: int = 5000, + model_name: str = "o1-2024-12-17", + ): + self.system_prompt = system_prompt + self.api_key = api_key + self.temperature = temperature + self.base_model = base_model + self.max_tokens = max_tokens + self.model_name = model_name + + self.client = OpenAI(api_key=self.api_key) + + def run(self, task: str): + """ + Run the OpenAI model with the system prompt and task to generate a response. + + Args: + - task (str): The task to be completed. + - *args: Additional positional arguments for the OpenAI API. + - **kwargs: Additional keyword arguments for the OpenAI API. + + Returns: + - BaseModel: The parsed response based on the base_model. + """ + try: + completion = self.client.beta.chat.completions.parse( + model=self.model_name, + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": task}, + ], + response_format=self.base_model, + max_tokens=self.max_tokens, + temperature=self.temperature, + ) + + return completion.choices[0].message.parsed + + except Exception as e: + print(f"There was an error: {e}") + + def check_api_key(self): + self.api_key = os.getenv("OPENAI_API_KEY") + + if self.api_key is None: + raise ValueError( + "API key is not set. Please set the API key using the api_key parameter." + ) + + def check_model_support(self): + # need to print the supported models + for model in SUPPORTED_MODELS: + print(model) + + return SUPPORTED_MODELS + + def batch_run(self, tasks: List[str]) -> List[BaseModel]: + """ + Batch run the OpenAI model with the system prompt and task to generate a response. + """ + return [self.run(task) for task in tasks] + + def concurrent_run(self, tasks: List[str]) -> List[BaseModel]: + """ + Concurrent run the OpenAI model with the system prompt and task to generate a response. 
+ """ + with ThreadPoolExecutor(max_workers=len(tasks)) as executor: + return list(executor.map(self.run, tasks)) + + +# class TestModel(BaseModel): +# name: str +# age: int + +# # Example usage +# model = OpenAIFunctionCaller( +# system_prompt="You are a helpful assistant that returns structured data about people.", +# base_model=TestModel, +# api_key=os.getenv("OPENAI_API_KEY"), +# temperature=0.7, +# max_tokens=1000 +# ) + +# # Test with a more appropriate prompt for the TestModel schema +# response = model.run("Tell me about a person named John who is 25 years old") +# print(response) diff --git a/swarms/utils/litellm_tokenizer.py b/swarms/utils/litellm_tokenizer.py index 8bfc34402..c2743b104 100644 --- a/swarms/utils/litellm_tokenizer.py +++ b/swarms/utils/litellm_tokenizer.py @@ -7,7 +7,10 @@ def count_tokens(text: str, model: str = "gpt-4o") -> int: from litellm import encode except ImportError: import sys - subprocess.run([sys.executable, "-m", "pip", "install", "litellm"]) + + subprocess.run( + [sys.executable, "-m", "pip", "install", "litellm"] + ) from litellm import encode return len(encode(model=model, text=text)) diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index 88d706fac..a4452b70b 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -2,9 +2,16 @@ from litellm import completion except ImportError: import subprocess - - subprocess.check_call(["pip", "install", "litellm"]) + import sys import litellm + + print("Installing litellm") + + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "-U", "litellm"] + ) + print("litellm installed") + from litellm import completion litellm.set_verbose = True @@ -25,6 +32,7 @@ def __init__( temperature: float = 0.5, max_tokens: int = 4000, ssl_verify: bool = False, + max_completion_tokens: int = 4000, *args, **kwargs, ): @@ -44,6 +52,9 @@ def __init__( self.temperature = temperature self.max_tokens = max_tokens self.ssl_verify = ssl_verify + self.max_completion_tokens = max_completion_tokens + + self.max_completion_tokens = max_tokens def _prepare_messages(self, task: str) -> list: """ diff --git a/swarms/utils/pdf_to_text.py b/swarms/utils/pdf_to_text.py index 74d79cade..af39c7474 100644 --- a/swarms/utils/pdf_to_text.py +++ b/swarms/utils/pdf_to_text.py @@ -6,7 +6,9 @@ import subprocess import sys - subprocess.check_call([sys.executable, "-m", "pip", "install", "pypdf"]) + subprocess.check_call( + [sys.executable, "-m", "pip", "install", "pypdf"] + ) import pypdf