Distribution of topic partitions by concurrentMessageListenerContainer
I'm setting up a ConcurrentMessageListenerContainer:
<bean class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" id="messageListenerContainer">
    <constructor-arg index="0" ref="consumerFactory"/>
    <constructor-arg index="1" ref="containerProperties"/>
    <property name="concurrency" value="2"/>
</bean>
The ConsumerFactory uses this config:
<util:map id="consumerConfig" map-class="java.util.HashMap">
    <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).BOOTSTRAP_SERVERS_CONFIG}"
           value="${rp.kafka.bootstrap.servers}"/>
    <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).KEY_DESERIALIZER_CLASS_CONFIG}"
           value="org.apache.kafka.common.serialization.StringDeserializer"/>
    <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).VALUE_DESERIALIZER_CLASS_CONFIG}"
           value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
    <entry key="#{T(org.springframework.kafka.support.serializer.JsonDeserializer).TRUSTED_PACKAGES}"
           value="*"/>
    <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).PARTITION_ASSIGNMENT_STRATEGY_CONFIG}"
           value="org.apache.kafka.clients.consumer.RoundRobinAssignor"/>
    <entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).ENABLE_AUTO_COMMIT_CONFIG}"
           value="false"/>
</util:map>
and the ContainerProperties are:
<bean class="org.springframework.kafka.listener.ContainerProperties" id="containerProperties">
    <constructor-arg>
        <list>
            <value>sendSMS</value>
        </list>
    </constructor-arg>
    <property name="groupId" value="main"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="ackMode" value="RECORD"/>
</bean>
My topic "sendSMS" has 5 partitions on a 3-node cluster with a replication factor of 3, so I expect each KafkaMessageListenerContainer created by the concurrent one (2 in total in this case) to take its own share of the partitions. However, after the application starts, I see in my debugger that each listener is handling all 5 partitions:
https://gyazo.com/183626ff60061b471858f8cc52573353
In addition, a message from the 4th partition is delivered twice at the same offset, in different threads with different consumers. (That partition is where I have a message that hangs processing and is not committed after restarts, but that is unrelated to this issue.) Why does this happen? Is it a bug or expected behavior?
spring-kafka
asked Mar 27 at 10:13
Filipp Petrov
1 Answer
You are not showing enough information. The concurrent container aggregates the assigned partitions of its child KafkaMessageListenerContainers (one for each unit of concurrency):
    @Override
    public Collection<TopicPartition> getAssignedPartitions() {
        return this.containers.stream()
                .map(KafkaMessageListenerContainer::getAssignedPartitions)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }
You need to show logs for the re-delivery; turn on DEBUG logging for more information.
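To make the aggregation concrete, here is a minimal, JDK-only sketch. It is not spring-kafka code: `Child`, `Parent`, and `roundRobin` are hypothetical stand-ins, partitions are plain integers, and the circular split is only an assumed model of how a round-robin assignment might divide 5 partitions between 2 consumers. It shows that the parent can report all 5 partitions even though each child owns only a subset.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Simplified model; none of these types are spring-kafka classes.
public class AssignmentSketch {

    // Hypothetical child container: holds only the partitions assigned to it.
    static final class Child {
        private final List<Integer> assigned;

        Child(List<Integer> assigned) {
            this.assigned = assigned;
        }

        Collection<Integer> getAssignedPartitions() {
            return assigned;
        }
    }

    // Hypothetical parent: reports the combined assignments of all children,
    // using the same stream pipeline as the method quoted above.
    static final class Parent {
        private final List<Child> containers;

        Parent(List<Child> containers) {
            this.containers = containers;
        }

        Collection<Integer> getAssignedPartitions() {
            return containers.stream()
                    .map(Child::getAssignedPartitions)
                    .filter(Objects::nonNull)
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());
        }
    }

    // Assumed model of a round-robin split: deal partitions 0..n-1
    // to the consumers in circular order.
    static List<List<Integer>> roundRobin(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Child> children = roundRobin(5, 2).stream()
                .map(Child::new)
                .collect(Collectors.toList());
        Parent parent = new Parent(children);

        for (int i = 0; i < children.size(); i++) {
            System.out.println("child " + i + ": " + children.get(i).getAssignedPartitions());
        }
        System.out.println("parent: " + parent.getAssignedPartitions());
    }
}
```

In this model the children hold [0, 2, 4] and [1, 3], while the parent reports [0, 2, 4, 1, 3]. In the real application, inspecting each child separately (the concurrent container exposes them through getContainers(), though check this against your spring-kafka version) rather than the parent should show what each consumer actually owns.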
answered Mar 27 at 12:49
Gary Russell