Kafka

1 定义

一种分布式流式处理平台。

主要功能:

  • 发布订阅记录流,类似消息队列或企业消息系统。
  • 容错且持久地保存记录流。
  • 即时处理记录流。

主要应用:

  • 构建系统或应用间可靠的即时流式数据管道。
  • 构建处理流式数据的即时应用。

2 快速入门

注意:以下文档适用于2.5.0与2.0.x

(1) 下载并解压

(2) 启动服务器

Kafka依赖ZooKeeper。可以使用打包的快捷方式启动单节点实例。

1
2
3
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

启动Kafka

1
2
3
4
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

(3) 创建主题

创建一个单副本的名为“test”的主题

1
2
3
4
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

# 2.0.x
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看主题列表

1
2
3
4
5
6
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

# 2.0.x
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

此外,可以配置Broker在发布不存在的主题时自动创建。

(4) 启动生产者并发送消息

可在命令行中输入文件或默认行分隔的消息。

启动消费者并发送消息

1
2
3
4
5
6
7
8
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
This is a message
This is another message

# 2.0.x
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

(5) 启动消费者

命令行消费者将输出消息到标准输出中。

1
2
3
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

注意:不带参数的启动命令将展示帮助信息

(6) 构建多中介集群

1) 为每个中介复制一份配置

1
2
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

2) 编辑配置如下:

1
2
3
4
5
6
7
8
9
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

id是集群中各节点唯一且永久的名称。仅当在同一节点运行多中介时,修改接口和日志目录。

3) 启动ZooKeepper和Kafka

1
2
3
4
5
6
7
8
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

# 停止服务
bin/kafka-server-stop.sh config/server-1.properties &
...

4) 创建一个3副本因子的主题

1
2
3
4
5
6
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic

# 2.0.x
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
# 删除主题
bin/kafka-topics.sh --delete --zookeeper slave1:2181 --topic my-replicated-topic

查看各中介状态:

1
2
3
4
5
6
7
8
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

# 2.0.x
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
  • 首行给出所有分区概览。一行表示一个分区。
  • leader是负责特定分区所有读写的节点。每个节点会随机负责一个分区。
  • replicas是具有当前分区的节点列表,不管是否是leader或存活
  • isr是当前能被leader捕获的活跃节点列表

5) 发布消息

1
2
3
4
5
6
7
8
9
10
11
12
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

# 2.0.x
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

6) 消费消息

1
2
3
4
5
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

7) leader容错

杀死进程

1
2
3
> ps aux | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

查看细节

1
2
3
4
5
6
7
8
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

# 2.0.x
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0

leader被替换为2,并且isr中移除1。消息能够正常消费。

(7) 使用Kafka Connect导入导出数据

Kafka Connect是一个扩展工具,使用自定义逻辑的connectors与外部系统交互。

以下展示从外部文件导入和导出到外部文件。

1) 生成数据

1
> echo -e "foo\nbar" > test.txt

2) 启动connectors

以单机模式启动两个connector。

提供3个配置文件作为参数。

  • 文件一配置Kafka Connect进程。包含中介、序列化格式等通用配置。

  • 文件二、三分别配置一个connector。包含唯一名称、初始化类和其他配置。

1
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

使用默认配置启动。一旦Kafka Connect启动,其中一个connector从test.txt读入消息并生产到主题connect-test,另一个从connect-test主题消费并写出到文件test.sink.txt。

数据存储在主题中,可以通过主题查看数据:

1
2
3
4
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

注意:此时向输入文件追加数据,将持续读入输出到主题和输出文件中。

(8) 使用Kafka Stream处理数据

Kafka Stream是用于构建关键任务实时应用和微服务的客户端库。

结合客户端编写和部署标准Java/Scala应用的简单性,以及服务器端Kafka技术优势,使应用更加可扩展、可变、容错和分布式。

详见入门示例

注意:Kafka2.0.x对应的最高ZooKeeper版本是3.4.9.

参考资料

Introduction

Quickstart

Kafka 2.0 Documentation

Spark Streaming + Kafka Integration Guide