Predictions API

The client.predictions component provides methods to retrieve, list, and manage predictions across all content types. This is the central hub for tracking the status of all processing jobs in the platform.

Quick Examples

Get a Prediction

# Retrieve a specific prediction by ID
prediction = client.predictions.get("pred_abc123")
print(f"Status: {prediction.status}")

List Predictions

# List recent predictions
predictions = client.predictions.list(limit=10)
for pred in predictions:
    print(f"{pred.id}: {pred.status} ({pred.type})")

Wait for Completion

# Wait for a prediction to complete
completed = client.predictions.wait(
    "pred_abc123",
    timeout=60,  # Maximum wait time in seconds
    sleep=1      # Check interval in seconds
)
print(f"Completed at: {completed.completed_at}")

Core Operations

Retrieving Predictions

Get details about a specific prediction:

# Get prediction by ID
prediction = client.predictions.get("pred_abc123")

# Access prediction properties
print(f"ID: {prediction.id}")
print(f"Status: {prediction.status}")
print(f"Created: {prediction.created_at}")
print(f"Type: {prediction.type}")

# If completed, access the structured response
if prediction.status == "completed" and prediction.response:
    print(f"Result: {prediction.response}")

Listing Predictions

List predictions you’ve created (with pagination):

# Basic listing with default pagination
predictions = client.predictions.list()

# Custom pagination
predictions = client.predictions.list(
    skip=0,   # Skip this many items
    limit=10  # Return at most this many items
)

# Process the list
for prediction in predictions:
    print(f"ID: {prediction.id}, Status: {prediction.status}")

Waiting for Completion

Wait for long-running predictions to complete:

# Wait with default settings (60 seconds timeout, 1 second checks)
completed = client.predictions.wait("pred_abc123")

# Wait with custom timeout and polling interval
completed = client.predictions.wait(
    "pred_abc123",
    timeout=300,  # Maximum wait time (5 minutes)
    sleep=2       # Check every 2 seconds
)

# Check results after waiting
if completed.status == "completed":
    print(f"Success! Result: {completed.response}")
else:
    print(f"Failed or timed out: {completed.status}")

The wait() method will raise a TimeoutError if the prediction doesn’t complete within the specified timeout.

Prediction Statuses

Predictions can have the following statuses:

StatusDescription
enqueuedThe prediction is waiting to be processed
pendingThe prediction is preparing to start
runningThe prediction is actively being processed
completedThe prediction has completed successfully
failedThe prediction encountered an error
pausedThe prediction has been paused

Media-Specific APIs

The base Predictions class is extended by specialized prediction classes for different media types:

Image Predictions

# Generate prediction from image files
prediction = client.image.generate(
    images=[Path("image.jpg")],  # List of Path objects or PIL Images
    domain="document.invoice"
)

# Generate prediction from image URLs
prediction = client.image.generate(
    urls=["https://example.com/image.jpg"],
    domain="document.invoice"
)

# Generate schema from image
schema = client.image.schema(
    images=[Path("image.jpg")]
)

Document, Audio, and Video Predictions

These specialized APIs follow a consistent pattern:

# Process a document file
prediction = client.document.generate(
    file="document.pdf",
    domain="document.invoice"
)

# Process from a URL
prediction = client.audio.generate(
    url="https://example.com/audio.mp3",
    domain="audio.transcription"
)

Response Structure

The PredictionResponse object includes these key fields:

class PredictionResponse(BaseModel):
    id: str                         # Unique prediction identifier
    status: Literal[                # Current job status
        "enqueued",
        "pending",
        "running",
        "completed",
        "failed",
        "paused"
    ]
    type: str                       # Prediction type (e.g., "image", "document")
    created_at: datetime            # When the prediction was created
    completed_at: Optional[datetime] # When the prediction was completed (if done)
    response: Optional[Any]         # Structured result data
    usage: CreditUsage              # Usage and billing information

The usage field contains a CreditUsage object:

class CreditUsage(BaseModel):
    elements_processed: Optional[int]  # Number of elements processed
    element_type: Optional[str]        # Type of element processed
    credits_used: Optional[int]        # Credits consumed by the operation

