When writing a large amount of data to DynamoDB, the best practice is to use batch write operations. The AWS service client provides the batch_write_item request, which lets you put or delete multiple items (up to 25 items and 16 MB) in a single call.
The response of a batch_write_item request includes helpful information about how the request was processed:
- UnprocessedItems holds the items that could not be put or deleted
- ItemCollectionMetrics holds statistics about the written data, such as its size
- ConsumedCapacity holds the capacity units consumed by the request
Let's check the response of a batch_write_item request with a simple example.
import boto3
dynamodb = boto3.client('dynamodb')
# Create a test table named BatchWriteTest
dynamodb.create_table(
TableName='BatchWriteTest',
AttributeDefinitions=[
{'AttributeName': 'PK', 'AttributeType': 'S'},
{'AttributeName': 'SK', 'AttributeType': 'S'},
],
KeySchema=[
{'AttributeName': 'PK', 'KeyType': 'HASH'},
{'AttributeName': 'SK', 'KeyType': 'RANGE'},
],
BillingMode='PAY_PER_REQUEST',
)
dynamodb.get_waiter('table_exists').wait(TableName='BatchWriteTest')
# Request batch writing of 5 items
response = dynamodb.batch_write_item(
RequestItems={
'BatchWriteTest': [
{'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-1'}}}},
{'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-2'}}}},
{'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-3'}}}},
{'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-4'}}}},
{'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-5'}}}},
]
},
ReturnConsumedCapacity='INDEXES',
ReturnItemCollectionMetrics='SIZE',
)
If a batch_write_item request only partially succeeds for some reason (e.g. the provisioned throughput is exceeded or the account's maximum throughput is reached), the items that failed to be processed are returned in the UnprocessedItems attribute. You can use this information to retry them.
Moreover, the response can contain consumed capacity and item collection statistics if you request them with the ReturnConsumedCapacity and ReturnItemCollectionMetrics parameters.
By the way, be aware of how success and failure work for a batch_write_item request. If only some of the items fail, the request itself still succeeds and the failed items are returned in the UnprocessedItems attribute. Only when every item fails due to throughput limits does batch_write_item raise an exception.
Here is the response of the example request above:
{
'UnprocessedItems': {},
'ConsumedCapacity': [
{'TableName': 'BatchWriteTest', 'CapacityUnits': 5.0, 'Table': {'CapacityUnits': 5.0}}
],
'ItemCollectionMetrics': {},
'ResponseMetadata': {
'HTTPHeaders': {
'connection': 'keep-alive',
'content-length': '152',
'content-type': 'application/x-amz-json-1.0',
'date': 'Sat, 20 Feb 2021 11:02:34 GMT',
'server': 'Server',
'x-amz-crc32': '309077995',
'x-amzn-requestid': 'MNDLBR81RN8OEEGPU0M5H0T387VV4KQNSO5AEMVJF66Q9ASUAAJG'
},
'HTTPStatusCode': 200,
'RequestId': 'MNDLBR81RN8OEEGPU0M5H0T387VV4KQNSO5AEMVJF66Q9ASUAAJG',
'RetryAttempts': 0
}
}
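If you stay with the service client, you have to slice your items into chunks of 25 and retry the unprocessed ones yourself. Here is a minimal sketch of what that could look like; the helper name batch_write_with_retry and the backoff values are made up for illustration.
import time

def batch_write_with_retry(client, table_name, requests, max_retries=5):
    """Send requests in chunks of 25 and retry any unprocessed items.

    A simplified sketch: production code should also catch
    ProvisionedThroughputExceededException, which is raised when
    every item in a call fails because of throughput limits.
    """
    for start in range(0, len(requests), 25):      # 25 items is the API limit per call
        chunk = requests[start:start + 25]
        for attempt in range(max_retries):
            response = client.batch_write_item(RequestItems={table_name: chunk})
            chunk = response['UnprocessedItems'].get(table_name, [])
            if not chunk:
                break                               # everything in this chunk was written
            time.sleep(0.1 * (2 ** attempt))        # simple exponential backoff before retrying

# The same five items as above, written through the illustrative helper
requests = [
    {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': f'user-{no}'}}}}
    for no in range(1, 6)
]
batch_write_with_retry(dynamodb, 'BatchWriteTest', requests)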
However, if the resource client is available, there is little reason not to use it. The resource client makes our lives easier, saves time writing code, and is more Pythonic.
The counterpart of batch_write_item in the DynamoDB resource client is the batch_writer context manager, which is much easier to use:
batch_write_test_table = boto3.resource('dynamodb').Table('BatchWriteTest')
with batch_write_test_table.batch_writer() as batch:
    for no in range(1, 6):
        batch.put_item(Item={'PK': 'hi', 'SK': f'user-{no}'})
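Deletes work the same way inside the context manager. For example, this sketch would remove the five items written above:
with batch_write_test_table.batch_writer() as batch:
    for no in range(1, 6):
        batch.delete_item(Key={'PK': 'hi', 'SK': f'user-{no}'})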
IT IS GREAT! With the resource client and the batch_writer context manager, we don't have to:
- set attribute types explicitly
- group items manually into batches of up to 25
- implement retry logic for unprocessed items
- write the long, verbose request structure
One thing we lose, however, is the response information. batch_writer doesn't store any response inside the context manager, so there is no way to get the ConsumedCapacity, ItemCollectionMetrics, or UnprocessedItems information.
Assume that you're writing a migration script with batch_writer and you want to log the consumed capacity and the unprocessed items. With the original batch_writer, you can't do this; you would have to fall back to batch_write_item on the service client, which means writing a lot of boilerplate code again.
However, by overriding batch_writer, there is a way to store and inspect the intermediate responses. First, let's see how batch_writer is implemented in the boto3 package, in table.py.
# boto3/dynamodb/table.py
# ..
class TableResource(object):
    def __init__(self, *args, **kwargs):
        super(TableResource, self).__init__(*args, **kwargs)

    def batch_writer(self, overwrite_by_pkeys=None):
        # ..
        return BatchWriter(self.name, self.meta.client,
                           overwrite_by_pkeys=overwrite_by_pkeys)


class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        # ..
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    # ..

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()
The batch_writer method returns an instance of the BatchWriter class. The magic behind batch_writer is so simple that anyone can follow it.
When items are added with put_item or delete_item, the instance simply puts the request into a buffer named _items_buffer. When the buffer reaches 25 entries (defined by _flush_amount), the _flush method is triggered: it takes 25 items from the buffer, builds the batch_write_item request structure, and executes batch_write_item. If there are unprocessed items, they are simply put back into the buffer to wait for the next flush. Finally, when the context manager exits, everything left in the buffer is flushed.
Looking at the _flush method, we can see that the response of batch_write_item is stored in a local variable called response, but it is never saved on the instance; it is only used to collect the unprocessed items.
response = self._client.batch_write_item(
RequestItems={self._table_name: items_to_send})
unprocessed_items = response['UnprocessedItems']
If you want to use any other information in the response, you can override this method. Imagine that we want to get the total consumed WCU while using batch_writer.
First, let's create a new class that inherits from the original BatchWriter class.
from boto3.dynamodb.table import BatchWriter
class DynamoDBBatchWriter(BatchWriter):
    def __init__(self, table_name, client, flush_amount=25, overwrite_by_pkeys=None):
        super().__init__(table_name, client, flush_amount, overwrite_by_pkeys)
        self.responses = []      # raw batch_write_item responses
        self.consumed_wcu = 0    # running total of consumed write capacity units

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        # Ask DynamoDB to return the consumed capacity on every flush
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send},
            ReturnConsumedCapacity='INDEXES'
        )
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        # Keep the raw response and accumulate the WCU it reports
        self.responses.append(response)
        self.consumed_wcu += response['ConsumedCapacity'][0]['CapacityUnits']
The new DynamoDBBatchWriter class adds two member variables, self.responses and self.consumed_wcu. The overridden _flush method appends each batch_write_item response to self.responses and accumulates the consumed WCU in self.consumed_wcu.
By doing this, we can now see how many WCUs were used, with this code:
with DynamoDBBatchWriter('BatchWriteTest', boto3.resource('dynamodb')) as batch:
    for no in range(100):
        batch.put_item(Item={'PK': 'hi', 'SK': f'user-{no}'})

print(f'Total consumed WCU with the batch writer: {batch.consumed_wcu}')
print(f'Responses of the actual batch write requests: {batch.responses}')
It prints out like this.
Total consumed WCU with the batch writer: 100.0
Responses of the actual batch write requests: [{'UnprocessedItems': {}, 'ConsumedCapacity': [{'TableName': 'BatchWriteTest', 'CapacityUnits': 25.0, 'Table': {'CapacityUnits': 25.0}}], ..]
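Since every raw response is kept in batch.responses, the same list can also be used to log how many items came back as unprocessed, which was the other half of the migration-script scenario above. A small sketch (the helper name is made up):
def count_unprocessed(responses):
    # Each response's UnprocessedItems maps a table name to a list of failed requests
    return sum(
        len(items)
        for response in responses
        for items in response['UnprocessedItems'].values()
    )

print(f'Items returned as unprocessed (and re-buffered): {count_unprocessed(batch.responses)}')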
Hope this helps someone :)