消息队列面试篇
文章目录
零拷贝
rocketMq的架构是类似于kafka的,实现了许多kafka没有的功能,但是依旧没有在市场上打败kafka,kafka的性能这么高的原因得益于零拷贝技术。
在传统io方式中是需要经过cpu的,用户调用read()方法,数据通过dma传输到内存缓冲区,然后通过cpu拷贝到用户态的用户缓冲区,得到了用户能见的数据。再通过write()将数据拷贝到操作系统内核态的socket写缓冲区,最后通过网卡发送数据。这个过程一共经历了四次用户态和内核态的切换:
- 系统调用read,用户态->内核态
- dma后,cpu将数据从内核态拷贝回用户态
- 系统调用write,用户态->内核态
- write方法返回,重新回到用户态
以及四次数据的拷贝。
mmap
mmap是一种将内存映射到用户态空间的系统调用。简单来说就是用户态和内核态使用同一片空间,不用进行上下文切换。因此整个流程就会变为:
- mmap系统调用,用户态->内核态
- dma将磁盘数据写到内核态的缓冲区,返回给用户态,同时用户态也能看到这片缓冲区,可以进行操作。需要进行上下文切换但不用拷贝
- 操作完了之后系统调用write写到网卡发送,用户态->内核态
- 拷贝两次,从内核态缓冲区拷贝到socket缓冲区,再用dma方式写回磁盘或者网卡,最后回到用户态
综上所述,总共经历了3次io和四次上下文切换。减少的这一次叫做零拷贝。
sendfile
sendfile不会给用户态返回具体的数据,直接从内核态走socket传输走了。
- sendfile系统调用,用户态->内核态
- dma拷贝到内核态的缓冲区,这里不会返回给用户态具体的信息,也就是说用户态操作不了,直接拷贝给socket
- 再从socket拷贝到网卡,返回用户态
综上所述,一共两次上下文切换和三次io,对比 mmap+write,主要是没有再用户态进行拷贝。
说了这么多,到底跟kafka和rocketmq有什么关系呢?kafka是基于sendfile的,而rocketmq是基于mmap的,都使用了零拷贝技术,但是由于rocketmq新增了一些高级功能,需要直到具体的数据信息而不是直接发送,就用了能看见数据的mmap方式。kafka为了追求极致性能就没有这些花里胡哨的功能,使用了效率更高的sendfile
消息队列的作用
- 异步:不用等待同步操作,把消息扔给mq然后就直接返回,mq连接的另一端会异步进行操作
- 解耦:可以将业务进行拆分,也是上面异步的思想
- 削峰:到来的高并发请求先进入mq,类似漏桶限流算法,避免打倒服务器和数据库
Kafka的架构
kafka是基于发布者-订阅者模型的,在这个模型中有发布者,订阅者,broker和服务发现中心。具体到kafka就是producer,consumer,broker,topic,partition,配置中心是zookeeper:
- broker:就是一个物理上的kafka服务实体
- topic:消息是按照主题划分的,例如业务消息和日志消息就可以用两个topic划分
- partition:一个topic内也不只有一个队列,这个分区就是队列的意思。需要注意的是kafka在逻辑上是一个整体,但是物理上一个topic内所有分区不一定在同一个broker上。

