Scala: How to get the content of PortableDataStream instance from an RDD
I want to extract data from binary files, so I read the files using

    val dataRDD = sc.binaryFiles("Path")

and get the result as an org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)].
I want to extract the content of my files, which is in the form of a PortableDataStream.
For that I tried:

    val data = dataRDD.map(x => x._2.open()).collect()

but I get the following error: java.io.NotSerializableException: org.apache.hadoop.hdfs.client.HdfsDataInputStream
If you have an idea how I can solve my issue, please help!
Many thanks in advance.
scala apache-spark rdd
asked Mar 27 at 13:08 by Iriel
1 Answer
Actually, PortableDataStream itself is Serializable; that is what it is meant for. However, open() returns a plain DataInputStream (an HdfsDataInputStream in your case, because your file is on HDFS), which is not Serializable, hence the error you get.
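The distinction can be checked on the JVM without Spark. The following is a minimal sketch, not part of the original answer: the object and method names (SerializableCheck, isJavaSerializable) are made up for illustration, and it only inspects plain java.io classes.

```scala
import java.io.DataInputStream

object SerializableCheck {
  // True when instances of `cls` implement java.io.Serializable.
  def isJavaSerializable(cls: Class[_]): Boolean =
    classOf[java.io.Serializable].isAssignableFrom(cls)

  def main(args: Array[String]): Unit = {
    // The stream type returned by open(): cannot be shipped between JVMs.
    println(isJavaSerializable(classOf[DataInputStream]))
    // What you typically hold after reading the stream: can be shipped.
    println(isJavaSerializable(classOf[Array[Byte]]))
    println(isJavaSerializable(classOf[String]))
  }
}
```

This is why a map that returns the open stream fails at collect(), while a map that returns the data read from the stream should not hit this error.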
In fact, once you open the PortableDataStream, you need to read the data right away, inside the same function. In Scala, you can use scala.io.Source.fromInputStream:
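    import org.apache.spark.rdd.RDD

    val data: RDD[Array[String]] = sc
      .binaryFiles("path/.../")
      .map { case (fileName, pds) =>
        // toArray forces the read on the executor, so only the lines are serialized
        scala.io.Source.fromInputStream(pds.open())
          .getLines().toArray
      }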
This code assumes that the data is textual. If it is not, you can adapt it to read any kind of binary data. Here is an example that builds a sequence of bytes, which you can then process the way you want:
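    val rdd: RDD[Seq[Byte]] = sc.binaryFiles("...")
      .map { case (file, pds) =>
        val dis = pds.open()
        val buffer = Array.ofDim[Byte](1024)
        val all = scala.collection.mutable.ArrayBuffer[Byte]()
        // read returns the number of bytes actually read, or -1 at end of stream
        var n = dis.read(buffer)
        while (n != -1) {
          all ++= buffer.take(n) // append only the bytes that were read
          n = dis.read(buffer)
        }
        dis.close()
        all.toSeq
      }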
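One detail of such a read loop is easy to get wrong: read fills the buffer only partially on the last chunk (and possibly earlier), so only the bytes it reports should be appended. Below is a Spark-free sketch of the loop as a standalone helper; the names StreamBytes, readAll and chunkSize are hypothetical and chosen for illustration only.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import scala.collection.mutable.ArrayBuffer

object StreamBytes {
  // Drain `in` completely, keeping only the bytes each read() call returned.
  // Assumes chunkSize > 0.
  def readAll(in: InputStream, chunkSize: Int = 1024): Array[Byte] = {
    val buffer = new Array[Byte](chunkSize)
    val all = ArrayBuffer.empty[Byte]
    var n = in.read(buffer)
    while (n != -1) {
      all ++= buffer.take(n) // the last chunk is usually shorter than the buffer
      n = in.read(buffer)
    }
    all.toArray
  }

  def main(args: Array[String]): Unit = {
    // 2500 bytes is deliberately not a multiple of the 1024-byte buffer.
    val input = Array.tabulate[Byte](2500)(i => (i % 128).toByte)
    val bytes = readAll(new ByteArrayInputStream(input))
    println(bytes.length == input.length)
  }
}
```

Inside the map over binaryFiles, the same helper could be applied to pds.open(). PortableDataStream also has a toArray() method that reads the whole file into a byte array, which may be enough when no incremental processing is needed.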
See the Javadoc of DataInputStream for more possibilities. For instance, it provides readLong, readDouble (and so on) methods.
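As an illustration of those typed reads, here is a small sketch that decodes one record from a DataInputStream. The record layout (one Long followed by one Double) and the names TypedRecords, Record, readRecord and encode are assumptions made up for the example; the real layout depends on how your files were written.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object TypedRecords {
  // Hypothetical record layout: one Long followed by one Double (big-endian).
  final case class Record(id: Long, value: Double)

  // Arguments are evaluated left to right, so the Long is read first.
  def readRecord(dis: DataInputStream): Record =
    Record(dis.readLong(), dis.readDouble())

  // Demo helper only: encode a record the way readRecord expects it.
  def encode(r: Record): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)
    dos.writeLong(r.id)
    dos.writeDouble(r.value)
    dos.flush()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val bytes = encode(Record(42L, 3.5))
    val dis = new DataInputStream(new ByteArrayInputStream(bytes))
    println(readRecord(dis))
  }
}
```

With Spark, readRecord could be called on the stream returned by pds.open() inside the map, since that stream is a DataInputStream.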
Thank you for your response. I tried your solution: there is a '}' missing, and I also had to change .toSeq() to .toSeq, otherwise it gives me error: not enough arguments for method apply: (idx: Int)String in trait SeqLike. Even so, the solution gives ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
– Iriel
Mar 27 at 15:26
I had not tested all the code, sorry. I fixed my answer.
– Oli
Mar 27 at 15:33
It actually does not work with toSeq, probably because it simply wraps the iterator based on the input stream. toArray, on the contrary, actually extracts the data and puts it in an array. I modified the answer accordingly. Thanks for your feedback ;)
– Oli
Mar 27 at 15:46
Thanks @Oli for your response. The problem still exists: java.nio.charset.MalformedInputException: Input length = 1
– Iriel
Mar 27 at 15:50
OK, that's different. What kind of data are you reading? This was just an example for strings. If you read binary data, it will not work and you have to adapt the code...
– Oli
Mar 27 at 15:58
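The MalformedInputException reported above is what scala.io.Source raises when the bytes are not valid in the charset it decodes with. The sketch below reproduces this without Spark, under the assumption that the input contains bytes that are not valid UTF-8; DecodeDemo and lines are hypothetical names. It also shows that ISO-8859-1 accepts any byte, which avoids the exception but does not make binary data meaningful as text.

```scala
import java.io.ByteArrayInputStream
import scala.io.{Codec, Source}
import scala.util.Try

object DecodeDemo {
  // Decode `bytes` as text lines with an explicit codec instead of the default one.
  def lines(bytes: Array[Byte], codec: Codec): Try[List[String]] =
    Try(Source.fromInputStream(new ByteArrayInputStream(bytes))(codec).getLines().toList)

  def main(args: Array[String]): Unit = {
    // 0xff and 0xfe can never appear in well-formed UTF-8.
    val notUtf8 = Array(0xff.toByte, 0xfe.toByte, 'a'.toByte)
    println(lines(notUtf8, Codec.UTF8).isFailure)    // strict decoding fails
    println(lines(notUtf8, Codec.ISO8859).isSuccess) // every byte maps to a character
  }
}
```

For genuinely binary files, reading bytes (as in the second example of the answer) is the more appropriate route than changing the codec.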
answered Mar 27 at 14:30 by Oli (edited Mar 27 at 16:47)