Spark + Flume

注意:spark 2.3.0将flume支持标记为弃用

Spark Streaming有推和拉两种方式从Flume获取数据

1 推

Flume可在agents间推送数据。Spark Streaming建立一个Avro agent用于接收数据。

(1) 要求

  • Flume+Spark Streaming运行中,并且是一个工作节点。
  • Flume可以推送数据到该主机端口。

(2) Flume配置

配置一个Avro Sink,详见Flume’s documentation

1
2
3
4
5
agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

(3) 应用配置

1) 链接

在项目定义中引入依赖项,详见Linking section

1
2
3
groupId = org.apache.spark
artifactId = spark-streaming-flume_2.11
version = 2.3.0

2) 编程

引入FlumeUtils并创建输入DStream,详见API docs

1
2
3
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])

注意:主机名应该与集群资源管理器使用的主机名一致,以便资源分配能匹配名称并在正确的机器上接收。

3) 部署

对于Scala/Java应用,将spark-streaming-flume_2.11及其依赖打包到JAR中。由于安装目录已有,确保spark-core_2.11spark-streaming_2.11标记为provided。详见Deploying section

对于Python应用,直接在提交命令中将spark-streaming-flume_2.11及其依赖通过--packages添加。详见Application Submission Guide

1
./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.3.0 ...

此外,可以下载spark-streaming-flume-assembly,并通过--jars添加。

2 拉

相比Flume直接推送数据到Spark Streaming,拉取方式通过以下特性运行自定义sink。

  • Flume推送数据,然后数据被缓存。
  • Spark Streaming使用可靠的Flume接收器和事务从sink中拉取数据。仅当数据被接收并备份后事务才成功。

拉取方式提供了一种更为可靠和[容错保障][fault-tolerance guarantees ],但同时要求Flume运行一个自定义的sink。

(1) 要求

  • 选择一台运行自定义sink的机器,其他Flume管道将数据发送至该agent。
  • 集群能够访问这台机器。

(2) Flume配置

1) sink JARS

添加一下JAR到Flume路径,详见Flume’s documentation

1
2
3
4
5
6
7
8
9
10
11
groupId = org.apache.spark
artifactId = spark-streaming-flume-sink_2.11
version = 2.3.0

groupId = org.scala-lang
artifactId = scala-library
version = 2.11.8

groupId = org.apache.commons
artifactId = commons-lang3
version = 3.5

2) 配置文件

1
2
3
4
5
agent.sinks = spark
agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark.hostname = <hostname of the local machine>
agent.sinks.spark.port = <port to listen on for connection from Spark>
agent.sinks.spark.channel = memoryChannel

同时确保上游管道发送数据到该agent。

(3) 应用配置

1) 链接

在SBT/Maven定义中链接应用到spark-streaming-flume_2.11

2) 编程

引入FlumeUtils并创建输入DStream

1
2
3
import org.apache.spark.streaming.flume._

val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

注意:

  • 创建的Stream与推送方式不同
  • 每个DStream可从多个sink接收数据

3) 部署

与推送方式相同

理解:应用配置基本与推送方式相同

参考资料