Skip to content

Optimistic locking

Optimistic locking prevents concurrent updates from overwriting each other. When two processes try to update the same item, the second one fails instead of silently overwriting the first.

How it works

Add a VersionAttribute to your model. pydynox handles the rest:

  1. First save sets version to 1
  2. Each save increments version by 1
  3. Before saving, pydynox checks that the version in DynamoDB matches the local version
  4. If versions don't match, save fails with ConditionalCheckFailedException

Basic usage

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute


class Document(Model):
    """Document item in the "documents" table, protected by optimistic locking."""

    model_config = ModelConfig(table="documents")
    pk = StringAttribute(partition_key=True)  # partition key
    content = StringAttribute()
    # Version counter: set to 1 on the first save and incremented on each later save.
    version = VersionAttribute()


async def main():
    """Walk a document through create, update, and reload, printing its version."""
    # A freshly constructed item has no version yet
    document = Document(pk="DOC#VERSION", content="Hello")
    print(document.version)  # None

    # The first save assigns version 1
    await document.save()
    print(document.version)  # 1

    # Saving changed content bumps the version
    document.content = "Hello World"
    await document.save()
    print(document.version)  # 2

    # Reading the item back returns the stored version
    fetched = await Document.get(pk="DOC#VERSION")
    print(fetched.version)  # 2


asyncio.run(main())

Concurrent updates

When two processes load the same item and try to update it:

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Document(Model):
    """Versioned document item stored in the "documents" table."""

    model_config = ModelConfig(table="documents")
    pk = StringAttribute(partition_key=True)  # partition key
    content = StringAttribute()
    # Compared against the stored version on save; a mismatch makes the save fail.
    version = VersionAttribute()


async def main():
    """Show a stale writer being rejected after another writer saves first."""
    # Seed the table with the document both writers will edit
    await Document(pk="DOC#CONCURRENT", content="Original").save()

    # Each "process" reads its own copy of the item
    writer_a = await Document.get(pk="DOC#CONCURRENT")
    writer_b = await Document.get(pk="DOC#CONCURRENT")

    # Both copies start at version 1
    print(writer_a.version)  # 1
    print(writer_b.version)  # 1

    # A saves first, so its version check passes
    writer_a.content = "Updated by A"
    await writer_a.save()
    print(writer_a.version)  # 2

    # B still holds version 1, so its save is rejected
    writer_b.content = "Updated by B"
    try:
        await writer_b.save()
    except ConditionalCheckFailedException:
        print("Conflict! Someone else updated the document.")


asyncio.run(main())

Handling conflicts

When a save fails due to version mismatch, reload the item and retry:

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Counter(Model):
    """Numeric counter item in the "counters" table, guarded by a version field."""

    model_config = ModelConfig(table="counters")
    pk = StringAttribute(partition_key=True)  # partition key
    count = NumberAttribute()  # current counter value
    # Version counter used for the optimistic-locking check on save.
    version = VersionAttribute()


async def increment_with_retry(pk: str, max_retries: int = 3) -> Counter:
    """Add one to the counter stored under ``pk``, retrying on version conflicts.

    Every attempt re-reads the item so the save is made against the latest
    stored version; a missing item is treated as a counter starting at 0.
    The conflict raised by the final attempt is propagated to the caller.
    """
    last_attempt = max_retries - 1

    for attempt in range(max_retries):
        existing = await Counter.get(pk=pk)
        counter = Counter(pk=pk, count=0) if existing is None else existing

        # Apply the increment to the freshly loaded value
        counter.count += 1

        try:
            await counter.save()
        except ConditionalCheckFailedException:
            # Out of attempts: let the caller see the conflict
            if attempt == last_attempt:
                raise
            print(f"Conflict on attempt {attempt + 1}, retrying...")
        else:
            return counter

    raise RuntimeError("Should not reach here")


async def main():
    """Increment the "COUNTER#RETRY" counter once and print the saved state."""
    updated = await increment_with_retry("COUNTER#RETRY")
    print(f"Count: {updated.count}, Version: {updated.version}")


asyncio.run(main())

Get the current item without an extra GET

Use return_values_on_condition_check_failure=True to get the current item directly from the exception. This saves a round trip:

from pydynox.exceptions import ConditionalCheckFailedException

# NOTE: `client` is assumed to be an existing pydynox client created elsewhere;
# it is not defined in this snippet.
try:
    # Write version 2 only if the stored version is still 1
    client.update_item(
        "users",
        {"pk": "USER#123"},
        updates={"name": "Alice", "version": 2},
        condition_expression="#v = :expected",
        expression_attribute_names={"#v": "version"},
        expression_attribute_values={":expected": 1},
        # Ask for the stored item to be attached to the exception on failure
        return_values_on_condition_check_failure=True,
    )
except ConditionalCheckFailedException as e:
    # No extra GET needed: e.item is the item as currently stored
    current_version = e.item["version"]
    print(f"Version conflict! Current version is {current_version}")

Async with high concurrency

For async code with many concurrent operations, always use retry with backoff:

import asyncio
import random

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Counter(Model):
    """Counter item in the "counters" table whose value is stored as a string."""

    model_config = ModelConfig(table="counters")
    pk = StringAttribute(partition_key=True)  # partition key
    value = StringAttribute()  # counter value kept as text, e.g. "0"
    # Version counter used for the optimistic-locking check on save.
    version = VersionAttribute()


