Batch operations

Work with multiple items at once. Instead of making 100 separate API calls, batch operations let you send items in groups.

Key features

  • batch_get - Fetch up to 100 items per request (auto-splits larger batches)
  • BatchWriter - Write up to 25 items per request (auto-splits larger batches)
  • Automatic retry for failed items
  • Mix puts and deletes in one batch
  • Async-first API: BatchWriter and batch_get are async by default
  • Metrics on every operation (see observability)

Getting started

Batch get

Fetch multiple items by their keys. DynamoDB limits batch gets to 100 items per request, but pydynox handles larger batches automatically.

"""Async batch get example."""

import asyncio

from pydynox import DynamoDBClient, Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute

client = DynamoDBClient()


class User(Model):
    model_config = ModelConfig(table="users", client=client)
    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    name = StringAttribute()
    age = NumberAttribute(default=0)


async def main():
    # First create some users
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John", "age": 30})
    await client.put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane", "age": 25})
    await client.put_item("users", {"pk": "USER#3", "sk": "PROFILE", "name": "Bob", "age": 35})

    # Client-level batch get (async by default)
    keys = [
        {"pk": "USER#1", "sk": "PROFILE"},
        {"pk": "USER#2", "sk": "PROFILE"},
        {"pk": "USER#3", "sk": "PROFILE"},
    ]
    items = await client.batch_get("users", keys)
    for item in items:
        print(item["name"])

    # Model-level batch get - returns typed instances
    users = await User.batch_get(keys)
    for user in users:
        print(user.name, user.age)

    # Return as dicts for better performance
    users_dict = await User.batch_get(keys, as_dict=True)
    for user in users_dict:
        print(user["name"])


asyncio.run(main())

Items are returned in any order (not guaranteed to match input order). Missing items are silently skipped.
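
Because missing keys are skipped rather than reported, it can help to check which requested items actually came back. A minimal sketch, assuming each returned item carries its key attributes (which DynamoDB includes on every item):

# Compare requested keys against the keys of the items that came back
requested = {(k["pk"], k["sk"]) for k in keys}
returned = {(item["pk"], item["sk"]) for item in items}
missing = requested - returned
if missing:
    print(f"Not found: {missing}")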

Batch write

Use BatchWriter to save or delete many items. The batch writer handles all the complexity for you: it groups items into batches, sends them to DynamoDB, and retries any items that fail.

"""Async batch write example."""

import asyncio

from pydynox import BatchWriter, DynamoDBClient

client = DynamoDBClient()


async def main():
    # Batch write (async by default) - items are sent in groups of 25
    async with BatchWriter(client, "users") as batch:
        for i in range(100):
            batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})

    # Mix puts and deletes
    async with BatchWriter(client, "users") as batch:
        batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
        batch.put({"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
        batch.delete({"pk": "USER#3", "sk": "PROFILE"})

    print("Batch write complete")


asyncio.run(main())

When you use BatchWriter as a context manager (with async with), it automatically flushes any remaining items when the block ends. This means you don't have to worry about items being left unsent.

The batch writer accepts two types of operations:

  • batch.put(item) - Add or replace an item
  • batch.delete(key) - Remove an item by its key

You can mix both operations in the same batch. DynamoDB processes them in any order, so don't rely on a specific sequence.
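
If you do need a specific order, for example a put that must land before a delete of the same key, keep the two operations in separate requests. One way to do that, sketched here with the flush() call described under Advanced below:

async with BatchWriter(client, "users") as batch:
    batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    # Send the put on its own before queuing the delete
    await batch.flush()
    batch.delete({"pk": "USER#1", "sk": "PROFILE"})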

Sync operations

For sync code (scripts, CLI tools, or frameworks that don't support async), use the sync_ prefixed methods and SyncBatchWriter:

"""Sync batch get example."""

from pydynox import DynamoDBClient, Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute

client = DynamoDBClient()


class User(Model):
    model_config = ModelConfig(table="users", client=client)
    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    name = StringAttribute()
    age = NumberAttribute(default=0)


def main():
    # First create some users
    client.sync_put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John", "age": 30})
    client.sync_put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane", "age": 25})
    client.sync_put_item("users", {"pk": "USER#3", "sk": "PROFILE", "name": "Bob", "age": 35})

    # Client-level sync batch get
    keys = [
        {"pk": "USER#1", "sk": "PROFILE"},
        {"pk": "USER#2", "sk": "PROFILE"},
        {"pk": "USER#3", "sk": "PROFILE"},
    ]
    items = client.sync_batch_get("users", keys)
    for item in items:
        print(item["name"])

    # Model-level sync batch get - returns typed instances
    users = User.sync_batch_get(keys)
    for user in users:
        print(user.name, user.age)

    # Return as dicts for better performance
    users_dict = User.sync_batch_get(keys, as_dict=True)
    for user in users_dict:
        print(user["name"])


main()

Batch writes work the same way, using SyncBatchWriter instead of BatchWriter:

"""Sync batch write example."""

from pydynox import DynamoDBClient, SyncBatchWriter

client = DynamoDBClient()


def main():
    # Sync batch write - items are sent in groups of 25
    with SyncBatchWriter(client, "users") as batch:
        for i in range(100):
            batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})

    # Mix puts and deletes
    with SyncBatchWriter(client, "users") as batch:
        batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
        batch.put({"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
        batch.delete({"pk": "USER#3", "sk": "PROFILE"})

    print("Sync batch write complete")


main()

API reference

Client methods

Async (default)                                          | Sync
await client.batch_get(table, keys)                      | client.sync_batch_get(table, keys)
await client.batch_write(table, put_items, delete_keys)  | client.sync_batch_write(table, put_items, delete_keys)

Model methods

Async (default)              | Sync
await Model.batch_get(keys)  | Model.sync_batch_get(keys)

Context managers

Async (default)                        | Sync
async with BatchWriter(client, table)  | with SyncBatchWriter(client, table)
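
client.batch_write is the lower-level counterpart to BatchWriter: you pass the puts and deletes in one call. A minimal sketch based on the signature listed above (the example data is illustrative):

put_items = [
    {"pk": "USER#10", "sk": "PROFILE", "name": "Ann"},
    {"pk": "USER#11", "sk": "PROFILE", "name": "Ben"},
]
delete_keys = [{"pk": "USER#12", "sk": "PROFILE"}]
await client.batch_write("users", put_items, delete_keys)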

Advanced

Manual flush

By default, the batch writer sends items to DynamoDB when it has 25 items queued, or when the context exits. If you want to control exactly when queued items are sent, call flush():

async with BatchWriter(client, "users") as batch:
    for i in range(100):
        batch.put({"pk": f"USER#{i}", "sk": "PROFILE", "name": f"User {i}"})

        # Flush at every 50th item to checkpoint progress
        if (i + 1) % 50 == 0:
            await batch.flush()

This is useful when you want to see progress during long-running operations, or when you need to free up memory.
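
For example, reporting progress while loading a large dataset (items_to_load is a hypothetical iterable of item dicts):

async with BatchWriter(client, "users") as batch:
    for i, item in enumerate(items_to_load, start=1):
        batch.put(item)
        if i % 500 == 0:
            await batch.flush()
            print(f"Wrote {i} items so far")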

Error handling

DynamoDB sometimes can't process all items in a batch. This happens when you hit throughput limits or when there's a temporary service issue.

The batch writer automatically retries failed items with exponential backoff. If items still fail after all retries, an exception is raised when the context exits:

try:
    async with BatchWriter(client, "users") as batch:
        batch.put({"pk": "USER#1", "sk": "PROFILE", "name": "John"})
except Exception as e:
    print(f"Some items failed: {e}")

Tip

If you're seeing frequent failures, consider using rate limiting to stay within your provisioned capacity.

Performance tips

  1. Use batch operations for bulk work - If you're saving more than a few items, batching is faster than individual put_item calls.

  2. Use as_dict=True for read-heavy workloads - Skip model instantiation when you just need the data.

  3. Don't batch single items - For one or two items, use regular put_item or get_item. The overhead of batching isn't worth it (see the sketch after this list).

  4. Consider rate limiting - If you're writing a lot of data, combine batch operations with rate limiting to avoid throttling.
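
Putting tips 1 and 3 together, the cutover between single calls and batching might look like this (load_items and the threshold are illustrative, not a hard rule):

items = load_items()  # hypothetical source of item dicts

if len(items) <= 2:
    # A couple of items: plain put_item is simpler and has less overhead
    for item in items:
        await client.put_item("users", item)
else:
    # Bulk work: let BatchWriter group items, send them, and retry failures
    async with BatchWriter(client, "users") as batch:
        for item in items:
            batch.put(item)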

Testing your code

Test batch operations without DynamoDB using the built-in memory backend:

"""Testing batch operations with pydynox_memory_backend."""

import pytest
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute


class User(Model):
    model_config = ModelConfig(table="users")
    pk = StringAttribute(partition_key=True)
    name = StringAttribute()
    age = NumberAttribute(default=0)


@pytest.mark.asyncio
async def test_batch_save(pydynox_memory_backend):
    """Test saving multiple items."""
    users = [User(pk=f"USER#{i}", name=f"User {i}", age=20 + i) for i in range(10)]

    for user in users:
        await user.save()

    # Verify all saved
    for i in range(10):
        found = await User.get(pk=f"USER#{i}")
        assert found is not None
        assert found.name == f"User {i}"


@pytest.mark.asyncio
async def test_batch_delete(pydynox_memory_backend):
    """Test deleting multiple items."""
    # Create users
    for i in range(5):
        await User(pk=f"USER#{i}", name=f"User {i}").save()

    # Delete some
    for i in range(3):
        user = await User.get(pk=f"USER#{i}")
        await user.delete()

    # Verify
    assert await User.get(pk="USER#0") is None
    assert await User.get(pk="USER#1") is None
    assert await User.get(pk="USER#2") is None
    assert await User.get(pk="USER#3") is not None
    assert await User.get(pk="USER#4") is not None


@pytest.mark.asyncio
async def test_batch_get(pydynox_memory_backend):
    """Test getting multiple items."""
    # Create users
    for i in range(5):
        await User(pk=f"USER#{i}", name=f"User {i}").save()

    # Batch get
    keys = [{"pk": f"USER#{i}"} for i in range(5)]
    results = await User.batch_get(keys)

    assert len(results) == 5

No setup needed. Just add pydynox_memory_backend to your test function. See Testing for more details.

Next steps