Spark算子
# Spark算子
[toc]
# 一、转换算子
# coalesce函数
返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖,例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)。为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。
注意:第二个参数shuffle=true,将会产生多于之前的分区数目,例如你有一个个数较少的分区,假如是100,调用coalesce(1000, shuffle = true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个(对于提高并行度)是非常有用的。如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的partition数变多的
# repartition函数
返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle
Repartition函数内部调用了coalesce函数 shuffle 为True
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
2
3
4
5
# flatMap——flatMap变换
算子函数格式:
flatMap[U](f:FlatMapFunction[T,U]):JavaRDD[U]
在前面我们已经了解到map变换是对原RDD中的每个元素进行一对一变换生成
新RDD,而flatMap不同的地方在于,它是对原RDD中的每个元素用指定函数f进行一
对多(这也是lat前缀的由来)的变换,然后将转换后的结果汇聚生成新RDD.
示例:
flatMap示例代码
scala>valrdd=sc,parallelize(0 to 3,1)//生成由0-3序列构成的RDD
rdd:org.apache,spark.rdd.RDD[Int]=ParallelCollectionRDD[17] at parallelize at:21
scala>val flatMappedRDD=rdd.flatMap(x=>0tox)//使用flatMap将每个原始变换为一个序列
flatMappedRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[18] at flatMap at:23
scala>flatMappedRDD.collect//显示新的RDD
res0:Array[Int]=Array(0,0,1,0,1,2,0,1,2,3)
2
3
4
5
6
7
8
9
10
11
# sample——抽样
算子函数格式:
sample(withReplacement:Boolean,fraction:Double,seed:Long):JavaRDD[T]
对原始RDD中的元素进行随机抽样,抽样后产生的元素集合构成新的RDD.
参数fraction 指定新集合中元素的数量占原始集合的比例.抽样时的随机数种子由seed指定.
参数withReplacement为false时,抽样方式为不放回抽样.
参数withReplacement为true时,抽样方式为放回抽样.
示例:
sample示例代码
1:scala>valrdd=sc.parallelize(0to9,1)//生成由0-9的序列构成的RDD
rdd:org.apache.spark.rdd.RDD [Int1=ParallelCollectionRDD[5]at parallelize at:21
2:scala>rdd.sample(false,0.5).collect//不放回抽样一半比例的元素生成新的RDD
res4:Array[Int]=Array(0,1,2,3,4,7)
3:rdd.sample(false,0.5).collect//再次不放回抽样一半比例的元素生成新的RDD
res7:Array [Int]=Array(0,1,3,6,8)
4:scala>rdd.sample(false,0.8).collect//不放回抽样80%比例的元素生成新的RDD
res8:Array[Int]=Array(0,1,2,5,6,8,9)
5:scala>rdd.sample(true,0.5).co1lect//放回抽样一半比例的元素生成新的RDD
res9:Array[Int]=Array(0,2,3,4,4,6,7,9)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# zip——联结
算子函数格式:
zip[U](other:JavaRDDLike[U,_]):JavapairRDD(T,U]
输入参数为另一个RDD,zip变换生成由原始RDD的值为Key、输入参数RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD.
示例:
zip示例代码
1:scala>val rdd1=sc.parallelize(0 to 4,1)//构建原始RDD
rdd_1:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[19]at parallelize at :21
2:scala>val rdd2=sc.parallelize(5 to 9,1)//构建输入参数RDD
rdd_2:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD [20]at parallelize at :21
3:scala>rdd_1.zip(rdd_2).collect//对两个RDD进行联结
res5:Array[(Int,Int)]=Array((0,5),(1,6),(2,7),(3,8),(4,9})
2
3
4
5
6
7
8
9
10
11
# mapValues——对Value值进行变换
算子函数格式:
mapValues[u](f:Function[v,U]):JavapairRDD[K,U]
将Key/Value型RDD中的每个元素的Value值,使用输入参数函数f进行变换,生成新的RDD.
示例:
1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumquat","haw"),1).keyBy(_.1ength)//构建原始RDD
pairs:org.apache.spark.rdd.RDDI(Int,String)]=MappedRDD[16]at keyBy at:12
2:scala>pairs.mapvalues(v=>v+""+V{0)).collect//生成将单词加单词首字母的RDD
res0:Array[(Int,string)]=Array{(5,apple a),(6,banana b),(5,berry b),(6,cherry c),7,cumquat c),(3,haw h))
2
3
4
5
6
7
# 二、行动Action算子
# 数据运算类行动算子
# reduce——Reduce操作
算子函数格式:
reduce(f:Function2[T,T,T]):T
对RDD中的每个元素依次使用指定的函数f进行运算,并输出最终的计算结果.
需要注意的是,Spark中的reduce操作与Hadoop中的reduce操作并不一样.在Hadoop中,reduce操作是将指定的函数作用在Key值相同的全部元素上.而Spark的reduce操作则是对所有元素依次进行相同的函数计算.
示例:
1:scala>val nums=sc.parallelize(0 to 9,5)//构建由数字0-9构成的RDD
nums:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[18]at parallelize at:12
2:scala>nums.reduce(_+_)//计算RDD中所有数字的和
2
3
4
5
# collect——收集元素
算子函数格式:
co1lect():List[T]
collect的作用是以数组格式返回RDD内的所有元素.
示例:
1:scala>val data=sc.parallelize(List(1,2,3,4,5,6,7,8,9,0},2)//构建原始RDD
data;org.apache.spark.rdd.RDD[Int]=ParallelCo1lectionRDD[8]at parallelize at:12
2:scala>data.collect//显示原始RDD中的元素
res0:Array[Int]=Array(l,2,3,4,5,6,7,8,9,0)
2
3
4
5
6
7
# countByKey——按Key值统计Key/Value型RDD中的元素个数
算子函数格式:
countByKey():Map[K,Long]
计算Key/Value型RDD中每个Key值对应的元素个数,并以Map数据类型返回
统计结果.
示例:
1:scala>val pairRDD=sc.parallelize(List(("fruit","Apple"),("fruit","Banana"),{"fruit","Cherry "),{"vegetable","bean"),("vegetable","cucumber"),("vegetable","pepper")),2} //构建原始 RDD
pairRDD:org.apache.spark.rdd.RDD [(String,String)]=Paralle1 Collection RDD[3 Jat parallelize at :12
2:sca1a>pairRDD.countByKey //统计原始RDD中每个物品类型下的物品数量
res0:scala.collection.Map[String,Long]=Map(fruit->3,vegetable->3)
2
3
4
5
6
7
# countByValue——统计RDD中元素值出现的次数
算子函数格式:
countByValue():Map[T,Long]
计算RDD中每个元素的值出现的次数,并以Map数据类型返回统计结果.
countByValue示例代码
1:scala>val num=sc.parallelize(List(1,1,1,2,2,3),2)//构建原始RDD
num:org.apache.spark.rdd.RDD [Int]=ParallelcollectionRDD[4]at
parallelize at:12
2:scala>num.countByValue//统计原始RDD中每个数字出现的次数
res0:scala.collection.Map[Int,Long]=Map(2->2,1->3,3->1)
2
3
4
5
6
7
8
9
# foreach——逐个处理RDD元素
算子函数格式:
foreach(f:VoidFunction[(K,V)]):Unit
对RDD中的每个元素,使用参数f指定的函数进行处理.
示例:
1:scala>val words=sc.parallelize(List("A","B","C","D"),2)//构建原始 RDD
words;org.apache.spark.rdd,RDD[String]=ParallelCollectionRDD[9] at parallelize at :21
2:scala>words.foreach(x=>print1n(x+"is a letter."))/打印输出每个单词构造的一句话
Cis a letter.
Ais a letter.
Dis a letter.
Bis a letter.
2
3
4
5
6
7
8
9
10
11
12
13
# lookup——查找元素
算子函数格式:
lookup(key:K):List[V]
在Key/Value型的RDD中,查找与参数key相同Key值的元素,并得到这些元素
的Value值构成的序列.
示例:
1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumcquat","haw"),1).keyBy (_.1ength)//构建原始RDD
pairs:org.apache.spark.rdd.RDDt(Int,String)]=MapPartitionsRDD[13] at keyBy at:21
2:scala>pairs.collect
res18:Array [(Int,String)]=Array((5,apple),(6,banana),(5,berry),(6,cherry),(7,cumcuat),(3,haw))
3:scala>pairs.lookup(5)//查找长度为5的单词
res19:Seq[string]=WrappedArray (apple,berry)
2
3
4
5
6
7
8
9
10
11
# take——获取前n个元素
# takeSample——提取n个元素
# takeOrdered——获取排序后的前n个元素
# 存储型行动算子
# saveAsObjectFile——存储为二进制文件
算子函数格式:
saveAsobjectPile(path:string):Unit
将RDD转换为序列号对象后,以Hadoop SequenceFile文件格式保存,保存路径由
参数path指定.
示例:
1:scala>val data=sc.parallelize(0to9,1)//构建0-9组成的RDD
data:org.apache.spark.rdd.RDD[Int]=Paralle1CollectionRDD[40]at parallelize at :12
2:scala>data.saveAsobjectFile("obj")//将RDD以SequenceFile文件格式保存,文件名为obj
2
3
4
5
# saveAsTextFile——存储为文本文件
# saveAsNewAPIHadoopFile——存储为Hadoop文件
# 三、缓存算子
为了提高计算效率,Spark采用了两个重要机制:
①基于分布式内存数据集进行运算,也就是我们已经熟知的RDD;
②变换算子的惰性执行(Lazy Evaluation),即RDD的变换操作并不是在运行到该行代码时立即执行,而仅记录下转换操作的操作对象.只有当运行到一个行动算子代码时,变换操作的计算逻辑才真正执行.
这两个机制帮助Spark提高了运算效率,但正如'硬币都有两面'一样,在带来提升性能的好处的同时,这两个机制也留下了隐患.
例如:
①如果在计算过程中,需要反复使用某个RDD,而该RDD需要经过多次变换才能得到,则每次使用该RDD时都需要重复这些变换操作,这种运算效率是很低的;
②在计算过程中数据存放在内存中,如果出现参与计算的某个节点出现问题,则存放在该节点内存中的RDD数据会发生损坏.如果损坏的也是需要经过多次变换才能得到的RDD,此时虽然可以通过再次执行计算恢复该RDD,但仍然要付出很大的代价.因此,Spark提供了一类缓存算子,以帮助用户解决此类问题.
# cache——缓存RDD
算子函数格式:
cache():JavaRDD[T]
cache将RDD的数据持久化存储在内存中,其实现方法是使用后面我们会介绍的persist算子.当需要反复使用某RDD时,使用cache缓存后,可以直接从内存中读出,不再需要执行该RDD的变换过程.需要注意的是,这种缓存方式虽然可以提高再次使用某个RDD的效率,但由于cache后的数据仅仅存储在内存中,因此不能解决RDD出错时需要再次恢复运算的问题.而且cache保存的数据在Driver关闭后会被清除,因此不能被在其他Driver中启动的Spark程序使用.
示例:
1:scala>val num=sc.parallelize(0to9,1)//构建RDD
num:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[7]at parallelize at:21
2:scala>val result=num.map(x=>x*x)//对原始RDD进行map变换
result:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD [8]at map at:23
3:scala>result.cache//对新RDD进行缓存
res19:result.type=MapPartitionsRDD[8 Jat map at :23
4:scala>result.count//统计新RDD中的元素个数
res30:Long=10
5:scala>result.collect().mkstring(',")//再次使用新RDD,生成用逗号分隔的序列
res31:String=0,1,4,9,16,25,36,49,64,81
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# checkpoint——建立RDD的检查点
算子函数格式:
checkpoint():Unit
对于需要很长时间才能计算出或者需要依赖很多其他RDD变化才能得到的RDD,如果在计算过程中出错,要从头恢复需要付出很大的代价.此时,可以利用checkpoint建立中间过程的检查点,Spark会将执行checkpoint操作的RDD持久化,以二进制文件的形式存放在指定的目录下.与cache不同的是,checkpoint保存的数据在Driver关闭后仍然以文件的形式存在,因此可以被其他Driver中的Spark程序使用.
示例:
1:scala>val rdd=sc.makeRDD(1to9,2)//构建原始RDD
rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at makeRDD at :21
2:scala>val flatMapRDD=rdd.flatMap(x=>Seq(x,x))//对原始RDD做
flatMap变换
flatMapRDD:org.apache.spark.rdd.RDD [Int]=MappartitionsRDD[1]at flatMap at:23
3:scala>sc.setCheckpointDir("my_checkpoint")//指定checkpoint存放的目录
4:scala>flatMapRDD.checkpoint()//建立 checkpoint
5:scala>flatMapRDD.dependencies.head.rdd//显示变换后RDD的依赖
res2:org.apache.spark.rdd.RDD(_]=ParallelcollectionRDD[0]at makeRDD at:21
6:scala>flatMapRDD.collect()//显示变换后的RDD
res3:Array[Int]=Array(1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9)
7:scala>flatMapRDD.dependencies.head.rdd//再次显示变换后RDD的依赖
res4:org.apache.spark.rdd.RDD [_1=CheckpointRDD[2]at collect at:26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# persist——持久化RDD
算子函数格式:
persist(newLeve1:storageLeve1):JavaRDD[T]
调用persist可对RDD进行持久化操作,利用参数newlevel可以指定不同的持久化方式,常用的持久化方式包括:
MEMORY_ONLY:仅在内存中持久化,且将RDD作为非序列化的Java对象存储在JVM中.这种方式比较轻量,是默认的持久化方式.
MEMORY_ONLY_SER:仅在内存中持久化,且将RDD作为序列化的Java对象存储(每个分区一个byte数组).这种方式比MEMORY_ONLY方式要更加节省空间,但会耗费更多的CPU资源进行序列化操作.
MEMORY_ONLY_2:仅在内存中持久化,且将数据复制到集群的两个节点中.
MEMORY_AND_DISK:同时在内存和磁盘中持久化,且将RDD作为非序列化的Java对象存储.
MEMORY_AND_DISK_SER:同时在内存和磁盘中持久化,且将RDD作为序列化的Java对象存储.
MEMORY_AND_DISK_2:同时在内存和磁盘中持久化,且将数据复制到集群的两个节点中.
persist示例代码
1:scala>val num=sc.parallelize(0to9,1)//构建RDD
num:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at parallelize at :12
2:scala>num.getStorageLeve1//显示RDD当前的持久化状态
res8:org.apache.spark.storage.Storagelevel=StorageLevel{false,false,false,false,1)
3:scala>num.persist()//使用persist进行默认的MEMORY_ONLY持久化
res9:num.type=ParallelCollectionRDD [5] at parallelize at:21
4:scala>num.getStorageLeve1//显示RDD新的持久化状态
res10:org,apache,spark,storage.StorageLevel=StorageLevel(false,true,false,true,1)
2
3
4
5
6
7
8
9
10
11
12
13
14
15