How to direct stream(kafka) a JSON file in spark and convert it into RDD?


I wrote code that uses a Kafka direct stream to count the words in a file sent by the producer.



Code:



```python
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Kafka 0.8 DStream integration; this module was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils

## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"


def main():
    sc = SparkContext(appName=APP_NAME)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    brokers, topic = sys.argv[1:]
    # kafkaParams must be a dict; each record is a (key, value) tuple
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    # Parentheses let the chained calls span several lines
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
```
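For reference, the following is a plain-Python model (no Spark required) of what the word-count pipeline above computes for a single micro-batch. It assumes, as with the direct stream, that each record arrives as a (key, value) tuple; the function name `word_count_batch` is hypothetical and only illustrates the `map` / `flatMap` / `reduceByKey` semantics, not how Spark executes them.

```python
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple


def word_count_batch(records: Iterable[Tuple[Optional[str], str]]) -> Dict[str, int]:
    """Hypothetical plain-Python model of one micro-batch of the DStream pipeline."""
    counts = Counter()
    for _key, value in records:          # kvs.map(lambda x: x[1]) keeps the message value
        for word in value.split(" "):    # lines.flatMap(lambda line: line.split(" "))
            counts[word] += 1            # .map((word, 1)) followed by .reduceByKey(a + b)
    return dict(counts)


batch = [(None, "spark reads kafka"), (None, "kafka feeds spark")]
print(word_count_batch(batch))  # {'spark': 2, 'reads': 1, 'kafka': 2, 'feeds': 1}
```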


Now I need to convert the input JSON file into a Spark DataFrame using the DStream.










      apache-spark pyspark apache-kafka apache-spark-sql














      edited Mar 26 at 5:26









      Matthias J. Sax











      asked Mar 25 at 14:47









艾瑪艾瑪艾瑪























          1 Answer
































          This should work:



Once you have the DStream `kvs` returned by `KafkaUtils.createDirectStream`, map each (key, value) record to its value (the JSON string) and pass every micro-batch RDD to a handler function with `foreachRDD`:



```python
data = kvs.map(lambda record: record[1])  # keep only the Kafka message value
data.foreachRDD(lambda rdd: readMyRddsFromKafkaStream(rdd))  # runs once per batch interval
```
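To illustrate the control flow, here is a Spark-free sketch: each inner list stands in for one micro-batch RDD, and the hypothetical `run_batches` helper mimics `map` followed by `foreachRDD`. It is only a model under these assumptions, not Spark's implementation; the point it shows is that the handler is invoked once per batch interval, including for empty batches.

```python
from typing import Callable, Iterable, List, Optional, Tuple

# Hypothetical record shape: (key, value), as the direct stream yields them
Record = Tuple[Optional[str], str]


def run_batches(batches: Iterable[List[Record]],
                handler: Callable[[List[str]], None]) -> None:
    """Model of kvs.map(lambda record: record[1]).foreachRDD(handler)."""
    for batch in batches:
        values = [record[1] for record in batch]  # keep only the message value
        handler(values)                           # called even when the batch is empty


seen = []
run_batches([[(None, '{"id": 1}'), (None, '{"id": 2}')], []], seen.append)
print(seen)  # [['{"id": 1}', '{"id": 2}'], []]
```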


Then define the handler that builds a DataFrame from the JSON strings in each batch:



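```python
from pyspark.sql import SparkSession


def readMyRddsFromKafkaStream(readRdd):
    # foreachRDD also fires for empty batches; skip them (no schema can be inferred)
    if readRdd.isEmpty():
        return
    # Get (or lazily create) a SparkSession on the streaming job's SparkContext
    spark = SparkSession.builder.config(conf=readRdd.context.getConf()).getOrCreate()
    # Put the RDD of JSON strings into a DataFrame (schema is inferred per batch)
    df = spark.read.json(readRdd)
    df.createOrReplaceTempView("temporary_table")  # registerTempTable is deprecated
    df = spark.sql("""
        SELECT *
        FROM temporary_table
    """)
    df.show()
```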
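As a rough, simplified model of what `spark.read.json` does with an RDD of JSON strings, the hypothetical helper below parses one JSON object per string, takes the union of top-level keys as the columns (Spark orders inferred JSON columns by name), and fills missing fields with `None`. It deliberately ignores type widening, nested structs, and the `_corrupt_record` column Spark uses for malformed input in its default PERMISSIVE mode; in this sketch a malformed line simply raises `json.JSONDecodeError`.

```python
import json
from typing import Any, Dict, Iterable, List, Tuple


def json_lines_to_rows(lines: Iterable[str]) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Hypothetical model: one JSON object per string -> (columns, rows)."""
    records = [json.loads(line) for line in lines]
    # Column set is the union of top-level keys, sorted by name
    columns = sorted({key for record in records for key in record})
    # Fields absent from a record become None (Spark shows them as null)
    rows = [{col: record.get(col) for col in columns} for record in records]
    return columns, rows


cols, rows = json_lines_to_rows(['{"word": "spark", "count": 2}', '{"word": "kafka"}'])
print(cols)  # ['count', 'word']
print(rows)  # [{'count': 2, 'word': 'spark'}, {'count': None, 'word': 'kafka'}]
```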


          Hope it helps :)






                answered Mar 26 at 6:17









Isaac Amezcua


















