Getting Started with Spark

Since Spark 2.0, the primary programming abstraction is the Dataset. Like an RDD it is strongly typed, but it offers richer operations and better performance. See the SQL programming guide for details.
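As a minimal, hypothetical sketch of what "strongly typed" means (it assumes a SparkSession named spark with import spark.implicits._ in scope, as is the case in the spark-shell used below):

// Case class gives the Dataset a concrete element type
case class Person(name: String, age: Int)

val people = Seq(Person("Alice", 29), Person("Bob", 31)).toDS() // Dataset[Person]
val ages = people.map(_.age)                                    // Dataset[Int], field access checked at compile time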

1 Security

Security is turned off by default. See Spark Security for details.

2 Interactive Analysis with the Spark Shell

(1) Basics

Start the spark-shell:

./bin/spark-shell
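
The shell can also be started with an explicit master; for example (a local-mode sketch, not from the original text):

./bin/spark-shell --master "local[4]"   # run locally with 4 worker threads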

Dataset API

// 1. Build a Dataset: it can be created from Hadoop InputFormats or by transforming other Datasets. Here we read the README file in the Spark source directory.
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

// 2. Operate on the Dataset
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May differ from your output, since README.md changes over time; the same applies to the other outputs below

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

// 3. Transform the Dataset into a new one
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

// Chain a transformation and an action together
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

(2) Dataset Operations

Examples:

// Find the largest number of words in a single line
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15

// The arguments to map() and reduce() are Scala closures, so they can use language features or Scala/Java libraries (after importing them)
scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

// Count word frequencies
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

scala> wordCounts.collect()
res6: Array[(String, Long)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
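
For comparison, here is a sketch of the same word count written against the untyped DataFrame API with built-in SQL functions (an alternative to groupByKey, not part of the original example; output omitted):

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

// Split the "value" column into words, explode to one row per word, then group and count
scala> val wordCountsDF = textFile.select(explode(split(col("value"), " ")).as("word")).groupBy("word").count()
wordCountsDF: org.apache.spark.sql.DataFrame = [word: string, count: bigint]

scala> wordCountsDF.orderBy(desc("count")).show(3)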

(3) Caching

Spark can cache data in a cluster-wide in-memory cache. This is useful for hot datasets that are accessed repeatedly, or for data reused by iterative algorithms.

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15
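
When the cached data is no longer needed, the memory can be released again; unpersist() is part of the Dataset API (a small addition for completeness):

scala> linesWithSpark.unpersist()   // remove linesWithSpark from the cache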

3 Self-Contained Applications

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

// Subclasses of scala.App may not work correctly, so use an explicit main method instead
object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Replace YOUR_SPARK_HOME with the appropriate directory
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
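
For quick local testing of the same program (a hypothetical variant, not part of the original example; normally the master is supplied by spark-submit), the master can also be set on the builder:

// Hypothetical local-testing variant of the builder call above
val spark = SparkSession.builder
  .appName("Simple Application")
  .master("local[*]")   // hard-code a local master instead of taking it from spark-submit
  .getOrCreate()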

The sbt build file, build.sbt, is as follows:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"

The project layout, packaging command, and submit command are as follows:

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
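
To submit to a cluster instead of running locally, only the --master argument changes; for example (a sketch with a hypothetical standalone-cluster master URL):

# Hypothetical standalone cluster; replace master-host with your own master
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://master-host:7077 \
  target/scala-2.12/simple-project_2.12-1.0.jar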

4 Further Learning

API: RDD programming guide, SQL programming guide

Cluster deployment: deployment overview

References

Quick Start