async def increment_with_retry(pk: str, max_retries: int = 5) -> Counter:
    """Increment the counter stored under ``pk``, retrying on version conflicts.

    Each attempt reloads the item, so the increment is always applied on top
    of the latest stored value. After a conflict the coroutine waits with
    exponential backoff plus random jitter, so concurrent writers do not all
    retry at the same moment.

    Args:
        pk: Partition key of the counter item.
        max_retries: Maximum number of save attempts.

    Returns:
        The counter as saved by the successful attempt.

    Raises:
        ConditionalCheckFailedException: If the final attempt also conflicts.
    """
    for attempt in range(max_retries):
        counter = await Counter.get(pk=pk)
        if counter is None:
            counter = Counter(pk=pk, value="0")

        # The value is stored as a string, so convert, add one, convert back
        counter.value = str(int(counter.value) + 1)

        try:
            await counter.save()
            return counter
        except ConditionalCheckFailedException:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with full jitter: up to 10 ms, 20 ms, 40 ms, ...
            await asyncio.sleep(random.uniform(0, 0.01 * 2**attempt))

    # Only reachable when max_retries is less than 1 (the loop never runs)
    raise RuntimeError("Should not reach here")


async def main():
    """Run 10 concurrent increments against one counter and print the result."""
    # Create counter
    counter = Counter(pk="COUNTER#1", value="0")
    await counter.save()

    # Run 10 concurrent increments. A writer can lose at most once to each of
    # the other 9 writers, so 10 attempts guarantee every increment succeeds.
    writers = 10
    tasks = [
        increment_with_retry("COUNTER#1", max_retries=writers)
        for _ in range(writers)
    ]
    await asyncio.gather(*tasks)

    # Final value should be 10
    final = await Counter.get(pk="COUNTER#1")
    print(f"Final value: {final.value}")  # 10
    print(f"Final version: {final.version}")  # 11 (1 create + 10 updates)


if __name__ == "__main__":
    asyncio.run(main())

Delete with version check

Delete also checks the version. If someone else updated the item, delete fails:

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Document(Model):
    """Versioned document item stored in the "documents" table."""

    model_config = ModelConfig(table="documents")
    pk = StringAttribute(partition_key=True)  # partition key
    content = StringAttribute()
    # Checked on delete as well as on save; a stale version makes the call fail.
    version = VersionAttribute()


async def main():
    """Show that delete is rejected for a stale copy and accepted for a current one."""
    # Two saves bring the document to version 2
    current = Document(pk="DOC#DELETE", content="Hello")
    await current.save()
    current.content = "Updated"
    await current.save()
    print(f"Version: {current.version}")  # 2

    # This copy is read at version 2 and will fall behind
    outdated = await Document.get(pk="DOC#DELETE")

    # A further save moves the stored item to version 3
    current.content = "Updated again"
    await current.save()
    print(f"Version: {current.version}")  # 3

    # The outdated copy no longer matches the stored version
    try:
        await outdated.delete()
    except ConditionalCheckFailedException:
        print("Can't delete - version mismatch")

    # The up-to-date copy passes the version check
    await current.delete()
    print("Deleted successfully")


asyncio.run(main())

Combining with user conditions

You can add your own conditions. They're combined with the version check using AND:

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException


class Document(Model):
    """Versioned document item with a status field, stored in "documents"."""

    model_config = ModelConfig(table="documents")
    pk = StringAttribute(partition_key=True)  # partition key
    status = StringAttribute()  # e.g. "draft" or "published"
    content = StringAttribute()
    # Version check is ANDed with any user-supplied save condition.
    version = VersionAttribute()


async def main():
    """Combine a user condition on ``status`` with the automatic version check."""
    # Start with a draft at version 1
    document = Document(pk="DOC#CONDITION", status="draft", content="Hello")
    await document.save()

    # Draft-only update; the effective condition is
    # (status = "draft" AND version = 1)
    document.content = "Updated content"
    await document.save(condition=Document.status == "draft")
    print(f"Updated! Version: {document.version}")  # 2

    # Publishing is an unconditional save apart from the version check
    document.status = "published"
    await document.save()
    print(f"Published! Version: {document.version}")  # 3

    # The stored status is now "published", so the draft-only condition fails
    document.content = "Another update"
    try:
        await document.save(condition=Document.status == "draft")
    except ConditionalCheckFailedException:
        print("Can't update - not a draft")


asyncio.run(main())

When to use

| Use case | Examples | Recommendation |
| --- | --- | --- |
| Counters and balances | Page views, account balances, inventory | ✅ Use it |
| Documents with edits | Wiki pages, configs, user profiles | ✅ Use it |
| State machines | Order status, workflow steps | ✅ Use it |
| Shared resources | Seat reservations, appointment slots | ✅ Use it |
| High-frequency updates | Hot keys, real-time counters | ❌ Use transactions |
| Simple increments | Like counts, view counts | ❌ Use update() with add() |
| Single writer per item | Background jobs, migrations | ❌ Skip it |

Things to know

The version is incremented before the save is sent. If the save fails, your local object is left with a version that no longer matches the one in DynamoDB, so always reload the item after a failed save.

update() does not use versioning. Only save() and delete() check the version, and only save() increments it. If you need atomic field updates with versioning, reload the item, change it, and call save().

New items check for existence. Creating an item with a VersionAttribute uses an attribute_not_exists condition, so creating the same item twice fails.

Use retry with backoff. In high-concurrency scenarios, add exponential backoff between retries to reduce contention.