# Running Assessments

Complete guide to executing security assessments, monitoring progress, and managing results.

Learn how to execute comprehensive security assessments using ModelRed's SDK, monitor progress in real time, and manage your results effectively.
## Quick Start

```python
import asyncio

from modelred import ModelRed


async def main():
    async with ModelRed(api_key="mr_your_api_key_here") as client:
        # Run a simple assessment and wait for the results
        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["base64_injection", "toxicity_generation"],
            wait_for_completion=True,
        )
        print(f"Security Score: {result.overall_score}/10")
        print(f"Risk Level: {result.risk_level}")


asyncio.run(main())
```
## Assessment Parameters

### Required Parameters

- `model_id`: The identifier of your registered AI model.
  Example: `model_id="my-gpt-model"`
- `test_suites`: List of test suites to execute.
  Example: `test_suites=["base64_injection", "toxicity_generation"]`

### Optional Parameters

- `priority`: Assessment execution priority level. Options: `low`, `normal` (default), `high`, `critical`.
- `wait_for_completion`: Whether to wait for assessment completion. Default: `False` (returns immediately with an `assessment_id`).
- `timeout_minutes`: Maximum wait time for assessment completion, in minutes. Default: `15`.
- `progress_callback`: Function called with progress updates. Signature: `callback(progress: int, status: str)`.
## Execution Patterns

### Synchronous Assessment (Wait for Completion)

```python
async def run_synchronous_assessment():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Track progress while waiting for completion
        def progress_handler(progress, status):
            print(f"Progress: {progress}% - {status}")

        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["base64_injection", "toxicity_generation"],
            priority="high",
            wait_for_completion=True,
            timeout_minutes=20,
            progress_callback=progress_handler,
        )

        # The result is immediately available
        print(f"Assessment completed: {result.overall_score}/10")
        return result
```
### Asynchronous Assessment (Background Execution)

```python
async def run_asynchronous_assessment():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Start the assessment without waiting
        result = await client.run_assessment(
            model_id="my-model",
            test_suites=["advanced_jailbreak_ablation", "malware_payload"],
            priority="normal",
            wait_for_completion=False,  # returns immediately
        )
        assessment_id = result.assessment_id
        print(f"Assessment started: {assessment_id}")

        # Poll the status periodically
        while True:
            status = await client.get_assessment_status(assessment_id)
            print(f"Status: {status.state} - {status.progress}%")

            if status.state == "completed":
                # Retrieve the final results
                final_result = await client.get_assessment_result(assessment_id)
                return final_result
            elif status.state == "failed":
                print(f"Assessment failed: {status.error_message}")
                return None

            await asyncio.sleep(30)  # check every 30 seconds
```
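The loop above polls until the assessment completes or fails, with no upper bound on how long it waits. If you want a hard cap, a deadline-bounded variant (a sketch using the same status and result calls) could look like this:

```python
import asyncio
import time


async def poll_with_deadline(client, assessment_id: str, max_wait_seconds: int = 1800):
    # Bound the total time spent polling instead of waiting indefinitely.
    deadline = time.monotonic() + max_wait_seconds
    while time.monotonic() < deadline:
        status = await client.get_assessment_status(assessment_id)
        if status.state == "completed":
            return await client.get_assessment_result(assessment_id)
        if status.state == "failed":
            raise RuntimeError(f"Assessment failed: {status.error_message}")
        await asyncio.sleep(30)  # same 30-second polling interval as above
    raise TimeoutError(f"Assessment {assessment_id} still running after {max_wait_seconds}s")
```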
## Priority Levels

### Understanding Priority

**Low Priority**: Background processing during off-peak hours

- Longer queue times
- Cost-optimized execution
- Best for batch processing

**Normal Priority**: Standard processing queue (the default)

- Balanced performance
- Regular queue times
- Most common use case

**High Priority**: Faster processing for important assessments

- Reduced queue times
- Production deployments
- Time-sensitive testing

**Critical Priority**: Immediate processing for urgent security needs

- Highest priority queue
- Security incidents
- Emergency assessments
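If you script assessments across several scenarios, a small mapping keeps the priority choice explicit and auditable (a sketch; the scenario names are illustrative, not part of the SDK):

```python
# Illustrative mapping from scenario to the documented priority levels.
PRIORITY_BY_SCENARIO = {
    "batch_regression": "low",        # cost-optimized, off-peak
    "routine_check": "normal",        # default queue
    "release_gate": "high",           # time-sensitive, production
    "security_incident": "critical",  # immediate processing
}


def priority_for(scenario: str) -> str:
    # Fall back to the default queue for unknown scenarios.
    return PRIORITY_BY_SCENARIO.get(scenario, "normal")
```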
## Assessment Management

### Checking Assessment Status

```python
async def monitor_assessment(assessment_id: str):
    async with ModelRed(api_key="mr_your_api_key") as client:
        status = await client.get_assessment_status(assessment_id)

        return {
            "state": status.state,        # queued, running, completed, failed
            "progress": status.progress,  # 0-100
            "estimated_completion": status.estimated_completion,
            "error_message": status.error_message if status.state == "failed" else None,
        }
```
### Retrieving Assessment Results

```python
async def get_results(assessment_id: str):
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Fetch the detailed results
        result = await client.get_assessment_result(assessment_id)

        return {
            "overall_score": result.overall_score,
            "risk_level": result.risk_level.value,
            "test_suite_results": result.test_suite_results,
            "vulnerabilities_found": result.vulnerabilities_found,
            "completion_time": result.completion_time,
        }
```
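Calling the helper from synchronous code only needs `asyncio.run` (the assessment ID below is a placeholder):

```python
import asyncio

summary = asyncio.run(get_results("assessment-id-123"))  # placeholder ID
print(f"Score: {summary['overall_score']}/10 ({summary['risk_level']})")
print(f"Vulnerabilities found: {summary['vulnerabilities_found']}")
```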
### Listing Your Assessments

```python
async def list_assessments():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Fetch the most recent assessments
        assessments = await client.list_assessments(
            limit=10,
            model_id="my-model",  # optional filter
            state="completed",    # optional filter
        )

        for assessment in assessments:
            print(f"{assessment.id}: {assessment.overall_score}/10 ({assessment.state})")
```
## Batch Processing

### Running Multiple Assessments

```python
async def run_batch_assessments():
    async with ModelRed(api_key="mr_your_api_key") as client:
        models = ["model-1", "model-2", "model-3"]
        test_configs = [
            ["base64_injection", "toxicity_generation"],
            ["advanced_jailbreak_ablation"],
            ["malware_top_level", "xss_markdown_exfil"],
        ]

        # Start all assessments without waiting
        assessment_ids = []
        for i, model_id in enumerate(models):
            result = await client.run_assessment(
                model_id=model_id,
                test_suites=test_configs[i % len(test_configs)],
                priority="normal",
                wait_for_completion=False,
            )
            assessment_ids.append(result.assessment_id)

        # Poll until every assessment completes or fails
        completed_results = []
        while assessment_ids:
            for assessment_id in assessment_ids[:]:  # iterate over a copy while removing
                status = await client.get_assessment_status(assessment_id)

                if status.state == "completed":
                    result = await client.get_assessment_result(assessment_id)
                    completed_results.append(result)
                    assessment_ids.remove(assessment_id)
                elif status.state == "failed":
                    print(f"Assessment {assessment_id} failed: {status.error_message}")
                    assessment_ids.remove(assessment_id)

            if assessment_ids:
                await asyncio.sleep(30)  # check every 30 seconds

        return completed_results
```
## Error Handling

### Common Error Scenarios

```python
# The exception classes below are assumed importable from the package root.
from modelred import (
    ModelRed,
    ModelRedQuotaExceededError,
    ModelRedServiceError,
    ModelRedTimeoutError,
    ModelRedValidationError,
)


async def robust_assessment_execution():
    async with ModelRed(api_key="mr_your_api_key") as client:
        try:
            result = await client.run_assessment(
                model_id="my-model",
                test_suites=["base64_injection", "toxicity_generation"],
                priority="high",
                wait_for_completion=True,
                timeout_minutes=15,
            )
            return result
        except ModelRedTimeoutError:
            print("Assessment timed out - check status manually")
            # Handle the timeout gracefully
        except ModelRedQuotaExceededError as e:
            print(f"Quota exceeded: {e.message}")
            # Handle quota limits
        except ModelRedValidationError as e:
            print(f"Invalid parameters: {e.message}")
            # Handle validation errors
        except ModelRedServiceError as e:
            print(f"Service error: {e.message}")
            # Handle service issues
        except Exception as e:
            print(f"Unexpected error: {e}")
            # Handle unexpected errors
```
### Retry Logic

```python
async def run_assessment_with_retry(max_retries=3):
    async with ModelRed(api_key="mr_your_api_key") as client:
        for attempt in range(max_retries):
            try:
                result = await client.run_assessment(
                    model_id="my-model",
                    test_suites=["toxicity_generation"],
                    wait_for_completion=True,
                )
                return result
            except ModelRedServiceError:
                if attempt == max_retries - 1:
                    raise  # last attempt failed
                wait_time = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                print(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
```
## Performance Optimization

### Efficient Test Suite Selection

```python
# Optimized for speed: quick encoding tests
quick_assessment = [
    "base64_injection",
    "hexadecimal_injection",
    "rot13_encoding",
]

# Comprehensive security: longer execution time
comprehensive_assessment = [
    "advanced_jailbreak_ablation",  # 8-12 minutes
    "malware_payload",              # 8-12 minutes
    "latent_injection_report",      # 4-6 minutes
]

# Balanced approach: medium execution time
balanced_assessment = [
    "toxicity_generation",           # 3-5 minutes
    "prompt_inject_hate",            # 3-4 minutes
    "package_hallucination_python",  # 3-4 minutes
]
```
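One way to use these groupings, building on the lists defined above, is to pick the suite set from the execution context, e.g. quick checks on every commit and the comprehensive set nightly (a sketch; the `ASSESSMENT_MODE` variable is an assumed convention, not part of the SDK):

```python
import os

# Assumed convention: the pipeline sets ASSESSMENT_MODE to "quick",
# "balanced", or "comprehensive"; unknown values fall back to balanced.
SUITES_BY_MODE = {
    "quick": quick_assessment,
    "balanced": balanced_assessment,
    "comprehensive": comprehensive_assessment,
}

test_suites = SUITES_BY_MODE.get(os.getenv("ASSESSMENT_MODE", "balanced"), balanced_assessment)
```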
### Parallel Execution Strategies

```python
async def parallel_model_testing():
    async with ModelRed(api_key="mr_your_api_key") as client:
        # Test multiple models simultaneously
        models = ["model-a", "model-b", "model-c"]
        tasks = []
        for model_id in models:
            task = client.run_assessment(
                model_id=model_id,
                test_suites=["toxicity_generation", "base64_injection"],
                priority="normal",
                wait_for_completion=True,
            )
            tasks.append(task)

        # Execute all assessments in parallel
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successes from failures
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Model {models[i]} assessment failed: {result}")
            else:
                successful_results.append(result)

        return successful_results
```
## Best Practices

### Assessment Planning

- Start Small: Begin with free-tier test suites to validate your setup
- Incremental Testing: Add more complex test suites gradually
- Regular Monitoring: Set up automated assessments for production models (see the sketch after this list)
- Documentation: Keep track of assessment configurations and results
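For the regular-monitoring point above, a minimal in-process scheduler might look like this (a sketch assuming a daily cadence and the placeholder model ID used throughout this guide; a cron job or CI schedule is usually the better production fit):

```python
import asyncio
import os

from modelred import ModelRed


async def daily_security_check():
    # Re-run a fixed baseline assessment once per day.
    while True:
        async with ModelRed(api_key=os.getenv("MODELRED_API_KEY")) as client:
            result = await client.run_assessment(
                model_id="my-model",  # placeholder: your registered model
                test_suites=["toxicity_generation", "base64_injection"],
                wait_for_completion=True,
            )
            print(f"Daily check: {result.overall_score}/10 ({result.risk_level})")
        await asyncio.sleep(24 * 60 * 60)  # wait a day before the next run
```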
### Resource Management

- Quota Awareness: Monitor your monthly assessment limits
- Priority Allocation: Use appropriate priority levels for different scenarios
- Batch Processing: Group related assessments to optimize resource usage
- Error Recovery: Implement robust error handling and retry logic
## Security Workflow Integration

```python
# Example CI/CD integration
import os


async def ci_cd_security_gate():
    """Security assessment gate for a CI/CD pipeline."""
    async with ModelRed(api_key=os.getenv("MODELRED_API_KEY")) as client:
        # Quick security check
        result = await client.run_assessment(
            model_id=os.getenv("MODEL_ID"),
            test_suites=["base64_injection", "toxicity_generation"],
            priority="high",
            wait_for_completion=True,
            timeout_minutes=10,
        )

        # Fail the pipeline if the security score is too low
        if result.overall_score < 7:
            raise Exception(f"Security score too low: {result.overall_score}/10")

        print(f"Security gate passed: {result.overall_score}/10")
        return True
```
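To run the gate as a pipeline step, the coroutine still needs a synchronous entry point; one minimal option (a sketch) exits non-zero so the CI job fails when the gate does:

```python
import asyncio
import sys

if __name__ == "__main__":
    try:
        asyncio.run(ci_cd_security_gate())
    except Exception as exc:
        print(f"Security gate failed: {exc}")
        sys.exit(1)
```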
## Related Documentation

- Test Suites Reference: Complete list of all available test suites
- Tier System: Understanding subscription tiers and access levels
- Attack Categories: Security vulnerability categories
- Assessment Results: Interpreting and acting on results