Skip to main content

Error handling in batch imports

Handing errors effectively during batch imports is crucial for maintaining data integrity and ensuring a smooth ingestion process. In this lesson, we'll explore common error types, how to detect them, and strategies for handling them.

Request success ≠ insertion success

It's very important to understand that a successful API response does not guarantee that all objects were ingested successfully. It simply means that the request was received and processed.

import weaviate
from weaviate.classes.config import Configure, Property, DataType
import os

client = weaviate.connect_to_weaviate_cloud(
cluster_url=os.getenv("WEAVIATE_URL"),
auth_credentials=os.getenv("WEAVIATE_API_KEY"),
headers={
"X-OpenAI-Api-Key": os.getenv("OPENAI_API_KEY")
}
)

client.collections.create(
name="Products",
properties=[
Property(name="title", data_type=DataType.TEXT),
],
vector_config=Configure.Vectors.text2vec_openai()
)

objects = [
{"name": "Sherlock Holmes"}, # 'name' property not in schema
{"title": 1984}, # 'title' should be TEXT, not NUMBER
{"title": "To Kill a Mockingbird"}, # Valid object
]

collection = client.collections.use("Products")

# This will run without raising exceptions
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)

Here, depending on your configuration, either 1 or 2 out of 3 objects fail to be ingested due to schema validation errors.

Without checking failed_objects, you may not realize there were issues. Let's learn how to identify and handle batch ingestion errors properly.

Accessing batch errors

Different levels of error monitoring are available during and after the batch import process.

During import (monitoring)

During the import, the batch.number_errors property provides a running count of errors encountered so far.

A typical pattern is to monitor this count and take action if it exceeds a threshold; as it may be indicative of some systematic issue with the data or configuration.

with collection.batch.fixed_size(batch_size=100) as batch:
for i, obj in enumerate(objects):
batch.add_object(properties=obj)

# Check errors every 500 objects
if i % 500 == 0:
print(f"Processed {i} objects; Errors so far: {batch.number_errors}")

# Stop if too many errors
if batch.number_errors > break_threshold:
print(f"Error count > threshold of {break_threshold} - stopping import")
break

After context manager exits

Upon exiting the batch context manager, you can check the batch.failed_objects list to see which objects failed and why.

Each failed object includes details such as the UUID, error message, and the original object data, allowing for targeted troubleshooting.

# GOOD: Check for errors after import
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)

# Check for failures (if length > 0, truthy)
if collection.batch.failed_objects:
print(f"Failed to import {len(collection.batch.failed_objects)} objects")
for failed in collection.batch.failed_objects[:3]: # Inspect the first 3 errors
print(f"Original UUID: {failed.original_uuid}")
print(f"Error: {failed.message}")
print(f"Object: {failed.object_.properties}")
else:
print("Import successful - no errors")

Error handling strategies

Once you've identified errors, you need a strategy to handle them. Some potential approaches include:

  1. Simply logging errors and continuing
  2. Failing immediately by throwing an exception
  3. Separating errors by type and handling them differently

Here are some indicative examples of each strategy.

For non-critical imports where some failures are acceptable, you can log errors but allow the import to continue uninterrupted:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)

# Log failures but don't stop
if collection.batch.failed_objects:
logger.warning(f"Import completed with {len(collection.batch.failed_objects)} failures")
for failed in collection.batch.failed_objects:
logger.error(f"Failed: {failed.message}")
else:
logger.info("Import successful")

Retry logic pattern

For transient errors such as network issues, or rate limiting, retry logic can solve the problem. Here's an indicative example implementing retry logic for failed objects:

import weaviate
from weaviate.collections import Collection
from weaviate.collections.classes.batch import ErrorObject
import time
import os

def retry_failed_objects(collection: Collection, failed_objects: list[ErrorObject], max_attempts: int = 3):
"""Retry failed imports with exponential backoff"""
attempt = 1
remaining = [
{"properties": f.object_.properties, "uuid": f.object_.uuid}
for f in failed_objects
]

while remaining and attempt <= max_attempts:
print(f"\nRetry attempt {attempt}/{max_attempts}")
print(f"Retrying {len(remaining)} objects")

with collection.batch.fixed_size(batch_size=50) as batch:
for obj in remaining:
batch.add_object(
properties=obj["properties"],
uuid=obj["uuid"]
)

# Check what still failed
if collection.batch.failed_objects:
remaining = [
{"properties": f.object_.properties, "uuid": f.object_.uuid}
for f in collection.batch.failed_objects
]

if attempt < max_attempts:
wait_time = 2 ** attempt
print(f"Still {len(remaining)} failures, waiting {wait_time}s")
time.sleep(wait_time)
else:
remaining = []
print("All retries successful!")

attempt += 1

return remaining # Still failed after all attempts

# Usage
# Initial import
with collection.batch.fixed_size(batch_size=100) as batch:
for obj in objects:
batch.add_object(properties=obj)

# Retry failures
if collection.batch.failed_objects:
still_failed = retry_failed_objects(collection, collection.batch.failed_objects)
print(f"\nFinal result: {len(still_failed)} objects still failed")

Error handling checklist

For production imports, consider implementing the following:

  • Monitor batch.number_errors during import
  • Check batch.failed_objects after import
  • Log failed objects to file for analysis
  • Implement retry logic for transient failures
  • Set error thresholds (fail fast if too many errors)
  • Handle different error types appropriately
  • Alert/notify on import failures
  • Test error handling with curated failure scenarios
What's next?

Now that you know how to handle errors, let's learn UUID strategies for deduplication and idempotent imports.

Login to track your progress