Spark运作基本原理和验证

本文介绍Spark的运作原理,并通过在一个实际的Spark运行环境中对数据集进行操作,从而验证这些原理。

Job: 每个action对应一个job,常见的action有例如count(), max(), foreach()等等。

Stage: 每个job由一个或多个stage组成,产生shuffle的操作会形成新的stage。

Task: 对应partition。

  • 关于RDD的几个概念:

RDD: 弹性分布式数据集(Resilient Distributed Datasets),是Spark里做数据分析的主要数据类型。

Transformation: 作用于RDD的变换操作,这些操作的输入和输出都是RDD类型,例如map(), filter(), union(), groupByKey()。transformation是lazy的,直到遇到action才会被实际执行。

Action: 作用域RDD的动作,这些操作的输入是RDD类型,输出不是RDD类型,例如reduce(), count(), first()。

Spark Components

//TODO

Partition数量:

默认情况下spark为每个block创建一个partition,例如在 blocksize=64MB的hdfs环境里,一个500MB的文件将对应8个partition。

Executor数量:

1、dynamicAllocation开启的情况(spark2.0 CDH版默认是开启的):

Spark会动态调整executor数量,以下参数可进一步配置动态调整的行为:

spark.dynamicAllocation.minExecutors,最小executor数量

spark.dynamicAllocation.maxExecutors,最大executor数量

2、dynamicAllocation关闭的情况(关闭方法如下):

spark-submit --conf "spark.dynamicAllocation.enabled=false"

通常在spark-submit时使用–num-executors(对应SparkConf里的spark.executor.instances参数)来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个executor所使用的内存和虚拟CPU核数。

静态分配的情况下,spark会为application预先分配指定数量的executor,而每个executor所在的host也就固定了,且这些executor会集中在一台host上,这种情况对是不利于locality的。除非指定的executor数量比较多,例如大于等于集群总共的cores,这时大部分host上都会有executor。(有待进一步验证)

SparkConf里的spark.deploy.spreadOut参数,控制了Spark在worker上分配executor的策略,缺省为true,这时会尽量在每个worker上平均分配executor,以便达到比较好的locality。“the Standalone cluster manager works by spreading out each application across the maximum number of executors by default” (注,似乎只对spark自带cluster manager有效果)

“In spark 2.0 and onward, you can ensure an executor on every node by setting the number of executor cores and the spark.executor.cores argument. Basically if the ratio of the number of cores to the number of cores per executor is greater than the number of nodes in your cluster, you should get at least 1 executor on each node.”

Spark选择哪个executor执行一个task,见https://spark.apache.org/docs/1.2.0/tuning.html#data-locality

但实验发现,在core总数为500左右的集群上,指定executor数量为1000,实际分配的executor却为26个。(原因有待进一步验证)

参考链接: