1 简介
Flume是分布式、可靠和可用的,海量日志数据高效收集、聚合和移动系统,用于从多源系统到中心数据存储。
目前有两个代码发布基线:0.9.x和1.x。0.9.x详见Flume 0.9.x Developer Guide
2 概览
(1) 数据流模型
Agent: Jvm进程。其中的组件允许外部事件流流入,并留出到外部目的地。
Event: 数据流单位。携带有字节数组的负载和字符串属性的头部,从Source流向Channel,再流向Sink。
Source:从外部来源消费特定格式的Event,如AvroSource。保存接收到的事件到一个或多个Channel中。
Channel:被动保存Event,直到有Sink消费。如FileChannel使用本地文件系统作为底部存储。
Sink:从Channel中删除Event, 并推送到外部仓库或另一个Source,如HDFSEventSink。
同一Agent中的Source和Sink异步运行。
(2) 可靠性
只有在Event被下一Channel或外部保存后,Sink才会删除当前Channel中的Event。
Flume使用事务保证Event的可靠传递。Channel提供给Source和Sink对于Event的事务处理。在multi-hop flow中,上一Sink和下一Source同时使用事务保证Event被存储到下一Channel中。
3 源码编译
Google Protocol Buffer是一个语言中立、平台中立的用于序列化结构数据的扩展机制。
4 组件开发
(1) 客户端
客户端是Event的来源,并传递给Agent。
当前Flume支持Avro, log4j, syslog,Http POST (with a JSON body)和本地进程输出(ExecSource)格式的外部Source。
其他格式的输入可以通过自定义客户端实现:
格式转换
转换数据格式,在通过已有的Source导入
自定义Source
使用IPC或RPC协议间客户端数据转换为Event。存储在Channel的数据必须是Event。
1) 客户端 SDK
Flume Client SDK允许使用RPC连接Flume并发送数据。
2)RPC客户端接口
Event构造:
- 直接实现Event接口
- 使用简单实现类,如SimpleEvent.class
- 使用EventBuilder.withbody()
Event传递:简单调用Flume Client SDK的append(Event)
或appendBatch(List<Event>)
发送数据,不必关心传递细节。
3) RPC客户端(Avro和Thrift)
版本1.4.0中,Avro是默认RPC协议。
NettyAvroRpcClient和ThriftRpcClient都实现了RpcClient接口。客户端通过传递Agent的IP和port创建对象,并发布Event。
示例:使用Flume Client SDK API更新数据
1 | import org.apache.flume.Event; |
Agent配置:
1 | a1.channels = c1 |
为了灵活性,客户端实现可以采用以下配置:
1 | client.type = default (for avro) or thrift (for thrift) |
4) 安全RPC客户端(Thrift)
版本1.6.0,Thrift的Source和Sink提供基于keberos的身份认证。
要求flume-ng-auth模块在类路径中,kerberos 在flume-ng-auth中
SecureThriftRpcClient - > ThriftRpcClient -> RpcClient
客户端通过SecureRpcClientFactory.getThriftInstance()获取SecureThriftRpcClient 。用于客户端认证的client-principal和client-keytab以属性方式传入。提供server-principal,用于目的ThriftSource认证。
示例:使用SecureRpcClientFactory
1 | import org.apache.flume.Event; |
远程ThriftSource配置中需要开启kerberos模式:
1 | a1.channels = c1 |
5) 故障切换客户端
暂不支持Thrift。
包装默认的Avro RPC Client处理故障切换,提供空格分隔的<host>:<port>
列表作为切换组。当前选中的主机故障后,将自动切换到下一主机。
1 | // Setup properties for the failover |
FailoverRpcClient可使用以下属性配置:
1 | client.type = default_failover |
6) 负载均衡RPC客户端
暂不支持Thirift
通过空格分隔的<host>:<port>
指定负载均衡组。
负载均衡策略:
随机选取
循环
自定义
实现LoadBalancingRpcClient$HostSelector接口,需要指定host-selector
启用backoff,将在一定时间内拉黑故障主机。倒计时结束后,依旧无法响应的,倒计时成指数增长。(为了防止阻塞)
默认maxBackoff为30000ms,由所有负责均衡策略的父类OrderSelector缺省实现。设置上限为65536000ms.(约18.2h)
1 | // Setup properties for the load balancing |
LoadBalancingRpcClient可以使用以下属性:
1 | client.type = default_loadbalance |
(2) 内嵌Agent
Flume提供一种内嵌于应用的轻量Agent。仅支持部分组件,如Source、Channel(只有File和Memory)、Sink(仅Avro)和Interceptor。
注意:内嵌Agent依赖于hadoop-core.jar
必需的属性粗体标注:
使用示例:
1 | Map<String, String> properties = new HashMap<String, String>(); |
(3) 事务接口
事务接口是Flume可靠性的基础。所有的主要组件必须使用事务。
事务在Channel内部实现。与Channel连接的每个Source和Sink必须获取事务对象。Source使用ChannelProcessor管理,Sink显式通过Channel管理。保存Event到Channel或从Channel中获取都是在事务内部完成。示例如下:
1 | Channel ch = new MemoryChannel(); |
(4) Sink
用于从Channel中获取Event,并传递给下一个Agent或外部仓库。
一个Sink只与一个Channel相关。
SinkRunnner示例与每一个配置好的Sink相关。当Flume框架调用SinkRunner.start()时,创建一个线程来驱动Sink(使用SinkRunner.PollingRunner()作为线程的Runnable)。该线程管理Sink的生命周期。Sink需要实现LifeCycleAware接口的start()和stop()接口。Sink.start()负责初始化,Sink.process()负责Event提取和推送的主要流程,Sink.stop()负责清理。Sink也需要实现Configurable接口以处理自身的配置设置。示例如下:
1 | public class MySink extends AbstractSink implements Configurable { |
(5) Source
Source用于从外部接收数据,并保存到响应的Channel中。Source通过其ChannelProcessor串行处理Channel本地事务提交的Event。对于异常,Channel向外传递,所有的Channel回滚事务,除了已经处理完的。
与自定义Sink类似,当Flume框架调用PollableSourceRunner.start()时,创建一个线程来驱动。该线程管理Source的生命周期。PollableSource需要实现LifeCycleAware接口的start()和stop()接口。Sink.start()负责初始化,PollableSource.process()负责检查数据和存储到Channel的Event中。
注意:存在另一种Source类型——EventDrivenSource。与PollableSource不同的是,其必须具有获取数据和存储数据到Channel的Event中的回调机制。并且不是由新建的自身线程驱动。
以下为PollableSource示例:
1 | public class MySource extends AbstractSource implements Configurable, PollableSource { |
(6) Channel
暂无