Getting Http Response from boto3 table.batch_writer object
There is a list of data in a csv that I want to put into a dynamodb table on aws. See sample list below.
Mary,F,7065
Anna,F,2604
Emma,F,2003
Elizabeth,F,1939
Minnie,F,1746
Margaret,F,1578
Ida,F,1472
Alice,F,1414
Bertha,F,1320
Sarah,F,1288
Annie,F,1258
Clara,F,1226
Ella,F,1156
Florence,F,1063
Cora,F,1045
Martha,F,1040
Laura,F,1012
Nellie,F,995
Grace,F,982
Carrie,F,949
Maude,F,858
Mabel,F,808
Bessie,F,796
Jennie,F,793
Gertrude,F,787
Julia,F,783
Hattie,F,769
Edith,F,768
Mattie,F,704
Rose,F,700
Catherine,F,688
Lillian,F,672
Ada,F,652
Lillie,F,647
Helen,F,636
Jessie,F,635
Louise,F,635
Ethel,F,633
Lula,F,621
Myrtle,F,615
Eva,F,614
Frances,F,605
Lena,F,603
Lucy,F,590
Edna,F,588
Maggie,F,582
Pearl,F,569
Daisy,F,564
Fannie,F,560
Josephine,F,544
In order to write more than 25 items to a DynamoDB table, the docs recommend using a batch_writer object.

import boto3

resource = boto3.resource('dynamodb')
table = resource.Table('Names')
with table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)

Is there a way to return an HTTP response to indicate a successful completion of the batch_write? I know that it is asynchronous. Is there a wait or fetch or something to call?
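For context, items here would be built from the CSV rows. A minimal sketch of that step (the attribute names name/gender/count are assumed for illustration; match them to your table's actual schema):

```python
import csv
import io

# A few of the CSV rows shown above; in practice, open the real file instead.
sample = "Mary,F,7065\nAnna,F,2604\nEmma,F,2003\n"

# Attribute names below are illustrative -- use whatever the Names table's
# schema actually expects. DynamoDB accepts Python ints via the resource API.
items = [
    {"name": name, "gender": gender, "count": int(count)}
    for name, gender, count in csv.reader(io.StringIO(sample))
]
print(items[0])  # {'name': 'Mary', 'gender': 'F', 'count': 7065}
```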
python-3.x amazon-web-services amazon-dynamodb boto3
asked Mar 21 at 17:50 by polka
2 Answers
The source for the BatchWriter object instantiated by batch_writer is located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method does generate a response; it just doesn't store it anywhere.

class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table. The class handles
            batch writes to a single table.

        :type client: ``botocore.client.Client``
        :param client: A botocore client. Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol. What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e.
            ``boto3.resource('dynamodb').Table('foo').meta.client``.

        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.

        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e.
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

How I solved it:

I built on the responses to this question about overriding class methods. They all work, but the best fit for my use case was to override _flush on the instance with a version that keeps the response.

First I built a new version of _flush.

import logging
import types

logger = logging.getLogger(__name__)

# New _flush: identical to the original except that it stores the
# batch_write_item response on the instance as self._response.
def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']
    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))

Then I bound it onto the instance like this.

with table.batch_writer() as batch:
    batch._flush = types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)

And this generates an output like this.

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
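The types.MethodType trick used here is generic Python: it binds a plain function to one instance as a method, leaving the class and every other instance untouched. A toy stdlib-only illustration (the Writer class below is invented for the demo, no boto3 involved):

```python
import types

class Writer:
    def _flush(self):
        return "original"

def _flush_with_capture(self):
    # Bound replacement: stores its result on the instance, the same way
    # the patched _flush above stores self._response.
    self._result = "captured"
    return self._result

w = Writer()
w._flush = types.MethodType(_flush_with_capture, w)
print(w._flush())         # captured
print(Writer()._flush())  # original -- other instances are unaffected
```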
There doesn't appear to be any built-in way to do this. The _flush method on BatchWriter does log a debug message when it finishes a batch, though. If you just want to see what's happening, you could enable debug logging before your put_item loop:

import logging
logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)

If you want to take some action instead, you could create a custom logging.Handler, something like this:

import logging

class CatchBatchWrites(logging.Handler):
    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            processed, unprocessed = record.args
            # do something with these numbers

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)  # still necessary
logger.addHandler(CatchBatchWrites())
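You can check the handler's unpacking without touching DynamoDB by emitting a record with the same format string BatchWriter._flush uses (the message text comes from the boto3 source quoted in the other answer; the batches list is my addition to make the captured numbers visible):

```python
import logging

class CatchBatchWrites(logging.Handler):
    def __init__(self):
        super().__init__()
        self.batches = []  # (sent, unprocessed) tuples, one per flush

    def handle(self, record):
        # record.msg is the unformatted template; record.args holds the values.
        if record.msg.startswith('Batch write sent'):
            sent, unprocessed = record.args
            self.batches.append((sent, unprocessed))

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo record out of the root handlers
handler = CatchBatchWrites()
logger.addHandler(handler)

# Simulate what BatchWriter._flush logs after each batch_write_item call.
logger.debug("Batch write sent %s, unprocessed: %s", 25, 0)
print(handler.batches)  # [(25, 0)]
```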
Let me know if this has to be a separate question, but what does the batch._client.describe_endpoints function do? It has an HTTP response, but I don't know what it is referring to or whether it is relevant to the above situation.
– polka
Mar 26 at 18:24

I don't think it's relevant, but I'm not sure what it does! Yes - I would suggest making that a separate question if you do want to know more.
– Nathan Vērzemnieks
Mar 27 at 4:42

Ok. Made a new question here, @nathan-vērzemnieks
– polka
Mar 27 at 5:08

I am trying to implement your solution, and I am getting this ValueError: not enough values to unpack (expected 2, got 0)
– polka
Mar 27 at 5:26

Running the logging is supplying a record.msg of Batch write sent %s, unprocessed: %s.
– polka
Mar 27 at 5:45
The documents for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response, it just doesn't store it anywhere.
class BatchWriter(object):
"""Automatically handle batch writes to DynamoDB for a single table."""
def __init__(self, table_name, client, flush_amount=25,
overwrite_by_pkeys=None):
"""
:type table_name: str
:param table_name: The name of the table. The class handles
batch writes to a single table.
:type client: ``botocore.client.Client``
:param client: A botocore client. Note this client
**must** have the dynamodb customizations applied
to it for transforming AttributeValues into the
wire protocol. What this means in practice is that
you need to use a client that comes from a DynamoDB
resource if you're going to instantiate this class
directly, i.e
``boto3.resource('dynamodb').Table('foo').meta.client``.
:type flush_amount: int
:param flush_amount: The number of items to keep in
a local buffer before sending a batch_write_item
request to DynamoDB.
:type overwrite_by_pkeys: list(string)
:param overwrite_by_pkeys: De-duplicate request items in buffer
if match new request item on specified primary keys. i.e
``["partition_key1", "sort_key2", "sort_key3"]``
"""
self._table_name = table_name
self._client = client
self._items_buffer = []
self._flush_amount = flush_amount
self._overwrite_by_pkeys = overwrite_by_pkeys
def put_item(self, Item):
self._add_request_and_process('PutRequest': 'Item': Item)
def delete_item(self, Key):
self._add_request_and_process('DeleteRequest': 'Key': Key)
def _add_request_and_process(self, request):
if self._overwrite_by_pkeys:
self._remove_dup_pkeys_request_if_any(request)
self._items_buffer.append(request)
self._flush_if_needed()
def _remove_dup_pkeys_request_if_any(self, request):
pkey_values_new = self._extract_pkey_values(request)
for item in self._items_buffer:
if self._extract_pkey_values(item) == pkey_values_new:
self._items_buffer.remove(item)
logger.debug("With overwrite_by_pkeys enabled, skipping "
"request:%s", item)
def _extract_pkey_values(self, request):
if request.get('PutRequest'):
return [request['PutRequest']['Item'][key]
for key in self._overwrite_by_pkeys]
elif request.get('DeleteRequest'):
return [request['DeleteRequest']['Key'][key]
for key in self._overwrite_by_pkeys]
return None
def _flush_if_needed(self):
if len(self._items_buffer) >= self._flush_amount:
self._flush()
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self._items_buffer:
self._flush()
How I solved it:
I built on the responses to this question about overwriting class methods. They all work, but the best for my use case was to overwrite the class instance with this version of _flush.
First I built a new version of _flush.
import logging
import types
## New Flush
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
self._response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = self._response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
Then I overwrote the instance method like this.
with batch_writer() as batch:
batch._flush=types.MethodType(_flush, batch)
for item in items:
batch.put_item(Item=item)
print(batch._response)
And this generates an output like this.
'UnprocessedItems': ,
'ResponseMetadata': 'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'HTTPStatusCode': 200,
'HTTPHeaders': 'server': 'Server',
'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '23',
'connection': 'keep-alive',
'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'x-amz-crc32': '4185382645',
'RetryAttempts': 0
add a comment |
The documents for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response, it just doesn't store it anywhere.
class BatchWriter(object):
"""Automatically handle batch writes to DynamoDB for a single table."""
def __init__(self, table_name, client, flush_amount=25,
overwrite_by_pkeys=None):
"""
:type table_name: str
:param table_name: The name of the table. The class handles
batch writes to a single table.
:type client: ``botocore.client.Client``
:param client: A botocore client. Note this client
**must** have the dynamodb customizations applied
to it for transforming AttributeValues into the
wire protocol. What this means in practice is that
you need to use a client that comes from a DynamoDB
resource if you're going to instantiate this class
directly, i.e
``boto3.resource('dynamodb').Table('foo').meta.client``.
:type flush_amount: int
:param flush_amount: The number of items to keep in
a local buffer before sending a batch_write_item
request to DynamoDB.
:type overwrite_by_pkeys: list(string)
:param overwrite_by_pkeys: De-duplicate request items in buffer
if match new request item on specified primary keys. i.e
``["partition_key1", "sort_key2", "sort_key3"]``
"""
self._table_name = table_name
self._client = client
self._items_buffer = []
self._flush_amount = flush_amount
self._overwrite_by_pkeys = overwrite_by_pkeys
def put_item(self, Item):
self._add_request_and_process('PutRequest': 'Item': Item)
def delete_item(self, Key):
self._add_request_and_process('DeleteRequest': 'Key': Key)
def _add_request_and_process(self, request):
if self._overwrite_by_pkeys:
self._remove_dup_pkeys_request_if_any(request)
self._items_buffer.append(request)
self._flush_if_needed()
def _remove_dup_pkeys_request_if_any(self, request):
pkey_values_new = self._extract_pkey_values(request)
for item in self._items_buffer:
if self._extract_pkey_values(item) == pkey_values_new:
self._items_buffer.remove(item)
logger.debug("With overwrite_by_pkeys enabled, skipping "
"request:%s", item)
def _extract_pkey_values(self, request):
if request.get('PutRequest'):
return [request['PutRequest']['Item'][key]
for key in self._overwrite_by_pkeys]
elif request.get('DeleteRequest'):
return [request['DeleteRequest']['Key'][key]
for key in self._overwrite_by_pkeys]
return None
def _flush_if_needed(self):
if len(self._items_buffer) >= self._flush_amount:
self._flush()
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self._items_buffer:
self._flush()
How I solved it:
I built on the responses to this question about overwriting class methods. They all work, but the best for my use case was to overwrite the class instance with this version of _flush.
First I built a new version of _flush.
import logging
import types
## New Flush
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
self._response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = self._response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
Then I overwrote the instance method like this.
with batch_writer() as batch:
batch._flush=types.MethodType(_flush, batch)
for item in items:
batch.put_item(Item=item)
print(batch._response)
And this generates an output like this.
'UnprocessedItems': ,
'ResponseMetadata': 'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'HTTPStatusCode': 200,
'HTTPHeaders': 'server': 'Server',
'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '23',
'connection': 'keep-alive',
'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'x-amz-crc32': '4185382645',
'RetryAttempts': 0
add a comment |
The documents for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response, it just doesn't store it anywhere.
class BatchWriter(object):
"""Automatically handle batch writes to DynamoDB for a single table."""
def __init__(self, table_name, client, flush_amount=25,
overwrite_by_pkeys=None):
"""
:type table_name: str
:param table_name: The name of the table. The class handles
batch writes to a single table.
:type client: ``botocore.client.Client``
:param client: A botocore client. Note this client
**must** have the dynamodb customizations applied
to it for transforming AttributeValues into the
wire protocol. What this means in practice is that
you need to use a client that comes from a DynamoDB
resource if you're going to instantiate this class
directly, i.e
``boto3.resource('dynamodb').Table('foo').meta.client``.
:type flush_amount: int
:param flush_amount: The number of items to keep in
a local buffer before sending a batch_write_item
request to DynamoDB.
:type overwrite_by_pkeys: list(string)
:param overwrite_by_pkeys: De-duplicate request items in buffer
if match new request item on specified primary keys. i.e
``["partition_key1", "sort_key2", "sort_key3"]``
"""
self._table_name = table_name
self._client = client
self._items_buffer = []
self._flush_amount = flush_amount
self._overwrite_by_pkeys = overwrite_by_pkeys
def put_item(self, Item):
self._add_request_and_process('PutRequest': 'Item': Item)
def delete_item(self, Key):
self._add_request_and_process('DeleteRequest': 'Key': Key)
def _add_request_and_process(self, request):
if self._overwrite_by_pkeys:
self._remove_dup_pkeys_request_if_any(request)
self._items_buffer.append(request)
self._flush_if_needed()
def _remove_dup_pkeys_request_if_any(self, request):
pkey_values_new = self._extract_pkey_values(request)
for item in self._items_buffer:
if self._extract_pkey_values(item) == pkey_values_new:
self._items_buffer.remove(item)
logger.debug("With overwrite_by_pkeys enabled, skipping "
"request:%s", item)
def _extract_pkey_values(self, request):
if request.get('PutRequest'):
return [request['PutRequest']['Item'][key]
for key in self._overwrite_by_pkeys]
elif request.get('DeleteRequest'):
return [request['DeleteRequest']['Key'][key]
for key in self._overwrite_by_pkeys]
return None
def _flush_if_needed(self):
if len(self._items_buffer) >= self._flush_amount:
self._flush()
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
# When we exit, we need to keep flushing whatever's left
# until there's nothing left in our items buffer.
while self._items_buffer:
self._flush()
How I solved it:
I built on the responses to this question about overwriting class methods. They all work, but the best for my use case was to overwrite the class instance with this version of _flush.
First I built a new version of _flush.
import logging
import types
## New Flush
def _flush(self):
items_to_send = self._items_buffer[:self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount:]
self._response = self._client.batch_write_item(
RequestItems=self._table_name: items_to_send)
unprocessed_items = self._response['UnprocessedItems']
if unprocessed_items and unprocessed_items[self._table_name]:
# Any unprocessed_items are immediately added to the
# next batch we send.
self._items_buffer.extend(unprocessed_items[self._table_name])
else:
self._items_buffer = []
logger.debug("Batch write sent %s, unprocessed: %s",
len(items_to_send), len(self._items_buffer))
Then I overwrote the instance method like this.
with batch_writer() as batch:
batch._flush=types.MethodType(_flush, batch)
for item in items:
batch.put_item(Item=item)
print(batch._response)
And this generates an output like this.
'UnprocessedItems': ,
'ResponseMetadata': 'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'HTTPStatusCode': 200,
'HTTPHeaders': 'server': 'Server',
'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '23',
'connection': 'keep-alive',
'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'x-amz-crc32': '4185382645',
'RetryAttempts': 0
The documents for the BatchWriter object instantiated by batch_writer are located (<3 Open Source) here. Looking at the BatchWriter class, the _flush method generates a response, it just doesn't store it anywhere.
class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table. The class handles
            batch writes to a single table.

        :type client: ``botocore.client.Client``
        :param client: A botocore client. Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol. What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.

        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.

        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()
How I solved it:
I built on the answers to this question about overriding class methods. They all work, but the best fit for my use case was to override _flush on the batch writer instance with the version below.
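To make the per-instance patching technique concrete, here is a toy illustration (not from the answer itself): types.MethodType binds a plain function to one object, so the class and every other instance keep their original behavior.

```python
import types

class Greeter:
    def greet(self):
        return "hello"

a, b = Greeter(), Greeter()

def shout(self):
    # Replacement behavior; will be bound to one instance only.
    return "HELLO"

# Shadow the class method on instance `a` with a bound version of `shout`.
a.greet = types.MethodType(shout, a)

print(a.greet())  # HELLO
print(b.greet())  # hello (other instances are unaffected)
```

This is exactly what the batch-writer patch below does: only the one `batch` object gets the response-storing _flush.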
First I built a new version of _flush.
import logging
import types

logger = logging.getLogger(__name__)

## New Flush
def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']
    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))
Then I overrode the instance method like this:

with table.batch_writer() as batch:
    batch._flush = types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)

print(batch._response)
And this generates an output like this:

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
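One limitation of the patch above is that batch._response only holds the last response; earlier batches are overwritten. A variant that collects every response is sketched below. To keep the sketch self-contained and runnable, a minimal stand-in class and a FakeClient stub replace boto3's real BatchWriter and DynamoDB client; in real code you would subclass boto3.dynamodb.table.BatchWriter, override _flush the same way, and pass table.meta.client.

```python
class CollectingBatchWriter:
    """Minimal stand-in for boto3's BatchWriter that keeps *every*
    batch_write_item response instead of only the last one."""
    def __init__(self, table_name, client, flush_amount=25):
        self._table_name = table_name
        self._client = client
        self._flush_amount = flush_amount
        self._items_buffer = []
        self.responses = []  # all responses, in order

    def put_item(self, Item):
        self._items_buffer.append({'PutRequest': {'Item': Item}})
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        self.responses.append(response)  # keep every response
        unprocessed = response.get('UnprocessedItems', {})
        if unprocessed.get(self._table_name):
            # Re-queue anything DynamoDB didn't process.
            self._items_buffer.extend(unprocessed[self._table_name])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Keep flushing until the buffer is empty, like the real class.
        while self._items_buffer:
            self._flush()


class FakeClient:
    """Stub that mimics a successful batch_write_item call."""
    def batch_write_item(self, RequestItems):
        return {'UnprocessedItems': {},
                'ResponseMetadata': {'HTTPStatusCode': 200}}


with CollectingBatchWriter('my-table', FakeClient(), flush_amount=2) as batch:
    for i in range(5):
        batch.put_item(Item={'pk': str(i)})

print(len(batch.responses))  # 3 (two mid-loop flushes plus the exit flush)
```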
answered Mar 29 at 19:28
polka
727 · 1 gold badge · 14 silver badges · 28 bronze badges
There doesn't appear to be any built-in way to do this. The _flush method on BatchWriter does log a debug message when it finishes a batch, though. If you just want to see what's happening, you could enable debug logging before your put_item loop:
import logging
logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)
If you want to take some action instead, you could create a custom logging.Handler, something like this:
import logging
import sys

class CatchBatchWrites(logging.Handler):
    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            processed, unprocessed = record.args
            # do something with these numbers

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)  # still necessary
logger.addHandler(CatchBatchWrites())
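A runnable variant of this handler (my sketch, not part of the original answer) stores the counts and guards against the unpack ValueError reported in the comments below, which occurs when record.args doesn't hold exactly two values. The logger.debug call at the end stands in for what BatchWriter._flush would emit during real writes:

```python
import logging

class CatchBatchWrites(logging.Handler):
    """Collects the (sent, unprocessed) counts that BatchWriter._flush logs."""
    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.batches = []

    def handle(self, record):
        if record.msg.startswith('Batch write sent'):
            # record.args should be (items_sent, unprocessed_count);
            # the guard avoids ValueError if args are ever empty.
            if record.args and len(record.args) == 2:
                self.batches.append(tuple(record.args))

logger = logging.getLogger('boto3.dynamodb.table')
logger.setLevel(logging.DEBUG)
handler = CatchBatchWrites()
logger.addHandler(handler)

# Stand-in for what a real BatchWriter flush would log:
logger.debug("Batch write sent %s, unprocessed: %s", 25, 0)
print(handler.batches)  # [(25, 0)]
```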
Let me know if this has to be a separate question, but what does the batch._client.describe_endpoints function do? It has an http response, but I don't know what it is referring to or whether it is relevant to the above situation.
– polka
Mar 26 at 18:24
I don't think it's relevant, but I'm not sure what it does! Yes - I would suggest making that a separate question if you do want to know more.
– Nathan Vērzemnieks
Mar 27 at 4:42
Ok. Made a new question here, @nathan-vērzemnieks
– polka
Mar 27 at 5:08
I am trying to implement your solution, and I am getting this error: ValueError: not enough values to unpack (expected 2, got 0)
– polka
Mar 27 at 5:26
Running the logging is supplying a record.msg of Batch write sent %s, unprocessed: %s.
– polka
Mar 27 at 5:45
edited Mar 27 at 19:13
answered Mar 26 at 5:53
Nathan Vērzemnieks
4,688 · 1 gold badge · 6 silver badges · 18 bronze badges