- 生产者组:生成消息放入消息队列,是多对多的关系,一个生产者可以生成很多个topic的消息,也可以生成一个topic内多个partition的消息
- 消费者组:消费的进度用offset表示,消费者组并不会真的把消息队列的信息删除,而是用一个偏移量来表示消费到了哪里。每一个消费者组的偏移量可以是不同的,一般来说一个partition最好只有一个消费者,如果多了的话并发控制也会成为性能影响因素
- zookeeper:对于如此庞大规模的kafka集群,实体之间是怎么互相发现对方的?这里使用了zookeeper作为服务发现框架。kafka高度依赖zookeeper
kafka为了保证高可用,还有类似主从架构的副本机制,使得某个broker挂了之后,部署在其他地方的partition能够继续工作。总而言之,一个topic多个partition的机制能够使消息分散在多个broker上,实现负载均衡。partition的副本也会保存在不同的机器上。从逻辑上看是整体的,物理上看都不会集中在一块区域。
Kafka的存储机制
前情提要:一个topic可能横跨多个broker,每一个broker的文件系统只会保存自己实际物理存储的日志。只有通过zookeeper才能看到一个topic的全部分区信息。
在一个broker下,文件信息是这么存储的:
1 | <kafka-log-dir>/ |
- 日志段(segment):一个分区下有多个segment,segment中保存了实际的数据,这些数据的key是offset,每一个segment以当前最小的offset命名。查询的时候通过二分查找找到对应的offset
- 索引(index):同时会生成一个与segment同名的索引。保存当前segment下offset的索引信息(类似二级索引),加快日志内的查找速度
- 时间索引文件:存储offset对应的时间戳,便于根据时间查找信息。
生产者
当收到生产者数据的时候,用户态内存接收到之后,会给操作系统内核态的page cache,os再择机刷盘。这个写过程是顺序写,也就是说在对应的分区的最后一个log后面写刚刚的消息。
消费者
消费者消费数据的时候,根据offset先在文件名中二分查找到segment,再用对应的索引来加速搜索。
日志滚动
如果一个segment达到了默认的1GB或者超过了7天没有新增,那就会有一个新的segment
日志删除
可以配置kafka的消息删除时间,这个时候上面的日志索引文件就有用了,通过判断时间戳来删除。在删除过程中会使用copyonwrite以避免有消费者这个时候消费消息。
Kafka如何保证消息不丢失
消息丢失不是Kafka一个管道能说了算的,可能在生产者传输的过程中丢失,也有可能消费者offset更新完了之后就掉了导致没有真正消费信息。
- 生产者的信息丢失:类似tcp的各种握手,可以使用一个回调函数返回kafka是否接收到了信息,这里最好不要用同步操作get,用异步ListenableFuture<SendResult<String, Object>>开一个线程监听;如果失败了就重传,可以设置重传次数。
- 消费者的信息丢失:offset默认是消费者接受了这个消息就更新,但是可能会发生拿到消息就掉电导致消息没有实际被消耗。可以采用offset手动更新,在消费者逻辑的最后手动提交offset,这样就能保证消费完了才更新offset
- kafka的信息丢失:架构中都有这么多副本在兜底了,但是副本之间不一定时时刻刻都保持同步。设置几个参数就能保证kafka的高可用
- acks=all,表示所有副本都更新完写操作才返回,默认值是1,也就是主节点更新了就返回,但是这样依旧可能造成丢数据问题。因此为了绝对安全可以设置acks=all,但是会极大的影响性能。
- 设置副本数量>=3,虽然造成了数据冗余,但是可以保证高可用
- 设置更新的副本数量>1,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1,是acks=all的一种折中
- 设置 unclean.leader.election.enable = false,当发生主从数据不一致的时候,只有完全同步的数据副本才能被选举为leader
kafka是ap架构还cp架构?从上面的各种默认参数来看,比如acks=1,默认的时候还是倾向于可用性的,也就是会牺牲一部分一致性。但是经过上面的种种配置之后,就可以使kafka集群是强一致性的,在某些场合下可以灵活改变配置。
生产者向Kafka发送消息的执行流程
- 配置生产者,一般消息有三个值,topic,key和value(保证顺序的话就要指定partition)。还可以配置acks的参数,reties的次数,配置完这些之后产生一个生产者对象。
- 序列化消息,将key和value序列化,例如StringSerializer
- 消息需要选择进入哪个分区,如果key没有配置的话就轮询(RR),如果有的话就hash然后取模
- 如果配置了批处理,先把消息放在缓冲区,等达到了batchsize再传送
- 请求压缩:kafka集群主要的瓶颈在网络io而不是cpu,在进入的时候压缩,在出去的时候解压,能够减少网络延迟
- 当达到了batchsize,根据选择的分区发往broker。根据acks配置的参数返回消息
- 超时重试
- 如果有回调函数就会执行
kafka可以在分区内保证顺序,但是消费的整体顺序是无法保证的,如果需要严格的顺序消费,以下有两种方法解决:
- 指定key是相同的,在序列化的时候无法保证不同的key散列取余以后是在同一个分区,如果key是相同的就可以保证在一个分区
- key可以不同,指定partition相同就可以了,从根源上解决问题
消费者消费失败会怎样
消费者会有一个重试次数,如果由于网络导致消费失败,信息不会阻塞,会进入一个死信队列。可以在这个队列里面进一步分析原因,也可以继续消费这个队列的信息。
重试次数默认是10次,并且间隔是0s,也就是立即重试10次,没有消费成功就进入死信队列,不会阻塞下一条信息。
Kafka如何保证不重复消费信息
如上面所述,即便是手动提交offset还是可能出现重复消费,这个时候只能通过消费者的幂等进行校验了
三种消息投递保证
- At most once:至多一次,对应上面的消费者自动提交offset,这个时候掉电了这条消息就没有了
- At least once:对应手动提交offset,可能会重复多次。需要消费者自己做幂等
- Exactly once:消息精确传输一次。这不仅仅依赖于kafka,同时也要消费者的配合。
kafka的生产者在发送的时候会用全局唯一id+offset来进行幂等发送,避免因为发送端超时重试而导致重复消费。同时开启事务,确保消息是原子性的。
消费者只会消费已经提交过事务的消息(类似MVCC,未提交事务的信息是有可能回滚的,消费他们有风险)。同时类似At Least once,消费者也需要配合做幂等。
Exactly Once 的实现机制
生产者的角色:
- 幂等性:生产者通过启用幂等性功能(
enable.idempotence=true
)来确保在发送过程中,每条消息只有一个唯一的 ID。即使发生网络错误或重试,生产者也不会重复发送相同的消息。 - 事务支持:生产者使用事务来发送消息。在一个事务中,生产者可以发送多条消息,这些消息在调用
commitTransaction()
之前对消费者是不可见的。如果在发送过程中出现错误,生产者可以调用abortTransaction()
,此时所有消息都将被丢弃,确保没有未提交的消息被消费。
- 幂等性:生产者通过启用幂等性功能(
消费者的角色:
- **配置为
read_committed
**:消费者配置为isolation.level=read_committed
,确保只读取那些已成功提交的事务消息。这意味着消费者只会处理生产者通过commitTransaction()
提交的消息。 - 手动提交偏移量:消费者在处理每条消息后手动提交偏移量(
commitSync()
),以标记这些消息已被成功处理。只有在确认消息处理成功后,才提交偏移量,以避免处理失败导致的重复消费。 - 实现幂等性处理:消费者在业务逻辑中需要实现幂等性,以确保即使同一条消息被多次消费,最终的业务效果也应该一致。例如,使用数据库的唯一约束来避免重复插入。
- **配置为
总结
- 唯一性保证:通过生产者的唯一 ID 和事务管理,确保在 Kafka 中发送的消息是唯一的,并且不会因重试而重复。
- 消费的一致性:消费者通过手动提交偏移量和实现幂等性来确保消息的消费是唯一的,并且只处理那些已经提交的消息,避免了未提交消息可能引起的回滚和不一致。
- 消息可见性:未提交的事务消息对消费者是不可见的,确保了只有经过确认的消息才能被消费。
这种机制结合起来,使得在 Kafka 中能够实现 Exactly Once 的消息处理语义,从而在关键任务和需要高一致性的数据流动场景中得以应用。
乱入一下rocketmq
RocketMQ
阿里的rocketmq参考了kafka的架构,在架构上做减法,在功能上做加法。主要区别有:
- 存储模式:kafka的数据存储是按照segment,一个partition有很多segment,做的是追加写,但是一个broker下面有很多的topic,访问量大起来就不是追加写会退化为随机写。rocketmq发现了这一点,他将所有的queue都放在一个文件上commitlog,而queue只放offset和对应索引。这样就能一直是追加写了
- 同步模式:按照上面的逻辑,kafka那种replica就不行了,因为kafka是按照partition进行同步和冗余的,partition可以分布在不同的broker上的一个原因就是segment是分开来的。rocketmq把这一点优化了,所以rocketmq是类似redis和mysql那样,以broker为单位进行主从同步
- 简化协调节点:kafka高度依靠zookeeper,太重了。rocketmq直接用nacos就可以了,作为微服务的一个模块
- 零拷贝:kafka是sendfile而rocketmq是mmap+write
- 功能增强:延迟队列,死信队列,按照tag过滤消息,事务
乱入一下elasticsearch,估计问的也不多,简单了解一下
elasticsearch
微观到宏观,最小单元segment->shard->node->集群。
1 | 服务器实体 (Node 1) 服务器实体 (Node 2) |
segment
- 倒排索引:用分词器将每个单词作为键,值为对应的句子id就可以方便关键词查找了
- 前缀索引:单靠上面的倒排索引查询起来还是很费劲。可以使用前缀树trie完成。放在内存里方便查询
- 句子实际的存储位置:倒排索引只是放了句子的id,真正的句子存储在这里
- 一种方便排序和聚合的数据结构:某些时候用户可能想要按照时间排序,但是这些字段放在3中是很不好的,需要回表再去排序。不如直接用空间换时间,将时间信息提取出来单独做一个表
以上的四个部分共同构成了一个segment,这是搜索引擎的最小单元。但是由于前缀索引不好进行扩容,可以约定新增几条句子就开辟一个新的segment,同时也解决了读写问题。问题是文件会变多,可以采用定时合并的方法。这样多个segment就构成了lucene(shard)。如果有搜索请求就会并发在所有segment里面进行搜索。
shard
其实上面的多个segment加起来就是一个lucene,在es中如果只有一个lucene那么读写效率会变得很慢:
- 在垂直上可以参考kafka的topic设置多个index_name,例如分体育新闻和娱乐新闻。一个index对应一个lucene。
- 在水平上可以将lucene再分小一点,每个段少分一点segment,这就变成了标题所说的shard,本质上shard就是lucene的一个实例。这样读写操作有可能会被分开,提高效率
同时为了高可用还未每个shard提供了副本机制,分散在不同的node中
node
一个node就是一个服务器节点,类似kafka的broker,可以放很多不同topic的不同partition,包括副本啥的。这里就会要一个类似zookeeper的服务发现中心,但在es里是去中心化的,通过raft协议选举。
node有很多职责,有负责管理集群的(类似哨兵负责故障转移和选主),有存储数据的。有实现restful接口的,在集群环境下可以分开。每一个node负责不同的职责。
数据如何查找
- 客户端发送rest的http请求给集群中负责接收的node
- 在指定index_name下hash分片找到对应的shard
- 在shard中并行查找segment
- segment根据倒排索引找到对应句子的id返回
- 再去根据id进行回表,找到句子返回