Spark笔记(2):Pair RDD与数据分区


这一章节我们来看一下在Spark常用来进行聚合操作的Pair RDD,其实类似于字典,由key-value对构成,同样的,Pair RDD也有很多的操作接口,比如reduceByKey()、join(),下面会逐一对介绍,很多语言的语法都是相同的,这里的聚合操作就类似于SQL中的group by或者python中的groupby,所以领会起来也不难。

然后再看一下数据分区相关的内容,数据分区对分布式集群上跑数据来说及其重要,一个小的优化就会极大的降低时间成本和内存开销,这一块也是写Spark Job过程中需要重点关注的。

一、Pair RDD创建

1
2
3
4
5
6
在python中使用第一个单词作为键创建出一个pair RDD
%pyspark
lines = sc.parallelize(['I have a dream','hello world'])
lines.map(lambda x: (x.split(' ')[0],x)).take(10)

[('I', 'I have a dream'), ('hello', 'hello world')]

二、Pair RDD转化操作

  • 聚合操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# reduceByKey()  以key聚合对value进行操作
rdd = sc.parallelize([(1,2),(3,4),(3,6)])
rdd.reduceByKey(lambda x,y:x+y)
[(1,2),(3,10)]


# mapValues() 对pair的values进行操作
rdd = sc.parallelize([(panda,0),(pink,3),(pirate,3),(panda,1),(pink,4)])
rdd.mapvalues(lambda x:(x,1)).reduceByKey(lambda x,y:())
[(panda,(1,2)),(pink,(7,2)),(pirate,(3,1))]



# 单词计数
rdd = sc.parallelize(['stay hungry','stay foolish'])
words = rdd.flatMap(lambda x:x.split(" ")) # 分隔单词,铺展开来
result = words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y) # mapreduce 实现计数
[("stay",2),("hungry",1),("foolish",1)]


# combineByKey() 对每个键求对应的均值
nums = sc.parallelize([(panda,0),(pink,3),(pirate,3),(panda,1),(pink,4)])
sumcount = nums.combineByKey(
(lambda x:(x,1))
(lambda x,y:(x[0]+y,x[1]+1))
(lambda x,y:(x[0]+x[1],y[0],y[1])))
.map(lambda x,y:(x,y[0]/y[1]))

[("panda",0),("pink",3.5),("pirate",3)]
  • 数据分组
1
2
3
4
5
# groupByKey() 以key聚合进行分组 
rdd = sc.parallelize([(1,2),(3,4),(3,6)])
rdd.groupByKey()

[(1,[2]),(3,[4,6])]
  • 连接

内连接 => join()
左连接 => leftOuterJoin()
右连接 => rightOuterJoin()

  • 排序
1
2
# 在python中以字符串顺序对整数进行升序排序
rdd.sortedByKey(ascending=True, numPartitions=None, keyfunc = lambda x:str(x))

三、Pair RDD行动操作

上一节说到的行动操作都适用于Pair RDD,此外Pair RDD还有以下行动操作:

1
2
3
4
5
6
7
8
# 对每个键对应的元素分别计数
rdd = sc.parallelize([(1,1),(3,4),(3,6)])
rdd.countByKey()
[(1,1),(3,2)]

# 返回给定健所对应的所有值
rdd.lookup(3)
[4,6]

四、数据分区

在执行以上的聚合或分组操作时,可以给定Spark的分区数,每一个RDD都有固定给定数目的分区数,Spark会根据集群大小推断有意义的默认值,当然我们也可以对并行度进行调优来获取更好的性能表现。

1
2
3
4
# 在python中自定义reduceByKey()的并行度
data = sc.parallelize([("a",3),("b",4),("a",1)])
data.reduceByKey(lambda x,y:x+y) ## 默认并行度
data.reduceByKey(lambda x,y:x+y,10) ## 自定义并行度为10

coalesce和repartition

此外Spark还提供了repartition()函数,以用于在分组和聚合操作之外改变分区,repartition()函数会先把数据通过网络进行混洗,创建新的分区集合。但这样网络开销会很大,coalesce()函数正是对此做了优化。我们可以通过rdd.getNumPartitions查看RDD分区数。

coalesce()函数中有两个传入参数,coalesce(numPartitions,shuffle),其中numPartitions为指定分区数,shuffle为是否进行shuffle,默认为false,若numPartitions大于原有的分区数,必须指定shuffle=True;但避免进行shuffle可以节省网络开销。

repartition()函数只有一个传入参数,repartition(numPartitions),因为它指定了shuffle为True。

combineByKey()分区数据处理过程

combineByKey是Spark中一个比较核心的高级函数,groupByKey、reduceByKey的底层都是使用combineByKey实现的,我们来看一下combineByKey()是如何处理分区数据的。


这个数据流图中出现了三个函数createCombiner、mergeValue、mergeCombiners,分别看一下概念:

createCombiner: combineByKey()会遍历分区中的所有元素,因此每个元素的键要么新出现,要么之前遇到过。若是一个新元素, combineByKey()会使用一个createCombiner() 函数创建那个键对应的累加器的初始值。

mergeValue: 如果键在之前遇到过,可以使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。

mergeCombiners: 每个分区独立操作,所以对于同一个键可有多个累加器。若多个分区都有同一个键的累加器,就需要用mergeCombiners() 将各个分区的结果合并。

这样就很好理解了,整个combineByKey的过程就是在不同的分区上执行类似的操作,遇到新键,执行createCombiner,遇到已存在的键,执行mergeValue,最终对所有分区执行mergeCombiners。

数据分区优化

Spark程序可以通过控制RDD分区方式来减少通信开销。举个具体的例子,下面这段scala代码计算了查阅自己订阅主题页面的用户数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val sc.new SparkContext(...)

val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...").persist() //UserID用户ID,UserInfo用户订阅的主题,类似于("Mike",List("sports","math")这样的元素


def processNewLogs(logFileName:String){
val events = sc.sequenceFile[UserID,LinkInfo](logFileName) # 用户访问情况,元素类似于("Mike","sports")
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println('Number of visits to subscribed topics:'+results)
}

但这里有个问题就是,每次调用processNewLogs时,会有一个join操作,会将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。假如userdata表很大很大,而且几乎是不怎么变化的,那么每次都对userdata表进行哈希值计算和跨节点的数据混洗,就会产生很多的额外开销。这个过程的join操作如下:

如何解决网络开销的问题呢?可以再程序开始时,对userdata表使用partitionBy()转化操作,将这张表转为哈希分区。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val sc.new SparkContext(...)

val userData = sc.sequenceFile[UserID,UserInfo]("hdfs://...").partitionBy(new HashPartitioner).persist() // UserID用户ID,UserInfo用户订阅的主题,类似于("Mike",List("sports","math")这样的元素


def processNewLogs(logFileName:String){
val events = sc.sequenceFile[UserID,LinkInfo](logFileName) # 用户访问情况,元素类似于("Mike","sports")
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println('Number of visits to subscribed topics:'+results)
}

构建userData时调用了partitionBy(),在调用join()时,Spark只会对events进行数据混洗操作,将events中特定UserID的记录发送到userData的对应分区所在的那台机器上。这样,通过网络传输的数据就大大减少,程序运行速度也可以显著提升。partitionBy()是一个转化操作,因此它的返回值是一个新的RDD。还有一点要注意,这里必须要持久化才可以在后面用到RDD时不重复分区操作。

scala可以使用RDD的partitioner属性来获取RDD的分区方式,它会返回一个scala.Option对象。

可以从数据分区中获益的操作有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。

参考资料:《Spark快速大数据分析》

分享到