Predictions API
The client.predictions
component provides methods to retrieve, list, and manage predictions across all content types. This is the central hub for tracking the status of all processing jobs in the platform.
Quick Examples
Get a Prediction
# Retrieve a specific prediction by ID
prediction = client.predictions.get("pred_abc123")
print(f"Status: {prediction.status}")
List Predictions
# List recent predictions
predictions = client.predictions.list(limit=10)
for pred in predictions:
print(f"{pred.id}: {pred.status} ({pred.type})")
Wait for Completion
# Wait for a prediction to complete
completed = client.predictions.wait(
"pred_abc123",
timeout=60, # Maximum wait time in seconds
sleep=1 # Check interval in seconds
)
print(f"Completed at: {completed.completed_at}")
Core Operations
Retrieving Predictions
Get details about a specific prediction:
# Get prediction by ID
prediction = client.predictions.get("pred_abc123")
# Access prediction properties
print(f"ID: {prediction.id}")
print(f"Status: {prediction.status}")
print(f"Created: {prediction.created_at}")
print(f"Type: {prediction.type}")
# If completed, access the structured response
if prediction.status == "completed" and prediction.response:
print(f"Result: {prediction.response}")
Listing Predictions
List predictions you’ve created (with pagination):
# Basic listing with default pagination
predictions = client.predictions.list()
# Custom pagination
predictions = client.predictions.list(
skip=0, # Skip this many items
limit=10 # Return at most this many items
)
# Process the list
for prediction in predictions:
print(f"ID: {prediction.id}, Status: {prediction.status}")
Waiting for Completion
Wait for long-running predictions to complete:
# Wait with default settings (60 seconds timeout, 1 second checks)
completed = client.predictions.wait("pred_abc123")
# Wait with custom timeout and polling interval
completed = client.predictions.wait(
"pred_abc123",
timeout=300, # Maximum wait time (5 minutes)
sleep=2 # Check every 2 seconds
)
# Check results after waiting
if completed.status == "completed":
print(f"Success! Result: {completed.response}")
else:
print(f"Failed or timed out: {completed.status}")
The wait()
method will raise a TimeoutError
if the prediction doesn’t complete within the specified timeout.
Prediction Statuses
Predictions can have the following statuses:
Status | Description |
---|
enqueued | The prediction is waiting to be processed |
pending | The prediction is preparing to start |
running | The prediction is actively being processed |
completed | The prediction has completed successfully |
failed | The prediction encountered an error |
paused | The prediction has been paused |
The base Predictions
class is extended by specialized prediction classes for different media types:
Image Predictions
# Generate prediction from image files
prediction = client.image.generate(
images=[Path("image.jpg")], # List of Path objects or PIL Images
domain="document.invoice"
)
# Generate prediction from image URLs
prediction = client.image.generate(
urls=["https://example.com/image.jpg"],
domain="document.invoice"
)
# Generate schema from image
schema = client.image.schema(
images=[Path("image.jpg")]
)
Document, Audio, and Video Predictions
These specialized APIs follow a consistent pattern:
# Process a document file
prediction = client.document.generate(
file="document.pdf",
domain="document.invoice"
)
# Process from a URL
prediction = client.audio.generate(
url="https://example.com/audio.mp3",
domain="audio.transcription"
)
Response Structure
The PredictionResponse
object includes these key fields:
class PredictionResponse(BaseModel):
id: str # Unique prediction identifier
status: Literal[ # Current job status
"enqueued",
"pending",
"running",
"completed",
"failed",
"paused"
]
type: str # Prediction type (e.g., "image", "document")
created_at: datetime # When the prediction was created
completed_at: Optional[datetime] # When the prediction was completed (if done)
response: Optional[Any] # Structured result data
usage: CreditUsage # Usage and billing information
The usage
field contains a CreditUsage
object:
class CreditUsage(BaseModel):
elements_processed: Optional[int] # Number of elements processed
element_type: Optional[str] # Type of element processed
credits_used: Optional[int] # Credits consumed by the operation
Auto-casting Responses
All specialized prediction classes support auto-casting responses to their appropriate schema types:
# Enable auto-casting with the autocast parameter
prediction = client.document.generate(
file="invoice.pdf",
domain="document.invoice",
autocast=True # Convert response to appropriate Pydantic model
)
# Now the response is a typed Pydantic model
invoice = prediction.response
print(f"Invoice number: {invoice.invoice_number}")
print(f"Total: {invoice.total_amount}")
Common Patterns
Process and Wait
A common pattern is to start a prediction and wait for it to complete:
# 1. Start the prediction
prediction = client.document.generate(
file="large-document.pdf",
domain="document.invoice"
)
# 2. Wait for completion if needed
if prediction.status != "completed":
try:
prediction = client.predictions.wait(
prediction.id,
timeout=120 # Wait up to 2 minutes
)
except TimeoutError:
print("Processing is taking longer than expected")
# Handle timeout case
# 3. Process the results
if prediction.status == "completed":
form_data = prediction.response
print(f"Form data: {form_data}")
Batch Processing
For batch operations, use the batch parameter and track multiple predictions:
# Start multiple predictions in batch mode
prediction_ids = []
for file_path in document_files:
prediction = client.document.generate(
file=file_path,
domain="document.invoice",
batch=True # Process asynchronously
)
prediction_ids.append(prediction.id)
# Track completion status
completed = 0
total = len(prediction_ids)
print(f"Started {total} predictions")
# Check status periodically
while completed < total:
completed = 0
for pred_id in prediction_ids:
prediction = client.predictions.get(pred_id)
if prediction.status in ["completed", "failed"]:
completed += 1
print(f"Progress: {completed}/{total} complete")
time.sleep(5) # Check every 5 seconds
print("All predictions complete!")
Error Handling
Implement robust error handling:
try:
# Start prediction
prediction = client.document.generate(
file="document.pdf",
domain="document.invoice"
)
# Wait for completion
prediction = client.predictions.wait(prediction.id)
# Check for success
if prediction.status == "completed":
print("Processing successful!")
result = prediction.response
else:
print(f"Processing failed: {prediction.status}")
except TimeoutError:
print("Prediction timed out")
except ValueError as e:
print(f"Invalid parameters: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Best Practices
Efficient Polling
Use appropriate intervals when waiting for predictions:
# ✅ Good: Use longer intervals for long-running jobs
def wait_with_backoff(prediction_id):
"""Wait with increasing backoff intervals."""
start_time = time.time()
wait_time = 1 # Start with 1 second
while time.time() - start_time < 300: # 5 minute timeout
prediction = client.predictions.get(prediction_id)
if prediction.status in ["completed", "failed"]:
return prediction
# Increase wait time with each check
wait_time = min(wait_time * 1.5, 30) # Cap at 30 seconds
print(f"Waiting {wait_time:.1f} seconds...")
time.sleep(wait_time)
raise TimeoutError("Prediction timed out")
Using Appropriate Timeouts
Set timeouts based on the expected processing time:
# For quick predictions (e.g., simple image classification)
prediction = client.predictions.wait(
prediction_id,
timeout=30, # 30 seconds
sleep=1
)
# For complex processing (e.g., large documents, long videos)
prediction = client.predictions.wait(
prediction_id,
timeout=600, # 10 minutes
sleep=5 # Check less frequently
)
Progress Reporting with tqdm
For better user experience, use the tqdm
library for progress reporting:
from tqdm import tqdm
import time
# Generate a prediction
prediction = client.document.generate(file="large_file.pdf", domain="document.invoice")
# Wait with progress bar
timeout = 120 # 2 minutes
for _ in tqdm(range(timeout), desc="Processing document"):
prediction = client.predictions.get(prediction.id)
if prediction.status in ["completed", "failed"]:
break
time.sleep(1)