How to dynamically create multiple consumers in Spring Kafka
I have two Kafka clusters whose IP addresses I fetch dynamically from a database. I am using @KafkaListener to create listeners. I now want to create multiple Kafka listeners at runtime, based on the bootstrap server attribute (comma-separated values), with each listener connected to one cluster. How can I achieve this?
Spring Boot: 2.1.3.RELEASE
Kafka: 2.0.1
Java: 8
java kafka-consumer-api spring-kafka
asked Mar 23 at 6:00
Iqbal
You can use the Kafka API directly to do what you want.
– TongChen
Mar 23 at 6:43
Yes, I have already tried that; however, I want to do it the Spring Kafka way.
– Iqbal
Mar 24 at 6:48
1 Answer
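Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance. Each call to getBean() then creates a new listener instance, and the container built for it uses whichever consumer factory is set on the container factory at that moment.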

    @SpringBootApplication
    @EnableConfigurationProperties(ClusterProperties.class)
    public class So55311070Application {

        public static void main(String[] args) {
            SpringApplication.run(So55311070Application.class, args);
        }

        private final Map<String, MyListener> listeners = new HashMap<>();

        @Bean
        public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
                ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, KafkaListenerEndpointRegistry registry) {

            return args -> {
                AtomicInteger instance = new AtomicInteger();
                Arrays.stream(props.getClusters()).forEach(cluster -> {
                    // Start from the Boot-configured consumer properties and override per cluster.
                    Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                    String groupId = "group" + instance.getAndIncrement();
                    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    // Mutate the shared container factory before creating the next listener instance.
                    containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                    this.listeners.put(groupId, context.getBean("listener", MyListener.class));
                });
                registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            };
        }

        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public MyListener listener() {
            return new MyListener();
        }

    }

    class MyListener {

        @KafkaListener(topics = "so55311070")
        public void listen(String in) {
            System.out.println(in);
        }

    }

    @ConfigurationProperties(prefix = "kafka")
    public class ClusterProperties {

        private String[] clusters;

        public String[] getClusters() {
            return this.clusters;
        }

        public void setClusters(String[] clusters) {
            this.clusters = clusters;
        }

    }


    kafka.clusters=localhost:9092,localhost:9093
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false

Result

    group0
    group1
    ...
    2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
    2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]

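
The per-cluster configuration step can also be looked at in isolation from Spring. The following is only a sketch using plain JDK collections: the class `ClusterConsumerProps` and the method `perCluster` are hypothetical names made up for illustration, and the string keys `bootstrap.servers` and `group.id` are the Kafka property names that `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` and `ConsumerConfig.GROUP_ID_CONFIG` stand for.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spring Kafka): derives one consumer
// configuration per cluster from a shared base configuration.
class ClusterConsumerProps {

    static List<Map<String, Object>> perCluster(Map<String, Object> base, String[] clusters) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 0; i < clusters.length; i++) {
            // Copy the base map so one cluster's overrides never leak into another's.
            Map<String, Object> props = new HashMap<>(base);
            props.put("bootstrap.servers", clusters[i]);
            props.put("group.id", "group" + i);
            result.add(props);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("auto.offset.reset", "earliest");
        base.put("enable.auto.commit", false);
        // Same comma-separated form as the kafka.clusters property.
        String[] clusters = "localhost:9092,localhost:9093".split(",");
        perCluster(base, clusters).forEach(System.out::println);
    }
}
```

Copying the base map for every cluster is the important part: the shared defaults stay untouched, so adding a third cluster only means adding a third entry to the list.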
EDIT
Added code to retry starting failed containers.
It turns out we don't need a local map of listeners; the registry has a map of all containers, including the ones that failed to start.

    @SpringBootApplication
    @EnableConfigurationProperties(ClusterProperties.class)
    public class So55311070Application {

        public static void main(String[] args) {
            SpringApplication.run(So55311070Application.class, args);
        }

        private boolean atLeastOneFailure;

        private ScheduledFuture<?> restartTask;

        @Bean
        public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
                ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

            return args -> {
                AtomicInteger instance = new AtomicInteger();
                Arrays.stream(props.getClusters()).forEach(cluster -> {
                    Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                    String groupId = "group" + instance.getAndIncrement();
                    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    attemptStart(containerFactory, context, consumerProps, groupId);
                });
                registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
                if (this.atLeastOneFailure) {
                    Runnable rescheduleTask = () -> {
                        // Reset once per pass, so a failure on an earlier container is not
                        // overwritten while iterating over the later ones.
                        this.atLeastOneFailure = false;
                        registry.getListenerContainers().forEach(c -> {
                            if (!c.isRunning()) {
                                System.out.println("Attempting restart of " + c.getGroupId());
                                try {
                                    c.start();
                                }
                                catch (Exception e) {
                                    System.out.println("Failed to start " + e.getMessage());
                                    this.atLeastOneFailure = true;
                                }
                            }
                        });
                        // Stop retrying once every container has started.
                        if (!this.atLeastOneFailure) {
                            this.restartTask.cancel(false);
                        }
                    };
                    this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                            Instant.now().plusSeconds(60),
                            Duration.ofSeconds(60));
                }
            };
        }

        private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

            containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
            try {
                context.getBean("listener", MyListener.class);
            }
            catch (BeanCreationException e) {
                // The cluster is unreachable; remember it so the restart task gets scheduled.
                this.atLeastOneFailure = true;
            }
        }

        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public MyListener listener() {
            return new MyListener();
        }

        @Bean
        public TaskScheduler scheduler() {
            return new ThreadPoolTaskScheduler();
        }

    }

    class MyListener {

        @KafkaListener(topics = "so55311070")
        public void listen(String in) {
            System.out.println(in);
        }

    }

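
The retry pattern itself does not depend on Spring. Below is a rough sketch of the same idea using only the JDK's `ScheduledExecutorService`; `RetryUntilAllRunning`, `Startable`, `Flaky`, `retryPass` and `schedule` are all hypothetical names invented for this illustration, with `Startable` standing in for a listener container. It assumes, as the answer's code does, that a component counts as failed only when `start()` throws.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RetryUntilAllRunning {

    // Hypothetical stand-in for a listener container.
    interface Startable {
        boolean isRunning();
        void start() throws Exception;
    }

    // One retry pass: tries to start everything that is not running.
    // Returns true only if no start attempt failed during this pass.
    static boolean retryPass(List<? extends Startable> components) {
        boolean noFailure = true;
        for (Startable c : components) {
            if (!c.isRunning()) {
                try {
                    c.start();
                } catch (Exception e) {
                    noFailure = false; // keep going, the others may still start
                }
            }
        }
        return noFailure;
    }

    // Runs retryPass at a fixed rate and cancels itself after a clean pass.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler,
            List<? extends Startable> components, long periodMillis) {
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        Runnable task = () -> {
            if (retryPass(components)) {
                ScheduledFuture<?> future = self.get();
                if (future != null) {
                    future.cancel(false);
                }
            }
        };
        ScheduledFuture<?> future =
                scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        self.set(future);
        return future;
    }

    // Demo component that fails its first few start attempts.
    static final class Flaky implements Startable {
        private int failuresLeft;
        private volatile boolean running;

        Flaky(int failures) {
            this.failuresLeft = failures;
        }

        public boolean isRunning() {
            return this.running;
        }

        public void start() throws Exception {
            if (this.failuresLeft-- > 0) {
                throw new Exception("cluster unavailable");
            }
            this.running = true;
        }
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Flaky> components = Arrays.asList(new Flaky(2), new Flaky(0));
        ScheduledFuture<?> task = schedule(scheduler, components, 100);
        while (!task.isCancelled()) {
            Thread.sleep(50);
        }
        scheduler.shutdown();
        System.out.println("all running: " + components.stream().allMatch(Flaky::isRunning));
    }
}
```

The failure flag is a local variable computed once per pass, so a component that fails early in the list cannot be masked by one that is already running later in the list.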
Thanks a lot, Gary, for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, if one of the clusters is unavailable when the application starts, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one of the clusters is unavailable, my other consumer (which connected successfully to the available cluster) keeps running.
– Iqbal
Mar 24 at 6:47
I am getting the exception java.lang.IllegalStateException: Failed to execute ApplicationRunner, caused by org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps, because one of my clusters is unavailable, the listener bean is not being initialized properly and hence is failing.
– Iqbal
Mar 24 at 6:51
Simply put `try {...} catch (...) {...}` around the call to `getBean()`. You can speed up the failure by setting the `default.api.timeout.ms` property; e.g. `spring.kafka.consumer.properties.default.api.timeout.ms=5000`.
– Gary Russell
Mar 24 at 13:27
Thanks for the suggestion, it worked. Is there a way to have the consumer keep polling for the connection, so that once the cluster becomes available the consumer connects and starts consuming messages?
– Iqbal
Mar 25 at 8:01
See the edit to my answer.
– Gary Russell
Mar 25 at 13:34
Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private final Map<String, MyListener> listeners = new HashMap<>();
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties
private String[] clusters;
public String[] getClusters()
return this.clusters;
public void setClusters(String[] clusters)
this.clusters = clusters;
kafka.clusters=localhost:9092,localhost:9093
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
Result
group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
EDIT
Add code to retry starting failed containers.
It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private boolean atLeastOneFailure;
private ScheduledFuture<?> restartTask;
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();
catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;
);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);
;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));
;
private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);
catch (BeanCreationException e)
this.atLeastOneFailure = true;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).
– Iqbal
Mar 24 at 6:47
I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.
– Iqbal
Mar 24 at 6:51
Simply puttry... catch(...) ...
around the call togetBean()
. You can speed up the failure by setting thedefault.api.timeout.ms
property; e.g.spring.kafka.consumer.properties.default.api.timeout.ms=5000
.
– Gary Russell
Mar 24 at 13:27
Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.
– Iqbal
Mar 25 at 8:01
See the edit to my answer.
– Gary Russell
Mar 25 at 13:34
add a comment |
Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private final Map<String, MyListener> listeners = new HashMap<>();
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties
private String[] clusters;
public String[] getClusters()
return this.clusters;
public void setClusters(String[] clusters)
this.clusters = clusters;
kafka.clusters=localhost:9092,localhost:9093
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
Result
group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
EDIT
Add code to retry starting failed containers.
It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private boolean atLeastOneFailure;
private ScheduledFuture<?> restartTask;
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();
catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;
);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);
;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));
;
private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);
catch (BeanCreationException e)
this.atLeastOneFailure = true;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).
– Iqbal
Mar 24 at 6:47
I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.
– Iqbal
Mar 24 at 6:51
Simply puttry... catch(...) ...
around the call togetBean()
. You can speed up the failure by setting thedefault.api.timeout.ms
property; e.g.spring.kafka.consumer.properties.default.api.timeout.ms=5000
.
– Gary Russell
Mar 24 at 13:27
Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.
– Iqbal
Mar 25 at 8:01
See the edit to my answer.
– Gary Russell
Mar 25 at 13:34
add a comment |
Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private final Map<String, MyListener> listeners = new HashMap<>();
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties
private String[] clusters;
public String[] getClusters()
return this.clusters;
public void setClusters(String[] clusters)
this.clusters = clusters;
kafka.clusters=localhost:9092,localhost:9093
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
Result
group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
EDIT
Add code to retry starting failed containers.
It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application
public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);
private boolean atLeastOneFailure;
private ScheduledFuture<?> restartTask;
@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)
return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();
catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;
);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);
;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));
;
private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);
catch (BeanCreationException e)
this.atLeastOneFailure = true;
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();
@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();
class MyListener
@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);
Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private final Map<String, MyListener> listeners = new HashMap<>();

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                // Start from the Boot-configured consumer properties, then override per cluster.
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                // Mutate the shared factory so the next prototype listener uses this cluster.
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}
@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    private String[] clusters;

    public String[] getClusters() {
        return this.clusters;
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }
}
kafka.clusters=localhost:9092,localhost:9093
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
Result
group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
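To make the per-cluster step in the runner explicit, here is a minimal JDK-only sketch of how each entry in kafka.clusters could yield its own consumer configuration. The class PerClusterProps and the method forClusters are hypothetical names used only for illustration; the string keys are the standard Kafka property names behind ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Spring Kafka): one consumer configuration per cluster.
class PerClusterProps {

    static List<Map<String, Object>> forClusters(Map<String, Object> base, String[] clusters) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 0; i < clusters.length; i++) {
            // Copy the shared defaults so they are never mutated.
            Map<String, Object> props = new HashMap<>(base);
            props.put("bootstrap.servers", clusters[i]); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
            props.put("group.id", "group" + i);          // ConsumerConfig.GROUP_ID_CONFIG
            result.add(props);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("auto.offset.reset", "earliest");
        String[] clusters = "localhost:9092,localhost:9093".split(",");
        // Prints one property map per cluster.
        forClusters(base, clusters).forEach(System.out::println);
    }
}
```

Copying the base map for every cluster is what keeps the groups independent: each container ends up with its own bootstrap.servers and group.id, while shared settings such as auto.offset.reset carry over unchanged.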
EDIT
Add code to retry starting failed containers.
It turns out we don't need a local map of listeners; the registry already holds all containers, including the ones that failed to start.
@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    // Written by both the runner thread and the scheduler thread, hence volatile.
    private volatile boolean atLeastOneFailure;

    private volatile ScheduledFuture<?> restartTask;

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    // Reset once per pass; resetting inside the loop would let a later
                    // container hide an earlier container's failure.
                    this.atLeastOneFailure = false;
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    // Stop retrying once every container has started.
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            // The container is still registered; the scheduled task retries it later.
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}
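The retry idea can also be tried without a broker. The following is only a sketch of the same pattern using plain JDK classes, not Spring Kafka code: Startable is a hypothetical stand-in for a listener container, RestartLoop is a hypothetical name, and it uses ScheduledExecutorService.scheduleWithFixedDelay where the answer uses Spring's TaskScheduler. It resets the failure flag once per pass and cancels itself after a pass with no failures.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a listener container: start() may throw until the cluster is reachable.
interface Startable {
    boolean isRunning();
    void start();
}

class RestartLoop {

    // Retries every stopped item after each delay; cancels itself after a pass with no failures.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, List<Startable> items,
            long delay, TimeUnit unit) {
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        Runnable pass = () -> {
            boolean failed = false; // reset once per pass, not once per item
            for (Startable item : items) {
                if (!item.isRunning()) {
                    try {
                        item.start();
                    }
                    catch (RuntimeException e) {
                        failed = true;
                    }
                }
            }
            ScheduledFuture<?> future = self.get();
            if (!failed && future != null) {
                future.cancel(false);
            }
        };
        ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(pass, delay, delay, unit);
        self.set(future);
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        AtomicBoolean up = new AtomicBoolean();
        // Simulates a cluster that becomes reachable on the third attempt.
        Startable flaky = new Startable() {
            public boolean isRunning() {
                return up.get();
            }
            public void start() {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("cluster unavailable");
                }
                up.set(true);
            }
        };
        ScheduledFuture<?> task = schedule(scheduler, Arrays.asList(flaky), 100, TimeUnit.MILLISECONDS);
        while (!task.isCancelled()) {
            Thread.sleep(50);
        }
        System.out.println("running=" + up.get() + ", attempts=" + attempts.get());
        scheduler.shutdown();
    }
}
```

Keeping the flag local to each pass avoids sharing mutable state between the scheduling thread and the worker thread; only the future handle crosses threads, through the AtomicReference.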
edited Mar 25 at 13:34
answered Mar 23 at 15:49
Gary Russell
Thanks a lot, Gary, for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, if one of the configured clusters is unavailable when the application starts, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one of the configured clusters is unavailable, the other consumers (which connected successfully to the available clusters) keep running.
– Iqbal
Mar 24 at 6:47
I am getting the exception java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps, because one of my clusters is unavailable, the listener bean is not initialized properly and hence fails.
– Iqbal
Mar 24 at 6:51
Simply put try... catch(...) ... around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.
– Gary Russell
Mar 24 at 13:27
Thanks for the suggestion, it worked. Is there a way to have the consumer keep polling for the connection, so that once the cluster becomes available the consumer connects and starts consuming messages?
– Iqbal
Mar 25 at 8:01
See the edit to my answer.
– Gary Russell
Mar 25 at 13:34
You can use the Kafka API directly to do what you want.
– TongChen
Mar 23 at 6:43
Yes, I have already tried that; however, I want to do it the Spring Kafka way.
– Iqbal
Mar 24 at 6:48