Debugging

Advanced debugging techniques and logging for ModelRed SDK

🐛 Advanced Debugging

Deep dive into ModelRed SDK internals with comprehensive logging, debugging techniques, and diagnostic tools.

Logging Configuration

Enable Debug Logging

Configure comprehensive logging to track ModelRed SDK operations and identify issues.

PYTHON
import logging
import sys
from datetime import datetime
from modelred import ModelRed

# Configure comprehensive logging
def setup_debug_logging():
    """Setup detailed logging for ModelRed debugging"""

    # Create formatter with detailed information
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )

    # Console handler for immediate feedback
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(formatter)

    # File handler for persistent logs (build the name once so the printout below matches the actual file)
    log_filename = f'modelred_debug_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
    file_handler = logging.FileHandler(log_filename)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Configure ModelRed logger
    modelred_logger = logging.getLogger('modelred')
    modelred_logger.setLevel(logging.DEBUG)
    modelred_logger.addHandler(console_handler)
    modelred_logger.addHandler(file_handler)

    # Configure HTTP client logging
    http_logger = logging.getLogger('aiohttp')
    http_logger.setLevel(logging.DEBUG)
    http_logger.addHandler(console_handler)
    http_logger.addHandler(file_handler)

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    print("🔧 Debug logging enabled")
    print(f"📄 Log file: modelred_debug_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")

# Enable debug logging
setup_debug_logging()

# Test with debug logging
async def debug_example():
    """Example with debug logging enabled"""
    logger = logging.getLogger('modelred.debug_example')

    logger.info("Starting ModelRed debug session")

    try:
        async with ModelRed() as client:
            logger.debug("ModelRed client initialized")

            # Each operation will now be logged in detail
            usage = await client.get_usage_stats()
            logger.info(f"Usage stats retrieved: {usage.tier}")

            models = await client.list_models()
            logger.info(f"Models listed: {len(models)} found")

    except Exception as e:
        logger.error(f"Debug session failed: {e}", exc_info=True)

# Run debug example
import asyncio
asyncio.run(debug_example())
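
Once debug logging is on, the aiohttp logger can easily drown out the SDK's own messages. Below is a minimal sketch of per-logger tuning with the standard logging module; the modelred.http child logger name is only an assumption, so adjust it to whatever logger names actually appear in your output.

PYTHON
import logging

# Keep the SDK at full verbosity but quiet the HTTP client once its noise is no longer useful
logging.getLogger('modelred').setLevel(logging.DEBUG)
logging.getLogger('aiohttp').setLevel(logging.WARNING)

# Silence a single chatty child logger without touching the rest of the hierarchy
# (hypothetical logger name -- check your debug output for the real ones)
logging.getLogger('modelred.http').setLevel(logging.INFO)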

Request/Response Inspection

🔍 HTTP Traffic Analysis

Inspect HTTP requests and responses to understand API interactions.

PYTHON
import aiohttp
import json
import time
from modelred import ModelRed

class DebugModelRed(ModelRed):
    """Enhanced ModelRed client with request/response logging"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_count = 0
        self.response_times = []

    async def _make_request(self, method: str, endpoint: str, **kwargs):
        """Override to add debugging capabilities"""
        self.request_count += 1
        request_id = f"REQ-{self.request_count:04d}"

        url = f"{self.base_url}/api/modelred{endpoint}"

        # Log request details
        print(f"\n🔄 {request_id} - {method} {endpoint}")
        print(f"   URL: {url}")
        if 'json' in kwargs:
            print(f"   Payload: {json.dumps(kwargs['json'], indent=2)}")
        if 'params' in kwargs:
            print(f"   Params: {kwargs['params']}")

        # Time the request
        start_time = time.time()

        try:
            # Make the actual request
            response_data = await super()._make_request(method, endpoint, **kwargs)

            # Log response details
            duration = time.time() - start_time
            self.response_times.append(duration)

            print(f"✅ {request_id} - Success ({duration:.3f}s)")
            print(f"   Response: {json.dumps(response_data, indent=2)[:200]}...")

            return response_data

        except Exception as e:
            duration = time.time() - start_time
            print(f"❌ {request_id} - Error ({duration:.3f}s)")
            print(f"   Error: {str(e)}")
            raise

    def get_debug_stats(self):
        """Get debugging statistics"""
        avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0
        return {
            'total_requests': self.request_count,
            'avg_response_time': avg_response_time,
            'response_times': self.response_times
        }

# Usage example
async def debug_requests():
    """Debug API requests and responses"""
    async with DebugModelRed() as client:
        print("🔍 Starting request debugging session")

        # Make several API calls
        await client.validate_api_key()
        await client.get_usage_stats()
        await client.list_models()
        await client.get_test_suites()

        # Print debugging statistics
        stats = client.get_debug_stats()
        print(f"\n📊 Debug Statistics:")
        print(f"   Total Requests: {stats['total_requests']}")
        print(f"   Average Response Time: {stats['avg_response_time']:.3f}s")
        print(f"   Response Times: {[f'{t:.3f}s' for t in stats['response_times']]}")

import asyncio
asyncio.run(debug_requests())
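
The subclass above hooks a private method (_make_request), which may change between SDK releases. If you only need to see raw HTTP traffic, aiohttp's TraceConfig hooks give the same visibility on any session you construct yourself. Whether the ModelRed client accepts a custom session is not assumed here, so this sketch traces a plain request against the same placeholder health endpoint used in the connectivity check further down.

PYTHON
import asyncio
import aiohttp

async def on_request_start(session, ctx, params):
    print(f"→ {params.method} {params.url}")

async def on_request_end(session, ctx, params):
    print(f"← HTTP {params.response.status} for {params.method} {params.url}")

async def trace_demo():
    trace = aiohttp.TraceConfig()
    trace.on_request_start.append(on_request_start)
    trace.on_request_end.append(on_request_end)

    # Any session you create yourself can be traced without touching SDK internals
    async with aiohttp.ClientSession(trace_configs=[trace]) as session:
        async with session.get("http://localhost:3000/api/health") as resp:
            await resp.text()

asyncio.run(trace_demo())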

Error Stack Traces

🚨 Detailed Error Analysis

Capture and analyze complete error information including stack traces.

PYTHON
import traceback
import sys
from contextlib import contextmanager
from modelred import ModelRed, ModelRedError

class ErrorAnalyzer:
    """Comprehensive error analysis and reporting"""

    def __init__(self):
        self.errors = []

    @contextmanager
    def capture_errors(self, operation_name: str):
        """Context manager to capture and analyze errors"""
        try:
            print(f"🔄 Starting: {operation_name}")
            yield
            print(f"✅ Completed: {operation_name}")

        except Exception as e:
            error_info = self._analyze_error(e, operation_name)
            self.errors.append(error_info)
            print(f"❌ Failed: {operation_name}")
            self._print_error_analysis(error_info)
            raise

    def _analyze_error(self, error: Exception, operation: str) -> dict:
        """Analyze error and extract detailed information"""
        tb_frame = traceback.extract_tb(error.__traceback__)[-1]
        return {
            'operation': operation,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'traceback': traceback.format_exc(),
            'is_modelred_error': isinstance(error, ModelRedError),
            'has_status_code': hasattr(error, 'status_code'),
            'status_code': getattr(error, 'status_code', None),
            'file': tb_frame.filename,
            'line': tb_frame.lineno,
            'function': tb_frame.name
        }

    def _print_error_analysis(self, error_info: dict):
        """Print detailed error analysis"""
        print(f"\n🔍 Error Analysis:")
        print(f"   Operation: {error_info['operation']}")
        print(f"   Type: {error_info['error_type']}")
        print(f"   Message: {error_info['error_message']}")
        print(f"   Location: {error_info['file']}:{error_info['line']} in {error_info['function']}")

        if error_info['is_modelred_error']:
            print(f"   ModelRed Error: Yes")
            if error_info['has_status_code']:
                print(f"   HTTP Status: {error_info['status_code']}")

        print(f"\n📋 Full Stack Trace:")
        print(error_info['traceback'])

    def generate_error_report(self):
        """Generate comprehensive error report"""
        if not self.errors:
            print("✅ No errors encountered")
            return

        print(f"\n📊 Error Report Summary:")
        print(f"   Total Errors: {len(self.errors)}")

        # Group by error type
        error_types = {}
        for error in self.errors:
            error_type = error['error_type']
            error_types[error_type] = error_types.get(error_type, 0) + 1

        print(f"   Error Types:")
        for error_type, count in error_types.items():
            print(f"     {error_type}: {count}")

        # Show operations that failed
        print(f"   Failed Operations:")
        for error in self.errors:
            print(f"     • {error['operation']}: {error['error_type']}")

# Usage example
async def debug_with_error_analysis():
    """Example using error analyzer"""
    analyzer = ErrorAnalyzer()

    async with ModelRed() as client:
        # Test successful operation
        with analyzer.capture_errors("API Key Validation"):
            await client.validate_api_key()

        # Test operation that might fail; catch outside the context manager so
        # the analyzer still records the error before it propagates
        try:
            with analyzer.capture_errors("Invalid Model Access"):
                await client.get_model("non-existent-model")
        except Exception:
            pass  # Expected to fail; already captured by the analyzer

        # Test another operation
        with analyzer.capture_errors("Usage Stats"):
            await client.get_usage_stats()

    # Generate final error report
    analyzer.generate_error_report()

import asyncio
asyncio.run(debug_with_error_analysis())
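
capture_errors only sees exceptions raised inside its with block. If you also schedule fire-and-forget tasks around your SDK calls, an asyncio loop exception handler (standard library) will log anything that escapes them. A minimal sketch:

PYTHON
import asyncio
import logging

logger = logging.getLogger('modelred.unhandled')

def log_unhandled(loop, context):
    # context['exception'] is set when a real exception escaped; otherwise fall back to the message
    exc = context.get('exception')
    if exc is not None:
        logger.error("Unhandled exception in background task", exc_info=exc)
    else:
        logger.error("Unhandled event loop error: %s", context.get('message'))

async def main():
    asyncio.get_running_loop().set_exception_handler(log_unhandled)
    # ... run ModelRed operations here; errors escaping fire-and-forget tasks are now logged ...

asyncio.run(main())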

Assessment Debugging

🧪 Assessment Deep Dive

Debug assessment execution with detailed monitoring and state tracking.

PYTHON
import asyncio
import json
from datetime import datetime, timedelta
from modelred import ModelRed

class AssessmentDebugger:
    """Detailed assessment debugging and monitoring"""

    def __init__(self, client: ModelRed):
        self.client = client
        self.assessment_logs = {}

    async def debug_assessment(self, model_id: str, test_suites: list,
                              max_wait: int = 300, check_interval: int = 10):
        """Run assessment with comprehensive debugging"""

        print(f"🧪 Starting debug assessment for model: {model_id}")
        print(f"   Test Suites: {test_suites}")
        print(f"   Max Wait: {max_wait}s, Check Interval: {check_interval}s")

        # Pre-assessment validation
        await self._validate_prerequisites(model_id, test_suites)

        # Start assessment
        start_time = datetime.now()
        try:
            result = await self.client.run_assessment(
                model_id=model_id,
                test_suites=test_suites,
                wait_for_completion=False
            )

            assessment_id = result.assessment_id
            print(f"✅ Assessment started: {assessment_id}")

            # Initialize logging for this assessment
            self.assessment_logs[assessment_id] = {
                'model_id': model_id,
                'test_suites': test_suites,
                'start_time': start_time,
                'status_checks': [],
                'progress_history': [],
                'final_result': None
            }

            # Monitor assessment
            await self._monitor_assessment(assessment_id, max_wait, check_interval)

        except Exception as e:
            print(f"❌ Assessment startup failed: {e}")
            await self._diagnose_startup_failure(model_id, test_suites, e)

    async def _validate_prerequisites(self, model_id: str, test_suites: list):
        """Validate assessment prerequisites"""
        print("🔍 Validating prerequisites...")

        # Check model exists
        try:
            model = await self.client.get_model(model_id)
            print(f"   ✅ Model found: {model.get('displayName', model_id)}")
            print(f"      Provider: {model['provider']}")
            print(f"      Status: {model.get('status', 'Unknown')}")
        except Exception as e:
            print(f"   ❌ Model validation failed: {e}")
            raise

        # Check test suites
        available_suites = await self.client.get_test_suites()
        available_names = [s['name'] for s in available_suites]

        for suite in test_suites:
            if suite in available_names:
                print(f"   ✅ Test suite available: {suite}")
            else:
                print(f"   ❌ Test suite not available: {suite}")
                print(f"      Available: {available_names}")
                raise ValueError(f"Test suite '{suite}' not available")

        # Check usage limits
        usage = await self.client.get_usage_stats()
        if usage.assessments_this_month >= usage.assessments_limit:
            print(f"   ⚠️ Assessment limit reached: {usage.assessments_this_month}/{usage.assessments_limit}")
        else:
            print(f"   ✅ Usage OK: {usage.assessments_this_month}/{usage.assessments_limit}")

    async def _monitor_assessment(self, assessment_id: str, max_wait: int, check_interval: int):
        """Monitor assessment with detailed logging"""
        print(f"👁️ Monitoring assessment: {assessment_id}")

        start_time = datetime.now()
        last_progress = -1
        stalled_count = 0

        while (datetime.now() - start_time).seconds < max_wait:
            try:
                # Get current status
                status_info = await self.client.get_assessment_status(assessment_id)
                check_time = datetime.now()

                # Log status check
                self.assessment_logs[assessment_id]['status_checks'].append({
                    'timestamp': check_time.isoformat(),
                    'status': status_info['status'],
                    'progress': status_info.get('progress', 0),
                    'elapsed': (check_time - start_time).seconds
                })

                status = status_info['status']
                progress = status_info.get('progress', 0)
                elapsed = (check_time - start_time).seconds

                print(f"   📊 {elapsed:3d}s - {status} ({progress}%)")

                # Track progress
                if progress != last_progress:
                    self.assessment_logs[assessment_id]['progress_history'].append({
                        'timestamp': check_time.isoformat(),
                        'progress': progress,
                        'elapsed': elapsed
                    })
                    last_progress = progress
                    stalled_count = 0
                else:
                    stalled_count += 1

                # Check for completion
                if status == 'COMPLETED':
                    print(f"✅ Assessment completed in {elapsed}s")
                    await self._get_final_results(assessment_id)
                    break
                elif status == 'FAILED':
                    print(f"❌ Assessment failed after {elapsed}s")
                    await self._analyze_failure(assessment_id, status_info)
                    break

                # Check for stalled progress
                if stalled_count >= 5:
                    print(f"⚠️ Assessment appears stalled (no progress for {stalled_count * check_interval}s)")
                    await self._diagnose_stalled_assessment(assessment_id)

                await asyncio.sleep(check_interval)

            except Exception as e:
                print(f"❌ Monitoring error: {e}")
                await asyncio.sleep(check_interval)

        else:
            # while/else: this branch runs only when the loop exits without a break (i.e., timeout)
            print(f"⏰ Assessment monitoring timeout after {max_wait}s")
            await self._handle_timeout(assessment_id)

    async def _get_final_results(self, assessment_id: str):
        """Get and log final assessment results"""
        try:
            results = await self.client.get_assessment_results(assessment_id)

            self.assessment_logs[assessment_id]['final_result'] = {
                'overall_score': results.overall_score,
                'risk_level': results.risk_level.value,
                'total_tests': results.total_tests,
                'passed_tests': results.passed_tests,
                'failed_tests': results.failed_tests,
                'categories': results.categories,
                'recommendations': results.recommendations[:3]  # First 3
            }

            print(f"📊 Final Results:")
            print(f"   Overall Score: {results.overall_score}/10")
            print(f"   Risk Level: {results.risk_level.value}")
            print(f"   Tests: {results.passed_tests}/{results.total_tests} passed")

        except Exception as e:
            print(f"❌ Failed to get final results: {e}")

    async def _analyze_failure(self, assessment_id: str, status_info: dict):
        """Analyze assessment failure"""
        print(f"🔍 Analyzing failure for {assessment_id}")

        error_message = status_info.get('error_message', 'No error message provided')
        print(f"   Error: {error_message}")

        # Log failure details
        self.assessment_logs[assessment_id]['failure'] = {
            'error_message': error_message,
            'status_info': status_info,
            'analysis_timestamp': datetime.now().isoformat()
        }

        # Provide suggestions based on error
        if 'timeout' in error_message.lower():
            print("   💡 Suggestion: Try with fewer test suites or increase timeout")
        elif 'connection' in error_message.lower():
            print("   💡 Suggestion: Check model provider configuration")
        elif 'authentication' in error_message.lower():
            print("   💡 Suggestion: Verify provider API keys")

    def generate_debug_report(self, assessment_id: str = None):
        """Generate comprehensive debug report"""
        if assessment_id:
            assessments = {assessment_id: self.assessment_logs[assessment_id]}
        else:
            assessments = self.assessment_logs

        print(f"\n📋 Assessment Debug Report")
        print("=" * 50)

        for aid, log in assessments.items():
            print(f"\n🧪 Assessment: {aid}")
            print(f"   Model: {log['model_id']}")
            print(f"   Test Suites: {log['test_suites']}")
            print(f"   Start Time: {log['start_time']}")
            print(f"   Status Checks: {len(log['status_checks'])}")

            if log.get('final_result'):
                result = log['final_result']
                print(f"   Final Score: {result['overall_score']}/10")
                print(f"   Risk Level: {result['risk_level']}")

            if log.get('failure'):
                print(f"   Failed: {log['failure']['error_message']}")

            # Progress analysis
            if log['progress_history']:
                print(f"   Progress Points: {len(log['progress_history'])}")
                for i, progress in enumerate(log['progress_history'][:3]):
                    print(f"     {i+1}. {progress['progress']}% at {progress['elapsed']}s")

# Usage example
async def debug_assessment_example():
    """Example of assessment debugging"""
    async with ModelRed() as client:
        debugger = AssessmentDebugger(client)

        # Get a model to test (use first available)
        models = await client.list_models()
        if not models:
            print("❌ No models available for testing")
            return

        model_id = models[0]['modelId']

        try:
            await debugger.debug_assessment(
                model_id=model_id,
                test_suites=["basic_security"],
                max_wait=120,  # 2 minutes for demo
                check_interval=5
            )
        except Exception as e:
            print(f"❌ Debug assessment failed: {e}")

        # Generate final report
        debugger.generate_debug_report()

asyncio.run(debug_assessment_example())
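
The logs the debugger collects in memory are also a useful artifact to attach to a support ticket. A small sketch that writes them to disk; the file name is arbitrary, and default=str takes care of the datetime objects in the log entries.

PYTHON
import json

def save_debug_logs(debugger: AssessmentDebugger, path: str = "assessment_debug_logs.json"):
    """Serialize the collected assessment logs for later inspection or sharing"""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(debugger.assessment_logs, f, indent=2, default=str)
    print(f"💾 Debug logs written to {path}")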

Environment Diagnostics

🔧 System Environment Analysis

Comprehensive system and environment diagnostics for troubleshooting.

PYTHON
import sys
import os
import platform
import pkg_resources  # deprecated in newer setuptools; importlib.metadata is the modern replacement
import aiohttp
import asyncio
from pathlib import Path

async def comprehensive_environment_check():
    """Comprehensive environment and system diagnostics"""
    print("🔍 ModelRed Environment Diagnostics")
    print("=" * 50)

    # Python environment
    print(f"\n🐍 Python Environment:")
    print(f"   Python Version: {sys.version}")
    print(f"   Platform: {platform.platform()}")
    print(f"   Architecture: {platform.architecture()[0]}")
    print(f"   Executable: {sys.executable}")

    # Virtual environment detection
    if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
        print(f"   Virtual Env: Active ({sys.prefix})")
    else:
        print(f"   Virtual Env: Not detected")

    # Package versions
    print(f"\n📦 Key Package Versions:")
    key_packages = ['modelred', 'aiohttp']  # asyncio is part of the standard library, not a pip distribution

    for package in key_packages:
        try:
            version = pkg_resources.get_distribution(package).version
            print(f"   {package}: {version}")
        except pkg_resources.DistributionNotFound:
            print(f"   {package}: Not installed")
        except Exception as e:
            print(f"   {package}: Error checking version ({e})")

    # Environment variables
    print(f"\n🔐 Environment Variables:")
    env_vars = [
        'MODELRED_API_KEY',
        'OPENAI_API_KEY',
        'ANTHROPIC_API_KEY',
        'HUGGINGFACE_API_TOKEN',
        'AZURE_OPENAI_API_KEY',
        'AWS_ACCESS_KEY_ID'
    ]

    for var in env_vars:
        value = os.getenv(var)
        if value:
            # Mask sensitive values
            masked = f"{value[:8]}..." if len(value) > 8 else value[:4] + "..."
            print(f"   {var}: Set ({masked})")
        else:
            print(f"   {var}: Not set")

    # Network connectivity
    print(f"\n🌐 Network Connectivity:")
    await check_network_connectivity()

    # File system permissions
    print(f"\n📁 File System:")
    check_file_permissions()

    # Memory and performance
    print(f"\n💾 System Resources:")
    check_system_resources()

async def check_network_connectivity():
    """Check network connectivity to key endpoints"""
    endpoints = [
        ('ModelRed API', 'http://localhost:3000/api/health'),
        ('OpenAI API', 'https://api.openai.com/v1/models'),
        ('Anthropic API', 'https://api.anthropic.com/v1/models'),
        ('Google DNS', 'https://8.8.8.8')
    ]

    timeout = aiohttp.ClientTimeout(total=5)

    for name, url in endpoints:
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url) as response:
                    print(f"   {name}: ✅ Reachable (HTTP {response.status})")
        except asyncio.TimeoutError:
            print(f"   {name}: ❌ Timeout")
        except Exception as e:
            print(f"   {name}: ❌ Error ({type(e).__name__})")

def check_file_permissions():
    """Check file system permissions"""
    current_dir = Path.cwd()

    # Check current directory permissions
    if os.access(current_dir, os.R_OK):
        print(f"   Current Dir Read: ✅")
    else:
        print(f"   Current Dir Read: ❌")

    if os.access(current_dir, os.W_OK):
        print(f"   Current Dir Write: ✅")
    else:
        print(f"   Current Dir Write: ❌")

    # Check for .env file
    env_file = current_dir / '.env'
    if env_file.exists():
        print(f"   .env File: ✅ Found")
        if os.access(env_file, os.R_OK):
            print(f"   .env Readable: ✅")
        else:
            print(f"   .env Readable: ❌")
    else:
        print(f"   .env File: ❌ Not found")

def check_system_resources():
    """Check system resources"""
    try:
        import psutil

        # Memory usage
        memory = psutil.virtual_memory()
        print(f"   Memory Usage: {memory.percent}% ({memory.used // (1024**3)}GB / {memory.total // (1024**3)}GB)")

        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        print(f"   CPU Usage: {cpu_percent}%")

        # Disk usage
        disk = psutil.disk_usage('/')
        print(f"   Disk Usage: {disk.percent}% ({disk.used // (1024**3)}GB / {disk.total // (1024**3)}GB)")

    except ImportError:
        print("   Install psutil for detailed system info: pip install psutil")
    except Exception as e:
        print(f"   Error getting system resources: {e}")

# Run comprehensive environment check
asyncio.run(comprehensive_environment_check())
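
When filing a bug report, it helps to attach the diagnostics as a file rather than copying console output. A minimal sketch using contextlib.redirect_stdout to capture everything the check prints; the output file name is arbitrary.

PYTHON
import asyncio
import contextlib
import io

async def save_diagnostics(path: str = "modelred_diagnostics.txt"):
    """Run the environment check and capture its printed output to a file"""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        await comprehensive_environment_check()
    with open(path, "w", encoding="utf-8") as f:
        f.write(buffer.getvalue())
    print(f"💾 Diagnostics saved to {path}")

asyncio.run(save_diagnostics())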

Next Steps