Import optimization strategies
While the default batch import settings work well for many use cases, optimizing batch imports can improve performance and efficiency, especially with large datasets.
Performance depends on several factors:
- Batch size
- Concurrent requests
- Object size
- Vectorization method (integrated vs. pre-computed)
- Network latency
- Weaviate resource availability (CPU, memory)
Which batching method to use
We've discussed the choice between .fixed_size(), .rate_limit() and .dynamic() methods in lesson 2.2.
While .dynamic() can adapt to changing conditions, its behavior can be opaque and it can produce unpredictable loading patterns. For example, it may trigger autoscaling events in cloud environments, leading to variable performance and unexpected costs.
.rate_limit() is specifically designed for scenarios involving external vectorizers with strict API rate limits. Use it when necessary to avoid rate limit errors.
For most use cases, .fixed_size() is the recommended, best-practice method. It provides predictable performance and allows you to tune batch size and concurrency.
At the time of writing, the client is configured with defaults of batch_size=100 and concurrent_requests=2, which works well for many use cases.
Let's explore how to optimize these settings further.
A server-side batching algorithm is planned for a future Weaviate release. This will offload batching logic to the Weaviate server, for a closer integration of the feedback loop of server load metrics and batch sending.
Concurrency considerations
Batch imports are I/O-bound operations, involving network requests and potentially waiting for vectorization. Therefore, increasing concurrency can often lead to better throughput. On the other hand, the throughput should be balanced against the Weaviate server's capacity to handle the ingestion load and any other bottlenecks such as embedding inference time or API rate limits.
The batching algorithm in our client is not thread-safe. Using threading or multiprocessing to parallelize batch imports can lead to data corruption or unexpected behavior. Instead, adjust the concurrent_requests parameter to control parallelism within a single process and thread.
Tuning concurrent_requests and batch_size
These two parameters together determine the rate of data ingestion.
products = client.collections.use("Products")
with products.batch.fixed_size(
    batch_size=200,        # Process 200 objects per batch (default is 100)
    concurrent_requests=4  # Process 4 batches in parallel (default is 2)
) as batch:
    for product in product_data:
        batch.add_object(properties=product)
The total number of in-flight objects being processed at any given time is approximately:
Total in-flight objects = batch_size * concurrent_requests
Then, you can estimate the throughput as:
Throughput (objects/sec) ≈ Total in-flight objects / Average processing time per batch (sec)
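As a rough worked example of the two formulas above, the sketch below plugs in illustrative numbers. The helper name `estimate_throughput` and the 0.5 second average batch time are assumptions made for illustration, not measurements.

```python
def estimate_throughput(batch_size: int, concurrent_requests: int, avg_batch_seconds: float) -> float:
    """Estimate objects/sec from the in-flight object count and the average batch time."""
    if avg_batch_seconds <= 0:
        raise ValueError("avg_batch_seconds must be positive")
    in_flight = batch_size * concurrent_requests
    return in_flight / avg_batch_seconds


# Illustrative numbers only: assume each batch takes 0.5 s on average
default_estimate = estimate_throughput(100, 2, 0.5)  # client defaults
tuned_estimate = estimate_throughput(200, 4, 0.5)    # larger batches, more concurrency
print(f"defaults: {default_estimate:.0f} obj/s, tuned: {tuned_estimate:.0f} obj/s")
```

In practice the average processing time per batch usually grows with batch_size, so treat these figures as optimistic estimates and confirm them by measuring.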
As a result, increasing either batch_size or concurrent_requests can improve throughput. Generally, you can increase either concurrency or batch size if the data throughput is the only bottleneck in the system.
However, consider the following guidelines:
Whether to increase batch size or concurrency
Increasing batch_size has the effect of reducing the overhead per object, as there will be fewer, but "heavier" requests. This is especially beneficial when dealing with small objects where the overhead is significant compared to the object size.
Conversely, increasing concurrent_requests can help to better utilize available resources and improve throughput, at the cost of a higher overall load on the system.
If network latency is high
In this scenario, increasing concurrent_requests is often more effective. High latency means each request takes longer to complete, so having multiple requests in flight helps to keep the pipeline full and maximizes resource utilization.
If timeouts occur
This may mean that each batch is taking too long to process, or Weaviate is overloaded.
If a batch is taking too long to process, reducing batch_size can help, as smaller batches are quicker to process and less likely to hit timeout limits. To balance this out, consider increasing concurrent_requests slightly to maintain overall throughput.
However, if Weaviate is overloaded (e.g., high CPU/memory usage), consider reducing one or both of batch_size and concurrent_requests to lower the ingestion load.
If Weaviate CPU & memory are saturated
In this scenario, consider upgrading the instance type to provide more CPU and memory resources. For large, one-time ingestions, you could temporarily provision additional resources for the import process, then downgrade afterward.
You could also add more nodes to a cluster to distribute the load; however this may require data migration depending on your setup.
At this point, consider lowering one or both of batch_size and concurrent_requests to reduce the ingestion load, as pushing more data will likely exacerbate the saturation issue.
When optimizing for write throughput, keep in mind the impact on read/query performance. A larger Weaviate cluster with more nodes can handle higher write throughput, but may introduce additional latency for queries, as each query needs to read from more nodes.
If embedding inference is the bottleneck
This scenario requires a different optimization approach, as the vectorization step is the slowest part of the process. However, you can still try increasing concurrent_requests to see if it helps.
If the cause of this bottleneck is an external vectorizer with rate limits, consider using the .rate_limit() method to avoid hitting those limits, and contacting the vectorizer provider to explore higher rate limits.
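A minimal sketch of the rate-limited approach follows. It assumes `collection` is a collection object obtained as in the earlier examples. The helper name `import_with_rate_limit` and the 600 requests per minute figure are placeholders; substitute your provider's actual limit.

```python
def import_with_rate_limit(collection, objects, requests_per_minute: int = 600) -> int:
    """Import objects using the client's rate-limited batching mode.

    Returns the number of objects that failed to import.
    """
    # Keep requests_per_minute at or below the vectorizer provider's limit
    with collection.batch.rate_limit(requests_per_minute=requests_per_minute) as batch:
        for obj in objects:
            batch.add_object(properties=obj)
    # Read failed_objects after the context manager has exited
    return len(collection.batch.failed_objects)
```

A call such as `import_with_rate_limit(products, product_data, requests_per_minute=300)` would then return the failure count, which should be checked before the import is considered complete.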
Performance optimization decision tree
Is import taking too long?
├─ Yes: Measure stage timings (embedding vs upload vs network)
│ │
│ ├─ Is Weaviate CPU saturated (>80%)?
│ │ └─ Solution: Add nodes or upgrade instance
│ │
│ ├─ Is embedding time too long?
│ │ ├─ External API: Increase concurrent_requests, use .rate_limit(), consider local models or separate vectorization step
│ │ └─ Local model: May need more resources / GPU for vectorization
│ │
│ ├─ Is network latency high (>100ms)?
│ │ └─ Solution: Increase concurrent_requests
│ │
│ └─ Weaviate underutilized (<50% CPU)?
│ └─ Solution: Increase concurrent_requests by 2-4, then test batch_size if needed
│
├─ Are you seeing errors?
│ ├─ Timeouts: Decrease batch_size (+ optionally increase concurrent_requests)
│ ├─ Embedding model provider rate limits: Use .rate_limit()
│ ├─ Memory errors: Decrease batch_size
│ └─ Other: Check error messages
│
└─ Already fast enough?
└─ Great! Document optimal settings for future reference.
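The tree above can also be written as a small function, which may be handy in import scripts or runbooks. This is only a sketch: `ImportObservation` and `suggest_adjustment` are hypothetical names, the thresholds are the ones quoted in the tree, and checking errors before speed is an ordering assumed here, not something the tree prescribes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ImportObservation:
    """Hypothetical snapshot of measurements taken during an import."""
    cpu_percent: float                 # Weaviate CPU utilization (0-100)
    network_latency_ms: float          # Client-to-Weaviate round-trip latency
    embedding_is_slow: bool = False    # True if stage timings point at vectorization
    external_vectorizer: bool = False  # True for API-based vectorizers
    error_kind: Optional[str] = None   # "timeout", "rate_limit", "memory", other, or None


def suggest_adjustment(obs: ImportObservation) -> str:
    """Return the decision tree's suggestion for one observation."""
    # Assumption: handle errors first, then tune for speed
    if obs.error_kind == "timeout":
        return "Decrease batch_size; optionally increase concurrent_requests"
    if obs.error_kind == "rate_limit":
        return "Use .rate_limit()"
    if obs.error_kind == "memory":
        return "Decrease batch_size"
    if obs.error_kind is not None:
        return "Check error messages"
    # Thresholds below are the ones quoted in the tree
    if obs.cpu_percent > 80:
        return "Add nodes or upgrade instance"
    if obs.embedding_is_slow:
        if obs.external_vectorizer:
            return ("Increase concurrent_requests, use .rate_limit(), "
                    "consider local models or a separate vectorization step")
        return "Provide more resources / GPU for vectorization"
    if obs.network_latency_ms > 100:
        return "Increase concurrent_requests"
    if obs.cpu_percent < 50:
        return "Increase concurrent_requests by 2-4, then test batch_size"
    return "No single rule applies; benchmark different configurations"


print(suggest_adjustment(ImportObservation(cpu_percent=35, network_latency_ms=20)))
```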
Advanced optimization techniques
Pre-computing vectors
For many, if not most, use cases, using Weaviate's integrated vectorizers is the best, most convenient option.
However, in scenarios where maximum ingestion speed is critical, you could consider pre-computing vectors outside of Weaviate. This can allow you to fully parallelize vectorization, and send the vectors directly to Weaviate during import.
import weaviate
from openai import OpenAI
from concurrent.futures import ThreadPoolExecutor
import os

# Pre-compute all vectors in parallel
openai_client = OpenAI()

def get_embedding(text):
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

# Vectorize in parallel
with ThreadPoolExecutor(max_workers=10) as executor:
    texts = [f"{p['name']}" for p in product_data]
    vectors = list(executor.map(get_embedding, texts))

# Now import with pre-computed vectors (fast!)
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=os.getenv("WEAVIATE_URL"),
    auth_credentials=os.getenv("WEAVIATE_API_KEY")
)
products = client.collections.use("Products")
with products.batch.fixed_size(batch_size=200) as batch:
    for product, vector in zip(product_data, vectors):
        batch.add_object(
            properties=product,
            vector=vector
        )
client.close()
In some scenarios, it may be beneficial to back up your computed embeddings externally, such as in a cloud storage service. This allows you to re-use the embeddings for future imports or updates without needing to re-compute them, saving time and costs associated with vectorization.
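One simple way to keep such a backup is a JSON Lines file, which can then be copied to whichever storage service you use. The sketch below assumes each object has a unique key (for example a SKU). The helper names `save_embeddings` and `load_embeddings` are hypothetical, and the two-dimensional vectors are made up for illustration.

```python
import json
import os
import tempfile


def save_embeddings(path, keys, vectors):
    """Write one JSON object per line: {"key": ..., "vector": [...]}."""
    with open(path, "w", encoding="utf-8") as f:
        for key, vector in zip(keys, vectors):
            f.write(json.dumps({"key": key, "vector": list(vector)}) + "\n")


def load_embeddings(path):
    """Read the file back into a {key: vector} dictionary."""
    embeddings = {}
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            record = json.loads(line)
            embeddings[record["key"]] = record["vector"]
    return embeddings


# Round trip with tiny made-up vectors; real embeddings have many more dimensions
with tempfile.TemporaryDirectory() as tmp_dir:
    backup_path = os.path.join(tmp_dir, "embeddings.jsonl")
    save_embeddings(backup_path, ["SKU-001", "SKU-002"], [[0.1, 0.2], [0.3, 0.4]])
    cached = load_embeddings(backup_path)

print(f"Loaded {len(cached)} cached vectors")
```

On a later import, a vector found in the cache can be passed to `batch.add_object(properties=..., vector=...)` directly, and only the missing ones need to be computed.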
Monitoring
If you have configured monitoring (e.g. through Prometheus) for your Weaviate instance, you can track relevant metrics to help optimize batch imports.
As Weaviate engages with external API service providers, it logs various metrics that can be useful for monitoring and optimization.
These include API related information such as "token_limit", "token_remaining", "request_limit", "request_remaining", and "estimated_requests_needed".
Weaviate also logs the duration of each request to external vectorizers, using metrics such as: t2v_request_duration_seconds{vectorizer="text2vec-openai"} where the vectorizer label will vary based on the specific vectorizer in use.
Review these metrics to identify potential bottlenecks or issues during batch imports.
For the latest Weaviate metrics, refer to this source code file.
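As a sketch of how the request duration metric might be used, the snippet below computes the average vectorizer request time from Prometheus text-format output. It assumes the metric is exposed with the usual `_sum` and `_count` series of a Prometheus histogram or summary; verify this against your own metrics endpoint. The sample text and the helper name `average_t2v_duration` are made up for illustration.

```python
import re

# Made-up sample in Prometheus text format, for illustration only
SAMPLE_METRICS = """\
# Illustrative values, not real measurements
t2v_request_duration_seconds_sum{vectorizer="text2vec-openai"} 12.5
t2v_request_duration_seconds_count{vectorizer="text2vec-openai"} 50
"""

_SERIES = re.compile(r'^t2v_request_duration_seconds_(sum|count)\{([^}]*)\}\s+(\S+)')
_VECTORIZER = re.compile(r'vectorizer="([^"]*)"')


def average_t2v_duration(metrics_text: str) -> dict:
    """Return {vectorizer: average seconds per request} from metrics text."""
    totals = {}
    for line in metrics_text.splitlines():
        series = _SERIES.match(line.strip())
        if not series:
            continue  # Skips comments and unrelated metrics
        kind, labels, value = series.groups()
        vectorizer = _VECTORIZER.search(labels)
        if not vectorizer:
            continue
        entry = totals.setdefault(vectorizer.group(1), {"sum": 0.0, "count": 0.0})
        entry[kind] += float(value)
    # Avoid division by zero for vectorizers with no recorded requests
    return {name: t["sum"] / t["count"] for name, t in totals.items() if t["count"] > 0}


print(average_t2v_duration(SAMPLE_METRICS))
```

If you already run Prometheus, the same average is more conveniently computed there with a query over the sum and count series.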
Performance benchmarking
To find the optimal batch import settings for your specific use case, consider running benchmarks with different configurations.
Here is a basic, example benchmarking script:
import weaviate
import time
import os
from tqdm import tqdm
from weaviate import WeaviateClient
from weaviate.classes.config import Configure, DataType, Property

def create_collection(client: WeaviateClient, collection_name: str):
    client.collections.create(
        name=collection_name,
        properties=[
            Property(name="sku", data_type=DataType.TEXT),
            Property(name="name", data_type=DataType.TEXT),
            Property(name="price", data_type=DataType.NUMBER),
        ],
        vector_config=Configure.Vectors.text2vec_openai()
    )

def benchmark_batch_import(batch_size: int, concurrent_requests: int, data: list[dict]):
    """Benchmark a specific configuration"""
    client = weaviate.connect_to_weaviate_cloud(
        cluster_url=os.getenv("WEAVIATE_URL"),
        auth_credentials=os.getenv("WEAVIATE_API_KEY"),
        headers={"X-OpenAI-Api-Key": os.getenv("OPENAI_API_KEY")}  # Headers with any required API keys
    )
    collection_name = f"Products_{batch_size}_{concurrent_requests}"
    client.collections.delete(collection_name)  # Clean up if exists
    create_collection(client, collection_name)  # Custom function to create collection
    # Use the collection created for this configuration, not a fixed "Products" collection
    products = client.collections.use(collection_name)
    start = time.time()
    with products.batch.fixed_size(
        batch_size=batch_size,
        concurrent_requests=concurrent_requests
    ) as batch:
        for item in tqdm(data):
            batch.add_object(properties=item)
    elapsed = time.time() - start
    errors = len(products.batch.failed_objects)
    client.close()
    return {
        "batch_size": batch_size,
        "concurrent_requests": concurrent_requests,
        "time": elapsed,
        "rate": len(data) / elapsed,
        "errors": errors
    }

# Test different configurations
configs = [
    (100, 2),
    (200, 2),
    (100, 4),
    (200, 4),
]
test_data = product_data[:1000]  # Test with 1000 objects
print("Configuration Benchmark")
print("-" * 60)
for batch_size, concurrent in configs:
    result = benchmark_batch_import(batch_size, concurrent, test_data)
    print(f"Batch: {result['batch_size']:3d} | "
          f"Concurrent: {result['concurrent_requests']} | "
          f"Time: {result['time']:6.2f}s | "
          f"Rate: {result['rate']:6.0f} obj/s | "
          f"Errors: {result['errors']}")
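Once the results are collected, a small helper can choose a configuration. The sketch below assumes result dictionaries shaped like those returned by the benchmarking function above. The helper name `pick_best_config` and the sample numbers are made up for illustration, and preferring error-free runs over raw speed is a design choice you may want to change.

```python
def pick_best_config(results: list[dict]) -> dict:
    """Return the fastest error-free result, else the one with the fewest errors."""
    if not results:
        raise ValueError("results must not be empty")
    error_free = [r for r in results if r["errors"] == 0]
    if error_free:
        return max(error_free, key=lambda r: r["rate"])
    # No clean run: prefer fewer errors, then a higher rate
    return min(results, key=lambda r: (r["errors"], -r["rate"]))


# Made-up results for illustration only
sample_results = [
    {"batch_size": 100, "concurrent_requests": 2, "rate": 310.0, "errors": 0},
    {"batch_size": 200, "concurrent_requests": 4, "rate": 720.0, "errors": 12},
    {"batch_size": 100, "concurrent_requests": 4, "rate": 540.0, "errors": 0},
]
best = pick_best_config(sample_results)
print(f"Best: batch_size={best['batch_size']}, concurrent_requests={best['concurrent_requests']}")
```

With these sample numbers, the fastest run is skipped because it produced errors, and the faster of the two clean runs is selected.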
Summary
Optimization is an iterative process. The right settings depend on your situation, including your data, your Weaviate cluster setup and your vectorization setup.
These general guidelines can help you get started:
- For external vectorizers with rate limits, use .rate_limit()
- Otherwise, start with .fixed_size()
  - Start with batch_size=100-200, concurrent_requests=2
- Monitor Weaviate CPU/memory during import
- Test different batch sizes with your data
- Track import progress for long-running jobs
- Benchmark different configurations
- Document optimal settings for your use case
- Consider pre-computing vectors for maximum speed
Congratulations! You've learned the fundamentals of data ingestion in Weaviate, from understanding the pipeline to implementing production-ready batch imports with proper error handling, UUID strategies, and optimization techniques.