kafka


官网

Kafka is a distributed, partitioned, replicated commit logservice。它提供了类似于 JMS 的特性,但是在实现上完全不同,此外它并不是 JMS 规范的实现。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer,消息接受者成为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个实例()成为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性集群保存一些 meta 信息。

Topics/logs

一个 Topic 可以认为是一类消息,每个 topic 将被分成多个 partition(区),每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称为 offset(偏移量),offset 为一个long 型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka 并没有提供其他额外的索引机制来存储 offset,因为在 kafka 中几乎不允许对消息进行“随机读写”。

kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.

对于 consumer 而言,它需要保存消费消息的 offset,对于 offset 的保存和使用,有 consumer 来控制;当consumer正常消费消息时, offset 将会”线性”的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将 offset 重置为任意值..(offset将会保存在zookeeper中,参见下文)

kafka 集群几乎不需要维护任何 consumer 和 producer 状态信息,这些信息有 zookeeper 保存;因此 producer 和 consumer 的实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.

partitions 的目的有多个.最根本原因是 kafka 基于文件存储.通过分区,可以将日志内容分散到多个上,来避免文件尺寸达到单机磁盘的上限,每个 partiton 都会被当前 server(kafka实例)保存;可以将一个 topic 切分多任意多个 partitions,来消息保存/消费的效率.此外越多的 partitions 意味着可以容纳更多的 consumer,有效提升并发消费的能力.

Distribution

一个 Topic 的多个 partitions,被分布在 kafka 集群中的多个 server 上;每个 server (kafka 实例)负责 partitions 中消息的读写操作;此外 kafka 还可以配置 partitions 需要备份的个数(replicas),每个 partition 将会被备份到多台机器上,以提高可用性.

基于 replicated 方案,那么就意味着需要对多个备份进行调度;每个 partition 都有一个为 “leader” ; leader 负责所有的读写操作,如果 leader 失效,那么将会有其他 follower 来接管(成为新的leader);follower 只是单调的和 leader 跟进,同步消息即可.由此可见作为 leader 的 server 承载了全部的请求压力,因此从集群的整体考虑,有多少个 partitions 就意味着有多少个 “leader” , kafka 会将 “leader” 均衡的分散在每个实例上,来确保整体的性能稳定.

Producers
Producer 将消息发布到指定的 Topic 中,同时 Producer 也能决定将此消息归属于哪个 partition ;比如基于 “round-robin” 方式或者通过其他的一些算法等.

Consumers
本质上 kafka 只支持 Topic.每个 consumer 属于一个 consumer group;反过来说,每个 group 中可以有多个 consumer.发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费.

如果所有的 consumer 都具有相同的 group,这种情况和 queue 模式很像;消息将会在 consumers 之间负载均衡.

如果所有的 consumer 都具有不同的 group,那这就是”发布-订阅”;消息将会广播给所有的消费者.

在 kafka 中,一个 partition 中的消息只会被 group 中的一个 consumer 消费;每个 group 中 consumer 消息消费互相独立;我们可以认为一个 group 是一个”订阅”者,一个 Topic 中的每个 partions,只会被一个”订阅者”中的一个 consumer 消费,不过一个 consumer 可以消费多个 partitions 中的消息. kafka 只能保证一个 partition 中的消息被某个 consumer 消费时,消息是顺序的.事实上,从 Topic 角度来说,消息仍不是有序的.

kafka 的原理决定,对于一个 topic,同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息.

Guarantees

1) 发送到 partitions 中的消息将会按照它接收的顺序追加到日志中
2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.
3) 如果 Topic 的 "replicationfactor" 为 N,那么允许 N-1 个 kafka 实例失效.

使用场景

1、Messaging

对于一些常规的消息系统,kafka 是个不错的选择;partitons/replication 和容错,可以使 kafka 具有良好的扩展性和性能优势.

不过到目前为止,我们应该很清楚认识到,kafka 并没有提供 JMS 中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;

kafka 只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)

2、Websit activity tracking

kafka 可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等

3、Log Aggregation

kafka 的特性决定它非常适合作为"日志收集中心";application 可以将操作日志"批量""异步"的发送到 kafka 集群中,而不是保存在本地或者 DB 中;

kafka 可以批量提交消息/压缩消息等,这对 producer 端而言,几乎感觉不到性能的开支.此时 consumer 端可以使 hadoop 等其他系统化的存储和分析系统.

设计原理

kafka 的初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.

1、持久性

