Spark RDD的分区与依赖关系

Spark RDD的分区与依赖关系

RDD分区

RDD,Resiliennt Distributed Datasets,弹性式分布式数据集,是由若干个分区构成的,那么这每一个分区中的数据又是如何产生的呢?这就是RDD分区策略所要解决的问题,下面我们就一道来学习RDD分区相关。

RDD数据分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。

1)Partitioner:在Spark中涉及RDD的分区策略的抽象类为Partitioner,其继承体系如图-27所示,有两个核心的子类实现,一个HashPartitioner,一个RangePartitioner。

图-27 spark Partitioner继承体系

Spark中数据分区的主要工具类(数据分区类),主要用于Spark底层RDD的数据重分布的情况中,主要方法两个,如图-28所示:

图-28 Partitioner抽象类

2)HashPartitioner:Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上。

功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作;但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。HashPartitioner代码如下:

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//加载数据
val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
//通过Hash分区
val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
//获取分区方式
println(result.partitioner)
//获取分区数
println(result.getNumPartitions)
}

RDD自定义分区

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合我们的需求,这时候我们就可以自定义分区策略。

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

1)numPartitions: Int:返回创建出来的分区数。

2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。

案例一:模拟实现HashPartitioner。

class CustomerPartitoner(numPartiton:Int) extends Partitioner{

  // 返回分区的总数

  override def numPartitions: Int = numPartiton

  // 根据传入的Key返回分区的索引

  override def getPartition(key: Any): Int = {

key.hashCode()%numparts

  }

}

object CustomerPartitoner {

  def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf()

.setAppName("CustomerPartitoner").setMaster("local[*]")

val sc = new SparkContext(sparkConf)

//zipWithIndex该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

val rdd = sc.parallelize(0 to 10,1).zipWithIndex()

val func = (index:Int,iter:Iterator[(Int,Long)]) =>{

  iter.map(x => "[partID:"+index + ", value:"+x+"]")

}

val r = rdd.mapPartitionsWithIndex(func).collect()

for (i <- r){

  println(i)

}

val rdd2 = rdd.partitionBy(new CustomerPartitoner(5))

val r1 = rdd2.mapPartitionsWithIndex(func).collect()

println("----------------------------------------")

for (i <- r1){

  println(i)

}

println("----------------------------------------")

sc.stop()

  }

}

总结:

1)分区主要面对KV结构数据,Spark内部提供了两个比较重要的分区器,Hash分区器和Range分区器。

2)hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过水塘抽样的算法来将数据均匀的分配到各个分区中。

3)自定义分区主要通过继承partitioner抽象类来实现,必须要实现两个方法:numPartitions 和 getPartition(key: Any)。

RDD依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

依赖关系

图-29是源码中的一张图,可以发现一个问题Dependency(依赖)的意思可以发现ShuffleDependency是其子类(即宽依赖),NarrowDependency是其子类(即窄依赖)。

图-29 Dependency体系

1)宽窄依赖:所谓窄依赖,指的是子RDD一个分区中的数据,来自于上游RDD中一个分区。所谓宽依赖,指的是子RDD一个分区中的数据,来自于上游RDD所有的分区。

宽窄依赖关系示例如图-30所示:

图-30 宽窄依赖示例图

2)血统Lineage:RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。关于linage说明示意图如图-31所示:

图-31 lineage示例图

3)DAG有向无环图:如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。通俗的来说就是有方向,没有回流的图可以称为有向无环图,示意图如图-32所示。

图-32 有像无环图

4)RDD任务的切分:对于RDD的任务切分,可以很形象的如图-33所示。

图-33 RDD任务的切分

并行度:程序同一时间执行作业的线程个数。

原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,如图-34所示。

图-34 RDD stage的切分

对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的重要依据。Stage阶段计算过程如图所示-35所示。

图-35 RDD stage阶段计算过程

任务生成和提交的四个阶段

Spark任务生产和提交的四个步骤可以归纳如下:

1)构建DAG:用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统。

DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算。

DAG是有边界的:开始(通过sparkcontext创建的RDD),结束(触发action,调用runjob就是一个完整的DAG形成了,一旦触发action,就形成了一个完整的DAG)。

一个RDD描述了数据计算过程中的一个环节,而一个DAG包含多个RDD,描述了数据计算过程中的所有环节。

一个spark application可以包含多个DAG,取决于具体有多少个action。

2)DAGScheduler:将DAG切分stage(切分依据是shuffle),将stage中生成的task以taskset的形式发送给TaskScheduler

为什么要切分stage?

一个是复杂业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)。

如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,下一个阶段的计算依赖上一个阶段的数据。

在同一个stage中,会有多个算子,可以合并到一起,我们很难称其为pipeline(流水线,严格按照流程、顺序执行)。

3)TaskScheduler:调度task(根据资源情况将task调度到Executors).

