DataFrame to Dataset conversion (scala)

I'm trying to unpack Kafka message values into case class instances. (I put the messages in on the other side.)
This code:
import ss.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}

// Encoder for the target case class
val enc: Encoder[TextRecord] = Encoders.product[TextRecord]

// UDF that deserializes the raw Kafka value bytes into a TextRecord
ss.udf.register("deserialize", (bytes: Array[Byte]) =>
  DefSer.deserialize(bytes).asInstanceOf[TextRecord]
)

// Kafka source
val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

inputStream.printSchema

val records = inputStream
  .selectExpr(s"deserialize(value) AS record")

records.printSchema

val rec2 = records.as(enc)
rec2.printSchema
produces this output:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- record: struct (nullable = true)
| |-- eventTime: timestamp (nullable = true)
| |-- lineLength: integer (nullable = false)
| |-- windDirection: float (nullable = false)
| |-- windSpeed: float (nullable = false)
| |-- gustSpeed: float (nullable = false)
| |-- waveHeight: float (nullable = false)
| |-- dominantWavePeriod: float (nullable = false)
| |-- averageWavePeriod: float (nullable = false)
| |-- mWaveDirection: float (nullable = false)
| |-- seaLevelPressure: float (nullable = false)
| |-- airTemp: float (nullable = false)
| |-- waterSurfaceTemp: float (nullable = false)
| |-- dewPointTemp: float (nullable = false)
| |-- visibility: float (nullable = false)
| |-- pressureTendency: float (nullable = false)
| |-- tide: float (nullable = false)
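
For reference, the record schema above implies a TextRecord shaped roughly like the following case class; this definition is inferred from the printed schema and was not shown in the original post:

// Hypothetical reconstruction of TextRecord from the printed schema
case class TextRecord(
  eventTime: java.sql.Timestamp,
  lineLength: Int,
  windDirection: Float,
  windSpeed: Float,
  gustSpeed: Float,
  waveHeight: Float,
  dominantWavePeriod: Float,
  averageWavePeriod: Float,
  mWaveDirection: Float,
  seaLevelPressure: Float,
  airTemp: Float,
  waterSurfaceTemp: Float,
  dewPointTemp: Float,
  visibility: Float,
  pressureTendency: Float,
  tide: Float
)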
When I get to the sink:

val debugOut = rec2.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()
Catalyst complains:
Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`eventTime`' given input columns: [record];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
I've tried a number of things to "pull the TextRecord up", such as calling rec2.map(r => r.getAs[TextRecord](0)), explode("record"), etc., but I keep bumping into ClassCastExceptions.
apache-spark apache-kafka spark-structured-streaming
Hi @jasonnerothin, we missed you in Crested Butte this year. Are you able to do something like selectExpr(s"deserialize(value).*")? – Jack Leow, Mar 23 at 20:15

Hi @JackLeow - sadly no - next year, for sure! Caused by: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input '*' expecting {'SELECT', 'FROM'... – jasonnerothin, Mar 23 at 21:30
– jasonnerothin, asked Mar 23 at 18:38, edited Mar 23 at 19:17
1 Answer
The easiest way to do this is to map the inputStream Row instances directly to a TextRecord (assuming it is a case class), using the map function:
import ss.implicits._

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

// Deserialize the raw Kafka value bytes straight into the case class;
// the implicit Encoder[TextRecord] comes from ss.implicits._
val records = inputStream.map(row =>
  DefSer.deserialize(row.getAs[Array[Byte]]("value")).asInstanceOf[TextRecord]
)
records will then directly be a Dataset[TextRecord].
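
With records typed as Dataset[TextRecord], the console sink from the question should then resolve eventTime and the other fields. A minimal sketch reusing that sink:

val debugOut = records.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()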
Also, as long as you import the SparkSession implicits, you don't need to provide an explicit encoder for your case class; Scala will derive it implicitly for you.
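
As an aside on the selectExpr(s"deserialize(value).*") attempt from the comments: it fails only because the SQL parser rejects the inline .* on a function call; expanding the struct column in a second step is valid. A sketch of this alternative, assuming the record column and TextRecord from above:

val flattened = inputStream
  .selectExpr("deserialize(value) AS record") // produce the struct column via the UDF
  .selectExpr("record.*")                     // expand the struct into top-level columns
  .as[TextRecord]                             // bind to the case class via the implicit encoder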
– rluta, answered Mar 23 at 21:43, edited Mar 23 at 21:52