# Batch operations
Work with multiple items at once. Instead of making 100 separate API calls, batch operations let you send items in groups.
## Key features
- `batch_get` - Fetch up to 100 items per request (auto-splits larger batches)
- `BatchWriter` - Write up to 25 items per request (auto-splits larger batches)
- Automatic retry for failed items
- Mix puts and deletes in one batch
- Async-first API: `BatchWriter` and `batch_get` are async by default
- Metrics on every operation (see observability)
## Getting started

### Batch get
Fetch multiple items by their keys. DynamoDB limits batch gets to 100 items per request, but pydynox handles larger batches automatically.
"""Async batch get example."""
import asyncio
from pydynox import DynamoDBClient, Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
client = DynamoDBClient()
class User(Model):
model_config = ModelConfig(table="users", client=client)
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
name = StringAttribute()
age = NumberAttribute(default=0)
async def main():
# First create some users
await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John", "age": 30})
await client.put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane", "age": 25})
await client.put_item("users", {"pk": "USER#3", "sk": "PROFILE", "name": "Bob", "age": 35})
# Client-level batch get (async by default)
keys = [
{"pk": "USER#1", "sk": "PROFILE"},
{"pk": "USER#2", "sk": "PROFILE"},
{"pk": "USER#3", "sk": "PROFILE"},
]
items = await client.batch_get("users", keys)
for item in items:
print(item["name"])
# Model-level batch get - returns typed instances
users = await User.batch_get(keys)
for user in users:
print(user.name, user.age)
# Return as dicts for better performance
users_dict = await User.batch_get(keys, as_dict=True)
for user in users_dict:
print(user["name"])
asyncio.run(main())
Items are returned in any order (not guaranteed to match input order). Missing items are silently skipped.
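If you need to know which keys came back empty, compare the returned items against the keys you requested. A minimal sketch, reusing the client and table from the example above and assuming the returned items include their key attributes:

```python
import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def main():
    keys = [
        {"pk": "USER#1", "sk": "PROFILE"},
        {"pk": "USER#2", "sk": "PROFILE"},
        {"pk": "USER#999", "sk": "PROFILE"},  # this key does not exist
    ]

    items = await client.batch_get("users", keys)

    # batch_get only returns items that exist, so diff the requested keys
    # against the returned items to find the ones that were skipped.
    found = {(item["pk"], item["sk"]) for item in items}
    missing = [key for key in keys if (key["pk"], key["sk"]) not in found]
    print("Missing:", missing)


asyncio.run(main())
```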
### Batch write
Use BatchWriter to save or delete many items. The batch writer handles all the complexity for you: it groups items into batches, sends them to DynamoDB, and retries any items that fail.
"""Async batch write example."""
import asyncio
from pydynox import BatchWriter, DynamoDBClient
client = DynamoDBClient()
async def main():
# Batch write (async by default) - items are sent in groups of 25
async with BatchWriter(client, "users") as batch:
for i in range(100):
batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})
# Mix puts and deletes
async with BatchWriter(client, "users") as batch:
batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
batch.put({"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
batch.delete({"pk": "USER#3", "sk": "PROFILE"})
print("Batch write complete")
asyncio.run(main())
When you use `BatchWriter` as a context manager (with `async with`), it automatically flushes any remaining items when the block ends. This means you don't have to worry about items being left unsent.

The batch writer accepts two types of operations:

- `batch.put(item)` - Add or replace an item
- `batch.delete(key)` - Remove an item by its key
You can mix both operations in the same batch. DynamoDB processes them in any order, so don't rely on a specific sequence.
## Sync operations
For sync code (scripts, CLI tools, or frameworks that don't support async), use the `sync_`-prefixed methods and `SyncBatchWriter`:
"""Sync batch get example."""
from pydynox import DynamoDBClient, Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
client = DynamoDBClient()
class User(Model):
model_config = ModelConfig(table="users", client=client)
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
name = StringAttribute()
age = NumberAttribute(default=0)
def main():
# First create some users
client.sync_put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John", "age": 30})
client.sync_put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane", "age": 25})
client.sync_put_item("users", {"pk": "USER#3", "sk": "PROFILE", "name": "Bob", "age": 35})
# Client-level sync batch get
keys = [
{"pk": "USER#1", "sk": "PROFILE"},
{"pk": "USER#2", "sk": "PROFILE"},
{"pk": "USER#3", "sk": "PROFILE"},
]
items = client.sync_batch_get("users", keys)
for item in items:
print(item["name"])
# Model-level sync batch get - returns typed instances
users = User.sync_batch_get(keys)
for user in users:
print(user.name, user.age)
# Return as dicts for better performance
users_dict = User.sync_batch_get(keys, as_dict=True)
for user in users_dict:
print(user["name"])
main()
"""Sync batch write example."""
from pydynox import DynamoDBClient, SyncBatchWriter
client = DynamoDBClient()
def main():
# Sync batch write - items are sent in groups of 25
with SyncBatchWriter(client, "users") as batch:
for i in range(100):
batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})
# Mix puts and deletes
with SyncBatchWriter(client, "users") as batch:
batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
batch.put({"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
batch.delete({"pk": "USER#3", "sk": "PROFILE"})
print("Sync batch write complete")
main()
## API reference

### Client methods

| Async (default) | Sync |
|---|---|
| `await client.batch_get(table, keys)` | `client.sync_batch_get(table, keys)` |
| `await client.batch_write(table, put_items, delete_keys)` | `client.sync_batch_write(table, put_items, delete_keys)` |

### Model methods

| Async (default) | Sync |
|---|---|
| `await Model.batch_get(keys)` | `Model.sync_batch_get(keys)` |

### Context managers

| Async (default) | Sync |
|---|---|
| `async with BatchWriter(client, table)` | `with SyncBatchWriter(client, table)` |
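`client.batch_write` appears in the table above but not in the examples. A minimal sketch of how a direct call might look, assuming `put_items` takes a list of item dicts, `delete_keys` takes a list of key dicts, and both can be passed as keyword arguments:

```python
import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def main():
    # Sketch: client-level batch write without the BatchWriter context manager.
    await client.batch_write(
        "users",
        put_items=[
            {"pk": "USER#1", "sk": "PROFILE", "name": "John"},
            {"pk": "USER#2", "sk": "PROFILE", "name": "Jane"},
        ],
        delete_keys=[
            {"pk": "USER#3", "sk": "PROFILE"},
        ],
    )


asyncio.run(main())
```

For most workloads the `BatchWriter` context manager is more convenient, since it handles grouping and flushing for you.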
## Advanced

### Manual flush

By default, the batch writer sends items to DynamoDB when it has 25 items ready, or when the context exits. If you want to send items earlier, call `flush()`:
```python
async with BatchWriter(client, "users") as batch:
    for i in range(100):
        batch.put({"pk": f"USER#{i}", "name": f"User {i}"})

        # Flush every 50 items instead of waiting
        if (i + 1) % 50 == 0:
            await batch.flush()
```
This is useful when you want to see progress during long-running operations, or when you need to free up memory.
### Error handling
DynamoDB sometimes can't process all items in a batch. This happens when you hit throughput limits or when there's a temporary service issue.
The batch writer automatically retries failed items with exponential backoff. If items still fail after all retries, an exception is raised when the context exits:
```python
try:
    async with BatchWriter(client, "users") as batch:
        batch.put({"pk": "USER#1", "name": "John"})
except Exception as e:
    print(f"Some items failed: {e}")
```
> **Tip:** If you're seeing frequent failures, consider using rate limiting to stay within your provisioned capacity.
## Performance tips

- **Use batch operations for bulk work** - If you're saving more than a few items, batching is faster than individual `put_item` calls.
- **Use `as_dict=True` for read-heavy workloads** - Skip model instantiation when you just need the data.
- **Don't batch single items** - For one or two items, use regular `put_item` or `get_item`. The overhead of batching isn't worth it (see the sketch after this list).
- **Consider rate limiting** - If you're writing a lot of data, combine batch operations with rate limiting to avoid throttling.
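As a rough illustration of the single-item vs. bulk guidance, a minimal sketch using the same client calls shown in the examples above:

```python
import asyncio

from pydynox import BatchWriter, DynamoDBClient

client = DynamoDBClient()


async def main():
    # One item: a plain put_item is simpler and has no batching overhead.
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})

    # Many items: BatchWriter groups them into requests of 25 and retries failures.
    async with BatchWriter(client, "users") as batch:
        for i in range(500):
            batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})


asyncio.run(main())
```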
## Testing your code
Test batch operations without DynamoDB using the built-in memory backend:
"""Testing batch operations with pydynox_memory_backend."""
import pytest
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
class User(Model):
model_config = ModelConfig(table="users")
pk = StringAttribute(partition_key=True)
name = StringAttribute()
age = NumberAttribute(default=0)
@pytest.mark.asyncio
async def test_batch_save(pydynox_memory_backend):
"""Test saving multiple items."""
users = [User(pk=f"USER#{i}", name=f"User {i}", age=20 + i) for i in range(10)]
for user in users:
await user.save()
# Verify all saved
for i in range(10):
found = await User.get(pk=f"USER#{i}")
assert found is not None
assert found.name == f"User {i}"
@pytest.mark.asyncio
async def test_batch_delete(pydynox_memory_backend):
"""Test deleting multiple items."""
# Create users
for i in range(5):
await User(pk=f"USER#{i}", name=f"User {i}").save()
# Delete some
for i in range(3):
user = await User.get(pk=f"USER#{i}")
await user.delete()
# Verify
assert await User.get(pk="USER#0") is None
assert await User.get(pk="USER#1") is None
assert await User.get(pk="USER#2") is None
assert await User.get(pk="USER#3") is not None
assert await User.get(pk="USER#4") is not None
@pytest.mark.asyncio
async def test_batch_get(pydynox_memory_backend):
"""Test getting multiple items."""
# Create users
for i in range(5):
await User(pk=f"USER#{i}", name=f"User {i}").save()
# Batch get
keys = [{"pk": f"USER#{i}"} for i in range(5)]
results = await User.batch_get(keys)
assert len(results) == 5
No setup needed. Just add `pydynox_memory_backend` to your test function. See Testing for more details.
## Next steps
- Transactions - All-or-nothing operations
- Rate limiting - Control throughput for bulk operations
- Observability - Track metrics on batch operations