Spark SQL slow execution with resource idle
I have a Spark SQL job that used to execute in under 10 minutes but now takes 3 hours after a cluster migration, and I need to deep-dive into what it is actually doing. I'm new to Spark, so please bear with me if I'm asking something off-topic.

I increased spark.executor.memory, but no luck.

Env: Azure HDInsight Spark 2.4 on Azure Storage
SQL: Read and join some data, then write the result to the Hive metastore.

The spark.sql script ends with the following code: .write.mode("overwrite").saveAsTable("default.mikemiketable")
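For context, the overall job shape looks roughly like the sketch below. Everything except the final saveAsTable call is an assumption, since the actual script isn't shown; the source table names and the join key are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical inputs standing in for the real source tables on Azure Storage.
df_a = spark.table("default.source_a")
df_b = spark.table("default.source_b")

# The job reads, joins, and writes the result; only the write call below
# is taken from the question itself.
result = df_a.join(df_b, on="id", how="left_outer")
result.write.mode("overwrite").saveAsTable("default.mikemiketable")
```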
Application behavior:
Within the first 15 minutes it loads and completes most tasks (199/200); after that only 1 executor process stays alive and keeps doing shuffle read/write. Because only 1 executor is left, we have to wait 3 hours for the application to finish.

Only 1 executor left alive (screenshot)
Not sure what the executor is doing (screenshot)
From time to time we can tell the shuffle read has increased (screenshot)

Therefore I increased spark.executor.memory to 20g, but nothing changed. From Ambari and YARN I can tell the cluster still has plenty of resources left.
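(For reference, a setting like this has to be in place before the application starts; a minimal sketch of how it might be supplied on the session builder, with the 20g value taken from above and everything else a placeholder:)

```python
from pyspark.sql import SparkSession

# Placeholder sketch only: executor memory cannot be changed on a running
# application, so it is set when the session/application is created.
spark = (
    SparkSession.builder
    .appName("mikemiketable-job")                 # hypothetical app name
    .config("spark.executor.memory", "20g")       # the value tried in the question
    .getOrCreate()
)
```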
Almost all executors released (screenshot)

Any guidance is greatly appreciated.

apache-spark pyspark-sql hdinsight
asked Mar 24 at 1:30 by MagnusTheStrong, edited Mar 24 at 15:29 by Igor Dvorzhak
What changed after the migration? Spark version?
– Igor Dvorzhak, Mar 24 at 15:15

Hi @Magnus, did you find some time to try increasing the partition number?
– Alexandros Biratsis, Mar 25 at 12:09

I checked my joining tables: a 9 GB table A left outer join 10 MB table B left outer join 5 MB table C left outer join 1 MB table D, with a 1 MB table E left outer joined with table C. I also have 20 GB of spark.executor.memory, and that's it. Today I re-examined everything as: 9 GB table A left outer join 10 MB table B left outer join 5 MB table C (pre-left-outer-joined with table E) left outer join 1 MB table D.
– MagnusTheStrong, Mar 25 at 12:35

Hi @MagnusTheStrong, since you have joins, try to repartition() based on the keys used in the joins; if you use more than one key in your join, use that combination. As I mentioned below, the problem in your case is caused by the large shuffle (as reflected in the Spark UI). If you want to avoid the shuffling you should ensure the co-location of the keys through repartition, i.e. repartition(1024, "col1", "col2", "col3"). Good luck.
– Alexandros Biratsis, Mar 26 at 8:27
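A minimal sketch of that suggestion, repartitioning both sides on the join keys before joining; the DataFrames and key columns are hypothetical, with the value 1024 and the column-list form taken from the comment above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tiny hypothetical frames standing in for the real tables; only the shape matters here.
df_a = spark.createDataFrame([(1, "x", 10)], ["col1", "col2", "val_a"])
df_b = spark.createDataFrame([(1, "x", 20)], ["col1", "col2", "val_b"])

# Repartition both sides on the join keys *before* the join so that matching
# keys land in the same partitions and the join-time shuffle is reduced.
df_a = df_a.repartition(1024, "col1", "col2")
df_b = df_b.repartition(1024, "col1", "col2")

joined = df_a.join(df_b, on=["col1", "col2"], how="left_outer")
```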
1 Answer
I would like to start with some observations for your case:

- From the task list you can see that Shuffle Spill (Disk) and Shuffle Spill (Memory) both have very high values. The maximum block size for each partition during the exchange of data should not exceed 2 GB, so you should keep the size of the shuffled data as low as possible. As a rule of thumb, each partition should be roughly 200-500 MB; for instance, if the total data is 100 GB you need at least 250-500 partitions to keep the partition size within those limits (see the short calculation after this list).
- The co-existence of the two previous points also means that the executor memory was not sufficient and Spark was forced to spill data to disk.
- The duration of the tasks is too high; a normal task should last around 50-200 ms.
- Too many killed executors is another sign that you are facing OOM problems.
- Locality is RACK_LOCAL, which is one of the lowest levels you can achieve within a cluster. Briefly, it means that the task is being executed on a different node than the one where the data is stored.
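As a quick back-of-the-envelope check on that rule of thumb (the 100 GB figure is just the example used above, not the question's actual data size):

```python
# Partition counts implied by the ~200-500 MB per-partition guideline.
total_shuffle_bytes = 100 * 1024**3                    # the 100 GB example above
at_500mb = total_shuffle_bytes // (500 * 1024**2)      # fewer, larger partitions
at_200mb = total_shuffle_bytes // (200 * 1024**2)      # more, smaller partitions
print(at_500mb, at_200mb)                              # 204 512 -- roughly the 250-500 ballpark
```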
As a solution I would try the following (see the sketch after this list):

- Increase the number of partitions, either with repartition() or via the Spark setting spark.sql.shuffle.partitions, to a number that meets the requirements above, e.g. 1000 or more.
- Change the way you store the data and introduce partitioned data, e.g. by day/month/year, using partitionBy.
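A minimal sketch combining both suggestions; the DataFrame contents and the partition column name "event_date" are placeholders, and the value 1000 is simply the number mentioned in the discussion, not a recommendation:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Raise the number of shuffle partitions used for joins/aggregations in this session.
spark.conf.set("spark.sql.shuffle.partitions", "1000")

# Hypothetical result frame standing in for the joined output.
result = spark.createDataFrame(
    [(1, "2019-03-24"), (2, "2019-03-25")],
    ["id", "event_date"],
)

# Repartition explicitly and write the table partitioned by a date-like column.
(result
    .repartition(1000)
    .write
    .mode("overwrite")
    .partitionBy("event_date")
    .saveAsTable("default.mikemiketable"))
```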
answered Mar 24 at 22:47 by Alexandros Biratsis, edited Apr 15 at 6:24
Hi Alexandros, I did try the actions below but nothing changed: .partitionBy("a date selected") in the write statement -> nothing changed; .repartition(1000) in the write statement -> nothing changed; sqlContext.setConf("spark.sql.shuffle.partitions", "1000") -> spins up more tasks, but nothing changed in speed.
– MagnusTheStrong, Mar 25 at 12:19

I did find out that on this new cluster, spark.sql.join.preferSortMergeJoin => false and spark.sql.autoBroadcastJoinThreshold => -1.
– MagnusTheStrong, Mar 25 at 12:21

Hello @MagnusTheStrong, spark.sql.autoBroadcastJoinThreshold => -1 means that broadcasting is disabled by default for joins, although I can't really say whether it would help without seeing the code. Could you please be more specific about how you applied the repartition? What did you try and how? It would be very useful if you could post some code. I am saying that because if the repartition occurs at a late stage, you get no benefit from it; you usually have to repartition at an early stage to avoid the shuffling of data.
– Alexandros Biratsis, Mar 25 at 12:37

Also, by early I mean before the join, for example :)
– Alexandros Biratsis, Mar 25 at 12:48

Hello @MagnusTheStrong, did you find a solution?
– Alexandros Biratsis, Mar 30 at 16:01
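Since the comments note that spark.sql.autoBroadcastJoinThreshold is -1 on the new cluster (auto-broadcast disabled) while tables B-E are only 1-10 MB, an explicit broadcast hint on the small side of each join is another thing worth trying. A sketch under those assumptions, with hypothetical DataFrames and join key:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Hypothetical stand-ins: df_a for the large (9 GB) table, df_b for a small MB-sized one.
df_a = spark.createDataFrame([(1, "a")], ["id", "val_a"])
df_b = spark.createDataFrame([(1, "b")], ["id", "val_b"])

# Force a broadcast (map-side) join of the small table, even though automatic
# broadcasting is disabled by the cluster-wide threshold of -1.
joined = df_a.join(broadcast(df_b), on="id", how="left_outer")

# Alternatively, re-enable auto-broadcast for this session (10 MB here is a placeholder).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))
```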