Spark性能调优经验总结


常规性能调优

资源配置方面

方式 原理
–num-executors 通过提高task并行度提高效率
–executor-cores 通过增加每个executor的核数,提高平行度(虚拟核)
–executor-memory 通过增加每个executor内存量,减少落盘次数

RDD优化

  1. RDD的复用。
    1. 优化计算框架,避免重复计算
  2. filter
    1. 尽可能提前filter,减少数据量
    2. filter配合coalesce使用
  3. 持久化
    1. 使用序列化的方式持久化,减少内存占用避免频繁GC
      1. persist(StorageLevel.MEMORY_ONLY_SER)序列化可减少内存占用
  4. 并行度调节
  5. 广播大变量
  6. coalesce
  7. mapPartitions/
  8. foreachPartition
  9. readuceByKey的预聚合

调节本地化等待时长

调节map端缓冲区大小

调节reduce端map缓冲区大小

调节reduce端拉取数据重试次数

调节reduce端拉取数据的等待间隔时间

调节 SortShuffle 排序操作阈值

JVM调优

调节 Executor 堆外内存

Executor 堆外内存的配置需要在 spark-submit 脚本里配置:

--conf spark.yarn.executor.memoryOverhead=2048

以上参数配置完成后,会避免掉某些 JVM OOM 的异常问题,同时,可以提升整体 Spark作业的性能。

此内存不够用的时候常见报错有一下几种:

  1. shuffle output file cannot find
  2. executor lost
  3. task lost
  4. out of memory

调节链接等待时长

--conf spark.core.connection.ack.wait.timeout=350

如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这会导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的 Executor 进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。


数据倾斜问题的优化

  1. 聚合数据,避免数据倾斜(原始数据阶段,在hive表里就处理好)
  2. 过滤倾斜数据(NULL key),最后在union上
  3. 设置reduce端并行度(本质并没有解决倾斜问题,只是缓解了)
    1. 对于大部分shuffle算子,都可以设置并行度例如readuceByKey(500),通过提高reduce端的task数量来减少每个task被分配的计算量。
    2. sparkSQL通过spark.sql.shuffle.partitions设定。
  4. 针对reduceByKey和groupByKey的处理
    1. 将key添加随机数
    2. 然后shuffle
    3. 完成后再取消随机数
    4. 再shuffle聚合。
  5. 小表join大表
    1. 小表作为广播变量与大表join
  6. 大表join大表
    1. 单Key造成数据倾斜
      1. sample采样,得到导致倾斜的key
      2. 将这个key的数据与其他数据分别提取出来作为单独的RDD
      3. 同时与第三方RDD进行join
      4. 将最终结果进行合并union
    2. 多个Key照成的数据倾斜
      1. sample采样,得到导致倾斜的keys
      2. 将这些Keys提取出来作为单独的RDD(keysRDD,otherRDD)
      3. 将需要join的RDD里的keys也提取出来(joinKyesRDD,joinOtherRDD)
      4. 将KyesRDD里的key使用map添加1~N个随机数
      5. 将joinKyesRDD使用flatMap函数进行扩容轮询添加1~N的数值代码如下扩容代码所示
      6. kk相互join,oo相互join最终union
    3. 大量key照成的数据倾斜
      1. 随机
      2. 扩容
      3. join

扩容代码:

val value = sc.makeRDD(List(12 -> 4,12 -> 5,12 -> 3))
value.flatMap(x =>{
  var keyList = new mutable.ArrayBuffer[(String,Int)]()
  for(i <- 1 to 10) {
    keyList.append((x._1.toString + "_"+i,x._2))
  }
  keyList
}).foreach(println)

文章作者: tzkTangXS
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 tzkTangXS !
  目录