Observability

Know what's happening in your DynamoDB operations. pydynox gives you metrics on every call - duration, capacity consumed, items returned. No extra code needed.

Why observability matters

DynamoDB bills by capacity consumed. Without metrics, you're flying blind:

  • Is that query using 1 RCU or 100?
  • Why is this Lambda timing out?
  • Which operation is eating all my capacity?

pydynox answers these questions automatically. Every operation returns metrics, and logs are built-in.

Key features

  • Metrics on every operation (duration, RCU/WCU, item counts)
  • Model-level metrics with class methods (no field conflicts)
  • Automatic logging at INFO level
  • Custom logger support (Powertools, structlog)
  • Correlation ID for request tracing
  • AWS SDK debug logs when you need them

Getting started

pydynox has two ways to access metrics:

  1. Client metrics - Direct access on client operations (low-level)
  2. Model metrics - Class methods on Model classes (high-level)

Client metrics

For low-level client operations, use client.get_last_metrics() and client.get_total_metrics().

Write operations also return metrics directly:

import asyncio

from pydynox import DynamoDBClient


async def main():
    client = DynamoDBClient()

    # put_item returns OperationMetrics directly
    metrics = await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})

    print(metrics.duration_ms)  # 8.2
    print(metrics.consumed_wcu)  # 1.0

    # Same for delete_item and update_item
    metrics = await client.delete_item("users", {"pk": "USER#1", "sk": "PROFILE"})
    print(metrics.duration_ms)


asyncio.run(main())

Read operations store metrics for retrieval:

import asyncio

from pydynox import DynamoDBClient


async def main():
    client = DynamoDBClient()

    # get_item returns a plain dict
    item = await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})

    if item:
        print(item["name"])  # Works like a normal dict

    # Access metrics via client.get_last_metrics()
    metrics = client.get_last_metrics()
    print(metrics.duration_ms)  # 12.1
    print(metrics.consumed_rcu)  # 0.5


asyncio.run(main())

Get total metrics across all operations:

import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def main():
    # Do some operations
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    await client.put_item("users", {"pk": "USER#2", "sk": "PROFILE", "name": "Jane"})
    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})

    # Get total metrics
    total = client.get_total_metrics()
    print(total.total_rcu)  # 0.5
    print(total.total_wcu)  # 2.0
    print(total.operation_count)  # 3
    print(total.put_count)  # 2
    print(total.get_count)  # 1


asyncio.run(main())

Model metrics

For Model operations, use class methods. This avoids conflicts with user fields named "metrics".

"""Model metrics example - using class methods."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute


class User(Model):
    model_config = ModelConfig(table="users")
    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    name = StringAttribute()
    age = NumberAttribute()


async def main():
    # Reset metrics before starting
    User.reset_metrics()

    # Do some operations
    user = User(pk="USER#1", sk="PROFILE", name="John", age=30)
    await user.save()

    # Get metrics from last operation
    last = User.get_last_metrics()
    print(f"Save took {last.duration_ms}ms")
    print(f"Consumed {last.consumed_wcu} WCU")

    # Do more operations
    await User.get(pk="USER#1", sk="PROFILE")
    await User.get(pk="USER#2", sk="PROFILE")

    # Get total metrics
    total = User.get_total_metrics()
    print(f"Total operations: {total.operation_count}")
    print(f"Total RCU: {total.total_rcu}")
    print(f"Total WCU: {total.total_wcu}")
    print(f"Gets: {total.get_count}, Puts: {total.put_count}")


asyncio.run(main())

Each Model class has isolated metrics:

"""Each Model class has isolated metrics."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute
from pydynox.testing import MemoryBackend


class User(Model):
    model_config = ModelConfig(table="users")
    pk = StringAttribute(partition_key=True)
    name = StringAttribute()


class Order(Model):
    model_config = ModelConfig(table="orders")
    pk = StringAttribute(partition_key=True)
    total = StringAttribute()


async def main():
    with MemoryBackend():
        # Reset both
        User.reset_metrics()
        Order.reset_metrics()

        # Operations on User
        await User(pk="USER#1", name="John").save()
        await User.get(pk="USER#1")

        # Operations on Order
        await Order(pk="ORDER#1", total="100").save()

        # Metrics are isolated per class
        print(f"User: {User.get_total_metrics().operation_count} ops")  # 2
        print(f"Order: {Order.get_total_metrics().operation_count} ops")  # 1


asyncio.run(main())

Reset metrics per request

In long-running processes (FastAPI, Flask), metrics accumulate forever. Reset at the start of each request:

"""Reset metrics per request - important for long-running processes."""

import asyncio

from pydynox import Model, ModelConfig
from pydynox.attributes import StringAttribute


class Order(Model):
    model_config = ModelConfig(table="orders")
    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    status = StringAttribute()


async def handle_request(order_id: str):
    """Handle a single request - reset metrics at start."""
    # Reset at start of each request
    Order.reset_metrics()

    # Do operations
    order = await Order.get(pk=f"ORDER#{order_id}", sk="DETAILS")
    if order:
        order.status = "processed"
        await order.save()

    # Log metrics for this request only
    total = Order.get_total_metrics()
    print(f"Request metrics: {total.total_rcu} RCU, {total.total_wcu} WCU")


# In FastAPI/Flask, call reset_metrics() at start of each request
# Otherwise metrics accumulate forever

asyncio.run(handle_request("123"))
"""Reset metrics per request - important for long-running processes."""

import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def handle_request(user_id: str) -> dict:
    # Reset at start of each request
    client.reset_metrics()

    # Do operations
    await client.put_item("users", {"pk": user_id, "sk": "PROFILE", "name": "John"})
    item = await client.get_item("users", {"pk": user_id, "sk": "PROFILE"})

    # Check total for this request
    total = client.get_total_metrics()
    print(f"Request used {total.total_rcu} RCU, {total.total_wcu} WCU")

    return item or {}


asyncio.run(handle_request("USER#1"))
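
In a web framework, the reset naturally lives in middleware. A minimal sketch for FastAPI (the middleware itself is illustrative, not part of pydynox):

from fastapi import FastAPI, Request

from pydynox import DynamoDBClient

app = FastAPI()
client = DynamoDBClient()


@app.middleware("http")
async def reset_pydynox_metrics(request: Request, call_next):
    # Start every request with a clean slate
    client.reset_metrics()
    response = await call_next(request)

    # Log capacity consumed by this request only
    total = client.get_total_metrics()
    print(f"{request.url.path}: {total.total_rcu} RCU, {total.total_wcu} WCU")
    return response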

What's in metrics

| Field | Type | Description |
| --- | --- | --- |
| duration_ms | float | How long the operation took (milliseconds) |
| consumed_rcu | float or None | Read capacity units used |
| consumed_wcu | float or None | Write capacity units used |
| items_count | int or None | Items returned (query/scan) |
| scanned_count | int or None | Items scanned before filtering |
| request_id | str or None | AWS request ID for support tickets |
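
All of these fields live on the object returned by get_last_metrics(). A quick sketch that dumps them after a read (values in comments are illustrative):

import asyncio

from pydynox import DynamoDBClient

client = DynamoDBClient()


async def main():
    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})

    metrics = client.get_last_metrics()
    if metrics:
        print(metrics.duration_ms)   # e.g. 12.1
        print(metrics.consumed_rcu)  # e.g. 0.5
        print(metrics.items_count)   # None for get_item (query/scan only)
        print(metrics.request_id)    # keep this for AWS support tickets


asyncio.run(main())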

Model total metrics

get_total_metrics() returns aggregated metrics:

| Field | Type | Description |
| --- | --- | --- |
| total_rcu | float | Total RCU consumed |
| total_wcu | float | Total WCU consumed |
| total_duration_ms | float | Total time spent (milliseconds) |
| operation_count | int | Total operations |
| get_count | int | Number of get operations |
| put_count | int | Number of put operations |
| delete_count | int | Number of delete operations |
| update_count | int | Number of update operations |
| query_count | int | Number of query operations |
| scan_count | int | Number of scan operations |
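
The totals combine naturally into derived stats. For example, a rough average latency per operation (a sketch using only the fields above):

total = User.get_total_metrics()
if total.operation_count:
    avg_ms = total.total_duration_ms / total.operation_count
    print(f"{total.operation_count} ops, avg {avg_ms:.1f}ms each")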

For KMS and S3 metrics, see Operations metrics.

Automatic logging

pydynox logs every operation at INFO level. Just configure Python logging:

import asyncio
import logging

from pydynox import DynamoDBClient

# Enable INFO level logs for pydynox
logging.basicConfig(level=logging.INFO)

client = DynamoDBClient()


async def main():
    # All operations are logged automatically
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    # INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0

    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})
    # INFO:pydynox:get_item table=users duration_ms=12.1 rcu=0.5


asyncio.run(main())

Output:

INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0
INFO:pydynox:get_item table=users duration_ms=12.1 rcu=0.5
INFO:pydynox:query table=users duration_ms=45.2 rcu=2.5 items=10

Slow operations (>100ms) get a warning:

WARNING:pydynox:query slow operation (150.3ms)

Disable logging

If you don't want pydynox logs at all:

import asyncio
import logging

from pydynox import DynamoDBClient

# Disable pydynox logs completely
logging.getLogger("pydynox").setLevel(logging.CRITICAL)

client = DynamoDBClient()


async def main():
    # No logs will be emitted
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    await client.get_item("users", {"pk": "USER#1", "sk": "PROFILE"})


asyncio.run(main())

Advanced

Custom logger

Send pydynox logs to your own logger with set_logger():

import asyncio

from aws_lambda_powertools import Logger
from pydynox import DynamoDBClient, set_logger

# With AWS Lambda Powertools
logger = Logger()
set_logger(logger)


async def main():
    # Now all pydynox logs go through Powertools
    client = DynamoDBClient()
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})


asyncio.run(main())

Works with any logger that exposes debug, info, warning, and error methods, such as AWS Lambda Powertools or structlog.
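
For structlog, the bound logger already exposes those methods, so it plugs in directly. A minimal sketch, assuming structlog is installed:

import structlog

from pydynox import set_logger

# structlog's bound logger has debug/info/warning/error methods
set_logger(structlog.get_logger("pydynox"))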

Correlation ID

Track requests across your logs with set_correlation_id():

"""Correlation ID example for Lambda."""

import asyncio

from pydynox import DynamoDBClient, set_correlation_id

client = DynamoDBClient()


async def handler_async(event: dict, request_id: str) -> dict:
    # Set correlation ID from Lambda context
    set_correlation_id(request_id)

    # All pydynox logs will include this ID
    await client.put_item("users", {"pk": "USER#1", "sk": "PROFILE", "name": "John"})
    # INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0 correlation_id=abc-123

    return {"statusCode": 200}


asyncio.run(handler_async({}, "abc-123"))

All pydynox logs will include the correlation ID. Useful in Lambda where you want to trace a request through multiple DynamoDB calls.
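
A real Lambda entry point is sync, so one way to wire it up (a sketch reusing handler_async from above) is:

def handler(event, context):
    # context.aws_request_id is supplied by the Lambda runtime
    return asyncio.run(handler_async(event, context.aws_request_id))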

SDK debug logs

For deep debugging, enable AWS SDK logs:

"""SDK debug logging example."""

import asyncio
import logging

from pydynox import DynamoDBClient, set_logger

# Create a logger
logger = logging.getLogger("pydynox")
logger.setLevel(logging.DEBUG)

# Enable SDK debug logs
set_logger(logger, sdk_debug=True)


async def main():
    # Now you'll see detailed AWS SDK logs
    client = DynamoDBClient()
    await client.get_item("users", {"pk": "USER#1"})


asyncio.run(main())

Or via environment variable:

# Basic SDK logs
RUST_LOG=aws_sdk_dynamodb=debug python app.py

# Full detail (HTTP bodies, retries, credentials)
RUST_LOG=aws_sdk_dynamodb=trace,aws_smithy_runtime=trace python app.py

Warning

SDK debug logs are verbose. Only enable when debugging specific issues.

Log levels

| Level | What's logged |
| --- | --- |
| ERROR | Exceptions, failed operations |
| WARNING | Slow queries (>100ms) |
| INFO | Operation summary (table, duration, RCU/WCU) |
| DEBUG | Detailed request/response info |
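
To keep only the slow-operation warnings and errors, raise the pydynox logger's level (same pattern as in "Disable logging" above):

import logging

# Show only slow operations and failures from pydynox
logging.getLogger("pydynox").setLevel(logging.WARNING)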

Use cases

Cost monitoring

Track capacity consumption per operation:

# For Model operations (inside an async function)
user = await User.get(pk="USER#123", sk="PROFILE")
last = User.get_last_metrics()
if last:
    print(f"This read cost {last.consumed_rcu} RCU")

# For client operations
await client.get_item("users", {"pk": "USER#123"})
last = client.get_last_metrics()
if last:
    print(f"This read cost {last.consumed_rcu} RCU")

Performance debugging

Find slow operations:

# Inside an async function
async for order in Order.query(partition_key="CUSTOMER#123"):
    print(order.total)

last = Order.get_last_metrics()
if last and last.duration_ms > 100:
    logger.warning(f"Slow query: {last.duration_ms}ms")

Lambda optimization

In Lambda, every millisecond counts:

def handler(event, context):
    set_correlation_id(context.aws_request_id)

    # Bridge the sync Lambda entry point to the async API
    user = asyncio.run(User.get(pk=event["user_id"]))
    # Logs include the request ID for tracing

    return {"statusCode": 200}

OpenTelemetry tracing

pydynox supports OpenTelemetry for distributed tracing. When enabled, every DynamoDB operation creates a span with useful attributes.

Installation

Install the optional dependency:

pip install "pydynox[opentelemetry]"

Basic usage

"""Basic OpenTelemetry tracing example."""

import asyncio

from pydynox import DynamoDBClient, Model, ModelConfig, enable_tracing
from pydynox.attributes import StringAttribute

# Enable tracing - uses global OTEL tracer
enable_tracing()

client = DynamoDBClient(region="us-east-1")


class User(Model):
    model_config = ModelConfig(table="users", client=client)
    pk = StringAttribute(partition_key=True)
    sk = StringAttribute(sort_key=True)
    name = StringAttribute()


async def main():
    # All operations now create spans automatically
    user = User(pk="USER#123", sk="PROFILE", name="John")
    await user.save()  # Span: "PutItem users"

    await User.get(pk="USER#123", sk="PROFILE")  # Span: "GetItem users"


asyncio.run(main())
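
enable_tracing() picks up the global OTEL tracer, so an SDK tracer provider must be configured for spans to actually go anywhere. A minimal console-exporter setup (standard OpenTelemetry SDK, nothing pydynox-specific):

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

# Print spans to stdout so you can inspect them locally
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)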

Span attributes

Each span follows OTEL Database Semantic Conventions:

| Attribute | Example | Description |
| --- | --- | --- |
| db.system.name | aws.dynamodb | Database system |
| db.operation.name | PutItem | DynamoDB operation |
| db.collection.name | users | Table name |
| db.namespace | us-east-1 | AWS region |
| server.address | dynamodb.us-east-1.amazonaws.com | Endpoint |
| aws.dynamodb.consumed_capacity.read | 1.0 | RCU consumed |
| aws.dynamodb.consumed_capacity.write | 1.0 | WCU consumed |
| aws.request_id | ABC123... | AWS request ID |
| error.type | ConditionalCheckFailedException | Error class (if failed) |

Custom configuration

"""Custom tracer and configuration example."""

from opentelemetry import trace
from pydynox import enable_tracing

# Use a custom tracer
tracer = trace.get_tracer("my-service", "1.0.0")

enable_tracing(
    tracer=tracer,
    record_exceptions=True,  # Add exception events to spans
    record_consumed_capacity=True,  # Add RCU/WCU as attributes
    span_name_prefix="myapp",  # Spans become: "myapp PutItem users"
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tracer | Tracer | None | Custom OTEL tracer |
| record_exceptions | bool | True | Add exception events to spans |
| record_consumed_capacity | bool | True | Add RCU/WCU as attributes |
| span_name_prefix | str | None | Prefix for span names |

Disable tracing

"""Disable tracing example."""

from pydynox import disable_tracing, enable_tracing

# Enable tracing
enable_tracing()

# ... do some operations with tracing ...

# Disable when no longer needed
disable_tracing()

Span naming

Span names follow OTEL conventions:

  • Single operation: "PutItem users"
  • Batch operation: "BATCH BatchWriteItem users"
  • With prefix: "myapp PutItem users"

Context propagation

pydynox spans automatically connect to the current active span. This means if you create a parent span in your code, all DynamoDB operations inside it become child spans.

from opentelemetry import trace
from pydynox import enable_tracing, Model

enable_tracing()
tracer = trace.get_tracer("my-service")

# In a Lambda handler or HTTP request
with tracer.start_as_current_span("handle_request"):
    user = User.get(pk="USER#123")  # Child span: "GetItem users"
    user.name = "Updated"
    user.save()                      # Child span: "PutItem users"

All spans share the same trace_id, so you can see the full request flow in your tracing backend (Jaeger, X-Ray, etc.).

Logs with trace context

When tracing is enabled, pydynox logs automatically include trace_id and span_id. This helps correlate logs with spans in your tracing backend.

from pydynox import enable_tracing

enable_tracing()

# Logs now include trace context (inside an async function)
await user.save()
# INFO:pydynox:put_item table=users duration_ms=8.2 wcu=1.0 trace_id=abc123... span_id=def456...

With a custom logger like AWS Lambda Powertools:

from aws_lambda_powertools import Logger
from pydynox import enable_tracing, set_logger

logger = Logger()
set_logger(logger)
enable_tracing()

# Powertools logs include trace_id and span_id as structured fields
await user.save()
