1、kafka是什么

    kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。

特性:

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

 

2、应用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

 

3、基本架构及相关术语

image-1691674304057

Producer:Producer即生产者,消息的产生者,是消息的入口。

image-1691674313459

Kafka Cluster

  • Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
  • Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
  • Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
  • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  • Message:每一条发送的消息主体。

 

Consumer:消费者,即消息的消费方,是消息的出口。

  • Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量。

     

Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

zookeeper相关原理学习 https://blog.miuyun.work/archives/10724943

 

4、工作流程

4.1 发送数据

image-1691674364335

    由图中流程可知,生产者总是找leader进行写入,而follower会主动的找leader进行数据同步。因为follower总是按序同步添加数据,所以同一个分区内的数据总是有序的。

 

4.1.1 发送时partition选择

    在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

  1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
  2. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
  3. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

 

4.1.2 发送消息的应答机制

    保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?

   在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为01all

  • 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
  • 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
  • all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?

  kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

 

4.2 保存数据

   Kafka初始会单独开辟一块磁盘空间,顺序写入数据
 

4.2.1 Partition 结构

   Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

image-1691674376637

   每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

 

4.2.2 Message结构

   log文件就实际是存储message的地方,producer往kafka写入的也是一条一条的message。在log中的message(消息)主要包含消息体、消息大小、offset、压缩类型等!重点是下面三个:

  1. offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置
  2. 消息大小:消息大小占用4byte,用于描述消息的大小。
  3. 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

 

4.2.3 存储策略

  无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?

  1. 基于时间,默认配置是168小时(7天)
  2. 基于大小,默认配置是1073741824

  需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能

 

4.3 消费数据

  消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是找leader去拉取。

 

4.3.1 消费策略

image-1691674388213

   多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据。所以:

  • 消费者数量少于partition时,部分消费者会消费多个partition,效率会更低
  • 消费者数量多于partition时,多出的消费者不消费任何数据,效率也会变低

  因此,消费者组的consumer的数量与partition的数量一致效率最高。

 

4.3.2 查找消息流程

查找消息的时候是利用segment+offset配合查找。假如现在需要查找一个offset为368801的message

image-1691674401172

  1. 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。

  2. 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。

    由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。

  3. 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

    这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据。至此,消费者就能拿到需要处理的数据进行处理了。

 

4.3.3 重平衡

    consumer group内某个消费者挂掉后,其他消费者自动重新分配订阅主题分区的过程,是 Kafka 消费者端实现高可用的重要手段。

 

5、核心特性

5.1 压缩

    Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。
Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩。
 

5.2 消息可靠性

5.2.1 Producer端

当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。

 

5.2.2 Consumer端

broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。

 

5.3 备份机制

    备份机制是Kafka0.8版本的新特性,一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。

image-1691674419178

    Producer只往leader里写消息,Consumers只从leader中读消息。备份节点不做读写。

 

5.4 读写性能优化

5.4.1 顺序读写磁盘

    基于磁盘的随机读写确实很慢,但基于磁盘的顺序读写性能却很高,一般而言要高出磁盘的随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。

    Kafka高度依赖文件系统来存储和缓存消息,将partition的数据持久化在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。

    Kafka也不是partition一有数据就立马将数据写到磁盘上,它会先缓存一部分,等到足够多数据量或等待一定的时间再批量写入(flush)。

 

5.4.2 page cache

    为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。

    另外操作系统本身对page cache做了大量优化,通过操作系统的Page Cache,Kafka的读写操作基本上是基于系统内存的,读写性能也得到了极大的提升。

 

5.5 I/O操作优化

5.5.1 零拷贝

    零拷贝是指Kafka利用 linux 操作系统的 “zero-copy” 机制在消费端做的优化。零拷贝就是使用了一个名为sendfile()的系统调用方法,将数据从page cache直接发送到Socket缓冲区,避免了系统上下文的切换,消除了从内核空间到用户空间的来回复制。

    为了解决字节拷贝的问题,kafka设计了一种“标准字节消息”,Producer、Broker、Consumer共享这一种消息格式。Kakfa的message log在broker端就是一些目录文件,这些日志文件都是MessageSet按照这种“标准字节消息”格式写入到磁盘的。

    通过zero-copy,整个kafka集群几乎都以缓存的方式提供服务,而且即使下游的consumer很多,也不会对整个集群服务造成压力。

 

5.5.2 消息集合

​     为了减少大量小I/O操作的问题,Producer一次网络请求可以发送一个「消息集合」,而不是每一次只发一条消息。在server端是以消息块的形式追加消息到log中的,consumer在查询的时候也是一次查询大量的线性数据块。

 

5.6 分区分段

    Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。

    通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
 
 
参考链接:
https://www.cnblogs.com/sujing/p/10960832.html
https://blog.csdn.net/suifeng3051/article/details/48053965
https://mp.weixin.qq.com/s?__biz=MzU1NDA0MDQ3MA==&mid=2247483958&idx=1&sn=dffaad318b50f875eea615bc3bdcc80c&chksm=fbe8efcfcc9f66d9ff096fbae1c2a3671f60ca4dc3e7412ebb511252e7193a46dcd4eb11aadc&scene=21#wechat_redirect
 
如有不对,烦请指出,感谢~