Contents
1. Creating RDDs
2. Common Action Operations
3. Common Transformation Operations
1. Creating RDDs
There are two main ways to create an RDD: textFile, which loads data from the local or cluster file system, and parallelize, which turns a data structure in the Driver into a distributed RDD.
# Load data from the local file system
file = "./data/hello.txt"
rdd = sc.textFile(file, 3)
rdd.collect()
['hello world', 'hello spark', 'spark love jupyter', 'spark love pandas', 'spark love sql']

# Load data from the cluster file system
# file = "hdfs://localhost:9000/user/hadoop/data.txt"
# The hdfs://localhost:9000 prefix can also be omitted
# rdd = sc.textFile(file, 3)

# parallelize turns a data structure in the Driver into an RDD;
# the second argument specifies the number of partitions
rdd = sc.parallelize(range(1, 11), 2)
rdd.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
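To confirm that the partition argument took effect, getNumPartitions (a standard RDD method) returns the number of partitions; a minimal sketch:

# Check how many partitions the RDD just created has
print(rdd.getNumPartitions())
2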
2. Common Action Operations
An Action triggers computation along the RDD's dependency (lineage) graph.
collect
rdd = sc.parallelize(range(10), 5)
# collect gathers all the data to the Driver; with large data there is a risk of running out of memory
all_data = rdd.collect()
all_data
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
take
# take gathers the first n elements to the Driver; safer than collect
rdd = sc.parallelize(range(10), 5)
part_data = rdd.take(4)
part_data
[0, 1, 2, 3]
takeSample
# takeSample draws a random sample of elements to the Driver;
# the first argument sets whether to sample with replacement
rdd = sc.parallelize(range(10), 5)
sample_data = rdd.takeSample(False, 10, 0)
sample_data
[7, 8, 1, 5, 3, 4, 2, 0, 9, 6]
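Note that when sampling without replacement, requesting at least as many elements as the RDD contains (as above: 10 out of 10) simply returns all elements in shuffled order. A smaller request behaves as expected; a minimal sketch:

rdd.takeSample(False, 3, 0)  # 3 randomly chosen elements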
first
# first returns the first element
rdd = sc.parallelize(range(10), 5)
first_data = rdd.first()
print(first_data)
0
count
# count returns the number of elements in the RDD
rdd = sc.parallelize(range(10), 5)
data_count = rdd.count()
print(data_count)
10
reduce
# reduce aggregates the data with a binary function
rdd = sc.parallelize(range(10), 5)
rdd.reduce(lambda x, y: x + y)
45
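Because reduce folds each partition first and then combines the partition results, the binary function should be commutative and associative. A minimal sketch with max, which satisfies both:

rdd = sc.parallelize(range(10), 5)
rdd.reduce(lambda x, y: max(x, y))
9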
foreach
# foreach applies an operation to each element without producing a new RDD
# See the section on shared variables for details on accumulators
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
45
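The accumulator is needed because the function passed to foreach runs on the executors with a serialized copy of its closure, so mutating an ordinary Driver-side variable has no visible effect. A minimal sketch of that pitfall (the counter variable is illustrative):

counter = 0
def inc(x):
    global counter
    counter += x  # updates an executor-side copy, not the Driver's variable
rdd = sc.parallelize(range(10), 5)
rdd.foreach(inc)
print(counter)  # still 0 on the Driver
0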
countByKey
# countByKey counts the number of elements per key in a pair RDD
pairRdd = sc.parallelize([(1, 1), (1, 4), (3, 9), (2, 16)])
pairRdd.countByKey()
defaultdict(int, {1: 2, 3: 1, 2: 1})
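The result is an ordinary Python defaultdict held on the Driver, so countByKey suits pair RDDs with a modest number of distinct keys; wrapping it in dict gives a plain dict:

dict(pairRdd.countByKey())
{1: 2, 3: 1, 2: 1}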
saveAsTextFile
# saveAsTextFile saves an RDD as a text file locally
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

# When loaded back, the contents are parsed as text
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()
['2', '3', '4', '1', '0']
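Since the reloaded elements are strings, a map is needed to restore the original type; a minimal sketch:

rdd_loaded.map(int).collect()
[2, 3, 4, 1, 0]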
3. Common Transformation Operations
Transformations are lazily evaluated: a transformation only records the new RDD and its dependency on its parent RDD, and the computation runs only when an Action that depends on it is triggered.
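A minimal sketch of this laziness: the map call below returns immediately and only records the lineage, which toDebugString (a standard RDD method) can display; nothing is computed until the Action runs.

rdd = sc.parallelize(range(10), 2)
mapped = rdd.map(lambda x: x * 2)  # returns at once; nothing is computed here
print(mapped.toDebugString())      # shows the dependency on the parent RDD
print(mapped.collect())            # the Action triggers the actual computation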
map
# map applies a mapping transformation to each element
rdd = sc.parallelize(range(10), 3)
rdd.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd.map(lambda x: x**2).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
filter
# filter keeps only the elements that satisfy the predicate
rdd = sc.parallelize(range(10), 3)
rdd.filter(lambda x: x > 5).collect()
[6, 7, 8, 9]
flatMap
# flatMap maps each element to a list and then flattens the results
rdd = sc.parallelize(["hello world", "hello China"])
rdd.map(lambda x: x.split(" ")).collect()
[['hello', 'world'], ['hello', 'China']]
rdd.flatMap(lambda x: x.split(" ")).collect()
['hello', 'world', 'hello', 'China']
sample
# sample samples each partition of the original RDD by a given fraction;
# the first argument sets whether to sample with replacement
rdd = sc.parallelize(range(10), 1)
rdd.sample(False, 0.5, 0).collect()
[1, 4, 9]
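The fraction argument is a per-element probability rather than an exact proportion, so the sample size varies between runs and seeds; a minimal sketch:

rdd = sc.parallelize(range(100), 4)
for seed in range(3):
    print(rdd.sample(False, 0.1, seed).count())  # roughly 10 each time, but not exactly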
distinct
# distinct removes duplicates
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5])
rdd.distinct().collect()
[4, 1, 5, 2, 3]
subtract
# subtract returns the elements that belong to the first RDD but not the second
a = sc.parallelize(range(10))
b = sc.parallelize(range(5, 15))
a.subtract(b).collect()
[0, 1, 2, 3, 4]
union
# union concatenates two RDDs
a = sc.parallelize(range(5))
b = sc.parallelize(range(3, 8))
a.union(b).collect()
[0, 1, 2, 3, 4, 3, 4, 5, 6, 7]
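Note that union keeps duplicates; chain distinct after it for set-style union semantics, as in this minimal sketch:

a.union(b).distinct().collect()  # the distinct values 0..7, in arbitrary order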
intersection
# intersection computes the intersection of two RDDs
a = sc.parallelize(range(1, 6))
b = sc.parallelize(range(3, 9))
a.intersection(b).collect()
[3, 4, 5]
cartesian
# cartesian computes the Cartesian product of two RDDs
boys = sc.parallelize(["LiLei", "Tom"])
girls = sc.parallelize(["HanMeiMei", "Lily"])
boys.cartesian(girls).collect()
[('LiLei', 'HanMeiMei'), ('LiLei', 'Lily'), ('Tom', 'HanMeiMei'), ('Tom', 'Lily')]
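The result has len(boys) * len(girls) elements, so cartesian grows quickly on large RDDs; a quick size check:

boys.cartesian(girls).count()
4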
sortBy
# sortBy sorts the RDD by a key function
# Here we sort by the third element of each tuple
rdd = sc.parallelize([(1, 2, 3), (3, 2, 2), (4, 1, 1)])
rdd.sortBy(lambda x: x[2]).collect()
[(4, 1, 1), (3, 2, 2), (1, 2, 3)]
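sortBy also accepts an ascending parameter (default True), so reversing the order is a one-argument change:

rdd.sortBy(lambda x: x[2], ascending=False).collect()
[(1, 2, 3), (3, 2, 2), (4, 1, 1)]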
zip
# zip joins two RDDs pairwise, similar to Python's zip function
# The two RDDs must have the same number of partitions
# and the same number of elements in each partition
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily"])
rdd_age = sc.parallelize([19, 18, 20])
rdd_zip = rdd_name.zip(rdd_age)
print(rdd_zip.collect())
[('LiLei', 19), ('Hanmeimei', 18), ('Lily', 20)]
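If the two RDDs are not partitioned identically, zip fails; a minimal sketch (depending on the Spark version, the mismatch is reported when zip is called or when the Action runs):

a = sc.parallelize(range(6), 2)
b = sc.parallelize(range(6), 3)  # same length but a different number of partitions
try:
    print(a.zip(b).collect())
except Exception as e:
    print("zip failed:", type(e).__name__)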
zipWithIndex
# zipWithIndex zips the RDD with an increasing index starting from 0
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
[('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]