Documentation

Running Assessments

Complete guide to executing security assessments, monitoring progress, and managing results

Learn how to execute comprehensive security assessments using ModelRed's SDK, monitor progress in real-time, and effectively manage your results.

Quick Start

import asyncio
from modelred import ModelRed

async def main():
    async with ModelRed(api_key="mr_your_api_key_here") as client:
        # Simple assessment
        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["base64_injection", "toxicity_generation"],
            wait_for_completion=True
        )

        print(f"Security Score: {result.overall_score}/10")
        print(f"Risk Level: {result.risk_level}")

asyncio.run(main())

Assessment Parameters

Required Parameters

model_id

The identifier of your registered AI model

model_id="my-gpt-model"

test_suites

List of test suites to execute

test_suites=["base64_injection", "toxicity_generation"]

Optional Parameters

priority

optional

Assessment execution priority level

Options: low, normal, high, critical

wait_for_completion

optional

Whether to wait for assessment completion

Default: False (returns immediately with assessment_id)

timeout_minutes

optional

Maximum wait time for assessment completion

Default: 15 minutes

progress_callback

optional

Function called with progress updates

Signature: callback(progress: int, status: str)

Execution Patterns

Synchronous Assessment (Wait for Completion)

async def run_synchronous_assessment():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Wait for completion with progress tracking
        def progress_handler(progress, status):
            print(f"Progress: {progress}% - {status}")

        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["base64_injection", "toxicity_generation"],
            priority="high",
            wait_for_completion=True,
            timeout_minutes=20,
            progress_callback=progress_handler
        )

        # Result is immediately available
        print(f"Assessment completed: {result.overall_score}/10")
        return result

Asynchronous Assessment (Background Execution)

async def run_asynchronous_assessment():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Start assessment without waiting
        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["advanced_jailbreak_ablation", "malware_payload"],
            priority="normal",
            wait_for_completion=False  # Returns immediately
        )

        assessment_id = result.assessment_id
        print(f"Assessment started: {assessment_id}")

        # Check status periodically
        while True:
            status = await client.get_assessment_status(assessment_id)
            print(f"Status: {status.state} - {status.progress}%")

            if status.state == "completed":
                # Retrieve final results
                final_result = await client.get_assessment_result(assessment_id)
                return final_result
            elif status.state == "failed":
                print(f"Assessment failed: {status.error_message}")
                return None

            await asyncio.sleep(30)  # Check every 30 seconds

Priority Levels

Understanding Priority

Low Priority

Background processing during off-peak hours

  • • Longer queue times
  • • Cost-optimized execution
  • • Best for batch processing

Normal Priority

Standard processing queue (default)

  • • Balanced performance
  • • Regular queue times
  • • Most common use case

High Priority

Faster processing for important assessments

  • • Reduced queue times
  • • Production deployments
  • • Time-sensitive testing

Critical Priority

Immediate processing for urgent security needs

  • • Highest priority queue
  • • Security incidents
  • • Emergency assessments

Assessment Management

Checking Assessment Status

async def monitor_assessment(assessment_id: str):
    async with ModelRed(api_key="mr_your_api_key") as client:
        status = await client.get_assessment_status(assessment_id)

        return {
            "state": status.state,  # queued, running, completed, failed
            "progress": status.progress,  # 0-100
            "estimated_completion": status.estimated_completion,
            "error_message": status.error_message if status.state == "failed" else None
        }

Retrieving Assessment Results

async def get_results(assessment_id: str):
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Get detailed results
        result = await client.get_assessment_result(assessment_id)

        return {
            "overall_score": result.overall_score,
            "risk_level": result.risk_level.value,
            "test_suite_results": result.test_suite_results,
            "vulnerabilities_found": result.vulnerabilities_found,
            "completion_time": result.completion_time
        }

Listing Your Assessments

async def list_assessments():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Get recent assessments
        assessments = await client.list_assessments(
            limit=10,
            model_id="my-model",  # optional filter
            state="completed"      # optional filter
        )

        for assessment in assessments:
            print(f"{assessment.id}: {assessment.overall_score}/10 ({assessment.state})")

Batch Processing

Running Multiple Assessments

