博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka详解
阅读量:3958 次
发布时间:2019-05-24

本文共 6271 字,大约阅读时间需要 20 分钟。

消息队列概念

消息队列:本质是缓存数据的,之所以独立成为一类软件,因为消息队列提供了消息的管理能力,比如上次读数据的偏移量,信息读取后是否删除,以及消息备份等

消息队列优点

缓冲&高并发:可控制数据经系统的速度,解决生产消息和消费消息的处理速度不一致的情况。使关键组件可顶住浪涌流量,不会因为高突发的请求而崩溃。

解耦&高扩展:允许独立的扩展或修改消费者和生产者两端各自处理过程,只要确保它们遵守同样的接口约束。提高了系统的扩展性

冗余&可恢复:一个消息从队列中删除前,系统需指出该消息已被处理完毕,确保数据安全的保存直至使用完毕。一部分组件失效时,不会影响到整个系统

Kafka概念 

Kafka是一个分布式消息队列,对消息保存时根据Topic进行归类,kafka集群由多个broker组成,依赖于zookeeper集群保存一些meta信息,来保证系统可用性

内部实现原理

两种模式

点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

发布/订阅模式(一对多,数据生产后,推送给所有订阅者)

Kafka架构

kafka架构组件

1)Producer :消息生产者,向kafka broker发消息的客户端

2)Consumer :消息消费者,向kafka broker拉取消息的客户端

3)Topic :可理解为一个队列(同一个业务的数据应放在一个topic下)

4) Consumer Group:kafka实现消息的广播和单播的方式。广播:每个consumer有独立的CG即可,但每个partion只会把消息发给该CG中的一个consumer。单播:所有的consumer在同一个CG

5)Broker :一台kafka服务器即一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic

6)Partition:为了扩展,一个大topic可分布到多个broker上,一个topic分为多个partition。partition中每条消息都会被分配一个有序的offset,kafka只保证按一个partition中的顺序将消息发给consumer,不保证多个partition间的顺序

7)Offset:偏移量。

 Kafka集群部署

 tar包下载kafka_2.11-2.3.1.tgz 

安装过程

1)解压安装包

tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)修改解压后的文件名称

mv kafka_2.11-0.11.0.0/ kafka

3)在/opt/module/kafka目录下创建logs文件夹

mkdir logs

4)修改配置文件

cd config/

vim server.properties

输入以下内容:

#broker的全局唯一编号,不能重复

broker.id=1

#kafka运行日志存放的路径

log.dirs=/opt/module/kafka/logs

#topic在当前broker上的分区个数

num.partitions=1

#配置连接Zookeeper集群地址

zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop7:2181

6)分发安装包

7)分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3

注:broker.id不得重复

8)启动集群

依次在hadoop101、hadoop102、hadoop103节点上启动kafka

[root@hadoop101 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[root@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[root@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

9)关闭集群

[root@hadoop101 kafka]$ bin/kafka-server-stop.sh stop

[root@hadoop102 kafka]$ bin/kafka-server-stop.sh stop

[root@hadoop103 kafka]$ bin/kafka-server-stop.sh stop

Kafka命令行操作

1)查看当前服务器中的所有topic

[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

2)创建topic

[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \

--create --replication-factor 3 --partitions 1 --topic first

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

3)删除topic

[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \

--delete --topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

4)发送消息

创建second主题

bin/kafka-topics.sh --zookeeper hadoop101:2181 \

--create --replication-factor 3 --partitions 3 --topic second

 

[root@hadoop101 kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop101:9092 --topic second

>hello world

>bigdata  bigdata

5)消费消息

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh \

--bootstrap-server hadoop100:9092 --from-beginning --topic second

--from-beginning会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看某个Topic的详情

[root@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \

--describe --topic second

Kafka工作流程分析

Create topic(创建topic的时候指定该topic分区数、副本数);此处对比create table

Producer向kafka的topic发送消息

Kafka生产过程分析

写入方式

producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写效率要高,保障kafka吞吐率)。

分区(Partition)

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值

1)分区的原因

(1)方便扩展

(2)提高并发

2)分区的原则

(1)指定了patition,则直接使用;

<1,  “我是中国人”>

(2)未指定patition但指定key,通过对key的值进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

副本(Replication)

副本是针对分区的副本

同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。

写入流程

 producer写入消息流程如下

1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader

2)producer将消息发送给该leader

3)leader将消息写入本地log

4)followers从leader pull消息,写入本地log后向leader发送ACK

5)leader收到所有ISR中的replication的ACK后向producer发送ACK

注:leader会维护一个与其一定程度保持同步的Replica列表,该列表称为ISR(in-sync Replica)。所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数replica.lag.time.max.ms(默认10秒)进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步。

Broker 保存消息

存储方式

物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:

[root@hadoop101 logs]$ ll

drwxrwxr-x. 2 root root 4096 8月   6 14:37 first-0

drwxrwxr-x. 2 root root 4096 8月   6 14:35 first-1

drwxrwxr-x. 2 root root 8月   6 14:37 first-2

[root@hadoop101 logs]$ cd first-0

[root@hadoop101 first-0]$ ll

-rw-rw-r--. 1 root root 10485760 8月   6 14:33 00000000000000000000.index

-rw-rw-r--. 1 root root 219 8月   6 15:07 00000000000000000000.log

-rw-rw-r--. 1 root root 10485756 8月   6 14:33 00000000000000000000.timeindex

-rw-rw-r--. 1 root root 8 8月   6 14:37 leader-epoch-checkpoint

存储策略(消息删除策略)

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:

1)基于时间:log.retention.hours=168

2)基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

Kafka消费过程分析

kafka提供了两套consumer API:高级Consumer API和低级Consumer API。

高级API

1)高级API优点

高级API 写起来简单

不需要自行去管理offset,系统通过zookeeper自行管理

不需要管理分区,副本等情况,.系统自动管理

消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)

可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)

2)高级API缺点

不能自行控制offset(对于某些特殊需求来说)

不能细化控制如分区、副本、zk等

低级API

1)低级 API 优点

能够开发者自己控制offset,想从哪里读取就从哪里读取。

自行控制连接分区,对分区自定义进行负载均衡

对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)

2)低级API缺点

太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。

消费者组

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。

在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。

消费方式

Consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。

转载地址:http://uvazi.baihongyu.com/

你可能感兴趣的文章
用python的matplotlib包绘制热度图
查看>>
matplot pip安装
查看>>
序列S的所有可能情况
查看>>
在Linux上用pip安装scipy
查看>>
随机salt二次加密及hash加密漫谈
查看>>
linux 技巧:使用 screen 管理你的远程会话
查看>>
同时装了Python3和Python2,怎么用pip?
查看>>
linux tar 解压缩zip文件报错的解决
查看>>
vim,ctag和Taglist
查看>>
Ubuntu的apt命令详解
查看>>
Ubuntu Server 设置sshd
查看>>
sort,uniq命令的使用。
查看>>
linux下md5加密(使用openssl库C实现)
查看>>
openssl、MD5的linux安装方法
查看>>
DevC++ 工程没有调试信息的解决办法
查看>>
http消息长度的确定
查看>>
手机和电脑如何连接蓝牙
查看>>
HTTP协议参数
查看>>
wireshark检索命令
查看>>
五人分鱼问题(附答案)
查看>>