Pagination
Efficient iteration patterns for large datasets with both manual and automatic pagination approaches.
Introduction
The ModelRed SDK provides flexible pagination support for working with large collections of assessments, models, and probe packs. Choose between manual page control and automatic iteration based on your needs.
Pagination Basics
All list endpoints return paginated responses with this structure:
{
  "data": [...],       # Current page items
  "total": 150,        # Total items across all pages
  "page": 1,           # Current page number
  "pageSize": 20,      # Items per page
  "totalPages": 8      # Total number of pages
}
Manual Pagination
Basic Pattern
Manually control page navigation:
from modelred import ModelRed
client = ModelRed(api_key="mr_...")
# First page
page1 = client.list_assessments(page=1, page_size=20)
print(f"Page 1: {len(page1['data'])} items")
# Second page
page2 = client.list_assessments(page=2, page_size=20)
print(f"Page 2: {len(page2['data'])} items")
# Last page
last_page = page1["totalPages"]
final = client.list_assessments(page=last_page, page_size=20)
print(f"Last page: {len(final['data'])} items")Loop Through All Pages
def get_all_assessments(client):
    page = 1
    all_items = []
    while True:
        response = client.list_assessments(page=page, page_size=50)
        all_items.extend(response["data"])
        if page >= response["totalPages"]:
            break
        page += 1
    return all_items

assessments = get_all_assessments(client)
print(f"Total assessments: {len(assessments)}")
Check for More Pages
response = client.list_models(page=1, page_size=20)
has_more = response["page"] < response["totalPages"]
if has_more:
    print(f"More pages available: {response['totalPages'] - response['page']}")
Automatic Pagination
The SDK provides iterator methods that handle pagination automatically:
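Under the hood, an iterator of this kind simply requests pages one after another and yields individual items, so you never touch page numbers yourself. The following is a rough, illustrative sketch of that pattern built only on the documented list_assessments call; the SDK's actual iter_assessments implementation may differ:
def iter_assessments_sketch(client, page_size=50, **filters):
    """Illustrative only: yield assessments lazily, fetching one page at a time."""
    page = 1
    while True:
        response = client.list_assessments(page=page, page_size=page_size, **filters)
        for item in response["data"]:
            yield item
        if page >= response["totalPages"]:
            break
        page += 1
Each of the iter_* methods below follows this same idea for its resource type.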
Iterate Assessments
# Iterate all assessments
for assessment in client.iter_assessments(page_size=50):
    print(f"{assessment['id']}: {assessment['status']}")
Iterate Models
# Iterate all models
for model in client.iter_models(page_size=50):
    print(f"{model['displayName']} ({model['provider']})")
Iterate Probe Packs
# Iterate owned probe packs
for pack in client.iter_owned_probes(page_size=50):
    print(f"Owned: {pack['name']}")

# Iterate imported probe packs
for pack in client.iter_imported_probes(page_size=50):
    print(f"Imported: {pack['name']}")
With Filters
Iterators accept all filter parameters:
# Iterate completed OpenAI assessments
for assessment in client.iter_assessments(
    page_size=50,
    status="COMPLETED",
    provider="openai",
):
    print(f"Completed: {assessment['id']}")

# Iterate active Anthropic models
for model in client.iter_models(
    page_size=50,
    provider="anthropic",
    status="active",
):
    print(f"Active Anthropic: {model['displayName']}")
Choosing Page Size
Small (10-20)
Use when:
- Building UI with paginated views
- Quick initial response needed
- Memory constrained
Medium (50-100)
Use when:
- Iterating through data
- Balance between requests and memory
- Most common use case
Large (100+)
Use when:
- Minimizing API calls
- Batch processing
- Sufficient memory available
Small Page Size Example
# UI pagination (e.g., inside a Flask route handler)
page = int(request.args.get("page", 1))
response = client.list_assessments(page=page, page_size=10)
# Return to frontend
return {
    "items": response["data"],
    "pagination": {
        "page": response["page"],
        "totalPages": response["totalPages"],
        "total": response["total"],
    }
}
Medium Page Size Example
# Efficient iteration
for assessment in client.iter_assessments(page_size=50):
    process(assessment)
Large Page Size Example
# Batch export
all_models = []
for model in client.iter_models(page_size=100):
    all_models.append(model)
export_to_csv(all_models)
Performance Optimization
Parallel Page Fetching
Fetch multiple pages concurrently with the synchronous client:
from concurrent.futures import ThreadPoolExecutor
def fetch_page(page_num):
    return client.list_assessments(page=page_num, page_size=50)

# Get total pages (the first request also gives us page 1)
first_page = client.list_assessments(page=1, page_size=50)
total_pages = first_page["totalPages"]

# Fetch the remaining pages in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
    pages = [first_page] + list(executor.map(fetch_page, range(2, total_pages + 1)))

# Combine results
all_assessments = []
for page in pages:
    all_assessments.extend(page["data"])
print(f"Fetched {len(all_assessments)} assessments")
Async Parallel Fetching
Parallel fetching is more efficient with the async client:
import asyncio
from modelred import AsyncModelRed
async def fetch_all_parallel(client, total_pages):
    tasks = [
        client.list_assessments(page=page, page_size=50)
        for page in range(1, total_pages + 1)
    ]
    pages = await asyncio.gather(*tasks)
    all_items = []
    for page in pages:
        all_items.extend(page["data"])
    return all_items

async def main():
    async with AsyncModelRed(api_key="mr_...") as client:
        # Get total pages
        first = await client.list_assessments(page=1, page_size=50)
        total_pages = first["totalPages"]
        # Fetch all pages
        assessments = await fetch_all_parallel(client, total_pages)
        print(f"Total: {len(assessments)}")

asyncio.run(main())
Caching
Cache pages to reduce API calls:
import time
class CachedClient:
    def __init__(self, client, cache_ttl=300):
        self.client = client
        self.cache_ttl = cache_ttl
        self._cache = {}

    def list_assessments_cached(self, page=1, page_size=20, **kwargs):
        cache_key = (page, page_size, tuple(sorted(kwargs.items())))
        if cache_key in self._cache:
            cached_time, cached_data = self._cache[cache_key]
            if time.time() - cached_time < self.cache_ttl:
                return cached_data
        data = self.client.list_assessments(
            page=page,
            page_size=page_size,
            **kwargs
        )
        self._cache[cache_key] = (time.time(), data)
        return data

cached = CachedClient(client)
response = cached.list_assessments_cached(page=1)  # Fresh
response = cached.list_assessments_cached(page=1)  # From cache
Streaming Pattern
Process items as they arrive instead of collecting all:
def process_assessments_stream(client, processor):
    """Process assessments without loading all into memory."""
    for assessment in client.iter_assessments(page_size=50):
        processor(assessment)  # Process immediately
        # Assessment can be garbage collected

def save_to_database(assessment):
    # Save each assessment individually
    db.save(assessment)

process_assessments_stream(client, save_to_database)
Early Termination
Stop iterating once a condition is met:
def find_first_failed(client):
    """Find first failed assessment without fetching all."""
    for assessment in client.iter_assessments(page_size=50):
        if assessment["status"] == "FAILED":
            return assessment
    return None

failed = find_first_failed(client)
if failed:
    print(f"Found failed assessment: {failed['id']}")
Filtering vs. Pagination
Use API filters to reduce data transfer:
# Good: Filter on server
completed = client.list_assessments(
    status="COMPLETED",
    page_size=50,
)
# Only completed assessments returned
for assessment in completed["data"]:
    process(assessment)

# Bad: Fetch all then filter locally
all_assessments = []
for assessment in client.iter_assessments(page_size=50):
    all_assessments.append(assessment)
# Wasteful - fetched unnecessary data
completed = [a for a in all_assessments if a["status"] == "COMPLETED"]
Pagination Edge Cases
Empty Results
Handle empty result sets gracefully:
response = client.list_assessments(
    search="nonexistent_model",
    page_size=50,
)
if not response["data"]:
    print("No assessments found")
else:
    print(f"Found {len(response['data'])} assessments")
Single Page
Handle single-page results:
response = client.list_models(page_size=100)
if response["totalPages"] == 1:
    print("All models fit on one page")
else:
    print(f"Models span {response['totalPages']} pages")
Large Totals
Be aware of very large collections:
response = client.list_assessments(page=1, page_size=10)
if response["total"] > 10000:
    print("Large dataset - consider filtering or streaming")
    # Use specific filters or process incrementally
Performance: For very large datasets (10,000+ items), use server-side filtering and streaming patterns to avoid memory issues.
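As a sketch of that advice, the snippet below combines a server-side status filter with the streaming iterator so memory stays flat regardless of collection size (handle() is a stand-in for whatever per-item processing you need):
# Only completed assessments are fetched, one page at a time
count = 0
for assessment in client.iter_assessments(page_size=100, status="COMPLETED"):
    handle(assessment)  # hypothetical per-item processing
    count += 1
print(f"Processed {count} assessments without holding them all in memory")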
Async Pagination
The async client supports the same manual pagination pattern:
import asyncio
from modelred import AsyncModelRed
async def main():
    async with AsyncModelRed(api_key="mr_...") as client:
        # Manual pagination
        page1 = await client.list_assessments(page=1, page_size=50)
        page2 = await client.list_assessments(page=2, page_size=50)

        # Note: Async iterators not available in current SDK
        # Use manual pagination for now
        all_assessments = []
        page = 1
        while True:
            response = await client.list_assessments(
                page=page,
                page_size=50
            )
            all_assessments.extend(response["data"])
            if page >= response["totalPages"]:
                break
            page += 1
        print(f"Total: {len(all_assessments)}")

asyncio.run(main())
Note: Async iterators (iter_* methods) are not available in the current SDK version. Use manual pagination with async/await for async operations.
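If you want iterator-style ergonomics with the async client today, you can wrap the manual loop above in your own async generator. A minimal sketch, assuming AsyncModelRed.list_assessments behaves as shown in this guide:
import asyncio
from modelred import AsyncModelRed

async def aiter_assessments(client, page_size=50, **filters):
    """Helper sketch: yield assessments one at a time from the async client."""
    page = 1
    while True:
        response = await client.list_assessments(page=page, page_size=page_size, **filters)
        for item in response["data"]:
            yield item
        if page >= response["totalPages"]:
            break
        page += 1

async def main():
    async with AsyncModelRed(api_key="mr_...") as client:
        async for assessment in aiter_assessments(client, page_size=50):
            print(assessment["id"])

asyncio.run(main())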
Best Practices
Use Iterators
For simple iteration, use iter_* methods for cleaner code
Manual Control
Use manual pagination when building UIs or when you need precise page control
Optimize Page Size
Choose page size based on memory constraints and performance needs
Handle Edge Cases
Always check for empty results and handle large datasets appropriately
Use Iterators for Simple Cases
# Good: Simple and clear
for assessment in client.iter_assessments(page_size=50):
    print(assessment["id"])
Use Manual Pagination for Control
# Good: Need precise control
page_num = get_user_requested_page()
response = client.list_assessments(page=page_num, page_size=20)
return response  # Return to UI
Optimize Page Size
# Memory constrained
for item in client.iter_models(page_size=10):
    process(item)

# Performance focused
for item in client.iter_models(page_size=100):
    process(item)
Always Handle Empty Results
response = client.list_assessments(page=1, page_size=50)
if response["data"]:
    for assessment in response["data"]:
        process(assessment)
else:
    print("No assessments found")
Avoid Loading Everything
# Bad: Loads all data into memory
all_items = list(client.iter_assessments(page_size=50))
# Good: Process incrementally
for item in client.iter_assessments(page_size=50):
    process(item)  # Process and discard
Common Patterns
Export to CSV
import csv
def export_assessments_to_csv(client, filename):
    with open(filename, 'w', newline='') as f:
        writer = None
        for assessment in client.iter_assessments(page_size=100):
            if writer is None:
                # Create writer with headers from first item
                writer = csv.DictWriter(f, fieldnames=assessment.keys())
                writer.writeheader()
            writer.writerow(assessment)

export_assessments_to_csv(client, "assessments.csv")
Progress Tracking
from tqdm import tqdm
# Get total count
first_page = client.list_assessments(page=1, page_size=1)
total = first_page["total"]
# Progress bar
with tqdm(total=total) as pbar:
    for assessment in client.iter_assessments(page_size=50):
        process(assessment)
        pbar.update(1)
Batch Processing
def process_in_batches(client, batch_size=100):
    batch = []
    for assessment in client.iter_assessments(page_size=50):
        batch.append(assessment)
        if len(batch) >= batch_size:
            process_batch(batch)
            batch = []
    # Process remaining items
    if batch:
        process_batch(batch)

def process_batch(items):
    # Bulk database insert, API call, etc.
    print(f"Processing batch of {len(items)}")
Complete Example
from modelred import ModelRed
import os
import csv
client = ModelRed(api_key=os.environ["MODELRED_API_KEY"])
# Example 1: UI Pagination
def get_page_for_ui(page_num=1):
    """Get a single page for UI display."""
    response = client.list_assessments(
        page=page_num,
        page_size=20,
        status="COMPLETED",
    )
    return {
        "items": response["data"],
        "pagination": {
            "current": response["page"],
            "total": response["totalPages"],
            "count": len(response["data"]),
        }
    }

# Example 2: Stream Processing
def export_all_to_csv():
    """Export all assessments without loading into memory."""
    with open('assessments.csv', 'w', newline='') as f:
        writer = None
        for assessment in client.iter_assessments(page_size=100):
            if writer is None:
                writer = csv.DictWriter(f, fieldnames=assessment.keys())
                writer.writeheader()
            writer.writerow(assessment)
    print("Export complete")

# Example 3: Find Specific Item
def find_failed_assessment():
    """Find first failed assessment without fetching all."""
    for assessment in client.iter_assessments(
        page_size=50,
        status="FAILED"
    ):
        return assessment
    return None

# Example 4: Batch Processing
def process_assessments_in_batches():
    """Process assessments in batches of 50."""
    batch = []
    for assessment in client.iter_assessments(page_size=50):
        batch.append(assessment)
        if len(batch) >= 50:
            # Process batch (e.g., bulk database insert)
            print(f"Processing batch of {len(batch)}")
            batch = []
    # Process remaining
    if batch:
        print(f"Processing final batch of {len(batch)}")

# Run examples
page_data = get_page_for_ui(page_num=1)
print(f"Page 1: {page_data['pagination']}")
failed = find_failed_assessment()
if failed:
    print(f"Found failed assessment: {failed['id']}")
Next Steps
- Review Best Practices for production optimization
- Learn about Error Handling for robust pagination
- Explore Python SDK configuration options
- Check specific guides for Assessments, Models, and Probe Packs