DataFrame to Dataset conversion (scala)


I'm trying to unpack Kafka message values into case class instances. (I produce the messages on the other side.)



This code:




import ss.implicits._
import org.apache.spark.sql.functions._

val enc: Encoder[TextRecord] = Encoders.product[TextRecord]
ss.udf.register("deserialize", (bytes: Array[Byte]) =>
  DefSer.deserialize(bytes).asInstanceOf[TextRecord]
)

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

inputStream.printSchema

val records = inputStream
  .selectExpr("deserialize(value) AS record")

records.printSchema

val rec2 = records.as(enc)

rec2.printSchema



produces this output:





root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- record: struct (nullable = true)
 |    |-- eventTime: timestamp (nullable = true)
 |    |-- lineLength: integer (nullable = false)
 |    |-- windDirection: float (nullable = false)
 |    |-- windSpeed: float (nullable = false)
 |    |-- gustSpeed: float (nullable = false)
 |    |-- waveHeight: float (nullable = false)
 |    |-- dominantWavePeriod: float (nullable = false)
 |    |-- averageWavePeriod: float (nullable = false)
 |    |-- mWaveDirection: float (nullable = false)
 |    |-- seaLevelPressure: float (nullable = false)
 |    |-- airTemp: float (nullable = false)
 |    |-- waterSurfaceTemp: float (nullable = false)
 |    |-- dewPointTemp: float (nullable = false)
 |    |-- visibility: float (nullable = false)
 |    |-- pressureTendency: float (nullable = false)
 |    |-- tide: float (nullable = false)



When I get to the sink:





val debugOut = rec2.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()


Catalyst complains:





Caused by: org.apache.spark.sql.AnalysisException: cannot resolve '`eventTime`' given input columns: [record];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)



I've tried a number of things to "pull the TextRecord up", such as calling rec2.map(r => r.getAs[TextRecord](0)), explode("record"), etc., but I keep bumping into ClassCastExceptions.
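(Editorial note: the ClassCastException from getAs[TextRecord](0) is expected, because Spark materializes a struct column as a generic Row, not as the original case class, so the case class has to be rebuilt field by field. A minimal sketch of that rebuild, assuming TextRecord's constructor takes the fields in the order the schema above shows; the value name typed is ours:)

import org.apache.spark.sql.Row
import ss.implicits._

val typed = records.map { r =>
  // The struct column surfaces as a nested Row, never as a TextRecord instance.
  val rec = r.getAs[Row]("record")
  TextRecord(
    rec.getAs[java.sql.Timestamp]("eventTime"),
    rec.getAs[Int]("lineLength"),
    rec.getAs[Float]("windDirection"),
    rec.getAs[Float]("windSpeed"),
    rec.getAs[Float]("gustSpeed"),
    rec.getAs[Float]("waveHeight"),
    rec.getAs[Float]("dominantWavePeriod"),
    rec.getAs[Float]("averageWavePeriod"),
    rec.getAs[Float]("mWaveDirection"),
    rec.getAs[Float]("seaLevelPressure"),
    rec.getAs[Float]("airTemp"),
    rec.getAs[Float]("waterSurfaceTemp"),
    rec.getAs[Float]("dewPointTemp"),
    rec.getAs[Float]("visibility"),
    rec.getAs[Float]("pressureTendency"),
    rec.getAs[Float]("tide")
  )
}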










apache-spark apache-kafka spark-structured-streaming

asked Mar 23 at 18:38 by jasonnerothin (edited Mar 23 at 19:17)

  • Hi @jasonnerothin, we missed you in Crested Butte this year. Are you able to do something like selectExpr(s"deserialize(value).*")?

    – Jack Leow
    Mar 23 at 20:15











  • Hi @JackLeow - sadly no - next year, for sure! Caused by: org.apache.spark.sql.catalyst.parser.ParseException: mismatched input '*' expecting {'SELECT', 'FROM'...

    – jasonnerothin
    Mar 23 at 21:30
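
(Editorial note: the inline star in selectExpr("deserialize(value).*") does not parse, as the exchange above shows, but the struct can be flattened in two steps: alias the UDF result first, then expand it with record.* before binding the encoder. A minimal sketch, assuming the same ss session, deserialize UDF, and TextRecord case class as in the question:)

import ss.implicits._

// Alias the deserialized struct, then promote its fields to top level,
// so that eventTime and the other fields resolve as real columns.
val flattened = inputStream
  .selectExpr("deserialize(value) AS record")
  .select("record.*")
  .as[TextRecord]

flattened.printSchema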




















1 Answer
































The easiest way to do this is to map the inputStream Row instances directly to TextRecord (assuming it is a case class) using the map function:



import ss.implicits._

val inputStream = ss.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getString("bootstrap.servers"))
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()

val records = inputStream.map(row =>
  DefSer.deserialize(row.getAs[Array[Byte]]("value")).asInstanceOf[TextRecord]
)


records will directly be a Dataset[TextRecord].



Also, as long as you import the SparkSession implicits, you don't need to provide an encoder for your case class explicitly; Scala will derive it implicitly for you.
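
(A sketch rounding this out with the console sink from the question: since records is now a Dataset[TextRecord], its fields are top-level columns and the earlier AnalysisException should no longer occur.)

// Same console sink as in the question, now fed the typed Dataset.
val debugOut = records.writeStream
  .format("console")
  .option("truncate", "false")
  .start()

debugOut.awaitTermination()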






answered Mar 23 at 21:43 by rluta (edited Mar 23 at 21:52)