百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分析 > 正文

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

liebian365 2025-04-07 15:42 4 浏览 0 评论

本文分为两个部分:

  1. 作业提交流程思维导图
  2. 关键函数列表

作业提交流程思维导图

collect后Job的提交流程

点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。

关键函数列表

Dataset.collect

def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Dataset.withAction

Dataset.collectFromPlan

触发物理计划的执行,其中 plan 的类型是 SparkPlan

private def collectFromPlan(plan: SparkPlan): Array[T] = {
  val fromRow = resolvedEnc.createDeserializer()
  plan.executeCollect().map(fromRow)
}

Spark 有很多action 函数,比如:

  • collect
  • count
  • show

最终都是通过 collectFromPlan 去创建 Job

SparkPlan.executeCollect


executeCollect

这个函数分为三部:

  • getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
  • 然后调用了 RDD.collect
  • 解析 collect 结果,并返回

RDD.collect

Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。

class RDD是一个基类,它有很多子类:

  • ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
  • ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
  • MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
  • UnionRDD:存储 union 的结果数据
  • 其他 RDD 子类

collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:

SparkContext.runJob

runJob 方法有很多重载,我们只关心最复杂的一个:

从功能上来说,它实现了

  • 准备 callSite,以便出问题知道是哪一行代码出错了
  • 通过 DAGScheduler.runJob提交作业
  • progressBar: 命令行里 stage的进度条显示
  • doCheckpoint 将 RDD的中间和最后结果缓存下来

从代码上来说,方法声明如下:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit 

它有两个泛型类型参数:

  • T: ClassTag 输入RDD的类型
  • U: ClassTag 输出数据的类型

参数列表:

  • rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
  • func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
  • partitions: Seq[Int]。分区下标列表
  • resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值

返回值: Unit 表示没有任何返回值

DAGScheduler.runJob

对于 DAGScheduler 而言,Stage是最小的调度单元。它会

  • 给Job生成以Stage为调度单位的DAG图
  • 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
  • 提交Stage,以TaskSet的形式提交给 TasksetManager

DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:

  1. 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
  2. 通过一个JobWaiter对象等待结果

在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。

JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。

DAGSchedulerEventProcessLoop.onReceive

onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。

这里我们只看 handleJobSubmitted,它做了五件事情:

  • 创建Stage:递归式地创建,先创建parent stage
  • 注册Stage
  • 创建Job
  • 注册Job
  • 提交Stage

由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。

DAGScheduler.createResultStage

在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?

RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。

DAGScheduler.getShuffleDependenciesAndResourceProfiles

方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。

getShuffleDependenciesAndResourceProfiles

相关推荐

zookeeper的Leader选举源码解析(zookeeper选取机制)

作者:京东物流梁吉超zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致...

接待外国人英文口语(接待外国人英文口语翻译)

接待外国人英文口语询问访客身份:  MayIhaveyourname,please?  请问您贵姓?  Whatcompanyareyoufrom?  您是哪个公司的?  Could...

一文深入理解AP架构Nacos注册原理

Nacos简介Nacos是一款阿里巴巴开源用于管理分布式微服务的中间件,能够帮助开发人员快速实现动态服务发现、服务配置、服务元数据及流量管理等。这篇文章主要剖析一下Nacos作为注册中心时其服务注册与...

Android面试宝典之终极大招(android 面试宝典)

以下内容来自兆隆IT云学院就业部,根据多年成功就业服务经验,以及职业素养课程部分内容,归纳总结:18.请描述一下Intent和IntentFilter。Android中通过Intent...

除了Crontab,Swoole Timer也可以实现定时任务的

一般的定时器是怎么实现的呢?我总结如下:1.使用Crontab工具,写一个shell脚本,在脚本中调用PHP文件,然后定期执行该脚本;2.ignore_user_abort()和set_time_li...

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

本文分为两个部分:作业提交流程思维导图关键函数列表作业提交流程思维导图collect后Job的提交流程点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。关键函数列表Data...

Arduino通过串口透传ESP 13板与java程序交互

ESP13---是一个无线板子,配置通过热点通信Arduino通过串口透传ESP13板与java程序交互这个程序最基本的想法是用java把Arduino抽象出来,忙活了好几天,虽然没有达到最后的...

Arduino与两个或多个Arduino板之间的通信

问题您希望让两个或多个Arduino板一起工作。您可能希望增加I/O能力或执行比单个板上能够实现的更多处理。您可以使用I2C在板间传递数据,以便它们可以共享工作负载。解决方案本示例中的两段代码展示了如...

Android开发者必知的5个开源库(安卓开源库)

过去的时间里,Android开发逐步走向成熟,一个个与Android相关的开发工具也层出不穷。不过,在面对各种新鲜事物时,不要忘了那些我们每天使用的大量开源库。在这里,向大家介绍的就是,在这个任劳任怨...

Android 开发中文引导-应用小部件

应用小部件是可以嵌入其它应用(例如主屏幕)并收到定期更新的微型应用视图。这些视图在用户界面中被叫做小部件,并可以用应用小部件提供者发布。可以容纳其他应用部件的应用组件叫做应用部件的宿主(1)。下面的截...

Android | 如何在设备启动完成后打开应用

读完这篇文章大概需要1分钟最近,在做一个应用(暂且称之为MyApp),里面需要有provisioning(配置)的部分,也就是在应用启动前有个配置的过程,由好几个Activity组成,一个...

2021款欧版标致3008 1.5T柴油版:更为舒适的家用SUV

自文艺复兴时期及以后以来闻名的法国人的外向性为现代汽车文化注入了另一块石头,但如今已被大量技术所装饰。这标致3008出生时是畅销书。这是您在看到新船体时就知道的情况之一,那是在2016年,当年几乎...

媒库文选估计波士顿动力公司机器狗大军的拉力

EstimatethePullingForceofBostonDynamics'Robo-DogArmy估计波士顿动力公司机器狗大军的拉力RhettAllain雷特·阿兰When...

Three.js建模基础(threejs3d)

在Three.js中,一个可见的物体是由几何体和材料构成的。在这个教程中,我们将学习如何从头开始创建新的网格几何体,研究Three.js为处理几何对象和材质所提供的相关支持。1、索引面集/Indexe...

如何用2 KB代码实现3D赛车游戏?2kPlus Jam大赛了解一下

选自frankforce作者:Frank机器之心编译参与:王子嘉、GeekAI控制复杂度一直是软件开发的核心问题之一,一代代的计算机从业者纷纷贡献着自己的智慧,试图降低程序的计算复杂度。然而,将一款...

取消回复欢迎 发表评论: