Kafka uncommitted message not getting consumed again
I am processing Kafka messages and inserting them into a Kudu table using Spark Streaming with manual offset commits. Here is my code:
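import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// topics, brokers, groupId and ssc (the StreamingContext) are defined elsewhere
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean), // offsets are committed manually below
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" // or "latest"
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams)
)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  //offsetRanges.foreach(println)
  // Build a one-message OffsetRange for every record in the batch
  val msgOffsetsRdd = rdd.map { msg =>
    val msgOffset = OffsetRange(msg.topic(), msg.partition(), msg.offset(), msg.offset() + 1)
    println(msg)
    msgOffset
  }
  val msgOffsets = msgOffsetsRdd.collect() // the idea was to commit offsets only for the processed messages
  stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
}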
Let us take this example: while inserting data into Kudu I got an error, so I need to process those messages again. If I stop the job and start it again, I do get the uncommitted messages. Can't we get all uncommitted messages again while the stream is still running?
apache-kafka spark-streaming kafka-consumer-api
asked Mar 27 at 17:46
Mahaboob Basha
1 Answer
You already have the message, so why not add retry logic in case of failure?
Kafka will give you the same message again when you reconnect after your consumer crashes. I am not sure whether Kafka will redeliver the same message while the connection is still open.
You can add retry logic to your code if the failure is due to the destination datastore being unavailable. If the insert failed because of an incorrect message format, you can save those messages to a temporary cache, a datastore, or another Kafka topic to retry later or to examine what is wrong with them.
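A minimal sketch of the retry idea, in plain Scala with no Spark or Kudu dependencies. The names `writeWithRetry`, `write` and `deadLetter` are hypothetical and not part of any library: `write` stands in for the Kudu insert, and `deadLetter` stands in for whatever cache, datastore, or Kafka topic is used to park messages that keep failing.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  /** Outcome of trying to write one message to the sink. */
  sealed trait Outcome
  case object Written extends Outcome
  final case class DeadLettered(message: String, reason: String) extends Outcome

  /**
   * Calls `write` up to `maxAttempts` times, sleeping `backoffMs * attempt`
   * between tries. If every attempt fails, the message is handed to
   * `deadLetter` instead of being dropped.
   */
  def writeWithRetry(
      message: String,
      write: String => Unit,                // hypothetical sink call, e.g. a Kudu insert
      deadLetter: (String, String) => Unit, // hypothetical: cache, table, or another topic
      maxAttempts: Int = 3,
      backoffMs: Long = 100L
  ): Outcome = {
    @tailrec
    def loop(attempt: Int): Outcome =
      Try(write(message)) match {
        case Success(_) => Written
        case Failure(_) if attempt < maxAttempts =>
          Thread.sleep(backoffMs * attempt) // linear backoff before the next try
          loop(attempt + 1)
        case Failure(e) =>
          val reason = String.valueOf(e.getMessage)
          deadLetter(message, reason)
          DeadLettered(message, reason)
      }
    loop(1)
  }
}
```

In a Spark Streaming job, something like this would presumably be called per record inside the batch processing, and offsets committed only after the whole batch has either been written or parked. Whether linear backoff and three attempts are appropriate depends on how the sink fails.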
I thought of this solution, but the problem is that my stream receives ~20 messages per second. Let us say Kudu is down for 2 hours: ~144,000 messages would accumulate in my driver's memory, which might crash my streaming job. The second solution seems to be a workaround. If I could get the Kafka consumer object from Spark Streaming, I could seek back after some time.
– Mahaboob Basha
Mar 28 at 11:57
Are you consuming messages in a single thread or in multiple threads? If it is a single thread, you can wait for Kudu to come back up before pulling more messages. If it is multiple threads, you should set an upper limit on the number of threads you create for consuming messages, and block those threads when Kudu goes down. To summarize, you need to stop reading messages while Kudu is down.
– Pharaoh
Mar 28 at 12:20
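One way to read "wait for Kudu to be up before pulling more messages" is to block the processing thread on a health probe. The sketch below is plain Scala; `awaitHealthy` and `isHealthy` are hypothetical names, and what counts as a health check for Kudu is left as an assumption.

```scala
import scala.annotation.tailrec

object WaitForSinkSketch {
  /**
   * Polls `isHealthy` until it returns true or `maxWaitMs` has elapsed.
   * The delay doubles after each failed probe, capped at `maxDelayMs`.
   * Returns true if the sink became healthy, false on timeout.
   */
  def awaitHealthy(
      isHealthy: () => Boolean, // hypothetical probe, e.g. a cheap read against the sink
      initialDelayMs: Long = 500L,
      maxDelayMs: Long = 30000L,
      maxWaitMs: Long = 600000L
  ): Boolean = {
    val deadline = System.nanoTime() + maxWaitMs * 1000000L

    @tailrec
    def loop(delayMs: Long): Boolean =
      if (isHealthy()) true
      else if (System.nanoTime() >= deadline) false
      else {
        Thread.sleep(delayMs)
        loop(math.min(delayMs * 2, maxDelayMs))
      }

    loop(initialDelayMs)
  }
}
```

Calling this before each write attempt keeps the thread from processing further messages while the sink is down. The timeout is there so that a long outage surfaces as a failure instead of an indefinitely stalled job.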
I like this solution. Do you have sample code that sets the upper limit and stops reading messages? This could solve my issue. Thanks a lot, Pharaoh; any pointers would also help me.
– Mahaboob Basha
Mar 28 at 12:27
You can read about thread pools; you can use a thread pool to maintain the threads that consume messages.
– Pharaoh
Mar 28 at 12:34
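As a rough illustration of the thread-pool suggestion, the sketch below uses `java.util.concurrent.ThreadPoolExecutor` with a fixed number of workers and a bounded queue. `boundedPool`, `processAll` and `handle` are hypothetical names. This applies to a plain consumer application; in Spark Streaming, parallelism is governed by partitions and executor cores, not by a pool you create yourself.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BoundedPoolSketch {
  /**
   * A pool with exactly `workers` threads and at most `queueCapacity` waiting
   * tasks. When the queue is full, CallerRunsPolicy makes the submitting
   * thread run the task itself, which slows down the reader.
   */
  def boundedPool(workers: Int, queueCapacity: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      workers, workers, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](queueCapacity),
      new ThreadPoolExecutor.CallerRunsPolicy()
    )

  /** Runs `handle` for every message on the bounded pool; returns how many completed. */
  def processAll(messages: Seq[String], workers: Int, queueCapacity: Int)(handle: String => Unit): Int = {
    val pool = boundedPool(workers, queueCapacity)
    val done = new AtomicInteger(0)
    try {
      messages.foreach { m =>
        pool.execute(new Runnable {
          override def run(): Unit = {
            handle(m) // hypothetical per-message work, e.g. the sink write
            done.incrementAndGet()
            ()
          }
        })
      }
    } finally {
      pool.shutdown() // stop accepting tasks, let queued ones finish
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
    done.get()
  }
}
```

The bounded queue is the design choice that matters here: if the handlers block because the sink is down, the queue fills up and the thread that reads messages is forced to slow down instead of buffering without limit.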
If this answers your question, please mark it as the answer. Thanks.
– Pharaoh
Mar 29 at 7:28
answered Mar 27 at 20:36
Pharaoh