1 定义
一种分布式流式处理平台。
主要功能:
- 发布订阅记录流,类似消息队列或企业消息系统。
- 容错且持久地保存记录流。
- 即时处理记录流。
主要应用:
- 构建系统或应用间可靠的即时流式数据管道。
- 构建处理流式数据的即时应用。
2 快速入门
注意:以下文档适用于2.5.0与2.0.x
(1) 下载并解压
(2) 启动服务器
Kafka依赖ZooKeeper。可以使用打包的快捷方式启动单节点实例。
1 | bin/zookeeper-server-start.sh config/zookeeper.properties |
启动Kafka
1 | bin/kafka-server-start.sh config/server.properties |
(3) 创建主题
创建一个单副本的名为“test”的主题
1 | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test |
查看主题列表
1 | bin/kafka-topics.sh --list --bootstrap-server localhost:9092 |
此外,可以配置Broker在发布不存在的主题时自动创建。
(4) 启动生产者并发送消息
可在命令行中输入文件或默认行分隔的消息。
启动消费者并发送消息
1 | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test |
(5) 启动消费者
命令行消费者将输出消息到标准输出中。
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
注意:不带参数的启动命令将展示帮助信息
(6) 构建多中介集群
1) 为每个中介复制一份配置
1 | cp config/server.properties config/server-1.properties |
2) 编辑配置如下:
1 | config/server-1.properties: |
id是集群中各节点唯一且永久的名称。仅当在同一节点运行多中介时,修改接口和日志目录。
3) 启动ZooKeepper和Kafka
1 | > bin/kafka-server-start.sh config/server-1.properties & |
4) 创建一个3副本因子的主题
1 | bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic |
查看各中介状态:
1 | bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic |
- 首行给出所有分区概览。一行表示一个分区。
- leader是负责特定分区所有读写的节点。每个节点会随机负责一个分区。
- replicas是具有当前分区的节点列表,不管是否是leader或存活
- isr是当前能被leader捕获的活跃节点列表
5) 发布消息
1 | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic |
6) 消费消息
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic |
7) leader容错
杀死进程
1 | ps aux | grep server-1.properties |
查看细节
1 | bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic |
leader被替换为2,并且isr中移除1。消息能够正常消费。
(7) 使用Kafka Connect导入导出数据
Kafka Connect是一个扩展工具,使用自定义逻辑的connectors与外部系统交互。
以下展示从外部文件导入和导出到外部文件。
1) 生成数据
1 | > echo -e "foo\nbar" > test.txt |
2) 启动connectors
以单机模式启动两个connector。
提供3个配置文件作为参数。
文件一配置Kafka Connect进程。包含中介、序列化格式等通用配置。
文件二、三分别配置一个connector。包含唯一名称、初始化类和其他配置。
1 | > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties |
使用默认配置启动。一旦Kafka Connect启动,其中一个connector从test.txt读入消息并生产到主题connect-test,另一个从connect-test主题消费并写出到文件test.sink.txt。
数据存储在主题中,可以通过主题查看数据:
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning |
注意:此时向输入文件追加数据,将持续读入输出到主题和输出文件中。
(8) 使用Kafka Stream处理数据
Kafka Stream是用于构建关键任务实时应用和微服务的客户端库。
结合客户端编写和部署标准Java/Scala应用的简单性,以及服务器端Kafka技术优势,使应用更加可扩展、可变、容错和分布式。
详见入门示例
注意:Kafka2.0.x对应的最高ZooKeeper版本是3.4.9.