Query
Query items from DynamoDB using typed conditions. Returns model instances with full type hints.
Tip
For large result sets, you might want to use as_dict=True. See as_dict.
Key features
- Type-safe queries with model attributes
- Range key conditions (begins_with, between, comparisons)
- Filter conditions on any attribute
- Automatic pagination
- Ascending/descending sort order
- Async support
Getting started
pydynox uses an async-first API. Methods without prefix are async (default), methods with sync_ prefix are sync.
Basic query
Use Model.query() to fetch items by hash key:
"""Basic query examples (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
class Order(Model):
model_config = ModelConfig(table="orders")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders"):
await client.create_table(
"orders",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create some orders
for i in range(3):
await Order(
pk="CUSTOMER#123",
sk=f"ORDER#{i:03d}",
total=100 + i * 50,
status="pending",
).save()
# Query all orders for a customer
async for order in Order.query(partition_key="CUSTOMER#123"):
print(f"Order: {order.sk}, Total: {order.total}")
# Get first result only
first_order = await Order.query(partition_key="CUSTOMER#123").first()
if first_order:
print(f"First order: {first_order.sk}")
# Collect all results into a list
orders = [order async for order in Order.query(partition_key="CUSTOMER#123")]
print(f"Found {len(orders)} orders")
asyncio.run(main())
"""Basic query examples (sync - use sync_ prefix)."""
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
class Order(Model):
model_config = ModelConfig(table="orders")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
def main():
# Setup: create table and data
if not client.sync_table_exists("orders"):
client.sync_create_table(
"orders",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create some orders
for i in range(3):
Order(
pk="CUSTOMER#123",
sk=f"ORDER#{i:03d}",
total=100 + i * 50,
status="pending",
).sync_save()
# Query all orders for a customer
for order in Order.sync_query(partition_key="CUSTOMER#123"):
print(f"Order: {order.sk}, Total: {order.total}")
# Get first result only
first_order = Order.sync_query(partition_key="CUSTOMER#123").first()
if first_order:
print(f"First order: {first_order.sk}")
# Collect all results into a list
orders = list(Order.sync_query(partition_key="CUSTOMER#123"))
print(f"Found {len(orders)} orders")
main()
The query returns a result that you can:
- Iterate with
async for(async) orfor(sync) - Get first result with
await .first()(async) or.first()(sync) - Collect all with
[x async for x in ...](async) orlist()(sync)
Range key conditions
Filter by sort key using attribute conditions:
Available range key conditions:
| Condition | Example | Description |
|---|---|---|
begins_with |
Order.sk.begins_with("ORDER#") |
Sort key starts with prefix |
between |
Order.sk.between("A", "Z") |
Sort key in range |
= |
Order.sk == "ORDER#001" |
Exact match |
< |
Order.sk < "ORDER#100" |
Less than |
<= |
Order.sk <= "ORDER#100" |
Less than or equal |
> |
Order.sk > "ORDER#001" |
Greater than |
>= |
Order.sk >= "ORDER#001" |
Greater than or equal |
Tip
Range key conditions are efficient. DynamoDB uses them to limit the items it reads.
Filter conditions
Filter results by any attribute:
"""Filter condition examples (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
CUSTOMER_PK = "CUSTOMER#123"
class Order(Model):
model_config = ModelConfig(table="orders_filter")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders_filter"):
await client.create_table(
"orders_filter",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create orders with different statuses and totals
statuses = ["pending", "shipped", "delivered", "shipped", "pending"]
for i, status in enumerate(statuses):
await Order(
pk=CUSTOMER_PK,
sk=f"ORDER#{i:03d}",
total=50 + i * 30,
status=status,
).save()
# Filter by status
async for order in Order.query(
partition_key=CUSTOMER_PK,
filter_condition=Order.status == "shipped",
):
print(f"Shipped order: {order.sk}")
# Filter by total amount
async for order in Order.query(
partition_key=CUSTOMER_PK,
filter_condition=Order.total >= 100,
):
print(f"Large order: {order.sk}, Total: {order.total}")
# Combine multiple filters with & (AND)
async for order in Order.query(
partition_key=CUSTOMER_PK,
filter_condition=(Order.status == "shipped") & (Order.total > 50),
):
print(f"Shipped large order: {order.sk}")
# Combine filters with | (OR)
async for order in Order.query(
partition_key=CUSTOMER_PK,
filter_condition=(Order.status == "shipped") | (Order.status == "delivered"),
):
print(f"Completed order: {order.sk}")
asyncio.run(main())
Warning
Filter conditions are applied after DynamoDB reads the items. You still pay for the read capacity of filtered-out items.
Sorting
Control sort order:
"""Sorting and limit examples (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
CUSTOMER_PK = "CUSTOMER#123"
class Order(Model):
model_config = ModelConfig(table="orders_sort")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders_sort"):
await client.create_table(
"orders_sort",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create 10 orders
for i in range(10):
await Order(
pk=CUSTOMER_PK,
sk=f"ORDER#{i:03d}",
total=100 + i * 10,
status="pending",
).save()
# Ascending order (default)
print("Ascending:")
async for order in Order.query(
partition_key=CUSTOMER_PK,
scan_index_forward=True,
):
print(f" {order.sk}")
# Descending order
print("Descending:")
async for order in Order.query(
partition_key=CUSTOMER_PK,
scan_index_forward=False,
):
print(f" {order.sk}")
# Get the 5 most recent orders (descending)
print("5 most recent:")
recent_orders = [
order
async for order in Order.query(
partition_key=CUSTOMER_PK,
scan_index_forward=False,
limit=5,
)
]
for order in recent_orders:
print(f" {order.sk}")
asyncio.run(main())
| Parameter | Default | Description |
|---|---|---|
scan_index_forward |
True |
True = ascending, False = descending |
Pagination
Understanding limit vs page_size
pydynox has two parameters that control pagination:
| Parameter | What it does | DynamoDB behavior |
|---|---|---|
limit |
Max total items to return | Stops iteration after N items |
page_size |
Items per DynamoDB request | Passed as Limit to DynamoDB API |
This is a common pattern in DynamoDB libraries.
Key behaviors:
limit=10→ Returns exactly 10 items (or less if table has fewer)page_size=25→ Fetches 25 items per request, returns ALL itemslimit=100, page_size=25→ Returns 100 items, fetching 25 per request (4 requests)- Neither set → Returns all items, DynamoDB decides page size
"""Limit vs page_size examples (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
CUSTOMER_PK = "CUSTOMER#123"
class Order(Model):
model_config = ModelConfig(table="orders_limit")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders_limit"):
await client.create_table(
"orders_limit",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create 50 orders
for i in range(50):
await Order(
pk=CUSTOMER_PK,
sk=f"ORDER#{i:03d}",
total=100 + i,
status="pending",
).save()
# limit = total items to return (stops after N items)
# page_size = items per DynamoDB request (controls pagination)
# Example 1: Get exactly 10 items total
orders = [order async for order in Order.query(partition_key=CUSTOMER_PK, limit=10)]
print(f"Example 1: Got {len(orders)} orders")
# Example 2: Get all items, but fetch 25 per page
count = 0
async for order in Order.query(partition_key=CUSTOMER_PK, page_size=25):
count += 1
print(f"Example 2: Got {count} orders")
# Example 3: Get 20 items total, fetching 5 per page
orders = [
order
async for order in Order.query(
partition_key=CUSTOMER_PK,
limit=20,
page_size=5,
)
]
print(f"Example 3: Got {len(orders)} orders")
# Example 4: Manual pagination with page_size
result = Order.query(partition_key=CUSTOMER_PK, limit=10, page_size=10)
first_page = [order async for order in result]
print(f"Example 4: First page: {len(first_page)} items")
if result.last_evaluated_key:
result = Order.query(
partition_key=CUSTOMER_PK,
limit=10,
page_size=10,
last_evaluated_key=result.last_evaluated_key,
)
second_page = [order async for order in result]
print(f"Example 4: Second page: {len(second_page)} items")
asyncio.run(main())
Common mistake
If you only set limit, it also controls the DynamoDB page size. This means limit=10 will fetch 10 items per request AND stop after 10 total. If you want to fetch more items per request but still limit the total, use both limit and page_size.
Automatic pagination
By default, the iterator fetches all pages automatically:
# Async - fetches ALL orders, automatically handling pagination
async for order in Order.query(partition_key="CUSTOMER#123"):
print(order.sk)
# Sync
for order in Order.sync_query(partition_key="CUSTOMER#123"):
print(order.sk)
Manual pagination
For "load more" buttons or batch processing, use last_evaluated_key:
"""Pagination examples (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
CUSTOMER_PK = "CUSTOMER#123"
class Order(Model):
model_config = ModelConfig(table="orders_page")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders_page"):
await client.create_table(
"orders_page",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create 25 orders
for i in range(25):
await Order(
pk=CUSTOMER_PK,
sk=f"ORDER#{i:03d}",
total=100 + i,
status="pending",
).save()
# Automatic pagination - iterator fetches all pages
print("All orders:")
async for order in Order.query(partition_key=CUSTOMER_PK):
print(f" {order.sk}")
# Manual pagination - control page size
result = Order.query(partition_key=CUSTOMER_PK, limit=10)
# Process first page
page_count = 0
async for order in result:
print(f"Page 1: {order.sk}")
page_count += 1
if page_count >= 10:
break
# Check if there are more pages
if result.last_evaluated_key:
print("More pages available")
# Fetch next page
next_result = Order.query(
partition_key=CUSTOMER_PK,
limit=10,
last_evaluated_key=result.last_evaluated_key,
)
async for order in next_result:
print(f"Page 2: {order.sk}")
asyncio.run(main())
Use last_evaluated_key to:
- Implement "load more" buttons
- Process large datasets in batches
- Resume interrupted queries
Advanced
Consistent reads
For strongly consistent reads:
# Async
orders = [
order
async for order in Order.query(
partition_key="CUSTOMER#123",
consistent_read=True,
)
]
# Sync
orders = list(
Order.sync_query(
partition_key="CUSTOMER#123",
consistent_read=True,
)
)
Or set it as default in ModelConfig:
Metrics
Access query metrics using class methods:
result = Order.query(partition_key="CUSTOMER#123")
orders = list(result)
# Get last operation metrics
last = Order.get_last_metrics()
if last:
print(f"Duration: {last.duration_ms}ms")
print(f"RCU consumed: {last.consumed_rcu}")
# Get total metrics across all operations
total = Order.get_total_metrics()
print(f"Total RCU: {total.total_rcu}")
For more details, see Observability.
Async queries
Async is the default. Use async for to iterate:
async for order in Order.query(partition_key="CUSTOMER#123"):
print(order.sk)
# Get first
first = await Order.query(partition_key="CUSTOMER#123").first()
# Collect all
orders = [order async for order in Order.query(partition_key="CUSTOMER#123")]
Sync queries
Use sync_query() for sync code:
for order in Order.sync_query(partition_key="CUSTOMER#123"):
print(order.sk)
# Get first
first = Order.sync_query(partition_key="CUSTOMER#123").first()
# Collect all
orders = list(Order.sync_query(partition_key="CUSTOMER#123"))
Return dicts instead of models
By default, query returns Model instances. Each item from DynamoDB is converted to a Python object with all the Model methods and hooks.
This conversion has a cost. Python object creation is slow compared to Rust. For queries that return many items (hundreds or thousands), this becomes a bottleneck.
Use as_dict=True to skip Model instantiation and get plain dicts:
"""Query returning dicts instead of Model instances (async - default)."""
import asyncio
from pydynox import Model, ModelConfig, get_default_client
from pydynox.attributes import NumberAttribute, StringAttribute
client = get_default_client()
CUSTOMER_PK = "CUSTOMER#123"
class Order(Model):
model_config = ModelConfig(table="orders_dict")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
total = NumberAttribute()
status = StringAttribute()
async def main():
# Setup: create table and data
if not await client.table_exists("orders_dict"):
await client.create_table(
"orders_dict",
partition_key=("pk", "S"),
sort_key=("sk", "S"),
)
# Create some orders
for i in range(5):
await Order(
pk=CUSTOMER_PK,
sk=f"ORDER#{i:03d}",
total=100 + i * 10,
status="pending",
).save()
# Return dicts instead of Model instances
async for order in Order.query(partition_key=CUSTOMER_PK, as_dict=True):
# order is a plain dict, not an Order instance
print(order.get("sk"), order.get("total"))
# Useful for read-only operations where you don't need Model methods
orders = [order async for order in Order.query(partition_key=CUSTOMER_PK, as_dict=True)]
print(f"Found {len(orders)} orders as dicts")
asyncio.run(main())
When to use as_dict=True:
- Read-only operations where you don't need
.save(),.delete(), or hooks - Queries returning many items (100+)
- Performance-critical code paths
- Data export or transformation pipelines
Trade-offs:
| Model instances | as_dict=True |
|
|---|---|---|
| Speed | Slower (Python object creation) | Faster (plain dicts) |
| Methods | .save(), .delete(), .update() |
None |
| Hooks | after_load runs |
No hooks |
| Type hints | Full IDE support | Dict access |
| Validation | Attribute types enforced | Raw DynamoDB types |
Why this happens
This is how Python works. Creating class instances is expensive. Rust handles the DynamoDB call and deserialization fast, but Python must create each Model object. There's no way around this in Python itself.
Query parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
partition_key |
Any | Required | Hash key value |
sort_key_condition |
Condition | None | Condition on sort key |
filter_condition |
Condition | None | Filter on any attribute |
limit |
int | None | Max total items to return |
page_size |
int | None | Items per DynamoDB request |
scan_index_forward |
bool | True | Sort order |
consistent_read |
bool | None | Strongly consistent read |
last_evaluated_key |
dict | None | Start key for pagination |
as_dict |
bool | False | Return dicts instead of Model instances |
Query vs GSI query
Use Model.query() when querying by the table's hash key.
Use GSI query when querying by a different attribute:
# Table query - by pk (async)
async for order in Order.query(partition_key="CUSTOMER#123"):
print(order.sk)
# GSI query - by status (async)
async for order in Order.status_index.query(partition_key="shipped"):
print(order.pk)
# Sync versions
for order in Order.sync_query(partition_key="CUSTOMER#123"):
print(order.sk)
for order in Order.status_index.sync_query(partition_key="shipped"):
print(order.pk)
Testing your code
Test queries without DynamoDB using the built-in memory backend:
"""Testing query operations with pydynox_memory_backend."""
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, StringAttribute
from pydynox.conditions import Attr
class Order(Model):
model_config = ModelConfig(table="orders")
pk = StringAttribute(partition_key=True)
sk = StringAttribute(sort_key=True)
status = StringAttribute()
total = NumberAttribute()
def test_query_by_partition_key(pydynox_memory_backend):
"""Test querying items by partition key."""
Order(pk="CUSTOMER#1", sk="ORDER#001", status="pending", total=100).save()
Order(pk="CUSTOMER#1", sk="ORDER#002", status="shipped", total=200).save()
Order(pk="CUSTOMER#2", sk="ORDER#001", status="pending", total=50).save()
results = list(Order.query(partition_key="CUSTOMER#1"))
assert len(results) == 2
assert all(r.pk == "CUSTOMER#1" for r in results)
def test_query_with_sort_key_condition(pydynox_memory_backend):
"""Test query with range key condition."""
Order(pk="CUSTOMER#1", sk="ORDER#001", status="pending", total=100).save()
Order(pk="CUSTOMER#1", sk="ORDER#002", status="shipped", total=200).save()
Order(pk="CUSTOMER#1", sk="RETURN#001", status="pending", total=50).save()
results = list(
Order.query(
partition_key="CUSTOMER#1",
sort_key_condition=Order.sk.begins_with("ORDER#"),
)
)
assert len(results) == 2
def test_query_with_filter(pydynox_memory_backend):
"""Test query with filter condition."""
Order(pk="CUSTOMER#1", sk="ORDER#001", status="pending", total=100).save()
Order(pk="CUSTOMER#1", sk="ORDER#002", status="shipped", total=200).save()
Order(pk="CUSTOMER#1", sk="ORDER#003", status="pending", total=300).save()
results = list(
Order.query(
partition_key="CUSTOMER#1",
filter_condition=Attr("status").eq("pending"),
)
)
assert len(results) == 2
assert all(r.status == "pending" for r in results)
No setup needed. Just add pydynox_memory_backend to your test function. See Testing for more details.
Next steps
- Atomic updates - Increment, append, and other atomic operations
- Conditions - All condition operators
- Indexes - Query by non-key attributes