
Why does Spark partition all the data in one executor?


I am working with Spark GraphX, building a graph from a file (around 620 MB, 50K vertices and almost 50 million edges). My Spark cluster has 4 workers, each with 8 cores and 13.4 GB of RAM, plus 1 driver with the same specs. When I submit my .jar to the cluster, one of the workers (chosen at random) loads all the data onto itself, and every task needed for the computation is scheduled on that worker while the remaining three sit idle. I have tried everything, and I have not found anything that forces the computation to spread across all of the workers.



When Spark builds the graph, the vertex RDD has only 5 partitions; if I repartition that RDD, for example to 32 (the total number of cores), Spark does load the data onto every worker, but the computation gets slower.
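If the input arrives in only a handful of splits, it may help to ask GraphLoader for more edge partitions up front instead of repartitioning afterwards. Here is a minimal sketch of that check, reusing the HDFS path from the submit command below; numEdgePartitions is GraphLoader.edgeListFile's optional fourth argument, and the app name is just a placeholder:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader

val sc = new SparkContext(new SparkConf().setAppName("partition-check"))

// Request 32 edge partitions at load time (the default of -1 falls
// back to however many input splits HDFS provides).
val g = GraphLoader.edgeListFile(
  sc, "hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat",
  canonicalOrientation = true, numEdgePartitions = 32)

// One element per partition: how many edges landed in each.
val sizes = g.edges.mapPartitions(it => Iterator(it.size)).collect()
println(sizes.mkString("edges per partition: ", ", ", ""))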



I launch spark-submit this way:



spark-submit --master spark://172.30.200.20:7077 --driver-memory 12g --executor-memory 12g --class interscore.InterScore /root/interscore/interscore.jar hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat hdfs://172.30.200.20:9000/user/hadoop/interscore/community.dat 111
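In standalone mode the scheduler may also pack cores differently than expected, so it can be worth making the per-executor core count explicit. Below is a hedged variant of the same command; the flags are standard spark-submit options, but the values 8 and 32 are assumptions matched to the 4-worker, 8-core cluster, not settings from the original run:

spark-submit --master spark://172.30.200.20:7077 \
  --driver-memory 12g --executor-memory 12g \
  --executor-cores 8 --total-executor-cores 32 \
  --conf spark.default.parallelism=32 \
  --class interscore.InterScore /root/interscore/interscore.jar \
  hdfs://172.30.200.20:9000/user/hadoop/interscore/network.dat \
  hdfs://172.30.200.20:9000/user/hadoop/interscore/community.dat 111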


The code is here:



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}

object InterScore extends App {
  val sparkConf = new SparkConf().setAppName("Big-InterScore")
  val sc = new SparkContext(sparkConf)

  val t0 = System.currentTimeMillis
  runInterScore(args(0), args(1), args(2))
  println("Running time " + (System.currentTimeMillis - t0).toDouble / 1000)

  sc.stop()

  def runInterScore(netPath: String, communitiesPath: String, outputPath: String) = {
    // Each community file line is "<vertexId>\t<communityId>".
    val communities = sc.textFile(communitiesPath).map { x =>
      val a = x.split('\t')
      (a(0).toLong, a(1).toInt)
    }.cache

    val graph = GraphLoader.edgeListFile(sc, netPath, true)
      .partitionBy(PartitionStrategy.RandomVertexCut)
      .groupEdges(_ + _)
      .joinVertices(communities)((_, _, c) => c)
      .cache

    // For every vertex, count incident edges whose endpoints
    // lie in different communities.
    val lvalues = graph.aggregateMessages[Double](
      m => {
        m.sendToDst(if (m.srcAttr != m.dstAttr) 1.0 else 0.0)
        m.sendToSrc(if (m.srcAttr != m.dstAttr) 1.0 else 0.0)
      }, _ + _)

    val communitiesIndices = communities.map(x => x._2).distinct.collect
    val verticesWithLValue = graph.vertices.repartition(32).join(lvalues).cache
    println("K = " + communitiesIndices.size)
    graph.unpersist()
    graph.vertices.unpersist()
    communitiesIndices.foreach { c =>
      // COMPUTE c
    }
  }
}
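If the skew originates at the edge-partitioning step itself, Graph.partitionBy has a two-argument overload that takes an explicit partition count, which spreads the edges without a later repartition of the vertex RDD. A sketch under that assumption, reusing the names from the code above (32 matches the cluster's total core count):

// Same pipeline, but spread edges over 32 partitions while
// applying the vertex cut.
val graph = GraphLoader.edgeListFile(sc, netPath, true)
  .partitionBy(PartitionStrategy.RandomVertexCut, 32)
  .groupEdges(_ + _)
  .joinVertices(communities)((_, _, c) => c)
  .cache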









apache-spark spark-graphx






asked Mar 25 at 19:10 by jmachin









Hello @jmachin, could you provide your DAG from the Spark UI and the execution plan from df.explain()?

– Alexandros Biratsis, Mar 28 at 7:31




