kafka消费者
点击勘误issues (opens new window),哪吒感谢大家的阅读
# kafka消费者
那么消费者是如何提交偏移量的呢?消费者往一个叫作 _consumer_offset 的特殊主题发送 消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完 成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续 之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方 继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的 消息就会被重复处理
提交的偏移量小于客户端处理的最后一个消息的偏移量
提交的偏移量大于客户端处理的最后一个消息的偏移量
# 自动提交
最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那 么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔 由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交 也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那 么就会提交从上一次轮询返回的偏移量。
假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地 提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。
# 重试异步提交
我们可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏 移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调 的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那 么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送 出去了,应该停止重试。
# 同步和异步组合提交
在消费者关闭前一般会组合使用 commitAsync() 和 commitSync()。
# 再均衡监听器
在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代 码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。
(1) public void onPartitionsRevoked(Collection partitions) 方法会在 再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接 管分区的消费者就知道该从哪里开始读取了。
(2) public void onPartitionsAssigned(Collection partitions) 方法会在 重新分配分区之后和消费者开始读取消息之前被调用。
# 如何退出
不需要担心消费者会在一个无限循环里轮询消息,我们会告诉 消费者如何优雅地退出循环。
消费者群组,分区被自动分配给群组里的消费者,在群组里新增 或移除消费者时自动触发再均衡。
比如,你可能只需要一个消费者从一个主题的所有分区或者某 个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分 区分配给消费者,然后开始读取消息并提交偏移量。
了一个消费者是如何为自己分配分区并从分区里读取消息的
# 分区读取
向集群请求主题可用的分区。如果只打算读取特定分区
知道需要哪些分区之后,调用 assign() 方法
除了不会发生再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如 果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer. partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。
# 必需的
Kafka 如何进行复制; • Kafka 如何处理来自生产者和消费者的请求; • Kafka 的存储细节,比如文件格式和索引。
Kafka 使用 Zookeeper 来维护集群成员的信息。每个 broker 都有一个唯一标识符,这个 标识符可以在配置文件里指定,也可以自动生成。在 broker 启动的时候,它通过创建 临时节点把自己的 ID 注册到 Zookeeper。Kafka 组件订阅 Zookeeper 的 /brokers/ids 路径 (broker 在 Zookeeper 上的注册路径),当有 broker 加入集群或退出集群时,这些组件就 可以获得通知。
如果你要启动另一个具有相同 ID 的 broker,会得到一个错误——新 broker 会试着进行注 册,但不会成功,因为 Zookeeper 里已经有一个具有相同 ID 的 broker。
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 Zookeeper 上断开连 接,此时 broker 在启动时创建的临时节点会自动从 Zookeeper 上移除。监听 broker 列表的 Kafka 组件会被告知该 broker 已移除。
在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在于其他数据结构中。 例如,主题的副本列表里就可能包含这些 ID。在完全关闭一个 broker 之 后,如果使用相同的 ID 启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker 相同的分区和主题。
# 控制器
控制器其实就是一个 broker,只不过它除了具有一般 broker 的功能之外,还负责分区 首领的选举。集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点 /controller 让自己成为控制器。其他 broker 在启动时也 会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制 器节点已存在,也就是说集群里已经有一个控制器了。其他 broker 在控制器节点上创建 Zookeeper watch 对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群 里一次只有一个控制器存在。
如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 上的临时节点就会消失。集群 里的其他 broker 通过 watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的 控制器。第一个在 Zookeeper 里成功创建控制器节点的 broker 就会成为新的控制器,其他 节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建 watch 对象。每个 新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧 epoch 的消息,就会忽略它们。
当控制器发现一个 broker 已经离开集群(通过观察相关的 Zookeeper 路径),它就知道,那 些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个 broker 上)。控制器遍 历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本), 然后向所有包含新首领或现有跟随者的 broker 发送请求。该请求消息包含了谁是新首领以 及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者 开始从新首领那里复制消息。
当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包 含现有分区的副本。如果有,控制器就把变更通知发送给新加入的 broker 和其他 broker, 新 broker 上的副本开始从首领那里复制消息。
简而言之,Kafka 使用 Zookeeper 的临时节 点来选举控制器,并在节点加入集群或退出集 群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器。
# 复制
“一个分布式的、 可分区的、可复制的提交日志服务”。
复制之所以这么关键,是因为它可以在个别节点失 效时仍能保证 Kafka 的可用性和持久性。
Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副 本被保存在 broker 上,每个 broker 可以保存成百上千个属于不同主题和分区的副本。
副本有以下两种类型。
首领副本 每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过 这个副本。
跟随者副本 首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任 务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个 跟随者会被提升为新首领。
首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的 状态一致,在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失 败。例如,网络拥塞导致复制变慢,broker 发生崩溃导致复制滞后,直到重启 broker 后复 制才会继续。
为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与消费者为了读取消 息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获 取消息的偏移量,而且这些偏移量总是有序的。
通过查看每个跟随者请求的最新偏移量,首领就会 知道每个跟随者复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消 息,但在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。
如果一个副本无法 与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟它没有包含全部的 消息。
相反,持续请求得到的最新消息副本被称为同步的副本。在首领发生失效时,只有同步副 本才有可能被选为新首领。
跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 replica.lag.time.max.ms 参数来配置的。这个时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。
除了当前首领之外,每个分区都有一个首选首领——创建主题时选定的首领就是分区的首 选首领。之所以把它叫作首选首领,是因为在创建分区时,需要在 broker 之间均衡首领
broker 间的负载最终会得到均衡。默认情况下,Kafka 的 auto.leader.rebalance. enable 被设为 true,它会检查首选首领是不是当前首领,如果不是,并且该副本是同步 的,那么就会触发首领选举,让首选首领成为当前首领。
# 处理请求
broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。Kafka 提 供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 broker 如何对请求作出 响应——包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求, broker 处理请求并作出响应。broker 按照请求到达的顺序来处理它们——这种顺序保证让 Kafka 具有了消息队列的特性,同时保证保存的消息也是有序的。
所有的请求消息都包含一个标准消息头:
• Request type(也就是 API key)
• Request version(broker 可以处理不同版本的客户端请求,并根据客户端版本作出不同 的响应)
• Correlation ID——一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消 息和错误日志里(用于诊断问题)
• Client ID——用于标识发送请求的客户端
broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接, 并把它交给 Processor 线程去处理。Processor 线程(也被叫作“网络线程”)的数量是可 配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获 取响应消息,把它们发送给客户端。
请求消息被放到请求队列后,IO 线程会负责处理它们。
生产请求 生产者发送的请求,它包含客户端要写入 broker 的消息。
获取请求 在消费者和跟随者副本需要从 broker 读取消息时发送的请求。
生产请求和获取请求都必须发送给分区的首领副本。如果 broker 收到一个针对特定分区的 请求,而该分区的首领在另一个 broker 上,那么发送请求的客户端会收到一个“非分区 首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的 broker 上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的 broker 上。
那么客户端怎么知道该往哪里发送请求呢?
元数据 请求。这种请求包含了客户端感兴趣的主题列表。
每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任 意一个 broker,因为所有 broker 都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标 broker 上发送生产请求和 获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通 过 metadata.max.age.ms 参数来配置),从而知道元数据是否发生了变更
# 生产请求
acks 这个配置参数——该参数指定了需 要多少个 broker 确认才可以认为一个消息写入是成功的。不同的配置对“写入成功”的界 定是不一样的,如果 acks=1,那么只要首领收到消息就认为写入成功;如果 acks=all,那 么需要所有同步副本收到消息才算写入成功;如果 acks=0,那么生产者在把消息发出去之 后,完全不需要等待 broker 的响应。
包含首领副本的 broker 在收到生产请求时,会对请求做一些验证。
# 发送数据的用户是否有主题写入权限?
# 请求里包含的 acks 值是否有效(只允许出现 0、1 或 all)?
# 如果 acks=all,是否有足够多的同步副本保证消息已经被安全写入?
如果同步副本的数量不足,broker 可以拒绝处理新消息。
之后,消息被写入本地磁盘。在 Linux 系统上,消息会被写到文件系统缓存里,并不保证 它们何时会被刷新到磁盘上。Kafka 不会一直等待数据被写到磁盘上——它依赖复制功能 来保证消息的持久性。
在消息被写入分区的首领之后,broker 开始检查 acks 配置参数——如果 acks 被设为 0 或 1, 那么 broker 立即返回响应;如果 acks 被设为 all,那么请求会被保存在一个叫作炼狱的缓冲 区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端。
主题分区里具有特定偏移量的消息
主题 Test 分区 0 偏移量从 53 开始 的消息
客户端还可以指定 broker 最 多可以从一个分区里返回多少数据。
请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保 请求的路由是正确的。首领在收到请求时,它会先检查请求是否有效——比如,指定的偏 移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不存 在,那么 broker 将返回一个错误。
# Linux 文件系统缓存
如果请求的偏移量存在,broker 将按照客户端指定的数量上限从分区里读取消息,再把消 息返回给客户端。Kafka 使用零复制技术向客户端发送消息——也就是说,Kafka 直接把消 息从文件(或者更确切地说是 Linux 文件系统缓存)里发送到网络通道,而不需要经过任 何中间缓冲区。这是 Kafka 与其他大部分数据库系统不一样的地方,其他数据库在将数据 发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管 理内存缓冲区,从而获得更好的性能。
客户端除了可以设置 broker 返回数据的上限,也可以设置下限。例如,如果把下限设置为 10KB,就好像是在告诉 broker:“等到有 10KB 数据的时候再把它们发送给我。”在主题消 息流量不是很大的情况下,这样可以减少 CPU 和网络开销。客户端发送一个请求,broker 等到有足够的数据时才把它们返回给客户端,然后客户端再发出请求,而不是让客户端每 隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。对 比这两种情况,它们最终读取的数据总量是一样的,但前者的来回传送次数更少,因此开 销也更小。
当然,我们不会让客户端一直等待 broker 累积数据。在等待了一段时间之后,就可以把 可用的数据拿回处理,而不是一直等待下去。所以,客户端可以定义一个超时时间,告 诉 broker:“如果你无法在 X 毫秒内累积满足要求的数据量,那么就把当前这些数据返回 给我。”
有意思的是,并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只 能读取已经被写入所有同步副本的消息(跟随者副本也不行,尽管它们也是消费者——否 则复制功能就无法工作)。分区首领知道每个消息会被复制到哪个副本上,在消息还没有 被写入所有同步副本之前,是不会发送给消费者的——尝试获取这些消息的请求会得到空 的响应而不是错误。
因为还没有被足够多副本复制的消息被认为是“不安全”的——如果首领发生崩溃,另一 个副本成为新首领,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就 会破坏一致性。试想,一个消费者读取并处理了这样的一个消息,而另一个消费者发现这 个消息其实并不存在。所以,我们会等到所有同步副本复制了这些消息,才允许消费者读 取它们
如果 broker 间的消息复制因为某些原因变慢,那 么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)
延迟时间可 以通过参数 replica.lag.time.max.ms 来配置,它指定了副本在复制消息时可被允许的最大 延迟时间。
消费者只能看到已经复制到 ISR 的消息
# 元数据请求、生产请求和获取 请求。
# 优化点说明
- 代码注释:
- 添加了详细的类和方法注释,符合阿里等大厂的代码规范。
- 每个关键配置项都添加了中文说明,便于后续维护和团队协作。
- 构造函数注入:
- 使用构造函数注入代替字段注入,符合Spring推荐的最佳实践,便于单元测试和依赖管理。
- 配置集中管理:
- 将Kafka配置参数集中管理,避免硬编码,提升代码的可维护性。
- 方法职责单一:
- 每个方法的职责单一,便于理解和扩展。例如:
consumerConfigs
方法负责构建Kafka配置。
getConsumerFactory
方法负责创建消费者工厂。createKafkaListenerContainerFactory
方法负责创建监听容器工厂。
- 批量消费与单条消费分离:
- 通过
batchListener
参数区分批量消费和单条消费,提升代码的灵活性。
- 通过
- 常量提取:
- 将硬编码的配置值(如
MAX_POLL_RECORDS_CONFIG
)提取为常量或配置文件,便于统一管理。
- 将硬编码的配置值(如
# 符合大厂规范
- 命名规范:
- 类名、方法名、变量名均采用驼峰命名法,符合Java命名规范。
- 配置项名称与配置文件中的Key保持一致,便于查找和维护。
- 注释规范:
- 类注释包含作者、日期和功能描述。
- 方法注释包含功能描述、参数说明和返回值说明。
- 代码结构清晰:
- 代码逻辑分层清晰,配置、工厂创建、监听器分离,便于扩展和维护。
- 异常处理(可根据实际需求补充):
- 在实际生产环境中,可以增加异常处理逻辑,例如Kafka连接失败时的重试机制。
把偏移量保存在特定的 Kafka 主题 上。为了达到这个目的,我们不得不往协议里增加几种请求类型:OffsetCommitRequest、 OffsetFetchRequest 和 ListOffsetsRequest。
在应用程序调用 commitOffset() 方法 时,客户端不再把偏移量写入 Zookeeper,而是往 Kafka 发送 OffsetCommitRequest 请求。
# 物理存储
Kafka 的基本存储单元是分区。分区无法在多个 broker 间进行再细分,也无法在同一个 broker 的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制(一 个挂载点由单个磁盘或多个磁盘组成,如果配置了 JBOD,就是单个磁盘,如果配置了 RAID,就是多个磁盘。)。
在配置 Kafka 的时候,管理员指定了一个用于存储分区的目录清单——也就是 log.dirs 参 数的值(不要把它与存放错误日志的目录混淆了,日志目录是配置在 log4j.properties 文件 里的)。该参数一般会包含每个挂载点的目录。
# 分区分配
在 broker 间平均地分布分区副本。对于我们的例子来说,就是要保证每个 broker 可以 分到 5 个副本。
• 确保每个分区的每个副本分布在不同的 broker 上。假设分区 0 的首领副本在 broker 2 上, 那么可以把跟随者副本放在 broker 3 和 broker 4 上,但不能放在 broker 2 上,也不能两 个都放在 broker 3 上。
如果为 broker 指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的 broker 上。这样做是为了保证一个机架的不可用不会导致整体的分区不可用。
# 文件管理
保留数据是 Kafka 的一个基本特性,Kafka 不会一直保留数据,也不会等到所有消费者都 读取了消息之后才删除消息。相反,Kafka 管理员为每个主题配置了数据保留期限,规定 数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若 干个片段。默认情况下,每个片段包含 1GB 或一周的数据,以较小的那个为准。在 broker 往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。
当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除,所以如果你要保留数 据 1 天,但片段里包含了 5 天的数据,那么这些数据会被保留 5 天,因为在片段被关闭之 前这些数据无法被删除。如果你要保留数据一周,而且每天使用一个新片段,那么你就会 看到,每天在使用一个新片段的同时会删除一个最老的片段——所以大部分时间该分区会 有 7 个片段存在。
# 文件格式
除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法 (Snappy、GZip 或 LZ4)和时间戳(在 0.10.0 版本里引入的)。
也就是说,如果在生产者端使用了压缩功能(极力推荐),那么发送的批次越大,就意味 着在网络传输和磁盘存储方面会获得越好的压缩性能,同时意味着如果修改了消费者使用 的消息格式(例如,在消息里增加了时间戳),那么网络传输和磁盘存储的格式也要随之 修改,而且 broker 要知道如何处理包含了两种消息格式的文件 Kafka 附带了一个叫 DumpLogSegment 的工具,可以用它查看片段的内容。它可以显示每 个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。
bin/kafka-run-class.sh kafka.tools.DumpLogSegments
如果使用了 --deep-iteration 参数,可以显示被压缩到包装消息里的消息
# 索引
消费者可以从 Kafka 的任意可用偏移量位置开始读取消息。假设消费者要读取从偏移量 100 开始的 1MB 消息,那么 broker 必须立即定位到偏移量 100(可能是在分区的任意一个片段 里),然后开始从这个位置读取消息。为了帮助 broker 更快地定位到指定的偏移量,Kafka 为每个分区维护了一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。
索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的 校验和。如果索引出现损坏,Kafka 会通过重新读取消息并录制偏移量和位置来重新生 成索引。如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka 会自动重新 生成这些索引。
# 清理
一般情况下,Kafka 会根据设置的时间保留数据,把超过时效的旧数据删除掉。不过,试 想一下这样的场景,如果你使用 Kafka 保存客户的收货地址,那么保存客户的最新地址比 保存客户上周甚至去年的地址要有意义得多,这样你就不用担心会用错旧地址,而且短时 间内客户也不会修改新地址。另外一个场景,一个应用程序使用 Kafka 保存它的状态,每 次状态发生变化,它就把状态写入 Kafka。在应用程序从崩溃中恢复时,它从 Kafka 读取 消息来恢复最近的状态。在这种情况下,应用程序只关心它在崩溃前的那个状态,而不关 心运行过程中的那些状态。
Kafka 通过改变主题的保留策略来满足这些使用场景。早于保留时间的旧事件会被删除, 为每个键保留最新的值,从而达到清理的效果。很显然,只有当应用程序生成的事件里 包含了键值对时,为这些主题设置 compact 策略才有意义。如果主题包含 null 键,清理 就会失败。
# 清理的工作原理
干净的部分 这些消息之前被清理过,每个键只有一个对应的值,这个值是上一次清理时保留下来的。
污浊的部分 这些消息是在上一次清理之后写入的。
# 可靠性保证
那么 Kafka 可以在哪些方面作出保证呢?
• Kafka 可以保证分区消息的顺序。如果使用同一个生产者往同一个分区写入消息,而且 消息 B 在消息 A 之后写入,那么 Kafka 可以保证消息 B 的偏移量比消息 A 的偏移量大, 而且消费者会先读取消息 A 再读取消息 B。
• 只有当消息被写入分区的所有同步副本时(但不一定要写入磁盘),它才被认为是“已 提交”的。生产者可以选择接收不同类型的确认,比如在消息被完全提交时的确认,或 者在消息被写入首领副本时的确认,或者在消息被发送到网络时的确认。
• 只要还有一个副本是活跃的,那么已经提交的消息就不会丢失。
• 消费者只能读取已经提交的消息。
Kafka 的主题被分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka 可以 保证分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。每个分区可 以有多个副本,其中一个副本是首领。所有的事件都直接发送给首领副本,或者直接从首 领副本读取事件。其他副本只需要与首领保持同步,并及时复制最新的事件。当首领副本 不可用时,其中一个同步副本将成为新首领。
与 Zookeeper 之间有一个活跃的会话,也就是说,它在过去的 6s(可配置)内向 Zookeeper 发送过心跳。
在过去的 10s 内(可配置)从首领那里获取过消息。
在过去的 10s 内从首领那里获取过最新的消息。光从首领那里获取消息是不够的,它还 必须是几乎零延迟的。
# 构建数据管道时需要考虑的问题
Kafka 支持多种类型的压缩,在增长吞吐量时,Kafka 用户和管理员可以通过压缩来调整网 络和存储资源的使用。
在某些情况下,不同的集群之间相互依赖,管理员需要不停地在集群间复制数据。大部分 数据库都支持复制(replication),也就是持续地在数据库服务器之间复制数据。不过,因 为前面已经使用过“复制”这个词来描述在同一个集群的节点间移动数据,所以我们把集 群间的数据复制叫作镜像(mirroring)。Kafka 内置的跨集群复制工具叫作 MirrorMaker。
# 跨集群镜像的使用场景
大多数情况下,我们要避免向远程的数据中心生成数据,但如果这么做了,那么就要忍受 高延迟,并且需要通过增加重试次数(LinkedIn 曾经为跨集群镜像设置了 32 000 多次重试 次数)和增大缓冲区来解决潜在的网络分区问题(生产者和服务器之间临时断开连接)。
如果有了跨集群复制的需求,同时又禁用了从 broker 到 broker 之间的通信以及从生产者到 broker 之间的通信,那么我们必须允许从 broker 到消费者之间的通信。事实上,这是最安 全的跨集群通信方式。在发生网络分区时,消费者无法从 Kafka 读取数据,数据会驻留在 Kafka 里,直到通信恢复正常。因此,网络分区不会造成任何数据丢失。不过,因为带宽 有限,如果一个数据中心的多个应用程序需要从另一个数据中心的 Kafka 服务器上读取数 据,我们倾向于为每一个数据中心安装一个 Kafka 集群,并在这些集群间复制数据,而不 是让不同的应用程序通过广域网访问数据。
每个数据中心至少需要一个集群。
• 每两个数据中心之间的数据复制要做到每个事件仅复制一次(除非出现错误需要重试)。
• 如果有可能,尽量从远程数据中心读取数据,而不是向远程数据中心写入数据。
一个中心 Kafka 集群对应多个本地 Kafka 集群
假设有一家银行,它在不同的城市有多家分行。每个城市的 Kafka 集群上保存了用户的信 息和账号历史数据。我们把各个城市的数据复制到一个中心集群上,这样银行就可以利用 这些数据进行业务分析。在用户访问银行网站或去他们所属的分行办理业务时,他们的请 求被路由到本地集群上,同时从本地集群读取数据。假设一个用户去另一个城市的分行办 理业务,因为他的信息不在这个城市,所以这个分行需要与远程的集群发生交互(不建议 这么做),否则根本没有办法访问到这个用户的信息(很尴尬)。因此,这种架构模式在数 据访问方面有所局限,因为区域数据中心之间的数据是完全独立的。
在采用这种架构时,每个区域数据中心的数据都需要被镜像到中央数据中心上。镜像进程 会读取每一个区域数据中心的数据,并将它们重新生成到中心集群上。如果多个数据中心 出现了重名的主题,那么这些主题的数据可以被写到中心集群的单个主题上,也可以被写 到多个主题上。
# 双活架构
当有两个或多个数据中心需要共享数据并且每个数据中心都可以生产和读取数据时,可以 使用双活(Active-Active)架构
# 主备架构
这种架构的好处是易于实现,而且可以被用于任何一种场景。你可以安装第二个集群,然 后使用镜像进程将第一个集群的数据完整镜像到第二个集群上,不需要担心数据的访问和 冲突问题,也不需要担心它会带来像其他架构那样的复杂性。
这种架构的不足在于,它浪费了一个集群。Kafka 集群间的失效备援比我们想象的要难得 多。从目前的情况来看,要实现不丢失数据或无重复数据的 Kafka 集群失效备援是不可能 的。我们只能尽量减少这些问题的发生,但无法完全避免。
让一个集群什么事也不做,只是等待灾难的发生,这明显就是对资源的浪费。因为灾难是 (或者说应该是)很少见的,所以在大部分时间里,灾备集群什么事也不做。有些组织尝 试减小灾备集群的规模,让它远小于生产环境的集群规模。这种做法具有一定的风险,因 为你无法保证这种小规模的集群能够在紧急情况下发挥应有的作用。有些组织则倾向于让 灾备集群在平常也能发挥作用,他们把一些只读的工作负载定向到灾备集群上,也就是 说,实际上运行的是 Hub 和 Spoke 架构的一个简化版本,因为架构里只有一个 Spoke。
首先,不管选择哪一种失效备援方案,SRE(网站可靠性工程)团队都必须随时待命。今 天能够正常运行的计划,在系统升级之后可能就无法正常工作,又或者已有的工具无法满 足新场景的需求。每季度进行一次失效备援是最低限度的要求,一个高效的 SRE 团队会更 频繁地进行失效备援。Chaos Monkey 是 Netflix 提供的一个著名的服务,它随机地制造灾 难,有可能让任何一天都成为失效备援日。 现在,让我们来看看失效备援都包括哪些内容。
# 失效备援都包括哪些内容
# 1. 冗余硬件
- 备份服务器:为主服务器提供冗余,一旦主服务器宕机,备用服务器可以立刻接管任务。
- 负载均衡器:多个服务器组成一个集群,通过负载均衡器分配请求。当某一台服务器失效时,负载均衡器会自动将流量切换到健康的服务器上。
# 2. 数据冗余
- 数据复制:将数据从主数据库同步到备用数据库,确保在主数据库失效时,备用数据库能够无缝接管并提供服务。
- 主备数据库:常见的配置包括主数据库和从数据库,通过主从复制保证数据的一致性和可用性。主数据库出现故障时,从数据库能够自动接管。
# 3. 网络冗余
- 多条网络路径:为避免因网络故障导致的服务不可用,通常会部署多个网络路径或多个数据中心。当一条网络路径失效时,系统会切换到备用路径。
- DNS 负载均衡:通过 DNS 将流量分发到不同的数据中心,确保某个数据中心不可用时,流量可以自动切换到其他数据中心。
# 4. 应用层的备援
- 微服务架构:在微服务架构下,每个服务通常都有多个实例,使用容器或虚拟机来运行,服务发现和自动扩展机制可以保证当某个实例失效时,其他实例能够接管任务。
- 无状态设计:应用程序采用无状态设计,确保系统的每个节点可以独立地处理请求,不依赖于某个特定节点的状态。
# 5. 自动切换机制
- 自动故障检测与切换:通过监控系统定期检查各个服务的健康状态。当检测到服务故障时,自动将请求切换到备用系统或节点。
- 监控与告警:通过监控系统实时检测硬件、数据库、应用等各个组件的健康状况,一旦发现故障,立刻触发备援切换,并向运维团队发送告警通知。
# 6. 容灾方案(Disaster Recovery)
- 灾备中心:在不同地域或数据中心部署灾备系统,当一个区域或数据中心发生灾难时,另一个区域可以接管服务。
- 异地备份:数据、应用和配置等信息定期备份到异地,确保在发生不可预见灾难时能够快速恢复。
- 灾备演练:定期进行灾备演练,确保在发生故障时可以快速切换和恢复服务。
# 7. 回滚和恢复机制
- 事务回滚:在服务切换过程中,如果发生了数据一致性问题或失败,系统应该能够回滚到故障发生前的状态,确保数据的一致性。
- 备份恢复:当主系统故障并无法自动恢复时,手动恢复备份数据到主系统,以保证系统恢复正常。
# 8. 监控和日志管理
- 日志收集与分析:收集系统各个部分的日志信息,实时监控系统状态,发现潜在问题,快速响应。
- 日志集中管理:通过集中管理系统日志,可以在发生故障时,快速定位问题并处理。
# 9. 用户体验和接口稳定性
- API 冗余:API 服务可以通过冗余部署,确保当某个 API 服务节点失效时,其他节点能够继续提供服务。
- 缓存备援:如 Redis、Memcached 等缓存系统也需要部署主备机制,以避免缓存服务的单点故障影响用户体验。
# 创建主题
在集群里创建一个主题需要用到 3 个参数。这些参数是必须提供的,尽管有些已经有了 broker 级别的默认值。
主题名字 想要创建的主题的名字。
复制系数 主题的副本数量。
分区 主题的分区数量。
示例:列出集群里所有主题的详细信息。 # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe
# 修改复制系数
分区重分配工具提供了一些特性,用于改变分区的复制系数,这些特性并没有在文档里 说明。如果在创建分区时指定了错误的复制系数(比如在创建主题时没有足够多可用的 broker),那么就有必要修改它们。这可以通过创建一个 JSON 对象来完成,该对象使用分 区重新分配的执行步骤中使用的格式,显式指定分区所需的副本数量。集群将完成重分配 过程,并使用新的复制系数。
# 1. 集群级别的问题
集群问题一般分为以下两类。
• 不均衡的负载。
• 资源过度消耗。