How to dynamically create multiple consumers in Spring Kafka


I have two Kafka clusters whose IPs I fetch dynamically from a database. I am using @KafkaListener to create listeners. I now want to create multiple Kafka listeners at runtime, one per cluster, based on the bootstrap server attribute (comma-separated values). How can I achieve this?



Spring-boot: 2.1.3.RELEASE
Kafka-2.0.1
Java-8










java kafka-consumer-api spring-kafka






asked Mar 23 at 6:00 by Iqbal












  • You can use the Kafka API directly to do what you want.

    – TongChen
    Mar 23 at 6:43











  • Yes, I have already tried that; however, I want to do it the Spring Kafka way.

    – Iqbal
    Mar 24 at 6:48





























1 Answer














Your requirements are not clear, but assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance.



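    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @SpringBootApplication
    @EnableConfigurationProperties(ClusterProperties.class)
    public class So55311070Application {

        public static void main(String[] args) {
            SpringApplication.run(So55311070Application.class, args);
        }

        private final Map<String, MyListener> listeners = new HashMap<>();

        @Bean
        public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
                ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, KafkaListenerEndpointRegistry registry) {

            return args -> {
                AtomicInteger instance = new AtomicInteger();
                Arrays.stream(props.getClusters()).forEach(cluster -> {
                    // Start from the auto-configured consumer properties and override per cluster.
                    Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                    String groupId = "group" + instance.getAndIncrement();
                    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    // Mutate the shared factory, then request a new prototype listener instance.
                    containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                    this.listeners.put(groupId, context.getBean("listener", MyListener.class));
                });
                registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            };
        }

        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public MyListener listener() {
            return new MyListener();
        }

    }

    class MyListener {

        @KafkaListener(topics = "so55311070")
        public void listen(String in) {
            System.out.println(in);
        }

    }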



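    import org.springframework.boot.context.properties.ConfigurationProperties;

    // Binds the comma-separated kafka.clusters property to an array, one entry per cluster.
    @ConfigurationProperties(prefix = "kafka")
    public class ClusterProperties {

        private String[] clusters;

        public String[] getClusters() {
            return this.clusters;
        }

        public void setClusters(String[] clusters) {
            this.clusters = clusters;
        }

    }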


    kafka.clusters=localhost:9092,localhost:9093

    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false


Result



    group0
    group1
    ...
    2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
    2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
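The per-cluster property derivation used in the runner above can be looked at in isolation. The following is a minimal sketch using only the JDK, not Spring Kafka itself. The class name `PerClusterProps` and its methods are hypothetical, and it uses the literal Kafka property keys `bootstrap.servers` and `group.id` (the values behind `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` and `ConsumerConfig.GROUP_ID_CONFIG`). It assumes, as the answer does, that each comma-separated entry identifies one cluster.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: builds one consumer-property map per cluster from a shared base.
class PerClusterProps {

    // Splits a comma-separated attribute such as "localhost:9092,localhost:9093".
    static List<String> parseClusters(String attribute) {
        List<String> clusters = new ArrayList<>();
        for (String part : attribute.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                clusters.add(trimmed);
            }
        }
        return clusters;
    }

    // Copies the base properties and overrides the two per-cluster settings.
    static List<Map<String, Object>> derive(Map<String, Object> base, List<String> clusters) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (int i = 0; i < clusters.size(); i++) {
            Map<String, Object> props = new HashMap<>(base); // never mutate the shared map
            props.put("bootstrap.servers", clusters.get(i));
            props.put("group.id", "group" + i);
            result.add(props);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("auto.offset.reset", "earliest");
        List<String> clusters = parseClusters("localhost:9092, localhost:9093");
        derive(base, clusters).forEach(System.out::println);
    }
}
```

Copying the base map for every cluster keeps the shared defaults untouched, which mirrors the `new HashMap<>(cf.getConfigurationProperties())` call in the answer.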


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners; the registry has a map of all containers, including the ones that failed to start.



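    import java.time.Duration;
    import java.time.Instant;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.beans.factory.BeanCreationException;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Scope;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.scheduling.TaskScheduler;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

    @SpringBootApplication
    @EnableConfigurationProperties(ClusterProperties.class)
    public class So55311070Application {

        public static void main(String[] args) {
            SpringApplication.run(So55311070Application.class, args);
        }

        private boolean atLeastOneFailure;

        private ScheduledFuture<?> restartTask;

        @Bean
        public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
                ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

            return args -> {
                AtomicInteger instance = new AtomicInteger();
                Arrays.stream(props.getClusters()).forEach(cluster -> {
                    Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                    String groupId = "group" + instance.getAndIncrement();
                    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    attemptStart(containerFactory, context, consumerProps, groupId);
                });
                registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
                if (this.atLeastOneFailure) {
                    Runnable rescheduleTask = () -> {
                        // Reset once per pass (not per container) so an earlier failure is not overwritten.
                        this.atLeastOneFailure = false;
                        registry.getListenerContainers().forEach(c -> {
                            if (!c.isRunning()) {
                                System.out.println("Attempting restart of " + c.getGroupId());
                                try {
                                    c.start();
                                }
                                catch (Exception e) {
                                    System.out.println("Failed to start " + e.getMessage());
                                    this.atLeastOneFailure = true;
                                }
                            }
                        });
                        // Stop retrying once every container has started.
                        if (!this.atLeastOneFailure) {
                            this.restartTask.cancel(false);
                        }
                    };
                    this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                            Instant.now().plusSeconds(60),
                            Duration.ofSeconds(60));
                }
            };
        }

        private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
                ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

            containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
            try {
                context.getBean("listener", MyListener.class);
            }
            catch (BeanCreationException e) {
                // The container stays registered but not running; the scheduled task retries it.
                this.atLeastOneFailure = true;
            }
        }

        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public MyListener listener() {
            return new MyListener();
        }

        @Bean
        public TaskScheduler scheduler() {
            return new ThreadPoolTaskScheduler();
        }

    }

    class MyListener {

        @KafkaListener(topics = "so55311070")
        public void listen(String in) {
            System.out.println(in);
        }

    }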
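The retry logic can also be sketched without Spring, which may make the control flow easier to follow. This is an illustrative stand-in, not the answer's code: `Restartable` is a hypothetical interface playing the role of a listener container, `RetryPass` is a hypothetical class name, and a JDK `ScheduledExecutorService` takes the place of the `TaskScheduler`. Each pass tries to start every stopped item, and the schedule cancels itself after the first pass in which no start attempt fails.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a listener container.
interface Restartable {
    boolean isRunning();
    void start() throws Exception;
}

class RetryPass {

    // Tries to start every stopped item once; returns true if any start attempt threw.
    static boolean attemptAll(List<? extends Restartable> items) {
        boolean anyFailed = false; // reset once per pass, not once per item
        for (Restartable item : items) {
            if (!item.isRunning()) {
                try {
                    item.start();
                } catch (Exception e) {
                    System.out.println("Failed to start: " + e.getMessage());
                    anyFailed = true;
                }
            }
        }
        return anyFailed;
    }

    // Repeats attemptAll at a fixed rate and cancels itself after the first clean pass.
    static ScheduledFuture<?> scheduleUntilAllRunning(ScheduledExecutorService scheduler,
            List<? extends Restartable> items, long periodMillis) {
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        Runnable task = () -> {
            if (!attemptAll(items)) {
                ScheduledFuture<?> future = self.get();
                if (future != null) {
                    future.cancel(false);
                }
            }
        };
        self.set(scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
        return self.get();
    }
}
```

Keeping the failure flag local to one pass avoids the situation where a later, healthy item hides an earlier failure.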







  • Thanks a lot, Gary, for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, if one of the clusters is unavailable when the application starts, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one cluster is unavailable, the other consumer (which is successfully connected to the available cluster) keeps running.

    – Iqbal
    Mar 24 at 6:47











  • I am getting this exception: java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps, because one of my clusters is unavailable, the listener bean is not initialized properly and hence fails.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try {...} catch (...) {...} around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property, e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way to have the consumer keep polling for the connection, so that once the cluster becomes available it connects and starts consuming messages?

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34
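
The try/catch suggestion from the comments above comes down to isolating each cluster's start-up so that one unreachable cluster cannot abort the rest. A minimal JDK-only sketch, assuming a hypothetical `ClusterStarter` callback in place of the `context.getBean(...)` call and a hypothetical `IsolatedStartup` class:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical callback standing in for whatever creates and starts a listener for one cluster.
interface ClusterStarter {
    void start(String bootstrapServers) throws Exception;
}

class IsolatedStartup {

    // Starts every cluster independently and returns the ones that failed, in input order.
    static List<String> startAll(List<String> clusters, ClusterStarter starter) {
        List<String> failed = new ArrayList<>();
        for (String cluster : clusters) {
            try {
                starter.start(cluster);
            } catch (Exception e) {
                // Record and continue so the remaining clusters still get their listeners.
                failed.add(cluster);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> clusters = Arrays.asList("localhost:9092", "localhost:9093");
        // Simulate the second cluster being unreachable.
        List<String> failed = startAll(clusters, c -> {
            if (c.endsWith("9093")) {
                throw new Exception("unreachable: " + c);
            }
        });
        System.out.println("failed clusters: " + failed);
    }
}
```

The returned list of failed clusters is what a later retry task would work through.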











1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









1














Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private boolean atLeastOneFailure;

private ScheduledFuture<?> restartTask;

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();

catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;


);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);

;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));

;


private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)

containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);

catch (BeanCreationException e)
this.atLeastOneFailure = true;



@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();


@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);









share|improve this answer

























  • Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try... catch(...) ... around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34















1














Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners, the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private boolean atLeastOneFailure;

private ScheduledFuture<?> restartTask;

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
attemptStart(containerFactory, context, consumerProps, groupId);
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
if (this.atLeastOneFailure)
Runnable rescheduleTask = () ->
registry.getListenerContainers().forEach(c ->
this.atLeastOneFailure = false;
if (!c.isRunning())
System.out.println("Attempting restart of " + c.getGroupId());
try
c.start();

catch (Exception e)
System.out.println("Failed to start " + e.getMessage());
this.atLeastOneFailure = true;


);
if (!this.atLeastOneFailure)
this.restartTask.cancel(false);

;
this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
Instant.now().plusSeconds(60),
Duration.ofSeconds(60));

;


private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, Map<String, Object> consumerProps, String groupId)

containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
try
context.getBean("listener", MyListener.class);

catch (BeanCreationException e)
this.atLeastOneFailure = true;



@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();


@Bean
public TaskScheduler scheduler()
return new ThreadPoolTaskScheduler();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);









share|improve this answer

























  • Thanks a lot Gary for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, while starting the application if one of the mentioned clusters is unavailable, the consumer keeps retrying for the connection and then eventually times out bringing down the entire application. While I am looking for an approach even if one of the mentioned cluster is unavailable my other consumer must keep running(which is successfully connected to the available cluster).

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception-java.lang.IllegalStateException: Failed to execute ApplicationRunner Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps because since one of my cluster is unavailable listener bean is not getting initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try... catch(...) ... around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way I can have the consumer polling for the connection, so that once the cluster becomes available the consumer must get connected and start consuming messages.

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34













1












1








1







Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution. i.e. make the listener bean a prototype and mutate the container factory for each instance...



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application

public static void main(String[] args)
SpringApplication.run(So55311070Application.class, args);


private final Map<String, MyListener> listeners = new HashMap<>();

@Bean
public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
ApplicationContext context, KafkaListenerEndpointRegistry registry)

return args ->
AtomicInteger instance = new AtomicInteger();
Arrays.stream(props.getClusters()).forEach(cluster ->
Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
String groupId = "group" + instance.getAndIncrement();
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
this.listeners.put(groupId, context.getBean("listener", MyListener.class));
);
registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
;


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public MyListener listener()
return new MyListener();




class MyListener

@KafkaListener(topics = "so55311070")
public void listen(String in)
System.out.println(in);






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties

private String[] clusters;

public String[] getClusters()
return this.clusters;


public void setClusters(String[] clusters)
this.clusters = clusters;





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer
: partitions assigned: [so55311070-0]


EDIT



Your requirements are not clear but, assuming you want the same listener configuration to listen to multiple clusters, here is one solution: make the listener bean a prototype and mutate the container factory for each instance.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private final Map<String, MyListener> listeners = new HashMap<>();

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
                this.listeners.put(groupId, context.getBean("listener", MyListener.class));
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
        };
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}






@ConfigurationProperties(prefix = "kafka")
public class ClusterProperties {

    private String[] clusters;

    public String[] getClusters() {
        return this.clusters;
    }

    public void setClusters(String[] clusters) {
        this.clusters = clusters;
    }

}





kafka.clusters=localhost:9092,localhost:9093

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false


Result



group0
group1
...
2019-03-23 11:43:25.993 INFO 74869 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
2019-03-23 11:43:25.994 INFO 74869 --- [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so55311070-0]
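To make the per-cluster configuration step in the runner above easier to follow in isolation, here is a minimal plain-Java sketch of the same idea: copy a shared base configuration once per cluster, then override the bootstrap servers and group id. The class and method names (`ClusterConfigSketch`, `perClusterProps`) are hypothetical and exist only for illustration; no Spring or Kafka classes are used, and the string keys are the literal property names behind `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` and `ConsumerConfig.GROUP_ID_CONFIG`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; ClusterConfigSketch is a hypothetical name,
// not part of Spring Kafka or of the application shown above.
final class ClusterConfigSketch {

    // Builds one property map per cluster, keyed by a generated group id
    // ("group0", "group1", ...), starting from a shared base configuration.
    static Map<String, Map<String, Object>> perClusterProps(Map<String, Object> base, String clusters) {
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        int instance = 0;
        for (String cluster : clusters.split(",")) {
            // Copy the base map so that each cluster gets its own independent configuration.
            Map<String, Object> props = new HashMap<>(base);
            props.put("bootstrap.servers", cluster.trim());
            String groupId = "group" + instance++;
            props.put("group.id", groupId);
            result.put(groupId, props);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("auto.offset.reset", "earliest");
        base.put("enable.auto.commit", false);
        perClusterProps(base, "localhost:9092,localhost:9093")
                .forEach((group, props) -> System.out.println(group + " -> " + props.get("bootstrap.servers")));
    }
}
```

Copying the base map before each override is what keeps the clusters from sharing (and overwriting) one another's settings; in the Spring version the same role is played by `new HashMap<>(cf.getConfigurationProperties())`.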


EDIT



Add code to retry starting failed containers.



It turns out we don't need a local map of listeners; the registry has a map of all containers, including the ones that failed to start.



@SpringBootApplication
@EnableConfigurationProperties(ClusterProperties.class)
public class So55311070Application {

    public static void main(String[] args) {
        SpringApplication.run(So55311070Application.class, args);
    }

    private boolean atLeastOneFailure;

    private ScheduledFuture<?> restartTask;

    @Bean
    public ApplicationRunner runner(ClusterProperties props, ConsumerFactory<Object, Object> cf,
            ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {

        return args -> {
            AtomicInteger instance = new AtomicInteger();
            Arrays.stream(props.getClusters()).forEach(cluster -> {
                Map<String, Object> consumerProps = new HashMap<>(cf.getConfigurationProperties());
                consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster);
                String groupId = "group" + instance.getAndIncrement();
                consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                attemptStart(containerFactory, context, consumerProps, groupId);
            });
            registry.getListenerContainers().forEach(c -> System.out.println(c.getGroupId())); // 2.2.5 snapshot only
            if (this.atLeastOneFailure) {
                Runnable rescheduleTask = () -> {
                    // Reset once per pass (not once per container), so that an earlier failure
                    // is not overwritten by a later container that is already running.
                    this.atLeastOneFailure = false;
                    registry.getListenerContainers().forEach(c -> {
                        if (!c.isRunning()) {
                            System.out.println("Attempting restart of " + c.getGroupId());
                            try {
                                c.start();
                            }
                            catch (Exception e) {
                                System.out.println("Failed to start " + e.getMessage());
                                this.atLeastOneFailure = true;
                            }
                        }
                    });
                    // Stop retrying once every container has started.
                    if (!this.atLeastOneFailure) {
                        this.restartTask.cancel(false);
                    }
                };
                this.restartTask = scheduler.scheduleAtFixedRate(rescheduleTask,
                        Instant.now().plusSeconds(60),
                        Duration.ofSeconds(60));
            }
        };
    }

    private void attemptStart(ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory,
            ApplicationContext context, Map<String, Object> consumerProps, String groupId) {

        containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
        try {
            context.getBean("listener", MyListener.class);
        }
        catch (BeanCreationException e) {
            this.atLeastOneFailure = true;
        }
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public MyListener listener() {
        return new MyListener();
    }

    @Bean
    public TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

}

class MyListener {

    @KafkaListener(topics = "so55311070")
    public void listen(String in) {
        System.out.println(in);
    }

}
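The retry logic can also be looked at without Spring. The following is a rough sketch, under stated assumptions, of the same pattern in plain Java: one pass tries to start everything that is not running, and a fixed-rate task repeats the pass until nothing fails, then cancels itself. `Startable`, `RestartSketch`, `retryPass` and `scheduleRetries` are hypothetical names introduced here; `Startable` is only a stand-in for a listener container, not a Spring Kafka type.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a listener container; not a Spring Kafka type.
interface Startable {
    boolean isRunning();
    void start() throws Exception;
}

final class RestartSketch {

    // One retry pass: tries to start every item that is not running.
    // Returns true only if no start attempt failed during this pass.
    static boolean retryPass(List<? extends Startable> items) {
        boolean anyFailure = false; // tracked per pass, not per item
        for (Startable item : items) {
            if (!item.isRunning()) {
                try {
                    item.start();
                } catch (Exception e) {
                    anyFailure = true;
                }
            }
        }
        return !anyFailure;
    }

    // Repeats retryPass at a fixed rate and cancels the task after the first pass without failures.
    static ScheduledFuture<?> scheduleRetries(ScheduledExecutorService scheduler,
            List<? extends Startable> items, long periodMillis) {
        AtomicReference<ScheduledFuture<?>> self = new AtomicReference<>();
        Runnable task = () -> {
            if (retryPass(items)) {
                ScheduledFuture<?> future = self.get();
                if (future != null) {
                    future.cancel(false); // let the current run finish, schedule no more
                }
            }
        };
        self.set(scheduler.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
        return self.get();
    }
}
```

Keeping the failure flag local to a single pass avoids the case where a container that is already running hides an earlier failed start in the same pass.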


















edited Mar 25 at 13:34

























answered Mar 23 at 15:49









Gary Russell












  • Thanks a lot, Gary, for the guidance. I am now able to spawn multiple consumers depending on the number of clusters. However, if one of the configured clusters is unavailable when the application starts, the consumer keeps retrying the connection and eventually times out, bringing down the entire application. I am looking for an approach where, even if one of the clusters is unavailable, my other consumer (which is successfully connected to the available cluster) keeps running.

    – Iqbal
    Mar 24 at 6:47











  • I am getting the exception java.lang.IllegalStateException: Failed to execute ApplicationRunner. Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'listener' defined. Perhaps, because one of my clusters is unavailable, the listener bean is not being initialized properly and hence is failing.

    – Iqbal
    Mar 24 at 6:51












  • Simply put try {...} catch (...) {...} around the call to getBean(). You can speed up the failure by setting the default.api.timeout.ms property; e.g. spring.kafka.consumer.properties.default.api.timeout.ms=5000.

    – Gary Russell
    Mar 24 at 13:27












  • Thanks for the suggestion, it worked. Is there a way to have the consumer keep polling for the connection, so that once the cluster becomes available the consumer connects and starts consuming messages?

    – Iqbal
    Mar 25 at 8:01











  • See the edit to my answer.

    – Gary Russell
    Mar 25 at 13:34



































