Optimistic locking
Optimistic locking prevents concurrent updates from overwriting each other. When two processes try to update the same item, the second one fails instead of silently overwriting the first.
How it works
Add a VersionAttribute to your model. pydynox handles the rest:
- First save sets version to 1
- Each save increments version by 1
- Before saving, pydynox checks that the version in DynamoDB matches the local version
- If versions don't match, save fails with ConditionalCheckFailedException
Basic usage
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
class Document(Model):
    """A document item protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="documents")

    # Partition key identifying the document.
    pk = StringAttribute(partition_key=True)
    # Body text of the document.
    content = StringAttribute()
    # Version counter: set to 1 by the first save and incremented by one
    # on every later save.
    version = VersionAttribute()
async def main():
    """Create, update and reload a document, printing its version each time."""
    # A freshly constructed item has no version until it is saved.
    document = Document(pk="DOC#VERSION", content="Hello")
    print(document.version)  # None

    # The first save stores the item with version 1.
    await document.save()
    print(document.version)  # 1

    # Every later save bumps the version by one.
    document.content = "Hello World"
    await document.save()
    print(document.version)  # 2

    # Reading the item back returns the stored version.
    fetched = await Document.get(pk="DOC#VERSION")
    print(fetched.version)  # 2


asyncio.run(main())
Concurrent updates
When two processes load the same item and try to update it:
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException
class Document(Model):
    """A document item protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="documents")

    # Partition key identifying the document.
    pk = StringAttribute(partition_key=True)
    # Body text of the document.
    content = StringAttribute()
    # Version counter compared against the stored value on every save.
    version = VersionAttribute()
async def main():
    """Show a stale writer losing against a concurrent update."""
    # Seed the table; the first save stores version 1.
    original = Document(pk="DOC#CONCURRENT", content="Original")
    await original.save()

    # Simulate two processes that each read the same item.
    process_a = await Document.get(pk="DOC#CONCURRENT")
    process_b = await Document.get(pk="DOC#CONCURRENT")

    # Both copies start from version 1.
    for copy in (process_a, process_b):
        print(copy.version)  # 1

    # Process A writes first, so its version check passes.
    process_a.content = "Updated by A"
    await process_a.save()
    print(process_a.version)  # 2

    # Process B still holds version 1, which no longer matches the
    # stored version, so its save is rejected.
    process_b.content = "Updated by B"
    try:
        await process_b.save()
    except ConditionalCheckFailedException:
        print("Conflict! Someone else updated the document.")


asyncio.run(main())
Handling conflicts
When a save fails due to version mismatch, reload the item and retry:
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException
class Counter(Model):
    """A numeric counter item protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="counters")

    # Partition key identifying the counter.
    pk = StringAttribute(partition_key=True)
    # Current counter value.
    count = NumberAttribute()
    # Version counter compared against the stored value on every save.
    version = VersionAttribute()
async def increment_with_retry(pk: str, max_retries: int = 3) -> Counter:
    """Increment a counter, reloading and retrying on version conflicts.

    Args:
        pk: Partition key of the counter item.
        max_retries: Maximum number of load-increment-save attempts.
            Must be at least 1.

    Returns:
        The counter instance whose save succeeded.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        ConditionalCheckFailedException: If the last allowed attempt
            also hit a version conflict.
    """
    # Without this guard the loop below would not run at all and the
    # function would fall through without ever attempting a write.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        # Reload on every attempt so the local version matches what is
        # currently stored.
        counter = await Counter.get(pk=pk)
        if counter is None:
            counter = Counter(pk=pk, count=0)

        # Increment
        counter.count += 1

        try:
            await counter.save()
        except ConditionalCheckFailedException:
            # Out of attempts: let the caller see the conflict.
            if attempt == max_retries - 1:
                raise
            print(f"Conflict on attempt {attempt + 1}, retrying...")
        else:
            return counter

    # Unreachable: the final attempt either returns or re-raises.
    raise RuntimeError("Should not reach here")
async def main():
    """Increment the demo counter once and print its count and version."""
    # The helper reloads and retries on its own if a conflict occurs.
    counter = await increment_with_retry("COUNTER#RETRY")
    print(f"Count: {counter.count}, Version: {counter.version}")


asyncio.run(main())
Get the current item without extra GET
Use return_values_on_condition_check_failure=True to get the current item directly from the exception. This saves a round trip:
# Import from the same public module the other examples use.
from pydynox.exceptions import ConditionalCheckFailedException

# NOTE: `client` is not defined in this snippet; it is assumed to be an
# already-configured pydynox client created elsewhere.
try:
    client.update_item(
        "users",
        {"pk": "USER#123"},
        updates={"name": "Alice", "version": 2},
        # Only apply the update if the stored version is still 1.
        condition_expression="#v = :expected",
        expression_attribute_names={"#v": "version"},
        expression_attribute_values={":expected": 1},
        # Attach the stored item to the exception when the check fails.
        return_values_on_condition_check_failure=True,
    )
except ConditionalCheckFailedException as e:
    # No extra GET needed: the current item is carried on the exception.
    current_version = e.item["version"]
    print(f"Version conflict! Current version is {current_version}")
Async with high concurrency
For async code with many concurrent operations, always use retry with backoff:
import asyncio
import random

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException
class Counter(Model):
    """A counter item protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="counters")

    # Partition key identifying the counter.
    pk = StringAttribute(partition_key=True)
    # Counter value, stored as a string.
    value = StringAttribute()
    # Version counter compared against the stored value on every save.
    version = VersionAttribute()
async def increment_with_retry(pk: str, max_retries: int = 5) -> Counter:
    """Increment a counter, retrying with backoff on version conflicts.

    Args:
        pk: Partition key of the counter item.
        max_retries: Maximum number of load-increment-save attempts.
            Must be at least 1.

    Returns:
        The counter instance whose save succeeded.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        ConditionalCheckFailedException: If the last allowed attempt
            also hit a version conflict.
    """
    # Without this guard the loop below would not run at all and the
    # function would fall through without ever attempting a write.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        # Reload on every attempt so the local version matches what is
        # currently stored.
        counter = await Counter.get(pk=pk)
        if counter is None:
            counter = Counter(pk=pk, value="0")

        # The value is stored as a string, so convert around the increment.
        counter.value = str(int(counter.value) + 1)

        try:
            await counter.save()
        except ConditionalCheckFailedException:
            # Out of attempts: let the caller see the conflict.
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter: the ceiling doubles on each
            # attempt, and the random spread keeps concurrent writers from
            # retrying in lockstep.
            await asyncio.sleep(random.uniform(0, 0.01 * 2**attempt))
        else:
            return counter

    # Unreachable: the final attempt either returns or re-raises.
    raise RuntimeError("Should not reach here")
async def main():
    """Run concurrent increments against one counter and print the result."""
    workers = 10

    # Create counter
    counter = Counter(pk="COUNTER#1", value="0")
    await counter.save()

    # Run 10 concurrent increments.
    # Every conflict a task sees is caused by another task's successful
    # write, and each task writes successfully only once. Assuming nothing
    # else writes to this item, a task can lose at most ``workers - 1``
    # times, so ``workers`` attempts are enough for all of them to finish.
    tasks = [
        increment_with_retry("COUNTER#1", max_retries=workers)
        for _ in range(workers)
    ]
    await asyncio.gather(*tasks)

    # Final value should be 10
    final = await Counter.get(pk="COUNTER#1")
    print(f"Final value: {final.value}")  # 10
    print(f"Final version: {final.version}")  # 11 (1 create + 10 updates)


if __name__ == "__main__":
    asyncio.run(main())
Delete with version check
Delete also checks the version. If someone else updated the item, delete fails:
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException
class Document(Model):
    """A document item protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="documents")

    # Partition key identifying the document.
    pk = StringAttribute(partition_key=True)
    # Body text of the document.
    content = StringAttribute()
    # Version counter checked on save and on delete.
    version = VersionAttribute()
async def main():
    """Show that deleting through a stale copy is rejected."""
    key = "DOC#DELETE"

    # Two saves in a row: create (version 1), then update (version 2).
    doc = Document(pk=key, content="Hello")
    await doc.save()
    doc.content = "Updated"
    await doc.save()
    print(f"Version: {doc.version}")  # 2

    # This copy is read at version 2 and becomes stale after the next save.
    stale = await Document.get(pk=key)

    # A further update moves the stored item on to version 3.
    doc.content = "Updated again"
    await doc.save()
    print(f"Version: {doc.version}")  # 3

    # The stale copy no longer matches the stored version, so the delete
    # is refused.
    try:
        await stale.delete()
    except ConditionalCheckFailedException:
        print("Can't delete - version mismatch")

    # The up-to-date copy passes the version check.
    await doc.delete()
    print("Deleted successfully")


asyncio.run(main())
Combining with user conditions
You can add your own conditions. They're combined with the version check using AND:
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute, VersionAttribute
from pydynox.exceptions import ConditionalCheckFailedException
class Document(Model):
    """A document with a status field, protected by optimistic locking."""

    # Table the model is stored in.
    model_config = ModelConfig(table="documents")

    # Partition key identifying the document.
    pk = StringAttribute(partition_key=True)
    # Status value; the example uses "draft" and "published".
    status = StringAttribute()
    # Body text of the document.
    content = StringAttribute()
    # Version counter compared against the stored value on every save.
    version = VersionAttribute()
async def main():
    """Combine a user-supplied condition with the version check."""
    # Shared condition: only write while the stored status is "draft".
    draft_only = Document.status == "draft"

    # Seed the table with a draft; the first save stores version 1.
    doc = Document(pk="DOC#CONDITION", status="draft", content="Hello")
    await doc.save()

    # The condition is ANDed with the version check:
    # (status = "draft" AND version = 1)
    doc.content = "Updated content"
    await doc.save(condition=draft_only)
    print(f"Updated! Version: {doc.version}")  # 2

    # An unconditional save is still subject to the version check.
    doc.status = "published"
    await doc.save()
    print(f"Published! Version: {doc.version}")  # 3

    # The stored status is now "published", so the draft-only save fails.
    doc.content = "Another update"
    try:
        await doc.save(condition=draft_only)
    except ConditionalCheckFailedException:
        print("Can't update - not a draft")


asyncio.run(main())
When to use
| Use case | Examples | Recommendation |
|---|---|---|
| Counters and balances | Page views, account balances, inventory | ✅ Use it |
| Documents with edits | Wiki pages, configs, user profiles | ✅ Use it |
| State machines | Order status, workflow steps | ✅ Use it |
| Shared resources | Seat reservations, appointment slots | ✅ Use it |
| High-frequency updates | Hot keys, real-time counters | ❌ Use transactions |
| Simple increments | Like counts, view counts | ❌ Use update() with add() |
| Single writer per item | Background jobs, migrations | ❌ Skip it |
Things to know
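Version increments before save. The local version is bumped before the write is sent, so if the save fails, your local object holds a version that no longer matches DynamoDB. Always reload the item after a failed save before retrying.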
update() does not use versioning. Only save() and delete() check and increment the version. If you need atomic field updates with versioning, reload and save.
New items check for existence. Creating an item with VersionAttribute uses attribute_not_exists condition. Creating the same item twice fails.
Use retry with backoff. In high-concurrency scenarios, add exponential backoff between retries to reduce contention.