文章

Spark Tuning & Configuration

Spark调优主要是调整spark的配置和调整spark代码的执行逻辑,不过主要还是调配置,所以这里将配置和调优放在一起。当然由于spark过于庞大(hadoop、kafka也都这样)一次性梳理配置是很让人懵逼的一件事,况且一开始显然也不能理解spark的所有配置。根据平时的需求,将需要用到的学到的配置总结在这里,长期更新。

  1. partition
  2. memory
  3. scheduling
    1. data locality

partition

spark.sql.shuffle.partitions:spark SQL shuffle后的分区数,仅对spark SQL有效(DataFrame)。默认200。

spark.default.parallelism:仅处理RDD时生效,RDD转换(shuffle、parallelize)之后的分区数。默认在不同情况下取值条件不同,对于shuffle(reduceByKey、join)操作,是shuffle前的父RDD的最大partition值。

所以RDD shuffle之后,partition只多不少呗。如果导致partition过多、task过多,建议设个更小的值。官方建议2-3倍的cpu cores。

  • https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism
  • https://www.cnblogs.com/wrencai/p/4231966.html
  • https://www.cnblogs.com/itboys/p/10960614.html
  • https://stackoverflow.com/questions/45704156/what-is-the-difference-between-spark-sql-shuffle-partitions-and-spark-default-pa

另外,由于一个partition一个task,如果cores大于partitions,会有很多cores闲置下来。

  • https://stackoverflow.com/a/52414244/7676237

memory

Java对象在内存中占得空间大概是raw data field内容的2-5倍,因为:

  • object header:16 bytes左右,包含指向对象的class的指针;
  • String在jvm里使用UTF-16编码,所以一个char就是2 byte;
  • 指针,比如对象里对其他对象的引用,一个指针8byte。像HashMap、LinkedList等有很多指针;
  • primitive类型经常被存储为boxed类型,比如Integer;

spark的memory用于两处:执行代码、存储(cache)。二者共享一块内存区域。

spark的内存首先分为两部分:Execution + Storage,默认占60%(spark.memory.fraction=0.6),剩下的用于user data structure、internal metadata in spark等等。

Storage默认占用该区域的一半(spark.memory.storageFraction=0.5),且如果Execution不需要那么多内存,Storage最多能全占了。当Execution需要内存的时候,可以evict Storage,但Storage不能低于0.5。所以Storage站0.5-1.0,Execution最多占0.5。

spark.memory.fraction

spark.memory.storageFraction

如果cache rdd需要的内存过大,但又不想放在磁盘上,建议使用MEMORY_ONLY_SER,使用Kryo序列化,体积会比Java序列化小很多,速度也快很多。

  • http://spark.apache.org/docs/latest/tuning.html

scheduling

data locality

当数据和处理数据的代码不在同一个node上,要么将data发到code所在的node,要么将code发到data所在的node。由于spark从hdfs读的都是大数据,所以将代码发到数据所在的节点更好一些。

按照数据的本地化级别,分为五类:

  • PROCESS_LOCAL:数据和task在同一个进程(同一个jvm)中。比如数据已经cache在了该node中;
  • NODE_LOCAL:数据就在启动task的node上。可以是在该node的磁盘上,也可以是cache在该node的另一个process中
  • NO_PREF:数据从哪里访问都一样,不需要位置优先。比如数据库的数据
  • RACK_LOCAL:数据和启动task的node在同一机架上;
  • ANY:不在同一机架上;

spark默认用满足最高级别的data locality的node去启动task,但是如果有需要处理的数据的executor都在忙,此时没法直接在该executor上启动task。spark会等一下这个executor,如果还不空闲,再使用次级data locality在其他executor上启动task。

spark.locality.wait:启动一个数据本地化的任务时,在退而求其次,在一个不那么本地的节点上启动任务之前,所等待的时间。默认是3s。

spark启动任务时会先按照最高级别数据本地化分配node,以求获得最高的数据读取速度,如果等待spark.locality.wait还没有等到可用的node,降为下一级别请求node发起任务,分别用spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.rack作为超时时间,他们默认都是spark.locality.wait

存在的问题:如果一个executor执行了远多于其他executor的任务,该executor的task的Locality Level都是PROCESS_LOCAL级别,且其他executor闲置、该executor的结束时间影响了整体的时间,说明spark.locality.wait设置的不太合理(是不是手动调大了),过于强调数据本地化,导致task分布不均。这时候可以调小spark.locality.wait(甚至为0)。

  • https://stackoverflow.com/a/27019275/7676237
本文由作者按照 CC BY 4.0 进行授权