常规性能调优
资源配置方面
方式 | 原理 |
---|---|
–num-executors | 通过提高task并行度提高效率 |
–executor-cores | 通过增加每个executor的核数,提高平行度(虚拟核) |
–executor-memory | 通过增加每个executor内存量,减少落盘次数 |
RDD优化
- RDD的复用。
- 优化计算框架,避免重复计算
- filter
- 尽可能提前filter,减少数据量
- filter配合coalesce使用
- 持久化
- 使用序列化的方式持久化,减少内存占用避免频繁GC
- persist(StorageLevel.MEMORY_ONLY_SER)序列化可减少内存占用
- 使用序列化的方式持久化,减少内存占用避免频繁GC
- 并行度调节
- 广播大变量
- coalesce
- mapPartitions/
- foreachPartition
- readuceByKey的预聚合
调节本地化等待时长
调节map端缓冲区大小
调节reduce端map缓冲区大小
调节reduce端拉取数据重试次数
调节reduce端拉取数据的等待间隔时间
调节 SortShuffle 排序操作阈值
JVM调优
调节 Executor 堆外内存
Executor 堆外内存的配置需要在 spark-submit 脚本里配置:
--conf spark.yarn.executor.memoryOverhead=2048
以上参数配置完成后,会避免掉某些 JVM OOM 的异常问题,同时,可以提升整体 Spark作业的性能。
此内存不够用的时候常见报错有一下几种:
- shuffle output file cannot find
- executor lost
- task lost
- out of memory
调节链接等待时长
--conf spark.core.connection.ack.wait.timeout=350
如果 task 在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这会导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark 的 Executor 进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。
数据倾斜问题的优化
- 聚合数据,避免数据倾斜(原始数据阶段,在hive表里就处理好)
- 过滤倾斜数据(NULL key),最后在union上
- 设置reduce端并行度(本质并没有解决倾斜问题,只是缓解了)
- 对于大部分shuffle算子,都可以设置并行度例如readuceByKey(500),通过提高reduce端的task数量来减少每个task被分配的计算量。
- sparkSQL通过spark.sql.shuffle.partitions设定。
- 针对reduceByKey和groupByKey的处理
- 将key添加随机数
- 然后shuffle
- 完成后再取消随机数
- 再shuffle聚合。
- 小表join大表
- 小表作为广播变量与大表join
- 大表join大表
- 单Key造成数据倾斜
- sample采样,得到导致倾斜的key
- 将这个key的数据与其他数据分别提取出来作为单独的RDD
- 同时与第三方RDD进行join
- 将最终结果进行合并union
- 多个Key照成的数据倾斜
- sample采样,得到导致倾斜的keys
- 将这些Keys提取出来作为单独的RDD(keysRDD,otherRDD)
- 将需要join的RDD里的keys也提取出来(joinKyesRDD,joinOtherRDD)
- 将KyesRDD里的key使用map添加1~N个随机数
- 将joinKyesRDD使用flatMap函数进行扩容轮询添加1~N的数值代码如下扩容代码所示
- kk相互join,oo相互join最终union
- 大量key照成的数据倾斜
- 随机
- 扩容
- join
- 单Key造成数据倾斜
扩容代码:
val value = sc.makeRDD(List(12 -> 4,12 -> 5,12 -> 3))
value.flatMap(x =>{
var keyList = new mutable.ArrayBuffer[(String,Int)]()
for(i <- 1 to 10) {
keyList.append((x._1.toString + "_"+i,x._2))
}
keyList
}).foreach(println)