
How to dynamically create multiple consumers in Spring Kafka


I have two Kafka clusters, whose IPs I am fetching dynamically from a database. I am using @KafkaListener to create listeners. Now I want to create multiple Kafka listeners at runtime based on the bootstrap-servers attribute (comma-separated values), each one listening to a cluster. Can you please suggest how I can achieve this?



Spring Boot: 2.1.3.RELEASE
Kafka: 2.0.1
Java: 8
































  • You can directly use the Kafka API to do what you want.

    – TongChen
    Mar 23 at 6:43











  • Yes, I have already tried that; however, I want to do it the Spring Kafka way.

    – Iqbal
    Mar 24 at 6:48

















Tags: java, kafka-consumer-api, spring-kafka
















asked Mar 23 at 6:00 by Iqbal











1 Answer
Your requirements are not clear, but assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private final Map<String, MyListener> listeners = new HashMap<>();

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    private String[] clusters;

    public String[] getClusters() {
        return this.clusters;
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }

}





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993  INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994  INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
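As an aside, Spring Boot binds the comma-separated kafka.clusters value to the String[] returned by props.getClusters(); the per-cluster fan-out that drives the loop above can be sketched in plain Java (the property value and "group" + counter naming below simply mirror the example config; nothing here is Spring-specific):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClusterSplit {

    // Builds one consumer-config map per cluster, like the runner() loop above.
    static List<Map<String, Object>> perClusterProps(String clusters) {
        List<Map<String, Object>> configs = new ArrayList<>();
        int instance = 0;
        for (String cluster : clusters.split(",")) {
            Map<String, Object> props = new HashMap<>();
            props.put("bootstrap.servers", cluster.trim());
            props.put("group.id", "group" + instance++);
            configs.add(props);
        }
        return configs;
    }

    public static void main(String[] args) {
        // Two clusters in, two consumer configs out.
        perClusterProps("localhost:9092,localhost:9093").forEach(System.out::println);
    }
}
```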


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private boolean atLeastOneFailure;

    private ScheduledFuture<?> restartTask;

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    this.atLeastOneFailure = false; // reset before each scan of the containers
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}









  • Thanks a lot, Gary, for the guidance. I can now spawn multiple consumers depending on the number of clusters. However, if one of the clusters is unavailable when the application starts, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one cluster is unavailable, the other consumer (successfully connected to the available cluster) keeps running.

    – Iqbal
    Mar 24 at 6:47











  • I am getting this exception: java.lang.IllegalStateException: Failed to execute ApplicationRunner ... Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener'. Perhaps, because one of my clusters is unavailable, the listener bean is not initialized properly and therefore fails.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try { ... } catch (...) { ... } around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property, e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27
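To make the comment above concrete, the timeout override goes in application.properties alongside the other consumer settings (5000 ms is only an illustrative value; tune it to how quickly you want an unreachable cluster to fail):

spring.kafka.consumer.properties.default.api.timeout.ms=5000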












  • Thanks for the suggestion, it worked. Is there a way to have the consumer poll for the connection, so that once the cluster becomes available it reconnects and starts consuming messages?

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34











Your Answer






StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);













draft saved

draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55311070%2fhow-to-dynamically-create-multiple-consumers-in-spring-kafka%23new-answer', 'question_page');

);

Post as a guest















Required, but never shown

























1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









1














Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private boolean atLeastOneFailure;

private ScheduledFuture<?> restartTask;

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();

catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;


);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);

;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));

;


private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)

containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);

catch (BeanCreationException e)
this.atLeastOneFailure = true;



@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();


@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);









share|improve this answer

























  • Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try... catch(...) ... around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34















1














Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private boolean atLeastOneFailure;

private ScheduledFuture<?> restartTask;

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();

catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;


);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);

;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));

;


private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)

containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);

catch (BeanCreationException e)
this.atLeastOneFailure = true;



@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();


@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);









share|improve this answer

























  • Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try... catch(...) ... around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34













1












1








1







Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Your requirements are not clear, but assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private final Map<String, MyListener> listeners = new HashMap<>();

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}



@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    private String[] clusters;

    public String[] getClusters() {
        return this.clusters;
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }

}


kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993  INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994  INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners; the registry maintains a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private boolean atLeastOneFailure;

    private ScheduledFuture<?> restartTask;

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    this.atLeastOneFailure = false; // reset once per pass, before checking all containers
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}
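Stripped of the Spring specifics, the restart logic in the edit boils down to a fixed-rate task that retries the stopped containers and cancels itself once a full pass completes with no failures. A JDK-only sketch of that pattern (the class and method names here are illustrative, not Spring APIs):

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartLoop {

    // Stand-in for a listener container: fails start() a fixed number of
    // times (simulating an unreachable broker), then comes up.
    static class FlakyContainer {
        private final AtomicInteger failuresLeft;
        private volatile boolean running;

        FlakyContainer(int failures) {
            this.failuresLeft = new AtomicInteger(failures);
        }

        boolean isRunning() {
            return this.running;
        }

        void start() {
            if (this.failuresLeft.getAndDecrement() > 0) {
                throw new IllegalStateException("broker unavailable");
            }
            this.running = true;
        }
    }

    // Retry starting the stopped containers at a fixed rate; cancel the task
    // once a pass completes with no failures. Returns true if everything
    // started within the timeout.
    static boolean startAll(List<FlakyContainer> containers, long periodMs, long timeoutMs)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch allStarted = new CountDownLatch(1);
        ScheduledFuture<?>[] task = new ScheduledFuture<?>[1];
        Runnable retry = () -> {
            boolean atLeastOneFailure = false;
            for (FlakyContainer c : containers) {
                if (!c.isRunning()) {
                    try {
                        c.start();
                    }
                    catch (Exception e) {
                        atLeastOneFailure = true; // leave it for the next pass
                    }
                }
            }
            if (!atLeastOneFailure) {
                allStarted.countDown();
                task[0].cancel(false); // everything is up; stop rescheduling
            }
        };
        // initial delay == period, mirroring Instant.now().plusSeconds(60) above
        task[0] = scheduler.scheduleAtFixedRate(retry, periodMs, periodMs, TimeUnit.MILLISECONDS);
        boolean ok = allStarted.await(timeoutMs, TimeUnit.MILLISECONDS);
        scheduler.shutdownNow();
        return ok;
    }

    public static void main(String[] args) throws InterruptedException {
        List<FlakyContainer> containers = List.of(new FlakyContainer(0), new FlakyContainer(2));
        System.out.println("all started: " + startAll(containers, 20, 5000));
    }
}
```

The one-element `ScheduledFuture<?>[]` plays the role of the `restartTask` field above: the lambda needs a handle to its own scheduled future so it can cancel itself.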








answered Mar 23 at 15:49, edited Mar 25 at 13:34
– Gary Russell
  • Thanks a lot, Gary, for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, if one of the configured clusters is unavailable at startup, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one of the clusters is unavailable, the other consumer (the one successfully connected to the available cluster) keeps running.

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception java.lang.IllegalStateException: Failed to execute ApplicationRunner, caused by org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener'. Perhaps because one of my clusters is unavailable, the listener bean is not getting initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put a try ... catch (...) around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27
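Illustrating the suggestion in the comment above, the override goes in application.properties alongside the existing consumer settings (the 5000 ms value is just an example; tune it to how long you are willing to block at startup):

```properties
# fail fast when a cluster is unreachable, instead of retrying for the default 60s
spring.kafka.consumer.properties.default.api.timeout.ms=5000
```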












  • Thanks for the suggestion; it worked. Is there a way to have the consumer keep polling for the connection, so that once the cluster becomes available the consumer gets connected and starts consuming messages?

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34

















