How to direct-stream a JSON file from Kafka in Spark and convert it into an RDD?
I wrote code that runs a word count over a Kafka direct stream when a file is sent in by the producer.
Code:
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka exists only in Spark 1.x/2.x; it was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils

## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"

##OTHER FUNCTIONS/CLASSES
def main():
    sc = SparkContext(appName=APP_NAME)
    ssc = StreamingContext(sc, 2)  # 2-second batch interval
    brokers, topic = sys.argv[1:]
    # kafkaParams must be passed as a dict
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    # each record is a (key, value) tuple; keep only the message value
    lines = kvs.map(lambda x: x[1])
    # parentheses let the chained calls span several lines
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
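For reference, each 2-second batch of the pipeline above computes an ordinary word count over the message values it received. The plain-Python sketch below mirrors the flatMap, map and reduceByKey chain on an in-memory list of sample (key, value) records; it does not use Spark, and `word_count_batch` is a hypothetical helper name used only for illustration.

```python
from collections import defaultdict


def word_count_batch(records):
    """Mirror of lines.flatMap(split).map((w, 1)).reduceByKey(+) for one batch.

    `records` is a list of (key, value) tuples, like those produced by the
    Kafka direct stream; only the value (the message text) is used.
    """
    counts = defaultdict(int)
    for _key, value in records:          # kvs.map(lambda x: x[1])
        for word in value.split(" "):    # flatMap(lambda line: line.split(" "))
            counts[word] += 1            # map to (word, 1), then reduceByKey(a + b)
    return dict(counts)


if __name__ == "__main__":
    batch = [(None, "hello world"), (None, "hello spark")]
    print(word_count_batch(batch))
```

Splitting on " " treats a JSON message as raw text tokens rather than structured fields, which is why a DataFrame is the better target for JSON input. Like the Spark version, this sketch also keeps empty strings when words are separated by more than one space.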
I need to convert the input JSON file into a Spark DataFrame using a DStream.
apache-spark pyspark apache-kafka apache-spark-sql
edited Mar 26 at 5:26 by Matthias J. Sax
asked Mar 25 at 14:47 by 艾瑪艾瑪艾瑪
1 Answer
This should work:
Once you have the kvs DStream, map it to the message values and pass each RDD to a handler function like this:
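# keep only the message value (the JSON string) of each (key, value) record
data = kvs.map(lambda record: record[1])
# call the handler once per micro-batch RDD; the lambda defers the name lookup
data.foreachRDD(lambda rdd: readMyRddsFromKafkaStream(rdd))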
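Each element of kvs is a (key, value) tuple, so record[1] is the raw message string. spark.read.json expects every one of those strings to be a complete JSON document; if the producer sends a pretty-printed JSON file line by line, the fragments will not parse, and in the default PERMISSIVE mode Spark reports them in a `_corrupt_record` column. Below is a minimal sketch of a pre-filter, assuming one JSON object per Kafka message. `is_json_object` is a hypothetical helper, and the commented line only suggests where it could plug in via DStream.filter.

```python
import json


def is_json_object(message):
    """Return True if `message` parses as a single JSON object (a dict)."""
    try:
        return isinstance(json.loads(message), dict)
    except (TypeError, ValueError):  # None values or malformed JSON
        return False


# Hypothetical use on the stream from the answer:
# data = kvs.map(lambda record: record[1]).filter(is_json_object)

if __name__ == "__main__":
    records = [(None, '{"id": 1, "name": "a"}'), (None, '{"id": 2,'), (None, "  }")]
    values = [record[1] for record in records]  # same as kvs.map(...)
    print([v for v in values if is_json_object(v)])
```

Filtering silently drops bad messages, so in practice you may prefer to count or log the rejected ones instead.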
Then define the handler function, which creates a DataFrame from your JSON data:
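from pyspark.sql import SparkSession

def readMyRddsFromKafkaStream(readRdd):
    # skip empty micro-batches; there is nothing to infer a schema from
    if readRdd.isEmpty():
        return
    # get (or create) a SparkSession on the same SparkContext as the stream
    spark = SparkSession.builder.config(conf=readRdd.context.getConf()).getOrCreate()
    # Put the RDD into a DataFrame; each element is parsed as one JSON record
    df = spark.read.json(readRdd)
    # createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0
    df.createOrReplaceTempView("temporary_table")
    df = spark.sql("""
        SELECT
            *
        FROM
            temporary_table
    """)
    df.show()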
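To picture what spark.read.json produces for one micro-batch: it infers a single schema for the whole batch (the union of the top-level fields seen across its records) and fills fields a record lacks with null. The plain-Python sketch below is only a rough analogy, not Spark's actual inference, which also infers types and nested structs. It orders columns by name, as Spark's JSON inference does, and `rows_from_json_batch` is a hypothetical helper.

```python
import json


def rows_from_json_batch(messages):
    """Rough analogy of spark.read.json on one batch of JSON strings.

    Returns (columns, rows): columns is the sorted union of top-level keys,
    and each row is a tuple with None wherever a record lacks a key.
    Types and nested structs are NOT inferred here, unlike in Spark.
    """
    records = [json.loads(m) for m in messages]
    columns = sorted({key for record in records for key in record})
    rows = [tuple(record.get(col) for col in columns) for record in records]
    return columns, rows


if __name__ == "__main__":
    batch = ['{"id": 1, "name": "a"}', '{"id": 2, "city": "x"}']
    columns, rows = rows_from_json_batch(batch)
    print(columns)
    for row in rows:
        print(row)
```

Because the schema is inferred separately for every batch, two batches can end up with different columns when the producer's records vary. If you need a stable schema, DataFrameReader.json also accepts an explicit schema argument.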
Hope it helps :)
answered Mar 26 at 6:17 by Isaac Amezcua