async def run_batch_assessments():
    async with ModelRed(api_key="mr_your_api_key") as client:
        models = ["model-1", "model-2", "model-3"]
        test_configs = [
            ["base64_injection", "toxicity_generation"],
            ["advanced_jailbreak_ablation"],
            ["malware_top_level", "xss_markdown_exfil"]
        ]

        # Start all assessments
        assessment_ids = []
        for i, model_id in enumerate(models):
            result = await client.run_assessment(
                model_id=model_id,
                test_suites=test_configs[i % len(test_configs)],
                priority="normal",
                wait_for_completion=False
            )
            assessment_ids.append(result.assessment_id)

        # Monitor all assessments
        completed_results = []
        while assessment_ids:
            for assessment_id in assessment_ids[:]:
                status = await client.get_assessment_status(assessment_id)

                if status.state == "completed":
                    result = await client.get_assessment_result(assessment_id)
                    completed_results.append(result)
                    assessment_ids.remove(assessment_id)
                elif status.state == "failed":
                    print(f"Assessment {assessment_id} failed: {status.error_message}")
                    assessment_ids.remove(assessment_id)

            if assessment_ids:
                await asyncio.sleep(30)  # Check every 30 seconds

        return completed_results

Error Handling

Common Error Scenarios

async def robust_assessment_execution():
    async with ModelRed(api_key="mr_your_api_key") as client:
        try:
            result = await client.run_assessment(
                model_id="my-model",
                test_suites=["base64_injection", "toxicity_generation"],
                priority="high",
                wait_for_completion=True,
                timeout_minutes=15
            )
            return result

        except ModelRedTimeoutError:
            print("Assessment timed out - check status manually")
            # Handle timeout gracefully

        except ModelRedQuotaExceededError as e:
            print(f"Quota exceeded: {e.message}")
            # Handle quota limits

        except ModelRedValidationError as e:
            print(f"Invalid parameters: {e.message}")
            # Handle validation errors

        except ModelRedServiceError as e:
            print(f"Service error: {e.message}")
            # Handle service issues

        except Exception as e:
            print(f"Unexpected error: {e}")
            # Handle unexpected errors

Retry Logic

async def run_assessment_with_retry(max_retries=3):
    async with ModelRed(api_key="mr_your_api_key") as client:
        for attempt in range(max_retries):
            try:
                result = await client.run_assessment(
                    model_id="my-model",
                    test_suites=["toxicity_generation"],
                    wait_for_completion=True
                )
                return result

            except ModelRedServiceError as e:
                if attempt == max_retries - 1:
                    raise  # Last attempt failed

                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

Performance Optimization

Efficient Test Suite Selection

# Optimize for speed - quick encoding tests
quick_assessment = [
    "base64_injection",
    "hexadecimal_injection",
    "rot13_encoding"
]

# Comprehensive security - longer execution time
comprehensive_assessment = [
    "advanced_jailbreak_ablation",  # 8-12 minutes
    "malware_payload",              # 8-12 minutes
    "latent_injection_report"       # 4-6 minutes
]

# Balanced approach - medium execution time
balanced_assessment = [
    "toxicity_generation",          # 3-5 minutes
    "prompt_inject_hate",           # 3-4 minutes
    "package_hallucination_python"  # 3-4 minutes
]

Parallel Execution Strategies

async def parallel_model_testing():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Test multiple models simultaneously
        tasks = []
        models = ["model-a", "model-b", "model-c"]

        for model_id in models:
            task = client.run_assessment(
                model_id=model_id,
                test_suites=["toxicity_generation", "base64_injection"],
                priority="normal",
                wait_for_completion=True
            )
            tasks.append(task)

        # Execute all assessments in parallel
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Model {models[i]} assessment failed: {result}")
            else:
                successful_results.append(result)

        return successful_results

Best Practices

Assessment Planning

  1. Start Small: Begin with free tier test suites to validate your setup
  2. Incremental Testing: Add more complex test suites gradually
  3. Regular Monitoring: Set up automated assessments for production models
  4. Documentation: Keep track of assessment configurations and results

Resource Management

  1. Quota Awareness: Monitor your monthly assessment limits
  2. Priority Allocation: Use appropriate priority levels for different scenarios
  3. Batch Processing: Group related assessments to optimize resource usage
  4. Error Recovery: Implement robust error handling and retry logic

Security Workflow Integration

# Example CI/CD integration
async def ci_cd_security_gate():
    """Security assessment for CI/CD pipeline"""
    async with ModelRed(api_key=os.getenv("MODELRED_API_KEY")) as client:
        # Quick security check
        result = await client.run_assessment(
            model_id=os.getenv("MODEL_ID"),
            test_suites=["base64_injection", "toxicity_generation"],
            priority="high",
            wait_for_completion=True,
            timeout_minutes=10
        )

        # Fail pipeline if security score is too low
        if result.overall_score < 7:
            raise Exception(f"Security score too low: {result.overall_score}/10")

        print(f"Security gate passed: {result.overall_score}/10")
        return True