Error handling in batch imports
Handling errors effectively during batch imports is crucial for maintaining data integrity and ensuring a smooth ingestion process. In this lesson, we'll explore common error types, how to detect them, and strategies for handling them.
Request success ≠ insertion success
It's very important to understand that a successful API response does not guarantee that all objects were ingested successfully. It simply means that the request was received and processed.
import weaviate
from weaviate.classes.config import Configure, Property, DataType
import os
client = weaviate.connect_to_weaviate_cloud(
cluster_url=os.getenv("WEAVIATE_URL"),
auth_credentials=os.getenv("WEAVIATE_API_KEY"),
headers={
"X-OpenAI-Api-Key": os.getenv("OPENAI_API_KEY")
}
)
client.collections.create(
name="Products",
properties=[
Property(name="title", data_type=DataType.TEXT),
],
vector_config=Configure.Vectors.text2vec_openai()
)
objects = [
{"name": "Sherlock Holmes"}, # 'name' property not in schema
{"title": 1984}, # 'title' should be TEXT, not NUMBER
{"title": "To Kill a Mockingbird"}, # Valid object
]
collection = client.collections.use("Products")
# This will run without raising exceptions
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)
Here, depending on your server configuration, either 1 or 2 of the 3 objects fail to be ingested due to schema validation errors. With auto-schema enabled (the default), Weaviate may simply add the unknown name property to the schema, so only the type-mismatched object fails; with auto-schema disabled, both invalid objects are rejected.
Without checking failed_objects, you may not realize there were any issues. Let's learn how to identify and handle batch ingestion errors properly.
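Here's a minimal sketch for verifying this yourself, reusing the collection handle from above. It inspects the live schema (to see whether auto-schema added the name property) and counts how many objects were actually ingested:
# Inspect the live schema: did auto-schema add the "name" property?
config = collection.config.get()
print(f"Properties in schema: {[p.name for p in config.properties]}")
# Count how many of the three objects actually made it in
count = collection.aggregate.over_all(total_count=True).total_count
print(f"Objects ingested: {count}")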
Accessing batch errors
Different levels of error monitoring are available during and after the batch import process.
During import (monitoring)
During the import, the batch.number_errors property provides a running count of errors encountered so far.
A typical pattern is to monitor this count and take action if it exceeds a threshold, as a high error count may indicate a systematic issue with the data or configuration.
break_threshold = 50  # Maximum acceptable number of errors

with collection.batch.fixed_size(batch_size=100) as batch:
    for i, obj in enumerate(objects):
        batch.add_object(properties=obj)
        # Report progress every 500 objects
        if i % 500 == 0:
            print(f"Processed {i} objects; Errors so far: {batch.number_errors}")
        # Stop if too many errors
        if batch.number_errors > break_threshold:
            print(f"Error count > threshold of {break_threshold} - stopping import")
            break
After context manager exits
Upon exiting the batch context manager, you can check the batch.failed_objects list to see which objects failed and why.
Each failed object includes details such as the UUID, error message, and the original object data, allowing for targeted troubleshooting.
# GOOD: Check for errors after import
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)
# Check for failures (if length > 0, truthy)
if collection.batch.failed_objects:
print(f"Failed to import {len(collection.batch.failed_objects)} objects")
for failed in collection.batch.failed_objects[:3]: # Inspect the first 3 errors
print(f"Original UUID: {failed.original_uuid}")
print(f"Error: {failed.message}")
print(f"Object: {failed.object_.properties}")
else:
print("Import successful - no errors")
Error handling strategies
Once you've identified errors, you need a strategy to handle them. Some potential approaches include:
- Simply logging errors and continuing
- Failing immediately by throwing an exception
- Separating errors by type and handling them differently
Here are some indicative examples of each strategy.
For non-critical imports where some failures are acceptable, you can log errors but allow the import to continue uninterrupted:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)
# Log failures but don't stop
if collection.batch.failed_objects:
logger.warning(f"Import completed with {len(collection.batch.failed_objects)} failures")
for failed in collection.batch.failed_objects:
logger.error(f"Failed: {failed.message}")
else:
logger.info("Import successful")
For critical imports where any failure is unacceptable, you can implement a fail-fast approach that raises an exception as soon as errors are detected.
with collection.batch.fixed_size(batch_size=100) as batch:
for i, obj in enumerate(objects):
batch.add_object(properties=obj)
# Check every batch
if i % 100 == 0 and batch.number_errors > 0:
raise Exception(f"Import failed: {batch.number_errors} errors detected")
if collection.batch.failed_objects:
raise Exception(f"Import failed with {len(collection.batch.failed_objects)} errors")
print("Import successful - no errors")
Different errors may call for different handling, so you can implement logic that separates errors by type and deals with each category accordingly.
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)
# Categorize failures
transient_errors = [] # Network, timeout, rate limit
schema_errors = [] # Validation, type mismatch
other_errors = []
for failed in collection.batch.failed_objects:
error_msg = failed.message.lower()
# Example categorization logic
# You can customize this based on your error messages
if any(word in error_msg for word in ["timeout", "rate limit", "network"]):
transient_errors.append(failed)
elif any(word in error_msg for word in ["schema", "validation", "type"]):
schema_errors.append(failed)
else:
other_errors.append(failed)
print(f"Transient errors (will retry): {len(transient_errors)}")
print(f"Schema errors (need fixing): {len(schema_errors)}")
print(f"Other errors: {len(other_errors)}")
# Retry transient errors only
# Fix schema errors manually
Retry logic pattern
For transient errors such as network issues or rate limiting, retry logic can often resolve the problem. Here's an indicative example implementing retry logic with exponential backoff for failed objects:
import time

from weaviate.collections import Collection
from weaviate.collections.classes.batch import ErrorObject
def retry_failed_objects(collection: Collection, failed_objects: list[ErrorObject], max_attempts: int = 3):
"""Retry failed imports with exponential backoff"""
attempt = 1
remaining = [
{"properties": f.object_.properties, "uuid": f.object_.uuid}
for f in failed_objects
]
while remaining and attempt <= max_attempts:
print(f"\nRetry attempt {attempt}/{max_attempts}")
print(f"Retrying {len(remaining)} objects")
with collection.batch.fixed_size(batch_size=50) as batch:
for obj in remaining:
batch.add_object(
properties=obj["properties"],
uuid=obj["uuid"]
)
# Check what still failed
if collection.batch.failed_objects:
remaining = [
{"properties": f.object_.properties, "uuid": f.object_.uuid}
for f in collection.batch.failed_objects
]
if attempt < max_attempts:
wait_time = 2 ** attempt
print(f"Still {len(remaining)} failures, waiting {wait_time}s")
time.sleep(wait_time)
else:
remaining = []
print("All retries successful!")
attempt += 1
return remaining # Still failed after all attempts
# Usage
# Initial import
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)
# Retry failures
if collection.batch.failed_objects:
still_failed = retry_failed_objects(collection, collection.batch.failed_objects)
print(f"\nFinal result: {len(still_failed)} objects still failed")
Error handling checklist
For production imports, consider implementing the following:
- Monitor batch.number_errors during import
- Check batch.failed_objects after import
- Log failed objects to a file for analysis (see the sketch after this list)
- Implement retry logic for transient failures
- Set error thresholds (fail fast if too many errors)
- Handle different error types appropriately
- Alert/notify on import failures
- Test error handling with curated failure scenarios
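As an indicative sketch for the logging and alerting items above: the helper below appends each failure to a JSONL file for later analysis, and send_alert is a hypothetical placeholder you would wire up to email, Slack, or another notification channel:
import json

def log_failures_to_file(failed_objects, path="failed_objects.jsonl"):
    """Append failed objects to a JSONL file for later analysis."""
    with open(path, "a") as f:
        for failed in failed_objects:
            record = {
                "uuid": str(failed.object_.uuid),
                "error": failed.message,
                "properties": failed.object_.properties,
            }
            f.write(json.dumps(record, default=str) + "\n")

def send_alert(message):
    """Placeholder: connect this to your own notification channel."""
    print(f"ALERT: {message}")

if collection.batch.failed_objects:
    log_failures_to_file(collection.batch.failed_objects)
    send_alert(f"Batch import finished with {len(collection.batch.failed_objects)} failures")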
Now that you know how to handle errors, let's look at UUID strategies for deduplication and idempotent imports.