kafka 使用文件存储消息,这就直接决定 kafka 在性能上严重依赖文件系统的本身特性.且无论任何 OS 下,对文件系统本身的优化几乎没有可能.文件缓存/直接内存映射等是常用的手段.

因为 kafka 是对日志文件进行 append 操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker 会将消息暂时 buffer 起来,当消息的个数(或尺寸)达到一定阀值时,再 flush 到磁盘,这样减少了磁盘 IO 调用的次数.

2、性能

需要考虑的影响性能点很多,除磁盘 IO 之外,我们还需要考虑网络 IO,这直接关系到 kafka 的吞吐量问题.kafka 并没有提供太多高超的技巧;

对于 producer 端,可以将消息 buffer 起来,当消息的条数达到一定阀值时,批量发送给 broker;

对于 consumer 端也是一样,批量 fetch 多条消息.不过消息量的大小可以通过配置文件来指定.

对于 kafka broker 端,似乎有个 sendfile 系统调用可以潜在的提升网络 IO 的性能:将文件的数据映射到系统内存中,socket 直接读取相应的内存区域即可,而无需进程再次copy 和交换.

其实对于 producer/consumer/broker 三者而言,CPU 的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的 CPU 资源,不过对于 kafka 而言,网络 IO 更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka 支持 gzip/snappy 等多种压缩方式.

3、生产者

负载均衡: producer 将会和 Topic 下所有 partition leader 保持 socket 连接;消息由 producer 直接通过 socket 发送到 broker,中间不会经过任何”路由层”.事实上,消息被路由到哪个 partition 上,有 producer 决定.比如可以采用 “random” “key-hash” “轮询”等,如果一个 topic 中有多个partitions,那么在 producer 端实现”消息均衡分发”是必要的.

其中 partition leader 的位置(host:port)注册在 zookeeper 中,producer 作为 zookeeper client,已经注册了 watch 用来监听 partition leader 的变更事件.

异步发送:将多条消息暂且在客户端 buffer 起来,并将他们批量的发送到 broker,小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当 producer 失效时,那些尚未发送的消息将会丢失。

4、消费者

consumer 端向 broker 发送 “fetch” 请求,并告知其获取消息的 offset;此后 consumer 将会获得一定条数的消息;consumer 端也可以重置 offset 来重新消费消息.

在 JMS 实现中,Topic 模型基于 push 方式,即 broker 将消息推送给 consumer 端.不过在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立连接之后,主动去pull(或者说fetch) 消息;这中模式有些优点,首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

其他 JMS 实现,消息消费的位置是有 prodiver 保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求 JMS broker 需要太多额外的工作.在kafka 中,partition 中的消息只有一个 consumer 在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见 kafka broker 端是相当轻量级的.当消息被 consumer 接收之后,consumer 可以在本地保存最后消息的 offset,并间歇性的向 zookeeper 注册 offset.由此可见,consumer 也很轻量级.

5、消息传送机制

对于 JMS 实现,消息传输担保非常直接:有且只有一次(exactly once).在 kafka 中稍有不同:

1
2
3
4
5
6
7
8
9
10
1) at most once: 最多一次,这个和 JMS 中"非持久化"消息类似.发送一次,无论成败,将不会重发.
2) at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功.
3) exactly once: 消息只会发送一次.

at most once: 消费者 fetch 消息,然后保存 offset,然后处理消息;当 client 保存 offset 之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被 fetch 到,这就是"at most once".

at least once: 消费者 fetch 消息,然后处理消息,然后保存 offset.如果消息处理成功之后,但是在保存 offset 阶段 zookeeper 异常导致保存操作未能执行成功,这就导致接下来再次 fetch 时可能获得上次已经处理过的消息,这就是 "at least once",原因 offset 没有及时的提交给 zookeeper,zookeeper 恢复正常还是之前 offset 状态.

exactly once: kafka 中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka 中是没有必要的.
通常情况下 "at-least-once" 是我们首选.(相比 at most once 而言,重复接收数据总比丢失数据要好).

6、复制备份

kafka 将每个 partition 数据复制到多个 server上,任何一个 partition 有一个 leader 和多个 follower(可以没有);备份的个数可以通过 broker 配置文件来设定. leader 处理所有的 read-write 请求, follower 需要和 leader 保持同步. Follower 和 consumer 一样,消费消息并保存在本地日志中; leader 负责跟踪所有的 follower 状态,如果 follower “落后”太多或者失效, leader 将会把它从 replicas 同步列表中删除.当所有的 follower 都将一条消息保存成功,此消息才被认为是 “committed”,那么此时 consumer 才能消费它.即使只有一个 replicas 实例存活,仍然可以保证消息的正常发送和接收,只要 zookeeper 集群存活即可.(不同于其他分布式存储,比如 hbase 需要”多数派”存活才行)

