MQ(Kafka)学习笔记
[toc]
MQ 概述
在项目中,要完成一个异步操作,就可以使用MQ。消息的生产者把消息放到MQ中,消费者(下游服务)从MQ中取消息,并根据消息内容,执行对应的操作。
如果每个消费者都与一条队列强绑定,那么生产者在生产的时候就要每次都往多个队列发消息,这样不仅会造成更多的存储占用,还不利于解耦,如果消费者不消费了,对应的队列就得去掉。
主题模式(发布-订阅模式)
为了解决这个问题,可以使用主题模式:
- 每个消费者去订阅需要消费的主题(topic,内部为队列)
- 生产者往topic发消息
- topic可以把队列中的消息发送给每个订阅的消费者
消费者不再直接依赖队列,而是去依赖(订阅)topic。一个topic中,可以有一个或多个队列。通过增加topic中的队列数,从而对topic进行水平扩容。
Broker
一个MQ中可以有多个topic,broker(可以理解为MQ自身)用于管理topic。
生产者把消息发给broker;broker根据topic,将消息放到对应的队列中;消费者根据topic获取消息。
接下来的问题是,消息存在哪里?如果存在内存中,那么重启服务器,消息就会丢失,消费者就消费不到这部分消息了。因此broker要将消息保存到磁盘里做持久化。
如果只有一个broker,就要面临单点故障问题的风险,所以最起码也得做个主从同步。写消息只能写broker的主节点(主broker),从节点(从broker)定期从主节点中同步数据。(Mysql的读写分离、Redis的主从架构也是如此)
集群
接下来的问题是,broker的主从节点存的数据一样,每个broker能存的消息数量也有上限,主节点并发写入的能力也有瓶颈。解决方式是,部署多个主从结构,构成一个broker集群,然后把每个topic的队列和数据分散存到不同broker节点上。这样的话,每个主节点都只存topic的部分数据,从而减轻其压力。(Mysql的分库分表、Redis的分片集群也是如此)
注册中心
既然现在有那么多broker,生产者怎么知道该给哪个broker发消息呢?消费者又怎么知道该从哪个broker读消息呢?这就需要一个注册中心来专门管理集群。
注册中心可以做到:
- 保存broker的状态信息,broker定期向注册中心发心跳包(是不是死掉了)
- 管理消息的路由,给生产者和消费者topic路由表信息(该往哪里写,该从哪里读)
- broker的负载均衡与故障转移
- 注册中心自身的高可用(注册中心也得是个集群)
整体流程
- broker集群定时向注册中心发心跳包
- 注册中心掌管所有broker的节点信息和消息的路由信息
- 生产者要给某个topic发消息时,通过访问注册中心,得到topic的路由表,从而得知消息该发给哪个broker的哪个队列
- broker主节点收到消息后,同步给从节点,然后将消息持久化到磁盘
- 消费者通过访问注册中心订阅对应的topic,得到topic的路由表,从而得知消息该从哪个broker的哪个队列拉取消息
Kafka 概述
结构:Kafka工作的整体结构包括生产者集群、消费者集群、broker集群和Zookeeper集群(或Kraft集群),这个和
MQ 概述部分提到的是一致的。Broker:topic中的队列在Kafka中是partition,一个topic的partition可以分布在多台机器上的多个broker上,这可以解决单机性能瓶颈问题。
Offset:partition中每个消息都有一个唯一标识即offset,这是个单调递增的整数。消费者通过记录消费到的offset,就可以知道消费到哪个地方了。
有序性:Kafka只能保证单个partition(队列)中的消息是有序的,并不能保证不同partition之间消息的有序性。
高可用:为了避免单点故障问题,每个partition上的数据,都会同步到其他broker节点上,每个partition在其他broker上就有了多个副本。在所有相同的partition中,会有一个leader和多个followers。读写实际上都是和leader交互,而followers负责做备份(这里和
MQ 概述中不太一样,kafka做的是partition的主从,MQ 概述中讲的是broker的主从)。之所以不读follwer,是为了便于管理消费者的offset,不然如果每次读的follwer都不一样,就没法控制offset了。消费者组:多个消费者可以组成消费者组,并行地消费topic中的消息。消费者组中,最多有partition num个消费者可以消费topic,多余的消费者空闲。(topic中的一个partition只能被一个消费者组的一个消费者消费)
整体流程
- broker集群定时向ZK/Kraft发心跳包
- ZK/Kraft掌管所有broker、topic和partition的信息
- broker中选取出一个controller,用于监听ZK/Kraft中的topic的变化。一旦topic变化,controller就会从ZK/Kraft中拉取最新数据,并广播给其他broker
- 生产者可以直接通过broker获取路由信息,然后根据配置的分区策略,把消息直接发送给目标topic对应的partition的leader对应的broker
- broker收到消息后,写入partition末尾,并分配offset,然后leader同步数据给followers
- 消费者可以直接通过访问broker,获取订阅的topic的路由信息,就知道去哪个broker中拉取消息了,然后根据自己记录的offset,就知道该从什么位置读消息了
顺序性
不管是Kafka、RabbitMQ还是RocketMQ,单个队列/partition(分区)内的消息都是有序的,跨队列或者跨分区都不能保证消息有序。
想要保证有序有两个点:
- 自定义消息路由算法,把消息都路由到同一个队列里面
- 单个消费者串行地去消费消息
有序实际上分为全局有序和局部有序。全局有序就只能有一个队列/分区,并且一个消费者组只能有一个消费者去串行处理,这性能很差、问题也很大。一般只需要局部有序,比如同一个订单的下单、支付、发货需要有序,可以根据订单的id,将其路由到同一个队列。所以局部有序的核心就是把相同id(相同业务)的消息路由到同一个队列,然后每个队列都有一个消费者。
如果消费者用线程池并行消费partition,又会造成乱序问题。解决方法是,消费者在内存中维护几个阻塞队列,把同一个id的消息放到同一个阻塞队列里面,一个队列对应一个线程来进行并行的消费处理。
但这样还有问题,topic的分区的扩缩容会导致相同id在扩缩容后被存到其他队列,或者分区的消费者发生变化,分区的新的消费者可能会重新拉取消息,从而导致乱序或幂等性问题
如果消费失败,一般会再放到队列里重试,那么顺序消息怎么重试呢?只能做本地重试,但是本地重试遇到重新分配消费者的情况,可能会丢掉重试的消息。
MQ的本质就是一个高性能但不可靠的中间件,因此不应该完全依赖MQ去保证顺序性和可靠性。要做好兜底措施。
- 可以用死信队列去保存拒绝、处理失败、超时等的消息,便于后续复核、修复、重放等。配合错误日志和链路追踪快速定位问题
- 做好定时的数据对账和补偿,实现最终一致性
可靠性
保证可靠性就是保证消息不会丢失。消息可能在这些情况丢失:
- 从生产者发送到MQ
- MQ自己弄丢了消息,比如宕机、重启等
- MQ把消息发给消费者或者消费者拿到消息后还没来得及消费就发生了异常等
解决方式:
- 从生产者到MQ可以用ACK机制,MQ收到消息后给生产者回复ACK
- 针对MQ自己弄丢了消息,应该开启MQ的持久化消息,这样可以把消息都持久化到磁盘,就算MQ宕机、重启,也能从磁盘里恢复;做好MQ的集群,保证高可用,避免宕机
- 启用消费者的ACK机制,消费者消费完之后,再给MQ发一个ACK
消费者消费失败了,最好是放到重试队列中。因为本地重试遇到消费者Rebalance(重新分配消费者)可能会丢消息。而且放到重试队列里,不仅不会丢,还因为重试队列是一个单独的队列,因此方便做好监控、观测等。
既然都重试了,还需要保证消息的幂等性。可以给消息增加唯一索引,保证消费的幂等性,本质就是避免重试导致重复消费问题。
幂等性
保证幂等性就是避免消息的重复消费。消息在这些情况下会有重复的消息:
- 生产者发给MQ后,因为网络问题,没能及时给生产者发ACK,生产者为了保证消息不丢,就再发一次;或者业务层的一些重试操作,也有可能重发消息。
- 消费者因为宕机、重启或网络延迟等问题,没有及时给MQ发ACK,MQ为了保证消息不丢,就再给消费者发一次。
要解决消息的重复消费问题,要从消费者入手:
最通用的解决消息重复消费问题的方式是给消息分配一个唯一ID,然后消费的时候把ID存到Redis的setnx(第一次保存为成功,后续均失败)。
在业务中,有些消息自带唯一ID,比如订单ID。可以先查数据库看有没有做过相应处理,或者就直接用数据库唯一索引来避免重复消息。
针对更新的操作,可以用乐观锁的思想做。比如消息体中自带一个版本号,然后消费者更新数据的时候,对比版本号是否一致。这种方案只能针对更新操作。
这些操作不仅可以保证消息的幂等性,很多接口的幂等性也是这么做的。