简介
Kafka 是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由 LinkedIn 公司开发,使用
Scala 语言编写,目前是 Apache 的开源项目。
- broker:Kafka 服务器,负责消息存储和转发
- topic:消息类别,Kafka 按照 topic 来分类消息
- partition:topic 的分区,一个 topic 可以包含多个 partition,topic 消息保存在各个
partition 上 - offset:消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表该消息的
唯一序号 - Producer:消息生产者
- Consumer:消息消费者
- Consumer Group:消费者分组,每个 Consumer 必须属于一个 group
- Zookeeper:保存着集群 broker、topic、partition 等 meta 数据;另外,还负责 broker 故
障发现,partition leader 选举,负载均衡等功能
数据存储
partition 的数据文件( offset,MessageSize,data )
partition中的每条Message包含了以下三个属性:offset,MessageSize,data,其中offset表示 Message 在这个 partition 中的偏移量,offset 不是该 Message 在 partition 数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message,可以认为offset是partition 中 Message 的 id;MessageSize 表示消息内容 data 的大小;data 为 Message 的具体内容。
数据文件分段 segment( 顺序读写、分段命令、二分查找 )
partition 物理上由多个 segment 文件组成,每个 segment 大小相等,顺序读写。每个 segment数据文件以该段中最小的 offset 命名,文件扩展名为.log。这样在查找指定 offset 的 Message 的时候,用二分查找就可以定位到该 Message 在哪个 segment 数据文件中。
数据文件索引(分段索引、 稀疏存储 )
Kafka 为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。index 文件中并没有为数据文件中的每条 Message 建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
生产者设计
负载均衡
由于消息 topic 由多个 partition 组成,且 partition 会均衡分布到不同 broker 上,因此,为了有效利用 broker 集群的性能,提高消息的吞吐量,producer 可以通过随机或者 hash 等方式,将消息平均发送到多个 partition 上,以实现负载均衡。
批量发送
是提高消息吞吐量重要的方式,Producer 端可以在内存中合并多条消息后,以一次请求的方式发送了批量的消息给 broker,从而大大减少 broker 存储消息的 IO 操作次数。但也一定程度上影响了消息的实时性,相当于以时延代价,换取更好的吞吐量。
压缩
Producer 端可以通过 GZIP 或 Snappy 格式对消息集合进行压缩。Producer 端进行压缩之后,在Consumer 端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是 CPU(压缩和解压会耗掉部分 CPU 资源)。
消费者设计
Consumer Group
同一 Consumer Group 中的多个 Consumer 实例,不同时消费同一个 partition,等效于队列模式。partition 内消息是有序的,Consumer 通过 pull 方式消费消息。Kafka 不删除已消费的消息对于 partition,顺序读写磁盘数据,以时间复杂度 O(1)方式提供消息持久化能力。
可靠性的保证
下面通过从topic的分区副本、producer发送到broker、leader选举三个方面来阐述kafka的可靠性。
Topic的分区副本
其实在kafka-0.8.0之前的版本是还没有副本这个概念的,在之后版本引入了副本这个架构,每个分区设置几个副本,可以在设置主题的时候可以通过replication-factor参数来设置,也可以在broker级别中设置defalut.replication-factor来指定,一般我们都设置为3;
三个副本中有一个副本是leader,两个副本是follower,leader负责消息的读写,follower负责定期从leader中复制最新的消息,保证follower和leader的消息一致性,当leader宕机后,会从follower中选举出新的leader负责读写消息,通过分区副本的架构,虽然引入了数据冗余,但是保证了kafka的高可靠。
Kafka的分区多副本是Kafka可靠性的核心保证,把消息写入到多个副本可以使Kafka在崩溃时保证消息的持久性及可靠性。
Producer发送消息到broker
topic的每个分区内的事件都是有序的,但是各个分区间的事件不是有序的,producer发送消息到broker时通过acks参数来确认消息是否发送成功,request.required.acks参数有三个值来代表不同的含义;
acks=0:表示只要producer通过网络传输将消息发送给broker,那么就会认为消息已经成功写入Kafka;但是如果网卡故障或者发送的对象不能序列化就会错误;
acks=1:表示发送消息的消息leader已经接收并写入到分区数据文件中,就会返回成功或者错误的响应,如果这时候leader发生选举,生产者会再次发送消息直到新的leader接收并写入分区文件;但是这种方式还是可能发生数据丢失,当follower还没来得及从leader中复制最新的消息,leader就宕机了,那么这时候就会造成数据丢失;
acks=-1:代表leader和follower都已经成功写入消息是才会返回确认的响应,但是这种方式效率最低,因为要等到当前消息已经被leader和follower都写入返回响应才能继续下条消息的发送;
所以根据不用的业务场景,设置不同的acks值,当然producer发送消息有两种方式:同步和异步,异步的方式虽然能增加消息发送的性能,但是会增加数据丢失风险,所以为了保证数据的可靠性,需要将发送方式设置为同步(sync)。
Leader选举
在每个分区的leader都会维护一个ISR列表,ISR里面就是follower在broker的编号,只有跟得上leader的follower副本才能加入到ISR列表,只有这个列表里面的follower才能被选举为leader,所以在leader挂了的时候,并且unclean.leader.election.enable=false(关闭不完全的leader选举)的情况下,会从ISR列表中选取第一个follower作为新的leader,来保证消息的高可靠性。
综上所述,要保证kafka消息的可靠性,至少需要配置一下参数:
- topic级别:replication-factor>=3;
- producer级别:acks=-1;同时发送模式设置producer.type=sync;
- broker级别:关闭不完全的leader选举,即unclean.leader.election.enable=false;
数据一致性
这里说的一致性指的是不管是老的leader还是新的leader,consumer都能读到一样的数据。
假设分区副本为3,副本0位leader,副本1和2位follower,在ISR列表里面副本0已经写入了message4,但是consumer只能读取message2,这是因为所有副本都同步了message2,只有High water mark以上的message才能被consumer读取,而High water mark取决于ISR列表里偏移量最小的分区,对应上图中的副本2;
所以在message还没有被follower同步完成时会被认为是”不安全的”,如果consumer读取了副本0中的message4,这时候leader挂了,选举了副本1为新的leader,别的消费者去消费的时候就没有message4,就会造成不同的consumer消费的数据不一致,破坏了数据的一致性。
在0.08版本引入了High water mark机制后,会导致broker之间的消息复制因为某些原因变慢,消息到达消费者的时间也会延长(需要等消息复制完了才能消费),延迟的时间可以通过参数来设置:replica.lag.time.max.ms(它指定了副本在复制消息时可被允许的最大延迟时间)
0.11.0.0版本的Kafka通过引入leader epoch解决了原先依赖水位表示副本进度可能造成的数据丢失/数据不一致问题。
面试题
Kafka 的设计时什么样的呢?
- Kafka 将消息以 topic 为单位进行归纳
- 将向 Kafka topic 发布消息的程序成为 producers.
- 将预订 topics 并消费消息的程序成为 consumer.
- Kafka 以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个 broker.
- producers 通过网络将消息发送到 Kafka 集群,集群向消费者提供消息
数据传输的事务定义有哪三种?
数据传输的事务定义通常有以下三种级别:
- 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
- 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
- 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的
Kafka 判断一个节点是否还活着有那两个条件?
- 节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连
接 - 如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
Producer 是否直接将数据发送到 broker 的 leader(主节点)?
producer 直接将数据发送到 broker 的 leader(主节点),不需要在多个节点进行分发,为了帮助 producer 做到这点,所有的 Kafka 节点都可以及时的告知:哪些节点是活动的,目标topic 目标分区的 leader 在哪。这样 producer 就可以直接将消息发送到目的地了
Kafa consumer 是否可以消费指定分区消息?
Kafa consumer 消费消息时,向 broker 发出”fetch”请求去消费特定分区的消息,consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,customer 拥有了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的
Kafka 消息是采用 Pull 模式,还是 Push 模式?
Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从 broker 拉取消息一些消息系统比如 Scribe 和 Apache Flume 采用了 push 模式,将消息推送到下游的consumer。这样做有好处也有坏处:由 broker 决定消息推送的速率,对于不同消费速率的consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是,push 模式下,当 broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式。
Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据。Push模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull 模式下,consumer 就可以根据自己的消费能力去决定这些策略。Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到达。为了避免这点,Kafka 有个参数可以让 consumer 阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发
Kafka 存储在硬盘上的消息格式是什么?
消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和 CRC32校验码。
- 消息长度: 4 bytes (value: 1+4+n)
- 版本号: 1 byte
- CRC 校验码: 4 bytes
- 具体的消息: n bytes
Kafka 高效文件存储设计特点
- Kafka 把 topic 中一个 parition 大文件分成多个小文件段,通过多个小文件段,就容易定
期清除或删除已经消费完文件,减少磁盘占用。
- 通过索引信息可以快速定位 message 和确定 response 的最大大小。
- 通过 index 元数据全部映射到 memory,可以避免 segment file 的 IO 磁盘操作。
- 通过索引文件稀疏存储,可以大幅降低 index 文件元数据占用空间大小。
Kafka 与传统消息系统之间有三个关键区别
- Kafka 持久化日志,这些日志可以被重复读取和无限期保留
- Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据
提升容错能力和高可用性
- Kafka 支持实时的流式处理
Kafka 创建 Topic 时如何将分区放置到不同的 Broker 中
- 副本因子不能大于 Broker 的个数;
- 第一个分区(编号为 0)的第一个副本放置位置是随机从 brokerList 选择的;
- 其他分区的第一个副本放置位置相对于第 0 个分区依次往后移。也就是如果我们有 5 个
Broker,5 个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五 个 Broker 上;第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推;
- 剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是
随机产生的
Kafka 新建的分区会在哪个目录下创建
在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘上用于提高读写性能。 当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。
但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?
答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic名+分区 ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。
Partition 的数据如何保存到硬盘
topic 中的多个 partition 以文件夹的形式保存到 broker,每个分区序号从 0 递增,且消息有序Partition 文件下有多个 segment(xxx.index,xxx.log)segment 文件里的大小和配置文件大小一致可以根据要求修改默认为 1g,如果大小大于 1g 时,会滚动一个新的 segment 并且以上一个 segment 最后一条消息的偏移量命名
kafka 的 ack 机制
request.required.acks 有三个值 0 1 -1
- 0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候
就会丢数据
- 1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他
不确保是否复制完成新 leader 也会导致数据丢失
- -1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出
的 ack,这样数据不会丢失
Kafka 的消费者如何消费数据
消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置等到下次消费时,他会接着上次位置继续消费
消费者负载均衡策略
一个消费者组中的一个分片对应一个消费者成员,他能保证每个消费者成员都能访问,如果组中成员太多会有空闲的成员
数据有序
一个消费者组里它的内部是有序的,消费者组与消费者组之间是无序的
kafaka 生产数据时数据的分组策略
生产者决定数据产生到集群的哪个 partition 中,每一条消息都是以(key,value)格式。Key 是由生产者发送数据传入,所以生产者(key)决定了数据产生到集群的哪个 partition