Auto-casting Responses

All specialized prediction classes support auto-casting responses to their appropriate schema types:

# Enable auto-casting with the autocast parameter
prediction = client.document.generate(
    file="invoice.pdf",
    domain="document.invoice",
    autocast=True  # Convert response to appropriate Pydantic model
)

# Now the response is a typed Pydantic model
invoice = prediction.response
print(f"Invoice number: {invoice.invoice_number}")
print(f"Total: {invoice.total_amount}")

Common Patterns

Process and Wait

A common pattern is to start a prediction and wait for it to complete:

# 1. Start the prediction
prediction = client.document.generate(
    file="large-document.pdf",
    domain="document.invoice"
)

# 2. Wait for completion if needed
if prediction.status != "completed":
    try:
        prediction = client.predictions.wait(
            prediction.id,
            timeout=120  # Wait up to 2 minutes
        )
    except TimeoutError:
        print("Processing is taking longer than expected")
        # Handle timeout case

# 3. Process the results
if prediction.status == "completed":
    form_data = prediction.response
    print(f"Form data: {form_data}")

Batch Processing

For batch operations, use the batch parameter and track multiple predictions:

# Start multiple predictions in batch mode
prediction_ids = []
for file_path in document_files:
    prediction = client.document.generate(
        file=file_path,
        domain="document.invoice",
        batch=True  # Process asynchronously
    )
    prediction_ids.append(prediction.id)

# Track completion status
completed = 0
total = len(prediction_ids)

print(f"Started {total} predictions")

# Check status periodically
while completed < total:
    completed = 0
    for pred_id in prediction_ids:
        prediction = client.predictions.get(pred_id)
        if prediction.status in ["completed", "failed"]:
            completed += 1

    print(f"Progress: {completed}/{total} complete")
    time.sleep(5)  # Check every 5 seconds

print("All predictions complete!")

Error Handling

Implement robust error handling:

try:
    # Start prediction
    prediction = client.document.generate(
        file="document.pdf",
        domain="document.invoice"
    )

    # Wait for completion
    prediction = client.predictions.wait(prediction.id)

    # Check for success
    if prediction.status == "completed":
        print("Processing successful!")
        result = prediction.response
    else:
        print(f"Processing failed: {prediction.status}")

except TimeoutError:
    print("Prediction timed out")
except ValueError as e:
    print(f"Invalid parameters: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Best Practices

Efficient Polling

Use appropriate intervals when waiting for predictions:

# ✅ Good: Use longer intervals for long-running jobs
def wait_with_backoff(prediction_id):
    """Wait with increasing backoff intervals."""
    start_time = time.time()
    wait_time = 1  # Start with 1 second

    while time.time() - start_time < 300:  # 5 minute timeout
        prediction = client.predictions.get(prediction_id)

        if prediction.status in ["completed", "failed"]:
            return prediction

        # Increase wait time with each check
        wait_time = min(wait_time * 1.5, 30)  # Cap at 30 seconds
        print(f"Waiting {wait_time:.1f} seconds...")
        time.sleep(wait_time)

    raise TimeoutError("Prediction timed out")

Using Appropriate Timeouts

Set timeouts based on the expected processing time:

# For quick predictions (e.g., simple image classification)
prediction = client.predictions.wait(
    prediction_id,
    timeout=30,  # 30 seconds
    sleep=1
)

# For complex processing (e.g., large documents, long videos)
prediction = client.predictions.wait(
    prediction_id,
    timeout=600,  # 10 minutes
    sleep=5      # Check less frequently
)

Progress Reporting with tqdm

For better user experience, use the tqdm library for progress reporting:

from tqdm import tqdm
import time

# Generate a prediction
prediction = client.document.generate(file="large_file.pdf", domain="document.invoice")

# Wait with progress bar
timeout = 120  # 2 minutes
for _ in tqdm(range(timeout), desc="Processing document"):
    prediction = client.predictions.get(prediction.id)
    if prediction.status in ["completed", "failed"]:
        break
    time.sleep(1)