Getting response of AWS DynamoDB BatchWriter request

Dongkyun · Feb 20, 2021

When putting a large amount of data into DynamoDB, batch writing is the recommended practice. The AWS service client provides the batch_write_item request, which puts or deletes multiple items (up to 25 items and 16 MB in total) with a single call.

The response of a batch_write_item request includes useful information about how the request was processed:

  • UnprocessedItems lists the items that could not be put or deleted
  • ItemCollectionMetrics contains statistics about the affected item collections, such as their estimated size
  • ConsumedCapacity reports the capacity units consumed by the request

Let's check the response of a batch_write_item request with a simple example.

import boto3


dynamodb = boto3.client('dynamodb')

# Create a test table named BatchWriteTest
dynamodb.create_table(
    TableName='BatchWriteTest',
    AttributeDefinitions=[
        {'AttributeName': 'PK', 'AttributeType': 'S'},
        {'AttributeName': 'SK', 'AttributeType': 'S'},
    ],
    KeySchema=[
        {'AttributeName': 'PK', 'KeyType': 'HASH'},
        {'AttributeName': 'SK', 'KeyType': 'RANGE'},
    ],
    BillingMode='PAY_PER_REQUEST',
)
dynamodb.get_waiter('table_exists').wait(TableName='BatchWriteTest')

# Request batch writing of 5 items
response = dynamodb.batch_write_item(
    RequestItems={
        'BatchWriteTest': [
            {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-1'}}}},
            {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-2'}}}},
            {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-3'}}}},
            {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-4'}}}},
            {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-5'}}}},
        ]
    },
    ReturnConsumedCapacity='INDEXES',
    ReturnItemCollectionMetrics='SIZE',
)

If a batch_write_item request succeeds only partially for some reason (e.g. provisioned throughput exceeded, account maximum throughput reached, etc.), the items that failed to process are returned in the UnprocessedItems attribute. You can use that information to retry them.
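For example, a retry loop might look like the following minimal sketch (the backoff schedule and attempt cap here are arbitrary choices, not an AWS recommendation):

import time

import boto3

dynamodb = boto3.client('dynamodb')

request_items = {
    'BatchWriteTest': [
        {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': f'user-{no}'}}}}
        for no in range(1, 6)
    ]
}

# Resend whatever comes back in UnprocessedItems, with exponential backoff.
for attempt in range(5):
    response = dynamodb.batch_write_item(RequestItems=request_items)
    request_items = response['UnprocessedItems']  # same shape as RequestItems
    if not request_items:
        break
    time.sleep(0.1 * 2 ** attempt)  # 0.1s, 0.2s, 0.4s, ...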

Moreover, the response contains consumed capacity and statistics information if you request it via the ReturnConsumedCapacity and ReturnItemCollectionMetrics parameters.

By the way, you should be aware of how batch_write_item defines success and failure. Partial failure of individual items still counts as a successful request, and the failed items are stored in the UnprocessedItems attribute. Only when processing fails for all items, e.g. with a throughput limitation exception, does batch_write_item raise an exception.

{
    'UnprocessedItems': {},
    'ConsumedCapacity': [
        {'TableName': 'BatchWriteTest', 'CapacityUnits': 5.0, 'Table': {'CapacityUnits': 5.0}}
    ],
    'ItemCollectionMetrics': {},
    'ResponseMetadata': {
        'HTTPHeaders': {
            'connection': 'keep-alive',
            'content-length': '152',
            'content-type': 'application/x-amz-json-1.0',
            'date': 'Sat, 20 Feb 2021 11:02:34 GMT',
            'server': 'Server',
            'x-amz-crc32': '309077995',
            'x-amzn-requestid': 'MNDLBR81RN8OEEGPU0M5H0T387VV4KQNSO5AEMVJF66Q9ASUAAJG'
        },
        'HTTPStatusCode': 200,
        'RequestId': 'MNDLBR81RN8OEEGPU0M5H0T387VV4KQNSO5AEMVJF66Q9ASUAAJG',
        'RetryAttempts': 0
    }
}
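If you want to handle the all-items-failed case explicitly, you can catch the modeled exception on the client. A minimal sketch (it reuses the table from the example above and ignores every other error condition):

try:
    response = dynamodb.batch_write_item(
        RequestItems={
            'BatchWriteTest': [
                {'PutRequest': {'Item': {'PK': {'S': 'hi'}, 'SK': {'S': 'user-1'}}}},
            ]
        },
    )
except dynamodb.exceptions.ProvisionedThroughputExceededException:
    # Every item in the batch was throttled; nothing was written at all.
    print('Whole batch throttled, retry the full request later')
else:
    # HTTP 200: the request succeeded, so only leftovers need a retry.
    print('Items still to retry:', response['UnprocessedItems'])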

That said, if the resource client is available, there is little reason not to use it. The resource client makes our lives easier, saves time writing code, and helps us stay Pythonic.

The counterpart of batch_write_item in the DynamoDB resource client is the batch_writer context manager, which is much easier to use.

batch_write_test_table = boto3.resource('dynamodb').Table('BatchWriteTest')

with batch_write_test_table.batch_writer() as batch:
    for no in range(1, 6):
        batch.put_item(Item={'PK': 'hi', 'SK': f'user-{no}'})

IT IS GREAT! With the resource client and the batch_writer context manager, we don't have to:

  • set attribute types by hand
  • group items into batches of 25 manually
  • implement retry logic for unprocessed items
  • write the long, messy request structure (sketched below for contrast)
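For contrast, here is roughly what the manual approach with the service client looks like (a sketch only; the retry handling shown earlier is omitted):

import boto3

dynamodb = boto3.client('dynamodb')

# Every attribute needs an explicit type descriptor.
items = [{'PK': {'S': 'hi'}, 'SK': {'S': f'user-{no}'}} for no in range(100)]
requests = [{'PutRequest': {'Item': item}} for item in items]

# Group the requests into chunks of 25 by hand.
for start in range(0, len(requests), 25):
    chunk = requests[start:start + 25]
    response = dynamodb.batch_write_item(RequestItems={'BatchWriteTest': chunk})
    # UnprocessedItems would still need the retry loop shown earlier.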

One thing we lose, though, is the response information. batch_writer doesn't store any response within the context manager, so there is no way to get the ConsumedCapacity, ItemCollectionMetrics, or UnprocessedItems information.

Assume you're writing a migration script with batch_writer and you want to log the consumed capacity and unprocessed items. With the original batch_writer, you can't. You would have to fall back to batch_write_item on the service client, which means writing a lot of boilerplate.

However, by overriding the batch_writer, there is a way to store and inspect the intermediate responses. First, let's see how batch_writer is implemented in the boto3 package, in table.py.

# boto3/dynamodb/table.py

# ..

class TableResource(object):
    def __init__(self, *args, **kwargs):
        super(TableResource, self).__init__(*args, **kwargs)

    def batch_writer(self, overwrite_by_pkeys=None):
        # ..
        return BatchWriter(self.name, self.meta.client,
                           overwrite_by_pkeys=overwrite_by_pkeys)


class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        # ..
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    # ..

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

The batch_writer method returns an instance of the BatchWriter class. The magic of batch_writer is actually so simple that anyone can understand the process.

When you add items with put_item or delete_item, the instance simply puts each requested item into a buffer named _items_buffer. Once the buffer size reaches 25 (defined by _flush_amount), the _flush method is triggered: it builds a batch_write_item request from 25 items taken from the buffer and executes it. If there are unprocessed items, they are simply put back into the buffer to wait for the next flush. Finally, when the context manager exits, all remaining items in the buffer are flushed.
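As a side note, the overwrite_by_pkeys parameter seen in the source lets batch_writer de-duplicate buffered items by primary key, so only the last write per key is actually sent. A quick sketch, reusing the table from earlier:

# If two buffered puts share the same (PK, SK), only the last one is kept.
with batch_write_test_table.batch_writer(overwrite_by_pkeys=['PK', 'SK']) as batch:
    batch.put_item(Item={'PK': 'hi', 'SK': 'user-1', 'version': 1})
    batch.put_item(Item={'PK': 'hi', 'SK': 'user-1', 'version': 2})  # replaces the first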

If we look through the _flush method, we can see that the response of batch_write_item is stored in a local response variable, but it is never saved on the instance. It is only used to collect the unprocessed items.

response = self._client.batch_write_item(
    RequestItems={self._table_name: items_to_send})
unprocessed_items = response['UnprocessedItems']

If you want to use any other information in the response, you can override this method. Imagine that we want to track the total consumed WCU with batch_writer.

First, let's create a new class that inherits from the original BatchWriter class.

from boto3.dynamodb.table import BatchWriter


class DynamoDBBatchWriter(BatchWriter):
    def __init__(self, table_name, client, flush_amount=25, overwrite_by_pkeys=None):
        super().__init__(table_name, client, flush_amount, overwrite_by_pkeys)
        self.responses = []
        self.consumed_wcu = 0

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send},
            ReturnConsumedCapacity='INDEXES'
        )
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []

        self.responses.append(response)
        self.consumed_wcu += response['ConsumedCapacity'][0]['CapacityUnits']

The new DynamoDBBatchWriter class adds two member variables, self.responses and self.consumed_wcu. Its overridden _flush method requests the consumed capacity with ReturnConsumedCapacity='INDEXES' and stores each batch_write_item response into those variables.

With this in place, we can now see how many WCUs this code consumes.

# Note: the DynamoDB resource is passed as the client; it exposes
# batch_write_item too and accepts plain Python types for attributes.
with DynamoDBBatchWriter('BatchWriteTest', boto3.resource('dynamodb')) as batch:
    for no in range(100):
        batch.put_item(Item={'PK': 'hi', 'SK': f'user-{no}'})

print(f'Total consumed WCU with the batch writer: {batch.consumed_wcu}')
print(f'Responses of the actual batch write requests: {batch.responses}')

It prints out like this.

Total consumed WCU with the batch writer: 100.0
Responses of the actual batch write requests: [{'UnprocessedItems': {}, 'ConsumedCapacity': [{'TableName': 'BatchWriteTest', 'CapacityUnits': 25.0, 'Table': {'CapacityUnits': 25.0}}], ..]

Hope this helps someone :)
