Concurrency in Spring's StreamListener and Kafka
Another too fast, too furious post. I spent a few hours trying to make my event processor multi-threaded, and it’s so damn easy that I don’t want anyone else to spend more than a few minutes on it. I couldn’t find it anywhere on the Internet, so I’m sharing it here.
We are using the Spring Cloud Stream layer to configure our Kafka consumers.
For example, the configuration for a binding named ‘reservations-input’ connected to the Kafka topic ‘reservations-topic’ would look similar to this:
spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group
And the classes that start processing those events:
@EnableBinding({MessagingConfiguration.ReservationTopic.class})
public class MessagingConfiguration {

    public interface ReservationTopic {
        // Must match the binding name used under spring.cloud.stream.bindings
        String INPUT = "reservations-input";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

@Service
public class ReservationProcessor {

    @StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff
    }
}
Easy peasy. Only problem here is concurrency.
If you have used Kafka before, you know that the number of partitions in your topic limits the concurrency: within a consumer group, each partition is read by a single consumer.
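To make that limit concrete, here is a tiny sketch in plain Java of how partitions could be spread over consumers in one group. It is a simplified, range-style model written for this post, not Kafka's actual assignor code, and PartitionSpread is a made-up name. The point is only that consumers beyond the partition count end up with nothing to do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: a simplified model of partition assignment for ONE topic.
final class PartitionSpread {

    // Returns, for each consumer, the list of partition numbers it would own.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;   // every consumer gets at least this many
        int extra = partitions % consumers;  // the first 'extra' consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 consumers: two consumers stay idle
        System.out.println(assign(3, 5)); // prints [[0], [1], [2], [], []]
        // 5 partitions, 2 consumers: each consumer handles several partitions
        System.out.println(assign(5, 2)); // prints [[0, 1, 2], [3, 4]]
    }
}
```

So asking for more consumers than partitions buys nothing, while asking for fewer just means each consumer handles several partitions.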
I don’t know whether (or where) I read it, but I assumed that my application would create as many threads/consumers as my topic has partitions. I was wrong. By default, Spring only creates a single-threaded processor.
Solutions? Run more instances of your application, or configure ConcurrentKafkaListenerContainerFactory to spawn more threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).
Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.
The only hint I found in the documentation or on Stack Overflow was to instantiate a bean of type ConcurrentKafkaListenerContainerFactory.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(5);
    return factory;
}
I am not very keen on creating my own beans to configure something that seems this obvious. It is easy to accidentally override Spring defaults that I already rely on, and it is more code to maintain…
There has to be a way through configuration.
Option 2: use configuration
Getting back to configuration: what we write under spring.cloud.stream.bindings.<channel-name>.consumer ends up in the configuration of the Kafka consumer. Therefore, I tried setting the concurrency property there. That is:
spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group
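For what it's worth, the nested YAML above flattens to a single property key, which is handy if you want to override the value per environment. Here is a small sketch in plain Java, assuming the usual Spring Boot `--key=value` command-line syntax. BindingProperty and its methods are hypothetical helpers written for this post, not part of Spring.

```java
// Hypothetical helper: builds the flat property key for a binding's consumer settings.
final class BindingProperty {

    // e.g. consumerKey("reservations-input", "concurrency")
    static String consumerKey(String binding, String property) {
        return "spring.cloud.stream.bindings." + binding + ".consumer." + property;
    }

    // Formats the key and value as a command-line argument for the application
    static String asCommandLineArg(String binding, String property, Object value) {
        return "--" + consumerKey(binding, property) + "=" + value;
    }

    public static void main(String[] args) {
        System.out.println(asCommandLineArg("reservations-input", "concurrency", 3));
        // prints --spring.cloud.stream.bindings.reservations-input.consumer.concurrency=3
    }
}
```

Passing that argument when launching the jar should have the same effect as the YAML entry, without touching the packaged configuration.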
Starting our application, we see that we now have 3 consumer threads, each with its own partition assigned.
December 17th 2019, 14:22:57.274 2019-12-17 13:22:57.274 INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-1]
December 17th 2019, 14:22:57.259 2019-12-17 13:22:57.259 INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-2]
December 17th 2019, 14:22:57.256 2019-12-17 13:22:57.256 INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1 : partitions assigned: [reservations-topic-3]
Profit!