5846 字
29 分钟
Kafka学习笔记

Kafka#

我们总说Kafka是一个消息队列,但在它是消息队列之前,首先是一个分布式流式处理平台

什么是分布式流式处理平台呢?可以用三个关键功能来概述

①、消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因

②、容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。

③、流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 主要有两大应用场景:

  1. 消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

那既然kafka是作为消息队列所为人熟知的,那我们就先介绍一下,什么是消息队列

消息队列#

什么是消息队列? 我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

MQ

参与消息传递的双方称为 生产者 和 消费者 ,生产者负责发送消息,消费者负责处理消息。

经典的 Pub/Sub 模型

在操作系统中,有一种很重要的**IPC(进程间通信)方式就是消息队列,但这和我们使用的消息队列是不一样的,前者更多指内置于系统内的队列,仅供内部进程的数据调度,后者指的是中间件**,是各个服务以及系统内部各个组件和模块之间通信的工具亦或桥梁

中间件:中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。

阿里淘系技术部的回答

消息队列的作用#

一、异步处理#

asyn

将用户请求中包含的耗时操作,通过消息队列实现异步处理,将对应的消息发送到消息队列之后就立即返回结果,减少响应时间,提高用户体验。随后,系统再对消息进行消费。

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票

二、削峰填谷流控处理#

先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

Example:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:

削峰消息队列

我在这里用Reactor的流控模式来解释一下:

①常规的业务模式,即同步的写法,是流控中的正压模式(Push),它的优点就是写法简单,逻辑符合人类正常思维,它是生产者主导的一种数据流模式,所以整个业务流程就是生产者一直推送数据,消费者一直处理数据,key role 是生产者的生产速率,归根结底其实就是上游数据量

// 正压示例
Observable.interval(1, TimeUnit.MILLISECONDS)
.subscribe(data -> {
// 消费者可能处理不过来
process(data);
});

②响应式的业务模式,即全异步的写法,是流控模式中的背压模式(Backpressure),它的优点是可以最大限度地利用服务器亦或者说硬件的性能,特点是由消费者主导,根据自身处理数据的能力来请求数据,可以防止消费者自身过载

// 背压示例
Flowable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
// 消费者请求100个数据
s.request(100);
}
@Override
public void onNext(Long data) {
process(data);
}
});

③Kafka的模式在我看来其实位列前面两者之间,首先前半段,也就是pub阶段,是和正压一样,通过生产者,将数据压入MQ中,是由生产者主导的生产阶段,然而后半段,也就是sub阶段中,业务处理的key role就变成了消费者,整个业务的推进全靠消费者去监听MQ中的消息推送,然后交由具体服务去处理。这样有个特别好的优点,那就是各个服务可以各司其职,极大程度降低了各个服务之间的耦合度,这个我们马上就说。除此之外,即使老生常谈的性能优化了,我其实不愿意赘述这方面的优点,因为kafka的高性能高可能近乎可以说是人尽皆知了,所以这里我将阐述下 :

为什么采用了异步的kafka会这么优秀?

  1. 内存压力释放 > - 数据不再堆积在JVM内存
  • 利用Kafka的页缓存机制(Page Cache)
  • 避免OOM风险
  1. 磁盘顺序写 (原理后面说) > - 消息顺序写入磁盘
  • 避免随机IO的性能瓶颈
  • 提升数据处理效率
  1. 零拷贝技术 (Zero Copy) > - 减少数据在内核空间(管态)和用户空间(目态)的复制
  • 显著提升吞吐量
  • 降低CPU使用率

三、降低系统耦合性#

使用消息队列还可以降低系统耦合性。如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。

生产者(客户端)发送消息到消息队列中去,消费者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。

消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现业务的可扩展性设计

Example In AgriNavi

Pub:

@Mapping(types = 位置信息汇报, desc = "位置信息汇报")
public void T0200(T0200 message, Session session) {
asyncQueueService.sendLocationInfo(message);
}

Sub:

