介绍

Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。项目是用Scala进行编写。

目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLib、SparkR等子项目,Spark是基于内存计算的大数据并行计算框架。除了扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。Spark 适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在一个统一的框架下支持这些不同的计算,Spark 使我们可以简单而低耗地把各种处理流程整合在一起。而这样的组合,在实际的数据分析 过程中是很有意义的。不仅如此,Spark 的这种特性还大大减轻了原先需要对各种平台分 别管理的负担。

当今玩大数据,如果不知道或者不懂Spark的话,那么很容易会失去交流的机会。尤其是研发的小伙伴们。

命令

提交任务

1
2
3
4
5
6
7
8
9
# Run on a YARN cluster
spark-submit \
--deploy-mode cluster \
--class com.soyuan.deltaLake.html.WinDws \
--master yarn \
--executor-memory 2G \
--num-executors 2 \
delta-lake-1.0-SNAPSHOT-jar-with-dependencies.jar \
1000 \

参数介绍

参数 说明
class 作业的主类。
master Local[*],spark://cdh02:7077,yarn
deploy-mode client模式表示作业的AM会放在Master节点上运行。如果设置此参数,需要指定Master为yarn。
cluster模式表示AM会随机的在Worker节点中的任意一台上启动运行。如果设置此参数,需要指定Master为yarn。
driver-memory Driver使用的内存,不可超过单机的总内存。
num-executors 创建Executor的个数。
executor-memory 各个Executor使用的最大内存,不可以超过单机的最大可使用内存。
executor-cores 各个Executor使用的并发线程数目,即每个Executor最大可并发执行的Task数目。

资源计算

参考例子:

总资源:8核16 GiB(Worker)x 10 + 8核16 GiB(Master)

注意 由于作业提交的时候资源只计算CPU和内存,所以这里磁盘的大小并未计算到总资源中。

Yarn可分配总资源:12核12.8 GiB(Worker)x 10

注意 默认情况下,Yarn可分配核 = 机器核 x 1.5,Yarn可分配内存 = 机器内存 x 0.8。

1
2
3
4
5
6
7
8
9
--class org.apache.spark.examples.SparkPi 
--master yarn 
--deploy-mode client 
--driver-memory 4g 
--num-executors 2 
--executor-memory 2g 
--executor-cores 2 
/opt/apps/spark-1.6.0-bin-hadoop2.6/lib/spark-examples*.jar 
10

在不同模式、不同的设置下运行时,作业使用的资源情况如下表所示:

  • yarn-client模式的资源计算
节点 资源类型 资源量
master core 1
mem driver-memroy = 4
worker core num-executors * executor-cores = 4
mem num-executors * executor-memory = 4
  • 作业主程序(Driver程序)会在Master节点上执行。按照作业配置将分配4 GB(由—driver-memroy指定)的内存给它(当然实际上可能没有用到)。
  • 会在Worker节点上起2个(由—num-executors指定)Executor,每一个Executor最大能分配2 GB(由—executor-memory指定)的内存,并最大支持2个(由—executor-cores指定)Task的并发执行。
  • yarn-cluster模式的资源计算
节点 资源类型 资源量(结果使用上面的例子计算得到)
master - 一个很小的Client程序,负责同步job信息,占用很小。
worker core num-executors * executor-cores+spark.driver.cores = 5
mem num-executors * executor-memory + driver-memroy = 8

说明 spark.driver.cores默认是1。

资源使用的优化

  • yarn-client模式

    如果您的大作业,使用yarn-client模式,想要多用一些这个集群的资源,请参见如下配置。

    1
    2
    3
    4
    5
    
    --master yarn-client 
    --driver-memory 5g 
    –-num-executors 20 
    --executor-memory 4g 
    --executor-cores 4
    

    注意

    • Spark在分配内存时,会在用户设定的内存值上溢出375 MB或7%(取大值)。
    • Yarn分配Container内存时,遵循向上取整的原则,这里也就是需要满足1 GB的整数倍。

    按照上述的资源计算公式:

    • Master的资源量为:
      • core:1
      • mem:6 GiB(5 GiB + 375 MB向上取整为6 GiB)
    • Worker的资源量为:
      • core:20*4 = 80
      • mem:20*5 GiB (4 GiB + 375 MB向上取整为5 GiB)= 100 GiB

    可以看到总的资源没有超过集群的总资源,那么遵循这个原则,您还可以有很多种配置,例如:

    • 1
      
      --master yarn-client --driver-memory 5g --num-executors 40 --executor-memory 1g --executor-cores 2
      
    • 1
      
      --master yarn-client --driver-memory 5g --num-executors 15 --executor-memory 4g --executor-cores 4
      
    • 1
      
      --master yarn-client --driver-memory 5g --num-executors 10 --executor-memory 9g --executor-cores 6
      

    原则上,按照上述的公式计算出来的需要资源不超过集群的最大资源量就可以,但在实际场景中,因为系统、HDFS以及E-MapReduce的服务会需要使用Core和Mem资源,如果把Core和Mem都占用完了,反而会导致性能的下降,甚至无法运行。

    executor-cores数通常也都会被设置成和集群的可使用核一致,因为如果设置的太多,CPU会频繁切换,性能并不会提高。

  • yarn-cluster模式

    当使用yarn-cluster模式后,Driver程序会被放到Worker节点上。会占用到Worker资源池里的资源,这时若想要多用一些这个集群的资源,请参见如下配置。

    1
    
    --master yarn-cluster --driver-memory 5g --num-executors 15 --executor-memory 4g --executor-cores 4
    

配置建议

  • 如果将内存设置的很大,要注意GC所产生的消耗。通常推荐每一个Executor的内存<=64 GB。
  • 如果是进行HDFS读写的作业,建议每个Executor中使用<=5个并发来读写。
  • 如果是进行OSS读写的作业,建议是将Executor分布在不同的ECS上,这样可以将每一个ECS的带宽都用上。例如,有10台ECS,那么就可以配置num-executors=10,并设置合理的内存和并发。
  • 如果作业中使用了非线程安全的代码,则在设置executor-cores的时候需要注意多并发是否会造成作业的不正常。如果会造成作业不正常,推荐设置executor-cores=1。

SparkSQL

  • 创建表使用ORC文件格式需要注意使用

    1
    2
    3
    4
    
    #hive和sparksql都可以访问文件
    create table data_zhaobiao.suzhou_ods
    stored as orc
    as select * from temp_suzhou
    

FAQ

Unrecognized Hadoop major version number: 3.0.0-cdh6.3.2

1
2
3
4
5
6
7
8
9
#注释pom的一些引用
#Hadoop是一个开源产品,基于Hadoop的商业版本有几个CDH,HDP,MapR。我开发的时候是基于开源的Hadoop。而实际测试的时候使用的是CDH版
#本,导致Hadoop和Hive相关maven依赖不一致,所以报错。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

ORC WriterVersion gets ArrayIndexOutOfBoundsException on newer ORC files(Hive2.1.1)

1
2
3
4
5
6
7
#官方解决方案说明:https://spark.apache.org/docs/2.4.0/sql-data-sources-orc.html
#如果设置`hive` 用的是Hive 1.2.1orc的库文件
#例如:
create table data_zhaobiao.suzhou_ods
stored as orc
#spark2.4读取Hive2.1.1的时候会报错,这个时候需要配置说是用1.2.1orc的库文件可以解决,用于读取老版本的

参考

Spark-Submit参数设置说明: https://www.alibabacloud.com/help/zh/doc-detail/28124.htm

总结

持续更新中