Best Practices

Production deployment patterns, security considerations, and performance optimization for the ModelRed SDK.

Introduction

Follow these guidelines to build robust, secure, and performant integrations with the ModelRed SDK. These best practices cover security, error handling, performance optimization, and production deployment patterns.

Security

API Key Management

Never hardcode API keys in source code:

    # ❌ Never do this
    from modelred import ModelRed

    client = ModelRed(api_key="mr_abc123def456...")

    # ✅ Use environment variables
    import os
    from modelred import ModelRed

    api_key = os.environ["MODELRED_API_KEY"]
    client = ModelRed(api_key=api_key)

Environment Variables

Use a .env file for local development:

.env
# .env (add to .gitignore!)
MODELRED_API_KEY=mr_your_key_here
DETECTOR_API_KEY=sk_your_detector_key

Load with python-dotenv:

from dotenv import load_dotenv
import os
from modelred import ModelRed

load_dotenv()

client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])

Secrets Management

For production, use proper secrets management:

AWS Secrets Manager
    import boto3
    import json
    from modelred import ModelRed

    def get_secret(secret_name):
        client = boto3.client('secretsmanager')
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])

    secrets = get_secret('modelred/api-keys')
    client = ModelRed(api_key=secrets['api_key'])

Google Cloud Secret Manager
    from google.cloud import secretmanager
    import json
    from modelred import ModelRed

    def get_secret(project_id, secret_id):
        client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/latest"
        response = client.access_secret_version(request={"name": name})
        return json.loads(response.payload.data.decode('UTF-8'))

    secrets = get_secret('my-project', 'modelred-api-keys')
    client = ModelRed(api_key=secrets['api_key'])

Azure Key Vault
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient
    from modelred import ModelRed

    credential = DefaultAzureCredential()
    client = SecretClient(
        vault_url="https://my-vault.vault.azure.net",
        credential=credential
    )

    api_key = client.get_secret("modelred-api-key").value
    mr_client = ModelRed(api_key=api_key)

Security: Never commit .env files to version control. Add .env to your .gitignore file.

Detector Keys

Handle detector API keys with the same care:

# ✅ Good
detector_key = os.environ["OPENAI_API_KEY"]

assessment = client.create_assessment_by_id(
    model_id="model_123",
    probe_pack_ids=["pack_1"],
    detector_provider="openai",
    detector_api_key=detector_key,  # From environment
    detector_model="gpt-4o-mini",
)

Configuration

Centralized Client

Create a single client instance and reuse it:

config.py
import os
from modelred import ModelRed

_client = None

def get_client():
    global _client
    if _client is None:
        _client = ModelRed(
            api_key=os.environ["MODELRED_API_KEY"],
            timeout=30.0,
            max_retries=3,
        )
    return _client

usage.py
from config import get_client

client = get_client()
assessments = client.list_assessments()

Configuration Object

Use a configuration class for complex setups:

from dataclasses import dataclass
import os
from modelred import ModelRed

@dataclass
class ModelRedConfig:
    api_key: str
    detector_provider: str
    detector_api_key: str
    detector_model: str
    timeout: float = 30.0
    max_retries: int = 3

    @classmethod
    def from_env(cls):
        return cls(
            api_key=os.environ["MODELRED_API_KEY"],
            detector_provider=os.environ["DETECTOR_PROVIDER"],
            detector_api_key=os.environ["DETECTOR_API_KEY"],
            detector_model=os.environ["DETECTOR_MODEL"],
        )

    def create_client(self):
        return ModelRed(
            api_key=self.api_key,
            timeout=self.timeout,
            max_retries=self.max_retries,
        )

# Usage
config = ModelRedConfig.from_env()
client = config.create_client()

Error Handling

Graceful Degradation

Handle errors without breaking your application:

from modelred import APIError, RateLimited
import logging

logger = logging.getLogger(__name__)

def get_assessments_safe(client):
    try:
        return client.list_assessments(page_size=50)
    except RateLimited as e:
        logger.warning(f"Rate limited: {e.message}")
        return {"data": [], "total": 0}  # Empty response
    except APIError as e:
        logger.error(f"API error: {e.message}", exc_info=True)
        return {"data": [], "total": 0}

Retry Logic

Implement smart retries for transient failures:

import logging
import os
import time

from modelred import RateLimited, ServerError

logger = logging.getLogger(__name__)

def with_retry(func, max_attempts=3, base_delay=1):
    for attempt in range(max_attempts):
        try:
            return func()
        except (RateLimited, ServerError) as e:
            if attempt == max_attempts - 1:
                raise

            delay = base_delay * (2 ** attempt)
            logger.info(f"Retry {attempt + 1}/{max_attempts} in {delay}s")
            time.sleep(delay)

# Usage
result = with_retry(
    lambda: client.create_assessment_by_id(
        model_id="model_123",
        probe_pack_ids=["pack_1"],
        detector_provider="openai",
        detector_api_key=os.environ["OPENAI_API_KEY"],
        detector_model="gpt-4o-mini",
    )
)

Structured Logging

Log errors with context:

import logging
from modelred import APIError

logger = logging.getLogger(__name__)

try:
    assessment = client.create_assessment_by_id(...)
except APIError as e:
    logger.error(
        "Assessment creation failed",
        extra={
            "error_status": e.status,
            "error_code": e.code,
            "error_message": e.message,
            "model_id": "model_123",
        },
        exc_info=True,
    )

Performance

Connection Reuse

Reuse client instances to benefit from connection pooling and reduce overhead.

Async Operations

Use async client for concurrent operations to maximize throughput.

Smart Caching

Cache frequently accessed data with appropriate TTLs to reduce API calls.

Server-Side Filtering

Filter data on the server rather than fetching everything and filtering locally.

Connection Reuse

# ❌ Bad: Creates new connection pool for each request
def get_models():
    client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
    return client.list_models()

# ✅ Good: Reuse client
client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])

def get_models():
    return client.list_models()

Async for Concurrency

Use async client for concurrent operations:

import asyncio
import os

from modelred import AsyncModelRed

async def fetch_multiple_assessments(assessment_ids):
    async with AsyncModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
        tasks = [
            client.get_assessment(aid)
            for aid in assessment_ids
        ]
        return await asyncio.gather(*tasks)

# Fetch 10 assessments concurrently
results = asyncio.run(fetch_multiple_assessments(assessment_ids))
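
When fanning out many requests at once, it also helps to cap the number of in-flight calls so you don't trip the rate limits discussed above. A minimal sketch, assuming the same AsyncModelRed client and a hypothetical max_concurrency parameter, using asyncio.Semaphore:

import asyncio
import os

from modelred import AsyncModelRed

async def fetch_with_limit(assessment_ids, max_concurrency=5):
    """Fetch assessments concurrently, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async with AsyncModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:

        async def fetch_one(aid):
            async with semaphore:  # wait for a free slot before calling the API
                return await client.get_assessment(aid)

        return await asyncio.gather(*(fetch_one(aid) for aid in assessment_ids))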

Batch Operations

Group operations to reduce API calls:

# ❌ Bad: One request per assessment
for assessment_id in assessment_ids:
    assessment = client.get_assessment(assessment_id)
    # Process...

# ✅ Good: Fetch all at once with filtering
all_assessments = list(client.iter_assessments(page_size=100))
# Filter locally if needed

Caching

Cache frequently accessed data:

import time

class CachedModelRed:
    def __init__(self, client, ttl=300):
        self.client = client
        self.ttl = ttl
        self._models_cache = None
        self._models_timestamp = 0

    def list_models_cached(self):
        now = time.time()
        if (self._models_cache is None or
            (now - self._models_timestamp) > self.ttl):
            self._models_cache = list(
                self.client.iter_models(page_size=100)
            )
            self._models_timestamp = now
        return self._models_cache

cached = CachedModelRed(client, ttl=600)  # 10 min cache
models = cached.list_models_cached()

Testing

Mocking the Client

Use unittest.mock for testing:

from unittest.mock import Mock
import pytest
from modelred import NotFound

def test_assessment_not_found():
    mock_client = Mock()
    mock_client.get_assessment.side_effect = NotFound(
        404, "Assessment not found", None, None
    )

    with pytest.raises(NotFound):
        mock_client.get_assessment("missing_id")

Test Fixtures

Create reusable fixtures:

import pytest
from unittest.mock import Mock

@pytest.fixture
def mock_client():
    client = Mock()
    client.list_models.return_value = {
        "data": [
            {"id": "model_1", "displayName": "GPT-4"},
            {"id": "model_2", "displayName": "Claude"},
        ],
        "total": 2,
        "page": 1,
        "totalPages": 1,
    }
    return client

def test_list_models(mock_client):
    response = mock_client.list_models()
    assert len(response["data"]) == 2

Integration Tests

For integration tests, use separate test keys:

import os
import pytest
from modelred import ModelRed

@pytest.fixture
def test_client():
    test_api_key = os.environ.get("MODELRED_TEST_API_KEY")
    if not test_api_key:
        pytest.skip("No test API key provided")
    return ModelRed(api_key=test_api_key)

def test_list_models_integration(test_client):
    response = test_client.list_models(page_size=10)
    assert "data" in response
    assert "total" in response

Resource Management

Context Managers

Always use context managers for cleanup:

    with ModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
        assessments = client.list_assessments()
        # client.close() called automatically

    async with AsyncModelRed(api_key=os.environ["MODELRED_API_KEY"]) as client:
        assessments = await client.list_assessments()
        # await client.aclose() called automatically

Manual Cleanup

If not using context managers, ensure cleanup:

client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
try:
    assessments = client.list_assessments()
finally:
    client.close()  # Always close

Data Handling

Pagination

Use iterators for large datasets:

# ✅ Memory efficient
for assessment in client.iter_assessments(page_size=50):
    process(assessment)  # Process and discard

# ❌ Memory inefficient
all_assessments = list(client.iter_assessments(page_size=50))
for assessment in all_assessments:
    process(assessment)

Streaming Processing

Process data as it arrives:

def export_to_database(client):
    """Stream assessments to database without loading all."""
    for assessment in client.iter_assessments(page_size=100):
        db.insert(assessment)  # Insert immediately
        # Assessment can be garbage collected

Filtering

Filter on the server, not the client:

# ✅ Good: Server-side filtering
completed = client.list_assessments(
    status="COMPLETED",
    provider="openai",
)

# ❌ Bad: Client-side filtering
all_assessments = list(client.iter_assessments(page_size=50))
completed = [a for a in all_assessments if a["status"] == "COMPLETED"]

Monitoring

Health Checks

Implement health checks for your integration:

import os

from modelred import ModelRed, APIError

def health_check():
    try:
        client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
        response = client.list_models(page_size=1)
        return {"status": "healthy", "models": response["total"]}
    except APIError as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "status_code": e.status,
        }

Metrics

Track key metrics:

import time
from prometheus_client import Counter, Histogram

api_requests = Counter(
    'modelred_api_requests_total',
    'Total ModelRed API requests',
    ['method', 'status']
)

api_duration = Histogram(
    'modelred_api_duration_seconds',
    'ModelRed API request duration'
)

def tracked_request(func):
    start = time.time()
    try:
        result = func()
        api_requests.labels(method='list_models', status='success').inc()
        return result
    except Exception:
        api_requests.labels(method='list_models', status='error').inc()
        raise
    finally:
        duration = time.time() - start
        api_duration.observe(duration)

models = tracked_request(lambda: client.list_models())

Detector Selection

Choose Appropriate Detectors

Match detector to use case:

    # Fast and cheap for development
    config = {
        "provider": "openai",
        "model": "gpt-4o-mini",
    }

    # More thorough for pre-deployment
    config = {
        "provider": "openai",
        "model": "gpt-4o",
    }

    # Most reliable for production
    config = {
        "provider": "anthropic",
        "model": "claude-3-5-sonnet-20241022",
    }

Environment-Based Configuration

def get_detector_config(use_case):
    """Return detector config based on use case."""
    configs = {
        "development": {
            "provider": "openai",
            "model": "gpt-4o-mini",
        },
        "staging": {
            "provider": "openai",
            "model": "gpt-4o",
        },
        "production": {
            "provider": "anthropic",
            "model": "claude-3-5-sonnet-20241022",
        },
    }
    return configs.get(use_case, configs["development"])

config = get_detector_config(os.environ.get("ENV", "development"))

assessment = client.create_assessment_by_id(
    model_id="model_123",
    probe_pack_ids=["pack_1"],
    detector_provider=config["provider"],
    detector_api_key=os.environ[f"{config['provider'].upper()}_API_KEY"],
    detector_model=config["model"],
)

Cross-Validation

Use multiple detectors for critical assessments:

def create_cross_validated_assessments(client, model_id, probe_pack_ids):
    """Create assessments with multiple detectors."""
    detectors = [
        {
            "provider": "openai",
            "model": "gpt-4o",
            "api_key": os.environ["OPENAI_API_KEY"],
        },
        {
            "provider": "anthropic",
            "model": "claude-3-5-sonnet-20241022",
            "api_key": os.environ["ANTHROPIC_API_KEY"],
        },
    ]

    assessments = []
    for detector in detectors:
        assessment = client.create_assessment_by_id(
            model_id=model_id,
            probe_pack_ids=probe_pack_ids,
            detector_provider=detector["provider"],
            detector_api_key=detector["api_key"],
            detector_model=detector["model"],
        )
        assessments.append(assessment)

    return assessments
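
A simple way to use the cross-validated assessments is to poll each one to completion and review the results side by side. The sketch below assumes each create response includes an id field and uses the same status values ("COMPLETED", "FAILED") shown elsewhere in this guide:

import time

def wait_for_completion(client, assessment_id, poll_interval=30):
    """Poll until the assessment reaches a terminal state."""
    while True:
        assessment = client.get_assessment(assessment_id)
        if assessment["status"] in ("COMPLETED", "FAILED"):
            return assessment
        time.sleep(poll_interval)

# Run both detectors, then compare the finished assessments side by side
assessments = create_cross_validated_assessments(client, "model_123", ["pack_1"])
finished = [wait_for_completion(client, a["id"]) for a in assessments]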

Documentation

Type Hints

Use type hints for better IDE support:

import os
from typing import Dict, Any, List

from modelred import ModelRed

def create_security_assessment(
    client: ModelRed,
    model_id: str,
    probe_packs: List[str],
) -> Dict[str, Any]:
    """
    Create a security assessment.

    Args:
        client: ModelRed client instance
        model_id: ID of the model to assess
        probe_packs: List of probe pack IDs

    Returns:
        Assessment response dict
    """
    return client.create_assessment_by_id(
        model_id=model_id,
        probe_pack_ids=probe_packs,
        detector_provider="openai",
        detector_api_key=os.environ["OPENAI_API_KEY"],
        detector_model="gpt-4o-mini",
    )

Comments

Document complex logic:

def process_assessment_results(assessment):
    """
    Process assessment results and extract key metrics.

    The assessment may be in various states:
    - COMPLETED: Full results available
    - FAILED: Error details in 'error' field
    - RUNNING: Partial results may be available
    """
    if assessment["status"] == "COMPLETED":
        return extract_metrics(assessment["results"])
    elif assessment["status"] == "FAILED":
        logger.error(f"Assessment failed: {assessment.get('error')}")
        return None
    else:
        # Still running, check back later
        return {"status": "pending"}

Deployment

Environment-Specific Config

Use different configurations per environment:

import os

from modelred import ModelRed

ENV = os.environ.get("ENV", "development")

# Each environment names its own API key variable, so only the
# active environment's key needs to be set.
CONFIGS = {
    "development": {
        "api_key_env": "DEV_MODELRED_API_KEY",
        "timeout": 60.0,
        "max_retries": 1,
    },
    "staging": {
        "api_key_env": "STAGING_MODELRED_API_KEY",
        "timeout": 30.0,
        "max_retries": 3,
    },
    "production": {
        "api_key_env": "PROD_MODELRED_API_KEY",
        "timeout": 20.0,
        "max_retries": 5,
    },
}

config = CONFIGS[ENV]
client = ModelRed(
    api_key=os.environ[config["api_key_env"]],
    timeout=config["timeout"],
    max_retries=config["max_retries"],
)

Containerization

Example Dockerfile:

Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Don't bake secrets into image
ENV MODELRED_API_KEY=""
ENV DETECTOR_API_KEY=""

CMD ["python", "app.py"]
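
Because the image ships with empty placeholders, it helps to fail fast if the secrets were not injected at runtime (for example via docker run -e or your orchestrator's secrets mechanism). A minimal startup check for app.py might look like this sketch:

import os
import sys

REQUIRED_VARS = ["MODELRED_API_KEY", "DETECTOR_API_KEY"]

# Refuse to start if any required secret is missing or empty
missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    sys.exit(f"Missing required environment variables: {', '.join(missing)}")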

Summary

Security

Never hardcode API keys. Use environment variables or secrets managers for production deployments.

Error Handling

Implement retry logic and graceful degradation. Handle expected errors appropriately.

Performance

Reuse clients, use async for concurrency, cache when appropriate, filter on the server.

Resource Management

Always use context managers for cleanup. Monitor health and track metrics.

Key Takeaways

Security — Environment variables or secrets managers only

Error Handling — Retry logic and graceful degradation

Performance — Connection reuse, async operations, smart caching

Resource Management — Context managers for automatic cleanup

Testing — Mock for unit tests, separate keys for integration tests

Monitoring — Health checks and metrics tracking

Documentation — Type hints and clear comments

Next Steps