/**
* JT808 消息监听器
* 负责接收并处理不同类型的 JT808 消息,包括批量处理功能。
* 目前支持 T0200、T0801 和 T8103 消息类型。
* @author simon
* @since 2024/9/23
*/
@Component
public class JT808Listener {
private static final Logger log = LoggerFactory.getLogger(JT808Listener.class);
@Resource
private IEffectiveTrajectoryService effectiveTrajectoryService;
@Resource
private IOriginalTrajectoryService originalTrajectoryService;
/**
* 批量接收 T0200 消息并处理
* @param messages 消息列表
*/
@KafkaListener(
topicPartitions = @TopicPartition(topic = "T0200", partitions = {"0"}),
containerFactory = "kafkaListenerContainerFactory"
)
public void savePoint(List<T0200> messages) {
originalTrajectoryService.saveOriginalPoints(messages);
effectiveTrajectoryService.insertBatch(messages);
}
}

这就是一个很明显的pub/sub模式来进行系统业务解耦的例子,在GPS中,我们通过协议解析,将16进制数据反序列化成Java实体类,再将它批量压入MQ中,这样,客户无需等待一系列后续业务的执行(批次加入InfluxDB和ElasticSearch),就可以进行后续的操作。并且,这样,也无需再Endpoint这个业务中引入持久化的业务了,Endpoint只是一个端点控制器,它只需要从协议模块中获取到实体类,再下发给真正Web控制器,仅此而已,这样做极大地降低了两个业务(解析业务、持久化业务)之间的耦合度。

四、实现分布式事务#

分布式事务的解决方案之一就是 MQ 事务。

RocketMQ、 Kafka、Pulsar、QMQ 都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。

本文档主要针对Kafka进行讲解,故拿kafka进行举例说明

如果想要是在kafka中实现分布式事务,需要引入一个特殊的组件 TC

tc

与Seata不同的是,用kafka之类的消息队列做分布式事务,消息队列本身是对数据源不敏感的,所以就不会有RM(Resource Manager)的概念,TM

Kafka的事务特性

①原子写

这是事务中最重要的一点,成功一起成功,失败一起失败,事务是此处kafka中消息状态流转的最小操作单位。 实现原理如图

transcation

②拒绝僵尸实例 (Zombie Fencing)

在分布式系统中,一个instance的宕机或失联,集群往往会自动启动一个新的实例来代替它的工作。此时若原实例恢复了,那么集群中就产生了两个具有相同职责的实例,此时前一个instance就被称为“僵尸实例(Zombie Instance)”。在Kafka中,两个相同的producer同时处理消息并生产出重复的消息(read-process-write模式),这样就严重违反了Exactly Once Processing的语义。这就是僵尸实例问题。 Kafka事务特性通过 transaction-id 属性来解决僵尸实例问题。所有具有相同 transaction-id 的Producer都会被分配相同的pid,同时每一个Producer还会被分配一个递增的epoch。Kafka收到事务提交请求时,如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。

③读已提交事务消息

为了保证事务特性,Consumer如果设置了 isolation.level=read commited ,那么它只会读取已经提交了的消息。在 Producer成功提交事务后,Kafka会将所有该事务中的消息的 Transaction Marker 从 uncommited 标记为 commited状态,从而所有的Consumer都能够消费

④2PC

提交阶段(两阶段提交)#

第一阶段(Prepare)#

Producer调用commitTransaction发起提交

TC将事务状态改为PREPARE_COMMIT

TC确保所有消息都已写入对应分区

各分区确认数据已持久化

第二阶段(Commit)#

TC收到所有确认后,将状态改为COMMIT

TC向所有涉及的分区发送COMMIT标记(控制消息)

分区接收到标记后使消息对消费者可见

TC最终将状态更新为COMPLETE

五、顺序保证#

在很多应用场景中,处理数据的顺序至关重要。消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。

而kafka的顺序保证,依靠的是Kafka本身采用的数据结构,大家都知道,kafka中真正存放消息的其实是topic,然后topic中有很多partition,每个partition其实就是一个追加日志 (Append-only Log),每新增一条消息,实际上是在日志的segment段中追加记录,所以kafka天生支持消息的顺序保证

六、数据流处理#

针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。

使用消息队列会带来的问题#

①系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入MQ之后,这些问题就会实打实地困扰你

