This is the place where I share my ideas.You can also visit me on @CSDN or @oschina.
Give Chapter and Verse and Author for Transshipment, copy or other using. Thanks!
spark streaming 运行原理图解分析
使员工fastutil优化数据格式
layout: post title: spark调优-重构RDD 以及RDD的持久化 date: 2015-03-08 categories: Spark tags: spark调优 comments: true description: spark调优 —
介绍了Spark RDD 调优方面的知识
尽量去复用RDD,差不多的RDD,可以抽取为一个共同的RDD,供后面的RDD计算时,反复使用。
case "NONE" => NONE
case "DISK_ONLY" => DISK_ONLY
case "DISK_ONLY_2" => DISK_ONLY_2
case "MEMORY_ONLY" => MEMORY_ONLY
case "MEMORY_ONLY_2" => MEMORY_ONLY_2
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
case "MEMORY_AND_DISK" => MEMORY_AND_DISK
case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
case "OFF_HEAP" => OFF_HEAP
sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());
/**
cache就是一个特殊的默认在内存中的缓存。
Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
如图所示。如果rdd没有缓存。 在计算RDD3的时候,会从hdfs读取一份,到RDD1到RDD2 到RDD3 需要15分钟。 再需要计算RDD4的时候,会重新从HDFS中读取,计算, 又需要耗时15分钟。 那么总共就需要30分钟。
如果把RDD1 缓存在内存或磁盘中。 那么 要计算的时候,直接从内存或磁盘中读取RDD1 即可,不需要再次读取HDFS,以及重新计算RDD1. 这样 总时间 就只需要20分钟。 大大提升了效率。
对于多次计算和公共的RDD,一定要进行持久化。 持久化,也就是说,将RDD的数据缓存到内存中、磁盘中,BlockManager。 以后无论对这个RDD做多少次计算,那么都直接取这个RDD的持久化的数据,比如从内存中,或者磁盘中,直接提取一份数据。
如果正常将数据持久化在内存中,那么可能会导致内存占用过大,这样的话,也许会导致OOM内存溢出。
当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑,使用序列化的方式,在纯内存中存储。 将RDD的每个partion的数据,序列化成一个大的字节数组,就一个对象; 序列化后,大大减少内存的空间占用。
序列化的方式,唯一的缺点,就是,获取数据的时候,需要反序列化。
如果序列化纯内存的方式,还是导致OOM,内存溢出。 就只能考虑磁盘的方式,内存+磁盘,普通方式(持久化) 内存+磁盘 ,序列化。
持久化双副本,持久化后的一个副本,因为机器宕机了,副本丢了,就还是得重新计算一次;
持久化的每个数据单元,存储一份副本,放在其他节点上,从而进行容错。一个副本丢了,可以使用另外一个。
这种方式,仅仅针对内存资源极度充足。!
1.PROCESS_LOCAL:进程本地化,
代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好.
2.NODE_LOCAL:节点本地化
代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输
3.NO_PREF
对于task来说,数据从哪里获取都一样,没有好坏之分
4.RACK_LOCAL:机架本地化
数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输
5.ANY
数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差
spark.locality.wait,默认是3s
降级!!!!!!! 次一点的还不行,再等待3秒。 再降级!!!等待。最后不行,就用最坏的!!!!
怎么调?
看运行日志! 看WebUI
我们什么时候要调节这个参数?怎么调节!
观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。 日志里面会显示,starting task。。。,PROCESS LOCAL、NODE LOCAL
观察大部分task的数据本地化级别 如果大多都是PROCESS_LOCAL,那就不用调节了
如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长。
调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志 看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。
别本末倒置,本地化级别倒是提升了, 但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了。
spark.locality.wait,默认是3s;6s,10s
默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s
spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
new SparkConf() .set(“spark.locality.wait”, “10”)
/usr/local/spark/bin/spark-submit \
--class com.hulb.sparkstudy.WordCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \ (80*6 = 480G 内存)
--executor-cores 3 \ (80*3 = 240 个core)
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar \
${1} ( task id)
↑↑↑↑↑↑实际生产环境提交作业的一个例子↑↑↑↑↑↑
有时候,如果你的spark作业处理的数据量特别特别大,几亿数据量;然后spark作业一运行,
时不时的报错, shuffle file cannot find, executor、 task lost, out of memory(内存溢出);
可能是说executor的堆外内存不太够用,导致executor在运行的过程中,可能会内存溢出;
然后可能导致后续的stage的task在运行的时候,可能要从一些executor中去拉取shuffle map output文件,但是executor可能已经挂掉了,关联的block manager也没有了;
所以可能会报 shuffle output file not found; resubmitting task; executor lost;spark作业彻底崩溃。
上述情况下,就可以去考虑调节一下executor的堆外内存。 也许就可以避免报错; 此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。
如果此时,stage0的executor挂了,block manager也没有了;此时,stage1的executor的task,虽然通过Driver的MapOutputTrakcer获取到了自己数据的地址;但是实际上去找对方的block manager获取数据的时候,是获取不到的 此时,就会在spark-submit运行作业(jar),client(standalone client、yarn client),在本机就会打印出log shuffle output file not found。。。 DAGScheduler,resubmitting task,一直会挂掉。反复挂掉几次,反复报错几次 整个spark作业就崩溃了
–conf spark.yarn.executor.memoryOverhead=2048
spark-submit脚本里面,去用–conf的方式,去添加配置;一定要注意!!! 切记,不是在你的spark作业代码中,用new SparkConf().set()这种方式去设置,不要这样去设置,是没有用的!
一定要在spark-submit脚本中去设置。
spark.yarn.executor.memoryOverhead(看名字,顾名思义,针对的是基于yarn的提交模式)
默认情况下,这个堆外内存上限大概是300多M; 后来我们通常项目中,真正处理大数据的时候,这里都会出现问题,导致spark作业反复崩溃,无法运行;
此时就会去调节这个参数,到至少1G(1024M),甚至说2G、4G
通常这个参数调节上去以后,就会避免掉某些JVM OOM的异常问题,
同时呢,会让整体spark作业的性能,得到较大的提升。
正好碰到对方在GC ,导致没能响应,无法建立网络连接!!(默认60s)
此时呢,就会没有响应,无法建立网络连接;会卡住;ok,spark默认的网络连接的超时时长,是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。 碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!
报错:
某某file。一串file id。uuid(dsfsfd-2342vs–sdf–sdfsd)。not found。file lost。
这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。 然后超过默认60s以后,直接宣告失败。
报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。 也可能会导致DAGScheduler,反复提交几次stage。 TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。
可以考虑调节连接的超时时长。
–conf spark.core.connection.ack.wait.timeout=300
spark-submit脚本,切记,不是在new SparkConf().set()这种方式来设置的。 spark.core.connection.ack.wait.timeout(spark core,connection,连接,ack,wait timeout,建立不上连接的时候,超时等待时长)
调节这个值比较大以后,通常来说,可以避免部分的偶尔出现的某某文件拉取失败,某某文件lost掉了。。。