Kafka uncommitted message not getting consumed again


I am processing Kafka messages and inserting them into a Kudu table using Spark Streaming with manual offset commits. Here is my code:



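    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // ssc (the StreamingContext), topics, brokers and groupId are defined elsewhere
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" // or "latest"
    )
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaParams)
    )
    stream.foreachRDD { rdd =>
      // Offset ranges covered by the whole batch (currently unused)
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // offsetRanges.foreach(println)
      val msgOffsetsRdd = rdd.map { msg =>
        // One single-message range per record
        val msgOffset = OffsetRange(msg.topic(), msg.partition(), msg.offset(), msg.offset() + 1)
        println(msg)
        msgOffset
      }
      // The idea here was to collect only the offsets of processed messages for the commit
      val msgOffsets = msgOffsetsRdd.collect()
      stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
    }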



Suppose, for example, that I get an error while inserting data into Kudu and need to process those messages again. If I stop the job and start it again, I do get the uncommitted messages. Is there no way to get all uncommitted messages again while the stream keeps running?

apache-kafka spark-streaming kafka-consumer-api

asked Mar 27 at 17:46
Mahaboob Basha

1 Answer

You already have the message, so why not add retry logic for the failure case? Kafka will give you the same message again when you reconnect after your consumer crashes. I am not sure whether Kafka will redeliver the same message while the connection is still open.

You can add retry logic to your code if the failure is due to the destination datastore being unavailable. If the insert failed because of an incorrect message format, you can save those messages to a temporary cache, a datastore, or another Kafka topic, either to retry later or to examine what is wrong with them.
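
A minimal sketch of the retry idea, in plain Scala with no Kafka or Kudu dependency. The names `retry`, `processAll`, `BadMessageException`, and the in-memory `deadLetters` buffer are illustrative assumptions, not part of any library. In a real job the buffer would be the cache, datastore, or Kafka topic mentioned above, and `insert` would be the Kudu write.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object RetrySketch {
  /** Hypothetical marker for failures a retry cannot fix (e.g. a malformed message). */
  final class BadMessageException(msg: String) extends RuntimeException(msg)

  /**
   * Runs `write`. On a retryable failure it sleeps and tries again, doubling
   * the delay each time, for at most `maxAttempts` attempts in total.
   */
  @tailrec
  def retry[A](maxAttempts: Int, delayMs: Long)(write: () => A): Try[A] =
    Try(write()) match {
      case ok @ Success(_) => ok
      case bad @ Failure(_: BadMessageException) => bad // retrying will not help
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMs)
        retry(maxAttempts - 1, delayMs * 2)(write)
      case bad => bad // attempts exhausted
    }

  /** Inserts each message; messages that still fail are kept in `deadLetters`. */
  def processAll(messages: Seq[String], deadLetters: ArrayBuffer[String])(insert: String => Unit): Unit =
    messages.foreach { m =>
      if (retry(maxAttempts = 3, delayMs = 10)(() => insert(m)).isFailure) deadLetters += m
    }
}
```

Because failed messages are set aside rather than dropped, the offsets of a batch can still be committed once `processAll` returns, and the set-aside messages can be replayed or inspected separately.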







answered Mar 27 at 20:36
Pharaoh

• I thought of this solution, but the problem is that my stream receives ~20 messages per second. Say Kudu is down for 2 hours: ~144,000 messages would pile up in my driver's memory, which might crash my streaming job. The second solution seems to be a workaround. If I could get the Kafka consumer object from Spark Streaming, I could seek back after some time.
  – Mahaboob Basha, Mar 28 at 11:57

• Are you consuming messages in a single thread or in multiple threads? If it is a single thread, you can wait for Kudu to come back up before pulling more messages. If it is multiple threads, you should set an upper limit on the number of threads you create for consuming messages, and block those threads when Kudu goes down. To summarize, you need to stop reading messages while Kudu is down.
  – Pharaoh, Mar 28 at 12:20

• I like this solution. Do you have sample code for setting the upper limit and for stopping the reads? This could solve my issue. Thanks a lot, Pharaoh; any pointers would also help me.
  – Mahaboob Basha, Mar 28 at 12:27

• You can read about thread pools; you can use a thread pool to manage the threads that consume messages.
  – Pharaoh, Mar 28 at 12:34

• If this answers your question, please mark it as the answer. Thanks.
  – Pharaoh, Mar 29 at 7:28
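
The thread-pool suggestion in the comments could look roughly like the sketch below. It uses only `java.util.concurrent`; `boundedPool`, `awaitSink`, and the `sinkIsUp` check are hypothetical names, and how to detect that Kudu is reachable is left to the caller.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}

object BoundedConsumerSketch {
  /**
   * A pool with a fixed number of workers and a bounded queue. When the queue
   * is full, CallerRunsPolicy makes the submitting (reading) thread run the
   * task itself, so it cannot pull further messages until work drains.
   */
  def boundedPool(workers: Int, queueSize: Int): ThreadPoolExecutor =
    new ThreadPoolExecutor(
      workers, workers, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](queueSize),
      new ThreadPoolExecutor.CallerRunsPolicy())

  /** Blocks the calling thread until `sinkIsUp` returns true, polling every `pollMs`. */
  def awaitSink(sinkIsUp: () => Boolean, pollMs: Long): Unit =
    while (!sinkIsUp()) Thread.sleep(pollMs)
}
```

The reading loop would call `awaitSink` before handing each message to the pool, which keeps at most `workers + queueSize` messages in flight. In a Spark Streaming job the reading is done by Spark rather than by threads you create, so the closer equivalent of an upper limit may be the `spark.streaming.kafka.maxRatePerPartition` setting; whether that fits this job is an assumption.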
