当 leader 失效时,需在 followers 中选取出新的 leader,可能此时 follower 落后于 leader,因此需要选择一个 “up-to-date” 的 follower.选择 follower 时需要兼顾一个问题,就是新 leader 上所已经承载的 partition leader 的个数,如果一个 server 上有过多的 partition leader,意味着此 server 将承受着更多的 IO 压力.在选举新 leader, 需要考虑到”负载均衡”.

7.日志

如果一个 topic 的名称为 “my_topic”,它有 2 个 partitions, 那么日志将会保存在 my_topic_0 和 my_topic_1 两个目录中;日志文件中保存了一序列 “log entries” (日志条目), 每个 log entry 格式为 “4个字节的数字N表示消息的长度” + “N个字节的消息内容”; 每个日志都有一个 offset 来唯一的标记一条消息, offset 的值为 8 个字节的数字,表示此消息在此 partition 中所处的起始位置. 每个 partition 在物理存储层面,有多个 log file 组成(称为segment). segmentfile 的命名为 “最小offset”.kafka. 例如 “00000000000.kafka”; 其中 “最小offset” 表示此 segment 中起始消息的 offset.

其中每个 partiton 中所持有的 segments 列表信息会存储在 zookeeper 中.

当 segment 文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件; 当 buffer 中消息的条数达到阀值时将会触发日志信息 flush 到日志文件中, 同时如果 “距离最近一次 flush 的时间差” 达到阀值时, 也会触发 flush 到日志文件. 如果 broker 失效, 极有可能会丢失那些尚未 flush 到文件的消息. 因为意外实现, 仍然会导致 log 文件格式的破坏(文件尾部), 那么就要求当 server 启动是需要检测最后一个 segment 的文件结构是否合法并进行必要的修复.

获取消息时,需要指定 offset 和最大 chunk 尺寸, offset 用来表示消息的起始位置, chunk size 用来表示最大获取消息的总长度(间接的表示消息的条数). 根据 offset, 可以找到此消息所在 segment 文件, 然后根据 segment 的最小 offset 取差值, 得到它在 file 中的相对位置, 直接读取输出即可.

日志文件的删除策略非常简单: 启动一个后台线程定期扫描 log file 列表, 把保存时间超过阀值的文件直接删除(根据文件的创建时间). 为了避免删除文件时仍然有 read 操作(consumer消费), 采取 copy-on-write 方式.

8、分配

kafka 使用 zookeeper 来存储一些 meta 信息, 并使用了 zookeeper watch 机制来发现 meta 信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

1) Broker node registry: 当一个 kafkabroker 启动后, 首先会向 zookeeper 注册自己的节点信息(临时 znode), 同时当 broker 和 zookeeper 断开连接时, 此 znode 也会被删除.

格式: /broker/ids/[0…N] –>host:port; 其中 [0..N] 表示 broker id, 每个 broker 的配置文件中都需要指定一个数字类型的 id(全局不可重复), znode 的值为此 broker 的 host:port 信息.

2) Broker Topic Registry: 当一个 broker 启动时,会向 zookeeper 注册自己持有的 topic 和 partitions 信息, 仍然是一个临时 znode.

格式: /broker/topics/[topic]/[0…N] 其中 [0..N] 表示 partition 索引号.

3) Consumer and Consumer group: 每个 consumer 被创建时, 会向 zookeeper 注册自己的信息; 此作用主要是为了”负载均衡”.

一个 group 中的多个consumer可以交错的消费一个 topic 的所有partitions; 简而言之, 保证此 topic 的所有 partitions 都能被此 group 所消费, 且消费时为了性能考虑, 让 partition 相对均衡的分散到每个 consumer 上.

4) Consumer id Registry: 每个 consumer 都有一个唯一的 ID(host:uuid, 可以通过配置文件指定, 也可以由系统生成), 此 id 用来标记消费者信息.

格式:/consumers/[group_id]/ids/[consumer_id] 仍然是一个临时的 znode, 此节点的值为 {“topic_name”:#streams…}, 即表示此 consumer 目前所消费的 topic + partitions 列表.

5) Consumer offset Tracking: 用来跟踪每个 consumer 目前所消费的 partition 中最大的 offset.

格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]–>offset_value 此 znode 为持久节点, 可以看出 offset 跟 group_id 有关, 以表明当 group 中一个消费者失效, 其他 consumer 可以继续消费.
6) Partition Owner registry: 用来标记 partition 被哪个 consumer 消费.临时 znode

