


A simple counting step following a group by key is extremely slow in a Dataflow pipeline


I have a Dataflow pipeline that builds an index (key-value pairs) and computes some metrics (such as the number of values per key). The input data is about 60 GB in total, stored on GCS, and the pipeline has about 126 workers allocated. Per Stackdriver, all workers are at about 6% CPU utilization.



The pipeline seems to make no progress despite having 126 workers, and based on the wall time the bottleneck seems to be a simple counting step that follows a group by. While all other steps have on average less than 1 hour spent in them, the counting step has already taken 50 days of wall time. There seems to be no helpful information in the log, warnings included.



The counting step was implemented following a corresponding step in the WordCount example:



    def count_keywords_per_product(self, key_and_group):
        key, group = key_and_group
        count = 0
        for e in group:
            count += 1

        self.stats.product_counter.inc()
        self.stats.keywords_per_product_dist.update(count)

        return (key, count)


The preceding step "Group keywords" is a simple beam.GroupByKey() transformation.



Please advise what might be the reason and how this can be optimized.



Current resource metrics:

    Current vCPUs                      126
    Total vCPU time                    1,753.649 vCPU hr
    Current memory                     472.5 GB
    Total memory time                  6,576.186 GB hr
    Current PD                         3.08 TB
    Total PD time                      43,841.241 GB hr
    Current SSD PD                     0 B
    Total SSD PD time                  0 GB hr
    Total Shuffle data processed       1.03 TB
    Billable Shuffle data processed    529.1 GB


The pipeline steps, including the counting one, can be seen below:

[screenshot of the pipeline steps]







google-cloud-dataflow apache-beam














asked Mar 28 at 15:22 by kpax, edited Mar 29 at 17:43










  • GCP support told me that re-iteration over grouping results is not cached in Python, which might cause issues similar to the one reported here. As a workaround, users could use the Java SDK or update their code to reduce re-iteration. In my case, the only form of re-iteration is that the PCollection containing the GroupBy results gets written to a file and, in a parallel branch, is used to compute the counters.

    – kpax
    Apr 2 at 6:23

  • Here is a solution that worked for me: stackoverflow.com/a/55527648/3745936. TL;DR: do not attempt to reuse GroupBy output in the Python SDK for Apache Beam. It is either a bug or a behavior that should be documented and prevented by the SDK.

    – kpax
    Apr 5 at 3:09
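One possible way to reduce re-iteration, assuming each group fits in memory (the asker mentions elsewhere in this thread that keys hold at most a few thousand items), is to walk the grouped values once and derive both the output line and the count from that single pass. The function name `format_and_count` and the tab/comma output format are hypothetical, and this is not necessarily what the linked answer does.

```python
def format_and_count(key_and_group):
    """Walk the grouped values exactly once, producing both outputs.

    Returns a (line, (key, count)) tuple so that the file-writing branch and
    the counting branch no longer each need to iterate over the group.
    """
    key, group = key_and_group
    values = []
    for value in group:  # the only pass over the grouped iterable
        values.append(str(value))
    # Hypothetical output format: key, a tab, then comma-separated values.
    line = "{}\t{}".format(key, ",".join(values))
    return line, (key, len(values))
```

In a pipeline this could be applied to the GroupByKey output with `beam.Map(format_and_count)`, with downstream steps picking either the line or the `(key, count)` pair from each result.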


























1 Answer









The best way to get a count per key here is to use a combine operation, because a combine can alleviate the problem of hot keys.
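To illustrate why a combine can help with hot keys, here is a rough pure-Python sketch. It assumes the runner pre-combines values within each worker's bundle before the shuffle, so that only small partial counts travel per key. The bundles and the helper names `partial_counts` and `merge_counts` are hypothetical, and this is not how Beam is implemented internally.

```python
from collections import defaultdict


def partial_counts(bundle):
    """Pre-combine one worker's bundle of (key, value) pairs into per-key counts."""
    counts = defaultdict(int)
    for key, _value in bundle:
        counts[key] += 1
    return dict(counts)


def merge_counts(partials):
    """Merge the small per-bundle counts into final per-key totals."""
    totals = defaultdict(int)
    for partial in partials:
        for key, count in partial.items():
            totals[key] += count
    return dict(totals)


# Hypothetical data: the key "hot" dominates and is spread over three bundles.
bundles = [
    [("hot", 1), ("hot", 2), ("cold", 3)],
    [("hot", 4), ("hot", 5)],
    [("hot", 6), ("cold", 7)],
]
partials = [partial_counts(bundle) for bundle in bundles]
totals = merge_counts(partials)
```

Here seven raw records shrink to five partial counts before the merge. With a genuinely hot key the reduction would be far larger, since each bundle contributes at most one entry per key.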



Try replacing your GroupByKey + ParDo with a beam.combiners.Count.PerKey, or a similar combine transform that suits your use case.
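A minimal sketch of that replacement, assuming the input is the ungrouped PCollection of (key, value) pairs that currently feeds the "Group keywords" step. The helper names `expected_counts` and `count_values_per_key` are hypothetical. The first is a pure-Python reference for the expected result, and the second only wires in `beam.combiners.Count.PerKey()`.

```python
from collections import Counter


def expected_counts(pairs):
    """Pure-Python reference: number of values per key."""
    return dict(Counter(key for key, _value in pairs))


def count_values_per_key(keyed_pcoll):
    """Count values per key on the ungrouped (key, value) PCollection.

    Count.PerKey is a combine transform, so no separate GroupByKey + ParDo
    is needed for the count itself. The output elements are (key, count).
    """
    import apache_beam as beam  # lazy import: only needed when building a pipeline

    return keyed_pcoll | "Count keywords per product" >> beam.combiners.Count.PerKey()
```

The metrics updated in the original counting step (`product_counter`, `keywords_per_product_dist`) could then be moved into a later step that consumes the `(key, count)` output, if they are still needed.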
















answered Mar 28 at 23:11 by Pablo















  • Pablo, thank you for your feedback. I will definitely try it and post the result here. However, the number of items per key in my pipeline does not exceed a few thousand, and the approach I used for counting was taken from the official WordCount example. Could you please elaborate a bit on what's going on under the hood? Shouldn't Dataflow at least issue a warning and downscale when it is running all available workers at 5% CPU utilization and making no progress? Is there a cookbook on how to properly diagnose issues like this?

    – kpax
    Mar 29 at 4:24

  • Another question: how can I output the result of GroupByKey as a collection of key-value pairs without hitting this problem? My current approach is to feed the GroupByKey collection to a beam.FlatMap that formats each key-value pair, then feed that to WriteToText. Because it is also a GroupByKey + ParDo, I assume it can be vulnerable to the same issue.

    – kpax
    Mar 29 at 4:48

  • This issue might also be related: stackoverflow.com/questions/55479129

    – kpax
    Apr 2 at 16:54

  • Adding a combiner (result_of_group_by_key | CombineValues(CountCombineFn())) didn't help. I am still observing the same symptoms: the counting step appears as "Part running", makes no progress, and all workers stay at 5% utilization. The preceding GroupBy step (its output is also used by other transformations) was still running, but based on the wall time it looks like it is using very few VMs, which makes me think that is the root cause of the problem... I will try to insert a reshuffle() before the GroupBy to see if that helps, but I thought I was not supposed to inject such "optimizations" myself.

    – kpax
    Apr 3 at 23:19

  • Reshuffle didn't change anything. GroupBy is still "part running" with 5% worker CPU utilization.

    – kpax
    Apr 4 at 4:21
















