Agent Tooling - Intelligent Action Systems โ
Build autonomous AI agents that can interact with external systems and perform complex tasks
๐ค What is Agent Tooling? โ
Agent Tooling in LangChain enables language models to interact with external systems, APIs, and tools to perform actions beyond text generation. It transforms static language models into dynamic agents capable of reasoning, planning, and executing real-world tasks.
Simple Analogy: Think of Agent Tooling as giving an AI assistant "hands and eyes" - just like how a human assistant can use a calculator, search the web, send emails, or call APIs, LangChain agents can use tools to interact with the digital world and accomplish complex objectives.
๐ฏ Agent Architecture Overview โ
๐ค LANGCHAIN AGENT ARCHITECTURE ๐ค
(Intelligent Action & Reasoning)
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ๐ง AGENT BRAIN โ
โ "Decision Making Core" โ
โโโโโโโโโโโโโโโโโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโผโโโโโโโโโโโโโโโโโโโโโ
โ ๐ฏ REASONING PROCESS โ
โโโโฌโโโโโโโโโฌโโโโโโโโโฌโโโโโโโโโฌโโโโโโโโโโโ
โ โ โ โ
โผ โผ โผ โผ
โโโโโโโโโโโฌโโโโโโโโโโฌโโโโโโโโโโฌโโโโโโโโโโ
โ๐ค THINK โ๐ PLAN โ๐ง ACT โ๐ OBSERVEโ
โ โ โ โ โ
โAnalyze โCreate โExecute โCheck โ
โProblem โStrategy โTools โResults โ
โโโโโโฌโโโโโดโโโโโฌโโโโโดโโโโโฌโโโโโดโโโโโฌโโโโโ
โ โ โ โ
โผ โผ โผ โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ๐ ๏ธ TOOL ECOSYSTEM โ
โ โ
โ ๐ Web APIs ๐ Databases ๐ง Email ๐งฎ Calculators โ
โ ๐ Search ๐ Files ๐ Documents ๐จ Image Gen โ
โ ๐ฌ Chat ๐ Analytics ๐ง Custom โก Real-time โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ๐ ๏ธ Core Tool Types โ
๐ Web & API Tools โ
Connecting agents to external web services and APIs
Basic Web Search Tool โ
from langchain.tools import DuckDuckGoSearchRun
from langchain.utilities import SerpAPIWrapper
import requests
from typing import Optional
class WebSearchTool:
def __init__(self, search_provider: str = "duckduckgo"):
self.search_provider = search_provider
if search_provider == "duckduckgo":
self.search = DuckDuckGoSearchRun()
elif search_provider == "serpapi":
self.search = SerpAPIWrapper()
def search_web(self, query: str, num_results: int = 5) -> str:
"""Search the web for information"""
try:
if self.search_provider == "duckduckgo":
results = self.search.run(query)
return results
else:
results = self.search.run(query)
return results
except Exception as e:
return f"Error searching web: {str(e)}"
def get_webpage_content(self, url: str) -> str:
"""Fetch content from a specific webpage"""
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text content
text = soup.get_text()
# Clean up whitespace
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
return text[:2000] # Limit to 2000 characters
except Exception as e:
return f"Error fetching webpage: {str(e)}"
# Usage as LangChain Tool
from langchain.tools import Tool
web_search_tool = WebSearchTool()
search_tool = Tool(
name="web_search",
description="Search the web for current information. Use this when you need up-to-date information that might not be in your training data.",
func=web_search_tool.search_web
)
webpage_tool = Tool(
name="get_webpage",
description="Get the text content of a specific webpage. Provide the full URL.",
func=web_search_tool.get_webpage_content
)Custom API Integration Tool โ
import json
from typing import Dict, Any, Optional
class APIIntegrationTool:
def __init__(self, base_url: str, api_key: Optional[str] = None, headers: Optional[Dict] = None):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.default_headers = {
'Content-Type': 'application/json',
'User-Agent': 'LangChain-Agent/1.0'
}
if headers:
self.default_headers.update(headers)
if api_key:
self.default_headers['Authorization'] = f'Bearer {api_key}'
def make_api_request(self, endpoint: str, method: str = "GET", data: Optional[Dict] = None, params: Optional[Dict] = None) -> str:
"""Make a request to the API"""
try:
url = f"{self.base_url}/{endpoint.lstrip('/')}"
if method.upper() == "GET":
response = requests.get(url, headers=self.default_headers, params=params, timeout=30)
elif method.upper() == "POST":
response = requests.post(url, headers=self.default_headers, json=data, params=params, timeout=30)
elif method.upper() == "PUT":
response = requests.put(url, headers=self.default_headers, json=data, params=params, timeout=30)
elif method.upper() == "DELETE":
response = requests.delete(url, headers=self.default_headers, params=params, timeout=30)
else:
return f"Unsupported HTTP method: {method}"
response.raise_for_status()
# Try to parse JSON response
try:
result = response.json()
return json.dumps(result, indent=2)
except json.JSONDecodeError:
return response.text
except requests.exceptions.RequestException as e:
return f"API request failed: {str(e)}"
except Exception as e:
return f"Unexpected error: {str(e)}"
def get_weather(self, city: str) -> str:
"""Get weather information for a city"""
# Example using a weather API
params = {'q': city, 'appid': self.api_key, 'units': 'metric'}
return self.make_api_request('weather', 'GET', params=params)
def send_notification(self, message: str, recipient: str) -> str:
"""Send a notification"""
data = {'message': message, 'recipient': recipient}
return self.make_api_request('notifications', 'POST', data=data)
# Create tools from API integration
weather_api = APIIntegrationTool(
base_url="https://api.openweathermap.org/data/2.5",
api_key="your-weather-api-key"
)
weather_tool = Tool(
name="get_weather",
description="Get current weather information for a city. Provide the city name.",
func=weather_api.get_weather
)
notification_tool = Tool(
name="send_notification",
description="Send a notification message. Provide the message and recipient.",
func=lambda input_str: weather_api.send_notification(*input_str.split(',', 1))
)๐ Data & Database Tools โ
Database Query Tool โ
import sqlite3
import pandas as pd
from typing import List, Dict, Any
class DatabaseTool:
def __init__(self, database_path: str = ":memory:"):
self.database_path = database_path
self.connection = None
self._connect()
def _connect(self):
"""Establish database connection"""
try:
self.connection = sqlite3.connect(self.database_path)
self.connection.row_factory = sqlite3.Row # Enable column access by name
except Exception as e:
print(f"Database connection error: {e}")
def execute_query(self, query: str) -> str:
"""Execute SQL query and return results"""
try:
cursor = self.connection.cursor()
cursor.execute(query)
if query.strip().upper().startswith(('SELECT', 'WITH')):
# For SELECT queries, return results
results = cursor.fetchall()
if results:
# Convert to list of dictionaries
rows = [dict(row) for row in results]
df = pd.DataFrame(rows)
return f"Query executed successfully. Results:\n{df.to_string(index=False)}"
else:
return "Query executed successfully. No results found."
else:
# For INSERT, UPDATE, DELETE queries
self.connection.commit()
affected_rows = cursor.rowcount
return f"Query executed successfully. {affected_rows} rows affected."
except Exception as e:
return f"Query execution error: {str(e)}"
def get_table_schema(self, table_name: str) -> str:
"""Get schema information for a table"""
try:
cursor = self.connection.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
schema_info = cursor.fetchall()
if schema_info:
schema_str = f"Schema for table '{table_name}':\n"
for column in schema_info:
schema_str += f"- {column[1]} ({column[2]}) {'NOT NULL' if column[3] else 'NULL'} {'PRIMARY KEY' if column[5] else ''}\n"
return schema_str
else:
return f"Table '{table_name}' not found."
except Exception as e:
return f"Error getting schema: {str(e)}"
def list_tables(self) -> str:
"""List all tables in the database"""
try:
cursor = self.connection.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
if tables:
table_list = [table[0] for table in tables]
return f"Tables in database: {', '.join(table_list)}"
else:
return "No tables found in database."
except Exception as e:
return f"Error listing tables: {str(e)}"
def create_sample_data(self):
"""Create sample data for demonstration"""
try:
cursor = self.connection.cursor()
# Create employees table
cursor.execute('''
CREATE TABLE IF NOT EXISTS employees (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
department TEXT,
salary REAL,
hire_date DATE
)
''')
# Insert sample data
sample_employees = [
(1, 'John Doe', 'Engineering', 75000, '2022-01-15'),
(2, 'Jane Smith', 'Marketing', 65000, '2021-06-20'),
(3, 'Bob Johnson', 'Engineering', 80000, '2020-03-10'),
(4, 'Alice Williams', 'Sales', 70000, '2023-02-14'),
(5, 'Charlie Brown', 'Engineering', 85000, '2019-11-30')
]
cursor.executemany(
'INSERT OR REPLACE INTO employees VALUES (?, ?, ?, ?, ?)',
sample_employees
)
self.connection.commit()
return "Sample employee data created successfully."
except Exception as e:
return f"Error creating sample data: {str(e)}"
# Create database tools
db_tool = DatabaseTool()
db_tool.create_sample_data()
database_query_tool = Tool(
name="database_query",
description="Execute SQL queries on the database. Use this to retrieve, insert, update, or delete data. Always use proper SQL syntax.",
func=db_tool.execute_query
)
database_schema_tool = Tool(
name="get_table_schema",
description="Get the schema (structure) of a database table. Provide the table name.",
func=db_tool.get_table_schema
)
list_tables_tool = Tool(
name="list_tables",
description="List all tables available in the database.",
func=lambda x: db_tool.list_tables()
)Data Analysis Tool โ
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from io import StringIO
import base64
class DataAnalysisTool:
def __init__(self):
self.data_cache = {}
def load_csv_data(self, file_path_or_data: str) -> str:
"""Load CSV data from file path or direct data"""
try:
if file_path_or_data.startswith('/') or file_path_or_data.endswith('.csv'):
# File path
df = pd.read_csv(file_path_or_data)
data_id = file_path_or_data.split('/')[-1].replace('.csv', '')
else:
# Direct CSV data
df = pd.read_csv(StringIO(file_path_or_data))
data_id = 'uploaded_data'
self.data_cache[data_id] = df
info = f"Data loaded successfully as '{data_id}':\n"
info += f"Shape: {df.shape}\n"
info += f"Columns: {list(df.columns)}\n"
info += f"Data types:\n{df.dtypes.to_string()}\n"
info += f"First 5 rows:\n{df.head().to_string()}"
return info
except Exception as e:
return f"Error loading data: {str(e)}"
def analyze_data(self, data_id: str, analysis_type: str = "summary") -> str:
"""Perform various data analysis operations"""
if data_id not in self.data_cache:
return f"Data '{data_id}' not found. Please load data first."
df = self.data_cache[data_id]
try:
if analysis_type == "summary":
return self._get_summary_statistics(df)
elif analysis_type == "missing":
return self._get_missing_data_info(df)
elif analysis_type == "correlation":
return self._get_correlation_analysis(df)
elif analysis_type == "distribution":
return self._get_distribution_analysis(df)
else:
return f"Unknown analysis type: {analysis_type}"
except Exception as e:
return f"Error during analysis: {str(e)}"
def _get_summary_statistics(self, df: pd.DataFrame) -> str:
"""Get summary statistics"""
numeric_df = df.select_dtypes(include=[np.number])
if numeric_df.empty:
return "No numeric columns found for summary statistics."
summary = "Summary Statistics:\n"
summary += numeric_df.describe().to_string()
# Add value counts for categorical columns
categorical_df = df.select_dtypes(include=['object'])
if not categorical_df.empty:
summary += "\n\nCategorical Column Value Counts:\n"
for col in categorical_df.columns:
summary += f"\n{col}:\n{df[col].value_counts().head().to_string()}"
return summary
def _get_missing_data_info(self, df: pd.DataFrame) -> str:
"""Get missing data information"""
missing_counts = df.isnull().sum()
missing_percentages = (missing_counts / len(df)) * 100
missing_info = "Missing Data Analysis:\n"
missing_info += "Column | Missing Count | Missing %\n"
missing_info += "-------|---------------|----------\n"
for col in df.columns:
missing_info += f"{col} | {missing_counts[col]} | {missing_percentages[col]:.2f}%\n"
total_missing = missing_counts.sum()
missing_info += f"\nTotal missing values: {total_missing}"
return missing_info
def _get_correlation_analysis(self, df: pd.DataFrame) -> str:
"""Get correlation analysis"""
numeric_df = df.select_dtypes(include=[np.number])
if numeric_df.shape[1] < 2:
return "Need at least 2 numeric columns for correlation analysis."
correlation_matrix = numeric_df.corr()
analysis = "Correlation Analysis:\n"
analysis += correlation_matrix.to_string()
# Find strong correlations
strong_correlations = []
for i in range(len(correlation_matrix.columns)):
for j in range(i+1, len(correlation_matrix.columns)):
corr_value = correlation_matrix.iloc[i, j]
if abs(corr_value) > 0.7: # Strong correlation threshold
col1 = correlation_matrix.columns[i]
col2 = correlation_matrix.columns[j]
strong_correlations.append(f"{col1} โ {col2}: {corr_value:.3f}")
if strong_correlations:
analysis += "\n\nStrong Correlations (|r| > 0.7):\n"
analysis += "\n".join(strong_correlations)
return analysis
def filter_data(self, data_id: str, filter_expression: str) -> str:
"""Filter data based on expression"""
if data_id not in self.data_cache:
return f"Data '{data_id}' not found."
try:
df = self.data_cache[data_id]
filtered_df = df.query(filter_expression)
filtered_id = f"{data_id}_filtered"
self.data_cache[filtered_id] = filtered_df
result = f"Data filtered successfully. New dataset '{filtered_id}' created.\n"
result += f"Original shape: {df.shape}\n"
result += f"Filtered shape: {filtered_df.shape}\n"
result += f"Rows removed: {len(df) - len(filtered_df)}\n"
result += f"First 5 rows of filtered data:\n{filtered_df.head().to_string()}"
return result
except Exception as e:
return f"Error filtering data: {str(e)}"
# Create data analysis tools
data_analyzer = DataAnalysisTool()
load_data_tool = Tool(
name="load_csv_data",
description="Load CSV data from file path or direct CSV content. Returns data ID for future reference.",
func=data_analyzer.load_csv_data
)
analyze_data_tool = Tool(
name="analyze_data",
description="Analyze loaded data. Format: 'data_id,analysis_type' where analysis_type can be 'summary', 'missing', 'correlation', or 'distribution'.",
func=lambda input_str: data_analyzer.analyze_data(*input_str.split(','))
)
filter_data_tool = Tool(
name="filter_data",
description="Filter data based on conditions. Format: 'data_id,filter_expression' (e.g., 'employees,salary > 70000').",
func=lambda input_str: data_analyzer.filter_data(*input_str.split(',', 1))
)๐งฎ Computational Tools โ
Calculator & Math Tool โ
import math
import re
from typing import Union
class AdvancedCalculatorTool:
def __init__(self):
self.constants = {
'pi': math.pi,
'e': math.e,
'phi': (1 + math.sqrt(5)) / 2, # Golden ratio
}
self.functions = {
'sqrt': math.sqrt,
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'log': math.log,
'ln': math.log,
'exp': math.exp,
'abs': abs,
'round': round,
'ceil': math.ceil,
'floor': math.floor,
}
def calculate(self, expression: str) -> str:
"""Safely evaluate mathematical expressions"""
try:
# Clean the expression
expression = expression.strip()
# Replace constants
for const, value in self.constants.items():
expression = expression.replace(const, str(value))
# Handle function calls
expression = self._handle_functions(expression)
# Validate expression (only allow safe characters)
if not re.match(r'^[0-9+\-*/().\s]+$', expression):
return "Error: Expression contains invalid characters"
# Evaluate safely
result = eval(expression)
return f"Result: {result}"
except ZeroDivisionError:
return "Error: Division by zero"
except Exception as e:
return f"Error: {str(e)}"
def _handle_functions(self, expression: str) -> str:
"""Handle function calls in expressions"""
for func_name, func in self.functions.items():
pattern = f'{func_name}\\(([^)]+)\\)'
matches = re.findall(pattern, expression)
for match in matches:
try:
arg = float(eval(match))
result = func(arg)
expression = expression.replace(f'{func_name}({match})', str(result))
except:
pass
return expression
def solve_equation(self, equation: str) -> str:
"""Solve simple algebraic equations"""
try:
# Handle simple linear equations like "2x + 5 = 15"
if '=' in equation and 'x' in equation:
left, right = equation.split('=')
left = left.strip()
right = right.strip()
# Simple linear equation solver
# This is a simplified implementation
return f"Equation solving not fully implemented yet. Expression: {equation}"
else:
return self.calculate(equation)
except Exception as e:
return f"Error solving equation: {str(e)}"
def unit_conversion(self, value: float, from_unit: str, to_unit: str) -> str:
"""Convert between common units"""
conversions = {
# Length (to meters)
'mm': 0.001, 'cm': 0.01, 'm': 1, 'km': 1000,
'inch': 0.0254, 'ft': 0.3048, 'yard': 0.9144, 'mile': 1609.34,
# Weight (to grams)
'mg': 0.001, 'g': 1, 'kg': 1000,
'oz': 28.3495, 'lb': 453.592,
# Temperature (special handling needed)
'celsius': 'C', 'fahrenheit': 'F', 'kelvin': 'K',
}
try:
from_unit = from_unit.lower()
to_unit = to_unit.lower()
# Temperature conversions
if from_unit in ['celsius', 'fahrenheit', 'kelvin'] or to_unit in ['celsius', 'fahrenheit', 'kelvin']:
return self._convert_temperature(value, from_unit, to_unit)
# Other unit conversions
if from_unit in conversions and to_unit in conversions:
# Convert to base unit, then to target unit
base_value = value * conversions[from_unit]
result = base_value / conversions[to_unit]
return f"{value} {from_unit} = {result} {to_unit}"
else:
return f"Conversion not supported: {from_unit} to {to_unit}"
except Exception as e:
return f"Error in unit conversion: {str(e)}"
def _convert_temperature(self, value: float, from_unit: str, to_unit: str) -> str:
"""Convert temperature between Celsius, Fahrenheit, and Kelvin"""
# Convert to Celsius first
if from_unit == 'fahrenheit':
celsius = (value - 32) * 5/9
elif from_unit == 'kelvin':
celsius = value - 273.15
else: # celsius
celsius = value
# Convert from Celsius to target
if to_unit == 'fahrenheit':
result = celsius * 9/5 + 32
elif to_unit == 'kelvin':
result = celsius + 273.15
else: # celsius
result = celsius
return f"{value}ยฐ {from_unit.capitalize()} = {result:.2f}ยฐ {to_unit.capitalize()}"
# Create calculator tools
calculator = AdvancedCalculatorTool()
calculator_tool = Tool(
name="calculator",
description="Perform mathematical calculations. Supports basic arithmetic, functions like sqrt, sin, cos, log, and constants like pi, e.",
func=calculator.calculate
)
unit_converter_tool = Tool(
name="unit_converter",
description="Convert between units. Format: 'value,from_unit,to_unit' (e.g., '100,celsius,fahrenheit' or '5,km,miles').",
func=lambda input_str: calculator.unit_conversion(*input_str.split(','))
)
equation_solver_tool = Tool(
name="solve_equation",
description="Solve mathematical equations or evaluate expressions.",
func=calculator.solve_equation
)๐ File System Tools โ
File Operations Tool โ
import os
import shutil
from pathlib import Path
from typing import List, Optional
class FileSystemTool:
def __init__(self, base_directory: str = "./agent_workspace"):
self.base_directory = Path(base_directory)
self.base_directory.mkdir(exist_ok=True)
# Security: Only allow operations within base directory
self.allowed_directory = self.base_directory.resolve()
def _is_safe_path(self, path: str) -> bool:
"""Check if path is within allowed directory"""
try:
requested_path = (self.base_directory / path).resolve()
return requested_path.is_relative_to(self.allowed_directory)
except:
return False
def read_file(self, file_path: str) -> str:
"""Read contents of a text file"""
if not self._is_safe_path(file_path):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / file_path
with open(full_path, 'r', encoding='utf-8') as file:
content = file.read()
return f"File content ({len(content)} characters):\n{content}"
except FileNotFoundError:
return f"Error: File '{file_path}' not found"
except Exception as e:
return f"Error reading file: {str(e)}"
def write_file(self, file_path: str, content: str) -> str:
"""Write content to a file"""
if not self._is_safe_path(file_path):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / file_path
# Create directory if it doesn't exist
full_path.parent.mkdir(parents=True, exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as file:
file.write(content)
return f"Successfully wrote {len(content)} characters to '{file_path}'"
except Exception as e:
return f"Error writing file: {str(e)}"
def list_directory(self, directory_path: str = ".") -> str:
"""List contents of a directory"""
if not self._is_safe_path(directory_path):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / directory_path
if not full_path.exists():
return f"Error: Directory '{directory_path}' not found"
if not full_path.is_dir():
return f"Error: '{directory_path}' is not a directory"
items = []
for item in full_path.iterdir():
item_type = "๐" if item.is_dir() else "๐"
size = f" ({item.stat().st_size} bytes)" if item.is_file() else ""
items.append(f"{item_type} {item.name}{size}")
if items:
return f"Contents of '{directory_path}':\n" + "\n".join(items)
else:
return f"Directory '{directory_path}' is empty"
except Exception as e:
return f"Error listing directory: {str(e)}"
def create_directory(self, directory_path: str) -> str:
"""Create a new directory"""
if not self._is_safe_path(directory_path):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / directory_path
full_path.mkdir(parents=True, exist_ok=True)
return f"Directory '{directory_path}' created successfully"
except Exception as e:
return f"Error creating directory: {str(e)}"
def delete_file(self, file_path: str) -> str:
"""Delete a file"""
if not self._is_safe_path(file_path):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / file_path
if not full_path.exists():
return f"Error: File '{file_path}' not found"
if full_path.is_file():
full_path.unlink()
return f"File '{file_path}' deleted successfully"
else:
return f"Error: '{file_path}' is not a file"
except Exception as e:
return f"Error deleting file: {str(e)}"
def search_files(self, pattern: str, directory: str = ".") -> str:
"""Search for files matching a pattern"""
if not self._is_safe_path(directory):
return "Error: Access denied - path outside allowed directory"
try:
full_path = self.base_directory / directory
matching_files = []
for file_path in full_path.rglob(pattern):
if file_path.is_file():
relative_path = file_path.relative_to(self.base_directory)
matching_files.append(str(relative_path))
if matching_files:
return f"Files matching '{pattern}':\n" + "\n".join(matching_files)
else:
return f"No files found matching pattern '{pattern}'"
except Exception as e:
return f"Error searching files: {str(e)}"
# Create file system tools
file_system = FileSystemTool()
read_file_tool = Tool(
name="read_file",
description="Read the contents of a text file. Provide the file path relative to the workspace.",
func=file_system.read_file
)
write_file_tool = Tool(
name="write_file",
description="Write content to a file. Format: 'file_path,content' (use ||| as separator for complex content).",
func=lambda input_str: file_system.write_file(*input_str.split('|||', 1))
)
list_directory_tool = Tool(
name="list_directory",
description="List contents of a directory. Provide directory path or '.' for current directory.",
func=file_system.list_directory
)
create_directory_tool = Tool(
name="create_directory",
description="Create a new directory. Provide the directory path.",
func=file_system.create_directory
)
search_files_tool = Tool(
name="search_files",
description="Search for files matching a pattern. Format: 'pattern,directory' (e.g., '*.py,.' for Python files).",
func=lambda input_str: file_system.search_files(*input_str.split(',', 1))
)๐ค Agent Types & Implementations โ
๐ง ReAct Agent (Reasoning + Acting) โ
from langchain.agents import create_react_agent, AgentExecutor
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
class ReActAgent:
def __init__(self, llm, tools: List[Tool]):
self.llm = llm
self.tools = tools
self.agent_executor = self._create_agent()
def _create_agent(self):
"""Create ReAct agent with custom prompt"""
react_prompt = PromptTemplate.from_template("""
You are an intelligent assistant that can use tools to help answer questions and complete tasks.
You have access to the following tools:
{tools}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Important guidelines:
1. Always explain your reasoning in the Thought section
2. Use tools when you need external information or capabilities
3. Break down complex tasks into smaller steps
4. If a tool fails, try an alternative approach
5. Provide clear, helpful final answers
Begin!
Question: {input}
Thought: {agent_scratchpad}
""")
# Create agent
agent = create_react_agent(
llm=self.llm,
tools=self.tools,
prompt=react_prompt
)
# Create agent executor
return AgentExecutor(
agent=agent,
tools=self.tools,
verbose=True,
handle_parsing_errors=True,
max_iterations=10,
max_execution_time=300 # 5 minutes timeout
)
def run(self, query: str) -> str:
"""Run the agent with a query"""
try:
result = self.agent_executor.invoke({"input": query})
return result["output"]
except Exception as e:
return f"Agent execution error: {str(e)}"
def run_with_history(self, query: str, chat_history: List[tuple] = None) -> str:
"""Run agent with conversation history"""
if chat_history:
history_context = "\n".join([
f"Previous Q: {q}\nPrevious A: {a}"
for q, a in chat_history[-3:] # Last 3 exchanges
])
enhanced_query = f"""
Previous conversation context:
{history_context}
Current question: {query}
"""
else:
enhanced_query = query
return self.run(enhanced_query)
# Create comprehensive tool list
all_tools = [
search_tool,
webpage_tool,
weather_tool,
database_query_tool,
database_schema_tool,
list_tables_tool,
calculator_tool,
unit_converter_tool,
read_file_tool,
write_file_tool,
list_directory_tool,
create_directory_tool,
search_files_tool,
load_data_tool,
analyze_data_tool,
filter_data_tool
]
# Initialize ReAct agent
llm = OpenAI(temperature=0.1)
react_agent = ReActAgent(llm, all_tools)
# Example usage
response = react_agent.run("What's the weather like in New York and can you save this information to a file?")
print(response)๐ฏ Specialized Task Agents โ
Data Analysis Agent โ
class DataAnalysisAgent:
def __init__(self, llm):
self.llm = llm
self.data_tools = [
load_data_tool,
analyze_data_tool,
filter_data_tool,
database_query_tool,
calculator_tool
]
self.agent = ReActAgent(llm, self.data_tools)
def analyze_dataset(self, data_source: str, analysis_goals: List[str]) -> str:
"""Comprehensive dataset analysis"""
goals_text = "\n".join([f"- {goal}" for goal in analysis_goals])
query = f"""
Please perform a comprehensive analysis of the dataset from: {data_source}
Analysis goals:
{goals_text}
Please:
1. Load and examine the data structure
2. Perform summary statistics
3. Check for missing data
4. Analyze correlations if applicable
5. Filter data if needed based on the goals
6. Provide insights and recommendations
"""
return self.agent.run(query)
def answer_data_question(self, question: str, context: str = "") -> str:
"""Answer specific questions about data"""
enhanced_query = f"""
Context: {context}
Data question: {question}
Please use the available data tools to investigate and answer this question thoroughly.
"""
return self.agent.run(enhanced_query)
# Usage
data_agent = DataAnalysisAgent(llm)
analysis_result = data_agent.analyze_dataset(
"employees.csv",
[
"Identify salary distribution by department",
"Find employees hired in the last 2 years",
"Calculate average salary by department"
]
)Research Agent โ
class ResearchAgent:
def __init__(self, llm):
self.llm = llm
self.research_tools = [
search_tool,
webpage_tool,
write_file_tool,
read_file_tool,
create_directory_tool
]
self.agent = ReActAgent(llm, self.research_tools)
def conduct_research(self, topic: str, research_depth: str = "comprehensive") -> str:
"""Conduct research on a topic"""
if research_depth == "quick":
query = f"""
Conduct quick research on: {topic}
Please:
1. Search for current information about {topic}
2. Summarize key findings
3. Save a brief summary to a file named '{topic.replace(' ', '_')}_quick_research.txt'
"""
else: # comprehensive
query = f"""
Conduct comprehensive research on: {topic}
Please:
1. Search for multiple sources of information about {topic}
2. Visit key websites to gather detailed information
3. Analyze and synthesize the information
4. Create a detailed research report
5. Save the report to a file named '{topic.replace(' ', '_')}_research_report.txt'
6. Include sources and key findings
"""
return self.agent.run(query)
def compare_topics(self, topic1: str, topic2: str) -> str:
"""Compare two topics through research"""
query = f"""
Research and compare: {topic1} vs {topic2}
Please:
1. Research both {topic1} and {topic2}
2. Identify key similarities and differences
3. Create a comparison analysis
4. Save the comparison to a file named '{topic1.replace(' ', '_')}_vs_{topic2.replace(' ', '_')}_comparison.txt'
"""
return self.agent.run(query)
# Usage
research_agent = ResearchAgent(llm)
research_result = research_agent.conduct_research(
"Large Language Models in 2024",
research_depth="comprehensive"
)๐ Multi-Agent Coordination โ
class MultiAgentOrchestrator:
def __init__(self, llm):
self.llm = llm
self.agents = {
'research': ResearchAgent(llm),
'data': DataAnalysisAgent(llm),
'general': ReActAgent(llm, all_tools)
}
def delegate_task(self, task: str, preferred_agent: str = None) -> str:
"""Delegate task to appropriate agent"""
if preferred_agent and preferred_agent in self.agents:
agent = self.agents[preferred_agent]
else:
# Auto-select agent based on task content
agent = self._select_agent(task)
return agent.run(task)
def _select_agent(self, task: str) -> ReActAgent:
"""Select appropriate agent based on task content"""
task_lower = task.lower()
research_keywords = ['research', 'find information', 'search', 'investigate', 'study']
data_keywords = ['analyze', 'data', 'statistics', 'database', 'calculate', 'csv']
if any(keyword in task_lower for keyword in research_keywords):
return self.agents['research']
elif any(keyword in task_lower for keyword in data_keywords):
return self.agents['data']
else:
return self.agents['general']
def collaborative_task(self, task: str, required_agents: List[str]) -> str:
"""Execute task requiring multiple agents"""
results = {}
for agent_name in required_agents:
if agent_name in self.agents:
agent_task = f"Your part of the collaborative task: {task}"
results[agent_name] = self.agents[agent_name].run(agent_task)
# Combine results
combined_result = "Collaborative Task Results:\n\n"
for agent_name, result in results.items():
combined_result += f"=== {agent_name.upper()} AGENT ===\n{result}\n\n"
return combined_result
# Usage
orchestrator = MultiAgentOrchestrator(llm)
# Auto-delegated task
result = orchestrator.delegate_task("Research the latest trends in machine learning and analyze any available performance data")
# Collaborative task
collaborative_result = orchestrator.collaborative_task(
"Create a comprehensive report on renewable energy adoption with supporting data analysis",
['research', 'data']
)๐ Getting Started with Agent Tooling โ
๐ Simple Agent Setup โ
from langchain.agents import initialize_agent, AgentType
from langchain.llms import OpenAI
from langchain.tools import Tool
# Define a simple tool
def simple_calculator(expression: str) -> str:
try:
result = eval(expression)
return f"Result: {result}"
except:
return "Error in calculation"
# Create tool
calc_tool = Tool(
name="calculator",
description="Perform basic math calculations",
func=simple_calculator
)
# Initialize agent
llm = OpenAI(temperature=0)
agent = initialize_agent(
tools=[calc_tool],
llm=llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# Run agent
response = agent.run("What is 15 * 7 + 23?")
print(response)๐ฏ Next Steps โ
- Custom Tool Development: Create domain-specific tools for your use cases
- Agent Optimization: Fine-tune agent prompts and reasoning patterns
- Error Handling: Implement robust error handling and recovery
- Multi-Agent Systems: Build coordinated agent workflows
- Production Deployment: Scale agents for real-world applications
๐ Additional Resources โ
- Custom Tool Creation: Advanced tool development patterns
- Agent Security: Safe execution and sandboxing techniques
- Performance Optimization: Scaling agent systems
- Agent Evaluation: Testing and monitoring agent performance
Master Agent Tooling to build autonomous AI systems that can reason, plan, and execute complex tasks by leveraging external tools and APIs.