Spring Boot Manual Acknowledgement of kafka messages is not workingKafka streams exactly once deliveryHow to configure port for a Spring Boot applicationnot able see messages published from Kafka producer(Spring Integration)How Do I know whether my record has been Manually committed using Spring KafkaKafka manual offset update using spring bootSpring-Kafka Consumer KafkaListener cannot convert GenericMessage to Java Object(Cleanly?) Consuming Messages in Spring Apache KafkaDead letter queue (DLQ) for Kafka with spring-kafkaSpring Kafka @KafkaListener - Retry sending failed messages and manually commit the offsetSpring Kafka filter not filtering consumer recordWill spring-cloud-stream Kafka consumer not consume messages that were sent when the service was down?

How does mmorpg store data?

I just started; should I accept a farewell lunch for a coworker I don't know?

How far can gerrymandering go?

Why isn't UDP with reliability (implemented at Application layer) a substitute of TCP?

Is it common for a managing editor of a University Press to solicit book proposals from PhD students at a conference?

Customs and immigration on a USA-UK-Sweden flight itinerary

Is it possible to alias a column based on the result of a select+where?

Tricolour nonogram

How to draw a diagram like this with tikz?

Can I submit a paper to two or more journals at the same time?

Why didn't Caesar move against Sextus Pompey immediately after Munda?

How useful would a hydroelectric power plant be in the post-apocalypse world?

Dynamic Sql Query - how to add an int to the code?

Why would Dementors torture a Death Eater if they are loyal to Voldemort?

Automorphisms and epimorphisms of finite groups

Meaning of the word "good" in context

Would skyscrapers tip over if people fell sideways?

Does "boire un jus" tend to mean "coffee" or "juice of fruit"?

Can I take Amul cottage cheese from India to Netherlands?

Is it advisable to inform the CEO about his brother accessing his office?

Why did the Apple //e make a hideous noise if you inserted the disk upside down?

Where to connect the fuse and why?

How did they film the Invisible Man being invisible in 1933?

Copy group of files (Filename*) to backup (Filename*.bak)



Spring Boot Manual Acknowledgement of kafka messages is not working


Kafka streams exactly once deliveryHow to configure port for a Spring Boot applicationnot able see messages published from Kafka producer(Spring Integration)How Do I know whether my record has been Manually committed using Spring KafkaKafka manual offset update using spring bootSpring-Kafka Consumer KafkaListener cannot convert GenericMessage to Java Object(Cleanly?) Consuming Messages in Spring Apache KafkaDead letter queue (DLQ) for Kafka with spring-kafkaSpring Kafka @KafkaListener - Retry sending failed messages and manually commit the offsetSpring Kafka filter not filtering consumer recordWill spring-cloud-stream Kafka consumer not consume messages that were sent when the service was down?













1















I have a spring boot kafka consumer which consume data from a topic and store it in a Database and acknowledge it once stored.
It is working fine but the problem is happening if the application failed to get the DB connection after consuming the record ,in this case we are not sending the acknowledgement but still the message never consumed until or unless we change the group id and restart the consumer