4)Executors:接收task,然后将task交给线程池执行。

具体可以简化为如图-38所示。

图-38 spark任务生成和提交图

排序

TopN

topN就是上述sortBy/sortByKey之后执行action操作take(N),或者直接takeOrderd(N),建议使用后者,效率高于前者。具体操作省略。

二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。

object SecondSortOps {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName(s"${SecondSortOps.getClass.getSimpleName}")

.setMaster("local[2]")

val sc = new SparkContext(conf)

//sortByKey 数据类型为k-v,且是按照key进行排序

val personRDD:RDD[Person] = sc.parallelize(List(

Person(1, "吴轩宇", 19, 168),

Person(2, "彭国宏", 18, 175),

Person(3, "随国强", 18, 176),

Person(4, "闫  磊", 20, 180),

Person(5, "王静轶", 18, 168)

))

personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))

sc.stop()

}

}

case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {

//对学生的身高和年龄依次排序

override def compare(that: Person) = {

var ret = this.height.compareTo(that.height)

if(ret == 0) {

ret = this.age.compareTo(that.age)

}

ret

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/589489.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

音视频开发之旅——实现录音器、音频格式转换器和播放器(PCM文件转换为WAV文件、使用LAME编码MP3文件)(Android)

本文主要讲解的是实现录音器、音频转换器和播放器&#xff0c;在实现过程中需要把PCM文件转换为WAV文件&#xff0c;同时需要使用上一篇文章交叉编译出来的LAME库编码MP3文件。本文基于Android平台&#xff0c;示例代码如下所示&#xff1a; AndroidAudioDemo Android系列&am…

Golang | Leetcode Golang题解之第64题最小路径和

题目&#xff1a; 题解&#xff1a; func minPathSum(grid [][]int) int {if len(grid) 0 || len(grid[0]) 0 {return 0}rows, columns : len(grid), len(grid[0])dp : make([][]int, rows)for i : 0; i < len(dp); i {dp[i] make([]int, columns)}dp[0][0] grid[0][0]…

服务器IP选择

可以去https://ip.ping0.cc/查看IP的具体情况 1.IP位置--如果是国内用&#xff0c;国外服务器的话建议选择日本&#xff0c;香港这些比较好&#xff0c;因为它们离这里近&#xff0c;一般延时低&#xff08;在没有绕一圈的情况下&#xff09;。 不过GPT的话屏蔽了香港IP 2. 企…

Mac 安装John the Ripper 破解rar(zip)压缩文件

注&#xff1a;仅以此篇记录我满足好奇心所逝去的十几个小时。&#xff08;自娱自乐&#xff09; 1、首先利用 brewhome 包管理工具 安装john the ripper &#xff1a; brew install john-jumbo 如果没有安装brewhome 利用如下命令安装&#xff1a; /bin/zsh -c "$(c…

LeetCode-网络延迟时间(Dijkstra算法)

每日一题 今天刷到一道有关的图的题&#xff0c;需要求单源最短路径&#xff0c;因此使用Dijkstra算法。 题目要求 有 n 个网络节点&#xff0c;标记为 1 到 n。 给你一个列表 times&#xff0c;表示信号经过 有向 边的传递时间。 times[i] (ui, vi, wi)&#xff0c;其中 …

【跟马少平老师学AI】-【神经网络是怎么实现的】(七-1)词向量

一句话归纳&#xff1a; 1&#xff09;神经网络不仅可以处理图像&#xff0c;还可以处理文本。 2&#xff09;神经网络处理文本&#xff0c;先要解决文本的表示&#xff08;图像的表示用像素RGB&#xff09;。 3&#xff09;独热编码词向量&#xff1a; 词表&#xff1a;{我&am…

OpenVINO安装教程 Docker版

从 Docker 映像安装IntelDistribution OpenVINO™ 工具套件 本指南介绍了如何使用预构建的 Docker 镜像/手动创建镜像来安装 OpenVINO™ Runtime。 Docker Base 映像支持的主机操作系统&#xff1a; Linux操作系统 Windows (WSL2) macOS(仅限 CPU exectuion) 您可以使用预…

【跟马少平老师学AI】-【神经网络是怎么实现的】(八)循环神经网络

一句话归纳&#xff1a; 1&#xff09;词向量与句子向量的循环神经网络&#xff1a; x(i)为词向量。h(i)为含前i个词信息的向量。h(t)为句向量。 2&#xff09;循环神经网络的局部。 每个子网络都是标准的全连接神经网络。 3&#xff09;对句向量增加全连接层和激活函数。 每个…

I2C接口18路LED呼吸灯驱动IS31FL3218互相替代SN3218替换HTR3218

I2C接口18路LED呼吸灯控制电路IC 该型号IC为QFN24接口&#xff0c;属于小众产品&#xff0c;IS31FL3218、SN3218、HTR3218S管脚兼容&#xff0c;需要注意的是HTR3218管脚与其他型号不兼容。 I2C接口可实现多个LED灯的呼吸灯控制&#xff0c;可实现单色控制18个LED灯&#xff0…

【ARM Cache 系列文章 11.2 -- ARM Cache 组相联映射】

请阅读【ARM Cache 系列文章专栏导读】 文章目录 Cache 组相联映射组相联映射原理多路组相连缓存的优势多路组相连缓存的代价关联度&#xff08;Associativity&#xff09; 上篇文章&#xff1a;【ARM Cache 系列文章 11.1 – ARM Cache 全相连 详细介绍】 Cache 组相联映射 A…

笔记1--Llama 3 超级课堂 | Llama3概述与演进历程

1、Llama 3概述 https://github.com/SmartFlowAI/Llama3-Tutorial.git 【Llama 3 五一超级课堂 | Llama3概述与演进历程】 2、Llama 3 改进点 【最新【大模型微调】大模型llama3技术全面解析 大模型应用部署 据说llama3不满足scaling law&#xff1f;】…

Deep learning Part Five RNN--24.4.29

接着上期&#xff0c;CBOW模型无法解决文章内容过长的单词预测的&#xff0c;那该如何解决呢&#xff1f; 除此之外&#xff0c;根据图中5-5的左图所示&#xff0c;在CBOW模型的中间层求单词向量的和&#xff0c;这时就会出现另一个问题的&#xff0c;那就是上下文的单词的顺序…

Redis Zset的底层原理

Redis Zset的底层原理 ZSet也就是SortedSet&#xff0c;其中每一个元素都需要指定一个score值和member值&#xff1a; 可以根据score值排序后member必须唯一可以根据member查询分数 因此&#xff0c;zset底层数据结构必须满足键值存储、键必须唯一、可排序这几个需求。之前学…

ZooKeeper知识点总结及分布式锁实现

最初接触ZooKeeper是之前的一个公司的微服务项目中&#xff0c;涉及到Dubbo和ZooKeeper&#xff0c;ZooKeeper作为微服务的注册和配置中心。好了&#xff0c;开始介绍ZooKeeper了。 目录 1.ZooKeeper的基本概念 2.ZooKeeper的节点&#xff08;ZNode&#xff09; 3. ZooKeep…

【Java笔记】第5章:函数

前言1. 函数的理解2. 函数的基本使用3. 函数的参数4. 函数的返回值5. 函数的执行机制6. 函数的递归调用结语 ↓ 上期回顾: 【Java笔记】第4章&#xff1a;深入学习循环结构 个人主页&#xff1a;C_GUIQU 归属专栏&#xff1a;【Java学习】 ↑ 前言 各位小伙伴大家好&#xff…

[随记]Mac安装Docker及运行开源Penpot

下载Docker Desktop for Mac&#xff1a;https://www.docker.com/products/docker-desktop/ 安装Docker Desktop for Mac&#xff0c;安装完成后&#xff0c;启动Docker&#xff0c;然后在终端输入&#xff1a; docker version 在Mac电脑的Desktop&#xff0c;随便创建一个文…

【真实体验】使用崖山YMP 迁移 Oracle/MySQL 至YashanDB 23.2 验证测试【YashanDB迁移体验官】

一、前言 说一下我和崖山数据库的结缘&#xff0c;大概在去年吧&#xff0c;因为我经常在墨天轮写文章&#xff0c;看到崖山数据库推出了一崖山体验官的活动&#xff0c;我就报名参加了。第一次体验了崖山数据库&#xff0c;也测试了我司数据库到崖山数据库的兼容性&#xff0…

钉钉手机端调试前端H5项目流程

此流程以Vue项目为例 一、操作步骤 在根目录下 vue.config.js 文件中将 devServer.host 设置为 0.0.0.0 // vue.config.js module.exports {devServer: {host: 0.0.0.0,...},...}本地启动项目&#xff0c;获取 Network App running at:- Local: http://localhost:8080/ -…

JAVA 学习·泛型(二)——通配泛型

有关泛型的基本概念&#xff0c;参见我的前一篇博客 JAVA 学习泛型&#xff08;一&#xff09;。 协变性 泛型不具备协变性 在介绍通配泛型之前&#xff0c;先来看一下下面的例子。我们定义了一个泛型栈&#xff1a; import java.util.ArrayList; class GenericStack<E>…

全新TOF感知RGBD相机 | 高帧率+AI,探索3D感知新境界

海康机器人在近期的机器视觉新品发布会上推出的全新TOF感知RGBD相机,无疑是对当前机器视觉技术的一次革新。这款相机不仅融合了高帧率、轻松集成、体积小巧以及供电稳定等诸多优点,更重要的是,它将AI与3D感知技术完美结合,通过高帧率+AI算法,实现了对不同场景的快速捕捉与…
最新文章