格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]–>consumer_node_id当consumer启动时,所触发的操作:

A) 首先进行 "Consumer id Registry";
B) 然后在 "Consumer id Registry" 节点下注册一个 watch 用来监听当前 group 中其他 consumer 的 "leave" 和 "join"; 只要此 znode path 下节点列表变更, 都会触发此 group 下 consumer 的负载均衡. (比如一个 consumer 失效,那么其他 consumer 接管 partitions).
C) 在 "Broker id registry" 节点下, 注册一个 watch 用来监听 broker 的存活情况; 如果 broker 列表变更, 将会触发所有的 groups 下的 consumer 重新 balance.

1
2
3
4
5
1) Producer 端使用 zookeeper 用来"发现" broker 列表,以及和 Topic 下每个 partition leader 建立 socket 连接并发送消息.

2) Broker 端使用 zookeeper 用来注册 broker 信息, 已经监测 partitionleader 存活性.

3) Consumer 端使用 zookeeper 用来注册 consumer 信息,其中包括 consumer 消费的 partition 列表等, 同时也用来发现 broker 列表, 并和 partition leader 建立 socket 连接, 并获取消息.

主要配置

1、Broker 配置

2、Consumer 主要配置

3、Producer 主要配置

以上是关于 kafka 一些基础说明,在其中我们知道如果要 kafka 正常运行,必须配置zookeeper,否则无论是 kafka 集群还是的生存者和消费者都无法正常的工作的,以下是对zookeeper 进行一些简单的介绍:

zookeeper集群

zookeeper 是一个为分布式应用提供一致性服务的软件,它是开源的 Hadoop 项目的一个子项目,并根据 google 发表的一篇论文来实现的。zookeeper 为分布式系统提供了高效且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。zookeeper 接口简单,我们不必过多地纠结在分布式系统难于处理的同步和一致性问题上,你可以使用 zookeeper 提供的现成 (off-the-shelf) 服务来实现来实现分布式系统额配置管理,组管理,Leader 选举等功能。

zookeeper 集群的安装

准备三台服务器

1
server1:192.168.0.1, server2:192.168.0.2, server3:192.168.0.3.

1)下载zookeeper

http://zookeeper.apache.org/releases.html 去下载最新版本Zookeeper-3.4.5 的安装包 zookeeper-3.4.5.tar.gz. 将文件保存 server1 的~目录下

2)安装zookeeper

先在服务器分别执行a-c步骤

a)解压

1
tar -zxvf zookeeper-3.4.5.tar.gz

解压完成后在目录~下会发现多出一个目录 zookeeper-3.4.5, 重新命令为 zookeeper

b)配置

conf/zoo_sample.cfg 拷贝一份命名为 zoo.cfg,也放在 conf 目录下。然后按照如下值修改其中的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/wwb/zookeeper /data
dataLogDir=/home/wwb/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#http://zookeeper.apache.org/doc/ ... html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.0.1:3888:4888
server.2=192.168.0.2:3888:4888
server.3=192.168.0.3:3888:4888
  • tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

  • dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

  • initLimit:这个配置项是用来配置 Zookeeper 接受(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒

  • syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒

server.A = B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号

注意:dataDir, dataLogDir 中的 wwb 是当前登录用户名,data,logs 目录开始是不存在,需要使用 mkdir 命令创建相应的目录。并且在该目录下创建文件 myid, serve1, server2, server3 该文件内容分别为 1, 2, 3。
针对服务器 server2### 针对服务器 server2, server3 可以将 server1 复制到相应的目录,不过需要注意dataDir, dataLogDir 目录, 并且文件 myid 内容分别为 2, 3。

3)依次启动server1,server2, server3 的 zookeeper.

1
2
3
4
/home/wwb/zookeeper/bin/zkServer.sh start,出现类似以下内容
JMX enabled by default
Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

4)测试 zookeeper 是否正常工作

在 server1 上执行以下命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1、/home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181   

//出现类似以下内容

JLine support is enabled

2013-11-27 19:59:40,560 - INFO [main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session establishmentcomplete on localhost.localdomain/127.0.0.1:2181, sessionid = 0x1429cdb49220000, negotiatedtimeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]
// 即代表集群构建成功了,如果出现错误那应该是第三部时没有启动好集群

//先利用
2、ps aux | grep zookeeper

// 查看是否有相应的进程的,没有话,说明集群启动出现问题,可以在每个服务器上使用