My consumer looks like below



 @KafkaListener(id = "$group.id", topics = "$kafka.edi.topic")
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
boolean shouldAcknowledge = false;
try {
String tNo = getTrackingNumber((String) record.key());

log.info("Check Duplicate By Comparing With DB records");

if (!ediRecordService.isDuplicate(tNo)) ---this checks the record in my DB
shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
else
log.warn("Duplicate record found.");
shouldAcknowledge = true;

if (shouldAcknowledge)
acknowledgment.acknowledge();
```



So if you see the above snippet we did not sent acknowledgment.









share|improve this question






















  • Take a look here: stackoverflow.com/questions/50934411/…

    – Vassilis
    Mar 25 at 17:01















1















I have a spring boot kafka consumer which consume data from a topic and store it in a Database and acknowledge it once stored.
It is working fine but the problem is happening if the application failed to get the DB connection after consuming the record ,in this case we are not sending the acknowledgement but still the message never consumed until or unless we change the group id and restart the consumer



My consumer looks like below



 @KafkaListener(id = "$group.id", topics = "$kafka.edi.topic")
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
boolean shouldAcknowledge = false;
try {
String tNo = getTrackingNumber((String) record.key());

log.info("Check Duplicate By Comparing With DB records");

if (!ediRecordService.isDuplicate(tNo)) ---this checks the record in my DB
shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
else
log.warn("Duplicate record found.");
shouldAcknowledge = true;

if (shouldAcknowledge)
acknowledgment.acknowledge();
```



So if you see the above snippet we did not sent acknowledgment.









share|improve this question






















  • Take a look here: stackoverflow.com/questions/50934411/…

    – Vassilis
    Mar 25 at 17:01













1












1








1


2






I have a spring boot kafka consumer which consume data from a topic and store it in a Database and acknowledge it once stored.
It is working fine but the problem is happening if the application failed to get the DB connection after consuming the record ,in this case we are not sending the acknowledgement but still the message never consumed until or unless we change the group id and restart the consumer



My consumer looks like below



 @KafkaListener(id = "$group.id", topics = "$kafka.edi.topic")
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
boolean shouldAcknowledge = false;
try {
String tNo = getTrackingNumber((String) record.key());

log.info("Check Duplicate By Comparing With DB records");

if (!ediRecordService.isDuplicate(tNo)) ---this checks the record in my DB
shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
else
log.warn("Duplicate record found.");
shouldAcknowledge = true;

if (shouldAcknowledge)
acknowledgment.acknowledge();
```



So if you see the above snippet we did not sent acknowledgment.









share|improve this question














I have a spring boot kafka consumer which consume data from a topic and store it in a Database and acknowledge it once stored.
It is working fine but the problem is happening if the application failed to get the DB connection after consuming the record ,in this case we are not sending the acknowledgement but still the message never consumed until or unless we change the group id and restart the consumer



My consumer looks like below



 @KafkaListener(id = "$group.id", topics = "$kafka.edi.topic")
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
boolean shouldAcknowledge = false;
try {
String tNo = getTrackingNumber((String) record.key());

log.info("Check Duplicate By Comparing With DB records");

if (!ediRecordService.isDuplicate(tNo)) ---this checks the record in my DB
shouldAcknowledge = insertEDIRecord(record, tNo); --this return true
else
log.warn("Duplicate record found.");
shouldAcknowledge = true;

if (shouldAcknowledge)
acknowledgment.acknowledge();
```



So if you see the above snippet we did not sent acknowledgment.






spring-boot apache-kafka kafka-consumer-api






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Mar 25 at 15:42









BiplabBiplab

284 bronze badges




284 bronze badges












  • Take a look here: stackoverflow.com/questions/50934411/…

    – Vassilis
    Mar 25 at 17:01

















  • Take a look here: stackoverflow.com/questions/50934411/…

    – Vassilis
    Mar 25 at 17:01
















Take a look here: stackoverflow.com/questions/50934411/…

– Vassilis
Mar 25 at 17:01





Take a look here: stackoverflow.com/questions/50934411/…

– Vassilis
Mar 25 at 17:01










1 Answer
1






active

oldest

votes


















1














That is not how kafka offset works here



The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.



From the above statement For example, from the first poll consumer get the message at offset 300 and if it failed to persist into database because of some issue and it will not submit the offset.



So in the next poll it will get the next record where offset is 301 and if it persist data into database successfully then it will commit the offset 301 (which means all records in that partitions are processed till that offset, in above example it is 301)



Solution for this : use retry mechanism until it successfully stores data into database with some limited retries or just save failed data into error topic and reprocess it later, or save the offset of failed records somewhere so later you can reprocess them.






share|improve this answer
























    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55341509%2fspring-boot-manual-acknowledgement-of-kafka-messages-is-not-working%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    1














    That is not how kafka offset works here



    The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.



    From the above statement For example, from the first poll consumer get the message at offset 300 and if it failed to persist into database because of some issue and it will not submit the offset.



    So in the next poll it will get the next record where offset is 301 and if it persist data into database successfully then it will commit the offset 301 (which means all records in that partitions are processed till that offset, in above example it is 301)



    Solution for this : use retry mechanism until it successfully stores data into database with some limited retries or just save failed data into error topic and reprocess it later, or save the offset of failed records somewhere so later you can reprocess them.






    share|improve this answer





























      1














      That is not how kafka offset works here



      The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.



      From the above statement For example, from the first poll consumer get the message at offset 300 and if it failed to persist into database because of some issue and it will not submit the offset.



      So in the next poll it will get the next record where offset is 301 and if it persist data into database successfully then it will commit the offset 301 (which means all records in that partitions are processed till that offset, in above example it is 301)



      Solution for this : use retry mechanism until it successfully stores data into database with some limited retries or just save failed data into error topic and reprocess it later, or save the offset of failed records somewhere so later you can reprocess them.






      share|improve this answer



























        1












        1








        1







        That is not how kafka offset works here



        The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.



        From the above statement For example, from the first poll consumer get the message at offset 300 and if it failed to persist into database because of some issue and it will not submit the offset.



        So in the next poll it will get the next record where offset is 301 and if it persist data into database successfully then it will commit the offset 301 (which means all records in that partitions are processed till that offset, in above example it is 301)



        Solution for this : use retry mechanism until it successfully stores data into database with some limited retries or just save failed data into error topic and reprocess it later, or save the offset of failed records somewhere so later you can reprocess them.






        share|improve this answer















        That is not how kafka offset works here



        The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.



        From the above statement For example, from the first poll consumer get the message at offset 300 and if it failed to persist into database because of some issue and it will not submit the offset.



        So in the next poll it will get the next record where offset is 301 and if it persist data into database successfully then it will commit the offset 301 (which means all records in that partitions are processed till that offset, in above example it is 301)



        Solution for this : use retry mechanism until it successfully stores data into database with some limited retries or just save failed data into error topic and reprocess it later, or save the offset of failed records somewhere so later you can reprocess them.







        share|improve this answer














        share|improve this answer



        share|improve this answer








        edited Mar 25 at 21:31

























        answered Mar 25 at 16:18









        DeadpoolDeadpool

        9,9402 gold badges12 silver badges37 bronze badges




        9,9402 gold badges12 silver badges37 bronze badges
















            Got a question that you can’t ask on public Stack Overflow? Learn more about sharing private information with Stack Overflow for Teams.







            Got a question that you can’t ask on public Stack Overflow? Learn more about sharing private information with Stack Overflow for Teams.



















            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55341509%2fspring-boot-manual-acknowledgement-of-kafka-messages-is-not-working%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Kamusi Yaliyomo Aina za kamusi | Muundo wa kamusi | Faida za kamusi | Dhima ya picha katika kamusi | Marejeo | Tazama pia | Viungo vya nje | UrambazajiKuhusu kamusiGo-SwahiliWiki-KamusiKamusi ya Kiswahili na Kiingerezakuihariri na kuongeza habari

            Swift 4 - func physicsWorld not invoked on collision? The Next CEO of Stack OverflowHow to call Objective-C code from Swift#ifdef replacement in the Swift language@selector() in Swift?#pragma mark in Swift?Swift for loop: for index, element in array?dispatch_after - GCD in Swift?Swift Beta performance: sorting arraysSplit a String into an array in Swift?The use of Swift 3 @objc inference in Swift 4 mode is deprecated?How to optimize UITableViewCell, because my UITableView lags

            Access current req object everywhere in Node.js ExpressWhy are global variables considered bad practice? (node.js)Using req & res across functionsHow do I get the path to the current script with Node.js?What is Node.js' Connect, Express and “middleware”?Node.js w/ express error handling in callbackHow to access the GET parameters after “?” in Express?Modify Node.js req object parametersAccess “app” variable inside of ExpressJS/ConnectJS middleware?Node.js Express app - request objectAngular Http Module considered middleware?Session variables in ExpressJSAdd properties to the req object in expressjs with Typescript