Flume Developer Guide

1 Introduction

Flume is a distributed, reliable, and available system for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store.

There are currently two release code lines: 0.9.x and 1.x. For 0.9.x, see the Flume 0.9.x Developer Guide.

2 Overview

(1) Data flow model

[Figure: Flume data flow model (external source -> Source -> Channel -> Sink -> destination)]

Agent: a JVM process. It hosts the components through which Events flow in from an external source and out to the next destination.

Event: the unit of data flow. It carries a byte-array payload and a set of string headers, and flows from a Source to a Channel and on to a Sink.

Source: consumes Events in a specific format from an external source (e.g. AvroSource) and stores the received Events into one or more Channels.

Channel: passively holds Events until a Sink consumes them. A FileChannel, for example, uses the local filesystem as its backing store.

Sink: removes Events from a Channel and pushes them to an external repository or to the Source of the next Agent (e.g. HDFSEventSink).

The Source and Sink within the same Agent run asynchronously.

(2) Reliability

A Sink removes an Event from the current Channel only after the Event has been stored in the Channel of the next Agent or in the terminal repository.

Flume uses transactions to guarantee reliable Event delivery. Channels provide transactional access to Events for Sources and Sinks. In a multi-hop flow, the Sink of the previous hop and the Source of the next hop both run transactions to ensure the Event is safely stored in the next Channel.

3 Building from Source

Google Protocol Buffers is a language-neutral, platform-neutral, extensible mechanism for serializing structured data; the Flume build requires the Protocol Buffers compiler (protoc) to be on the path.

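The build itself is Maven-based. A typical sequence, assuming Maven and protoc are already installed (commands follow the upstream developer guide):

# compile and run the unit tests
mvn clean test

# build the distribution tarball, skipping the tests
mvn clean install -DskipTests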

4 Component Development

(1) Client

A client is the origin of Events and delivers them to a Flume Agent.

Flume currently supports external sources in the Avro, log4j, syslog, and HTTP POST (with a JSON body) formats, plus the output of a local process (ExecSource).

Input in other formats can be handled with a custom client:

  • Format conversion

    Convert the data into a format that an existing Flume Source understands, then import it through that Source.

  • Custom Source

    Write a custom Source that uses an IPC or RPC protocol to translate the client's data into Flume Events. Anything stored in a Channel must be an Event.

1) Client SDK

The Flume Client SDK lets an application connect to Flume over RPC and send data into Flume's data flow.

2) RPC client interface

Constructing an Event:

  • Implement the Event interface directly
  • Use a convenience implementation class such as SimpleEvent
  • Use EventBuilder.withBody()

Delivering an Event: simply call append(Event) or appendBatch(List<Event>) on the Flume Client SDK; the delivery details are handled for you.
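A minimal sketch of these construction routes (the header key and values are illustrative only, not required by Flume):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.event.SimpleEvent;

// Route 1: EventBuilder with a String body and charset
Event e1 = EventBuilder.withBody("Hello Flume!", StandardCharsets.UTF_8);

// Route 2: EventBuilder with a byte[] body plus headers
Map<String, String> headers = new HashMap<String, String>();
headers.put("origin", "my-app"); // illustrative header
Event e2 = EventBuilder.withBody("Hello again".getBytes(StandardCharsets.UTF_8), headers);

// Route 3: the SimpleEvent convenience class
SimpleEvent e3 = new SimpleEvent();
e3.setBody("Hello once more".getBytes(StandardCharsets.UTF_8));
e3.setHeaders(headers);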

3) RPC clients (Avro and Thrift)

As of version 1.4.0, Avro is the default RPC protocol.

NettyAvroRpcClient and ThriftRpcClient both implement the RpcClient interface. A client creates an instance with the host and port of the target Agent and can then publish Events to it.

Example: sending data with the Flume Client SDK API

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

Agent configuration:

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = avro
# For using a thrift source set the following instead of the above line.
# a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

For flexibility, the client implementation can be configured with the following properties:

client.type = default (for avro) or thrift (for thrift)

hosts = h1 # default client accepts only 1 host
# (additional hosts will be ignored)

hosts.h1 = host1.example.org:41414 # host and port must both be specified
# (neither has a default)

batch-size = 100 # Must be >=1 (default: 100)

connect-timeout = 20000 # Must be >=1000 (default: 20000)

request-timeout = 20000 # Must be >=1000 (default: 20000)
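These properties can also be assembled programmatically and passed to RpcClientFactory.getInstance(Properties); a minimal sketch for the default (Avro) client, with an illustrative host alias and address:

import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

// Build the client configuration programmatically
Properties props = new Properties();
props.put("client.type", "default"); // Avro; use "thrift" for Thrift
props.put("hosts", "h1");
props.put("hosts.h1", "host1.example.org:41414");
props.put("batch-size", "100");

// Create the client from the properties
RpcClient client = RpcClientFactory.getInstance(props);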

4) Secure RPC client (Thrift)

As of version 1.6.0, the Thrift Source and Sink support Kerberos-based authentication.

This requires the flume-ng-auth module, which contains the Kerberos support, to be on the classpath.

Class hierarchy: SecureThriftRpcClient -> ThriftRpcClient -> RpcClient

A client obtains a SecureThriftRpcClient through SecureRpcClientFactory.getThriftInstance(). The client-principal and client-keytab used to authenticate the client are passed in as properties, together with the server-principal of the destination ThriftSource the client expects to connect to.

Example: using SecureRpcClientFactory

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.api.SecureRpcClientFactory;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClient;
import java.nio.charset.Charset;
import java.util.Properties;

public class MyApp {
  public static void main(String[] args) {
    MySecureRpcClientFacade client = new MySecureRpcClientFacade();
    // Initialize client with the remote Flume agent's host, port
    Properties props = new Properties();
    props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift");
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "client.example.org"+":"+ String.valueOf(41414));

    // Initialize client with the kerberos authentication related properties
    props.setProperty("kerberos", "true");
    props.setProperty("client-principal", "flumeclient/client.example.org@EXAMPLE.ORG");
    props.setProperty("client-keytab", "/tmp/flumeclient.keytab");
    props.setProperty("server-principal", "flume/server.example.org@EXAMPLE.ORG");
    client.init(props);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with a ThriftSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MySecureRpcClientFacade {
  private RpcClient client;
  private Properties properties;

  public void init(Properties properties) {
    // Setup the RPC connection
    this.properties = properties;
    // Create the ThriftSecureRpcClient instance by using SecureRpcClientFactory
    this.client = SecureRpcClientFactory.getThriftInstance(properties);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = SecureRpcClientFactory.getThriftInstance(properties);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

The remote ThriftSource must be configured with Kerberos mode enabled:

a1.channels = c1
a1.sources = r1
a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414
a1.sources.r1.kerberos = true
a1.sources.r1.agent-principal = flume/server.example.org@EXAMPLE.ORG
a1.sources.r1.agent-keytab = /tmp/flume.keytab


a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

5) Failover client

Not currently supported by Thrift.

The failover client wraps the default Avro RPC client to handle failover: it is configured with a space-separated list of <host>:<port> pairs that form the failover group. When the currently selected host fails, the client automatically fails over to the next host in the list.

// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);

The FailoverRpcClient can be configured with the following properties:

client.type = default_failover

hosts = h1 h2 h3 # at least one is required, but 2 or
# more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3 # Must be >=0 (default: number of hosts
# specified, 3 in this case). A '0'
# value doesn't make much sense because
# it will just cause an append call to
# immediately fail. A '1' value means
# that the failover client will try only
# once to send the Event, and if it
# fails then there will be no failover
# to a second client, so this value
# causes the failover client to
# degenerate into just a default client.
# It makes sense to set this value to at
# least the number of hosts that you
# specified.

batch-size = 100 # Must be >=1 (default: 100)

connect-timeout = 20000 # Must be >=1000 (default: 20000)

request-timeout = 20000 # Must be >=1000 (default: 20000)

6) Load-balancing RPC client

Not currently supported by Thrift.

The load-balancing group is specified as a space-separated list of <host>:<port> pairs.

Load-balancing strategies:

  • Random selection

  • Round-robin

  • Custom

    Implement the LoadBalancingRpcClient$HostSelector interface and pass its fully qualified class name as the host-selector property (see the sketch after this list).
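As an illustration, a minimal custom selector might look like the sketch below. It assumes the nested HostSelector interface exposes setHosts(), createHostIterator(), and informFailure(), as in the Flume SDK sources; verify the signatures against your Flume version.

import java.util.Iterator;
import java.util.List;
import org.apache.flume.api.HostInfo;
import org.apache.flume.api.LoadBalancingRpcClient;

// Trivial selector that always tries the hosts in their configured order.
public class OrderedHostSelector implements LoadBalancingRpcClient.HostSelector {
  private List<HostInfo> hosts;

  @Override
  public void setHosts(List<HostInfo> hosts) {
    this.hosts = hosts;
  }

  @Override
  public Iterator<HostInfo> createHostIterator() {
    // Called for each send; returns the hosts to try, in preference order
    return hosts.iterator();
  }

  @Override
  public void informFailure(HostInfo failedHost) {
    // Hook for failure tracking; a smarter selector would de-prioritize
    // or temporarily skip this host
  }
}

It would then be selected with props.put("host-selector", "com.example.OrderedHostSelector") (package name hypothetical).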

With backoff enabled, a failed host is temporarily blacklisted for a given timeout. If the host still fails to respond once the timeout expires, the timeout grows exponentially; this keeps the client from blocking on a dead host.

maxBackoff defaults to 30000 ms (30 seconds) and is implemented in OrderSelector, the superclass of all load-balancing strategies. The backoff is capped at 65536000 ms (about 18.2 hours).

// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
// // selection
props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively
// becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);

The LoadBalancingRpcClient can be configured with the following properties:

client.type = default_loadbalance

hosts = h1 h2 h3 # At least 2 hosts are required

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

backoff = false # Specifies whether the client should
# back-off from (i.e. temporarily
# blacklist) a failed host
# (default: false).

maxBackoff = 0 # Max timeout in millis that a host will
# remain inactive due to a previous
# failure with that host (default: 0,
# which effectively becomes 30000)

host-selector = round_robin # The host selection strategy used
# when load-balancing among hosts
# (default: round_robin).
# Other values include "random"
# or the FQCN of a custom class
# that implements
# LoadBalancingRpcClient$HostSelector

batch-size = 100 # Must be >=1 (default: 100)

connect-timeout = 20000 # Must be >=1000 (default: 20000)

request-timeout = 20000 # Must be >=1000 (default: 20000)

(2) Embedded Agent

Flume provides an embedded agent: a lightweight agent that runs inside an application. Only a subset of components is supported: Sources, Channels (Memory and File only), Sinks (Avro only), and Interceptors.

Note: the embedded agent depends on hadoop-core.jar.

Required properties are shown in bold in the configuration table:

[Table: embedded agent configuration properties; required properties in bold]

Usage example:

Map<String, String> properties = new HashMap<String, String>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "200");
properties.put("sinks", "sink1 sink2");
properties.put("sink1.type", "avro");
properties.put("sink2.type", "avro");
properties.put("sink1.hostname", "collector1.apache.org");
properties.put("sink1.port", "5564");
properties.put("sink2.hostname", "collector2.apache.org");
properties.put("sink2.port", "5565");
properties.put("processor.type", "load_balance");
properties.put("source.interceptors", "i1");
properties.put("source.interceptors.i1.type", "static");
properties.put("source.interceptors.i1.key", "key1");
properties.put("source.interceptors.i1.value", "value1");

EmbeddedAgent agent = new EmbeddedAgent("myagent");

agent.configure(properties);
agent.start();

List<Event> events = Lists.newArrayList();

events.add(event);
events.add(event);
events.add(event);
events.add(event);

agent.putAll(events);

...

agent.stop();

(3) Transaction interface

[Figure: the Transaction interface as used by Sources, Channels, and Sinks]

The Transaction interface is the basis of Flume's reliability, and all major components must use it.

Transactions are implemented inside each Channel. Every Source and Sink connected to a Channel must obtain a transaction object: a Source does this through its ChannelProcessor, while a Sink manages the transaction explicitly through the Channel. Both storing an Event in a Channel and taking one out happen inside an active transaction. Example:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  // This try clause includes whatever Channel operations you want to do

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
      Charset.forName("UTF-8"));
  ch.put(eventToStage);
  // Event takenEvent = ch.take();
  // ...
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  // Log exception, handle individual exceptions as needed

  // re-throw all Errors
  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

(4) Sink

A Sink extracts Events from a Channel and forwards them to the next Agent in the flow or to an external repository.

A Sink is connected to exactly one Channel.

A SinkRunner instance is associated with every configured Sink. When the Flume framework calls SinkRunner.start(), a thread is created to drive the Sink (using SinkRunner.PollingRunner as the thread's Runnable); the thread manages the Sink's lifecycle. The Sink must implement the start() and stop() methods of the LifecycleAware interface: Sink.start() performs initialization, Sink.process() does the core work of extracting Events from the Channel and forwarding them, and Sink.stop() performs cleanup. The Sink also needs to implement the Configurable interface to handle its own configuration. Example:

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();

      // Send the Event to the external repository.
      // storeSomeData(event);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    } finally {
      // Always close the transaction, as in the transaction example above
      txn.close();
    }
    return status;
  }
}

(5) Source

A Source receives data from an external client and stores it into the corresponding Channels. A Source uses its ChannelProcessor to process Events serially, committing them in a Channel-local transaction. If an exception is thrown, the Channels propagate it outward and all Channels roll back their transactions, except those that have already completed.

As with a custom Sink, when the Flume framework calls PollableSourceRunner.start(), a thread is created to drive the Source; the thread manages the Source's lifecycle. A PollableSource must implement the start() and stop() methods of the LifecycleAware interface: start() performs initialization, and PollableSource.process() checks for new data and stores it into the Channel as Events.

Note: there is a second type of Source, the EventDrivenSource. Unlike a PollableSource, it must have its own callback mechanism that captures new data and stores it into the Channel; it is not driven by a runner thread of its own.
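To make the contrast concrete, here is a minimal, hypothetical EventDrivenSource sketch: start() spins up its own listener thread (standing in for a real callback registration with an external client library), and each received datum is pushed through the ChannelProcessor. waitForSomeData() is a placeholder, not a Flume API.

import java.nio.charset.Charset;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MyEventDrivenSource extends AbstractSource
    implements Configurable, EventDrivenSource {
  private Thread listener;
  private volatile boolean running;

  @Override
  public void configure(Context context) {
    // Read any properties this source needs
  }

  @Override
  public synchronized void start() {
    running = true;
    // A real source would register a callback with an external client
    // library here; this listener thread stands in for that mechanism.
    listener = new Thread(new Runnable() {
      @Override
      public void run() {
        while (running) {
          String data = waitForSomeData(); // hypothetical blocking helper
          Event e = EventBuilder.withBody(data, Charset.forName("UTF-8"));
          getChannelProcessor().processEvent(e);
        }
      }
    });
    listener.start();
    super.start();
  }

  @Override
  public synchronized void stop() {
    running = false;
    listener.interrupt();
    super.stop();
  }

  private String waitForSomeData() {
    // Placeholder for blocking on data from the external system
    return "Hello Flume!";
  }
}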

Below is a PollableSource example:

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    try {
      // This try clause includes whatever Channel/Event operations you want to do

      // Receive new data (getSomeData() is a placeholder for however this
      // source obtains data from the external system)
      Event e = getSomeData();

      // Store the Event into this Source's associated Channel(s). The
      // ChannelProcessor manages the Channel transaction internally, so no
      // explicit transaction handling is needed here.
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error)t;
      }
    }
    return status;
  }
}

(6) Channel

Not yet covered.

References

Flume 1.9.0 Developer Guide