Model Output - LangChain Output Processing
Master response handling and output optimization for effective LLM communication
What is Model Output?
Model Output in LangChain refers to the systematic handling and processing of responses from language models. It's crucial for extracting structured data, formatting responses, validating quality, and transforming raw LLM outputs into actionable information for your applications.
Simple Analogy: Think of Model Output as the "quality control and packaging department" for your LLM responses - it takes raw language model outputs and transforms them into polished, structured, and validated results ready for your application.
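To make this concrete, here is a minimal sketch of where output processing sits in a chain. It assumes a recent LangChain release where prompts, models, and parsers compose with the | operator, and an OpenAI-backed chat model (any chat model works here):

from langchain.chat_models import ChatOpenAI          # assumption: swap in whichever chat model you use
from langchain.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser

prompt = PromptTemplate.from_template("Summarize in one sentence: {text}")
llm = ChatOpenAI(temperature=0)
chain = prompt | llm | StrOutputParser()               # the parser converts the raw model message to plain text

summary = chain.invoke({"text": "LangChain helps developers build applications on top of LLMs."})
print(summary)

Everything in this guide builds on that last step: taking what the model returns and parsing, validating, and formatting it.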
Model Output Architecture

LangChain model output processing takes an unprocessed model response and runs it through four components before handing it to your application:

Raw LLM Output (unprocessed model response)
        |
        v
Output Components:
    - Parse raw text    -> extract content from the response
    - Structure data    -> JSON/XML objects
    - Validate          -> quality checks
    - Format output     -> user-ready results
        |
        v
Processed Output (structured, validated, formatted)

Output Parsing
Extracting structured data from LLM responses
Basic Text Parsing
from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
# Simple string output parser
string_parser = StrOutputParser()
# Custom output parser for specific patterns
class CustomTextParser(StrOutputParser):
def parse(self, text: str) -> dict:
lines = text.strip().split('\n')
result = {}
for line in lines:
if ':' in line:
key, value = line.split(':', 1)
result[key.strip()] = value.strip()
return result
# Usage
parser = CustomTextParser()
parsed_output = parser.parse("Name: John Doe\nAge: 30\nOccupation: Engineer")
print(parsed_output)  # {'Name': 'John Doe', 'Age': '30', 'Occupation': 'Engineer'}

Structured Data Parsing
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field
from typing import List
class PersonInfo(BaseModel):
name: str = Field(description="Person's full name")
age: int = Field(description="Person's age")
skills: List[str] = Field(description="List of skills")
experience_years: int = Field(description="Years of professional experience")
# Create parser
parser = PydanticOutputParser(pydantic_object=PersonInfo)
# Get format instructions for the LLM
format_instructions = parser.get_format_instructions()
# Use in prompt
prompt_template = PromptTemplate(
template="Extract person information from this text: {text}\n{format_instructions}",
input_variables=["text"],
partial_variables={"format_instructions": format_instructions}
)
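A quick usage sketch: the format instructions steer the model toward parseable output, and parser.parse() turns the reply into a validated PersonInfo object. The llm below is a placeholder for whatever model you use:

# Hypothetical end-to-end usage; replace `llm` with your model
prompt = prompt_template.format(text="Jane Smith, 34, has 10 years of experience with Python and SQL.")
raw_response = llm.predict(prompt)
person = parser.parse(raw_response)   # -> PersonInfo(name='Jane Smith', age=34, ...)
print(person.name, person.skills)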
JSON Output Parser

from langchain.output_parsers import OutputFixingParser
from langchain.output_parsers.json import SimpleJsonOutputParser
import json
# Basic JSON parser
json_parser = SimpleJsonOutputParser()
# Robust JSON parser with error fixing
def create_robust_json_parser(llm):
return OutputFixingParser.from_llm(
parser=json_parser,
llm=llm
)
# Custom JSON validation
def parse_json_response(response: str) -> dict:
"""Parse JSON response with error handling"""
try:
# Try direct parsing
return json.loads(response)
except json.JSONDecodeError:
# Try to extract JSON from text
json_start = response.find('{')
        json_end = response.rfind('}') + 1
        if json_start != -1 and json_end > json_start:
json_text = response[json_start:json_end]
try:
return json.loads(json_text)
except json.JSONDecodeError:
pass
raise ValueError("Could not parse JSON from response")
# Advanced JSON extraction
class RobustJSONExtractor:
def __init__(self):
self.extraction_strategies = [
self._direct_parse,
self._bracket_extraction,
self._regex_extraction,
self._cleanup_and_parse
]
def extract_json(self, text: str) -> dict:
"""Try multiple strategies to extract JSON from text"""
for strategy in self.extraction_strategies:
try:
result = strategy(text)
if result:
return result
except:
continue
raise ValueError("Could not extract valid JSON from response")
def _direct_parse(self, text: str) -> dict:
return json.loads(text)
def _bracket_extraction(self, text: str) -> dict:
start = text.find('{')
end = text.rfind('}') + 1
        if start != -1 and end > start:
return json.loads(text[start:end])
return None
def _regex_extraction(self, text: str) -> dict:
import re
json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
matches = re.findall(json_pattern, text)
for match in matches:
try:
return json.loads(match)
except:
continue
return None
def _cleanup_and_parse(self, text: str) -> dict:
# Remove common formatting issues
cleaned = text.replace('```json', '').replace('```', '')
cleaned = cleaned.strip()
        return json.loads(cleaned)
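As a quick illustration, the extractor can recover an object even when the model surrounds the JSON with extra prose:

extractor = RobustJSONExtractor()
messy = 'Sure! Here is the data you asked for: {"name": "Ada", "role": "engineer"} Let me know if you need more.'
data = extractor.extract_json(messy)   # bracket extraction recovers the embedded object
print(data["role"])                    # engineer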
List and Array Parsing

import re
from typing import Dict, List

class ListOutputParser:
def __init__(self, delimiter: str = '\n'):
self.delimiter = delimiter
def parse(self, text: str) -> List[str]:
"""Parse text into a list of items"""
items = text.split(self.delimiter)
# Clean and filter items
cleaned_items = []
for item in items:
item = item.strip()
# Remove common list markers
            item = re.sub(r'^[-*•]\s*', '', item)
item = re.sub(r'^\d+\.\s*', '', item)
if item:
cleaned_items.append(item)
return cleaned_items
# Numbered list parser
class NumberedListParser(ListOutputParser):
def parse(self, text: str) -> Dict[int, str]:
"""Parse numbered list into dictionary"""
items = super().parse(text)
result = {}
for i, item in enumerate(items, 1):
# Try to extract number from start of item
match = re.match(r'^(\d+)\.\s*(.+)', item)
if match:
num, content = match.groups()
result[int(num)] = content
else:
result[i] = item
        return result
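A short usage sketch for both parsers:

list_parser = ListOutputParser()
print(list_parser.parse("- apples\n- bananas\n- cherries"))
# ['apples', 'bananas', 'cherries']

numbered_parser = NumberedListParser()
print(numbered_parser.parse("1. Install LangChain\n2. Configure the model\n3. Build the chain"))
# {1: 'Install LangChain', 2: 'Configure the model', 3: 'Build the chain'}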
Response Formatting

Converting raw outputs to usable formats
Multi-format Output Handler
from enum import Enum
from datetime import datetime
from typing import Union, Dict, List
import json
import html
class OutputFormat(Enum):
TEXT = "text"
JSON = "json"
MARKDOWN = "markdown"
HTML = "html"
XML = "xml"
class ResponseFormatter:
def __init__(self):
self.formatters = {
OutputFormat.TEXT: self._format_text,
OutputFormat.JSON: self._format_json,
OutputFormat.MARKDOWN: self._format_markdown,
OutputFormat.HTML: self._format_html,
OutputFormat.XML: self._format_xml
}
def format_response(self, content: str, format_type: OutputFormat,
metadata: Dict = None) -> str:
formatter = self.formatters.get(format_type)
if not formatter:
raise ValueError(f"Unsupported format: {format_type}")
return formatter(content, metadata or {})
def _format_text(self, content: str, metadata: Dict) -> str:
formatted = content.strip()
if metadata.get("add_timestamp"):
from datetime import datetime
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted = f"[{timestamp}] {formatted}"
return formatted
def _format_json(self, content: str, metadata: Dict) -> str:
try:
data = json.loads(content)
except json.JSONDecodeError:
data = {"content": content}
if metadata:
data["metadata"] = metadata
return json.dumps(data, indent=2, ensure_ascii=False)
def _format_markdown(self, content: str, metadata: Dict) -> str:
lines = content.split('\n')
formatted_lines = []
for line in lines:
line = line.strip()
if line.startswith('**') and line.endswith('**'):
# Convert bold to markdown header
formatted_lines.append(f"## {line[2:-2]}")
elif line and not line.startswith('#'):
formatted_lines.append(line)
else:
formatted_lines.append(line)
formatted = '\n\n'.join(filter(None, formatted_lines))
if metadata.get("title"):
formatted = f"# {metadata['title']}\n\n{formatted}"
return formatted
def _format_html(self, content: str, metadata: Dict) -> str:
escaped_content = html.escape(content)
# Convert newlines to <br> tags
escaped_content = escaped_content.replace('\n', '<br>\n')
# Wrap in div with optional CSS class
css_class = metadata.get("css_class", "llm-response")
return f'<div class="{css_class}">{escaped_content}</div>'
def _format_xml(self, content: str, metadata: Dict) -> str:
root_tag = metadata.get("root_tag", "response")
escaped_content = html.escape(content)
return f"""<?xml version="1.0" encoding="UTF-8"?>
<{root_tag}>
<content>{escaped_content}</content>
<timestamp>{datetime.now().isoformat()}</timestamp>
</{root_tag}>"""
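A brief sketch of how the formatter might be called:

formatter = ResponseFormatter()
raw = "LangChain output parsers turn model text into structured data."

print(formatter.format_response(raw, OutputFormat.MARKDOWN, {"title": "Output Parsers"}))
print(formatter.format_response(raw, OutputFormat.HTML, {"css_class": "answer"}))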
Advanced Response Structuring

import re
from datetime import datetime
from typing import Dict, List

class StructuredResponseBuilder:
def __init__(self):
self.response_templates = {
"qa": self._build_qa_response,
"analysis": self._build_analysis_response,
"tutorial": self._build_tutorial_response,
"summary": self._build_summary_response
}
def build_structured_response(self, content: str, response_type: str,
metadata: Dict = None) -> Dict:
"""Build a structured response with consistent format"""
builder = self.response_templates.get(response_type)
if not builder:
return self._build_generic_response(content, metadata)
return builder(content, metadata or {})
def _build_qa_response(self, content: str, metadata: Dict) -> Dict:
return {
"type": "qa",
"question": metadata.get("question", ""),
"answer": content,
"confidence": metadata.get("confidence", 0.8),
"sources": metadata.get("sources", []),
"timestamp": datetime.now().isoformat()
}
def _build_analysis_response(self, content: str, metadata: Dict) -> Dict:
# Try to extract analysis components
sections = self._extract_analysis_sections(content)
return {
"type": "analysis",
"summary": sections.get("summary", ""),
"findings": sections.get("findings", []),
"recommendations": sections.get("recommendations", []),
"full_analysis": content,
"metadata": metadata
}
def _build_tutorial_response(self, content: str, metadata: Dict) -> Dict:
steps = self._extract_tutorial_steps(content)
return {
"type": "tutorial",
"title": metadata.get("title", "Tutorial"),
"difficulty": metadata.get("difficulty", "intermediate"),
"estimated_time": metadata.get("estimated_time", ""),
"steps": steps,
"full_content": content
}
def _build_summary_response(self, content: str, metadata: Dict) -> Dict:
return {
"type": "summary",
"summary": content,
"key_points": self._extract_key_points(content),
"word_count": len(content.split()),
"original_length": metadata.get("original_length", 0),
"compression_ratio": self._calculate_compression_ratio(content, metadata)
}
def _extract_analysis_sections(self, content: str) -> Dict:
"""Extract analysis sections from content"""
sections = {}
# Simple extraction based on common patterns
if "summary:" in content.lower():
summary_start = content.lower().find("summary:")
summary_content = content[summary_start:].split('\n')[0]
sections["summary"] = summary_content.replace("summary:", "").strip()
# Extract bullet points as findings
        findings = re.findall(r'[•\-*]\s*(.+)', content)
sections["findings"] = findings
return sections
def _extract_tutorial_steps(self, content: str) -> List[Dict]:
"""Extract tutorial steps from content"""
steps = []
# Look for numbered steps
step_pattern = r'(\d+)\.\s*(.+?)(?=\d+\.|$)'
matches = re.findall(step_pattern, content, re.DOTALL)
for step_num, step_content in matches:
steps.append({
"step_number": int(step_num),
"instruction": step_content.strip(),
"estimated_time": "5 minutes" # Default
})
return steps
def _extract_key_points(self, content: str) -> List[str]:
"""Extract key points from content"""
# Simple extraction of sentences with strong indicators
sentences = content.split('.')
key_points = []
indicators = ["important", "key", "crucial", "essential", "main"]
for sentence in sentences:
if any(indicator in sentence.lower() for indicator in indicators):
key_points.append(sentence.strip())
        return key_points[:5]  # Limit to top 5

    def _calculate_compression_ratio(self, content: str, metadata: Dict) -> float:
        """Assumed helper: ratio of summary word count to original length (0.0 when unknown)"""
        original_length = metadata.get("original_length", 0)
        if not original_length:
            return 0.0
        return len(content.split()) / original_length

    def _build_generic_response(self, content: str, metadata: Dict) -> Dict:
        """Assumed helper: fallback structure for unrecognized response types"""
        return {
            "type": "generic",
            "content": content,
            "metadata": metadata,
            "timestamp": datetime.now().isoformat()
        }
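A quick sketch of building a structured question-answer response with the builder above:

builder = StructuredResponseBuilder()
qa = builder.build_structured_response(
    "LangChain chains prompts, models, and parsers into reusable pipelines.",
    "qa",
    {"question": "What does LangChain do?", "confidence": 0.9}
)
print(qa["type"], "-", qa["answer"])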
Output Validation

Ensuring response quality and accuracy

Quality Assessment Engine
from typing import Dict, List

class OutputValidator:
def __init__(self):
self.min_length = 10
self.max_length = 5000
self.quality_thresholds = {
"completeness": 0.7,
"coherence": 0.6,
"relevance": 0.8,
"accuracy": 0.7
}
def validate_response(self, response: str, expected_type: str = "general",
context: Dict = None) -> Dict:
validation_result = {
"is_valid": True,
"quality_scores": {},
"issues": [],
"suggestions": [],
"overall_score": 0.0
}
# Basic validation
basic_validation = self._basic_validation(response)
validation_result.update(basic_validation)
if not validation_result["is_valid"]:
return validation_result
# Quality scoring
quality_scores = self._calculate_quality_scores(response, expected_type, context)
validation_result["quality_scores"] = quality_scores
# Overall score calculation
overall_score = sum(quality_scores.values()) / len(quality_scores)
validation_result["overall_score"] = overall_score
# Generate suggestions
validation_result["suggestions"] = self._generate_improvement_suggestions(
quality_scores, expected_type
)
# Check quality thresholds
for metric, score in quality_scores.items():
threshold = self.quality_thresholds.get(metric, 0.5)
if score < threshold:
validation_result["issues"].append(
f"Low {metric} score: {score:.2f} < {threshold}"
)
return validation_result
def _basic_validation(self, response: str) -> Dict:
"""Perform basic validation checks"""
result = {"is_valid": True, "basic_issues": []}
# Length validation
if len(response) < self.min_length:
result["basic_issues"].append("Response too short")
result["is_valid"] = False
elif len(response) > self.max_length:
result["basic_issues"].append("Response too long")
# Content validation
if not response.strip():
result["basic_issues"].append("Empty response")
result["is_valid"] = False
# Encoding validation
try:
response.encode('utf-8')
except UnicodeEncodeError:
result["basic_issues"].append("Invalid character encoding")
result["is_valid"] = False
return result
def _calculate_quality_scores(self, response: str, expected_type: str,
context: Dict = None) -> Dict[str, float]:
"""Calculate quality scores across multiple dimensions"""
scores = {}
scores["completeness"] = self._measure_completeness(response, expected_type)
scores["coherence"] = self._measure_coherence(response)
scores["relevance"] = self._measure_relevance(response, context)
scores["accuracy"] = self._measure_accuracy(response, context)
scores["structure"] = self._measure_structure(response)
return scores
def _measure_completeness(self, response: str, expected_type: str) -> float:
"""Measure how complete the response is"""
word_count = len(response.split())
# Different expectations for different response types
expected_lengths = {
"summary": (50, 200),
"explanation": (100, 500),
"tutorial": (200, 1000),
"analysis": (150, 800),
"general": (20, 300)
}
min_words, max_words = expected_lengths.get(expected_type, (20, 300))
if word_count < min_words:
return word_count / min_words
elif word_count > max_words:
return max(0.8, 1.0 - (word_count - max_words) / max_words)
else:
return 1.0
def _measure_coherence(self, response: str) -> float:
"""Measure logical flow and coherence"""
sentences = response.split('.')
# Check for transition words
transition_words = [
"however", "therefore", "furthermore", "additionally",
"consequently", "moreover", "in contrast", "similarly"
]
transitions_found = 0
for sentence in sentences:
if any(word in sentence.lower() for word in transition_words):
transitions_found += 1
# Basic coherence score based on structure
has_introduction = any(word in response[:100].lower()
for word in ["introduction", "overview", "first"])
has_conclusion = any(word in response[-100:].lower()
for word in ["conclusion", "summary", "finally"])
coherence_score = 0.3 # Base score
if transitions_found > 0:
coherence_score += 0.3 * min(transitions_found / len(sentences), 0.5)
if has_introduction:
coherence_score += 0.2
if has_conclusion:
coherence_score += 0.2
return min(coherence_score, 1.0)
def _measure_relevance(self, response: str, context: Dict = None) -> float:
"""Measure how relevant the response is to the context"""
if not context or "keywords" not in context:
return 0.8 # Default score when no context available
keywords = context["keywords"]
response_lower = response.lower()
keyword_matches = sum(1 for keyword in keywords
if keyword.lower() in response_lower)
relevance_score = keyword_matches / len(keywords) if keywords else 0.8
return min(relevance_score, 1.0)
def _measure_accuracy(self, response: str, context: Dict = None) -> float:
"""Measure factual accuracy (basic heuristics)"""
# This is a simplified accuracy measure
# In practice, you'd use fact-checking APIs or knowledge bases
accuracy_score = 0.8 # Default optimistic score
# Check for uncertainty expressions (good sign)
uncertainty_phrases = ["might", "could", "possibly", "appears to", "seems"]
if any(phrase in response.lower() for phrase in uncertainty_phrases):
accuracy_score += 0.1
# Check for overconfident statements (potential red flag)
overconfident_phrases = ["definitely", "certainly", "always", "never"]
overconfident_count = sum(1 for phrase in overconfident_phrases
if phrase in response.lower())
if overconfident_count > 2:
accuracy_score -= 0.1
return min(max(accuracy_score, 0.0), 1.0)
def _measure_structure(self, response: str) -> float:
"""Measure how well-structured the response is"""
structure_score = 0.0
# Check for proper punctuation
if any(char in response for char in '.!?'):
structure_score += 0.3
# Check for paragraph breaks
if '\n\n' in response or response.count('\n') > 1:
structure_score += 0.2
# Check for lists or bullet points
        if any(marker in response for marker in ['- ', '• ', '1. ', '2. ']):
structure_score += 0.2
# Check for proper capitalization
sentences = response.split('.')
properly_capitalized = sum(1 for s in sentences
if s.strip() and s.strip()[0].isupper())
if properly_capitalized > len(sentences) * 0.8:
structure_score += 0.3
return min(structure_score, 1.0)
def _generate_improvement_suggestions(self, quality_scores: Dict[str, float],
expected_type: str) -> List[str]:
"""Generate specific improvement suggestions"""
suggestions = []
if quality_scores["completeness"] < 0.7:
suggestions.append(f"Expand the response to provide more comprehensive coverage of the topic")
if quality_scores["coherence"] < 0.6:
suggestions.append("Improve logical flow with better transitions between ideas")
if quality_scores["structure"] < 0.7:
suggestions.append("Enhance structure with proper paragraphs, punctuation, and formatting")
if quality_scores["relevance"] < 0.8:
suggestions.append("Focus more closely on the specific topic and requirements")
        return suggestions
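A quick sketch of validating a response against an expected type and context keywords:

validator = OutputValidator()
report = validator.validate_response(
    "Machine learning lets systems improve from data. However, results depend heavily on data quality.",
    expected_type="explanation",
    context={"keywords": ["machine learning", "data"]}
)
print(report["overall_score"], report["issues"])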
Content Safety Validation

from typing import Dict

class ContentSafetyValidator:
def __init__(self):
self.toxicity_keywords = [
# Add appropriate content filters based on your use case
]
self.sensitive_topics = [
"personal information", "financial data", "medical advice",
"legal advice", "harmful instructions"
]
def validate_content_safety(self, response: str) -> Dict:
"""Validate content for safety and appropriateness"""
safety_result = {
"is_safe": True,
"warnings": [],
"risk_level": "low",
"detected_issues": []
}
# Toxicity check
toxicity_check = self._check_toxicity(response)
if toxicity_check["has_toxicity"]:
safety_result["is_safe"] = False
safety_result["detected_issues"].extend(toxicity_check["issues"])
safety_result["risk_level"] = "high"
# Sensitive topic check
sensitive_check = self._check_sensitive_topics(response)
if sensitive_check["has_sensitive_content"]:
safety_result["warnings"].extend(sensitive_check["warnings"])
if safety_result["risk_level"] == "low":
safety_result["risk_level"] = "medium"
# Privacy check
privacy_check = self._check_privacy_leakage(response)
if privacy_check["has_privacy_concerns"]:
safety_result["warnings"].extend(privacy_check["concerns"])
return safety_result
def _check_toxicity(self, text: str) -> Dict:
"""Check for toxic or harmful content"""
text_lower = text.lower()
detected_keywords = []
for keyword in self.toxicity_keywords:
if keyword in text_lower:
detected_keywords.append(keyword)
return {
"has_toxicity": len(detected_keywords) > 0,
"issues": [f"Detected potentially harmful content: {kw}"
for kw in detected_keywords]
}
def _check_sensitive_topics(self, text: str) -> Dict:
"""Check for sensitive topics that need disclaimers"""
text_lower = text.lower()
detected_topics = []
for topic in self.sensitive_topics:
if any(word in text_lower for word in topic.split()):
detected_topics.append(topic)
warnings = []
if "medical" in text_lower:
warnings.append("Response contains medical information. Add disclaimer about consulting healthcare professionals.")
if "legal" in text_lower:
warnings.append("Response contains legal information. Add disclaimer about consulting legal professionals.")
return {
"has_sensitive_content": len(detected_topics) > 0,
"topics": detected_topics,
"warnings": warnings
}
def _check_privacy_leakage(self, text: str) -> Dict:
"""Check for potential privacy information leakage"""
import re
privacy_patterns = [
(r'\b\d{3}-\d{2}-\d{4}\b', "Social Security Number"),
(r'\b\d{4}\s?\d{4}\s?\d{4}\s?\d{4}\b', "Credit Card Number"),
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', "Email Address")
]
concerns = []
for pattern, concern_type in privacy_patterns:
if re.search(pattern, text):
concerns.append(f"Potential {concern_type} detected")
return {
"has_privacy_concerns": len(concerns) > 0,
"concerns": concerns
        }
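A quick sketch of running the safety check on a response that touches a sensitive topic:

safety_validator = ContentSafetyValidator()
safety = safety_validator.validate_content_safety(
    "This medication may cause drowsiness; for medical advice, consult a licensed professional."
)
print(safety["risk_level"], safety["warnings"])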
Advanced Output Processing

Stream Processing

from typing import Dict, List
class StreamProcessor:
def __init__(self):
self.buffer = ""
self.complete_responses = []
self.processors = {
"accumulate": self._accumulate_chunks,
"real_time": self._process_real_time,
"buffer": self._buffer_until_complete
}
def process_stream(self, chunk: str, mode: str = "accumulate") -> Dict:
"""Process streaming response chunks"""
processor = self.processors.get(mode, self._accumulate_chunks)
return processor(chunk)
def _accumulate_chunks(self, chunk: str) -> Dict:
"""Accumulate chunks into complete response"""
self.buffer += chunk
return {
"type": "accumulate",
"current_chunk": chunk,
"accumulated": self.buffer,
"is_complete": self._is_response_complete(self.buffer)
}
def _process_real_time(self, chunk: str) -> Dict:
"""Process each chunk immediately"""
processed_chunk = self._clean_chunk(chunk)
return {
"type": "real_time",
"original_chunk": chunk,
"processed_chunk": processed_chunk,
"word_count": len(processed_chunk.split()),
"contains_punctuation": any(p in processed_chunk for p in '.!?')
}
def _buffer_until_complete(self, chunk: str) -> Dict:
"""Buffer chunks until response is complete"""
self.buffer += chunk
if self._is_response_complete(self.buffer):
complete_response = self.buffer
self.buffer = "" # Reset buffer
return {
"type": "complete",
"response": complete_response,
"is_complete": True,
"final_processing": self._final_process(complete_response)
}
return {
"type": "buffering",
"is_complete": False,
"buffer_size": len(self.buffer)
}
def _is_response_complete(self, text: str) -> bool:
"""Determine if response is complete"""
# Simple heuristics for completion
if not text:
return False
# Check for sentence endings
if text.rstrip().endswith(('.', '!', '?')):
return True
# Check for completion markers
completion_markers = ["[DONE]", "[END]", "###"]
return any(marker in text for marker in completion_markers)
def _clean_chunk(self, chunk: str) -> str:
"""Clean individual chunk"""
# Remove common streaming artifacts
chunk = chunk.replace("data: ", "")
chunk = chunk.replace("[DONE]", "")
return chunk.strip()
def _final_process(self, response: str) -> Dict:
"""Final processing of complete response"""
validator = OutputValidator()
formatter = ResponseFormatter()
validation = validator.validate_response(response)
formatted = formatter.format_response(response, OutputFormat.TEXT)
return {
"validation": validation,
"formatted": formatted,
"metrics": {
"word_count": len(response.split()),
"character_count": len(response),
"sentence_count": response.count('.') + response.count('!') + response.count('?')
}
        }
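A quick sketch of accumulating streamed chunks until the response is complete:

stream = StreamProcessor()
chunks = ["LangChain output ", "processing turns raw chunks ", "into complete responses."]

for chunk in chunks:
    state = stream.process_stream(chunk, mode="accumulate")

print(state["is_complete"])    # True once the buffer ends with sentence punctuation
print(state["accumulated"])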
Response Analytics

import re
from datetime import datetime
from typing import Dict, List

class ResponseAnalyzer:
def __init__(self):
self.metrics_history = []
def analyze_response(self, response: str, metadata: Dict = None) -> Dict:
"""Comprehensive response analysis"""
analysis = {
"basic_metrics": self._calculate_basic_metrics(response),
"linguistic_analysis": self._linguistic_analysis(response),
"sentiment_analysis": self._sentiment_analysis(response),
"readability": self._readability_analysis(response),
"topics": self._topic_extraction(response),
"quality_indicators": self._quality_indicators(response)
}
# Store in history for trend analysis
self.metrics_history.append({
"timestamp": datetime.now(),
"analysis": analysis,
"metadata": metadata or {}
})
return analysis
def _calculate_basic_metrics(self, text: str) -> Dict:
"""Calculate basic text metrics"""
words = text.split()
sentences = re.split(r'[.!?]+', text)
paragraphs = text.split('\n\n')
return {
"word_count": len(words),
"sentence_count": len([s for s in sentences if s.strip()]),
"paragraph_count": len([p for p in paragraphs if p.strip()]),
"character_count": len(text),
"avg_words_per_sentence": len(words) / max(len(sentences), 1),
"avg_sentence_length": len(text) / max(len(sentences), 1)
}
def _linguistic_analysis(self, text: str) -> Dict:
"""Analyze linguistic features"""
words = text.lower().split()
# Vocabulary diversity
unique_words = set(words)
vocabulary_diversity = len(unique_words) / len(words) if words else 0
# Common word categories
question_words = ["what", "how", "when", "where", "why", "who"]
action_words = ["create", "build", "develop", "implement", "design"]
question_word_count = sum(1 for word in words if word in question_words)
action_word_count = sum(1 for word in words if word in action_words)
return {
"vocabulary_diversity": vocabulary_diversity,
"question_word_density": question_word_count / len(words) if words else 0,
"action_word_density": action_word_count / len(words) if words else 0,
"unique_word_count": len(unique_words),
"repetition_rate": 1 - vocabulary_diversity
}
def _sentiment_analysis(self, text: str) -> Dict:
"""Basic sentiment analysis"""
positive_words = ["good", "great", "excellent", "helpful", "useful", "effective"]
negative_words = ["bad", "poor", "terrible", "useless", "ineffective", "problematic"]
words = text.lower().split()
positive_count = sum(1 for word in words if word in positive_words)
negative_count = sum(1 for word in words if word in negative_words)
total_sentiment_words = positive_count + negative_count
if total_sentiment_words == 0:
sentiment_score = 0.5 # Neutral
else:
sentiment_score = positive_count / total_sentiment_words
return {
"sentiment_score": sentiment_score,
"sentiment_label": self._get_sentiment_label(sentiment_score),
"positive_word_count": positive_count,
"negative_word_count": negative_count
}
def _get_sentiment_label(self, score: float) -> str:
if score > 0.6:
return "positive"
elif score < 0.4:
return "negative"
else:
return "neutral"
def _readability_analysis(self, text: str) -> Dict:
"""Analyze text readability"""
words = text.split()
sentences = re.split(r'[.!?]+', text)
# Simple readability metrics
avg_words_per_sentence = len(words) / max(len(sentences), 1)
avg_chars_per_word = sum(len(word) for word in words) / max(len(words), 1)
# Simplified readability score (0-1, higher = more readable)
readability_score = max(0, 1 - (avg_words_per_sentence - 15) / 20)
readability_score = max(0, readability_score - (avg_chars_per_word - 5) / 10)
return {
"readability_score": min(readability_score, 1.0),
"avg_words_per_sentence": avg_words_per_sentence,
"avg_chars_per_word": avg_chars_per_word,
"complexity_level": self._get_complexity_level(readability_score)
}
def _get_complexity_level(self, score: float) -> str:
if score > 0.8:
return "simple"
elif score > 0.6:
return "moderate"
elif score > 0.4:
return "complex"
else:
return "very_complex"
def _topic_extraction(self, text: str) -> Dict:
"""Extract main topics from text"""
words = text.lower().split()
# Remove common stop words
stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
content_words = [word for word in words if word not in stop_words and len(word) > 3]
# Count word frequency
word_freq = {}
for word in content_words:
word_freq[word] = word_freq.get(word, 0) + 1
# Get top topics
top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
return {
"top_topics": [word for word, freq in top_words],
"topic_frequencies": dict(top_words),
"topic_diversity": len(word_freq) / len(content_words) if content_words else 0
}
def _quality_indicators(self, text: str) -> Dict:
"""Calculate quality indicators"""
indicators = {}
# Structure indicators
indicators["has_introduction"] = any(word in text[:100].lower()
for word in ["introduction", "overview", "first"])
indicators["has_conclusion"] = any(word in text[-100:].lower()
for word in ["conclusion", "summary", "finally"])
indicators["has_examples"] = "example" in text.lower() or "for instance" in text.lower()
indicators["has_structure"] = any(marker in text for marker in ["1.", "2.", "β’", "-"])
# Content indicators
indicators["provides_explanation"] = any(word in text.lower()
for word in ["because", "due to", "since", "as"])
indicators["actionable"] = any(word in text.lower()
for word in ["should", "can", "will", "try", "implement"])
        return indicators
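A quick sketch of analyzing a short response:

analyzer = ResponseAnalyzer()
analysis = analyzer.analyze_response(
    "First, machine learning is important because models learn from data. "
    "For example, a spam filter improves as it sees more email. "
    "Finally, good data quality is essential."
)
print(analysis["basic_metrics"]["word_count"])
print(analysis["quality_indicators"]["has_examples"])   # True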
Getting Started with Model Output

Basic Output Setup
from langchain.schema.output_parser import StrOutputParser
from datetime import datetime
def setup_basic_output_processing():
"""Set up basic output processing components"""
# Initialize components
string_parser = StrOutputParser()
formatter = ResponseFormatter()
validator = OutputValidator()
return string_parser, formatter, validator
def process_llm_output(raw_output: str, output_format: OutputFormat = OutputFormat.TEXT):
"""Process raw LLM output through the pipeline"""
# Initialize processors
_, formatter, validator = setup_basic_output_processing()
# Step 1: Validate output
validation_result = validator.validate_response(raw_output)
if not validation_result["is_valid"]:
return {
"success": False,
"error": "Validation failed",
"issues": validation_result["issues"],
"suggestions": validation_result["suggestions"]
}
# Step 2: Format output
formatted_output = formatter.format_response(raw_output, output_format)
# Step 3: Return processed result
return {
"success": True,
"raw_output": raw_output,
"formatted_output": formatted_output,
"validation": validation_result,
"metadata": {
"format": output_format.value,
"processing_timestamp": datetime.now().isoformat()
}
}
# Example usage
raw_response = "Machine learning is a subset of artificial intelligence..."
result = process_llm_output(raw_response, OutputFormat.MARKDOWN)
if result["success"]:
print("Processed Output:", result["formatted_output"])
print("Quality Score:", result["validation"]["overall_score"])
else:
print("Processing failed:", result["error"])Advanced Output Pipeline β
from typing import Dict, List

class ComprehensiveOutputProcessor:
def __init__(self):
self.validator = OutputValidator()
self.formatter = ResponseFormatter()
self.analyzer = ResponseAnalyzer()
self.safety_validator = ContentSafetyValidator()
self.stream_processor = StreamProcessor()
def process(self, output: str, config: Dict = None) -> Dict:
"""Comprehensive output processing pipeline"""
config = config or {}
result = {
"original_output": output,
"processed_output": None,
"validation": None,
"safety_check": None,
"analysis": None,
"formatted_outputs": {},
"processing_metadata": {
"timestamp": datetime.now().isoformat(),
"config": config
}
}
try:
# Step 1: Safety validation
safety_result = self.safety_validator.validate_content_safety(output)
result["safety_check"] = safety_result
if not safety_result["is_safe"] and config.get("strict_safety", True):
result["error"] = "Content safety validation failed"
return result
# Step 2: Quality validation
validation_result = self.validator.validate_response(
output,
config.get("expected_type", "general"),
config.get("context", {})
)
result["validation"] = validation_result
# Step 3: Analysis
if config.get("analyze", True):
analysis_result = self.analyzer.analyze_response(output)
result["analysis"] = analysis_result
# Step 4: Multi-format output
output_formats = config.get("formats", [OutputFormat.TEXT])
for format_type in output_formats:
try:
formatted = self.formatter.format_response(
output,
format_type,
config.get("format_metadata", {})
)
result["formatted_outputs"][format_type.value] = formatted
except Exception as e:
result["formatted_outputs"][format_type.value] = f"Formatting error: {str(e)}"
# Step 5: Set primary processed output
primary_format = config.get("primary_format", OutputFormat.TEXT)
result["processed_output"] = result["formatted_outputs"].get(
primary_format.value,
output
)
# Step 6: Generate recommendations
result["recommendations"] = self._generate_recommendations(result)
except Exception as e:
result["error"] = f"Processing error: {str(e)}"
return result
def _generate_recommendations(self, result: Dict) -> List[str]:
"""Generate recommendations based on processing results"""
recommendations = []
validation = result.get("validation", {})
safety = result.get("safety_check", {})
analysis = result.get("analysis", {})
# Quality recommendations
if validation.get("overall_score", 1.0) < 0.7:
recommendations.extend(validation.get("suggestions", []))
# Safety recommendations
if safety.get("warnings"):
recommendations.append("Consider adding appropriate disclaimers")
# Readability recommendations
readability = analysis.get("readability", {})
if readability.get("complexity_level") == "very_complex":
recommendations.append("Consider simplifying language for better readability")
return recommendations
def process_stream(self, chunks: List[str], config: Dict = None) -> Dict:
"""Process streaming output"""
config = config or {}
mode = config.get("stream_mode", "accumulate")
stream_results = []
for chunk in chunks:
chunk_result = self.stream_processor.process_stream(chunk, mode)
stream_results.append(chunk_result)
# Process final accumulated result if in accumulate mode
if mode == "accumulate" and stream_results:
final_text = stream_results[-1].get("accumulated", "")
if final_text:
return self.process(final_text, config)
return {
"stream_results": stream_results,
"mode": mode,
"total_chunks": len(chunks)
}
# Usage example
processor = ComprehensiveOutputProcessor()
# Single output processing
output_text = "Machine learning is a powerful technology that enables computers to learn from data..."
config = {
"expected_type": "explanation",
"formats": [OutputFormat.TEXT, OutputFormat.MARKDOWN, OutputFormat.JSON],
"primary_format": OutputFormat.MARKDOWN,
"analyze": True,
"context": {"keywords": ["machine learning", "technology", "data"]}
}
result = processor.process(output_text, config)
print("Processing successful:", "error" not in result)
print("Quality score:", result["validation"]["overall_score"])
print("Safety level:", result["safety_check"]["risk_level"])
print("Recommendations:", result["recommendations"])π― Next Steps β
- Implement Output Parsers: Create custom parsers for your specific data formats
- Build Validation Rules: Develop domain-specific quality validation criteria
- Deploy Safety Measures: Implement comprehensive content safety validation
- Monitor Output Quality: Set up analytics to track response quality over time
- Optimize Performance: Implement caching and batch processing for efficiency
Additional Resources
- Output Parser Documentation: Advanced parsing techniques
- Quality Metrics Guidelines: Best practices for response evaluation
- Safety Validation Tools: Content moderation and safety resources
- Community Examples: Real-world output processing implementations
Master Model Output to build intelligent, safe, and high-quality LangChain applications that deliver polished, validated responses to your users.