A simple counting step following a group by key is extremely slow in a DataFlow pipeline
I have a DataFlow pipeline that builds an index (key-value pairs) and computes some metrics, such as the number of values per key. The input data is about 60 GB in total, stored on GCS, and the pipeline has about 126 workers allocated. Per Stackdriver, all workers sit at about 6% CPU utilization.
The pipeline seems to make no progress despite the 126 workers, and judging by the wall time the bottleneck is a simple counting step that follows a group-by. While all other steps have on average less than 1 hour of wall time, the counting step has already accumulated 50 days. There seems to be no helpful information or warnings in the log.
The counting step was implemented following a corresponding step in the WordCount example:
def count_keywords_per_product(self, key_and_group):
    key, group = key_and_group
    count = 0
    for e in group:
        count += 1
    self.stats.product_counter.inc()
    self.stats.keywords_per_product_dist.update(count)
    return (key, count)
The preceding step "Group keywords" is a simple beam.GroupByKey() transformation.
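For context, the relevant part of the pipeline is wired up roughly like this (the step and variable names are illustrative rather than my exact code):

import apache_beam as beam

# 'keyed_keywords' is assumed to be a PCollection of (product, keyword) pairs.
grouped = keyed_keywords | 'Group keywords' >> beam.GroupByKey()
counts = grouped | 'Count keywords per product' >> beam.Map(
    self.count_keywords_per_product)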
Please advise what might be causing this and how it can be optimized.
Current resource metrics:
Current vCPUs: 126
Total vCPU time: 1,753.649 vCPU hr
Current memory: 472.5 GB
Total memory time: 6,576.186 GB hr
Current PD: 3.08 TB
Total PD time: 43,841.241 GB hr
Current SSD PD: 0 B
Total SSD PD time: 0 GB hr
Total Shuffle data processed: 1.03 TB
Billable Shuffle data processed: 529.1 GB
The pipeline steps, including the counting one, can be seen in the screenshot below.
Tags: google-cloud-dataflow, apache-beam
asked Mar 28 at 15:22 by kpax, edited Mar 29 at 17:43
GCP support told me that re-iteration of grouping results is not cached in Python, which might cause issues like the one reported here. As a workaround, users can use the Java SDK or update their user code to reduce re-iteration. In my case, the only form of re-iteration is that the PCollection containing the GroupBy results gets written to a file and, in a parallel branch, is used to compute the counters.
– kpax
Apr 2 at 6:23
Here is a solution that worked for me: stackoverflow.com/a/55527648/3745936. TL;DR: do not attempt to reuse GroupByKey output in the Python SDK for Apache Beam. It is either a bug or behavior that should be documented and prevented by the SDK.
– kpax
Apr 5 at 3:09
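A minimal sketch of the shape of that workaround, as I understand it (names, values, and the output path are all illustrative): materialize each grouped iterable exactly once, then branch only from the materialized pairs:

import apache_beam as beam

grouped = keyed_keywords | 'Group keywords' >> beam.GroupByKey()
# Consume each lazy grouped iterable exactly once by forcing it to a list.
materialized = grouped | 'Materialize groups' >> beam.Map(
    lambda kv: (kv[0], list(kv[1])))
# Both branches now read the materialized list, so nothing re-iterates
# the raw GroupByKey output.
counts = materialized | 'Count values' >> beam.Map(
    lambda kv: (kv[0], len(kv[1])))
lines = materialized | 'Format index' >> beam.Map(
    lambda kv: '%s\t%s' % (kv[0], ','.join(map(str, kv[1]))))
lines | 'Write index' >> beam.io.WriteToText('gs://my-bucket/index')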
1 Answer
The best way to get a sum per key here is to use a combine operation, because it can alleviate the problem of hot keys. Try replacing your GroupByKey + ParDo with a beam.combiners.Count.PerKey, or a similar combine transform that suits your use case.
– Pablo, answered Mar 28 at 23:11
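For illustration, a minimal sketch of that replacement (assuming the input is a PCollection of (key, value) pairs named keyed_keywords; the name is an assumption):

import apache_beam as beam

# Count.PerKey is a combining transform: it can pre-aggregate partial
# counts on each worker before the shuffle, instead of shipping every
# element of a hot key to a single worker the way GroupByKey does.
counts = keyed_keywords | 'Count per key' >> beam.combiners.Count.PerKey()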
Pablo, thank you for your feedback. I will definitely try it and post the result here. However, the number of items per key in my pipeline does not exceed a few thousand, and the approach I used for counting was taken from the official WordCount example. Could you please elaborate a bit on what's going on under the hood? Shouldn't Dataflow at least issue a warning and downscale when it is running all available workers at 5% CPU utilization and making no progress? Is there a cookbook on how to properly diagnose issues like this?
– kpax
Mar 29 at 4:24
Another question: how do I output the result of GroupByKey as a collection of key-value pairs without hitting this problem? My current approach is to feed the GroupByKey collection to a beam.FlatMap that formats each key-value pair, then feed that to WriteToText. Because it is also a GroupByKey + ParDo, I assume it can be vulnerable to the same issue.
– kpax
Mar 29 at 4:48
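That output branch is shaped roughly like this (names and the output path are illustrative):

import apache_beam as beam

# One formatted line per (key, value) pair in each group.
lines = grouped | 'Format pairs' >> beam.FlatMap(
    lambda kv: ('%s\t%s' % (kv[0], v) for v in kv[1]))
lines | 'Write index' >> beam.io.WriteToText('gs://my-bucket/index')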
This issue might also be related: stackoverflow.com/questions/55479129
– kpax
Apr 2 at 16:54
Adding a combiner (result_of_group_by_key | CombineValues(CountCombineFn())) didn't help; I'm still observing the same symptoms. The counting step appears as "Part running", makes no progress, and all workers stay at 5% utilization. The preceding GroupBy step (its output is also used by another transformation) was still running, but based on the wall time it looks like it is using very few VMs, which makes me think it is the root cause of the problem. I will try to insert a reshuffle() before the GroupBy to see if that helps, though I thought I was not supposed to inject such "optimizations" myself.
– kpax
Apr 3 at 23:19
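Concretely, the two variants tried here look roughly like this (variable names are illustrative):

import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn

# Variant 1: combine the grouped values instead of iterating them in a ParDo.
counts = result_of_group_by_key | 'Count values' >> beam.CombineValues(
    CountCombineFn())

# Variant 2: insert a reshuffle before the GroupBy to break fusion and
# rebalance the data across workers.
grouped = (keyed_keywords
           | 'Reshuffle' >> beam.Reshuffle()
           | 'Group keywords' >> beam.GroupByKey())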
Reshuffle didn't change anything. GroupBy is still "part running" with 5% worker CPU utilization.
– kpax
Apr 4 at 4:21