Skip to content

Atomic updates

Atomic updates modify values in DynamoDB without reading them first. This avoids race conditions when multiple requests update the same item.

Key features

  • Increment/decrement numbers without read-modify-write
  • Append/prepend items to lists
  • Set values only if the attribute doesn't exist
  • Remove attributes
  • Combine multiple operations in one request
  • Add conditions for safe updates

The problem with read-modify-write

Without atomic updates, incrementing a counter looks like this:

# Dangerous - race condition!
# Step 1: read the item, including its current counter value.
user = User.get(pk="USER#123")
# Step 2: compute the new value in the application from the value just read.
user.login_count = user.login_count + 1
# Step 3: write the item back; another request that read the same value
# writes the same result, so one increment is lost.
user.save()

If two requests run at the same time:

  1. Request A reads login_count = 5
  2. Request B reads login_count = 5
  3. Request A writes login_count = 6
  4. Request B writes login_count = 6

You lost an increment. The count should be 7, but it's 6.

Atomic updates solve this by doing the math in DynamoDB:

# Safe - no race condition
# The increment is sent as an operation, so DynamoDB does the math itself.
user.update(atomic=[User.login_count.add(1)])

Getting started

Counters

The most common use case. Increment or decrement a number:

"""Atomic counter example."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute


class PageView(Model):
    """View counter for one page on one date, stored in the ``analytics`` table."""

    model_config = ModelConfig(table="analytics")

    pk = StringAttribute(partition_key=True)  # page_url
    sk = StringAttribute(sort_key=True)  # date
    views = NumberAttribute()  # view counter


async def main():
    """Create a zeroed page-view item, then bump its counter once atomically."""
    # Seed the item so the example starts from a known count.
    home_page = PageView(pk="/home", sk="2024-01-15", views=0)
    await home_page.save()

    # The +1 is applied by DynamoDB; nothing is read back here.
    increment = PageView.views.add(1)
    await home_page.update(atomic=[increment])

    # Concurrent callers issuing the same update never lose a count:
    #   request 1: views 0 -> 1
    #   request 2: views 1 -> 2
    #   request 3: views 2 -> 3


asyncio.run(main())

Each add(1) is atomic. Even with thousands of concurrent requests, every increment is counted.

Safe balance transfer

Combine atomic updates with conditions to prevent overdrafts:

"""Safe balance transfer with condition."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Account(Model):
    """Account item with a numeric balance, stored in the ``accounts`` table."""

    model_config = ModelConfig(table="accounts")

    pk = StringAttribute(partition_key=True)  # account_id
    balance = NumberAttribute()  # current balance; debited via add(-amount)


async def withdraw(account: Account, amount: int) -> bool:
    """Debit ``amount`` from ``account`` if the balance covers it.

    Returns True when the debit was applied, and False when the
    ``balance >= amount`` condition was rejected.
    """
    debit = Account.balance.add(-amount)
    has_funds = Account.balance >= amount
    try:
        await account.update(atomic=[debit], condition=has_funds)
    except ConditionalCheckFailedException:
        return False
    return True


async def main():
    """Open an account holding 100, then attempt a 50 and a 75 withdrawal."""
    acct = Account(pk="ACC#123", balance=100)
    await acct.save()

    # Covered by the balance: 100 -> 50.
    success = await withdraw(acct, 50)
    print(f"Withdrew 50: {success}")  # True

    # Not covered: only 50 is left, so 75 is rejected.
    success = await withdraw(acct, 75)
    print(f"Withdrew 75: {success}")  # False


asyncio.run(main())

The condition balance >= amount is checked atomically with the update. If the balance is too low, the whole operation fails.

Advanced

Inventory management

Track stock and reservations atomically:

"""Inventory management with atomic updates."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Product(Model):
    """Product item tracking available and reserved units, in ``products``."""

    model_config = ModelConfig(table="products")

    pk = StringAttribute(partition_key=True)  # product_id
    stock = NumberAttribute()  # units available to reserve
    reserved = NumberAttribute()  # units currently held by reservations


class OutOfStock(Exception):
    """Raised when a reservation asks for more units than are in stock."""


async def reserve_stock(product: Product, quantity: int) -> None:
    """Reserve stock for an order.

    Moves ``quantity`` units from ``stock`` to ``reserved`` in a single
    conditional update.

    Raises:
        OutOfStock: if the ``stock >= quantity`` condition is rejected.
    """
    try:
        await product.update(
            atomic=[
                Product.stock.add(-quantity),
                Product.reserved.add(quantity),
            ],
            condition=Product.stock >= quantity,
        )
    except ConditionalCheckFailedException as exc:
        # Chain explicitly so the traceback reports the failed condition as the
        # cause, not as a second error raised while handling the first.
        raise OutOfStock(f"Not enough stock for {product.pk}") from exc


async def release_stock(product: Product, quantity: int) -> None:
    """Return ``quantity`` reserved units to available stock (order cancelled)."""
    give_back = Product.stock.add(quantity)
    unreserve = Product.reserved.add(-quantity)
    await product.update(atomic=[give_back, unreserve])


async def main():
    """Reserve, over-reserve, and release stock for a single product."""
    item = Product(pk="SKU#ABC123", stock=10, reserved=0)
    await item.save()

    # Hold 3 units -> stock: 7, reserved: 3
    await reserve_stock(item, 3)

    # Asking for 10 more than the 7 remaining is rejected.
    try:
        await reserve_stock(item, 10)
    except OutOfStock:
        print("Cannot reserve - not enough stock")

    # Order cancelled: give the 3 units back -> stock: 10, reserved: 0
    await release_stock(item, 3)


asyncio.run(main())

Both stock and reserved are updated in one atomic operation. No item can be double-sold.

Rate limiting

Enforce API rate limits with atomic counters:

"""API rate limiting with atomic counters."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class ApiUsage(Model):
    """Per-user, per-day request counter stored in the ``api_usage`` table."""

    model_config = ModelConfig(table="api_usage")

    pk = StringAttribute(partition_key=True)  # user_id
    sk = StringAttribute(sort_key=True)  # date (YYYY-MM-DD)
    requests = NumberAttribute()  # requests recorded for this user and date


class RateLimitExceeded(Exception):
    """Raised when a user has already used up the daily request limit."""


async def track_request(user_id: str, date: str, daily_limit: int = 1000) -> int:
    """Track API request and enforce rate limit.

    Args:
        user_id: Partition key of the usage item.
        date: Sort key of the usage item (YYYY-MM-DD).
        daily_limit: Highest request count an existing item may reach.

    Returns:
        The request count read back after recording this request.

    Raises:
        RateLimitExceeded: if the ``requests < daily_limit`` condition is
            rejected.
    """
    usage = await ApiUsage.get(pk=user_id, sk=date)

    if usage is None:
        # First request of the day.
        # NOTE(review): this get-then-save is itself a read-modify-write. Two
        # concurrent first requests can both see None and both store 1.
        usage = ApiUsage(pk=user_id, sk=date, requests=1)
        await usage.save()
        return 1

    try:
        await usage.update(
            atomic=[ApiUsage.requests.add(1)],
            condition=ApiUsage.requests < daily_limit,
        )
    except ConditionalCheckFailedException as exc:
        # Chain explicitly so the rejected condition shows up as the cause.
        raise RateLimitExceeded(
            f"User {user_id} exceeded {daily_limit} requests/day"
        ) from exc

    # Fetch updated count. Kept outside the try so that only the conditional
    # update is translated into RateLimitExceeded.
    updated = await ApiUsage.get(pk=user_id, sk=date)
    return updated.requests


async def main():
    """Record one request for ``user_123`` and report the count or the limit."""
    try:
        count = await track_request("user_123", "2024-01-15")
    except RateLimitExceeded as e:
        print(f"Rate limit hit: {e}")
    else:
        print(f"Request #{count} recorded")


asyncio.run(main())

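The condition ensures an existing counter can't be pushed past the limit, even with concurrent requests. Note that the first-request path in this example (get, then save) is not atomic: two simultaneous first requests of the day can both record a count of 1.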

Shopping cart

Manage cart items and totals:

"""Shopping cart with list operations."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import ListAttribute, NumberAttribute, StringAttribute


class Cart(Model):
    """Shopping cart item holding a list of entries and a running total."""

    model_config = ModelConfig(table="carts")

    pk = StringAttribute(partition_key=True)  # user_id
    items = ListAttribute()  # cart entries; extended with append()
    total = NumberAttribute()  # running total; adjusted with add()


async def add_to_cart(cart: Cart, item: dict, price: float) -> None:
    """Append ``item`` to the cart and add ``price`` to its total in one update."""
    push_item = Cart.items.append([item])
    raise_total = Cart.total.add(price)
    await cart.update(atomic=[push_item, raise_total])


async def apply_discount(cart: Cart, discount: float) -> None:
    """Subtract ``discount`` from the total, only if the total covers it."""
    lower_total = Cart.total.add(-discount)
    total_covers_discount = Cart.total >= discount
    await cart.update(atomic=[lower_total], condition=total_covers_discount)


async def main():
    """Build an empty cart, add two items, then take 10 off the total."""
    basket = Cart(pk="USER#123", items=[], total=0)
    await basket.save()

    # Each call appends the entry and adds its price in a single update.
    shirt = {"sku": "SHIRT-M", "qty": 1}
    pants = {"sku": "PANTS-L", "qty": 2}
    await add_to_cart(basket, shirt, 29.99)
    await add_to_cart(basket, pants, 49.99)

    # Expected state at this point:
    #   items: [{"sku": "SHIRT-M", "qty": 1}, {"sku": "PANTS-L", "qty": 2}]
    #   total: 79.98

    # $10 discount -> total: 69.98
    await apply_discount(basket, 10.00)


asyncio.run(main())

List operations

Add items to lists without reading the whole list:

"""Managing user tags with list operations."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import ListAttribute, StringAttribute


class User(Model):
    """User item with a list of tags, stored in the ``users`` table."""

    model_config = ModelConfig(table="users")

    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    tags = ListAttribute()  # extended with append()/prepend()


async def main():
    """Create a user with one tag, then append two tags and prepend one."""
    profile = User(pk="USER#123", sk="PROFILE", tags=["member"])
    await profile.save()

    # append() adds at the end -> ["member", "premium", "verified"]
    add_at_end = User.tags.append(["premium", "verified"])
    await profile.update(atomic=[add_at_end])

    # prepend() adds at the front -> ["vip", "member", "premium", "verified"]
    add_at_front = User.tags.prepend(["vip"])
    await profile.update(atomic=[add_at_front])


asyncio.run(main())

Default values

Set a value only if the attribute doesn't exist:

"""Using if_not_exists for default values."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute


class User(Model):
    """User item with two numeric attributes that may start out unset."""

    model_config = ModelConfig(table="users")

    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    login_count = NumberAttribute()  # not supplied when the item is created in main()
    score = NumberAttribute()  # not supplied when the item is created in main()


async def main():
    """Show if_not_exists() as a default, alone and followed by add()."""
    # Saved without login_count or score.
    profile = User(pk="USER#123", sk="PROFILE")
    await profile.save()

    # Attribute is missing, so the default is written -> login_count: 0
    await profile.update(atomic=[User.login_count.if_not_exists(0)])

    # Ordinary atomic increment -> login_count: 1
    await profile.update(atomic=[User.login_count.add(1)])

    # Attribute now exists, so 999 is ignored -> login_count: still 1
    await profile.update(atomic=[User.login_count.if_not_exists(999)])

    # "Initialize, then increment" as two separate requests -> score: 10
    await profile.update(atomic=[User.score.if_not_exists(0)])
    await profile.update(atomic=[User.score.add(10)])


asyncio.run(main())

Multiple operations

Combine several atomic operations in one request:

"""Multiple atomic operations in one request."""

import asyncio
from datetime import datetime

from pydynox import Model, ModelConfig
from pydynox.attributes import (
    ListAttribute,
    NumberAttribute,
    StringAttribute,
)


class User(Model):
    """User item with the attributes touched by the multi-operation update."""

    model_config = ModelConfig(table="users")

    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    login_count = NumberAttribute()  # incremented with add()
    last_login = StringAttribute()  # ISO-format timestamp string written with set()
    badges = ListAttribute()  # extended with append()
    temp_token = StringAttribute()  # deleted with remove()


async def main():
    """Apply add, set, append and remove to one user in a single update call."""
    profile = User(
        pk="USER#123",
        sk="PROFILE",
        login_count=0,
        badges=[],
        temp_token="abc123",
    )
    await profile.save()

    # All four operations travel together in one request.
    operations = [
        User.login_count.add(1),
        User.last_login.set(datetime.now().isoformat()),
        User.badges.append(["first_login"]),
        User.temp_token.remove(),
    ]
    await profile.update(atomic=operations)

    # Expected state afterwards:
    #   login_count: 1
    #   last_login: "2024-01-15T10:30:00"
    #   badges: ["first_login"]
    #   temp_token: None (removed)


asyncio.run(main())

All operations happen atomically. Either all succeed or none do.

Operations reference

| Method | Description | Example |
| --- | --- | --- |
| `set(value)` | Set attribute to value | `User.name.set("Jane")` |
| `add(n)` | Add to number (use negative to subtract) | `User.count.add(1)` |
| `remove()` | Delete the attribute | `User.temp.remove()` |
| `append(items)` | Add items to end of list | `User.tags.append(["a", "b"])` |
| `prepend(items)` | Add items to start of list | `User.tags.prepend(["a"])` |
| `if_not_exists(value)` | Set only if attribute is missing | `User.count.if_not_exists(0)` |

Error handling

When a condition fails, you get ConditionalCheckFailedException:

from pydynox.exceptions import ConditionalCheckFailedException

# Attempt a conditional debit; a rejected condition surfaces as an exception.
# NOTE(review): the runnable examples on this page call `await account.update(...)`;
# confirm whether this call needs `await` as well.
try:
    account.update(
        atomic=[Account.balance.add(-100)],
        condition=Account.balance >= 100,
    )
except ConditionalCheckFailedException:
    # The balance >= 100 condition was not met, so nothing was changed.
    print("Insufficient balance")

When to use atomic updates

Use atomic updates when:

  • Multiple requests might update the same item
  • You need counters (views, likes, inventory)
  • You want to avoid read-modify-write patterns
  • You need guaranteed consistency

Use regular update() with kwargs when:

  • You're the only writer
  • You need to set values based on other fields
  • You're doing a simple field update

Testing your code

Test atomic updates without DynamoDB using the built-in memory backend:

"""Testing atomic updates with pydynox_memory_backend."""

from pydynox import Model, ModelConfig
from pydynox.attributes import ListAttribute, NumberAttribute, StringAttribute


class Counter(Model):
    """Test model in the ``counters`` table with a numeric count and a tag list."""

    model_config = ModelConfig(table="counters")
    pk = StringAttribute(partition_key=True)
    count = NumberAttribute(default=0)  # declared with a default of 0
    tags = ListAttribute(default=list)  # default is the ``list`` callable (presumably an empty list per item; confirm)


def test_atomic_increment(pydynox_memory_backend):
    """Three atomic +1 updates on a zeroed counter leave it at 3."""
    views = Counter(pk="views", count=0)
    views.save()

    # Apply the same atomic increment three times.
    for _ in range(3):
        views.update(atomic=[Counter.count.add(1)])

    stored = Counter.get(pk="views")
    assert stored.count == 3


def test_atomic_decrement(pydynox_memory_backend):
    """An atomic add of -10 takes a counter from 100 down to 90."""
    stock = Counter(pk="stock", count=100)
    stock.save()

    # A negative add() is the decrement.
    take_ten = Counter.count.add(-10)
    stock.update(atomic=[take_ten])

    stored = Counter.get(pk="stock")
    assert stored.count == 90


def test_atomic_append(pydynox_memory_backend):
    """An atomic append keeps the existing tag and adds the new one."""
    item = Counter(pk="item", tags=["initial"])
    item.save()

    # Append a single new tag.
    add_tag = Counter.tags.append(["new_tag"])
    item.update(atomic=[add_tag])

    stored = Counter.get(pk="item")
    assert "initial" in stored.tags
    assert "new_tag" in stored.tags


def test_atomic_set(pydynox_memory_backend):
    """An atomic set replaces the stored count with the given value."""
    item = Counter(pk="item", count=10)
    item.save()

    # Overwrite 10 with 99.
    overwrite = Counter.count.set(99)
    item.update(atomic=[overwrite])

    stored = Counter.get(pk="item")
    assert stored.count == 99

No setup needed. Just add pydynox_memory_backend to your test function. See Testing for more details.

Next steps