Usage#

Client instantiation#

You should try to re-use clients, and especially HTTP clients, as much as possible. Don’t create a new one for each action.

The aiodynamo.client.Client class takes three required and three optional arguments:

  1. An HTTP client adaptor, conforming to the aiodynamo.http.base.HTTP interface.

  2. An instance of aiodynamo.credentials.Credentials to authenticate with DynamoDB. You may use Credentials.auto() to use the default loading strategy.

  3. The region your DynamoDB is in.

  4. An optional endpoint URL of your DynamoDB, as a yarl.URL instance. Useful when using a local DynamoDB implementation such as dynalite or dynamodb-local.

  5. Which numeric type to use. This should be a callable which accepts a string as input and returns your numeric type as output. Defaults to float.

  6. The throttling configuration to use. An instance of aiodynamo.models.RetryConfig. By default, if the DynamoDB rate limit is exceeded, aiodynamo will retry for up to one minute with increasing delays.
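As a rough illustration of the numeric type argument, the sketch below defines a callable that turns DynamoDB number strings into int or Decimal instead of float, and passes it to a single long-lived client. The names parse_number and build_client are made up for this example, and numeric_type is assumed to be the keyword name of the fifth argument.

```python
from decimal import Decimal
from typing import Any, Union


def parse_number(raw: str) -> Union[int, Decimal]:
    """Numeric type callable: string in, number out, without float rounding."""
    try:
        return int(raw)  # integral strings such as "42"
    except ValueError:
        return Decimal(raw)  # everything else, e.g. "1.50" or "1E3"


def build_client(http: Any, region: str) -> Any:
    """Build one client to be shared by the whole application.

    `http` is any adaptor conforming to aiodynamo.http.base.HTTP.
    """
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.client import Client
    from aiodynamo.credentials import Credentials

    # Assumption: the numeric type argument is accepted as `numeric_type`.
    return Client(http, Credentials.auto(), region, numeric_type=parse_number)
```

Creating the client once at startup and passing it around keeps the underlying HTTP connections reusable across requests.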

Credentials#

In most cases, Credentials.auto() will load the credentials as you’d expect. Specifically, it will try multiple credentials providers in this order: aiodynamo.credentials.EnvironmentCredentials, aiodynamo.credentials.FileCredentials, aiodynamo.credentials.ContainerMetadataCredentials, aiodynamo.credentials.InstanceMetadataCredentialsV2 and aiodynamo.credentials.InstanceMetadataCredentialsV1.

In case you want to explicitly pass the credentials from Python, use aiodynamo.credentials.StaticCredentials.

classmethod Credentials.auto() ChainCredentials#

Return the default credentials loader chain.


class aiodynamo.credentials.EnvironmentCredentials#

Loads the credentials from the environment.


class aiodynamo.credentials.ContainerMetadataCredentials(
timeout: float | int = 2,
max_attempts: int = 3,
base_url: ~yarl.URL = URL('http://169.254.170.2'),
relative_uri: str | None = <factory>,
full_uri: ~yarl.URL | None = <factory>,
auth_token: str | None = <factory>,
)#

Loads credentials from the ECS container metadata endpoint.


class aiodynamo.credentials.InstanceMetadataCredentialsV2(
timeout: float | int = 1,
max_attempts: int = 1,
base_url: ~yarl.URL = URL('http://169.254.169.254'),
disabled: bool = <factory>,
token_session_duration_seconds: int = 21600,
)#

Loads credentials from the EC2 instance metadata endpoint using IMDSv2. IMDSv2 is considered more secure than IMDSv1, but it may not be available in older AWS SDKs.


class aiodynamo.credentials.InstanceMetadataCredentialsV1(
timeout: float | int = 1,
max_attempts: int = 1,
base_url: ~yarl.URL = URL('http://169.254.169.254'),
disabled: bool = <factory>,
)#

Loads credentials from the EC2 instance metadata endpoint using IMDSv1.


class aiodynamo.credentials.ChainCredentials(
candidates: Sequence[Credentials],
)#

Chains multiple credentials providers together, trying them in order. Returns the first key found. Exceptions are suppressed.

Once a credentials provider returns a key, only that provider will be used in subsequent calls.
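The chaining behaviour described above can be modelled in a few lines of plain Python. This is only a simplified model for illustration, not aiodynamo's implementation: the Provider type, the FirstWorkingChain class and the three example providers are all hypothetical, and keys are represented as plain strings.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

# Hypothetical provider type: an async callable returning a key or None.
Provider = Callable[[], Awaitable[Optional[str]]]


class FirstWorkingChain:
    def __init__(self, candidates: List[Provider]) -> None:
        self.candidates = candidates
        self.chosen: Optional[Provider] = None

    async def get_key(self) -> Optional[str]:
        # After the first success, only the chosen provider is consulted.
        if self.chosen is not None:
            return await self.chosen()
        for candidate in self.candidates:
            try:
                key = await candidate()
            except Exception:
                continue  # errors are suppressed; try the next provider
            if key is not None:
                self.chosen = candidate
                return key
        return None


async def broken() -> Optional[str]:
    raise RuntimeError("metadata endpoint unreachable")


async def empty() -> Optional[str]:
    return None


async def working() -> Optional[str]:
    return "key-from-third-provider"


async def demo():
    chain = FirstWorkingChain([broken, empty, working])
    first = await chain.get_key()
    second = await chain.get_key()  # served by the remembered provider
    return first, second, chain.chosen is working
```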


class aiodynamo.credentials.StaticCredentials(key: Key | None)#

Static credentials provided in Python.


class aiodynamo.credentials.FileCredentials(
*,
path: Path | None = None,
profile_name: str | None = None,
)#

Loads the credentials from an AWS credentials file.


class aiodynamo.credentials.Key(id: 'str', secret: 'str', token: 'Optional[str]' = None)#
id: str#
secret: str#
token: str | None = None#
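
For explicitly passed credentials, a minimal sketch using the StaticCredentials and Key signatures listed above might look like this. The helper name static_credentials is hypothetical, and where the values come from (a secrets manager, a config file) is left to the caller.

```python
from typing import Any, Optional


def static_credentials(
    access_key_id: str, secret: str, token: Optional[str] = None
) -> Any:
    """Wrap explicit key material in a credentials object for Client."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.credentials import Key, StaticCredentials

    return StaticCredentials(Key(id=access_key_id, secret=secret, token=token))
```

The returned object can be passed as the second argument of aiodynamo.client.Client in place of Credentials.auto().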

The Client class#

class aiodynamo.client.Client#
Client.table(name: str) Table#
async Client.table_exists(name: str) bool#
async Client.create_table(
name: str,
throughput: Throughput | PayPerRequest,
keys: KeySchema,
*,
lsis: List[LocalSecondaryIndex] | None = None,
gsis: List[GlobalSecondaryIndex] | None = None,
stream: StreamSpecification | None = None,
wait_for_active: bool | RetryConfig = False,
) TableDescription#

If wait_for_active is set to True, the call waits until the table status changes to Active. If the table is not active after the defined wait time, an exception is raised. Passing an aiodynamo.models.PayPerRequest object in place of an aiodynamo.models.Throughput configuration creates a table with the PAY_PER_REQUEST billing mode.
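
A sketch of creating a pay-per-request table and waiting for it to become active is shown below. The table and attribute names are made up. KeySpec(name, type) and KeyType.string are not documented in this section, so their use here is an assumption about aiodynamo.models.

```python
from typing import Any


async def create_users_table(client: Any) -> Any:
    """Create a pay-per-request table and wait until it is active."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.models import KeySchema, KeySpec, KeyType, PayPerRequest

    return await client.create_table(
        "users",  # hypothetical table name
        PayPerRequest(),
        # Assumption: KeySpec takes the attribute name and a KeyType member.
        KeySchema(hash_key=KeySpec("user_id", KeyType.string)),
        wait_for_active=True,
    )
```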

async Client.delete_table(
table: str,
*,
wait_for_disabled: bool | RetryConfig = False,
) None#

If wait_for_disabled is set to True, the call waits until the table status changes to Disabled. If the table is not disabled after the defined wait time, an exception is raised.

async Client.put_item(
table: str,
item: Dict[str, Any],
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) None | Dict[str, Any]#
async Client.update_item(
table: str,
key: Dict[str, Any],
update_expression: UpdateExpression,
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) Dict[str, Any] | None#
async Client.delete_item(
table: str,
key: Dict[str, Any],
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) None | Dict[str, Any]#
async Client.get_item(
table: str,
key: Dict[str, Any],
*,
projection: ProjectionExpression | None = None,
consistent_read: bool = False,
) Dict[str, Any]#
async Client.query(
table: str,
key_condition: KeyCondition,
*,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
scan_forward: bool = True,
index: str | None = None,
limit: int | None = None,
projection: ProjectionExpression | None = None,
select: Select = Select.all_attributes,
consistent_read: bool = False,
) AsyncIterator[Dict[str, Any]]#

Query the table. Unlike the DynamoDB API, the results are automatically de-paginated and returned as a single asynchronous iterator of items. To retrieve only a single page, use aiodynamo.client.Client.query_single_page() instead.
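
The idea of de-pagination can be illustrated with a small stand-alone model: keep requesting pages, pass the key returned by the previous page as the next start key, and yield items one by one. This is a conceptual sketch rather than aiodynamo's code. The fetch_page callable and the in-memory PAGES table are hypothetical stand-ins for single-page requests.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Tuple

Item = Dict[str, Any]
# Hypothetical page fetcher: takes a start key, returns (items, next start key).
FetchPage = Callable[[Optional[Item]], Awaitable[Tuple[List[Item], Optional[Item]]]]


async def depaginate(fetch_page: FetchPage) -> AsyncIterator[Item]:
    """Flatten a paginated result into one stream of items."""
    start_key: Optional[Item] = None
    while True:
        items, start_key = await fetch_page(start_key)
        for item in items:
            yield item
        if start_key is None:  # no further pages
            return


# In-memory stand-in for a result set split into two pages.
PAGES = {
    None: ([{"id": 1}, {"id": 2}], {"id": 2}),
    2: ([{"id": 3}], None),
}


async def fake_fetch(start_key: Optional[Item]) -> Tuple[List[Item], Optional[Item]]:
    return PAGES[start_key["id"] if start_key else None]


async def collect() -> List[Item]:
    return [item async for item in depaginate(fake_fetch)]
```

Because items are yielded as each page arrives, a caller that stops iterating early never triggers requests for the remaining pages.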

async Client.query_single_page(
table: str,
key_condition: KeyCondition,
*,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
scan_forward: bool = True,
index: str | None = None,
limit: int | None = None,
projection: ProjectionExpression | None = None,
select: Select = Select.all_attributes,
consistent_read: bool = False,
) Page#

Query a single page from DynamoDB. To handle pagination automatically, use aiodynamo.client.Client.query() instead.

async Client.scan(
table: str,
*,
index: str | None = None,
limit: int | None = None,
start_key: Dict[str, Any] | None = None,
projection: ProjectionExpression | None = None,
filter_expression: Condition | None = None,
consistent_read: bool = False,
) AsyncIterator[Dict[str, Any]]#

Scan the table. Unlike the DynamoDB API, the results are automatically de-paginated and returned as a single asynchronous iterator of items. To retrieve only a single page, use aiodynamo.client.Client.scan_single_page() instead.

async Client.scan_single_page(
table: str,
*,
index: str | None = None,
limit: int | None = None,
start_key: Dict[str, Any] | None = None,
projection: ProjectionExpression | None = None,
filter_expression: Condition | None = None,
consistent_read: bool = False,
) Page#

Scan a single page from DynamoDB. To handle pagination automatically, use aiodynamo.client.Client.scan() instead.

async Client.count(
table: str,
key_condition: KeyCondition,
*,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
index: str | None = None,
limit: int | None = None,
consistent_read: bool = False,
) int#

Query DynamoDB and return the number of matching items, optionally bounded by the limit keyword argument. Pagination is handled automatically, so the count covers all pages.

async Client.scan_count(
table: str,
*,
index: str | None = None,
limit: int | None = None,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
consistent_read: bool = False,
) int#

Scan DynamoDB and return the number of matching items, optionally bounded by the limit keyword argument. Pagination is handled automatically, so the count covers all pages.

async Client.batch_get(
request: Dict[str, BatchGetRequest],
) BatchGetResponse#
async Client.batch_write(
request: Dict[str, BatchWriteRequest],
) Dict[str, BatchWriteResult]#
async Client.transact_get_items(
items: List[Get],
) List[Dict[str, Any]]#
async Client.transact_write_items(
items: Sequence[Put | Update | Delete | ConditionCheck],
*,
request_token: str | None = None,
) None#
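
As a hedged example of a write transaction, the sketch below moves an amount between two items, with a condition on the debit so the whole transaction fails if the balance is too low. The accounts table, its id and balance attributes, and the function name are invented for illustration. The operation classes and F methods are the ones documented further down this page.

```python
from typing import Any


async def transfer_credit(client: Any, sender: str, receiver: str, amount: int) -> None:
    """Debit one account and credit another in a single transaction."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.expressions import F
    from aiodynamo.operations import Update

    await client.transact_write_items(
        items=[
            # Debit only if the sender has enough balance.
            Update(
                table="accounts",
                key={"id": sender},
                expression=F("balance").change(-amount),
                condition=F("balance").gte(amount),
            ),
            # Credit the receiver.
            Update(
                table="accounts",
                key={"id": receiver},
                expression=F("balance").change(amount),
            ),
        ],
    )
```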

The Table class#

The aiodynamo.client.Table class wraps all methods on aiodynamo.client.Client so you don’t have to provide the table name each time.

This class should not be instantiated directly. Instead, create it by calling aiodynamo.client.Client.table().
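
A small sketch of working through a Table instead of repeating the table name is shown below. The users table and its user_id key are hypothetical, and client is expected to be an existing aiodynamo.client.Client.

```python
from typing import Any, Dict


async def save_and_reload(client: Any, user: Dict[str, Any]) -> Dict[str, Any]:
    """Write an item and read it back through a Table wrapper."""
    table = client.table("users")  # bind the table name once
    await table.put_item(user)
    # Strongly consistent read so the item just written is visible.
    return await table.get_item({"user_id": user["user_id"]}, consistent_read=True)
```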

Methods#

class aiodynamo.client.Table#
async Table.exists() bool#
async Table.create(
throughput: Throughput | PayPerRequest,
keys: KeySchema,
*,
lsis: List[LocalSecondaryIndex] | None = None,
gsis: List[GlobalSecondaryIndex] | None = None,
stream: StreamSpecification | None = None,
wait_for_active: bool | RetryConfig = False,
) TableDescription#

If wait_for_active is set to True, the call waits until the table status changes to Active. If the table is not active after the defined wait time, an exception is raised.

async Table.delete(
*,
wait_for_disabled: bool | RetryConfig = False,
) None#

If wait_for_disabled is set to True, the call waits until the table status changes to Disabled. If the table is not disabled after the defined wait time, an exception is raised.

async Table.put_item(
item: Dict[str, Any],
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) None | Dict[str, Any]#

Create a new item or replace it if it already exists. This will overwrite all attributes in an item.

async Table.update_item(
key: Dict[str, Any],
update_expression: UpdateExpression,
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) Dict[str, Any] | None#

Edit an item’s attribute or create a new item if it does not exist. This will edit only the passed attributes.

async Table.delete_item(
key: Dict[str, Any],
*,
return_values: ReturnValues = ReturnValues.none,
condition: Condition | None = None,
) None | Dict[str, Any]#
async Table.get_item(
key: Dict[str, Any],
*,
projection: ProjectionExpression | None = None,
consistent_read: bool = False,
) Dict[str, Any]#

Returns the attributes of an item from the table. All attributes are returned by default; to get only some attributes, use a projection expression.

Table.query(
key_condition: KeyCondition,
*,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
scan_forward: bool = True,
index: str | None = None,
limit: int | None = None,
projection: ProjectionExpression | None = None,
select: Select = Select.all_attributes,
consistent_read: bool = False,
) AsyncIterator[Dict[str, Any]]#

Query the table.

Unlike the DynamoDB API, the results are automatically de-paginated and returned as a single asynchronous iterator of items. To retrieve only a single page, use aiodynamo.client.Table.query_single_page() instead.

To filter the result, use a filter expression. All attributes are returned by default; to get only some attributes, use a projection expression.

async Table.query_single_page(
key_condition: KeyCondition,
*,
start_key: Dict[str, Any] | None = None,
filter_expression: Condition | None = None,
scan_forward: bool = True,
index: str | None = None,
limit: int | None = None,
projection: ProjectionExpression | None = None,
select: Select = Select.all_attributes,
consistent_read: bool = False,
) Page#

Query a single page from DynamoDB. To handle pagination automatically, use aiodynamo.client.Table.query() instead.

Table.scan(
*,
index: str | None = None,
limit: int | None = None,
start_key: Dict[str, Any] | None = None,
projection: ProjectionExpression | None = None,
filter_expression: Condition | None = None,
consistent_read: bool = False,
) AsyncIterator[Dict[str, Any]]#

Scan the table.

Unlike the DynamoDB API, the results are automatically de-paginated and returned as a single asynchronous iterator of items. To retrieve only a single page, use aiodynamo.client.Table.scan_single_page() instead.

To filter the result, use a filter expression. All attributes are returned by default; to get only some attributes, use a projection expression.

async Table.scan_single_page(
*,
index: str | None = None,
limit: int | None = None,
start_key: Dict[str, Any] | None = None,
projection: ProjectionExpression | None = None,
filter_expression: Condition | None = None,
consistent_read: bool = False,
) Page#

Scan a single page from DynamoDB. To handle pagination automatically, use aiodynamo.client.Table.scan() instead.

The F class#

The aiodynamo.expressions.F class is used when building expressions that refer to fields in your DynamoDB items. It is used to build four different types of expressions: Projection Expression, Update Expression, Filter Expression and Condition.

To refer to a top-level field, simply pass the name of the field to the class constructor. To refer to a nested field, pass the path as multiple arguments. To refer to indices in a list, pass an integer.

For example, to refer to the bar field in the second element of the foo field, use F("foo", 1, "bar").

Projection Expressions#

class aiodynamo.expressions.ProjectionExpression#

Abstract base class to represent projection expressions.

Projection expressions are built using the & operator. An instance of aiodynamo.expressions.F is a valid Projection expression too.

For example, to get the field foo and bar, you would use F("foo") & F("bar").
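
Putting field paths and the & operator together, a projection over a top-level field, a nested field and a list element could be sketched as follows. The attribute names and the function name are invented for this example.

```python
from typing import Any


def address_projection() -> Any:
    """Build a projection covering three differently nested fields."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.expressions import F

    # Top-level field, nested field, and a field in the second list element.
    return F("name") & F("address", "city") & F("orders", 1, "total")
```

The result can be passed as the projection keyword argument of get_item, query or scan.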

Update Expressions#

class aiodynamo.expressions.UpdateExpression#

Update expressions are created by calling methods on instances of aiodynamo.expressions.F and combining the return values of those method calls with the & operator.

UpdateExpression.debug(numeric_type: ~typing.Callable[[str], ~typing.Any] = <class 'float'>) str | None#

Format the expression as a string for debugging.


F.set(value: Any) UpdateExpression#

Set a field to a value.

F.set_if_not_exists(
value: Any,
) UpdateExpression#

Set a field to a value if the field does not exist in the item yet.

F.change(
diff: float | int | Decimal,
) UpdateExpression#

Change a numeric field by a given value.

F.append(
value: List[Any],
) UpdateExpression#

Add items to a list field. Note that the value passed in should be a list, not an individual item.

F.remove() UpdateExpression#

Remove a field.

F.add(
value: float | int | Decimal | Set[bytes] | Set[str] | Set[float | int | Decimal],
) UpdateExpression#

Add a value to a field. Only allowed for top level fields.

For numeric fields add a numeric value. For set fields, this will set the field to the union of the existing set and the set provided.

F.delete(
value: Set[Any],
) UpdateExpression#

Deletes all items in the set provided from the set stored in this field.
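
Several of the update methods above can be combined into one expression with the & operator. The following sketch uses made-up attribute names and only illustrates how the pieces compose.

```python
from typing import Any


def visit_update() -> Any:
    """Build an update touching four fields in one request."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.expressions import F

    return (
        F("visits").change(1)  # increment a numeric field
        & F("last_seen").set("2024-01-01T00:00:00Z")  # overwrite a field
        & F("tags").append(["returning"])  # note: a list, not a single item
        & F("legacy_flag").remove()  # drop a field entirely
    )
```

The expression would then be passed as the update_expression argument of update_item, and its debug() method can help inspect what was built.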

Filter Expression and Condition Expressions#

class aiodynamo.expressions.Condition#

Abstract base class of conditions and filters.

Filter and Condition expressions have the same syntax. They are created by calling methods on instances of aiodynamo.expressions.F and combining the return values of those method calls with the & or | operators. To negate a condition, use the ~ prefix operator.

Condition.debug(numeric_type: ~typing.Callable[[str], ~typing.Any] = <class 'float'>) str#

Format the expression as a string for debugging.


F.does_not_exist() Condition#

Checks that a field does not exist.

F.exists() Condition#

Checks that a field exists.

F.attribute_type(
attribute_type: AttributeType,
) Condition#

Checks the attribute type of a field.

F.begins_with(substr: str) Condition#

Checks that a field begins with a substring. The substring must not be empty. Fields matching the substring provided completely are not returned.

F.between(
low: Any,
high: Any,
) Condition#

Checks that a field is between two given values.

F.contains(
value: str | bytes | int | float | Decimal,
) Condition#

Checks if a set or list contains a certain value. If a string or bytes object is used as a value, they must not be empty.

F.is_in(
values: Sequence[Any],
) Condition#

Checks if the field is in a sequence of values. Between one and one hundred values may be provided.

F.gt(other: Any) Condition#

Checks if a field is greater than a value.

F.gte(other: Any) Condition#

Checks if a field is greater than or equal to a value.

F.lt(other: Any) Condition#

Checks if a field is less than a value.

F.lte(other: Any) Condition#

Checks if a field is less than or equal to a value.

F.equals(other: Any) Condition#

Checks if a field is equal to a value.

F.not_equals(other: Any) Condition#

Checks if a field is not equal to a value.

F.size() Size#

Allows checking the size of a field. This method does not return a condition itself; to get one, call the appropriate method on the returned Size instance.


class aiodynamo.expressions.Size(field: 'F')#
equals(other: Any) Condition#
field: F#
gt(other: Any) Condition#
gte(other: Any) Condition#
lt(other: Any) Condition#
lte(other: Any) Condition#
not_equals(other: Any) Condition#
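
A combined condition using &, |, ~ and size() might be sketched like this. The attribute names are hypothetical. The parentheses around the | clause matter because Python's & binds more tightly than |.

```python
from typing import Any


def active_user_condition() -> Any:
    """Build a condition usable as a filter or as a write condition."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.expressions import F

    return (
        F("status").equals("active")
        & (F("age").gte(18) | F("guardian").exists())
        & ~F("banned_at").exists()  # negation with the ~ prefix operator
        & F("tags").size().gt(0)  # Size methods return a Condition
    )
```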

Key conditions#

Key conditions are created using the aiodynamo.expressions.HashKey and optionally the aiodynamo.expressions.RangeKey classes.

class aiodynamo.expressions.HashKey(name: str, value: Any)#

Used for Key Conditions. To also constrain the Key Condition by the range key, create an instance of RangeKey, call a method on it and combine the HashKey with the return value of that method call using the & operator.


class aiodynamo.expressions.RangeKey(name: str)#

Can be used to further constrain a Key Condition. Must be used together with a HashKey instance.

The provided methods behave the same as their counterparts in the F class.

begins_with(substr: str) Condition#
between(
low: Any,
high: Any,
) Condition#
equals(other: Any) Condition#
gt(other: Any) Condition#
gte(other: Any) Condition#
lt(other: Any) Condition#
lte(other: Any) Condition#
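
For illustration, a key condition restricting both the hash key and the range key could be built as follows. The attribute names and the date range are invented.

```python
from typing import Any


def orders_in_2024(customer_id: str) -> Any:
    """Key condition: one customer's orders within a date range."""
    # Imported here so this sketch can be loaded without aiodynamo installed.
    from aiodynamo.expressions import HashKey, RangeKey

    return HashKey("customer_id", customer_id) & RangeKey("order_date").between(
        "2024-01-01", "2024-12-31"
    )
```

The returned value is what query, query_single_page and count expect as their key_condition argument.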

Models#

class aiodynamo.models.PayPerRequest#
class aiodynamo.models.Throughput(read: 'int', write: 'int')#
read: int#
write: int#
class aiodynamo.models.KeySchema(hash_key: 'KeySpec', range_key: 'Optional[KeySpec]' = None)#
hash_key: KeySpec#
range_key: KeySpec | None = None#
class aiodynamo.models.LocalSecondaryIndex(name: 'str', schema: 'KeySchema', projection: 'Projection')#
name: str#
projection: Projection#
schema: KeySchema#
class aiodynamo.models.GlobalSecondaryIndex(
name: 'str',
schema: 'KeySchema',
projection: 'Projection',
throughput: 'Optional[Throughput]',
)#
name: str#
projection: Projection#
schema: KeySchema#
throughput: Throughput | None#
class aiodynamo.models.StreamSpecification(
enabled: 'bool' = False,
view_type: 'StreamViewType' = <StreamViewType.new_and_old_images: 'NEW_AND_OLD_IMAGES'>,
)#
enabled: bool = False#
view_type: StreamViewType = 'NEW_AND_OLD_IMAGES'#
class aiodynamo.models.StreamViewType(
value,
names=None,
*,
module=None,
qualname=None,
type=None,
start=1,
boundary=None,
)#
keys_only = 'KEYS_ONLY'#
new_and_old_images = 'NEW_AND_OLD_IMAGES'#
new_image = 'NEW_IMAGE'#
old_image = 'OLD_IMAGE'#
class aiodynamo.models.RetryConfig(time_limit_secs: float | int = 60)#
classmethod default() RetryConfig#

Default RetryConfig to be used as a throttle config for Client.

classmethod default_wait_config() RetryConfig#

Default RetryConfig to be used as wait config for table level operations.

abstract delays() Iterable[float | int]#

Custom RetryConfig classes must implement this method. It should return an iterable yielding numbers of seconds indicating the delay before the next attempt is made.
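
The kind of iterable a custom delays() implementation could return is sketched below as a plain generator with capped exponential growth. The function name and its parameters are hypothetical and this is not one of aiodynamo's built-in strategies. The generator is infinite, so it relies on whoever consumes it to stop.

```python
import itertools
from typing import Iterator


def capped_exponential_delays(
    base: float = 0.1, factor: float = 2.0, cap: float = 5.0
) -> Iterator[float]:
    """Yield delays in seconds that grow by `factor` up to `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


# First five delays for base=1.0: [1.0, 2.0, 4.0, 5.0, 5.0]
first_five = list(itertools.islice(capped_exponential_delays(base=1.0), 5))
```

A RetryConfig subclass would return such an iterable from its delays() method.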

class aiodynamo.models.ReturnValues(
value,
names=None,
*,
module=None,
qualname=None,
type=None,
start=1,
boundary=None,
)#
all_new = 'ALL_NEW'#
all_old = 'ALL_OLD'#
none = 'NONE'#
updated_new = 'UPDATED_NEW'#
updated_old = 'UPDATED_OLD'#
class aiodynamo.models.Projection(type: 'ProjectionType', attrs: 'Optional[List[str]]' = None)#
attrs: List[str] | None = None#
type: ProjectionType#
class aiodynamo.models.ProjectionType(
value,
names=None,
*,
module=None,
qualname=None,
type=None,
start=1,
boundary=None,
)#
all = 'ALL'#
include = 'INCLUDE'#
keys_only = 'KEYS_ONLY'#
class aiodynamo.models.BatchGetRequest(
keys: 'List[Item]',
projection: 'Optional[ProjectionExpression]' = None,
consistent_read: 'bool' = False,
)#
keys: List[Dict[str, Any]]#
projection: ProjectionExpression | None = None#
class aiodynamo.models.BatchGetResponse(
items: 'Dict[TableName, List[Item]]',
unprocessed_keys: 'Dict[TableName, List[Item]]',
)#
items: Dict[str, List[Dict[str, Any]]]#
unprocessed_keys: Dict[str, List[Dict[str, Any]]]#
class aiodynamo.models.BatchWriteRequest(
keys_to_delete: 'Optional[List[Item]]' = None,
items_to_put: 'Optional[List[Item]]' = None,
)#
items_to_put: List[Dict[str, Any]] | None = None#
keys_to_delete: List[Dict[str, Any]] | None = None#
class aiodynamo.models.BatchWriteResult(undeleted_keys: 'List[Item]', unput_items: 'List[Item]')#
undeleted_keys: List[Dict[str, Any]]#
unput_items: List[Dict[str, Any]]#

Operations#

class aiodynamo.operations.Get(
table: str,
key: Dict[str, Any],
projection: aiodynamo.expressions.ProjectionExpression | None = None,
)#
key: Dict[str, Any]#
projection: ProjectionExpression | None = None#
table: str#
class aiodynamo.operations.Put(
table: str,
item: Dict[str, Any],
condition: aiodynamo.expressions.Condition | None = None,
)#
condition: Condition | None = None#
item: Dict[str, Any]#
table: str#
class aiodynamo.operations.Update(
table: str,
key: Dict[str, Any],
expression: aiodynamo.expressions.UpdateExpression,
condition: aiodynamo.expressions.Condition | None = None,
)#
condition: Condition | None = None#
expression: UpdateExpression#
key: Dict[str, Any]#
table: str#
class aiodynamo.operations.Delete(
table: str,
key: Dict[str, Any],
condition: aiodynamo.expressions.Condition | None = None,
)#
condition: Condition | None = None#
key: Dict[str, Any]#
table: str#
class aiodynamo.operations.ConditionCheck(
table: str,
key: Dict[str, Any],
condition: aiodynamo.expressions.Condition | None = None,
)#
condition: Condition | None = None#
key: Dict[str, Any]#
table: str#