最新要闻
- 一加Ace 2V 12+256G起步行业罕见:友商还在搞8+128卡价位的版本
- 实时:Redmi Note 12 Pro极速版12+256G到手1999元:开机就是MIUI 14
- 造车新势力2月交付量出炉:理想、蔚来、哪吒破万 零跑压力大
- 【全球速看料】厦门征求意见!过马路玩手机或将罚款50元 你支持吗?
- 世界快看:东风概念飞行汽车外观曝光!“鸥翼门”相当炫酷
- 【当前热闻】2018巴彦淖尔国际马拉松
- 环球焦点!胡明轩:平时杜导叫我和徐杰一起训练 要求我们承担起更多责任
- 环球聚焦:委员建议隔周三休成热搜第一 网友吵翻 专家:很难行得通
- 今日热闻!Model 2明天发?这款15万的特斯拉便宜车:马斯克已经说了17年
- 风语筑(603466):上海风语筑文化科技股份有限公司关于股东权益变动比例超过1%的提示性公告
- 天天观察:希望工程发文感谢《原神》玩家 5天9万多人捐赠240万元
- 世界聚焦:“刺客”又来了!网友称买到1600元一斤话梅:每颗至少20元
- “窄边教科书”上新!戴尔XPS15 9530发布:13代酷睿+RTX 40配8TB SSD
- 环球微动态丨孟菲斯动物园发大熊猫丫丫新动态 网友:尽快回国!
- 曝苹果屏下Face ID技术有缺憾:2026年才会趋于完美
- 信息:可取代eSIM:更完美的iSIM卡来了
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
环球新动态:Spark系列 - (5) Spark Shuffle
目前已经更新完《Java并发编程》,《JVM性能优化》,《Spring核心知识》《Docker教程》和《Spark基础知识》,都是多年面试总结。欢迎关注【后端精进之路】,轻松阅读全部文章。
Java并发编程:
- Java并发编程系列-(1) 并发编程基础
- Java并发编程系列-(2) 线程的并发工具类
- Java并发编程系列-(3) 原子操作与CAS
- Java并发编程系列-(4) 显式锁与AQS
- Java并发编程系列-(5) Java并发容器
- Java并发编程系列-(6) Java线程池
- Java并发编程系列-(7) Java线程安全
- Java并发编程系列-(8) JMM和底层实现原理
- Java并发编程系列-(9) JDK 8/9/10中的并发
Docker教程:
(资料图)
- Docker系列-(1) 原理与基本操作
- Docker系列-(2) 镜像制作与发布
- Docker系列-(3) Docker-compose使用与负载均衡
- Docker 安装 Jenkins 并实现项目自动化部署
- 如何0失误快速搭建K8s环境?你想了解的这都有
JVM性能优化:
- JVM性能优化系列-(1) Java内存区域
- JVM性能优化系列-(2) 垃圾收集器与内存分配策略
- JVM性能优化系列-(3) 虚拟机执行子系统
- JVM性能优化系列-(4) 编写高效Java程序
- JVM性能优化系列-(5) 早期编译优化
- JVM性能优化系列-(6) 晚期编译优化
- JVM性能优化系列-(7) 深入了解性能优化
Spring MVC系列:
- Spring MVC系列-(1) Spring概述
- Spring MVC系列-(2) Bean的装配
- Spring MVC系列-(3) Bean的装配
- Spring MVC系列-(4) Bean的生命周期
- Spring MVC系列-(5) AOP
- Spring MVC系列-(6) 声明式事务
- Spring MVC系列-(7) IOC初始化流程
Spark系列:
- Spark系列 - (1) Spark基础
- Spark系列 - (2) Spark核心概念
- Spark系列 - (3) Spark SQL
- Spark系列 - (4) Spark任务调度
- Spark系列 - (5) Spark Shuffle详细解读
- Spark系列 - (6) Spark内存管理
5. Spark Shuffle
5.1 Shuffle概念
有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为Shuffle。
下图是一个简单的Spark Job的运行图,根据宽依赖将任务划分为不同的Stage,
在划分stage时,最后一个stage称为 FinalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。
ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
ResultStage基本上对应代码中的action算子,即将一个函数应用在RDD的各个partition的数据集上,意味着一个job 的运行结束。
触发Shuffle的操作大概分为如下几类:
5.2 核心思想
Shuffle的核心思想可以用上图来表示,前一个Stage的 ShuffleMapTask 进行 Shuffle Write, 把数据存储在 BlockManager上面,并且把数据位置元信息上报到Driver 的MapOutTrack组件中,下一个Stage根据数据位置元信息,进行Shuffle Read,拉取上个Stage的输出数据。
Shuffle中的任务个数
1. Map端task个数的确定
Shuffle过程中的task个数由RDD分区数决定,而RDD的分区个数与参数spark.default.parallelism
有关.
在Yarn Cluster模式下,如果没有手动设置,则:
spark.default.parallelism = max(所有executor使用的core总数,2)
。
参与决定分区数的参数defaultMinPartitions
也是由该参数确定的,defaultMinPartitions=min(spark.default.parallelism, 2)
由于spark对于一个partition中的最大文件大小有限制(spark.files.maxPartitionBytes = 128 M (默认)
),为128M,因此自定义分区时,不能选的过小。
常见的几种情况如下:
2. reduce端的task个数的确定
Reduce端进行数据的聚合,一部分聚合算子可以手动指定并行度,如果没有指定,则以map端的最后一个RDD分区作为其分区数,分区数也就决定了reduce端的task个数。
5.3 HashShuffle
1. 未优化的HashShuffleManager
相对于传统的 MapReduce,Spark 假定大多数情况下 Shuffle 的数据不需要排序,例如 Word Count,强制排序反而会降低性能。因此不在 Shuffle Read 时做 Merge Sort,如果需要合并的操作的话,则会使用聚合(agggregator),即用了一个 HashMap (实际上是一个 AppendOnlyMap)来将数据进行合并。
在 Map Task 过程按照 Hash 的方式重组 Partition 的数据,不进行排序。每个 Map Task 为每个 Reduce Task 生成一个文件,通常会产生大量的文件(即对应为 M*R 个中间文件,其中 M 表示 Map Task 个数,R 表示 Reduce Task 个数),伴随大量的随机磁盘 I/O 操作与大量的内存开销。
总结下这里的两个严重问题:
- 生成大量文件,占用文件描述符,同时引入 DiskObjectWriter 带来的 Writer Handler 的缓存也非常消耗内存;
- 如果在 Reduce Task 时需要合并操作的话,会把数据放在一个 HashMap 中进行合并,如果数据量较大,很容易引发 OOM。
2. 优化后的HashShuffleManager
针对上面的第一个问题,Spark做了改进,引入了File Consolidation
机制。
一个Executor上所有的Map Task生成的分区文件只有一份,即将所有的Map Task相同的分区文件合并,这样每个 Executor上最多只生成N个分区文件。
这样就减少了文件数,但是假如下游 Stage 的分区数 N 很大,还是会在每个Executor上生成 N 个文件,同样,如果一个 Executor 上有 K 个 Core,还是会开 K*N 个 Writer Handler,所以这里仍然容易导致OOM。
5.4 SortShuffle
SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。
1. 普通运行机制
下图说明了普通的SortShuffleManager的原理。在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。
一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。
2. bypass运行机制
下图说明了bypass SortShuffleManager的原理。bypass运行机制的触发条件如下:
- shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
- 不是聚合类的shuffle算子(比如reduceByKey)。
此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。
而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
扩展:Tungsten-Sort Based Shuffle / Unsafe Shuffle
从 Spark 1.5.0 开始,Spark 开始了钨丝计划(Tungsten),目的是优化内存和CPU的使用,进一步提升spark的性能。由于使用了堆外内存,而它基于 JDK Sun Unsafe API,故 Tungsten-Sort Based Shuffle 也被称为 Unsafe Shuffle。
它的做法是将数据记录用二进制的方式存储,直接在序列化的二进制数据上 Sort 而不是在 Java 对象上,这样一方面可以减少内存的使用和 GC 的开销,另一方面避免 Shuffle 过程中频繁的序列化以及反序列化。在排序过程中,它提供 cache-efficient sorter,使用一个 8 bytes 的指针,把排序转化成了一个指针数组的排序,极大的优化了排序性能。
但是使用 Tungsten-Sort Based Shuffle 有几个限制,Shuffle 阶段不能有 aggregate 操作,分区数不能超过一定大小(2^24-1,这是可编码的最大 Parition Id),所以像 reduceByKey 这类有 aggregate 操作的算子是不能使用 Tungsten-Sort Based Shuffle,它会退化采用 Sort Shuffle。
从 Spark-1.6.0 开始,把 Sort Shuffle 和 Tungsten-Sort Based Shuffle 全部统一到 Sort Shuffle 中,如果检测到满足 Tungsten-Sort Based Shuffle 条件会自动采用 Tungsten-Sort Based Shuffle,否则采用 Sort Shuffle。从Spark-2.0.0开始,Spark 把 Hash Shuffle 移除,可以说目前 Spark-2.0 中只有一种 Shuffle,即为 Sort Shuffle。
5.5 Shuffle Read
1. 何时开始fetch上一个stage的数据
当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。
理论上讲一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。
2. 边 fetch 边处理还是一次性 fetch 完再处理?
边fetch边处理。
本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。
Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。
那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?
使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value),并将 func 的结果重新 put(key) 到 HashMap 中去。
3. fetch 来的数据存放到哪里?
刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。也可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。
内存使用的是AppendOnlyMap,类似Java的HashMap,内存+磁盘使用的是ExternalAppendOnlyMap,如果内存空间不足时,ExternalAppendOnlyMap可以将records进行sort后spill到磁盘上,等到需要它们的时候再进行归并。
4. 怎么获得要 fetch 的数据的存放位置?
一个 ShuffleMapStage形成后,会将该 stage 最后一个 final RDD 注册到 MapOutputTrackerMaster,reducer 在 shuffle 的时候去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置。
每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size)
参考:
- https://zhuanlan.zhihu.com/p/67061627
- http://lionheartwang.github.io/blog/2018/03/11/spark-shuffle-implementation/
- https://toutiao.io/posts/eicdjo/preview
搜索『后端精进之路』并关注,立刻获取最新文章和面试资料。
-
环球新动态:Spark系列 - (5) Spark Shuffle
目前已经更新完《Java并发编程》,《JVM性能优化》,《Spring核心知识》《Docker教程》和《Spark基础知识...
来源: -
热消息:Fireasy3 揭秘 -- 万物伊始(依赖注入与服务发现)
最近在忙于Fireasy的重构,3 x抛弃了 NetFramework& 160;时代的一些思想和模式,紧密拥抱 NetCore,...
来源: 环球新动态:Spark系列 - (5) Spark Shuffle
热消息:Fireasy3 揭秘 -- 万物伊始(依赖注入与服务发现)
全球信息:英语四级阅读技巧
一加Ace 2V 12+256G起步行业罕见:友商还在搞8+128卡价位的版本
实时:Redmi Note 12 Pro极速版12+256G到手1999元:开机就是MIUI 14
造车新势力2月交付量出炉:理想、蔚来、哪吒破万 零跑压力大
【全球速看料】厦门征求意见!过马路玩手机或将罚款50元 你支持吗?
世界快看:东风概念飞行汽车外观曝光!“鸥翼门”相当炫酷
【当前热闻】2018巴彦淖尔国际马拉松
环球焦点!胡明轩:平时杜导叫我和徐杰一起训练 要求我们承担起更多责任
世界快讯:makefile
基于alpine基础镜像构建jdk镜像以及tomcat镜像及业务构建
Linux极简入门系列(六):其它补充
CSS全局关键字
环球聚焦:委员建议隔周三休成热搜第一 网友吵翻 专家:很难行得通
今日热闻!Model 2明天发?这款15万的特斯拉便宜车:马斯克已经说了17年
环球新消息丨LOJ 3276 JOISC 2020 Day2 遗迹 题解 (计数DP)
环球快资讯:MySQL学习笔记-多表查询(上)
当前视讯!量化交易基础 - 011 - 样本外检验
风语筑(603466):上海风语筑文化科技股份有限公司关于股东权益变动比例超过1%的提示性公告
天天观察:希望工程发文感谢《原神》玩家 5天9万多人捐赠240万元
世界聚焦:“刺客”又来了!网友称买到1600元一斤话梅:每颗至少20元
“窄边教科书”上新!戴尔XPS15 9530发布:13代酷睿+RTX 40配8TB SSD
环球微动态丨孟菲斯动物园发大熊猫丫丫新动态 网友:尽快回国!
曝苹果屏下Face ID技术有缺憾:2026年才会趋于完美
C++ STL学习笔记-C++ STL基础
焦点讯息:4-Ribbon负载均衡
信息:可取代eSIM:更完美的iSIM卡来了
二月浏览器大战结果出炉:微软Edge用户数不升反降
头条:《王者荣耀》干将莫邪画中仙皮肤公布:中国古风莫邪绝美
环球微头条丨k8s之list-watch机制、节点调度以及亲和性
全球速讯:记录--虚拟滚动探索与封装
天天百事通!(数据库系统概论|王珊)第七章数据库设计-第四节:逻辑结构设计
焦点热议:Cesium 几何体贴模型 sampleHeight(二十二)
环球滚动:苏富比春拍上海预展即将开展,近150件藏品由谁保驾护航?
全球即时看!蔚来2022年财报公布:全年营收492亿元 同比大涨36%
天天资讯:建议元宵节放假1天:提升人民幸福指数
环球通讯!超19万辆!比亚迪2月新能源销量公布:暴打新势力全家
全球微头条丨2023五一档电影增至五部!哪部对你吸引力更大?
热点!Cesium Transform(二十)
世界速讯:第124篇: 期约Promise
怎么登录新浪微博网页版_如何登陆新浪微博
环球快报:刹车变硬踩不动遭车主集体投诉 铃木召回超7.8万辆汽车
【独家】好利来创始人之子回应开劳斯莱斯摆摊:没想博眼球
天天快播:AI小姐姐比真人还好看? N卡又抓到风口:8GB显存稳定绘图 首选RTX30/40系
春丽今天55岁了!网友:Coser我永远只服成龙大哥
速讯:URLDNS链分析
认识数据标签
每日速递:Python识别图形验证码实战项目
全球播报:记一次CPU占用持续上升问题排查(Nacos动态路由引起)
iOS应用发布ITMS-90704错误解决
荣耀“青海湖技术”揭晓:荣耀Magic5系列全球首发硅碳负极技术
国内专属!新款国产特斯拉Model Y升级悬架:终于不颠了
天天热门:功耗开放470W!影驰名人堂RTX 4080真是生猛
每日热讯!又一游戏成功“入奥”:育碧《舞力全开》入选2023年奥林匹克电子竞技项目
天天头条:女子幼儿园收童子尿煮鸡蛋 吃着香是浙江当地非遗:网友直呼酸爽
今日热闻!中国通才教育:已针对首次公开发售相关指控开展独立调查,将继续停牌
全球百事通!为什么95%的Java程序员人,都是用不好Synchronized?
每日时讯!Python教程:类的派生
你有“ChatGPT综合征”吗:想搞钱,或是失业焦虑?
Python教程:类的继承,什么是继承
加点广告怎么了 爱奇艺新专利可在弹幕中显示广告
环球动态:狂飙8000MHz!朗科Z RGB DDR5-8000 16GB电镀银内存图赏
每日短讯:1:1复刻仿生人手 现实版《西部世界》公司众筹开启
全球头条:5G是高铁 6G就是飞机!工信部:全面推进6G技术研发
焦点热文:公司丢货要求全体员工均摊1万赔款:新员工拒赔反被怀疑偷东西
天天观察:如何在Ubuntu上安装Nextcloud(适用于树莓派上的Ubuntu)
每日播报!Pod 进阶
每日快看:Zabbix“专家坐诊”第183期问答汇总
Spring中Bean的加载方式~
什么是Markdown
当前报道:纬德信息(688171)3月1日主力资金净买入105.72万元
旅俄大熊猫画风突变体重狂飙40公斤:摸爬滚打样样精通
百事通!特斯拉Model 2被曝成本大降37% 比丰田卡罗拉还低
世界短讯!打赢了!科比坠机照片泄露案其遗孀获赔2885万美元
天天热头条丨惊险一幕:女子用火车站自动扶梯运行李 把下面男子砸骨折
【全球速看料】玩游戏需自备爆米花:《最终幻想16》主线过场动画超11小时
当前快看:1000亿数据、30W级qps如何架构?来一个天花板案例
3-Eureka注册中心
天天精选!【验证码逆向专栏】某验三代、四代一键通过模式逆向分析
当前信息:索泰RTX 4090月白深度测试:真孤独求败!A卡没得玩了
网友晒视频广州一特斯拉在停车场连撞多车 司机下车就跑:又踩错了吗
当前看点!设计时速100公里!上海苏州互通地铁今起试跑:苏州坐地铁直达
环球资讯:韦达定理
全球播报:轻松玩转Makefile | 基础用法
医院拍CT有位患者叫熊猫 结果竟是真熊猫:网友祝福“国宝”尽快好起来
丰田拆完一辆特斯拉Model Y后被震撼了 高管惊叹:我们远远落后
全球新动态:火爆全网的AI小姐姐模型重新上线 作者:画什么图后果自负
【全球热闻】大厂年薪30万95后女生转行卖快餐:直言脱离公司KPI太快乐了
当前观察:《暗黑破坏神4》玩家打怪时 不会出现天量伤害数值
世界今热点:通用电梯:目前产能在满足履行轨道交通项目合同需求的同时,不会影响公司履行其他客户订单或新接订单的生产需求
环球报道:电脑病毒的介绍与防护_电脑病毒与防护介绍
天天日报丨浅析大促备战过程中出现的fullGc,我们能做什么?
ChunJun 1.16 Release版本即将发布,bug 捉虫活动邀您参与!
一款超级给力的弱网测试神器—Qnet(附视频)
焦点要闻:Vue,小程序开发技术详解
环球即时看!关于React-Router6 (React 路由)
每日简讯:取代马斯克:新CEO接班人浮出水面
健身网红大容量运动杯:富光1.6L顿顿桶29元发车
每日热门:马力超百匹!春风NK800双缸街车发布:46890元起