spark的使用
spark是一款分布式的計算框架,用于調度成百上千的服務器集群。
安裝pyspark
# os.environ['PYSPARK_PYTHON']='解析器路徑' pyspark_python配置解析器路徑
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
pip install pyspark # 原始國外安裝
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark #網址安裝
java安裝
前置安裝軟件java包
java官網下載地址
一鍵下一步安裝,配置環境變量
首先創建一個JAVA_HOME的全局變量然后在path中通過%%引入執行下面的bin 路徑%JAVA_HOME%\bin
執行成功
from pyspark import SparkConf,SparkContext# 創建sparkConf 類對象
conf= SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 創建sparkConf類對象創建sparkContext對象
sc =SparkContext(conf=conf)
# 打印pySpark的運行腳本
print(sc.version)
# 停止sparkContext對象的運行(停止pySpark程序)
sc.stop()
PySpark的數據計算,都是基于RDD對象來進行的,RDD對象內置豐富的:成員方法(算子)
map算子
功能:map算子,是將RDD的數據一條條處理,處理的邏輯基于map算子中接收的處理函數,返回新的RDD語法:
# 簡單執行map將數據乘以10返回,如果不引入python解析器的路徑引入就會報錯,
from pyspark import SparkConf, SparkContext
# 指定spark的python解析器路徑
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 創建sparkConf 類對象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 創建sparkConf類對象創建sparkContext對象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])def func(data):return data * 10# map傳入一個參數有返回值,是函數或者是值
rdd2 = rdd.map(func)
print(rdd2.collect())
flatMap
flatMap跟map差不多就是在最后做了一個解除嵌套的功能
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 創建sparkConf 類對象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 創建sparkConf類對象創建sparkContext對象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['中石科技 時間還復活甲 如今房價','慰問金 咖啡機 姐夫哥','格很高 客服管家二惡烷 可歸結為'])rdd2 = rdd.flatMap(lambda x:x.split(' '))print(rdd2.collect())
map的結果
reduceByKey
reduceByKey對數據進行分組可以兩兩計算
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:/dev/python/python3.11.4/python.exe"
# 創建sparkConf 類對象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 創建sparkConf類對象創建sparkContext對象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 11), ('男', 22), ('女', 21), ('男', 31), ('女', 99)])
# 把男女進行分組value值進行計算
rdd2 = rdd.reduceByKey(lambda a, b:a+b)print(rdd2.collect()) # [('女', 120), ('男', 64)]
reduce
與reduce的區別就是沒有進行分組
take
取出前幾個數據
...
rdd = sc.parallelize([1,2,3,4,5]).take(3) # [1,2,3]
count
計算rdd中的數據個數
filter
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)rdd=sc.parallelize([1,2,3,4,5])rdd2=rdd.filter(lambda a:a%2==0)
print(rdd2.collect()) # [2,4]
distinct
進行數據去重
from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)add= sc.parallelize([1,2,3,4,5,6,73,3,2,4,56,3,5])add2=add.distinct()
print(add2.collect()) # [56, 1, 73, 2, 3, 4, 5, 6]
sortBy排序
from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python3.11.4/python.exe'conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
sc = SparkContext(conf=conf)add = sc.textFile('D:/wordText.txt')word_rdd = add.flatMap(lambda x: x.split(' '))
word_with_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd =word_with_rdd.reduceByKey(lambda a,b:a+b)
result_num=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) # 1.根據什么排序,2.True 升序 False降序 3.分布式分區
print(result_num.collect())
collect
將rdd內容變成list,從而就可以打印出來