Best Practices
Production deployment patterns, security considerations, and performance optimization for the ModelRed SDK.
Introduction
Follow these guidelines to build robust, secure, and performant integrations with the ModelRed SDK. These best practices cover security, error handling, performance optimization, and production deployment patterns.
Security
API Key Management
Never hardcode API keys in source code:
# ❌ Never do this
from modelred import ModelRed
client = ModelRed(api_key="mr_abc123def456...")

# ✅ Use environment variables
import os
from modelred import ModelRed

api_key = os.environ["MODELRED_API_KEY"]
client = ModelRed(api_key=api_key)
Environment Variables
Use a .env file for local development:
# .env (add to .gitignore!)
MODELRED_API_KEY=mr_your_key_here
DETECTOR_API_KEY=sk_your_detector_key
Load with python-dotenv:
from dotenv import load_dotenv
import os
from modelred import ModelRed
load_dotenv()
client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
Secrets Management
For production, use proper secrets management:
AWS Secrets Manager:
import boto3
import json
from modelred import ModelRed

def get_secret(secret_name):
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

secrets = get_secret('modelred/api-keys')
client = ModelRed(api_key=secrets['api_key'])
Google Secret Manager:
from google.cloud import secretmanager
import json
from modelred import ModelRed

def get_secret(project_id, secret_id):
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
    response = client.access_secret_version(request={"name": name})
    return json.loads(response.payload.data.decode('UTF-8'))

secrets = get_secret('my-project', 'modelred-api-keys')
client = ModelRed(api_key=secrets['api_key'])
Azure Key Vault:
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from modelred import ModelRed

credential = DefaultAzureCredential()
client = SecretClient(
    vault_url="https://my-vault.vault.azure.net",
    credential=credential
)
api_key = client.get_secret("modelred-api-key").value
mr_client = ModelRed(api_key=api_key)
Security: Never commit .env files to version control. Add .env to your .gitignore file.
Detector Keys
Handle detector API keys with the same care:
# ✅ Good
detector_key = os.environ["OPENAI_API_KEY"]
assessment = client.create_assessment_by_id(
    model_id="model_123",
    probe_pack_ids=["pack_1"],
    detector_provider="openai",
    detector_api_key=detector_key,  # From environment
    detector_model="gpt-4o-mini",
)
Configuration
Centralized Client
Create a single client instance and reuse it:
import os
from modelred import ModelRed
_client = None

def get_client():
    global _client
    if _client is None:
        _client = ModelRed(
            api_key=os.environ["MODELRED_API_KEY"],
            timeout=30.0,
            max_retries=3,
        )
    return _client

# Elsewhere in your application
from config import get_client

client = get_client()
assessments = client.list_assessments()
Configuration Object
Use a configuration class for complex setups:
from dataclasses import dataclass
import os
from modelred import ModelRed
@dataclass
class ModelRedConfig:
    api_key: str
    detector_provider: str
    detector_api_key: str
    detector_model: str
    timeout: float = 30.0
    max_retries: int = 3

    @classmethod
    def from_env(cls):
        return cls(
            api_key=os.environ["MODELRED_API_KEY"],
            detector_provider=os.environ["DETECTOR_PROVIDER"],
            detector_api_key=os.environ["DETECTOR_API_KEY"],
            detector_model=os.environ["DETECTOR_MODEL"],
        )

    def create_client(self):
        return ModelRed(
            api_key=self.api_key,
            timeout=self.timeout,
            max_retries=self.max_retries,
        )

# Usage
config = ModelRedConfig.from_env()
client = config.create_client()
Error Handling
Graceful Degradation
Handle errors without breaking your application:
from modelred import APIError, RateLimited
import logging
logger = logging.getLogger(__name__)
def get_assessments_safe(client):
    try:
        return client.list_assessments(page_size=50)
    except RateLimited as e:
        logger.warning(f"Rate limited: {e.message}")
        return {"data": [], "total": 0}  # Empty response
    except APIError as e:
        logger.error(f"API error: {e.message}", exc_info=True)
        return {"data": [], "total": 0}
Retry Logic
Implement smart retries for transient failures:
import logging
import os
import time

from modelred import RateLimited, ServerError

logger = logging.getLogger(__name__)

def with_retry(func, max_attempts=3, base_delay=1):
    for attempt in range(max_attempts):
        try:
            return func()
        except (RateLimited, ServerError):
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            logger.info(f"Retry {attempt + 1}/{max_attempts} in {delay}s")
            time.sleep(delay)

# Usage
result = with_retry(
    lambda: client.create_assessment_by_id(
        model_id="model_123",
        probe_pack_ids=["pack_1"],
        detector_provider="openai",
        detector_api_key=os.environ["OPENAI_API_KEY"],
        detector_model="gpt-4o-mini",
    )
)
Structured Logging
Log errors with context:
import logging
from modelred import APIError
logger = logging.getLogger(__name__)
try:
    assessment = client.create_assessment_by_id(...)
except APIError as e:
    logger.error(
        "Assessment creation failed",
        extra={
            "error_status": e.status,
            "error_code": e.code,
            "error_message": e.message,
            "model_id": "model_123",
        },
        exc_info=True,
    )
Performance
Connection Reuse
Reuse client instances to benefit from connection pooling and reduce overhead.
Async Operations
Use the async client for concurrent operations to maximize throughput.
Smart Caching
Cache frequently accessed data with appropriate TTLs to reduce API calls.
Server-Side Filtering
Filter data on the server rather than fetching everything and filtering locally.
Connection Reuse
# ❌ Bad: Creates new connection pool for each request
def get_models():
    client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
    return client.list_models()

# ✅ Good: Reuse client
client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])

def get_models():
    return client.list_models()
Async for Concurrency
Use the async client for concurrent operations:
import asyncio
import os

from modelred import AsyncModelRed

async def fetch_multiple_assessments(assessment_ids):
    async with AsyncModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
        tasks = [
            client.get_assessment(aid)
            for aid in assessment_ids
        ]
        return await asyncio.gather(*tasks)

# Fetch 10 assessments concurrently
results = asyncio.run(fetch_multiple_assessments(assessment_ids))
Batch Operations
Group operations to reduce API calls:
# ❌ Bad: One request per model
for model_id in model_ids:
    response = client.list_assessments(page_size=1)
    # Process...

# ✅ Good: Fetch all at once with filtering
all_assessments = list(client.iter_assessments(page_size=100))
# Filter locally if needed
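If you do need per-model views, one pass over the batched fetch is enough to group results locally. A minimal sketch; the "modelId" field name and the handle_model helper are assumptions, so adjust them to your actual response shape and processing code:
from collections import defaultdict

# Group the single batch fetch by model (the "modelId" key is assumed here)
assessments_by_model = defaultdict(list)
for assessment in client.iter_assessments(page_size=100):
    assessments_by_model[assessment.get("modelId")].append(assessment)

for model_id in model_ids:
    handle_model(assessments_by_model.get(model_id, []))  # handle_model is a placeholder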
Caching
Cache frequently accessed data:
import time
class CachedModelRed:
    def __init__(self, client, ttl=300):
        self.client = client
        self.ttl = ttl
        self._models_cache = None
        self._models_timestamp = 0

    def list_models_cached(self):
        now = time.time()
        if (self._models_cache is None or
                (now - self._models_timestamp) > self.ttl):
            self._models_cache = list(
                self.client.iter_models(page_size=100)
            )
            self._models_timestamp = now
        return self._models_cache

cached = CachedModelRed(client, ttl=600)  # 10 min cache
models = cached.list_models_cached()
Testing
Mocking the Client
Use unittest.mock for testing:
from unittest.mock import Mock
import pytest
from modelred import NotFound
def test_assessment_not_found():
    mock_client = Mock()
    mock_client.get_assessment.side_effect = NotFound(
        404, "Assessment not found", None, None
    )
    with pytest.raises(NotFound):
        mock_client.get_assessment("missing_id")
Test Fixtures
Create reusable fixtures:
import pytest
from unittest.mock import Mock
@pytest.fixture
def mock_client():
    client = Mock()
    client.list_models.return_value = {
        "data": [
            {"id": "model_1", "displayName": "GPT-4"},
            {"id": "model_2", "displayName": "Claude"},
        ],
        "total": 2,
        "page": 1,
        "totalPages": 1,
    }
    return client

def test_list_models(mock_client):
    response = mock_client.list_models()
    assert len(response["data"]) == 2
Integration Tests
For integration tests, use separate test keys:
import os
import pytest
from modelred import ModelRed
@pytest.fixture
def test_client():
    test_api_key = os.environ.get("MODELRED_TEST_API_KEY")
    if not test_api_key:
        pytest.skip("No test API key provided")
    return ModelRed(api_key=test_api_key)

def test_list_models_integration(test_client):
    response = test_client.list_models(page_size=10)
    assert "data" in response
    assert "total" in response
Resource Management
Context Managers
Always use context managers for cleanup:
with ModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
    assessments = client.list_assessments()
    # client.close() called automatically

# Async equivalent (must run inside an async function)
async with AsyncModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
    assessments = await client.list_assessments()
    # await client.aclose() called automatically
Manual Cleanup
If not using context managers, ensure cleanup:
client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
try:
    assessments = client.list_assessments()
finally:
    client.close()  # Always close
Data Handling
Pagination
Use iterators for large datasets:
# ✅ Memory efficient
for assessment in client.iter_assessments(page_size=50):
    process(assessment)  # Process and discard

# ❌ Memory inefficient
all_assessments = list(client.iter_assessments(page_size=50))
for assessment in all_assessments:
    process(assessment)
Streaming Processing
Process data as it arrives:
def export_to_database(client):
    """Stream assessments to the database without loading them all."""
    for assessment in client.iter_assessments(page_size=100):
        db.insert(assessment)  # Insert immediately (db is a placeholder for your data layer)
        # Assessment can be garbage collected
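If row-by-row inserts are too chatty for your database, you can keep streaming from the API while writing in fixed-size batches. A minimal sketch (db.insert_many is a placeholder for your own bulk-write call):
def export_to_database_batched(client, batch_size=100):
    """Stream assessments and flush them to the database in batches."""
    batch = []
    for assessment in client.iter_assessments(page_size=100):
        batch.append(assessment)
        if len(batch) >= batch_size:
            db.insert_many(batch)  # placeholder bulk write
            batch = []
    if batch:
        db.insert_many(batch)  # flush the remainder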
Filtering
Filter on the server, not the client:
# ✅ Good: Server-side filtering
completed = client.list_assessments(
    status="COMPLETED",
    provider="openai",
)

# ❌ Bad: Client-side filtering
all_assessments = list(client.iter_assessments(page_size=50))
completed = [a for a in all_assessments if a["status"] == "COMPLETED"]
Monitoring
Health Checks
Implement health checks for your integration:
import os

from modelred import APIError, ModelRed

def health_check():
    try:
        client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
        response = client.list_models(page_size=1)
        return {"status": "healthy", "models": response["total"]}
    except APIError as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "status_code": e.status,
        }
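How you expose this is up to your stack; a simple option is a periodic check that logs whenever the integration degrades. A minimal sketch using the health_check function above:
import logging
import time

logger = logging.getLogger(__name__)

def monitor_loop(interval_seconds=300):
    """Run the health check on a fixed interval and log degradations."""
    while True:
        result = health_check()
        if result["status"] != "healthy":
            logger.error("ModelRed integration unhealthy: %s", result)
        time.sleep(interval_seconds)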
Metrics
Track key metrics:
import time
from prometheus_client import Counter, Histogram
api_requests = Counter(
    'modelred_api_requests_total',
    'Total ModelRed API requests',
    ['method', 'status']
)
api_duration = Histogram(
    'modelred_api_duration_seconds',
    'ModelRed API request duration'
)

def tracked_request(func):
    start = time.time()
    try:
        result = func()
        api_requests.labels(method='list_models', status='success').inc()
        return result
    except Exception:
        api_requests.labels(method='list_models', status='error').inc()
        raise
    finally:
        duration = time.time() - start
        api_duration.observe(duration)

models = tracked_request(lambda: client.list_models())
Detector Selection
Choose Appropriate Detectors
Match detector to use case:
# Fast and cheap for development
config = {
    "provider": "openai",
    "model": "gpt-4o-mini",
}

# More thorough for pre-deployment
config = {
    "provider": "openai",
    "model": "gpt-4o",
}

# Most reliable for production
config = {
    "provider": "anthropic",
    "model": "claude-3-5-sonnet-20241022",
}
Environment-Based Configuration
def get_detector_config(use_case):
    """Return detector config based on use case."""
    configs = {
        "development": {
            "provider": "openai",
            "model": "gpt-4o-mini",
        },
        "staging": {
            "provider": "openai",
            "model": "gpt-4o",
        },
        "production": {
            "provider": "anthropic",
            "model": "claude-3-5-sonnet-20241022",
        },
    }
    return configs.get(use_case, configs["development"])

config = get_detector_config(os.environ.get("ENV", "development"))
assessment = client.create_assessment_by_id(
    model_id="model_123",
    probe_pack_ids=["pack_1"],
    detector_provider=config["provider"],
    detector_api_key=os.environ[f"{config['provider'].upper()}_API_KEY"],
    detector_model=config["model"],
)
Cross-Validation
Use multiple detectors for critical assessments:
def create_cross_validated_assessments(client, model_id, probe_pack_ids):
    """Create assessments with multiple detectors."""
    detectors = [
        {
            "provider": "openai",
            "model": "gpt-4o",
            "api_key": os.environ["OPENAI_API_KEY"],
        },
        {
            "provider": "anthropic",
            "model": "claude-3-5-sonnet-20241022",
            "api_key": os.environ["ANTHROPIC_API_KEY"],
        },
    ]
    assessments = []
    for detector in detectors:
        assessment = client.create_assessment_by_id(
            model_id=model_id,
            probe_pack_ids=probe_pack_ids,
            detector_provider=detector["provider"],
            detector_api_key=detector["api_key"],
            detector_model=detector["model"],
        )
        assessments.append(assessment)
    return assessments
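Usage looks the same as a single-detector run, except you get one assessment back per detector. A short sketch; the "id" field name in the response is an assumption, so check your actual response shape:
assessments = create_cross_validated_assessments(
    client,
    model_id="model_123",
    probe_pack_ids=["pack_1"],
)
for assessment in assessments:
    # Compare verdicts across detectors before signing off on a release
    print(assessment.get("id"), assessment.get("status"))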
Documentation
Type Hints
Use type hints for better IDE support:
import os
from typing import Any, Dict, List

from modelred import ModelRed

def create_security_assessment(
    client: ModelRed,
    model_id: str,
    probe_packs: List[str],
) -> Dict[str, Any]:
    """
    Create a security assessment.

    Args:
        client: ModelRed client instance
        model_id: ID of the model to assess
        probe_packs: List of probe pack IDs

    Returns:
        Assessment response dict
    """
    return client.create_assessment_by_id(
        model_id=model_id,
        probe_pack_ids=probe_packs,
        detector_provider="openai",
        detector_api_key=os.environ["OPENAI_API_KEY"],
        detector_model="gpt-4o-mini",
    )
Comments
Document complex logic:
def process_assessment_results(assessment):
    """
    Process assessment results and extract key metrics.

    The assessment may be in various states:
    - COMPLETED: Full results available
    - FAILED: Error details in 'error' field
    - RUNNING: Partial results may be available
    """
    if assessment["status"] == "COMPLETED":
        return extract_metrics(assessment["results"])  # your own metric extraction
    elif assessment["status"] == "FAILED":
        logger.error(f"Assessment failed: {assessment.get('error')}")
        return None
    else:
        # Still running, check back later
        return {"status": "pending"}
Deployment
Environment-Specific Config
Use different configurations per environment:
import os

from modelred import ModelRed

ENV = os.environ.get("ENV", "development")

# Use .get() so a missing key for another environment doesn't raise at import time
CONFIGS = {
    "development": {
        "api_key": os.environ.get("DEV_MODELRED_API_KEY"),
        "timeout": 60.0,
        "max_retries": 1,
    },
    "staging": {
        "api_key": os.environ.get("STAGING_MODELRED_API_KEY"),
        "timeout": 30.0,
        "max_retries": 3,
    },
    "production": {
        "api_key": os.environ.get("PROD_MODELRED_API_KEY"),
        "timeout": 20.0,
        "max_retries": 5,
    },
}

config = CONFIGS[ENV]
client = ModelRed(**config)
Containerization
Example Dockerfile:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Don't bake secrets into image
ENV MODELRED_API_KEY=""
ENV DETECTOR_API_KEY=""
CMD ["python", "app.py"]Summary
Summary
Security
Never hardcode API keys. Use environment variables or secrets managers for production deployments.
Error Handling
Implement retry logic and graceful degradation. Handle expected errors appropriately.
Performance
Reuse clients, use async for concurrency, cache when appropriate, filter on the server.
Resource Management
Always use context managers for cleanup. Monitor health and track metrics.
Key Takeaways
✅ Security — Environment variables or secrets managers only
✅ Error Handling — Retry logic and graceful degradation
✅ Performance — Connection reuse, async operations, smart caching
✅ Resource Management — Context managers for automatic cleanup
✅ Testing — Mock for unit tests, separate keys for integration tests
✅ Monitoring — Health checks and metrics tracking
✅ Documentation — Type hints and clear comments
Next Steps
- Review Error Handling for robust error management
- Explore Pagination for efficient data handling
- Check the FAQ for common questions
- See Python SDK for configuration details