Kafka 是一个分布式流平台。是一种高吞吐量的分布式发布订阅消息系统。
消息中间件一般支持两种模式的队列:
一是消息队列模式
二是发布订阅 (Pub-Sub) 模式
Kafka 是发布订阅模式的轻量级消息系统、流平台。
这个系统需要满足如下三个特性:
- 能够对流记录(每条消息)进行发布、订阅。
- 能够支持容错地持久化每一条流记录。(注:采用时间复杂度O(1)的磁盘存储结构,即使TB级别以上的数据也可以常数时间访问)
- 能够及实地处理每一条实时的流记录消息。
核心概念
- Broker
即代理服务器,kafka 部署在集群中的多台机器上,其中的每一台服务器即 Broker,它代表 kafka 的一个实例或节点,多个 Broker 构成一个 kafka 集群。
- Topic
即话题,生产或消费流记录(消息),都需要指定特定的 Topic,一般将同一业务数据、同一类型数据写入同一 Topic。
- Partition
即分区,因为一个 Topic 下可能会有大量的数据,一个 Broker 可能存不下,故一个 Topic 可以有多个分区,相当于把一个大流数据集分为多份,分别存在隶属于同一个 Topic 下的不同分区中。
(注:Topic 是逻辑概念,Partition 是物理概念,每个分区对应于一个物理文件夹,存储分区数据及索引文件)
- Producer
即生产者,向 Broker 某一 Topic 主题发送数据的客户端。
- Consumer
即消费者,消费指定 Topic 下的数据。
- Consumer Group
即消费者组,每个 Consumer 隶属于某个特定的 Consumer Group,Producer 将一条消息发送给所有的 Consumer Group,但最终只能被 Consumer Group 下的唯一的一个 Consumer 消费。
(注:分组的目的是为了加快读取速度)
- Replication
即副本,一个 Partition 分区可以有多个副本,存在不同的 Broker 中,提供容错保证。
一个 Topic 可以被多个 Consumer Group 订阅,但是只能被其中的一个 Consumer 消费。这里 Consumer Group A 和 Consumer Group B 都订阅了 Topic 1 和 Topic 2。在Consumer Group 中所有 Consumer 是竞争关系,一个流记录(消息)只能被一个 Consumer 消费。
如何保证百万级写入速度
Kafka 底层的页缓存技术的使用,磁盘顺序写的思路,以及零拷贝技术的运用是做到每秒几十万的吞吐量的保证。
页缓存技术 & 磁盘顺序写(生产端)
由于 Kafka 每次接收到数据都会往磁盘上写,如果把数据基于磁盘来存储,频繁的往磁盘文件里写数据,那么无疑这个写操作的性能会非常差,无法达到百万级别的效率。
为了保证数据写入性能,Kafka 采取 基于操作系统的页缓存来实现文件写入 。操作系统本身有一层缓存,叫做 page cache,是在内存里的缓存,也可以称之为 os cache,意思就是操作系统自己管理的缓存。当写入磁盘文件的时候,可以直接写入 os cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 os cache 里的数据真的刷入磁盘文件中。仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘。
还有一个保证 kafka 有百万级别写入数据的关键点是 磁盘顺序写 技术,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
所以 Kafka 在写数据的时候,基于以下两个方面实现写入数据的超高性能:
基于操作系统层面的 page cache 来写数据,所以性能很高,本质就是在写内存。
采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的,和写内存是差不多的。
零拷贝(消费端)
Kafka 在消费数据的时候实际上就是要从磁盘文件里读取某条数据然后发送给下游的消费者,这里如果频繁的进行磁盘的读写操作,无疑性能也是级低的。
Kafka 为了解决这个问题,在读数据的时候是引入 零拷贝技术。
也就是直接让操作系统的 cache 中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到 Socket 缓存。
通过零拷贝技术,就不需要把 os cache 里的数据拷贝到应用缓存,再从应用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。
对 Socket 缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从 os cache 中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
而且大家会注意到,在从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是直接读内存的。
如果kafka集群经过良好的调优,可以使得大量的数据都是直接写入 os cache 中,然后读数据的时候也是从 os cache 中读。
相当于是 Kafka 完全基于内存提供数据的读写操作,所以这个整体性能会极其的高。
如何做到不丢失数据
- 对kafka进行限速,平滑流量
- 启用重试机制,重试间隔时间设置长一些
- Kafka 设置 acks=all,即需要相应的所有处于 ISR 的分区都确认收到该消息后,才算发送成功。
Kafka 的 ISR 机制
ISR 机制是 Kafka 集群保证高可用的核心机制,这个机制简单来说,就是会自动给每个 Partition 维护一个 ISR 列表,这个列表里一定会有 Leader,然后还会包含跟 Leader 保持同步的 Follower。也就是说,只要 Leader 的某个 Follower 一直跟他保持数据同步,那么就会存在于 ISR 列表里。但是如果 Follower 因为自身发生一些问题,导致不能及时的从 Leader 同步数据过去,那么这个 Follower 就会被认为是 “out-of-sync”,被从 ISR 列表里踢出去。
在做主备切换时,直接且只允许从 ISR 集合中选取将要转正的副本即可
核心关键点
- 必须要求至少一个 Follower 在 ISR 列表里
- 每次写入数据的时候,要求 Leader 写入成功以外,至少保证 ISR 里有一个 Follower 也写成功
如何做到不重复消费
生产者不重复生产消息,精确一次,
消费者不重复消费消息,精确一次,业务去重
首先要了解的是 message delivery semantic 也就是消息传递语义。这是一个通用的概念,也就是消息传递过程中消息传递的保证性。
分为三种:
- 最多一次(at most once): 消息可能丢失也可能被处理,但最多只会被处理一次。
可能丢失 不会重复
- 至少一次(at least once): 消息不会丢失,但可能被处理多次。
可能重复 不会丢失
- 精确传递一次(exactly once): 消息被处理且只会被处理一次。
不丢失 不重复 就一次
而kafka其实有两次消息传递,一次生产者发送消息给kafka,一次消费者去kafka消费消息。
两次传递都会影响最终结果,两次都是精确一次,最终结果才是精确一次。两次中有一次会丢失消息,或者有一次会重复,那么最终的结果就是可能丢失或者重复的。
去重问题:消息可以使用唯一id标识
保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
高可用策略
kafka 使用 ISR 副本管理机制来保证其高可用性。首先需要明确 kafka 副本基本管理单位是 Partition 分区,如果我们指定了多个副本策略,则这些副本里只有一个为主副本(Leader),其他为次级副本 (Follower),所有读写均和主副本来响应。
ISR 具体运行机制是:将所有的次级副本(偶数)放到两个集合,其中一个集合被称为 ISR 集合,该集合里的数据始终与主节点数据保持一致,数据写入时,只有 ISR 集合中全部成功写入才算写入成功,在做主备切换时,直接且只允许从 ISR集合中选取将要转正的副本即可。
那么为何不采用 Zab(Zookeeper 采用的一致性协议)、或 Paxos 算法来保证 kafka 副本的数据一致性,而是采用另一套 ISR 副本管理机制来保证数据一致性呢?
原因其实很简单,因为在相同的副本容错条件下, ISR 机制可以维护更少的数据副本。比如 ISR 集合大小为 n+1(1主,n副,n为偶数),那么最多可以允许n个副本故障,而对于其他基于投票的一致性算法来说,则需要 2n + 1 个副本才能达到相同的容错性。
选举规则
Kafka 的 Leader 选举思路很简单,基于上述提到的 ISR 列表,当宕机后会从所有副本中顺序查找,如果查找到的副本在 ISR 列表中,则当选为 Leader。另外还要保证前任 Leader 已经是退位状态了,否则会出现脑裂情况(有两个 Leader )。
常见问题
- 多少个副本才算够用?
副本肯定越多越能保证Kafka的高可用,但越多的副本意味着网络、磁盘资源的消耗更多,性能会有所下降,通常来说副本数为3即可保证高可用,极端情况下将 replication-factor 参数调大即可。
- Follower 和 Leader 之间没有完全同步怎么办?
Follower 和 Leader 之间并不是完全同步,但也不是完全异步,而是采用一种 ISR 机制( In-Sync Replica)。每个 Leader 会动态维护一个 ISR 列表,该列表里存储的是和 Leader 基本同步的 Follower。如果有 Follower 由于网络、GC 等原因而没有向 Leader 发起拉取数据请求,此时 Follower 相对于 Leader 是不同步的,则会被踢出 ISR 列表。所以说,ISR 列表中的 Follower 都是跟得上 Leader 的副本。
- Kafka 的 offset 机制?
由于 Consumer 在消费过程中可能会出现断电宕机等故障,Consumer 恢复后,需要从故障前的位置继续消费。所以 Consumer 需要实时记录自己消费到了哪个 Offset,以便故障恢复后继续消费。
Kafka 0.9 版本之前,Consumer 默认将 Offset 保存在 Zookeeper 中,从 0.9 版本开始,Consumer 默认将 Offset 保存在 Kafka 一个内置的 Topic 中,该 Topic 为 __consumer_offsets。