Observability
Know what's happening in your DynamoDB operations. pydynox gives you metrics on every call - duration, capacity consumed, items returned. No extra code needed.
Why observability matters
DynamoDB bills by capacity consumed. Without metrics, you're flying blind:
- Is that query using 1 RCU or 100?
- Why is this Lambda timing out?
- Which operation is eating all my capacity?
pydynox answers these questions automatically. Every operation returns metrics, and logs are built-in.
Key features
- Metrics on every operation (duration, RCU/WCU, item counts)
- Model-level metrics with class methods (no field conflicts)
- Automatic logging at INFO level
- Custom logger support (Powertools, structlog)
- Correlation ID for request tracing
- AWS SDK debug logs when you need them
Getting started
pydynox has two ways to access metrics:
- Client metrics - Direct access on client operations (low-level)
- Model metrics - Class methods on Model classes (high-level)
Client metrics
For low-level client operations, use client.get_last_metrics() and client.get_total_metrics().
Write operations also return metrics directly:
```python
import asyncio

from pydynox import DynamoDBClient


async def main():
    client = DynamoDBClient()

    # put_item returns OperationMetrics directly
    metrics = await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    print(metrics.duration_ms)  # 8.2
    print(metrics.consumed_wcu)  # 1.0

    # Same for delete_item and update_item
    metrics = await client.delete_item("users", {"pk": "USER#1", "sk": "PROFILE"})
    print(metrics.duration_ms)


asyncio.run(main())
```
Read operations store metrics for retrieval:
```python
import asyncio

from pydynox import DynamoDBClient


async def main():
    client = DynamoDBClient()

    # get_item returns a plain dict
    item = await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})
    if item:
        print(item["name"])  # Works like a normal dict

    # Access metrics via client.get_last_metrics()
    metrics = client.get_last_metrics()
    print(metrics.duration_ms)  # 12.1
    print(metrics.consumed_rcu)  # 0.5


asyncio.run(main())
```
Get total metrics across all operations:
```python
import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def main():
    # Do some operations
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    await client.put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})

    # Get total metrics
    total = client.get_total_metrics()
    print(total.total_rcu)  # 0.5
    print(total.total_wcu)  # 2.0
    print(total.operation_count)  # 3
    print(total.put_count)  # 2
    print(total.get_count)  # 1


asyncio.run(main())
```
Model metrics
For Model operations, use class methods. This avoids conflicts with user fields named "metrics".
"""Model metrics example - using class methods."""
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
class User(Model):
model_config = ModelConfig(table="users")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
name = StringAttribute()
age = NumberAttribute()
async def main():
# Reset metrics before starting
User.reset_metrics()
# Do some operations
user = User(pk="USER#1", sk="PROFILE", name="John", age=30)
await user.save()
# Get metrics from last operation
last = User.get_last_metrics()
print(f"Save took {last.duration_ms}ms")
print(f"Consumed {last.consumed_wcu} WCU")
# Do more operations
await User.get(pk="USER#1", sk="PROFILE")
await User.get(pk="USER#2", sk="PROFILE")
# Get total metrics
total = User.get_total_metrics()
print(f"Total operations: {total.operation_count}")
print(f"Total RCU: {total.total_rcu}")
print(f"Total WCU: {total.total_wcu}")
print(f"Gets: {total.get_count}, Puts: {total.put_count}")
asyncio.run(main())
Each Model class has isolated metrics:
"""Each Model class has isolated metrics."""
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute
from pydynox.testing import MemoryBackend
class User(Model):
model_config = ModelConfig(table="users")
pk = StringAttribute(partition_key=True)
name = StringAttribute()
class Order(Model):
model_config = ModelConfig(table="orders")
pk = StringAttribute(partition_key=True)
total = StringAttribute()
async def main():
with MemoryBackend():
# Reset both
User.reset_metrics()
Order.reset_metrics()
# Operations on User
await User(pk="USER#1", name="John").save()
await User.get(pk="USER#1")
# Operations on Order
await Order(pk="ORDER#1", total="100").save()
# Metrics are isolated per class
print(f"User: {User.get_total_metrics().operation_count} ops") # 2
print(f"Order: {Order.get_total_metrics().operation_count} ops") # 1
asyncio.run(main())
Reset metrics per request
In long-running processes (FastAPI, Flask), metrics accumulate forever. Reset at the start of each request:
"""Reset metrics per request - important for long-running processes."""
import asyncio
from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute
class Order(Model):
model_config = ModelConfig(table="orders")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
status = StringAttribute()
async def handle_request(order_id: str):
"""Handle a single request - reset metrics at start."""
# Reset at start of each request
Order.reset_metrics()
# Do operations
order = await Order.get(pk=f"ORDER#{order_id}", sk="DETAILS")
if order:
order.status = "processed"
await order.save()
# Log metrics for this request only
total = Order.get_total_metrics()
print(f"Request metrics: {total.total_rcu} RCU, {total.total_wcu} WCU")
# In FastAPI/Flask, call reset_metrics() at start of each request
# Otherwise metrics accumulate forever
asyncio.run(handle_request("123"))
"""Reset metrics per request - important for long-running processes."""
import asyncio
from pydynox import DynamoDBClient
client = DynamoDBClient()
async def handle_request(user_id: str) -> dict:
# Reset at start of each request
client.reset_metrics()
# Do operations
await client.put_item("users", {"pk": user_id, "sk": "PROFILE", "name": "John"})
item = await client.get_item("users", {"pk": user_id, "sk": "PROFILE"})
# Check total for this request
total = client.get_total_metrics()
print(f"Request used {total.total_rcu} RCU, {total.total_wcu} WCU")
return item or {}
asyncio.run(handle_request("USER#1"))
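As a sketch of how this fits into a web framework, the middleware below resets client metrics on each request and attaches the totals to the response. The FastAPI app and header names are illustrative; only reset_metrics() and get_total_metrics() come from pydynox:

```python
from fastapi import FastAPI, Request

from pydynox import DynamoDBClient

app = FastAPI()
client = DynamoDBClient()


@app.middleware("http")
async def dynamodb_metrics(request: Request, call_next):
    client.reset_metrics()  # scope metrics to this request
    response = await call_next(request)
    total = client.get_total_metrics()
    # Illustrative: expose per-request capacity as response headers
    response.headers["x-dynamodb-rcu"] = str(total.total_rcu)
    response.headers["x-dynamodb-wcu"] = str(total.total_wcu)
    return response
```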
What's in metrics
| Field | Type | Description |
|---|---|---|
| `duration_ms` | `float` | How long the operation took |
| `consumed_rcu` | `float` or `None` | Read capacity units used |
| `consumed_wcu` | `float` or `None` | Write capacity units used |
| `items_count` | `int` or `None` | Items returned (query/scan) |
| `scanned_count` | `int` or `None` | Items scanned before filtering |
| `request_id` | `str` or `None` | AWS request ID for support tickets |
Model total metrics
get_total_metrics() returns aggregated metrics:
| Field | Type | Description |
|---|---|---|
| `total_rcu` | `float` | Total RCU consumed |
| `total_wcu` | `float` | Total WCU consumed |
| `total_duration_ms` | `float` | Total time spent |
| `operation_count` | `int` | Total operations |
| `get_count` | `int` | Number of get operations |
| `put_count` | `int` | Number of put operations |
| `delete_count` | `int` | Number of delete operations |
| `update_count` | `int` | Number of update operations |
| `query_count` | `int` | Number of query operations |
| `scan_count` | `int` | Number of scan operations |
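Since these are plain aggregates, you can derive averages yourself. A small example, assuming the User model from above:

```python
total = User.get_total_metrics()
if total.operation_count:
    # Average latency and read cost per operation
    avg_ms = total.total_duration_ms / total.operation_count
    avg_rcu = total.total_rcu / total.operation_count
    print(f"avg: {avg_ms:.1f}ms, {avg_rcu:.2f} RCU per operation")
```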
For KMS and S3 metrics, see Operations metrics.
Automatic logging
pydynox logs every operation at INFO level. Just configure Python logging:
```python
import asyncio
import logging

from pydynox import DynamoDBClient

# Enable INFO level logs for pydynox
logging.basicConfig(level=logging.INFO)

client = DynamoDBClient()


async def main():
    # All operations are logged automatically
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    # INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0

    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})
    # INFO:pydynox:get_item table=users duration_ms=12.1 rcu=0.5


asyncio.run(main())
```
Output:
```
INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0
INFO:pydynox:get_item table=users duration_ms=12.1 rcu=0.5
INFO:pydynox:query table=users duration_ms=45.2 rcu=2.5 items=10
```
Slow operations (>100ms) are logged at WARNING level.
Disable logging
If you don't want pydynox logs at all:
```python
import asyncio
import logging

from pydynox import DynamoDBClient

# Disable pydynox logs completely
logging.getLogger("pydynox").setLevel(logging.CRITICAL)

client = DynamoDBClient()


async def main():
    # No logs will be emitted
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})


asyncio.run(main())
```
Advanced
Custom logger
Send pydynox logs to your own logger with set_logger():
```python
import asyncio

from aws_lambda_powertools import Logger

from pydynox import DynamoDBClient, set_logger

# With AWS Lambda Powertools
logger = Logger()
set_logger(logger)


async def main():
    # Now all pydynox logs go through Powertools
    client = DynamoDBClient()
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})


asyncio.run(main())
```
Works with any logger that has debug, info, warning, error methods. Great for AWS Lambda Powertools or structlog.
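For example, a minimal structlog setup might look like this (structlog's bound logger exposes the required methods):

```python
import structlog

from pydynox import set_logger

# Any object with debug/info/warning/error works
logger = structlog.get_logger("pydynox")
set_logger(logger)
```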
Correlation ID
Track requests across your logs with set_correlation_id():
"""Correlation ID example for Lambda."""
import asyncio
from pydynox import DynamoDBClient, set_correlation_id
client = DynamoDBClient()
async def handler_async(event: dict, request_id: str) -> dict:
# Set correlation ID from Lambda context
set_correlation_id(request_id)
# All pydynox logs will include this ID
await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
# INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0 correlation_id=abc-123
return {"statusCode": 200}
asyncio.run(handler_async({}, "abc-123"))
All pydynox logs will include the correlation ID. Useful in Lambda where you want to trace a request through multiple DynamoDB calls.
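Putting the pieces together, a Lambda handler might set the correlation ID and reset metrics once per invocation. A hedged sketch; the event shape is illustrative:

```python
import asyncio

from pydynox import DynamoDBClient, set_correlation_id

client = DynamoDBClient()


async def process(event: dict) -> dict:
    item = await client.get_item("users", {"pk": event["user_id"], "sk": "PROFILE"})
    return {"statusCode": 200, "body": item or {}}


def handler(event, context):
    set_correlation_id(context.aws_request_id)  # tags every pydynox log line
    client.reset_metrics()  # scope metrics to this invocation
    return asyncio.run(process(event))
```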
SDK debug logs
For deep debugging, enable AWS SDK logs:
"""SDK debug logging example."""
import asyncio
import logging
from pydynox import DynamoDBClient, set_logger
# Create a logger
logger = logging.getLogger("pydynox")
logger.setLevel(logging.DEBUG)
# Enable SDK debug logs
set_logger(logger, sdk_debug=True)
async def main():
# Now you'll see detailed AWS SDK logs
client = DynamoDBClient()
await client.get_item("users", {"pk": "USER#1"})
asyncio.run(main())
Or via environment variable:
```bash
# Basic SDK logs
RUST_LOG=aws_sdk_dynamodb=debug python app.py

# Full detail (HTTP bodies, retries, credentials)
RUST_LOG=aws_sdk_dynamodb=trace,aws_smithy_runtime=trace python app.py
```
Warning
SDK debug logs are verbose. Only enable when debugging specific issues.
Log levels
| Level | What's logged |
|---|---|
| ERROR | Exceptions, failed operations |
| WARNING | Slow queries (>100ms) |
| INFO | Operation summary (table, duration, rcu/wcu) |
| DEBUG | Detailed request/response info |
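For example, to keep only the slow-query warnings and errors in a chatty service, raise the pydynox logger level (plain Python logging, nothing pydynox-specific):

```python
import logging

# Drop per-operation INFO summaries; keep WARNING (slow queries) and ERROR
logging.getLogger("pydynox").setLevel(logging.WARNING)
```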
Use cases
Cost monitoring
Track capacity consumption per operation:
```python
# For Model operations
user = await User.get(pk="USER#123", sk="PROFILE")
last = User.get_last_metrics()
if last:
    print(f"This read cost {last.consumed_rcu} RCU")

# For client operations
await client.get_item("users", {"pk": "USER#123"})
last = client.get_last_metrics()
if last:
    print(f"This read cost {last.consumed_rcu} RCU")
```
Performance debugging
Find slow operations:
```python
for order in Order.query(partition_key="CUSTOMER#123"):
    print(order.total)

last = Order.get_last_metrics()
if last and last.duration_ms > 100:
    logger.warning(f"Slow query: {last.duration_ms}ms")
```
Lambda optimization
In Lambda, every millisecond counts:
```python
def handler(event, context):
    set_correlation_id(context.aws_request_id)
    user = asyncio.run(User.get(pk=event["user_id"]))
    # Logs include the request ID for tracing
    return {"statusCode": 200}
```
OpenTelemetry tracing
pydynox supports OpenTelemetry for distributed tracing. When enabled, every DynamoDB operation creates a span with useful attributes.
Installation
Tracing requires an optional OpenTelemetry dependency; install it before enabling tracing.
Basic usage
"""Basic OpenTelemetry tracing example."""
import asyncio
from pydynox import DynamoDBClient, Model, ModelConfig, enable_tracing
from pydynox.attributes import StringAttribute
# Enable tracing - uses global OTEL tracer
enable_tracing()
client = DynamoDBClient(region="us-east-1")
class User(Model):
model_config = ModelConfig(table="users", client=client)
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
name = StringAttribute()
async def main():
# All operations now create spans automatically
user = User(pk="USER#123", sk="PROFILE", name="John")
await user.save() # Span: "PutItem users"
await User.get(pk="USER#123", sk="PROFILE") # Span: "GetItem users"
asyncio.run(main())
Span attributes
Each span follows OTEL Database Semantic Conventions:
| Attribute | Example | Description |
|---|---|---|
| `db.system.name` | `aws.dynamodb` | Database system |
| `db.operation.name` | `PutItem` | DynamoDB operation |
| `db.collection.name` | `users` | Table name |
| `db.namespace` | `us-east-1` | AWS region |
| `server.address` | `dynamodb.us-east-1.amazonaws.com` | Endpoint |
| `aws.dynamodb.consumed_capacity.read` | `1.0` | RCU consumed |
| `aws.dynamodb.consumed_capacity.write` | `1.0` | WCU consumed |
| `aws.request_id` | `ABC123...` | AWS request ID |
| `error.type` | `ConditionalCheckFailedException` | Error class (if failed) |
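To inspect these attributes locally, you can configure a console exporter with the standard OpenTelemetry SDK before calling enable_tracing(). Since enable_tracing() uses the global tracer, pydynox spans will show up in it:

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

from pydynox import enable_tracing

# Print every finished span to stdout for local inspection
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

enable_tracing()  # picks up the global tracer provider
```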
Custom configuration
"""Custom tracer and configuration example."""
from opentelemetry import trace
from pydynox import enable_tracing
# Use a custom tracer
tracer = trace.get_tracer("my-service", "1.0.0")
enable_tracing(
tracer=tracer,
record_exceptions=True, # Add exception events to spans
record_consumed_capacity=True, # Add RCU/WCU as attributes
span_name_prefix="myapp", # Spans become: "myapp PutItem users"
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `tracer` | `Tracer` | `None` | Custom OTEL tracer |
| `record_exceptions` | `bool` | `True` | Add exception events to spans |
| `record_consumed_capacity` | `bool` | `True` | Add RCU/WCU as attributes |
| `span_name_prefix` | `str` | `None` | Prefix for span names |
Disable tracing
Span naming
Span names follow OTEL conventions:

- Single operation: `"PutItem users"`
- Batch operation: `"BATCH BatchWriteItem users"`
- With prefix: `"myapp PutItem users"`
Context propagation
pydynox spans automatically connect to the current active span. This means if you create a parent span in your code, all DynamoDB operations inside it become child spans.
```python
from opentelemetry import trace

from pydynox import enable_tracing

enable_tracing()
tracer = trace.get_tracer("my-service")


# In a Lambda handler or HTTP request (User is the model defined earlier)
async def handle_request():
    with tracer.start_as_current_span("handle_request"):
        user = await User.get(pk="USER#123")  # Child span: "GetItem users"
        user.name = "Updated"
        await user.save()  # Child span: "PutItem users"
```
All spans share the same trace_id, so you can see the full request flow in your tracing backend (Jaeger, X-Ray, etc.).
Logs with trace context
When tracing is enabled, pydynox logs automatically include trace_id and span_id. This helps correlate logs with spans in your tracing backend.
```python
from pydynox import enable_tracing

enable_tracing()

# Logs now include trace context (inside an async function)
await user.save()
# INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0 trace_id=abc123... span_id=def456...
```
With a custom logger like AWS Lambda Powertools:
```python
from aws_lambda_powertools import Logger

from pydynox import enable_tracing, set_logger

logger = Logger()
set_logger(logger)
enable_tracing()

# Powertools logs include trace_id and span_id as structured fields
await user.save()
```
Next steps
- Async support - Async/await for high-concurrency apps
- Rate limiting - Control throughput
- Exceptions - Error handling