Spark作业调度

1 应用间调度

(1) 静态资源分配

分配给应用固定量资源,全程占用,直到应用完成并释放。

用于standalone、YARN和粗粒度Mesos。

  • standalone

    默认按照提交顺序执行(FIFO)

    核心数:spark.cores.max或spark.deploy.defaultCores

    内存:spark.executor.memory

  • Mesos

    静态分配需要启用spark.mesos.coarse

    核心数:spark.cores.max

    内存:spark.executor.memory

  • YARN

    详见YARN Spark Properties

    执行器数量:–num-executor或spark.executor.instances

    每个执行器核心数:–executor-cores或spark.executor.cores

    每个执行器内存:–executor-memory或spark.executor.memory

注意:

(1) Mesos提供了一种共享CPU的模式。

使用mesos:// URL并关闭spark.mesos.coarse。

适用于shell会话的大量轻量应用。

缺点是延迟难以预测,因为应用获取核心需要时间。

(2) 当前没有模式提供应用间共享内存的实现

(2) 动态资源分配

动态资源分配按照负载调整应用资源。

默认关闭,可用于所有粗粒度模式,如standalone、YARN和粗粒度Mesos。

1) 配置

1‘ 启用动态分配

spark.dynamicAllocation.enabled

2’ 启用external shuffle service

用于移除执行器时,不删除其写出的交换文件

  • 每个工作节点上启用external shuffle service

    • standalone

      仅需启用spark.shuffle.service.enabled

    • 粗粒度Mesos

      开启spark.shuffle.service.enabled

      运行脚本$SPARK_HOME/sbin/start-mesos-shuffle-service.sh

    • YARN

      image-20200901015413997

    更多配置详见configurations page

  • 开启spark.shuffle.service.enabled

2) 分配策略

难以预测执行器的未来行为,需要使用策略探索。

1’ 请求策略

Spark循环判断执行器请求。

经过spark.dynamicAllocation.schedulerBacklogTimeout后有等待执行的任务,则触发分配。

再经过spark.dynamicAllocation.sustainedSchedulerBacklogTimeout仍然有等待的任务,则再次触发。

每次触发以指数式增长,1, 2, 4, 8…

指数式增长的含义:

  • 开始时应请求尽量少的资源,如同TCP的慢启动
  • 应用能随着时间提升资源使用量
2‘ 移除策略

空闲时间经过spark.dynamicAllocation.executorIdleTimeout后移除。

3) 执行器退出

为了后续能使用退出的执行器状态数据,执行器退出前需要保存状态数据。

保存状态数据对于数据交换尤为重要。shuffle阶段,执行器首先将自身map解雇输出到本地磁盘,再向其他执行器提供数据服务。当执行器因为异常,如运行时间过长移除,获取移除的执行器状态需要重算。

external shuffle service以独立于应用和执行器的长期进程形式在每个节点上运行。可以提供移除的执行器状态数据。

除了写出交换文件外,执行器还会缓存数据。当执行器移除后,其缓存数据也不可用了。可以设置spark.dynamicAllocation.cachedExecutorIdleTimeout使缓存数据的执行器不被移除。后续版本可能像external shuffle service一样,将缓存数据保存到堆外空间中。

2 应用内调度

可以在同一应用中,通过不同的线程提交多个并行作业。调度器是线程安全的。

Spark默认按照FIFO调度作业。排在前的作业优先使用集群资源,如有剩余才分配给后续作业。因此当前前面的作业需要大量资源时,后续作业将明显延后。

版本>=0.8,Spark提供公平调度机制。所有作业获得几乎相等的资源,不用等待长时间作业完成,适合多用户场景。

1
2
3
4
# 启用公平调度机制
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

(1) 公平调度池

公平调度池用于聚合具有相同优先级的作业。可用于分配不同的优先级和多个用户的多个作业提交。类似Hadoop Fair Scheduler

默认使用默认调度池。可以通过设置本地属性spark.scheduler.pool添加调度池,在该线程中提交的作业分配到该调度池中。

1
2
3
4
5
6
// 添加调度池
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

// 移除调度池
sc.setLocalProperty("spark.scheduler.pool", null)

(2) 默认行为

默认调度池间使用公平策略,调度池内使用FIFO。

(3) 配置

调度池有三个属性进行配置:

  • 调度模式:FIFO(默认)或FAIR
  • 权重:默认1
  • 最小占比:默认0

模板详见conf/fairscheduler.xml.template文件,可以通过类路径中的fairscheduler.xml或 SparkConf中配置spark.scheduler.allocation.file

1
conf.set("spark.scheduler.allocation.file", "/path/to/file")
1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>

参考资料

Job Scheduling