Model Output - LangChain Output Processing

Master response handling and output optimization for effective LLM communication

πŸ“€ What is Model Output?

Model Output in LangChain refers to the systematic handling and processing of responses from language models. It's crucial for extracting structured data, formatting responses, validating quality, and transforming raw LLM outputs into actionable information for your applications.

Simple Analogy: Think of Model Output as the "quality control and packaging department" for your LLM responses - it takes raw language model outputs and transforms them into polished, structured, and validated results ready for your application.

🎯 Model Output Architecture

text
                    πŸ“€ LANGCHAIN MODEL OUTPUT OVERVIEW πŸ“€
                     (Processing Perfect Responses)

    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                     πŸ€– RAW LLM OUTPUT                          β”‚
    β”‚                "Unprocessed Model Response"                     β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                         β”‚
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚         πŸ“€ OUTPUT COMPONENTS            β”‚
    β””β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚        β”‚        β”‚        β”‚
       β–Ό        β–Ό        β–Ό        β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚πŸ” PARSE β”‚πŸ—οΈ STRUCTβ”‚βœ… VALID β”‚πŸŽ― FORMATβ”‚
  β”‚RAW TEXT β”‚ URED    β”‚ ATE     β”‚ OUTPUT  β”‚
  β”‚         β”‚ DATA    β”‚         β”‚         β”‚
  β”‚Extract  β”‚JSON/XML β”‚Quality  β”‚User     β”‚
  β”‚Content  β”‚Objects  β”‚Checks   β”‚Ready    β”‚
  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
       β”‚         β”‚         β”‚         β”‚
       β–Ό         β–Ό         β–Ό         β–Ό
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚                    ✨ PROCESSED OUTPUT                         β”‚
    β”‚              Structured, Validated, Formatted                  β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
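
Before diving into each component, here is a minimal sketch of the full pipeline using LangChain Expression Language: a prompt feeds a chat model, and a string output parser turns the raw message into application-ready text. It assumes a configured chat model (ChatOpenAI is used only as a placeholder; any chat model works).

python
from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema.output_parser import StrOutputParser

# Prompt -> model -> parser: the parser is the first output-processing step
prompt = PromptTemplate.from_template("Summarize in one sentence: {text}")
llm = ChatOpenAI(temperature=0)  # placeholder; swap in any chat model
chain = prompt | llm | StrOutputParser()

summary = chain.invoke({"text": "LangChain provides composable components for building LLM applications."})
print(summary)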

πŸ” Output Parsing ​

Extracting structured data from LLM responses

Basic Text Parsing

python
from langchain.schema.output_parser import StrOutputParser
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

# Simple string output parser
string_parser = StrOutputParser()

# Custom output parser for specific patterns
class CustomTextParser(StrOutputParser):
    def parse(self, text: str) -> dict:
        lines = text.strip().split('\n')
        result = {}
        
        for line in lines:
            if ':' in line:
                key, value = line.split(':', 1)
                result[key.strip()] = value.strip()
        
        return result

# Usage
parser = CustomTextParser()
parsed_output = parser.parse("Name: John Doe\nAge: 30\nOccupation: Engineer")
print(parsed_output)  # {'Name': 'John Doe', 'Age': '30', 'Occupation': 'Engineer'}

Structured Data Parsing

python
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field
from typing import List

class PersonInfo(BaseModel):
    name: str = Field(description="Person's full name")
    age: int = Field(description="Person's age")
    skills: List[str] = Field(description="List of skills")
    experience_years: int = Field(description="Years of professional experience")

# Create parser
parser = PydanticOutputParser(pydantic_object=PersonInfo)

# Get format instructions for the LLM
format_instructions = parser.get_format_instructions()

# Use in prompt
prompt_template = PromptTemplate(
    template="Extract person information from this text: {text}\n{format_instructions}",
    input_variables=["text"],
    partial_variables={"format_instructions": format_instructions}
)
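
# Example usage (sketch, assuming an LCEL-capable LangChain version and a
# configured OpenAI API key; any chat model can be substituted for ChatOpenAI):
from langchain.chat_models import ChatOpenAI

llm = ChatOpenAI(temperature=0)
chain = prompt_template | llm | parser

person = chain.invoke({"text": "Jane Smith is a 34-year-old engineer with 10 years of Python and SQL experience."})
print(person.name, person.age, person.skills)  # validated PersonInfo instance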

JSON Output Parser

python
from langchain.output_parsers import OutputFixingParser
from langchain.output_parsers.json import SimpleJsonOutputParser
import json

# Basic JSON parser
json_parser = SimpleJsonOutputParser()

# Robust JSON parser with error fixing
def create_robust_json_parser(llm):
    return OutputFixingParser.from_llm(
        parser=json_parser,
        llm=llm
    )

# Custom JSON validation
def parse_json_response(response: str) -> dict:
    """Parse JSON response with error handling"""
    try:
        # Try direct parsing
        return json.loads(response)
    except json.JSONDecodeError:
        # Try to extract JSON from text
        json_start = response.find('{')
        json_end = response.rfind('}')
        
        if json_start != -1 and json_end > json_start:
            json_text = response[json_start:json_end + 1]
            try:
                return json.loads(json_text)
            except json.JSONDecodeError:
                pass
        
        raise ValueError("Could not parse JSON from response")

# Advanced JSON extraction
class RobustJSONExtractor:
    def __init__(self):
        self.extraction_strategies = [
            self._direct_parse,
            self._bracket_extraction,
            self._regex_extraction,
            self._cleanup_and_parse
        ]
    
    def extract_json(self, text: str) -> dict:
        """Try multiple strategies to extract JSON from text"""
        for strategy in self.extraction_strategies:
            try:
                result = strategy(text)
                if result:
                    return result
            except (json.JSONDecodeError, ValueError):
                continue
        
        raise ValueError("Could not extract valid JSON from response")
    
    def _direct_parse(self, text: str) -> dict:
        return json.loads(text)
    
    def _bracket_extraction(self, text: str) -> dict:
        start = text.find('{')
        end = text.rfind('}')
        if start != -1 and end > start:
            return json.loads(text[start:end + 1])
        return None
    
    def _regex_extraction(self, text: str) -> dict:
        import re
        json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
        matches = re.findall(json_pattern, text)
        
        for match in matches:
            try:
                return json.loads(match)
            except json.JSONDecodeError:
                continue
        return None
    
    def _cleanup_and_parse(self, text: str) -> dict:
        # Remove common formatting issues
        cleaned = text.replace('```json', '').replace('```', '')
        cleaned = cleaned.strip()
        return json.loads(cleaned)
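
# Example (sketch): the extractor recovers JSON even when the model wraps it
# in prose or markdown fences, a common failure mode for JSON-formatted prompts.
extractor = RobustJSONExtractor()
messy = 'Sure! Here is the record:\n```json\n{"name": "Ada", "age": 36}\n```'
print(extractor.extract_json(messy))  # {'name': 'Ada', 'age': 36}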

List and Array Parsing

python
import re
from typing import Dict, List

class ListOutputParser:
    def __init__(self, delimiter: str = '\n'):
        self.delimiter = delimiter
    
    def parse(self, text: str) -> List[str]:
        """Parse text into a list of items"""
        items = text.split(self.delimiter)
        
        # Clean and filter items
        cleaned_items = []
        for item in items:
            item = item.strip()
            
            # Remove common list markers
            item = re.sub(r'^[-*β€’]\s*', '', item)
            item = re.sub(r'^\d+\.\s*', '', item)
            
            if item:
                cleaned_items.append(item)
        
        return cleaned_items

# Numbered list parser
class NumberedListParser(ListOutputParser):
    def parse(self, text: str) -> Dict[int, str]:
        """Parse numbered list into dictionary"""
        items = super().parse(text)
        result = {}
        
        for i, item in enumerate(items, 1):
            # Try to extract number from start of item
            match = re.match(r'^(\d+)\.\s*(.+)', item)
            if match:
                num, content = match.groups()
                result[int(num)] = content
            else:
                result[i] = item
        
        return result
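
# Example (sketch): both parsers handle typical LLM list formatting.
bullets = "- Learn the basics\n- Practice daily\nβ€’ Build a small project"
print(ListOutputParser().parse(bullets))
# ['Learn the basics', 'Practice daily', 'Build a small project']

numbered = "1. Install dependencies\n2. Configure the model\n3. Run the chain"
print(NumberedListParser().parse(numbered))
# {1: 'Install dependencies', 2: 'Configure the model', 3: 'Run the chain'}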

πŸ—οΈ Response Formatting ​

Converting raw outputs to usable formats

Multi-format Output Handler

python
from enum import Enum
from typing import Union, Dict, List
from datetime import datetime
import json
import html

class OutputFormat(Enum):
    TEXT = "text"
    JSON = "json"
    MARKDOWN = "markdown"
    HTML = "html"
    XML = "xml"

class ResponseFormatter:
    def __init__(self):
        self.formatters = {
            OutputFormat.TEXT: self._format_text,
            OutputFormat.JSON: self._format_json,
            OutputFormat.MARKDOWN: self._format_markdown,
            OutputFormat.HTML: self._format_html,
            OutputFormat.XML: self._format_xml
        }
    
    def format_response(self, content: str, format_type: OutputFormat, 
                       metadata: Dict = None) -> str:
        formatter = self.formatters.get(format_type)
        if not formatter:
            raise ValueError(f"Unsupported format: {format_type}")
        
        return formatter(content, metadata or {})
    
    def _format_text(self, content: str, metadata: Dict) -> str:
        formatted = content.strip()
        
        if metadata.get("add_timestamp"):
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            formatted = f"[{timestamp}] {formatted}"
        
        return formatted
    
    def _format_json(self, content: str, metadata: Dict) -> str:
        try:
            data = json.loads(content)
        except json.JSONDecodeError:
            data = {"content": content}
        
        if metadata:
            data["metadata"] = metadata
        
        return json.dumps(data, indent=2, ensure_ascii=False)
    
    def _format_markdown(self, content: str, metadata: Dict) -> str:
        lines = content.split('\n')
        formatted_lines = []
        
        for line in lines:
            line = line.strip()
            if line.startswith('**') and line.endswith('**'):
                # Convert bold to markdown header
                formatted_lines.append(f"## {line[2:-2]}")
            else:
                formatted_lines.append(line)
        
        formatted = '\n\n'.join(filter(None, formatted_lines))
        
        if metadata.get("title"):
            formatted = f"# {metadata['title']}\n\n{formatted}"
        
        return formatted
    
    def _format_html(self, content: str, metadata: Dict) -> str:
        escaped_content = html.escape(content)
        
        # Convert newlines to <br> tags
        escaped_content = escaped_content.replace('\n', '<br>\n')
        
        # Wrap in div with optional CSS class
        css_class = metadata.get("css_class", "llm-response")
        return f'<div class="{css_class}">{escaped_content}</div>'
    
    def _format_xml(self, content: str, metadata: Dict) -> str:
        root_tag = metadata.get("root_tag", "response")
        escaped_content = html.escape(content)
        
        return f"""<?xml version="1.0" encoding="UTF-8"?>
<{root_tag}>
    <content>{escaped_content}</content>
    <timestamp>{datetime.now().isoformat()}</timestamp>
</{root_tag}>"""
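
# Example (sketch): the same raw response rendered in two formats.
formatter = ResponseFormatter()
raw = "**Key Insight**\nValidate and structure responses before displaying them."

print(formatter.format_response(raw, OutputFormat.MARKDOWN, {"title": "Report"}))
print(formatter.format_response(raw, OutputFormat.HTML, {"css_class": "answer"}))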

Advanced Response Structuring

python
class StructuredResponseBuilder:
    def __init__(self):
        self.response_templates = {
            "qa": self._build_qa_response,
            "analysis": self._build_analysis_response,
            "tutorial": self._build_tutorial_response,
            "summary": self._build_summary_response
        }
    
    def build_structured_response(self, content: str, response_type: str, 
                                metadata: Dict = None) -> Dict:
        """Build a structured response with consistent format"""
        
        builder = self.response_templates.get(response_type)
        if not builder:
            return self._build_generic_response(content, metadata)
        
        return builder(content, metadata or {})
    
    def _build_qa_response(self, content: str, metadata: Dict) -> Dict:
        return {
            "type": "qa",
            "question": metadata.get("question", ""),
            "answer": content,
            "confidence": metadata.get("confidence", 0.8),
            "sources": metadata.get("sources", []),
            "timestamp": datetime.now().isoformat()
        }
    
    def _build_analysis_response(self, content: str, metadata: Dict) -> Dict:
        # Try to extract analysis components
        sections = self._extract_analysis_sections(content)
        
        return {
            "type": "analysis",
            "summary": sections.get("summary", ""),
            "findings": sections.get("findings", []),
            "recommendations": sections.get("recommendations", []),
            "full_analysis": content,
            "metadata": metadata
        }
    
    def _build_tutorial_response(self, content: str, metadata: Dict) -> Dict:
        steps = self._extract_tutorial_steps(content)
        
        return {
            "type": "tutorial",
            "title": metadata.get("title", "Tutorial"),
            "difficulty": metadata.get("difficulty", "intermediate"),
            "estimated_time": metadata.get("estimated_time", ""),
            "steps": steps,
            "full_content": content
        }
    
    def _build_summary_response(self, content: str, metadata: Dict) -> Dict:
        return {
            "type": "summary",
            "summary": content,
            "key_points": self._extract_key_points(content),
            "word_count": len(content.split()),
            "original_length": metadata.get("original_length", 0),
            "compression_ratio": self._calculate_compression_ratio(content, metadata)
        }
    
    def _extract_analysis_sections(self, content: str) -> Dict:
        """Extract analysis sections from content"""
        sections = {}
        
        # Simple extraction based on common patterns
        if "summary:" in content.lower():
            summary_start = content.lower().find("summary:")
            summary_content = content[summary_start:].split('\n')[0]
            sections["summary"] = summary_content.replace("summary:", "").strip()
        
        # Extract bullet points as findings
        findings = re.findall(r'[β€’\-*]\s*(.+)', content)
        sections["findings"] = findings
        
        return sections
    
    def _extract_tutorial_steps(self, content: str) -> List[Dict]:
        """Extract tutorial steps from content"""
        steps = []
        
        # Look for numbered steps
        step_pattern = r'(\d+)\.\s*(.+?)(?=\d+\.|$)'
        matches = re.findall(step_pattern, content, re.DOTALL)
        
        for step_num, step_content in matches:
            steps.append({
                "step_number": int(step_num),
                "instruction": step_content.strip(),
                "estimated_time": "5 minutes"  # Default
            })
        
        return steps
    
    def _extract_key_points(self, content: str) -> List[str]:
        """Extract key points from content"""
        # Simple extraction of sentences with strong indicators
        sentences = content.split('.')
        key_points = []
        
        indicators = ["important", "key", "crucial", "essential", "main"]
        
        for sentence in sentences:
            if any(indicator in sentence.lower() for indicator in indicators):
                key_points.append(sentence.strip())
        
        return key_points[:5]  # Limit to top 5
    
    def _build_generic_response(self, content: str, metadata: Dict = None) -> Dict:
        """Fallback structure for unrecognized response types"""
        return {
            "type": "generic",
            "content": content,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat()
        }
    
    def _calculate_compression_ratio(self, content: str, metadata: Dict) -> float:
        """Summary word count relative to the original length (0 when unknown)"""
        original_length = metadata.get("original_length", 0)
        if not original_length:
            return 0.0
        return len(content.split()) / original_length
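
# Example (sketch): wrap a raw answer in the consistent "qa" structure.
builder = StructuredResponseBuilder()
qa_response = builder.build_structured_response(
    "LangChain chains prompts, models, and output parsers into pipelines.",
    response_type="qa",
    metadata={"question": "What does LangChain do?", "confidence": 0.9}
)
print(qa_response["type"], qa_response["confidence"])  # qa 0.9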

βœ… Output Validation

Ensuring response quality and accuracy

Quality Assessment Engine

python
class OutputValidator:
    def __init__(self):
        self.min_length = 10
        self.max_length = 5000
        self.quality_thresholds = {
            "completeness": 0.7,
            "coherence": 0.6,
            "relevance": 0.8,
            "accuracy": 0.7
        }
    
    def validate_response(self, response: str, expected_type: str = "general", 
                         context: Dict = None) -> Dict:
        validation_result = {
            "is_valid": True,
            "quality_scores": {},
            "issues": [],
            "suggestions": [],
            "overall_score": 0.0
        }
        
        # Basic validation
        basic_validation = self._basic_validation(response)
        validation_result.update(basic_validation)
        
        if not validation_result["is_valid"]:
            return validation_result
        
        # Quality scoring
        quality_scores = self._calculate_quality_scores(response, expected_type, context)
        validation_result["quality_scores"] = quality_scores
        
        # Overall score calculation
        overall_score = sum(quality_scores.values()) / len(quality_scores)
        validation_result["overall_score"] = overall_score
        
        # Generate suggestions
        validation_result["suggestions"] = self._generate_improvement_suggestions(
            quality_scores, expected_type
        )
        
        # Check quality thresholds
        for metric, score in quality_scores.items():
            threshold = self.quality_thresholds.get(metric, 0.5)
            if score < threshold:
                validation_result["issues"].append(
                    f"Low {metric} score: {score:.2f} < {threshold}"
                )
        
        return validation_result
    
    def _basic_validation(self, response: str) -> Dict:
        """Perform basic validation checks"""
        result = {"is_valid": True, "basic_issues": []}
        
        # Length validation
        if len(response) < self.min_length:
            result["basic_issues"].append("Response too short")
            result["is_valid"] = False
        elif len(response) > self.max_length:
            result["basic_issues"].append("Response too long")
        
        # Content validation
        if not response.strip():
            result["basic_issues"].append("Empty response")
            result["is_valid"] = False
        
        # Encoding validation
        try:
            response.encode('utf-8')
        except UnicodeEncodeError:
            result["basic_issues"].append("Invalid character encoding")
            result["is_valid"] = False
        
        return result
    
    def _calculate_quality_scores(self, response: str, expected_type: str, 
                                context: Dict = None) -> Dict[str, float]:
        """Calculate quality scores across multiple dimensions"""
        scores = {}
        
        scores["completeness"] = self._measure_completeness(response, expected_type)
        scores["coherence"] = self._measure_coherence(response)
        scores["relevance"] = self._measure_relevance(response, context)
        scores["accuracy"] = self._measure_accuracy(response, context)
        scores["structure"] = self._measure_structure(response)
        
        return scores
    
    def _measure_completeness(self, response: str, expected_type: str) -> float:
        """Measure how complete the response is"""
        word_count = len(response.split())
        
        # Different expectations for different response types
        expected_lengths = {
            "summary": (50, 200),
            "explanation": (100, 500),
            "tutorial": (200, 1000),
            "analysis": (150, 800),
            "general": (20, 300)
        }
        
        min_words, max_words = expected_lengths.get(expected_type, (20, 300))
        
        if word_count < min_words:
            return word_count / min_words
        elif word_count > max_words:
            return max(0.8, 1.0 - (word_count - max_words) / max_words)
        else:
            return 1.0
    
    def _measure_coherence(self, response: str) -> float:
        """Measure logical flow and coherence"""
        sentences = response.split('.')
        
        # Check for transition words
        transition_words = [
            "however", "therefore", "furthermore", "additionally", 
            "consequently", "moreover", "in contrast", "similarly"
        ]
        
        transitions_found = 0
        for sentence in sentences:
            if any(word in sentence.lower() for word in transition_words):
                transitions_found += 1
        
        # Basic coherence score based on structure
        has_introduction = any(word in response[:100].lower() 
                             for word in ["introduction", "overview", "first"])
        has_conclusion = any(word in response[-100:].lower() 
                           for word in ["conclusion", "summary", "finally"])
        
        coherence_score = 0.3  # Base score
        
        if transitions_found > 0:
            coherence_score += 0.3 * min(transitions_found / len(sentences), 0.5)
        
        if has_introduction:
            coherence_score += 0.2
        
        if has_conclusion:
            coherence_score += 0.2
        
        return min(coherence_score, 1.0)
    
    def _measure_relevance(self, response: str, context: Dict = None) -> float:
        """Measure how relevant the response is to the context"""
        if not context or "keywords" not in context:
            return 0.8  # Default score when no context available
        
        keywords = context["keywords"]
        response_lower = response.lower()
        
        keyword_matches = sum(1 for keyword in keywords 
                            if keyword.lower() in response_lower)
        
        relevance_score = keyword_matches / len(keywords) if keywords else 0.8
        return min(relevance_score, 1.0)
    
    def _measure_accuracy(self, response: str, context: Dict = None) -> float:
        """Measure factual accuracy (basic heuristics)"""
        # This is a simplified accuracy measure
        # In practice, you'd use fact-checking APIs or knowledge bases
        
        accuracy_score = 0.8  # Default optimistic score
        
        # Check for uncertainty expressions (good sign)
        uncertainty_phrases = ["might", "could", "possibly", "appears to", "seems"]
        if any(phrase in response.lower() for phrase in uncertainty_phrases):
            accuracy_score += 0.1
        
        # Check for overconfident statements (potential red flag)
        overconfident_phrases = ["definitely", "certainly", "always", "never"]
        overconfident_count = sum(1 for phrase in overconfident_phrases 
                                if phrase in response.lower())
        
        if overconfident_count > 2:
            accuracy_score -= 0.1
        
        return min(max(accuracy_score, 0.0), 1.0)
    
    def _measure_structure(self, response: str) -> float:
        """Measure how well-structured the response is"""
        structure_score = 0.0
        
        # Check for proper punctuation
        if any(char in response for char in '.!?'):
            structure_score += 0.3
        
        # Check for paragraph breaks
        if '\n\n' in response or response.count('\n') > 1:
            structure_score += 0.2
        
        # Check for lists or bullet points
        if any(marker in response for marker in ['- ', 'β€’ ', '1. ', '2. ']):
            structure_score += 0.2
        
        # Check for proper capitalization
        sentences = response.split('.')
        properly_capitalized = sum(1 for s in sentences 
                                 if s.strip() and s.strip()[0].isupper())
        
        if properly_capitalized > len(sentences) * 0.8:
            structure_score += 0.3
        
        return min(structure_score, 1.0)
    
    def _generate_improvement_suggestions(self, quality_scores: Dict[str, float], 
                                        expected_type: str) -> List[str]:
        """Generate specific improvement suggestions"""
        suggestions = []
        
        if quality_scores["completeness"] < 0.7:
            suggestions.append(f"Expand the response to provide more comprehensive coverage of the topic")
        
        if quality_scores["coherence"] < 0.6:
            suggestions.append("Improve logical flow with better transitions between ideas")
        
        if quality_scores["structure"] < 0.7:
            suggestions.append("Enhance structure with proper paragraphs, punctuation, and formatting")
        
        if quality_scores["relevance"] < 0.8:
            suggestions.append("Focus more closely on the specific topic and requirements")
        
        return suggestions
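
# Example (sketch): validate a short explanation against the "explanation" profile.
validator = OutputValidator()
report = validator.validate_response(
    "Machine learning lets systems improve from data. For example, a spam filter "
    "learns from labeled emails. However, model quality depends heavily on the data.",
    expected_type="explanation",
    context={"keywords": ["machine learning", "data"]}
)
print(f"Score: {report['overall_score']:.2f}, issues: {report['issues']}")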

Content Safety Validation

python
class ContentSafetyValidator:
    def __init__(self):
        self.toxicity_keywords = [
            # Add appropriate content filters based on your use case
        ]
        
        self.sensitive_topics = [
            "personal information", "financial data", "medical advice",
            "legal advice", "harmful instructions"
        ]
    
    def validate_content_safety(self, response: str) -> Dict:
        """Validate content for safety and appropriateness"""
        
        safety_result = {
            "is_safe": True,
            "warnings": [],
            "risk_level": "low",
            "detected_issues": []
        }
        
        # Toxicity check
        toxicity_check = self._check_toxicity(response)
        if toxicity_check["has_toxicity"]:
            safety_result["is_safe"] = False
            safety_result["detected_issues"].extend(toxicity_check["issues"])
            safety_result["risk_level"] = "high"
        
        # Sensitive topic check
        sensitive_check = self._check_sensitive_topics(response)
        if sensitive_check["has_sensitive_content"]:
            safety_result["warnings"].extend(sensitive_check["warnings"])
            if safety_result["risk_level"] == "low":
                safety_result["risk_level"] = "medium"
        
        # Privacy check
        privacy_check = self._check_privacy_leakage(response)
        if privacy_check["has_privacy_concerns"]:
            safety_result["warnings"].extend(privacy_check["concerns"])
        
        return safety_result
    
    def _check_toxicity(self, text: str) -> Dict:
        """Check for toxic or harmful content"""
        text_lower = text.lower()
        detected_keywords = []
        
        for keyword in self.toxicity_keywords:
            if keyword in text_lower:
                detected_keywords.append(keyword)
        
        return {
            "has_toxicity": len(detected_keywords) > 0,
            "issues": [f"Detected potentially harmful content: {kw}" 
                      for kw in detected_keywords]
        }
    
    def _check_sensitive_topics(self, text: str) -> Dict:
        """Check for sensitive topics that need disclaimers"""
        text_lower = text.lower()
        detected_topics = []
        
        for topic in self.sensitive_topics:
            if any(word in text_lower for word in topic.split()):
                detected_topics.append(topic)
        
        warnings = []
        if "medical" in text_lower:
            warnings.append("Response contains medical information. Add disclaimer about consulting healthcare professionals.")
        
        if "legal" in text_lower:
            warnings.append("Response contains legal information. Add disclaimer about consulting legal professionals.")
        
        return {
            "has_sensitive_content": len(detected_topics) > 0,
            "topics": detected_topics,
            "warnings": warnings
        }
    
    def _check_privacy_leakage(self, text: str) -> Dict:
        """Check for potential privacy information leakage"""
        import re
        
        privacy_patterns = [
            (r'\b\d{3}-\d{2}-\d{4}\b', "Social Security Number"),
            (r'\b\d{4}\s?\d{4}\s?\d{4}\s?\d{4}\b', "Credit Card Number"),
            (r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', "Email Address")
        ]
        
        concerns = []
        for pattern, concern_type in privacy_patterns:
            if re.search(pattern, text):
                concerns.append(f"Potential {concern_type} detected")
        
        return {
            "has_privacy_concerns": len(concerns) > 0,
            "concerns": concerns
        }
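
# Example (sketch): content touching medical or legal topics triggers disclaimer warnings.
safety = ContentSafetyValidator()
check = safety.validate_content_safety(
    "For persistent headaches, a medical professional should be consulted."
)
print(check["risk_level"])   # medium
print(check["warnings"])     # disclaimer suggestion for medical content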

🎯 Advanced Output Processing

Stream Processing

python
class StreamProcessor:
    def __init__(self):
        self.buffer = ""
        self.complete_responses = []
        self.processors = {
            "accumulate": self._accumulate_chunks,
            "real_time": self._process_real_time,
            "buffer": self._buffer_until_complete
        }
    
    def process_stream(self, chunk: str, mode: str = "accumulate") -> Dict:
        """Process streaming response chunks"""
        processor = self.processors.get(mode, self._accumulate_chunks)
        return processor(chunk)
    
    def _accumulate_chunks(self, chunk: str) -> Dict:
        """Accumulate chunks into complete response"""
        self.buffer += chunk
        
        return {
            "type": "accumulate",
            "current_chunk": chunk,
            "accumulated": self.buffer,
            "is_complete": self._is_response_complete(self.buffer)
        }
    
    def _process_real_time(self, chunk: str) -> Dict:
        """Process each chunk immediately"""
        processed_chunk = self._clean_chunk(chunk)
        
        return {
            "type": "real_time",
            "original_chunk": chunk,
            "processed_chunk": processed_chunk,
            "word_count": len(processed_chunk.split()),
            "contains_punctuation": any(p in processed_chunk for p in '.!?')
        }
    
    def _buffer_until_complete(self, chunk: str) -> Dict:
        """Buffer chunks until response is complete"""
        self.buffer += chunk
        
        if self._is_response_complete(self.buffer):
            complete_response = self.buffer
            self.buffer = ""  # Reset buffer
            
            return {
                "type": "complete",
                "response": complete_response,
                "is_complete": True,
                "final_processing": self._final_process(complete_response)
            }
        
        return {
            "type": "buffering",
            "is_complete": False,
            "buffer_size": len(self.buffer)
        }
    
    def _is_response_complete(self, text: str) -> bool:
        """Determine if response is complete"""
        # Simple heuristics for completion
        if not text:
            return False
        
        # Check for sentence endings
        if text.rstrip().endswith(('.', '!', '?')):
            return True
        
        # Check for completion markers
        completion_markers = ["[DONE]", "[END]", "###"]
        return any(marker in text for marker in completion_markers)
    
    def _clean_chunk(self, chunk: str) -> str:
        """Clean individual chunk"""
        # Remove common streaming artifacts
        chunk = chunk.replace("data: ", "")
        chunk = chunk.replace("[DONE]", "")
        return chunk.strip()
    
    def _final_process(self, response: str) -> Dict:
        """Final processing of complete response"""
        validator = OutputValidator()
        formatter = ResponseFormatter()
        
        validation = validator.validate_response(response)
        formatted = formatter.format_response(response, OutputFormat.TEXT)
        
        return {
            "validation": validation,
            "formatted": formatted,
            "metrics": {
                "word_count": len(response.split()),
                "character_count": len(response),
                "sentence_count": response.count('.') + response.count('!') + response.count('?')
            }
        }
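
# Example (sketch): feed chunks as they arrive and act once the response completes.
stream = StreamProcessor()
for chunk in ["Parsing turns raw model text ", "into structured data."]:
    state = stream.process_stream(chunk, mode="accumulate")
    if state["is_complete"]:
        print("Final response:", state["accumulated"])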

Response Analytics

python
class ResponseAnalyzer:
    def __init__(self):
        self.metrics_history = []
    
    def analyze_response(self, response: str, metadata: Dict = None) -> Dict:
        """Comprehensive response analysis"""
        
        analysis = {
            "basic_metrics": self._calculate_basic_metrics(response),
            "linguistic_analysis": self._linguistic_analysis(response),
            "sentiment_analysis": self._sentiment_analysis(response),
            "readability": self._readability_analysis(response),
            "topics": self._topic_extraction(response),
            "quality_indicators": self._quality_indicators(response)
        }
        
        # Store in history for trend analysis
        self.metrics_history.append({
            "timestamp": datetime.now(),
            "analysis": analysis,
            "metadata": metadata or {}
        })
        
        return analysis
    
    def _calculate_basic_metrics(self, text: str) -> Dict:
        """Calculate basic text metrics"""
        words = text.split()
        sentences = re.split(r'[.!?]+', text)
        paragraphs = text.split('\n\n')
        
        return {
            "word_count": len(words),
            "sentence_count": len([s for s in sentences if s.strip()]),
            "paragraph_count": len([p for p in paragraphs if p.strip()]),
            "character_count": len(text),
            "avg_words_per_sentence": len(words) / max(len(sentences), 1),
            "avg_sentence_length": len(text) / max(len(sentences), 1)
        }
    
    def _linguistic_analysis(self, text: str) -> Dict:
        """Analyze linguistic features"""
        words = text.lower().split()
        
        # Vocabulary diversity
        unique_words = set(words)
        vocabulary_diversity = len(unique_words) / len(words) if words else 0
        
        # Common word categories
        question_words = ["what", "how", "when", "where", "why", "who"]
        action_words = ["create", "build", "develop", "implement", "design"]
        
        question_word_count = sum(1 for word in words if word in question_words)
        action_word_count = sum(1 for word in words if word in action_words)
        
        return {
            "vocabulary_diversity": vocabulary_diversity,
            "question_word_density": question_word_count / len(words) if words else 0,
            "action_word_density": action_word_count / len(words) if words else 0,
            "unique_word_count": len(unique_words),
            "repetition_rate": 1 - vocabulary_diversity
        }
    
    def _sentiment_analysis(self, text: str) -> Dict:
        """Basic sentiment analysis"""
        positive_words = ["good", "great", "excellent", "helpful", "useful", "effective"]
        negative_words = ["bad", "poor", "terrible", "useless", "ineffective", "problematic"]
        
        words = text.lower().split()
        
        positive_count = sum(1 for word in words if word in positive_words)
        negative_count = sum(1 for word in words if word in negative_words)
        
        total_sentiment_words = positive_count + negative_count
        
        if total_sentiment_words == 0:
            sentiment_score = 0.5  # Neutral
        else:
            sentiment_score = positive_count / total_sentiment_words
        
        return {
            "sentiment_score": sentiment_score,
            "sentiment_label": self._get_sentiment_label(sentiment_score),
            "positive_word_count": positive_count,
            "negative_word_count": negative_count
        }
    
    def _get_sentiment_label(self, score: float) -> str:
        if score > 0.6:
            return "positive"
        elif score < 0.4:
            return "negative"
        else:
            return "neutral"
    
    def _readability_analysis(self, text: str) -> Dict:
        """Analyze text readability"""
        words = text.split()
        sentences = re.split(r'[.!?]+', text)
        
        # Simple readability metrics
        avg_words_per_sentence = len(words) / max(len(sentences), 1)
        avg_chars_per_word = sum(len(word) for word in words) / max(len(words), 1)
        
        # Simplified readability score (0-1, higher = more readable)
        readability_score = max(0, 1 - (avg_words_per_sentence - 15) / 20)
        readability_score = max(0, readability_score - (avg_chars_per_word - 5) / 10)
        
        return {
            "readability_score": min(readability_score, 1.0),
            "avg_words_per_sentence": avg_words_per_sentence,
            "avg_chars_per_word": avg_chars_per_word,
            "complexity_level": self._get_complexity_level(readability_score)
        }
    
    def _get_complexity_level(self, score: float) -> str:
        if score > 0.8:
            return "simple"
        elif score > 0.6:
            return "moderate"
        elif score > 0.4:
            return "complex"
        else:
            return "very_complex"
    
    def _topic_extraction(self, text: str) -> Dict:
        """Extract main topics from text"""
        words = text.lower().split()
        
        # Remove common stop words
        stop_words = {"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by"}
        content_words = [word for word in words if word not in stop_words and len(word) > 3]
        
        # Count word frequency
        word_freq = {}
        for word in content_words:
            word_freq[word] = word_freq.get(word, 0) + 1
        
        # Get top topics
        top_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:10]
        
        return {
            "top_topics": [word for word, freq in top_words],
            "topic_frequencies": dict(top_words),
            "topic_diversity": len(word_freq) / len(content_words) if content_words else 0
        }
    
    def _quality_indicators(self, text: str) -> Dict:
        """Calculate quality indicators"""
        indicators = {}
        
        # Structure indicators
        indicators["has_introduction"] = any(word in text[:100].lower() 
                                           for word in ["introduction", "overview", "first"])
        indicators["has_conclusion"] = any(word in text[-100:].lower() 
                                         for word in ["conclusion", "summary", "finally"])
        indicators["has_examples"] = "example" in text.lower() or "for instance" in text.lower()
        indicators["has_structure"] = any(marker in text for marker in ["1.", "2.", "β€’", "-"])
        
        # Content indicators
        indicators["provides_explanation"] = any(word in text.lower() 
                                               for word in ["because", "due to", "since", "as"])
        indicators["actionable"] = any(word in text.lower() 
                                     for word in ["should", "can", "will", "try", "implement"])
        
        return indicators
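
# Example (sketch): quick analysis of a generated explanation.
analyzer = ResponseAnalyzer()
stats = analyzer.analyze_response(
    "Output parsing is essential. For example, a JSON parser extracts fields that "
    "downstream code can use. Therefore, always validate before you format."
)
print(stats["basic_metrics"]["word_count"])
print(stats["readability"]["complexity_level"])
print(stats["quality_indicators"]["has_examples"])  # True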

πŸš€ Getting Started with Model Output

Basic Output Setup

python
from langchain.schema.output_parser import StrOutputParser

def setup_basic_output_processing():
    """Set up basic output processing components"""
    
    # Initialize components
    string_parser = StrOutputParser()
    formatter = ResponseFormatter()
    validator = OutputValidator()
    
    return string_parser, formatter, validator

def process_llm_output(raw_output: str, output_format: OutputFormat = OutputFormat.TEXT):
    """Process raw LLM output through the pipeline"""
    
    # Initialize processors
    _, formatter, validator = setup_basic_output_processing()
    
    # Step 1: Validate output
    validation_result = validator.validate_response(raw_output)
    
    if not validation_result["is_valid"]:
        return {
            "success": False,
            "error": "Validation failed",
            "issues": validation_result["issues"],
            "suggestions": validation_result["suggestions"]
        }
    
    # Step 2: Format output
    formatted_output = formatter.format_response(raw_output, output_format)
    
    # Step 3: Return processed result
    return {
        "success": True,
        "raw_output": raw_output,
        "formatted_output": formatted_output,
        "validation": validation_result,
        "metadata": {
            "format": output_format.value,
            "processing_timestamp": datetime.now().isoformat()
        }
    }

# Example usage
raw_response = "Machine learning is a subset of artificial intelligence..."
result = process_llm_output(raw_response, OutputFormat.MARKDOWN)

if result["success"]:
    print("Processed Output:", result["formatted_output"])
    print("Quality Score:", result["validation"]["overall_score"])
else:
    print("Processing failed:", result["error"])

Advanced Output Pipeline

python
class ComprehensiveOutputProcessor:
    def __init__(self):
        self.validator = OutputValidator()
        self.formatter = ResponseFormatter()
        self.analyzer = ResponseAnalyzer()
        self.safety_validator = ContentSafetyValidator()
        self.stream_processor = StreamProcessor()
    
    def process(self, output: str, config: Dict = None) -> Dict:
        """Comprehensive output processing pipeline"""
        
        config = config or {}
        
        result = {
            "original_output": output,
            "processed_output": None,
            "validation": None,
            "safety_check": None,
            "analysis": None,
            "formatted_outputs": {},
            "processing_metadata": {
                "timestamp": datetime.now().isoformat(),
                "config": config
            }
        }
        
        try:
            # Step 1: Safety validation
            safety_result = self.safety_validator.validate_content_safety(output)
            result["safety_check"] = safety_result
            
            if not safety_result["is_safe"] and config.get("strict_safety", True):
                result["error"] = "Content safety validation failed"
                return result
            
            # Step 2: Quality validation
            validation_result = self.validator.validate_response(
                output, 
                config.get("expected_type", "general"),
                config.get("context", {})
            )
            result["validation"] = validation_result
            
            # Step 3: Analysis
            if config.get("analyze", True):
                analysis_result = self.analyzer.analyze_response(output)
                result["analysis"] = analysis_result
            
            # Step 4: Multi-format output
            output_formats = config.get("formats", [OutputFormat.TEXT])
            
            for format_type in output_formats:
                try:
                    formatted = self.formatter.format_response(
                        output, 
                        format_type,
                        config.get("format_metadata", {})
                    )
                    result["formatted_outputs"][format_type.value] = formatted
                except Exception as e:
                    result["formatted_outputs"][format_type.value] = f"Formatting error: {str(e)}"
            
            # Step 5: Set primary processed output
            primary_format = config.get("primary_format", OutputFormat.TEXT)
            result["processed_output"] = result["formatted_outputs"].get(
                primary_format.value, 
                output
            )
            
            # Step 6: Generate recommendations
            result["recommendations"] = self._generate_recommendations(result)
            
        except Exception as e:
            result["error"] = f"Processing error: {str(e)}"
        
        return result
    
    def _generate_recommendations(self, result: Dict) -> List[str]:
        """Generate recommendations based on processing results"""
        recommendations = []
        
        validation = result.get("validation", {})
        safety = result.get("safety_check", {})
        analysis = result.get("analysis", {})
        
        # Quality recommendations
        if validation.get("overall_score", 1.0) < 0.7:
            recommendations.extend(validation.get("suggestions", []))
        
        # Safety recommendations
        if safety.get("warnings"):
            recommendations.append("Consider adding appropriate disclaimers")
        
        # Readability recommendations
        readability = analysis.get("readability", {})
        if readability.get("complexity_level") == "very_complex":
            recommendations.append("Consider simplifying language for better readability")
        
        return recommendations
    
    def process_stream(self, chunks: List[str], config: Dict = None) -> Dict:
        """Process streaming output"""
        
        config = config or {}
        mode = config.get("stream_mode", "accumulate")
        
        stream_results = []
        for chunk in chunks:
            chunk_result = self.stream_processor.process_stream(chunk, mode)
            stream_results.append(chunk_result)
        
        # Process final accumulated result if in accumulate mode
        if mode == "accumulate" and stream_results:
            final_text = stream_results[-1].get("accumulated", "")
            if final_text:
                return self.process(final_text, config)
        
        return {
            "stream_results": stream_results,
            "mode": mode,
            "total_chunks": len(chunks)
        }

# Usage example
processor = ComprehensiveOutputProcessor()

# Single output processing
output_text = "Machine learning is a powerful technology that enables computers to learn from data..."

config = {
    "expected_type": "explanation",
    "formats": [OutputFormat.TEXT, OutputFormat.MARKDOWN, OutputFormat.JSON],
    "primary_format": OutputFormat.MARKDOWN,
    "analyze": True,
    "context": {"keywords": ["machine learning", "technology", "data"]}
}

result = processor.process(output_text, config)

print("Processing successful:", "error" not in result)
print("Quality score:", result["validation"]["overall_score"])
print("Safety level:", result["safety_check"]["risk_level"])
print("Recommendations:", result["recommendations"])

🎯 Next Steps

  1. Implement Output Parsers: Create custom parsers for your specific data formats
  2. Build Validation Rules: Develop domain-specific quality validation criteria
  3. Deploy Safety Measures: Implement comprehensive content safety validation
  4. Monitor Output Quality: Set up analytics to track response quality over time
  5. Optimize Performance: Implement caching and batch processing for efficiency

πŸ“– Additional Resources

  • Output Parser Documentation: Advanced parsing techniques
  • Quality Metrics Guidelines: Best practices for response evaluation
  • Safety Validation Tools: Content moderation and safety resources
  • Community Examples: Real-world output processing implementations

Master Model Output to build intelligent, safe, and high-quality LangChain applications that deliver polished, validated responses to your users.
