Agent Tooling - Intelligent Action Systems

Build autonomous AI agents that can interact with external systems and perform complex tasks

🤖 What is Agent Tooling?

Agent Tooling in LangChain enables language models to interact with external systems, APIs, and tools to perform actions beyond text generation. It transforms static language models into dynamic agents capable of reasoning, planning, and executing real-world tasks.

Simple Analogy: Think of Agent Tooling as giving an AI assistant "hands and eyes". Just as a human assistant can use a calculator, search the web, send emails, or call APIs, a LangChain agent can use tools to interact with the digital world and accomplish complex objectives.
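
To make the idea concrete before any LangChain classes appear, here is a minimal sketch of what a "tool" boils down to: a name, a description the model can read, and a function that takes and returns text. `SimpleTool`, `run_tool`, and the `word_count` tool are hypothetical names used only for illustration; they are not part of LangChain.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class SimpleTool:
    """Hypothetical stand-in for a tool: a name, a description, and a callable."""
    name: str
    description: str
    func: Callable[[str], str]


def run_tool(registry: Dict[str, SimpleTool], tool_name: str, tool_input: str) -> str:
    """Look up a tool by name and call it, returning errors as text."""
    tool = registry.get(tool_name)
    if tool is None:
        return f"Error: unknown tool '{tool_name}'"
    try:
        return tool.func(tool_input)
    except Exception as exc:  # report the failure instead of crashing the agent
        return f"Error running {tool_name}: {exc}"


registry = {
    "word_count": SimpleTool(
        name="word_count",
        description="Count the words in a piece of text.",
        func=lambda text: str(len(text.split())),
    ),
}

print(run_tool(registry, "word_count", "tools give agents hands and eyes"))
print(run_tool(registry, "send_email", "hello"))
```

Returning errors as strings, rather than raising, mirrors the style of the tool classes in this guide: the agent can read the message and decide what to try next.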

🎯 Agent Architecture Overview

text
                    🤖 LANGCHAIN AGENT ARCHITECTURE 🤖
                    (Intelligent Action & Reasoning)

    ┌─────────────────────────────────────────────────────────────────┐
    │                     🧠 AGENT BRAIN                              │
    │                   "Decision Making Core"                        │
    └────────────────────┬────────────────────────────────────────────┘
                         │
    ┌────────────────────▼────────────────────┐
    │         🎯 REASONING PROCESS            │
    └──┬────────┬────────┬────────┬───────────┘
       │        │        │        │
       ▼        ▼        ▼        ▼
  ┌─────────┬─────────┬─────────┬──────────┐
  │🤔 THINK │📋 PLAN  │🔧 ACT   │🔍 OBSERVE│
  │         │         │         │          │
  │Analyze  │Create   │Execute  │Check     │
  │Problem  │Strategy │Tools    │Results   │
  └────┬────┴────┬────┴────┬────┴────┬─────┘
       │         │         │         │
       ▼         ▼         ▼         ▼
  ┌───────────────────────────────────────────────────────────────┐
  │                    🛠️ TOOL ECOSYSTEM                          │
  │                                                               │
  │ 🌐 Web APIs    📊 Databases   📧 Email      🧮 Calculators    │
  │ 🔍 Search      📁 Files       📝 Documents  🎨 Image Gen      │
  │ 💬 Chat        📈 Analytics   🔧 Custom     ⚡ Real-time      │
  └───────────────────────────────────────────────────────────────┘
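
The think, plan, act, observe cycle in the diagram can be sketched as a plain loop. This is only an illustration of the control flow, assuming a hypothetical `decide` callable in place of the language model; `run_agent_loop` and `scripted_decide` are made-up names, not LangChain APIs.

```python
from typing import Callable, Dict, List, Optional, Tuple

# A "decision" is either (tool_name, tool_input) or None when the agent is done.
Decision = Optional[Tuple[str, str]]


def run_agent_loop(
    decide: Callable[[str, List[str]], Decision],
    tools: Dict[str, Callable[[str], str]],
    task: str,
    max_steps: int = 5,
) -> List[str]:
    """Repeat think/plan -> act -> observe until `decide` returns None."""
    observations: List[str] = []
    for _ in range(max_steps):
        decision = decide(task, observations)           # THINK + PLAN
        if decision is None:
            break
        tool_name, tool_input = decision
        result = tools[tool_name](tool_input)           # ACT
        observations.append(f"{tool_name}: {result}")   # OBSERVE
    return observations


def scripted_decide(task: str, observations: List[str]) -> Decision:
    """Hypothetical policy standing in for the LLM: one tool call, then stop."""
    if not observations:
        return ("upper", task)
    return None


history = run_agent_loop(scripted_decide, {"upper": str.upper}, "check the weather")
print(history)
```

The `max_steps` cap is a design choice worth keeping in any real loop: it bounds cost when the model keeps requesting tools without converging.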

🛠️ Core Tool Types

🌐 Web & API Tools

Connecting agents to external web services and APIs

Basic Web Search Tool

python
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_community.utilities import SerpAPIWrapper
import requests

class WebSearchTool:
    def __init__(self, search_provider: str = "duckduckgo"):
        self.search_provider = search_provider
        if search_provider == "duckduckgo":
            self.search = DuckDuckGoSearchRun()
        elif search_provider == "serpapi":
            self.search = SerpAPIWrapper()  # reads SERPAPI_API_KEY from the environment
        else:
            # Fail fast instead of leaving self.search undefined
            raise ValueError(f"Unsupported search provider: {search_provider}")
    
    def search_web(self, query: str) -> str:
        """Search the web for information"""
        try:
            # Both wrappers expose the same run(query) interface
            return self.search.run(query)
        except Exception as e:
            return f"Error searching web: {str(e)}"
    
    def get_webpage_content(self, url: str) -> str:
        """Fetch content from a specific webpage"""
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()
            
            # Get text content
            text = soup.get_text()
            
            # Clean up whitespace
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            text = ' '.join(chunk for chunk in chunks if chunk)
            
            return text[:2000]  # Limit to 2000 characters
            
        except Exception as e:
            return f"Error fetching webpage: {str(e)}"

# Usage as LangChain Tool
from langchain.tools import Tool

web_search_tool = WebSearchTool()

search_tool = Tool(
    name="web_search",
    description="Search the web for current information. Use this when you need up-to-date information that might not be in your training data.",
    func=web_search_tool.search_web
)

webpage_tool = Tool(
    name="get_webpage",
    description="Get the text content of a specific webpage. Provide the full URL.",
    func=web_search_tool.get_webpage_content
)
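
The text-extraction step in `get_webpage_content` can be tried offline. The following is a rough sketch of the same idea (drop `script` and `style` content, collapse whitespace, truncate) using only the standard library's `html.parser` instead of BeautifulSoup. `html_to_text` and `_TextExtractor` are hypothetical helper names, and the output will not match BeautifulSoup exactly on messy markup.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collect text nodes, ignoring anything inside <script> or <style>."""

    SKIPPED = {"script", "style"}

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in self.SKIPPED:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self.SKIPPED and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0:
            self.parts.append(data)


def html_to_text(html: str, max_chars: int = 2000) -> str:
    """Return visible text with whitespace collapsed, limited to max_chars."""
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    text = " ".join(" ".join(parser.parts).split())
    return text[:max_chars]


sample = (
    "<html><head><style>p{color:red}</style><script>var x = 1;</script></head>"
    "<body><h1>Title</h1><p>Hello   world</p></body></html>"
)
print(html_to_text(sample))
```

Truncating the result matters for agents in particular: the tool output is fed back into the model's context, so an unbounded page can crowd out the rest of the conversation.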

Custom API Integration Tool

python
import json
import requests
from typing import Dict, Any, Optional

class APIIntegrationTool:
    def __init__(self, base_url: str, api_key: Optional[str] = None, headers: Optional[Dict] = None):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.default_headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'LangChain-Agent/1.0'
        }
        if headers:
            self.default_headers.update(headers)
        if api_key:
            self.default_headers['Authorization'] = f'Bearer {api_key}'
    
    def make_api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None, params: Optional[Dict] = None) -> str:
        """Make a request to the API"""
        try:
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            
            if method.upper() == "GET":
                response = requests.get(url, headers=self.default_headers, params=params, timeout=30)
            elif method.upper() == "POST":
                response = requests.post(url, headers=self.default_headers, json=data, params=params, timeout=30)
            elif method.upper() == "PUT":
                response = requests.put(url, headers=self.default_headers, json=data, params=params, timeout=30)
            elif method.upper() == "DELETE":
                response = requests.delete(url, headers=self.default_headers, params=params, timeout=30)
            else:
                return f"Unsupported HTTP method: {method}"
            
            response.raise_for_status()
            
            # Try to parse JSON response
            try:
                result = response.json()
                return json.dumps(result, indent=2)
            except json.JSONDecodeError:
                return response.text
                
        except requests.exceptions.RequestException as e:
            return f"API request failed: {str(e)}"
        except Exception as e:
            return f"Unexpected error: {str(e)}"
    
    def get_weather(self, city: str) -> str:
        """Get weather information for a city"""
        # Example using a weather API
        params = {'q': city, 'appid': self.api_key, 'units': 'metric'}
        return self.make_api_request('weather', 'GET', params=params)
    
    def send_notification(self, message: str, recipient: str) -> str:
        """Send a notification"""
        data = {'message': message, 'recipient': recipient}
        return self.make_api_request('notifications', 'POST', data=data)

# Create tools from API integration
weather_api = APIIntegrationTool(
    base_url="https://api.openweathermap.org/data/2.5",
    api_key="your-weather-api-key"
)

weather_tool = Tool(
    name="get_weather",
    description="Get current weather information for a city. Provide the city name.",
    func=weather_api.get_weather
)

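# Illustrative only: this reuses weather_api; a real setup would point at a notification service
notification_tool = Tool(
    name="send_notification",
    description="Send a notification message. Format: 'message,recipient' (the recipient follows the last comma).",
    # rsplit keeps commas inside the message; strip() drops stray whitespace around each part
    func=lambda input_str: weather_api.send_notification(*(part.strip() for part in input_str.rsplit(',', 1)))
)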
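
Several tools in this guide receive one string and split it into arguments. A small helper can make that parsing explicit and return a readable error when fields are missing. This is a sketch under assumptions: `split_tool_input`, `safe_call`, and `notify` are hypothetical names, and the convention of putting free text last is a design choice made here, not something LangChain requires.

```python
from typing import Callable, List


def split_tool_input(input_str: str, expected: int, sep: str = ",") -> List[str]:
    """Split a single tool-input string into exactly `expected` trimmed fields.

    The last field keeps any remaining separators, so free text should go last.
    Raises ValueError with a readable message when fields are missing.
    """
    parts = [part.strip() for part in input_str.split(sep, expected - 1)]
    if len(parts) != expected or any(not part for part in parts):
        raise ValueError(
            f"Expected {expected} values separated by '{sep}', got: {input_str!r}"
        )
    return parts


def safe_call(func: Callable[..., str], input_str: str, expected: int, sep: str = ",") -> str:
    """Call `func` with the parsed fields, returning parse errors as text."""
    try:
        return func(*split_tool_input(input_str, expected, sep))
    except ValueError as exc:
        return f"Error: {exc}"


def notify(recipient: str, message: str) -> str:
    # Hypothetical stand-in for a real notification call
    return f"to={recipient} message={message}"


print(safe_call(notify, "ops@example.com, Disk usage high, please check", 2))
print(safe_call(notify, "missing-separator", 2))
```

Because the error comes back as text, the agent sees what format was expected and can retry with corrected input.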

📊 Data & Database Tools

Database Query Tool

python
import sqlite3
import pandas as pd
from typing import List, Dict, Any

class DatabaseTool:
    def __init__(self, database_path: str = ":memory:"):
        self.database_path = database_path
        self.connection = None
        self._connect()
    
    def _connect(self):
        """Establish database connection"""
        try:
            self.connection = sqlite3.connect(self.database_path)
            self.connection.row_factory = sqlite3.Row  # Enable column access by name
        except Exception as e:
            print(f"Database connection error: {e}")
    
    def execute_query(self, query: str) -> str:
        """Execute SQL query and return results"""
        try:
            cursor = self.connection.cursor()
            cursor.execute(query)
            
            if query.strip().upper().startswith(('SELECT', 'WITH')):
                # For SELECT queries, return results
                results = cursor.fetchall()
                if results:
                    # Convert to list of dictionaries
                    rows = [dict(row) for row in results]
                    df = pd.DataFrame(rows)
                    return f"Query executed successfully. Results:\n{df.to_string(index=False)}"
                else:
                    return "Query executed successfully. No results found."
            else:
                # For INSERT, UPDATE, DELETE queries
                self.connection.commit()
                affected_rows = cursor.rowcount
                return f"Query executed successfully. {affected_rows} rows affected."
                
        except Exception as e:
            return f"Query execution error: {str(e)}"
    
    def get_table_schema(self, table_name: str) -> str:
        """Get schema information for a table"""
        table_name = table_name.strip()
        # The name is interpolated into the statement below, so validate it first
        if not table_name.isidentifier():
            return f"Invalid table name: '{table_name}'"
        
        try:
            cursor = self.connection.cursor()
            cursor.execute(f"PRAGMA table_info({table_name})")
            schema_info = cursor.fetchall()
            
            if schema_info:
                schema_str = f"Schema for table '{table_name}':\n"
                for column in schema_info:
                    schema_str += f"- {column[1]} ({column[2]}) {'NOT NULL' if column[3] else 'NULL'} {'PRIMARY KEY' if column[5] else ''}\n"
                return schema_str
            else:
                return f"Table '{table_name}' not found."
                
        except Exception as e:
            return f"Error getting schema: {str(e)}"
    
    def list_tables(self) -> str:
        """List all tables in the database"""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()
            
            if tables:
                table_list = [table[0] for table in tables]
                return f"Tables in database: {', '.join(table_list)}"
            else:
                return "No tables found in database."
                
        except Exception as e:
            return f"Error listing tables: {str(e)}"
    
    def create_sample_data(self):
        """Create sample data for demonstration"""
        try:
            cursor = self.connection.cursor()
            
            # Create employees table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS employees (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    department TEXT,
                    salary REAL,
                    hire_date DATE
                )
            ''')
            
            # Insert sample data
            sample_employees = [
                (1, 'John Doe', 'Engineering', 75000, '2022-01-15'),
                (2, 'Jane Smith', 'Marketing', 65000, '2021-06-20'),
                (3, 'Bob Johnson', 'Engineering', 80000, '2020-03-10'),
                (4, 'Alice Williams', 'Sales', 70000, '2023-02-14'),
                (5, 'Charlie Brown', 'Engineering', 85000, '2019-11-30')
            ]
            
            cursor.executemany(
                'INSERT OR REPLACE INTO employees VALUES (?, ?, ?, ?, ?)',
                sample_employees
            )
            
            self.connection.commit()
            return "Sample employee data created successfully."
            
        except Exception as e:
            return f"Error creating sample data: {str(e)}"

# Create database tools
db_tool = DatabaseTool()
db_tool.create_sample_data()

database_query_tool = Tool(
    name="database_query",
    description="Execute SQL queries on the database. Use this to retrieve, insert, update, or delete data. Always use proper SQL syntax.",
    func=db_tool.execute_query
)

database_schema_tool = Tool(
    name="get_table_schema",
    description="Get the schema (structure) of a database table. Provide the table name.",
    func=db_tool.get_table_schema
)

list_tables_tool = Tool(
    name="list_tables",
    description="List all tables available in the database.",
    func=lambda x: db_tool.list_tables()
)
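
The `database_query` tool above lets the agent run any SQL, including deletes. When the agent only needs to read, one option is SQLite's `query_only` pragma, which makes write statements fail on that connection. The sketch below assumes a hypothetical `run_read_only` helper and a throwaway in-memory table; it is a guard against accidental writes, not a full sandbox.

```python
import sqlite3


def run_read_only(connection: sqlite3.Connection, query: str) -> str:
    """Run one statement with query_only enabled so that writes are rejected."""
    connection.execute("PRAGMA query_only = ON")
    try:
        rows = connection.execute(query).fetchall()
        return f"{len(rows)} row(s): {rows}"
    except sqlite3.Error as exc:
        connection.rollback()  # discard any transaction opened by the failed statement
        return f"Query rejected: {exc}"
    finally:
        connection.execute("PRAGMA query_only = OFF")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO employees VALUES (1, 'John Doe')")
conn.commit()

read_result = run_read_only(conn, "SELECT name FROM employees")
write_result = run_read_only(conn, "DELETE FROM employees")
remaining = conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0]

print(read_result)
print(write_result)
print(f"Rows still present: {remaining}")
```

A read-only tool and a separate, more tightly described write tool give the agent less room to damage data than a single tool that accepts everything.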

Data Analysis Tool

python
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from io import StringIO
import base64

class DataAnalysisTool:
    def __init__(self):
        self.data_cache = {}
    
    def load_csv_data(self, file_path_or_data: str) -> str:
        """Load CSV data from file path or direct data"""
        try:
            if file_path_or_data.startswith('/') or file_path_or_data.endswith('.csv'):
                # File path
                df = pd.read_csv(file_path_or_data)
                data_id = file_path_or_data.split('/')[-1].replace('.csv', '')
            else:
                # Direct CSV data
                df = pd.read_csv(StringIO(file_path_or_data))
                data_id = 'uploaded_data'
            
            self.data_cache[data_id] = df
            
            info = f"Data loaded successfully as '{data_id}':\n"
            info += f"Shape: {df.shape}\n"
            info += f"Columns: {list(df.columns)}\n"
            info += f"Data types:\n{df.dtypes.to_string()}\n"
            info += f"First 5 rows:\n{df.head().to_string()}"
            
            return info
            
        except Exception as e:
            return f"Error loading data: {str(e)}"
    
    def analyze_data(self, data_id: str, analysis_type: str = "summary") -> str:
        """Perform various data analysis operations"""
        if data_id not in self.data_cache:
            return f"Data '{data_id}' not found. Please load data first."
        
        df = self.data_cache[data_id]
        
        try:
            if analysis_type == "summary":
                return self._get_summary_statistics(df)
            elif analysis_type == "missing":
                return self._get_missing_data_info(df)
            elif analysis_type == "correlation":
                return self._get_correlation_analysis(df)
            elif analysis_type == "distribution":
                return self._get_distribution_analysis(df)
            else:
                return f"Unknown analysis type: {analysis_type}"
                
        except Exception as e:
            return f"Error during analysis: {str(e)}"
    
    def _get_summary_statistics(self, df: pd.DataFrame) -> str:
        """Get summary statistics"""
        numeric_df = df.select_dtypes(include=[np.number])
        
        if numeric_df.empty:
            return "No numeric columns found for summary statistics."
        
        summary = "Summary Statistics:\n"
        summary += numeric_df.describe().to_string()
        
        # Add value counts for categorical columns
        categorical_df = df.select_dtypes(include=['object'])
        if not categorical_df.empty:
            summary += "\n\nCategorical Column Value Counts:\n"
            for col in categorical_df.columns:
                summary += f"\n{col}:\n{df[col].value_counts().head().to_string()}"
        
        return summary
    
    def _get_missing_data_info(self, df: pd.DataFrame) -> str:
        """Get missing data information"""
        missing_counts = df.isnull().sum()
        missing_percentages = (missing_counts / len(df)) * 100
        
        missing_info = "Missing Data Analysis:\n"
        missing_info += "Column | Missing Count | Missing %\n"
        missing_info += "-------|---------------|----------\n"
        
        for col in df.columns:
            missing_info += f"{col} | {missing_counts[col]} | {missing_percentages[col]:.2f}%\n"
        
        total_missing = missing_counts.sum()
        missing_info += f"\nTotal missing values: {total_missing}"
        
        return missing_info
    
    def _get_correlation_analysis(self, df: pd.DataFrame) -> str:
        """Get correlation analysis"""
        numeric_df = df.select_dtypes(include=[np.number])
        
        if numeric_df.shape[1] < 2:
            return "Need at least 2 numeric columns for correlation analysis."
        
        correlation_matrix = numeric_df.corr()
        
        analysis = "Correlation Analysis:\n"
        analysis += correlation_matrix.to_string()
        
        # Find strong correlations
        strong_correlations = []
        for i in range(len(correlation_matrix.columns)):
            for j in range(i+1, len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if abs(corr_value) > 0.7:  # Strong correlation threshold
                    col1 = correlation_matrix.columns[i]
                    col2 = correlation_matrix.columns[j]
                    strong_correlations.append(f"{col1} ↔ {col2}: {corr_value:.3f}")
        
        if strong_correlations:
            analysis += "\n\nStrong Correlations (|r| > 0.7):\n"
            analysis += "\n".join(strong_correlations)
        
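        return analysis
    
    def _get_distribution_analysis(self, df: pd.DataFrame) -> str:
        """Get distribution analysis (a minimal sketch: skewness and kurtosis per numeric column)"""
        numeric_df = df.select_dtypes(include=[np.number])
        
        if numeric_df.empty:
            return "No numeric columns found for distribution analysis."
        
        distribution = pd.DataFrame({
            'skewness': numeric_df.skew(),
            'kurtosis': numeric_df.kurt(),
        })
        
        return "Distribution Analysis:\n" + distribution.to_string()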
    
    def filter_data(self, data_id: str, filter_expression: str) -> str:
        """Filter data based on expression"""
        if data_id not in self.data_cache:
            return f"Data '{data_id}' not found."
        
        try:
            df = self.data_cache[data_id]
            filtered_df = df.query(filter_expression)
            
            filtered_id = f"{data_id}_filtered"
            self.data_cache[filtered_id] = filtered_df
            
            result = f"Data filtered successfully. New dataset '{filtered_id}' created.\n"
            result += f"Original shape: {df.shape}\n"
            result += f"Filtered shape: {filtered_df.shape}\n"
            result += f"Rows removed: {len(df) - len(filtered_df)}\n"
            result += f"First 5 rows of filtered data:\n{filtered_df.head().to_string()}"
            
            return result
            
        except Exception as e:
            return f"Error filtering data: {str(e)}"

# Create data analysis tools
data_analyzer = DataAnalysisTool()

load_data_tool = Tool(
    name="load_csv_data",
    description="Load CSV data from file path or direct CSV content. Returns data ID for future reference.",
    func=data_analyzer.load_csv_data
)

analyze_data_tool = Tool(
    name="analyze_data",
    description="Analyze loaded data. Format: 'data_id,analysis_type' where analysis_type can be 'summary', 'missing', 'correlation', or 'distribution'.",
    # Strip whitespace so 'employees, summary' is parsed the same as 'employees,summary'
    func=lambda input_str: data_analyzer.analyze_data(*(part.strip() for part in input_str.split(',', 1)))
)

filter_data_tool = Tool(
    name="filter_data",
    description="Filter data based on conditions. Format: 'data_id,filter_expression' (e.g., 'employees,salary > 70000').",
    func=lambda input_str: data_analyzer.filter_data(*(part.strip() for part in input_str.split(',', 1)))
)

🧮 Computational Tools

Calculator & Math Tool

python
import math
import re
from typing import Union

class AdvancedCalculatorTool:
    def __init__(self):
        self.constants = {
            'pi': math.pi,
            'e': math.e,
            'phi': (1 + math.sqrt(5)) / 2,  # Golden ratio
        }
        
        self.functions = {
            'sqrt': math.sqrt,
            'sin': math.sin,
            'cos': math.cos,
            'tan': math.tan,
            'log': math.log,
            'ln': math.log,
            'exp': math.exp,
            'abs': abs,
            'round': round,
            'ceil': math.ceil,
            'floor': math.floor,
        }
    
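    def calculate(self, expression: str) -> str:
        """Evaluate a mathematical expression after validating its characters"""
        try:
            # Clean the expression
            expression = expression.strip()
            
            # Replace constants on word boundaries so the 'e' in 'exp' or 'ceil' is left alone
            for const, value in self.constants.items():
                expression = re.sub(rf'\b{const}\b', str(value), expression)
            
            # Handle function calls
            expression = self._handle_functions(expression)
            
            # Validate expression (digits, operators, parentheses, and e/E for exponent notation)
            if not self._is_safe_expression(expression):
                return "Error: Expression contains invalid characters"
            
            # Evaluate with builtins disabled; the character check above is the main safeguard
            result = eval(expression, {"__builtins__": {}}, {})
            
            return f"Result: {result}"
            
        except ZeroDivisionError:
            return "Error: Division by zero"
        except Exception as e:
            return f"Error: {str(e)}"
    
    @staticmethod
    def _is_safe_expression(expression: str) -> bool:
        """Allow only numeric literals, arithmetic operators, and parentheses"""
        return re.match(r'^[0-9eE+\-*/().\s]+$', expression) is not None
    
    def _handle_functions(self, expression: str) -> str:
        """Replace simple (non-nested) function calls with their numeric results"""
        for func_name, func in self.functions.items():
            pattern = rf'\b{func_name}\(([^()]+)\)'
            for match in re.findall(pattern, expression):
                # Never eval an argument that has not passed the character check
                if not self._is_safe_expression(match):
                    continue
                try:
                    arg = float(eval(match, {"__builtins__": {}}, {}))
                    result = func(arg)
                    expression = expression.replace(f'{func_name}({match})', str(result))
                except Exception:
                    pass
        return expression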
    
    def solve_equation(self, equation: str) -> str:
        """Solve simple algebraic equations"""
        try:
            # Handle simple linear equations like "2x + 5 = 15"
            if '=' in equation and 'x' in equation:
                left, right = equation.split('=')
                left = left.strip()
                right = right.strip()
                
                # Simple linear equation solver
                # This is a simplified implementation
                return f"Equation solving not fully implemented yet. Expression: {equation}"
            else:
                return self.calculate(equation)
                
        except Exception as e:
            return f"Error solving equation: {str(e)}"
    
    def unit_conversion(self, value: float, from_unit: str, to_unit: str) -> str:
        """Convert between common units"""
        # Factors are grouped by dimension so that, e.g., km cannot be converted to kg
        conversions = {
            # Length (to meters)
            'length': {
                'mm': 0.001, 'cm': 0.01, 'm': 1, 'km': 1000,
                'inch': 0.0254, 'ft': 0.3048, 'yard': 0.9144, 'mile': 1609.34,
            },
            # Weight (to grams)
            'weight': {
                'mg': 0.001, 'g': 1, 'kg': 1000,
                'oz': 28.3495, 'lb': 453.592,
            },
        }
        temperature_units = ('celsius', 'fahrenheit', 'kelvin')
        
        try:
            value = float(value)  # Tool inputs arrive as strings
            from_unit = from_unit.strip().lower()
            to_unit = to_unit.strip().lower()
            
            # Temperature conversions (both units must be temperatures)
            if from_unit in temperature_units and to_unit in temperature_units:
                return self._convert_temperature(value, from_unit, to_unit)
            
            # Other unit conversions
            for factors in conversions.values():
                if from_unit in factors and to_unit in factors:
                    # Convert to base unit, then to target unit
                    base_value = value * factors[from_unit]
                    result = base_value / factors[to_unit]
                    return f"{value} {from_unit} = {result} {to_unit}"
            
            return f"Conversion not supported: {from_unit} to {to_unit}"
                
        except Exception as e:
            return f"Error in unit conversion: {str(e)}"
    
    def _convert_temperature(self, value: float, from_unit: str, to_unit: str) -> str:
        """Convert temperature between Celsius, Fahrenheit, and Kelvin"""
        # Convert to Celsius first
        if from_unit == 'fahrenheit':
            celsius = (value - 32) * 5/9
        elif from_unit == 'kelvin':
            celsius = value - 273.15
        else:  # celsius
            celsius = value
        
        # Convert from Celsius to target
        if to_unit == 'fahrenheit':
            result = celsius * 9/5 + 32
        elif to_unit == 'kelvin':
            result = celsius + 273.15
        else:  # celsius
            result = celsius
        
        return f"{value}° {from_unit.capitalize()} = {result:.2f}° {to_unit.capitalize()}"

# Create calculator tools
calculator = AdvancedCalculatorTool()

calculator_tool = Tool(
    name="calculator",
    description="Perform mathematical calculations. Supports basic arithmetic, functions like sqrt, sin, cos, log, and constants like pi, e.",
    func=calculator.calculate
)

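def _convert_units(input_str: str) -> str:
    """Parse 'value,from_unit,to_unit' and run the conversion"""
    try:
        value, from_unit, to_unit = (part.strip() for part in input_str.split(','))
        return calculator.unit_conversion(float(value), from_unit, to_unit)
    except ValueError:
        return "Error: expected input in the form 'value,from_unit,to_unit'"

unit_converter_tool = Tool(
    name="unit_converter",
    description="Convert between units. Format: 'value,from_unit,to_unit' (e.g., '100,celsius,fahrenheit' or '5,km,mile').",
    func=_convert_units
)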

equation_solver_tool = Tool(
    name="solve_equation",
    description="Solve mathematical equations or evaluate expressions.",
    func=calculator.solve_equation
)
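
Character filtering followed by `eval` is fragile, so it can help to see an alternative. The sketch below evaluates arithmetic by walking Python's parsed syntax tree and allowing only a whitelist of node types. It is one possible approach, not the calculator's actual implementation: `safe_eval` and the whitelist tables are hypothetical names, and exponentiation is deliberately left out to avoid very large computations.

```python
import ast
import math
import operator

_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_NAMES = {"pi": math.pi, "e": math.e}
_FUNCTIONS = {"sqrt": math.sqrt, "sin": math.sin, "cos": math.cos, "log": math.log}


def safe_eval(expression: str) -> float:
    """Evaluate arithmetic by walking the syntax tree instead of calling eval()."""

    def _eval(node: ast.AST) -> float:
        if (
            isinstance(node, ast.Constant)
            and isinstance(node.value, (int, float))
            and not isinstance(node.value, bool)
        ):
            return node.value
        if isinstance(node, ast.Name) and node.id in _NAMES:
            return _NAMES[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in _FUNCTIONS
            and len(node.args) == 1
            and not node.keywords
        ):
            return _FUNCTIONS[node.func.id](_eval(node.args[0]))
        # Anything else (attribute access, other calls, comparisons, ...) is refused
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    tree = ast.parse(expression.strip(), mode="eval")
    return _eval(tree.body)


print(safe_eval("2 * (3 + 4)"))
print(safe_eval("sqrt(16) + cos(0)"))
```

Nested calls such as `sqrt(sqrt(16))` work here because the tree is evaluated recursively, and adding an operator or function means adding one entry to the corresponding table.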

📁 File System Tools

File Operations Tool

python
import os
import shutil
from pathlib import Path
from typing import List, Optional

class FileSystemTool:
    def __init__(self, base_directory: str = "./agent_workspace"):
        self.base_directory = Path(base_directory)
        self.base_directory.mkdir(exist_ok=True)
        
        # Security: Only allow operations within base directory
        self.allowed_directory = self.base_directory.resolve()
    
    def _is_safe_path(self, path: str) -> bool:
        """Check if path is within allowed directory"""
        try:
            requested_path = (self.base_directory / path).resolve()
            return requested_path.is_relative_to(self.allowed_directory)  # requires Python 3.9+
        except Exception:
            return False
    
    def read_file(self, file_path: str) -> str:
        """Read contents of a text file"""
        if not self._is_safe_path(file_path):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / file_path
            with open(full_path, 'r', encoding='utf-8') as file:
                content = file.read()
            
            return f"File content ({len(content)} characters):\n{content}"
            
        except FileNotFoundError:
            return f"Error: File '{file_path}' not found"
        except Exception as e:
            return f"Error reading file: {str(e)}"
    
    def write_file(self, file_path: str, content: str) -> str:
        """Write content to a file"""
        if not self._is_safe_path(file_path):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / file_path
            
            # Create directory if it doesn't exist
            full_path.parent.mkdir(parents=True, exist_ok=True)
            
            with open(full_path, 'w', encoding='utf-8') as file:
                file.write(content)
            
            return f"Successfully wrote {len(content)} characters to '{file_path}'"
            
        except Exception as e:
            return f"Error writing file: {str(e)}"
    
    def list_directory(self, directory_path: str = ".") -> str:
        """List contents of a directory"""
        if not self._is_safe_path(directory_path):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / directory_path
            
            if not full_path.exists():
                return f"Error: Directory '{directory_path}' not found"
            
            if not full_path.is_dir():
                return f"Error: '{directory_path}' is not a directory"
            
            items = []
            for item in full_path.iterdir():
                item_type = "📁" if item.is_dir() else "📄"
                size = f" ({item.stat().st_size} bytes)" if item.is_file() else ""
                items.append(f"{item_type} {item.name}{size}")
            
            if items:
                return f"Contents of '{directory_path}':\n" + "\n".join(items)
            else:
                return f"Directory '{directory_path}' is empty"
                
        except Exception as e:
            return f"Error listing directory: {str(e)}"
    
    def create_directory(self, directory_path: str) -> str:
        """Create a new directory"""
        if not self._is_safe_path(directory_path):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / directory_path
            full_path.mkdir(parents=True, exist_ok=True)
            return f"Directory '{directory_path}' created successfully"
            
        except Exception as e:
            return f"Error creating directory: {str(e)}"
    
    def delete_file(self, file_path: str) -> str:
        """Delete a file"""
        if not self._is_safe_path(file_path):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / file_path
            
            if not full_path.exists():
                return f"Error: File '{file_path}' not found"
            
            if full_path.is_file():
                full_path.unlink()
                return f"File '{file_path}' deleted successfully"
            else:
                return f"Error: '{file_path}' is not a file"
                
        except Exception as e:
            return f"Error deleting file: {str(e)}"
    
    def search_files(self, pattern: str, directory: str = ".") -> str:
        """Search for files matching a pattern"""
        if not self._is_safe_path(directory):
            return "Error: Access denied - path outside allowed directory"
        
        try:
            full_path = self.base_directory / directory
            matching_files = []
            
            for file_path in full_path.rglob(pattern):
                if file_path.is_file():
                    relative_path = file_path.relative_to(self.base_directory)
                    matching_files.append(str(relative_path))
            
            if matching_files:
                return f"Files matching '{pattern}':\n" + "\n".join(matching_files)
            else:
                return f"No files found matching pattern '{pattern}'"
                
        except Exception as e:
            return f"Error searching files: {str(e)}"

# Create file system tools
file_system = FileSystemTool()

read_file_tool = Tool(
    name="read_file",
    description="Read the contents of a text file. Provide the file path relative to the workspace.",
    func=file_system.read_file
)

write_file_tool = Tool(
    name="write_file",
    description="Write content to a file. Format: 'file_path|||content' (||| separates the path from the content).",
    # Split on the first '|||' only, so the content itself may contain commas or further '|||'
    func=lambda input_str: file_system.write_file(*input_str.split('|||', 1))
)

list_directory_tool = Tool(
    name="list_directory",
    description="List contents of a directory. Provide directory path or '.' for current directory.",
    func=file_system.list_directory
)

create_directory_tool = Tool(
    name="create_directory",
    description="Create a new directory. Provide the directory path.",
    func=file_system.create_directory
)

search_files_tool = Tool(
    name="search_files",
    description="Search for files matching a pattern. Format: 'pattern,directory' (e.g., '*.py,.' for Python files).",
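    # Strip whitespace so that '*.py, src' resolves to 'src' rather than ' src'
    func=lambda input_str: file_system.search_files(*(part.strip() for part in input_str.split(',', 1)))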
)
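
The `write_file` and `search_files` wrappers above unpack a single string into two arguments. If the model omits the separator, the unpacking raises a `TypeError` instead of returning an error string the agent can react to. The following is a minimal sketch of a more defensive parser; `split_tool_input` and `safe_write` are hypothetical helper names for illustration, not part of LangChain or of the `FileSystemTool` class.

```python
from typing import List


def split_tool_input(raw: str, separator: str, expected_parts: int) -> List[str]:
    """Split a single-string tool input into exactly `expected_parts` fields.

    Hypothetical helper (not a LangChain API). Only the first
    `expected_parts - 1` separators are honoured, so the last field may
    itself contain the separator.
    """
    parts = raw.split(separator, expected_parts - 1)
    if len(parts) != expected_parts:
        raise ValueError(
            f"Expected {expected_parts} fields separated by '{separator}', got {len(parts)}"
        )
    return parts


def safe_write(raw: str) -> str:
    """Stand-in for the write_file wrapper: returns an error string on bad input."""
    try:
        path, content = split_tool_input(raw, "|||", 2)
    except ValueError as exc:
        return f"Error: {exc}"
    # A real wrapper would call file_system.write_file(path.strip(), content) here.
    return f"Would write {len(content)} characters to '{path.strip()}'"
```

Returning the parse failure as a string keeps the behaviour consistent with the other file system methods, which report problems as `Error: ...` messages rather than raising.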

🤖 Agent Types & Implementations

🧠 ReAct Agent (Reasoning + Acting)

python
from typing import List

from langchain.agents import create_react_agent, AgentExecutor
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
from langchain.tools import Tool

class ReActAgent:
    def __init__(self, llm, tools: List[Tool]):
        self.llm = llm
        self.tools = tools
        self.agent_executor = self._create_agent()
    
    def _create_agent(self):
        """Create ReAct agent with custom prompt"""
        
        react_prompt = PromptTemplate.from_template("""
        You are an intelligent assistant that can use tools to help answer questions and complete tasks.

        You have access to the following tools:
        {tools}

        Use the following format:

        Question: the input question you must answer
        Thought: you should always think about what to do
        Action: the action to take, should be one of [{tool_names}]
        Action Input: the input to the action
        Observation: the result of the action
        ... (this Thought/Action/Action Input/Observation can repeat N times)
        Thought: I now know the final answer
        Final Answer: the final answer to the original input question

        Important guidelines:
        1. Always explain your reasoning in the Thought section
        2. Use tools when you need external information or capabilities
        3. Break down complex tasks into smaller steps
        4. If a tool fails, try an alternative approach
        5. Provide clear, helpful final answers

        Begin!

        Question: {input}
        Thought: {agent_scratchpad}
        """)
        
        # Create agent
        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=react_prompt
        )
        
        # Create agent executor
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True,
            handle_parsing_errors=True,
            max_iterations=10,
            max_execution_time=300  # 5 minutes timeout
        )
    
    def run(self, query: str) -> str:
        """Run the agent with a query"""
        try:
            result = self.agent_executor.invoke({"input": query})
            return result["output"]
        except Exception as e:
            return f"Agent execution error: {str(e)}"
    
    def run_with_history(self, query: str, chat_history: List[tuple] = None) -> str:
        """Run agent with conversation history"""
        if chat_history:
            history_context = "\n".join([
                f"Previous Q: {q}\nPrevious A: {a}" 
                for q, a in chat_history[-3:]  # Last 3 exchanges
            ])
            
            enhanced_query = f"""
            Previous conversation context:
            {history_context}
            
            Current question: {query}
            """
        else:
            enhanced_query = query
        
        return self.run(enhanced_query)

# Create comprehensive tool list
all_tools = [
    search_tool,
    webpage_tool,
    weather_tool,
    database_query_tool,
    database_schema_tool,
    list_tables_tool,
    calculator_tool,
    unit_converter_tool,
    read_file_tool,
    write_file_tool,
    list_directory_tool,
    create_directory_tool,
    search_files_tool,
    load_data_tool,
    analyze_data_tool,
    filter_data_tool
]

# Initialize ReAct agent
llm = OpenAI(temperature=0.1)
react_agent = ReActAgent(llm, all_tools)

# Example usage
response = react_agent.run("What's the weather like in New York and can you save this information to a file?")
print(response)
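
To make the Thought/Action/Observation format in the prompt more concrete, here is a simplified sketch of the loop that an executor runs around the model. It is not LangChain's implementation: `react_loop`, `fake_llm`, and `multiply` are hypothetical names, the "model" is a scripted function so the example runs without an API key, and real executors add stop sequences, timeouts, and richer parsing.

```python
import re
from typing import Callable, Dict

ACTION_RE = re.compile(r"Action:\s*(.+?)\s*\nAction Input:\s*(.*)", re.DOTALL)
FINAL_RE = re.compile(r"Final Answer:\s*(.*)", re.DOTALL)


def react_loop(
    llm: Callable[[str], str],
    tools: Dict[str, Callable[[str], str]],
    question: str,
    max_iterations: int = 5,
) -> str:
    """Run a simplified Thought/Action/Observation loop.

    `llm` is any callable mapping a prompt to a completion (an assumption of
    this sketch); `tools` maps tool names to single-argument functions.
    """
    scratchpad = ""
    for _ in range(max_iterations):
        completion = llm(f"Question: {question}\nThought:{scratchpad}")
        final = FINAL_RE.search(completion)
        if final:
            return final.group(1).strip()
        action = ACTION_RE.search(completion)
        if not action:
            # Same idea as handle_parsing_errors: report the problem back to the model.
            observation = "Invalid format: expected 'Action:' and 'Action Input:'"
        else:
            name, tool_input = action.group(1).strip(), action.group(2).strip()
            tool = tools.get(name)
            observation = tool(tool_input) if tool else f"Unknown tool '{name}'"
        # The scratchpad accumulates every step so the next call sees the history.
        scratchpad += f"{completion}\nObservation: {observation}\nThought:"
    return "Stopped: iteration limit reached"


def fake_llm(prompt: str) -> str:
    """Scripted stand-in for a model: act first, answer once an observation exists."""
    if "Observation:" not in prompt:
        return " I need to multiply.\nAction: multiply\nAction Input: 6,7"
    return " I now know the final answer\nFinal Answer: 42"


def multiply(args: str) -> str:
    a, b = (int(x) for x in args.split(","))
    return str(a * b)


answer = react_loop(fake_llm, {"multiply": multiply}, "What is 6 times 7?")
print(answer)
```

The `max_iterations` guard plays the same role as the `max_iterations=10` setting on the `AgentExecutor` above: it bounds the loop when the model never produces a final answer.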

🎯 Specialized Task Agents

Data Analysis Agent

python
class DataAnalysisAgent:
    def __init__(self, llm):
        self.llm = llm
        self.data_tools = [
            load_data_tool,
            analyze_data_tool,
            filter_data_tool,
            database_query_tool,
            calculator_tool
        ]
        self.agent = ReActAgent(llm, self.data_tools)
    
    def analyze_dataset(self, data_source: str, analysis_goals: List[str]) -> str:
        """Comprehensive dataset analysis"""
        goals_text = "\n".join([f"- {goal}" for goal in analysis_goals])
        
        query = f"""
        Please perform a comprehensive analysis of the dataset from: {data_source}
        
        Analysis goals:
        {goals_text}
        
        Please:
        1. Load and examine the data structure
        2. Perform summary statistics
        3. Check for missing data
        4. Analyze correlations if applicable
        5. Filter data if needed based on the goals
        6. Provide insights and recommendations
        """
        
        return self.agent.run(query)
    
    def answer_data_question(self, question: str, context: str = "") -> str:
        """Answer specific questions about data"""
        enhanced_query = f"""
        Context: {context}
        
        Data question: {question}
        
        Please use the available data tools to investigate and answer this question thoroughly.
        """
        
        return self.agent.run(enhanced_query)

# Usage
data_agent = DataAnalysisAgent(llm)
analysis_result = data_agent.analyze_dataset(
    "employees.csv",
    [
        "Identify salary distribution by department",
        "Find employees hired in the last 2 years",
        "Calculate average salary by department"
    ]
)

Research Agent

python
class ResearchAgent:
    def __init__(self, llm):
        self.llm = llm
        self.research_tools = [
            search_tool,
            webpage_tool,
            write_file_tool,
            read_file_tool,
            create_directory_tool
        ]
        self.agent = ReActAgent(llm, self.research_tools)
    
    def conduct_research(self, topic: str, research_depth: str = "comprehensive") -> str:
        """Conduct research on a topic"""
        
        if research_depth == "quick":
            query = f"""
            Conduct quick research on: {topic}
            
            Please:
            1. Search for current information about {topic}
            2. Summarize key findings
            3. Save a brief summary to a file named '{topic.replace(' ', '_')}_quick_research.txt'
            """
        else:  # comprehensive
            query = f"""
            Conduct comprehensive research on: {topic}
            
            Please:
            1. Search for multiple sources of information about {topic}
            2. Visit key websites to gather detailed information
            3. Analyze and synthesize the information
            4. Create a detailed research report
            5. Save the report to a file named '{topic.replace(' ', '_')}_research_report.txt'
            6. Include sources and key findings
            """
        
        return self.agent.run(query)
    
    def compare_topics(self, topic1: str, topic2: str) -> str:
        """Compare two topics through research"""
        query = f"""
        Research and compare: {topic1} vs {topic2}
        
        Please:
        1. Research both {topic1} and {topic2}
        2. Identify key similarities and differences
        3. Create a comparison analysis
        4. Save the comparison to a file named '{topic1.replace(' ', '_')}_vs_{topic2.replace(' ', '_')}_comparison.txt'
        """
        
        return self.agent.run(query)

# Usage
research_agent = ResearchAgent(llm)
research_result = research_agent.conduct_research(
    "Large Language Models in 2024",
    research_depth="comprehensive"
)
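
The research prompts build file names with `topic.replace(' ', '_')`, which leaves characters such as `/`, `:` or `?` untouched. A topic like "AI/ML" would then be read as a path with a subdirectory. One possible way to tighten this is sketched below; `topic_to_filename` is a hypothetical helper, and the allowed character set is an assumption you may want to adjust.

```python
import re


def topic_to_filename(topic: str, suffix: str = "research_report.txt") -> str:
    """Build a conservative file name from a free-text topic.

    Hypothetical helper: keeps ASCII letters, digits, '-' and '_', and
    collapses every other run of characters into a single underscore.
    """
    slug = re.sub(r"[^A-Za-z0-9_-]+", "_", topic).strip("_")
    # Fall back to a fixed name if nothing usable remains.
    return f"{slug or 'untitled'}_{suffix}"


print(topic_to_filename("Large Language Models in 2024"))
print(topic_to_filename("AI/ML: state of the art?"))
```

The result could be interpolated into the prompt in place of the inline `replace` call, so the agent is always asked to write to a flat, predictable file name.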

🔄 Multi-Agent Coordination

python
class MultiAgentOrchestrator:
    def __init__(self, llm):
        self.llm = llm
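        # Store the underlying ReActAgent of each specialist: ResearchAgent and
        # DataAnalysisAgent expose task-specific methods but no run() of their own.
        self.agents = {
            'research': ResearchAgent(llm).agent,
            'data': DataAnalysisAgent(llm).agent,
            'general': ReActAgent(llm, all_tools)
        }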
    
    def delegate_task(self, task: str, preferred_agent: str = None) -> str:
        """Delegate task to appropriate agent"""
        
        if preferred_agent and preferred_agent in self.agents:
            agent = self.agents[preferred_agent]
        else:
            # Auto-select agent based on task content
            agent = self._select_agent(task)
        
        return agent.run(task)
    
    def _select_agent(self, task: str) -> ReActAgent:
        """Select appropriate agent based on task content"""
        task_lower = task.lower()
        
        research_keywords = ['research', 'find information', 'search', 'investigate', 'study']
        data_keywords = ['analyze', 'data', 'statistics', 'database', 'calculate', 'csv']
        
        if any(keyword in task_lower for keyword in research_keywords):
            return self.agents['research']
        elif any(keyword in task_lower for keyword in data_keywords):
            return self.agents['data']
        else:
            return self.agents['general']
    
    def collaborative_task(self, task: str, required_agents: List[str]) -> str:
        """Execute task requiring multiple agents"""
        results = {}
        
        for agent_name in required_agents:
            if agent_name in self.agents:
                agent_task = f"Your part of the collaborative task: {task}"
                results[agent_name] = self.agents[agent_name].run(agent_task)
        
        # Combine results
        combined_result = "Collaborative Task Results:\n\n"
        for agent_name, result in results.items():
            combined_result += f"=== {agent_name.upper()} AGENT ===\n{result}\n\n"
        
        return combined_result

# Usage
orchestrator = MultiAgentOrchestrator(llm)

# Auto-delegated task
result = orchestrator.delegate_task("Research the latest trends in machine learning and analyze any available performance data")

# Collaborative task
collaborative_result = orchestrator.collaborative_task(
    "Create a comprehensive report on renewable energy adoption with supporting data analysis",
    ['research', 'data']
)
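
The keyword routing in `_select_agent` can be examined on its own, without any model or tools. The sketch below copies the keyword lists from the class and returns only the agent name; `ROUTES` and `route` are hypothetical names used for illustration. It shows that the order of the checks acts as a priority: a task that mentions both research and data keywords goes to the research agent.

```python
from typing import Dict, List

# Keyword lists copied from _select_agent; dict order encodes priority.
ROUTES: Dict[str, List[str]] = {
    "research": ["research", "find information", "search", "investigate", "study"],
    "data": ["analyze", "data", "statistics", "database", "calculate", "csv"],
}


def route(task: str) -> str:
    """Return the name of the first agent whose keywords appear in the task."""
    task_lower = task.lower()
    for agent_name, keywords in ROUTES.items():
        # Plain substring matching: 'data' also matches words such as 'metadata'.
        if any(keyword in task_lower for keyword in keywords):
            return agent_name
    return "general"


print(route("Research the latest trends in machine learning and analyze any available performance data"))
print(route("Calculate average salary by department"))
print(route("What's the weather in Paris?"))
```

Because matching is by substring and first hit wins, mixed tasks are better served by `collaborative_task` or by passing `preferred_agent` explicitly.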

🚀 Getting Started with Agent Tooling

📚 Simple Agent Setup

python
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.tools import Tool

# Define a simple tool
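def simple_calculator(expression: str) -> str:
    # WARNING: eval() executes arbitrary Python. Emptying the builtins is only a
    # partial mitigation, not a sandbox; keep this for local demos only.
    try:
        result = eval(expression, {"__builtins__": {}}, {})
        return f"Result: {result}"
    except Exception as e:
        return f"Error in calculation: {e}"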

# Create tool
calc_tool = Tool(
    name="calculator",
    description="Perform basic math calculations",
    func=simple_calculator
)

# Initialize agent
llm = OpenAI(temperature=0)
agent = initialize_agent(
    tools=[calc_tool],
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# Run agent
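# invoke() replaces the deprecated run(); the result is a dict with an "output" key
response = agent.invoke({"input": "What is 15 * 7 + 23?"})["output"]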
print(response)
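
The `simple_calculator` above relies on `eval`, which will execute whatever Python expression the model emits. A safer alternative is to parse the expression with the standard `ast` module and evaluate only arithmetic nodes. The following is a minimal sketch under that assumption; `safe_calculator` is a hypothetical replacement, and exponentiation is left out deliberately to avoid very large computations.

```python
import ast
import operator

# Whitelisted operators; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}


def _evaluate(node: ast.AST):
    """Recursively evaluate numeric literals and whitelisted operators."""
    if (
        isinstance(node, ast.Constant)
        and isinstance(node.value, (int, float))
        and not isinstance(node.value, bool)
    ):
        return node.value
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        return _BINARY_OPS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_evaluate(node.operand))
    raise ValueError("Unsupported expression")


def safe_calculator(expression: str) -> str:
    """Evaluate a basic arithmetic expression without calling eval()."""
    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return f"Result: {_evaluate(tree.body)}"
    except (SyntaxError, ValueError, ZeroDivisionError) as exc:
        return f"Error in calculation: {exc}"


print(safe_calculator("15 * 7 + 23"))
print(safe_calculator("__import__('os').getcwd()"))
```

Since the function keeps the same `str -> str` signature, it can be passed as `func=safe_calculator` to the `Tool` definition in place of `simple_calculator`.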

🎯 Next Steps

  1. Custom Tool Development: Create domain-specific tools for your use cases
  2. Agent Optimization: Fine-tune agent prompts and reasoning patterns
  3. Error Handling: Implement robust error handling and recovery
  4. Multi-Agent Systems: Build coordinated agent workflows
  5. Production Deployment: Scale agents for real-world applications
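
As a starting point for the error handling step, the tools in this guide all follow one convention: they return an `Error ...` string instead of raising, so the agent can read the failure and try something else. The sketch below captures that convention in a decorator; `tool_safe` and `parse_int` are hypothetical names, not LangChain APIs.

```python
import functools
from typing import Callable


def tool_safe(func: Callable[[str], str]) -> Callable[[str], str]:
    """Wrap a tool function so that exceptions become error strings."""

    @functools.wraps(func)
    def wrapper(tool_input: str) -> str:
        try:
            return func(tool_input)
        except Exception as exc:  # broad on purpose: the agent needs a string back
            return f"Error in {func.__name__}: {type(exc).__name__}: {exc}"

    return wrapper


@tool_safe
def parse_int(text: str) -> str:
    """Toy tool used only to demonstrate the wrapper."""
    return str(int(text))


print(parse_int("42"))
print(parse_int("abc"))
```

A wrapped function can be passed as the `func` argument of a `Tool` like any other single-argument callable.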

📖 Additional Resources

  • Custom Tool Creation: Advanced tool development patterns
  • Agent Security: Safe execution and sandboxing techniques
  • Performance Optimization: Scaling agent systems
  • Agent Evaluation: Testing and monitoring agent performance

Master Agent Tooling to build autonomous AI systems that can reason, plan, and execute complex tasks by leveraging external tools and APIs.

Released under the MIT License.