目錄
- 一、淺語
- 二、三種數據處理方式比較
- 2.1 RDD
- 2.2 DataFrame
- 2.3 Spark SQL
- 三、三種方法的創建方式
- 3.1 創建RDD
- 3.2 創建DataFrame
- 3.2.1 創建sqlContext
- 3.2.2 定義Schema
- 3.2.3 創建DataFrame
- 3.3 創建SparkSQL
- 3.3.1 登錄臨時表
- 3.3.2 使用sparkSQL
- 四、三種方法顯示部分字段
- 4.1 使用RDD選取顯示部分字段
- 4.2 使用DataFrame選取顯示字段
- 4.2.1 select方法輸入[字段名]
- 4.2.2 select方法輸入[dataframe名稱].[字段名]
- 4.2.3 select方法輸入[dataframe別名].[字段名]
- 4.2.4 通過dataframe中括號方式
- 4.3 使用SparkSQL選取顯示字段
- 五、三種方法增加計算字段
- 5.1 RDD增加計算字段
- 5.2 DataFrame增加計算字段
- 5.3 SparkSQL增加計算字段
- 六、三種方法篩選數據
- 6.1 RDD篩選數據
- 6.2 DataFrame篩選數據
- 6.2.1 使用多個filter
- 6.2.2 使用單個filter
- 6.2.3 使用[dataframe 名稱].[字段名]指定條件
- 6.2.4 使用[]指定篩選條件
- 6.3 SparkSQL篩選數據
- 七、三種方法按單個字段給數據排序
- 7.1 RDD:takeOrdered方法
- 7.2 DataFrame
- 7.3 SQparkSQL
- 八、三種方法按多個字段排序
- 8.1 RDD
- 8.2 DataFrame
- 8.3 SparkSQL
- 九、三種方法顯示不重復的數據
- 9.1 RDD
- 9.2 DataFrame
- 9.3 SparkSQL
- 十、三種方法分組統計數據
- 10.1 RDD:map/reduce
- 10.2 DataFrame
- 10.2.1 crosstab:長表變寬表
- 10.3 SparkSQL
- 參考資料
一、淺語
上一篇(《pyspark RDD相關常用函數使用案例》) 對pyspark的一些常用函數做了梳理,這篇主要是針對RDD、DataFrame、SparkSql三種實現同一功能需要的方式做一梳理,通過實際動手,體會不同方式在數據處理過程中的差異性、便利性。
import findspark
findspark.init()
from pyspark.sql import SparkSession# 創建 Spark 會話
spark = SparkSession.builder \.appName("Test PySpark") \.master("local[*]") \.getOrCreate()
sc=spark.sparkContext
sc.master
'local[*]'
二、三種數據處理方式比較
2.1 RDD
- rdd的數據</font color-“green”>只能使用位置來指定每一個字段。
- rdd功能最強,能完成所有spark功能。
2.2 DataFrame
- Spark DataFrame被創建時</font color=“green”>必須定義Schema,定義每個字段名和數據類型。
- 定義了很多類似SQL的方法
- 比起RDD更容易使用
2.3 Spark SQL
- 有DataFrame派生出來,所以</font color=“green”>必須先創建DataFrame,進而轉化使用。
- 最簡單
三、三種方法的創建方式
3.1 創建RDD
# 讀取本地數據文件
mRDD = sc.textFile("test.txt")
print('查看數據的行數:',mRDD.count())
# 按照空格分隔數據字段
sRDD = mRDD.map(lambda x:x.split(' '))
print('查看分隔后的結果:',sRDD.collect())
查看數據的行數: 8
查看分隔后的結果: [['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F'], ['black', '4', 'F'], ['red', '5', 'M'], ['red', '5', 'M'], ['blue', '3', 'M'], ['blue', '7', 'M']]
3.2 創建DataFrame
3.2.1 創建sqlContext
sqlContext = SparkSession.builder.getOrCreate()
3.2.2 定義Schema
from pyspark.sql import Row
sRows = sRDD.map(lambda x:Row(color=x[0],num = int(x[1]),sex = x[2]))
# 查看schema
sRows.collect()
[Row(color='yellow', num=1, sex='F'),Row(color='blue', num=2, sex='M'),Row(color='yellow', num=3, sex='F'),Row(color='black', num=4, sex='F'),Row(color='red', num=5, sex='M'),Row(color='red', num=5, sex='M'),Row(color='blue', num=3, sex='M'),Row(color='blue', num=7, sex='M')]
3.2.3 創建DataFrame
# 創建DataFrame?
df = sqlContext.createDataFrame(sRows)
# 使用.printSchema()查看DataFrame的schema
df.printSchema()
root|-- color: string (nullable = true)|-- num: long (nullable = true)|-- sex: string (nullable = true)
# 查看數據
df.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
### 為DataFrame創建別名
dataf = df.alias('dataf')
dataf.show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
3.3 創建SparkSQL
3.3.1 登錄臨時表
dataf.registerTempTable('temp_tb')
D:\bigdataenv\spark-3.5.0-bin-hadoop3\python\pyspark\sql\dataframe.py:329: FutureWarning: Deprecated in 2.0, use createOrReplaceTempView instead.warnings.warn("Deprecated in 2.0, use createOrReplaceTempView instead.", FutureWarning)
3.3.2 使用sparkSQL
查看數據使用show()方法,默認顯示前20行數據
sqlContext.sql('select * from temp_tb').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 3| M|
| blue| 7| M|
+------+---+---+
四、三種方法顯示部分字段
4.1 使用RDD選取顯示部分字段
rdd = sRDD.map(lambda x:(x[0],x[2],x[1]))
rdd.take(2)
[('yellow', 'F', '1'), ('blue', 'M', '2')]
4.2 使用DataFrame選取顯示字段
下面四種方法顯示的結果相同。
4.2.1 select方法輸入[字段名]
df.select('color','sex').show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.2 select方法輸入[dataframe名稱].[字段名]
df.select(df.color,df.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.3 select方法輸入[dataframe別名].[字段名]
dataf.select(dataf.color,dataf.sex).show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.2.4 通過dataframe中括號方式
df[df['color'],df['sex']].show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
4.3 使用SparkSQL選取顯示字段
sqlContext.sql('select color,sex from temp_tb').show()
+------+---+
| color|sex|
+------+---+
|yellow| F|
| blue| M|
|yellow| F|
| black| F|
| red| M|
| red| M|
| blue| M|
| blue| M|
+------+---+
五、三種方法增加計算字段
5.1 RDD增加計算字段
sRDD.map(lambda x:(x[0],x[1],x[2],10-int(x[1]))).collect()
[('yellow', '1', 'F', 9),('blue', '2', 'M', 8),('yellow', '3', 'F', 7),('black', '4', 'F', 6),('red', '5', 'M', 5),('red', '5', 'M', 5),('blue', '3', 'M', 7),('blue', '7', 'M', 3)]
5.2 DataFrame增加計算字段
df.select('color','num','sex',10-df['num']).show()
+------+---+---+----------+
| color|num|sex|(10 - num)|
+------+---+---+----------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+----------+
# 為計算字段取一個別名
df.select('color','num','sex',(10-df['num']).alias('diff_num')).show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+--------+
5.3 SparkSQL增加計算字段
sqlContext.sql('select color,num,sex,10-num as diff_num from temp_tb').show()
+------+---+---+--------+
| color|num|sex|diff_num|
+------+---+---+--------+
|yellow| 1| F| 9|
| blue| 2| M| 8|
|yellow| 3| F| 7|
| black| 4| F| 6|
| red| 5| M| 5|
| red| 5| M| 5|
| blue| 3| M| 7|
| blue| 7| M| 3|
+------+---+---+--------+
六、三種方法篩選數據
6.1 RDD篩選數據
sRDD.filter(lambda x:int(x[1])>2).collect()
[['yellow', '3', 'F'],['black', '4', 'F'],['red', '5', 'M'],['red', '5', 'M'],['blue', '3', 'M'],['blue', '7', 'M']]
6.2 DataFrame篩選數據
四種篩選方式,執行結果相同。
6.2.1 使用多個filter
df.filter("color='blue'").filter('num=2').show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.2 使用單個filter
df.filter("color='blue' and num=2 ").show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.3 使用[dataframe 名稱].[字段名]指定條件
注意:
- 必須使用“&”,不能使用“and”
- 必須使用“==”,不能使用“=”
df.filter((df.color=='blue') & (df.num==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.2.4 使用[]指定篩選條件
df.filter((df['color']=='blue') & (df['num']==2)).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
6.3 SparkSQL篩選數據
sqlContext.sql("""select * from temp_tb where color='blue' and num=2 """).show()
+-----+---+---+
|color|num|sex|
+-----+---+---+
| blue| 2| M|
+-----+---+---+
七、三種方法按單個字段給數據排序
7.1 RDD:takeOrdered方法
takeOrdered(num,key=None):
- num:要顯示的項數
- key:使用lambda語句設置要排序的字段
# 升序示例
sRDD.takeOrdered(3,key=lambda x:int(x[1]))
[['yellow', '1', 'F'], ['blue', '2', 'M'], ['yellow', '3', 'F']]
# 降序示例
sRDD.takeOrdered(3,key=lambda x: -1 * int(x[1]))
[['blue', '7', 'M'], ['red', '5', 'M'], ['red', '5', 'M']]
7.2 DataFrame
# 升序
df.orderBy('num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
| blue| 3| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 7| M|
+------+---+---+
# 降序
df.orderBy('num',ascending=0).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
7.3 SQparkSQL
# 升序
sqlContext.sql('select * from temp_tb order by num').show()
+------+---+---+
| color|num|sex|
+------+---+---+
|yellow| 1| F|
| blue| 2| M|
| blue| 3| M|
|yellow| 3| F|
| black| 4| F|
| red| 5| M|
| red| 5| M|
| blue| 7| M|
+------+---+---+
# 降序sqlContext.sql('select * from temp_tb order by num desc').show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
八、三種方法按多個字段排序
8.1 RDD
# 先num降序,color升序
sRDD.takeOrdered(3,key = lambda x: (-1*x[1],x[0]))
[['black', '4', 'F'], ['blue', '2', 'M'], ['blue', '3', 'M']]
8.2 DataFrame
df.orderBy(['num','color'],ascending=[0,1]).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
df.orderBy(df.num.desc(),df.color).show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
8.3 SparkSQL
sqlContext.sql("select * from temp_tb order by num desc , color ").show()
+------+---+---+
| color|num|sex|
+------+---+---+
| blue| 7| M|
| red| 5| M|
| red| 5| M|
| black| 4| F|
| blue| 3| M|
|yellow| 3| F|
| blue| 2| M|
|yellow| 1| F|
+------+---+---+
九、三種方法顯示不重復的數據
9.1 RDD
sRDD.map(lambda x:x[2]).distinct().collect()
['F', 'M']
sRDD.map(lambda x:(x[1],x[2])).distinct().collect()
[('3', 'F'),('4', 'F'),('5', 'M'),('3', 'M'),('7', 'M'),('1', 'F'),('2', 'M')]
9.2 DataFrame
df.select('sex').distinct().show()
+---+
|sex|
+---+
| F|
| M|
+---+
df.select('num','sex').distinct().show()
+---+---+
|num|sex|
+---+---+
| 2| M|
| 3| F|
| 4| F|
| 1| F|
| 5| M|
| 3| M|
| 7| M|
+---+---+
9.3 SparkSQL
sqlContext.sql("select distinct sex from temp_tb").show()
+---+
|sex|
+---+
| F|
| M|
+---+
sqlContext.sql("select distinct num,sex from temp_tb").show()
+---+---+
|num|sex|
+---+---+
| 2| M|
| 3| F|
| 4| F|
| 1| F|
| 5| M|
| 3| M|
| 7| M|
+---+---+
十、三種方法分組統計數據
10.1 RDD:map/reduce
在RDD中進行數據的分組統計,必須使用map/reduce
# 單字段:eg:按照sex分組統計
sRDD.map(lambda x:(x[2],int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[('F', 8), ('M', 22)]
# 多字段
sRDD.map(lambda x:((x[2],x[0]),int(x[1]))).reduceByKey(lambda x,y:x+y).collect()
[(('F', 'yellow'), 4),(('M', 'blue'), 12),(('F', 'black'), 4),(('M', 'red'), 10)]
10.2 DataFrame
df.select(['sex','num']).groupBy('sex').sum().show()
+---+--------+
|sex|sum(num)|
+---+--------+
| F| 8|
| M| 22|
+---+--------+
df.select(['sex','color','num']).groupBy(['sex','color']).sum().orderBy(['sex','color']).show()
+---+------+--------+
|sex| color|sum(num)|
+---+------+--------+
| F| black| 4|
| F|yellow| 4|
| M| blue| 12|
| M| red| 10|
+---+------+--------+
10.2.1 crosstab:長表變寬表
df.crosstab('color','sex').show()
+---------+---+---+
|color_sex| F| M|
+---------+---+---+
| yellow| 2| 0|
| red| 0| 2|
| black| 1| 0|
| blue| 0| 3|
+---------+---+---+
10.3 SparkSQL
sqlContext.sql('select sex,sum(num) from temp_tb group by sex').show()
+---+--------+
|sex|sum(num)|
+---+--------+
| F| 8|
| M| 22|
+---+--------+
參考資料
《Python+Spark 2.0+Hadoop機器學習與大數據實戰》, 林大貴,清華大學出版社,2017-12,9787302490739