3、./home/wwb/zookeeper/bin/zkServer.sh stop

//再依次使用

4、./home/wwb/zookeeper/binzkServer.sh start

//这时在执行 4 一般是没有问题,如果还是有问题,那么先 stop 再到 bin 的上级目录执行

5、./bin/zkServer.shstart

试试。

注意:zookeeper 集群时,zookeeper 要求半数以上的机器可用,zookeeper 才能提供服务。

kafka集群

(利用上面server1, server2, server3, 下面以 server1 为实例)

1)下载 kafka0.8(http://kafka.apache.org/downloads.html), 保存到服务器 /home/wwb 目录下 kafka-0.8.0-beta1-src.tgz (kafka_2.8.0-0.8.0-beta1.tgz)

2)解压

1
tar -zxvf kafka-0.8.0-beta1-src.tgz

产生文件夹 kafka-0.8.0-beta1-src 更改为 kafka01

3)配置
修改 kafka01/config/.properties, 其中 broker.id, log.dirs, zookeeper.connect 必须根据实际情况进行修改,其他项根据需要自行斟酌。
大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
broker.id=1  
port=9091
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
num.replica.fetchers=2
log.cleanup.interval.mins=10
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false

4)初始化因为 kafka 用 scala 语言编写,因此运行 kafka 需要首先准备 scala 相关环境。

1
2
3
4
> cd kafka01  
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency

在第二个命令时可能需要一定时间,由于要下载更新一些依赖包。所以请大家 耐心点。

5) 启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kafka01
JMX_PORT=9997 bin/kafka--start.sh config/server.properties &

//a) kafka02 操作步骤与 kafka01 雷同,不同的地方如下
修改 kafka02/config/server.properties
broker.id=2
port=9092
##其他配置和kafka-0保持一致
##启动kafka02
JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &

// kafka03 操作步骤与 kafka01 雷同,不同的地方如下
修改 kafka03/config/server.properties
broker.id=3
port=9093
##其他配置和kafka-0保持一致
##启动 kafka03
JMX_PORT=9999 bin/kafka--start.shconfig/serverconfig/server.properties &

6)创建 topic(包含一个分区,三个副本)

1
bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)查看 topic 情况

1
2
>bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181
topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0 isr: 1,2,0

8)创建发送者

1
2
3
4
bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic
my test message1
my test message2
ctrl c

9)创建消费者

1
2
3
4
5
bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic
...
my test message1
my test message2
ctrl c

10) 杀掉 server1 上的 broker

pkill -9 -f config/.properties

11)查看 topic

bin/kafka-list-top.sh --zookeeper192.168.0.1:2181
topic: my-replicated-topic partition: 0 leader: 1 replicas: 1,2,0

发现 topic 还正常的存在

12)创建消费者,看是否能查询到消息

1
2
3
4
5
6
bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
ctrl c
##说明一切都是正常的。
补充说明:
1、public Map<String, List<KafkaStream<byte[], byte[]>>>     
createMessageStreams(Map<String, Integer> topicCountMap),其中该方法的参数Map 的 key 为 topic 名称,value 为 topic 对应的分区数,
譬如说如果在 kafka 中不存在相应的 topic 时,则会创建一个 topic,分区数为 value,如果存在的话,该处的 value 则不起什么作用

2、关于生产者向指定的分区发送数据,通过设置 partitioner.class 的属性来指定向那个分区发送数据,
如果自己指定必须编写相应的程序,默认是 kafka.producer.DefaultPartitioner,分区程序是基于散列的键。

3、在多个消费者读取同一个 topic 的数据,为了保证每个消费者读取数据的唯一性,
必须将这些消费者 group_id 定义为同一个值,这样就构建了一个类似队列的数据结构,如果定义不同,则类似一种广播结构的。

4、在 consumerapi 中,参数到数字部分,类似Map<String,Integer>, numStream, 指的都是在 topic 不存在的时,会创建一个topic,并且分区个数为Integer, numStream, 
注意如果数字大于 broker 的配置中 num.partitions 属性,会以 num.partitions 为依据创建分区个数的。

5、producerapi,调用 send 时,如果不存在 topic,也会创建 topic,在该方法中没有提供分区个数的参数,在这里分区个数是由服务端 broker 的配置中 num.partitions 属性决定的。

参考

http://kafka.apache.org/
https://www.jianshu.com/p/4bf007885116
https://www.cnblogs.com/likehua/p/3999538.html

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2020 KNOWLEDGE IS POWER All Rights Reserved.

访客数 : | 访问量 :