②系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题

③一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了,这在生产中就是大锅了

JMS & AMQP#

JMS#

JMS是什么?JMS(JAVA Message Service,java 消息服务)是一种规范,是 Java 的消息服务,JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。JMS(JAVA Message Service,Java 消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

JMS 定义了五种不同的消息正文格式以及调用的消息类型,允许你发送并接收以一些不同形式的数据:

  • StreamMessage: 原始值的数据流
  • MapMessage:一套名称-值对
  • TextMessage:一个字符串对象
  • ObjectMessage:一个序列化的 Java 对象
  • BytesMessage:一个字节的数据流

ActiveMQ(已被淘汰) 就是基于 JMS 规范实现的。

JMS采用两种消息模型,P2P和Pub/Sub,老生畅谈了,这里不多赘述

p2p

二者都做到了消息的传递,但是实现效果和方式不同,效果根据上图展示已经很明显了,至于方式,可以细说一下。

P2P使用的消息载体是Queue,也就是典型的队列满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

Pub/Sub采用的是Topic作为消息载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。而kafka就是大规模采用了这种模式,也演变成了现在所熟知的生产订阅模式。

AMQP#

AMQP是什么?AMQP,即 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

RabbitMQ 就是基于 AMQP 协议实现的

JMS vs AMQP#

对比方向JMSAMQP
定义Java API协议
跨语言
跨平台
支持消息类型提供两种消息模型:①Peer-2-Peer;②Pub/sub提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分;
支持消息类型支持多种消息类型 ,我们在上面提到过byte[](二进制)

总结:

  • AMQP 为消息定义了线路层(wire-level protocol)的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
  • JMS 支持 TextMessageMapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
  • 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

ZooKeeper#

谈到kafka就不得不说ZooKeeper,因为Kafka归根结底也是个服务,它的很多功能是需要zk做支持的

其实zk是个很伟大的分布式服务注册中心,如果把他单拉出来讲的话,要好久。

我们此处仅做针对kafka的介绍,所以只介绍一些与kafka相关的功能

zookeeperkafka

元数据管理#

ZooKeeper 主要为 Kafka 提供元数据的管理的功能。 从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:

Broker 注册:在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到 /brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去

Topic 注册:在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0/brokers/topics/my-topic/Partitions/1

负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡

kafkazkstruct

功能实现#

这里介绍一下,ZooKeeper其实实现了以下内容:

顺序一致性: 从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到 ZooKeeper 中去。

原子性: 所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的,也就是说,要么整个集群中所有的机器都成功应用了某一个事务,要么都没有应用。

单一系统映像: 无论客户端连到哪一个 ZooKeeper 服务器上,其看到的服务端数据模型都是一致的。

可靠性: 一旦一次更改请求被应用,更改的结果就会被持久化,直到被下一次更改覆盖。

实时性: 一旦数据发生变更,其他节点会实时感知到。每个客户端的系统视图都是最新的。

集群部署:3~5 台(最好奇数台)机器就可以组成一个集群,每台机器都在内存保存了 ZooKeeper 的全部数据,机器之间互相通信同步数据,客户端连接任何一台机器都可以。

**高可用:**如果某台机器宕机,会保证数据不丢失。集群中挂掉不超过一半的机器,都能保证集群可用。比如 3 台机器可以挂 1 台,5 台机器可以挂 2 台。


参考了这篇文章,如果想细致地了解Kafka基于ZooKeeper是怎么实现,可以去看一下

Zookeeper 在 Kafka 中的作用

铺垫了这么多,终于可以正式开始讲Kafka了,相信基于前面的介绍和扫盲,大家已经对Kafka是什么,为什么会有kafka有了充分的了解了,接下来,我会较为系统地介绍Kafka这一

Kafka的核心概念#

kafkaCore

  1. Producer(生产者) : 产生消息的一方。
  2. Consumer(消费者) : 消费消息的一方。
  3. Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。

同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
  • Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样

数量对应关系

Kafka学习笔记
https://fuwari.vercel.app/posts/kafka_learn/
作者
Simon
发布于
2025-02-12
许可协议
CC BY-NC-SA 4.0