- 2023-11-08 11:32:12
- 3249 热度
- 0 评论
最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用户对消费组的认识不够。其实,在之前的博文以及《Spring Cloud微服务实战》一书中都有提到关于消费组的概念以及作用。
那么什么是消费组呢?为什么要用消费组?它解决什么问题呢?摘录一段之前博文的内容,来解答这些疑问:
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
详细也可查看原文:消息驱动的微服务(消费组)。
下面,通过一个例子来看看如何使用消费组:
问题重现
构建消息消费端
第一步:创建绑定接口,绑定example-topic
输入通道(默认情况下,会绑定到RabbitMQ的同名Exchange或Kafaka的同名Topic)。
interface ExampleBinder { |
第二步:对上述输入通道创建监听与处理逻辑。
@EnableBinding(ExampleBinder.class) |
第三步;创建应用主类和配置文件
@SpringBootApplication |
spring.application.name=stream-consumer-group |
这里设置server.port=0
,以方便在本地启动多实例来重现问题。
完成上述操作之后,启动两个该应用的实例,以备后续调用。
构建消息生产端
比较简单,需要注意的是,使用@Output
创建一个同名的输出绑定,这样发出的消息才能被上述启动的实例接收到。具体实现如下:
@RunWith(SpringRunner.class) |
启动上述测试用例之后,可以发现之前启动的两个实例都收到的消息,并在日志中打印了:Received: Produce a message from : http://blog.didispace.com
。消息重复消费的问题成功重现!
使用消费组解决问题
如何解决上述消息重复消费的问题呢?我们只需要在配置文件中增加如下配置即可:
spring.cloud.stream.bindings.example-topic.group=aaa |
当我们指定了某个绑定所指向的消费组之后,往当前主题发送的消息在每个订阅消费组中,只会有一个订阅者接收和消费,从而实现了对消息的负载均衡。只所以之前会出现重复消费的问题,是由于默认情况下,任何订阅都会产生一个匿名消费组,所以每个订阅实例都会有自己的消费组,从而当有消息发送的时候,就形成了广播的模式。
另外,需要注意上述配置中example-topic
是在代码中@Output
和@Input
中传入的名字。
代码示例
本文示例读者可以通过查看下面仓库的中的stream-consumer-group
项目:
如果您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!
以下专题教程也许您会有兴趣
- Spring(403)
- Boot(208)
- Spring Boot(187)
- Java(82)
- Cloud(82)
- Spring Cloud(82)
- Security(60)
- Spring Security(54)
- Boot2(51)
- Spring Boot2(51)
- Redis(31)
- SQL(29)
- Mysql(25)
- IDE(24)
- Dalston(24)
- JDBC(22)
- IDEA(22)
- mongoDB(22)
- MVC(22)
- Web(21)
- CLI(20)
- Alibaba(19)
- SpringMVC(19)
- SpringBoot(17)
- Docker(17)
- Eclipse(16)
- Vue(16)
- Git(16)
- JPA(15)
- Apache(15)
- ORA(15)
- Oracle(14)
- jdk(14)
- Tomcat(14)
- Linux(14)
- HTTP(14)
- Mybatis(14)
- XML(13)
- JdbcTemplate(13)
- OAuth(13)
- Nacos(13)
- Pro(13)
- JSON(12)
- OAuth2(12)
- Data(12)
- stream(11)
- int(11)
- Myeclipse(11)
- not(10)
- Bug(10)
- maven(9)
- Map(9)
- Hystrix(9)
- ast(9)
- APP(8)
- Bit(8)
- API(8)
- session(8)
- Window(8)
- Swagger(8)
- JavaMail(7)
- Cache(7)
- File(7)
- IntelliJ(7)
- mail(7)
- windows(7)
- too(7)
- HTML(7)
- Github(7)
- Excel(6)
- Log4J(6)
- pushlet(6)
- apt(6)
- read(6)
- Freemarker(6)
- WebFlux(6)
- JSP(6)
- Bean(6)
- error(6)
- Server(6)
- nginx(6)
- ueditor(6)
- jar(6)
- ehcache(6)
- UDP(6)
- RabbitMQ(6)
- star(6)
- and(6)
- Struts(5)
- string(5)
- script(5)
- Syntaxhighlighter(5)
- Tool(5)
- Controller(5)
- swagger2(5)
- ldquo(5)
- input(5)
- Servlet(5)
- Config(5)
- discuz(5)