Spring Boot Manual Acknowledgement of kafka messages is not working
I have a Spring Boot Kafka consumer that consumes records from a topic, stores them in a database, and acknowledges each record once it has been stored.
It works fine, except when the application fails to get a DB connection after consuming a record. In that case we do not send the acknowledgment, yet the message is never consumed again unless we change the group id and restart the consumer.
My consumer looks like this:
```java
@KafkaListener(id = "${group.id}", topics = "${kafka.edi.topic}")
public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
    boolean shouldAcknowledge = false;
    try {
        String tNo = getTrackingNumber((String) record.key());
        log.info("Check Duplicate By Comparing With DB records");
        if (!ediRecordService.isDuplicate(tNo)) { // this checks the record in my DB
            shouldAcknowledge = insertEDIRecord(record, tNo); // this returns true
        } else {
            log.warn("Duplicate record found.");
            shouldAcknowledge = true;
        }
        if (shouldAcknowledge) {
            acknowledgment.acknowledge();
        }
    } catch (Exception e) {
        // The posted snippet is cut off before this point; this handler is an assumed placeholder.
        // On failure (e.g. no DB connection) the record is not acknowledged.
        log.error("Failed to process record", e);
    }
}
```
So, as the snippet above shows, we do not send the acknowledgment in that case.
spring-boot apache-kafka kafka-consumer-api
Take a look here: stackoverflow.com/questions/50934411/…
– Vassilis
Mar 25 at 17:01
asked Mar 25 at 15:42
Biplab
1 Answer
That is not how Kafka offsets work.
The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.
For example, suppose the first poll returns the record at offset 300 and the consumer fails to persist it to the database because of some issue. It does not commit that offset.
The next poll still returns the next record, at offset 301, because the consumer's in-memory position has already moved past 300.
If that record is persisted successfully, the consumer commits the offset for 301, which marks every record in the partition up to that offset, including the failed record at 300, as processed.
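The behaviour described above can be reproduced with a toy model in plain Java. This is only a sketch and not the Kafka client API: `OffsetCommitDemo`, `Result`, and `run` are made-up names, and the model assumes a single partition with records delivered one at a time. It keeps the two values that matter: the in-memory position, which advances on every delivery, and the committed offset, which only moves on acknowledgment and holds the next offset to read (the acknowledged offset plus one).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Toy model of one partition and one consumer group; not the Kafka client API. */
final class OffsetCommitDemo {

    /** Offsets handed to the listener, and the committed offset at the end. */
    static final class Result {
        final List<Long> delivered;
        final long committed;

        Result(List<Long> delivered, long committed) {
            this.delivered = delivered;
            this.committed = committed;
        }
    }

    /**
     * Delivers offsets first..last once each, in order. A record whose offset is
     * in `failing` is not acknowledged; every other record is acknowledged, which
     * commits offset + 1 (the next offset to read).
     */
    static Result run(long first, long last, Set<Long> failing) {
        List<Long> delivered = new ArrayList<>();
        long committed = first; // where a restarted consumer in this group would resume
        long position = first;  // in-memory fetch position; advances on every delivery

        while (position <= last) {
            long offset = position++;
            delivered.add(offset);
            if (!failing.contains(offset)) {
                committed = offset + 1; // acknowledging covers all earlier offsets too
            }
        }
        return new Result(delivered, committed);
    }
}
```

Calling `OffsetCommitDemo.run(300, 301, Set.of(300L))` delivers 300 and 301 once each and ends with a committed offset of 302, so this group never reads the failed record at 300 again. A new group id has no committed offset, which is why changing it makes the record reappear.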
Solution: retry a limited number of times until the data is stored successfully, or save the failed record to an error topic and reprocess it later, or store the offsets of failed records somewhere so that you can reprocess them later.
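The first two options can be combined: retry a bounded number of times, then hand the record to an error sink. The following is a minimal sketch in plain Java, not Spring Kafka's own error-handling API. `RetryThenDeadLetter`, `Outcome`, and `process` are hypothetical names; `store` stands in for something like `insertEDIRecord`, and `deadLetter` stands in for a producer that publishes to an error topic. There is no back-off between attempts, which a real consumer would probably want.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Bounded retry with a dead-letter fallback; all names here are illustrative. */
final class RetryThenDeadLetter {

    enum Outcome { STORED, DEAD_LETTERED }

    /**
     * Calls `store` up to maxAttempts times. An attempt fails if it returns false
     * or throws a RuntimeException. If every attempt fails, the record is passed
     * to `deadLetter`. If `deadLetter` itself throws, the exception propagates and
     * the caller should not acknowledge the record.
     */
    static <T> Outcome process(T record, int maxAttempts,
                               Predicate<T> store, Consumer<T> deadLetter) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (store.test(record)) {
                    return Outcome.STORED;
                }
            } catch (RuntimeException e) {
                // Treat as a failed attempt (e.g. no DB connection) and try again.
            }
        }
        deadLetter.accept(record); // retries exhausted: park the record for later
        return Outcome.DEAD_LETTERED;
    }
}
```

In a listener like the one in the question, `acknowledgment.acknowledge()` would be called after either outcome, since the record has been stored or safely parked for reprocessing. Skipping the acknowledgment would then be reserved for the case where even the dead-letter write fails.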
edited Mar 25 at 21:31
answered Mar 25 at 16:18
Deadpool