目錄
四,常用PairRDD的轉換操作
五,緩存操作
四,常用PairRDD的轉換操作
PairRDD指的是數據為長度為2的tuple類似(k,v)結構的數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value.
reduceByKey
#reduceByKey對相同的key對應的values應用二元歸并操作
rdd = sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
rdd.reduceByKey(lambda x,y:x+y).collect()[('hello', 4), ('world', 7)]
groupByKey
#groupByKey將相同的key對應的values收集成一個Iterator
rdd = sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
rdd.groupByKey().collect()[('hello', <pyspark.resultiterable.ResultIterable at 0x119c6ae48>),('world', <pyspark.resultiterable.ResultIterable at 0x119c6a860>)]
sortByKey
#sortByKey按照key排序,可以指定是否降序
rdd = sc.parallelize([("hello",1),("world",2),("China",3),("Beijing",5)])
rdd.sortByKey(False).collect()[('world', 2), ('hello', 1), ('China', 3), ('Beijing', 5)]
join
#join相當于根據key進行內連接
age = sc.parallelize([("LiLei",18),("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]
leftOuterJoin和rightOuterJoin
#leftOuterJoin相當于關系表的左連接age = sc.parallelize([("LiLei",18),("HanMeiMei",16)])
gender = sc.parallelize([("LiLei","male"),("HanMeiMei","female"),("Lucy","female")])
age.leftOuterJoin(gender).collect()[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]#rightOuterJoin相當于關系表的右連接
age = sc.parallelize([("LiLei",18),("HanMeiMei",16),("Jim",20)])
gender = sc.parallelize([("LiLei","male"),("HanMeiMei","female")])
age.rightOuterJoin(gender).collect()[('LiLei', (18, 'male')), ('HanMeiMei', (16, 'female'))]
cogroup
#cogroup相當于對兩個輸入分別goupByKey然后再對結果進行groupByKeyx = sc.parallelize([("a",1),("b",2),("a",3)])
y = sc.parallelize([("a",2),("b",3),("b",5)])result = x.cogroup(y).collect()
print(result)
print(list(result[0][1][0]))[('a', (<pyspark.resultiterable.ResultIterable object at 0x119c6acc0>, <pyspark.resultiterable.ResultIterable object at 0x119c6aba8>)), ('b', (<pyspark.resultiterable.ResultIterable object at 0x119c6a978>, <pyspark.resultiterable.ResultIterable object at 0x119c6a940>))]
[1, 3]
subtractByKey
#subtractByKey去除x中那些key也在y中的元素x = sc.parallelize([("a",1),("b",2),("c",3)])
y = sc.parallelize([("a",2),("b",(1,2))])x.subtractByKey(y).collect()[('c', 3)]
foldByKey
#foldByKey的操作和reduceByKey類似,但是要提供一個初始值
x = sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda x,y:x*y).collect()[('a', 3), ('b', 10)]
五,緩存操作
如果一個rdd被多個任務用作中間量,那么對其進行cache緩存到內存中對加快計算會非常有幫助。
聲明對一個rdd進行cache后,該rdd不會被立即緩存,而是等到它第一次被計算出來時才進行緩存。
可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。
如果一個RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執行的。
緩存數據不會切斷血緣依賴關系,這是因為緩存數據某些分區所在的節點有可能會有故障,例如內存溢出或者節點損壞。
這時候可以根據血緣關系重新計算這個分區的數據。
#cache緩存到內存中,使用存儲級別 MEMORY_ONLY。
#MEMORY_ONLY意味著如果內存存儲不下,放棄存儲其余部分,需要時重新計算。
a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_aprint(mean_a)#persist緩存到內存或磁盤中,默認使用存儲級別MEMORY_AND_DISK
#MEMORY_AND_DISK意味著如果內存存儲不下,其余部分存儲到磁盤中。
#persist可以指定其它存儲級別,cache相當于persist(MEMORY_ONLY)
from pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_aa.unpersist() #立即釋放緩存
print(mean_a)