本文共 6271 字,大约阅读时间需要 20 分钟。
消息队列概念
消息队列:本质是缓存数据的,之所以独立成为一类软件,因为消息队列提供了消息的管理能力,比如上次读数据的偏移量,信息读取后是否删除,以及消息备份等
消息队列优点
缓冲&高并发:可控制数据经系统的速度,解决生产消息和消费消息的处理速度不一致的情况。使关键组件可顶住浪涌流量,不会因为高突发的请求而崩溃。
解耦&高扩展:允许独立的扩展或修改消费者和生产者两端各自处理过程,只要确保它们遵守同样的接口约束。提高了系统的扩展性
冗余&可恢复:一个消息从队列中删除前,系统需指出该消息已被处理完毕,确保数据安全的保存直至使用完毕。一部分组件失效时,不会影响到整个系统
Kafka概念
Kafka是一个分布式消息队列,对消息保存时根据Topic进行归类,kafka集群由多个broker组成,依赖于zookeeper集群保存一些meta信息,来保证系统可用性
内部实现原理
两种模式
点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
发布/订阅模式(一对多,数据生产后,推送给所有订阅者)
Kafka架构
kafka架构组件
1)Producer :消息生产者,向kafka broker发消息的客户端
2)Consumer :消息消费者,向kafka broker拉取消息的客户端
3)Topic :可理解为一个队列(同一个业务的数据应放在一个topic下)
4) Consumer Group:kafka实现消息的广播和单播的方式。广播:每个consumer有独立的CG即可,但每个partion只会把消息发给该CG中的一个consumer。单播:所有的consumer在同一个CG
5)Broker :一台kafka服务器即一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
6)Partition:为了扩展,一个大topic可分布到多个broker上,一个topic分为多个partition。partition中每条消息都会被分配一个有序的offset,kafka只保证按一个partition中的顺序将消息发给consumer,不保证多个partition间的顺序
7)Offset:偏移量。
Kafka集群部署
tar包下载kafka_2.11-2.3.1.tgz
安装过程
1)解压安装包
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称
mv kafka_2.11-0.11.0.0/ kafka
3)在/opt/module/kafka目录下创建logs文件夹
mkdir logs
4)修改配置文件
cd config/
vim server.properties
输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=1
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop7:2181
6)分发安装包
7)分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3
注:broker.id不得重复
8)启动集群
依次在hadoop101、hadoop102、hadoop103节点上启动kafka
[root@hadoop101 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
[root@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
9)关闭集群
[root@hadoop101 kafka]$ bin/kafka-server-stop.sh stop
[root@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[root@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
Kafka命令行操作
1)查看当前服务器中的所有topic
[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
2)创建topic
[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--create --replication-factor 3 --partitions 1 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
3)删除topic
[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
4)发送消息
创建second主题
bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--create --replication-factor 3 --partitions 3 --topic second
[root@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic second
>hello world
>bigdata bigdata
5)消费消息
[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop100:9092 --from-beginning --topic second
--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
6)查看某个Topic的详情
[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic second
Kafka工作流程分析
Create topic(创建topic的时候指定该topic分区数、副本数);此处对比create table
Producer向kafka的topic发送消息
Kafka生产过程分析
写入方式
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写效率要高,保障kafka吞吐率)。
分区(Partition)
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
1)分区的原因
(1)方便扩展
(2)提高并发
2)分区的原则
(1)指定了patition,则直接使用;
<1, “我是中国人”>
(2)未指定patition但指定key,通过对key的值进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
副本(Replication)
副本是针对分区的副本
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
写入流程
producer写入消息流程如下:
1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后向producer发送ACK
注:leader会维护一个与其一定程度保持同步的Replica列表,该列表称为ISR(in-sync Replica)。所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数replica.lag.time.max.ms(默认10秒)进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步。
Broker 保存消息
存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:
[root@hadoop101 logs]$ ll
drwxrwxr-x. 2 root root 4096 8月 6 14:37 first-0
drwxrwxr-x. 2 root root 4096 8月 6 14:35 first-1
drwxrwxr-x. 2 root root 8月 6 14:37 first-2
[root@hadoop101 logs]$ cd first-0
[root@hadoop101 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 8月 6 14:33 00000000000000000000.index
-rw-rw-r--. 1 root root 219 8月 6 15:07 00000000000000000000.log
-rw-rw-r--. 1 root root 10485756 8月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root 8 8月 6 14:37 leader-epoch-checkpoint
存储策略(消息删除策略)
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
Kafka消费过程分析
kafka提供了两套consumer API:高级Consumer API和低级Consumer API。
高级API
1)高级API优点
高级API 写起来简单
不需要自行去管理offset,系统通过zookeeper自行管理。
不需要管理分区,副本等情况,.系统自动管理。
消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)
可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
2)高级API缺点
不能自行控制offset(对于某些特殊需求来说)
不能细化控制如分区、副本、zk等
低级API
1)低级 API 优点
能够让开发者自己控制offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
2)低级API缺点
太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
消费者组
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
消费方式
Consumer采用pull(拉)模式从broker中读取数据。
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式。
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。
转载地址:http://uvazi.baihongyu.com/