Spark學習(Pyspark)

(1)Spark基礎入門

①什么是Spark

Spark是一款分布式內存計算的統一分析引擎。其特點就是對任意類型的數據進行自定義計算。Spark可以計算:結構化、半結構化、非結構化等各種類型的數據結構,同時也支持使用Python、Java、Scala、R以及SQL語言去開發應用程序計算數據。Spark的適用面非常廣泛,所以,被稱之為 統一的(適用面廣)的分析引擎(數據處理)
在這里插入圖片描述
RDD 是一種分布式內存抽象,其使得程序員能夠在大規模集群中做內存運算,并且有一定的容錯方式。而這也是整個 Spark 的核心數據結構,Spark 整個平臺都圍繞著RDD進行。可以說RDD就是一種數據結構抽象
RDD(Resilient Distributed Dataset),是指彈性分布式數據集。
①數據集:Spark中的編程是基于RDD的,將原始數據加載到內存變成RDD,RDD再經過若干次轉化,仍為RDD
②分布式:讀數據一般都是從分布式系統中去讀,如hdfs、kafka等,所以原始文件存在磁盤是分布式的,spark加載完數據的RDD也是分布式的,換句話說RDD是抽象的概念,實際數據仍在分布式文件系統中;因為有了RDD,在開發代碼過程會非常方便,只需要將原始數據理解為一個集合,然后對集合進行操作即可

②Spark VS Hadoop(MapReduce)

在這里插入圖片描述

③Spark 框架模塊

在這里插入圖片描述

④Spark的架構角色

在這里插入圖片描述

⑤Spark本地運行環境

"Local"指的是本地運行模式,即在單個機器上(而非集群)運行Spark應用程序的開發測試模式。這種模式允許開發者在沒有分布式環境的情況下快速測試和調試Spark代碼
在這里插入圖片描述
在這里插入圖片描述
這種模式下面有三種運行方式:
在這里插入圖片描述

①以下是基于Pyspark的測試:
在這里插入圖片描述
②以下是spark-submit的測試代碼:

代碼格式為:./spark-submit [可選的一些選項] jar包或者python代碼的路徑 [代碼的參數] 
示例如下:./spark-submit /opt/spark/examples/src/main/python/pi.py 10

在這里插入圖片描述
在這里插入圖片描述
以下是總結:
①Local模式的運行原理:Local模式就是以一個獨立進程配合其內部線程來提供完成Spark運行時環境. Local模式可以通過spark-shell/pyspark/spark-submit等來開啟
②bin/pyspark是什么程序:**是一個交互式的解釋器執行環境,環境啟動后就得到了一個Local Spark環境,**可以運行Python代碼去進行Spark計算,類似Python自帶解釋器
③Spark的4040端口是什么:Spark的任務在運行后,會在Driver所在機器綁定到4040端口,提供當前任務的監控頁面供查看

(2)SparkCore

①RDD詳解

分布式計算涉及到了以下步驟:
(1)分區控制:分區控制是將大規模數據集劃分為多個邏輯分片(Partition),以便分布式并行處理。通過合理分區(如按Key哈希、范圍劃分等),可確保數據均勻分布,避免傾斜問題。例如,Spark的RDD、Flink的KeyedStream都依賴分區實現并行計算,分區數通常與并行度掛鉤
(2)Shuffle控制:Shuffle是跨節點數據重分布的過程,涉及數據的洗牌和重組。例如,在Reduce或Join操作時,相同Key的數據需聚合到同一節點,例如Spark的reduceByKey、Flink的keyBy都會觸發Shuffle。
(3)數據存儲\序列化\發送:分布式計算中,數據需高效存儲(如HDFS、內存)、序列化(如Kryo、Protobuf)和跨節點傳輸。序列化影響網絡開銷,需權衡速度與體積;存儲格式(如Parquet)影響I/O效率。數據傳輸依賴RPC或消息隊列(如Netty),需考慮容錯與流量控制。
(4)數據計算API:計算API提供分布式操作的編程接口(如Map、Reduce、Join),隱藏底層復雜性。API設計需兼顧表達能力與性能,如Spark的RDD,Flink的DataStream。
這些功能, 不能簡單的通過Python內置的本地集合對象(如 List\ 字典等)去完成.我們在分布式框架中, 需要有一個統一的數據抽象對象, 來實現上述分布式計算所需功能.這個抽象對象, 就是RDD
在這里插入圖片描述

在這里插入圖片描述
RDD有5大特性:
①RDD是具有分區的。 ②RDD的方法會作用在所有分區上面。 ③RDD之間有互相依賴的關系
④Key-Value型的RDD可以有分區器 :只有PairRDD(元素是鍵值對(K, V)的RDD)才能顯式指定分區器,因為分區邏輯通常基于Key的哈希或范圍(如HashPartitioner或RangePartitioner)。非Key-Value型RDD(如普通數組)的分區是簡單的輪詢或隨機分配,無需依賴Key的規則
⑤RDD所在的分區規劃會盡量靠近服務器以實現本地存儲
以下是以WordCount案例為核心分析RDD

在這里插入圖片描述
在這里插入圖片描述

(2)RDD 編程入門

(1)程序執行入口 SparkContext對象:Spark RDD 編程的程序入口對象是SparkContext對象from pyspark import SparkConf, SparkContextconf = SparkConf().setAppName("WordCount").setMaster("spark://hadoop102:7077")  # 前者是任務名字修改為你的 Master URLsc = SparkContext(conf=conf)(2)創建RDD的方法1 直接從集合對象創建rdd = sparkcontext.parallelize(參數1,參數2)# 參數1 集合對象即可,比如lis    參數2 分區數data =[123456789]rdd =sc.parallelize(data,numSlices=3)rdd.getNumPartitions()        #獲得分區數2 從文件當中讀取resultRDD = sc.textFile("hdfs://hadoop102:8020/input/article.txt") (3)RDD算子1 Transformation算子:返回的還是rdd,參數是函數式接口rdd = sc.parallelize([1, 2, 3, 4, 5])# 1. map: 對每個元素應用函數mapped = rdd.map(lambda x: x * 2)  # 輸出: [2, 4, 6, 8, 10]# 2. flatMap: 先映射后扁平化(如拆分單詞)words = sc.parallelize(["hello world", "spark demo"])flattened = words.flatMap(lambda x: x.split(" "))  # 輸出: ["hello", "world", "spark", "demo"]# 3. filter: 過濾滿足條件的元素filtered = rdd.filter(lambda x: x > 3)  # 輸出: [4, 5]# 4. distinct: 去重distinct = sc.parallelize([1, 2, 2, 3]).distinct()  # 輸出: [1, 2, 3]# 5. sample: 隨機采樣sampled = rdd.sample(withReplacement=False, fraction=0.5)  # 無放回采樣50%數據接下來是對Key-value類型的RDD操作pair_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])# 6. mapValues: 僅對Value映射upper_values = pair_rdd.mapValues(lambda v: v * 10)  # 輸出: [("a", 10), ("b", 20), ("a", 30)]# 7. reduceByKey: 按Key聚合(優化版groupByKey)sum_by_key = pair_rdd.reduceByKey(lambda a, b: a + b)  # 輸出: [("a", 4), ("b", 2)]# 8. groupByKey: 按Key分組(可能OOM)grouped = pair_rdd.groupByKey()  # 輸出: [("a", [1, 3]), ("b", [2])]# 9. sortByKey: 按Key排序sorted_rdd = pair_rdd.sortByKey(ascending=False)  # 降序輸出: [("b", 2), ("a", 1), ("a", 3)]# 10. keys: 提取所有Keykeys = pair_rdd.keys()  # 輸出: ["a", "b", "a"]# 11. values: 提取所有Valuevalues = pair_rdd.values()  # 輸出: [1, 2, 3]多RDD操作rdd1 = sc.parallelize([1, 2, 3])rdd2 = sc.parallelize([3, 4, 5])# 12. union: 合并兩個RDD(不去重)union_rdd = rdd1.union(rdd2)  # 輸出: [1, 2, 3, 3, 4, 5]# 13. intersection: 返回交集intersection_rdd = rdd1.intersection(rdd2)  # 輸出: [3]# 14. subtract: 返回rdd1有但rdd2沒有的元素subtracted = rdd1.subtract(rdd2)  # 輸出: [1, 2]# 15. cartesian: 笛卡爾積(慎用!)cartesian_rdd = rdd1.cartesian(rdd2)  # 輸出: [(1,3), (1,4), ..., (3,5)]分區/重分區控制# 16. repartition: 全局重分區(全量Shuffle)repartitioned = rdd.repartition(4)  # 強制分成4個分區# 17. coalesce: 合并分區(減少分區,避免Shuffle)coalesced = rdd.coalesce(2)  # 合并為2個分區# 18. partitionBy: 按分區器重分(僅PairRDD)from pyspark.rdd import HashPartitionerpartitioned = pair_rdd.partitionBy(HashPartitioner(3))  # 按Key哈希分到3個分區2 Action算子:1. 數據收集與輸出rdd = sc.parallelize([1, 2, 3, 4, 5])# 1. collect(): 返回RDD所有元素到Driver(小心數據量!)result = rdd.collect()  # 輸出: [1, 2, 3, 4, 5]# 2. take(n): 返回前n個元素first_two = rdd.take(2)  # 輸出: [1, 2]# 3. first(): 返回第一個元素first_elem = rdd.first()  # 輸出: 1# 4. takeSample(withReplacement, num, seed): 隨機采樣sampled = rdd.takeSample(False, 3)  # 無放回隨機取3個元素,如 [2, 4, 1]# 5. takeOrdered(n, key=None): 按升序或自定義key返回前n個ordered = rdd.takeOrdered(3)  # 輸出: [1, 2, 3]2. 統計與聚合# 6. count(): 返回RDD元素總數count = rdd.count()  # 輸出: 5# 7. countByKey(): 統計每個Key的出現次數(僅PairRDD)pair_rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])key_counts = pair_rdd.countByKey()  # 輸出: {"a": 2, "b": 1}# 8. countByValue(): 統計每個值的出現次數value_counts = rdd.countByValue()  # 輸出: {1:1, 2:1, 3:1, 4:1, 5:1}# 9. reduce(func): 用func聚合所有元素(需滿足結合律)sum_all = rdd.reduce(lambda a, b: a + b)  # 輸出: 15 (1+2+3+4+5)# 10. fold(zeroValue, func): 類似reduce,但需初始值sum_with_zero = rdd.fold(0, lambda a, b: a + b)  # 輸出: 15# 11. aggregate(zeroValue, seqOp, combOp): 自定義聚合agg_result = rdd.aggregate((0, 0), lambda acc, x: (acc[0] + x, acc[1] + 1),  # 分區內累加和計數lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # 分區間合并)  # 輸出: (15, 5) 總和和總個數3. 數據保存到外部存儲# 12. saveAsTextFile(path): 保存為文本文件(每行一個元素)rdd.saveAsTextFile("output/")  # 生成目錄,內含part-00000等文件# 13. saveAsSequenceFile(path): 保存為SequenceFile(僅PairRDD)pair_rdd.saveAsSequenceFile("seq_output/")# 14. saveAsHadoopFile/pickleFile等: 其他格式保存(需配置)4. 迭代與調試# 15. foreach(func): 對每個元素應用func(無返回值)rdd.foreach(lambda x: print(x))  # 打印每個元素(在Executor端執行)# 16. foreachPartition(func): 對每個分區應用funcdef log_partition(iterator):print("Partition data:", list(iterator))rdd.foreachPartition(log_partition)  # 按分區打印數據

Reduce算子的圖片:
在這里插入圖片描述

(3)SparkSQL

①什么是SparkSQL

SparkSQL是非常成熟的 海量結構化數據處理框架。SparkSQL本身十分優秀, 支持SQL語言\性能強\可以自動優化\API簡單\兼容HIVE等等。企業大面積在使用SparkSQL處理業務數據。例如:離線開發,數倉搭建,科學計算,數據分析。SparkSQL具有以下特點:
在這里插入圖片描述
以下是SparkSQL和Hive的對比:
在這里插入圖片描述
Spark的數據抽象如下:
在這里插入圖片描述
在這里插入圖片描述
DataFrame 是按照二維表格的形式存儲數據,RDD則是存儲對象本身。具體對比如下:
在這里插入圖片描述
在RDD階段,程序的執行入口對象是: SparkContext
在Spark 2.0后,推出了SparkSession對象,作為Spark編碼的統一入口對象。SparkSession對象可以:

  • 用于SparkSQL編程作為入口對象
  • 用于SparkCore編程,可以通過SparkSession對象中獲取到SparkContext
  • 在這里插入圖片描述
    以下是代碼:
# coding:utf8
# SparkSQL 中的入口對象是SparkSession對象
from pyspark.sql import SparkSession
if __name__ == '__main__':
# 構建SparkSession對象, 這個對象是 構建器模式 通過builder方法來構建spark = SparkSession.builder.\appName("local[*]").\config("spark.sql.shuffle.partitions", "4").\getOrCreate()
# appName 設置程序名稱, config設置一些常用屬性
# 最后通過getOrCreate()方法 創建SparkSession對象

(2)DataFrame入門

以下是DataFrame的總體結構:
在這里插入圖片描述

①以下是Dataframe的構建方式:

df = spark.createDataFrame(rdd, schema = ['name', 'age']) 
#直接通過SparkSession對象的createDataFrame方法來將RDD轉換為DataFrame:# StructType 類這個類 可以定義整個DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
df = spark.createDataFrame(rdd, schema)
# 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add, 每一個add代表一個StructField# add方法: 參數1: 列名稱, 參數2: 列類型, 參數3: 是否允許為空#調用RDD的todf方法轉變
df = rdd.toDF(['id', 'subject', 'score'])    #列名
df = rdd.toDF(schema)                        #傳入schema#基于自定義的pandas的DF,構建Pandas的DF,而后把將Pandas的DF對象轉換成Spark的DF
pdf = pd.DataFrame({
"id": [1, 2, 3],
"name": ["張大仙", '王曉曉', '王大錘'],"age": [11, 11, 11]
})
df = spark.createDataFrame(pdf)#sparksession讀取外部數據
sparksession.read.format("text|csv|json|parquet|orc|avro|jdbc|......")
.option("K", "V") # option可選
.schema(StructType | String) 
.load("被讀取文件的路徑, 支持本地文件系統和HDFS")示例代碼如下:
1 text文件
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text")\
.schema(schema)\
.load("../data/sql/people.txt")2 json文件
df = spark.read.format("json").\
load("../data/sql/people.json")
# JSON 類型 一般不用寫.schema, json自帶, json帶有列名 和列類型(字符串和數字)df.printSchema()3 csv文件
df = spark.read.format("csv")\
.option("sep", ";")\ # 列分隔符
.option("header", False)\ # 是否有CSV標頭
.option("encoding", "utf-8")\ # 編碼
.schema("name STRING, age INT, job STRING")\ # 指定列名和類型
.load("../data/sql/people.csv") # 路徑4 parquet數據
# parquet 自帶schema, 直接load啥也不需要了
df = spark.read.format("parquet").\
load("../data/sql/users.parquet")

②以下是DataFrame的基本操作方式
DSL:領域特定語言。其實就是指DataFrame的特有API比如:df.where().limit()
SQL風格就是使用SQL語句處理DataFrame的數據比如:spark.sql(“SELECT * FROM xxx)

(1)DSL語法風格
# 1. 基本操作示例
df.show()  # 展示 DataFrame 的前20行數據# 打印 DataFrame 的 schema
df.printSchema()  # 以樹形結構打印 DataFrame 的列名和數據類型
columns = df.columns  # 獲取 DataFrame 的所有列名列表
dtypes = df.dtypes  # 獲取列名和對應數據類型的元組列表
schema = df.schema  # 獲取 DataFrame 的完整 schema 信息
row_count = df.count()  # 計算 DataFrame 中的總行數
stats = df.describe()  # 對數值列計算 count, mean, stddev, min, max# 2. 列操作示例selected = df.select("name", "age")  # 只選擇 name 和 age 兩列
new_df = df.withColumn("age_plus_10", col("age") + 10)  # 創建新列 age_plus_10
renamed_df = df.withColumnRenamed("old_name", "new_name")  # 將 old_name 列改名為 new_name
dropped_df = df.drop("unwanted_column")  # 刪除 unwanted_column 列# 3. 過濾和排序示例filtered_df = df.filter(col("age") > 18)  # 篩選 age 大于18的行
distinct_df = df.distinct()  # 去除完全相同的重復行
sorted_df = df.orderBy("age", ascending=False)  # 按 age 降序排列
limited_df = df.limit(100)  # 只返回前100行數據# 4. 聚合操作示例
grouped_df = df.groupBy("department")  # 按 department 列分組
agg_df = df.agg(avg("salary").alias("avg_salary"))  # 計算 salary 列的平均值
count_df = df.groupBy("department").count()  # 計算每個部門的記錄數
sum_df = df.groupBy("product").sum("sales")  # 計算每個產品的總銷售額
avg_df = df.groupBy("class").avg("score")  # 計算每個班級的平均分
max_df = df.groupBy("year").max("temperature")  # 獲取每年的最高溫度
min_df = df.groupBy("month").min("price")  # 獲取每月的最低價格
pivot_df = df.groupBy("date").pivot("category").sum("value")  # 創建按日期和類別的透視表# 5. 連接操作示例
## 內連接
joined_df = df1.join(df2, df1.id == df2.id, "inner")  # 基于 id 列的內連接,還有其他連接
union_df = df1.union(df2)  # 垂直合并兩個結構相同的 DataFrame
union_by_name_df = df1.unionByName(df2)  # 按列名合并,允許列順序不同# 6. 窗口函數示例
## 定義窗口分區
window_spec = Window.partitionBy("department").orderBy("salary")  # 按部門分區并按工資排序
ranked_df = df.withColumn("rank", rank().over(window_spec))  # 計算每個部門內的工資排名
lag_df = df.withColumn("prev_salary", lag("salary").over(window_spec))  # 獲取前一行的工資值# 7. 缺失值處理示例
## 填充缺失值
filled_df = df.na.fill(0)  # 將所有空值填充為0
dropped_na_df = df.na.drop()  # 刪除包含任何空值的行
replaced_df = df.na.replace(["old_value"], ["new_value"], "column_name")  # 替換指定列中的特定值# 8. 數據類型轉換示例
casted_df = df.withColumn("age_int", col("age").cast("integer"))  # 將 age 列轉為整數類型# 9. 寫入操作示例
df.write.csv("output.csv")  # 將 DataFrame 寫入 CSV 文件
df.write.parquet("output.parquet")  # 將 DataFrame 寫入 Parquet 文件# 10. 其他常用操作示例
cached_df = df.cache()  # 將 DataFrame 緩存到內存中
repartitioned_df = df.repartition(10)  # 將數據重新分區為10個分區
coalesced_df = df.coalesce(2)  # 將分區數減少到2個
df.explain()  # 打印 DataFrame 的執行計劃
sampled_df = df.sample(0.1)  # 隨機采樣10%的數據
data = df.collect()  # 將所有數據收集到驅動程序節點
pandas_df = df.toPandas()  # 將 Spark DataFrame 轉為 pandas DataFrame(2)SQL風格:DataFrame的一個強大之處就是我們可以將它看作是一個關系型數據表,
然后可以通過在程序中使用spark.sql() 來執行SQL語句查詢,結果返回一個DataFrame
如果想使用SQL風格的語法,需要將DataFrame注冊成表,采用如下的方式:
df.createTempView("score") # 注冊一個臨時視圖(表)
df.create0rReplaceTempView(“score") #注冊一個臨時表,如果存在進行替換
df.createGlobalTempView("score")# 注冊一個全局表
以下是示例:SQL風格處理, 以RDD為基礎做數據加載
#注冊sparktext和sparksession對象
spark = SparkSession.builder.\appName("create df").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext#讀取數據
rdd = sc.textFile("hdfs://node1:8020/input/words.txt").\
flatMap(lambda x: x.split(" ")).\
map(lambda x: [x])
df = rdd.toDF(["word"]
df.createTempView("words")# 使用sql語句處理df注冊的表
spark.sql("""
SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC
""").show()(3)數據清洗API:
#去重API dropDuplicates,無參數是對數據進行整體去重
df.dropDuplicates().show()
# API 同樣可以針對字段進行去重,如下傳入age字段,表示只要年齡一樣 就認為你是重復數據
df.dropDuplicates(['age','job']).show()# 如果有缺失,進行數據刪除
#無參數 為 how=any執行,只要有一個列是null 數據整行刪除,如果填入how='all'表示全部列為空 才會刪除,how參數默認是a
df.dropna().show()
#指定閥值進行刪除,thresh=3表示,有效的列最少有3個,這行數據才保留設定thresh后,how參數無效了
df.dropna(thresh=3).show()# 可以指定閥值 以及配合指定列進行工作,thresh=2,subset=['name','age'〕表示 針對這2個列,有效列最少為2個才保留數據。
df.dropna(thresh=2,subset=['name','age']).show()# 將所有的空,按照你指定的值進行填充,不理會列的任何空都被填充
df.fillna("loss").show()
#指定列進行填充
df.fillna("loss",subset=['job']).show()
# 給定字典 設定各個列的填充規則
df.fillna({"name":"未知姓名""age":1"job":"worker"}).show()(4)DataFrame寫出:
df.write.mode().format().option(K,V).save(PATH)
# mode,傳入模式字符串可選:append 追加,overwrite 覆蓋,ignore 忽略,error 重復就報異常
# format,傳入格式字符串,可選:text,csV,json,parquet,orc,avro,jdbc
# 注意text源只支持單列df寫出
# option 設置屬性,如:.option("sep",",")r
# save 寫出的路徑,支持本地文件和HDFS下面是例子:
# Write text 寫出,只能寫出一個單列數據
df.select(F.concat_ws("_--","user id","movie_id", "rank", "ts")).\
write.\
mode("overwrite").\
format("text").\
save("../data/output/sql/text")# WriteCSV寫出
df.write.mode("overwrite").\
format("csv").\
option("sep",",").\
option("header",True).\
save("../data/output/sql/csv")#Write Json寫出
df.write.mode("overwrite").\
format("json").\
save("../data/output/sql/json")#Write Parquet 寫出
df.write.mode("overwrite").\format("parquet").\
save("../data/output/sql/parquet")#不給format,默認以 parquet寫出
df.write.mode("overwrite").save("../data/output/sql/default")(5)DataFrame 通過JDBC讀寫數據庫
# 寫DF通過JDBC到數據庫中
df.write.mode("overwrite").\
format("jdbc").\
option("url","jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true").\
option("dbtable","u data").\
option("user""root").\
option("password""123456").\
save()#從數據庫里面通過JDBC加載數據
df = spark.read.format("jdbc').\
option("url","jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true").\
option("dbtable","u data").\
option("user""root").\
option("password",“123456").\
load()

(3)Spark函數定義

①UDF函數
在這里插入圖片描述
在這里插入圖片描述
以下是代碼:

方法1
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
# 創建SparkSession
spark = SparkSession.builder.appName("UDF_Example").getOrCreate()
# 定義普通Python函數
def add_numbers(x, y):return x + y# 注冊UDF - 方式1
add_udf = spark.udf.register("add_udf_name", add_numbers, IntegerType())
# 使用SQL風格
spark.sql("CREATE TEMPORARY VIEW numbers AS SELECT 5 as num1, 10 as num2")
spark.sql("SELECT add_udf_name(num1, num2) as result FROM numbers").show()# 使用DSL風格
from pyspark.sql import functions as F
df = spark.createDataFrame([(5, 10)], ["num1", "num2"])
df.select(add_udf(F.col("num1"), F.col("num2")).alias("result")).show()方法2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType# 創建SparkSession
spark = SparkSession.builder.appName("UDF_Example").getOrCreate()# 定義普通Python函數
def greet(name):return f"Hello, {name}!"# 注冊UDF - 方式2
greet_udf = F.udf(greet, StringType())# 使用DSL風格
df = spark.createDataFrame([("Alice",), ("Bob",)], ["name"])
df.select(greet_udf(F.col("name")).alias("greeting")).show()

②SparkSQL的窗口函數:PySpark的窗口函數(Window Functions)是一種在數據集的特定窗口范圍內執行計算的高級函數,它能在保留原始數據行的同時,對分組內的數據進行排序、聚合和分析。窗口函數的核心作用是為每一行計算基于其"窗口"(由PARTITION BY定義的分組和ORDER BY定義的排序)的衍生值,例如排名、累計和、移動平均等。與普通聚合函數不同,窗口函數不會減少數據行數,而是為每行添加新的計算列,保持原始數據的完整性。
在這里插入圖片描述
PS:一個添加列的函數
在這里插入圖片描述
代碼如下:

from pyspark.sql import functions as F
# 添加常量列
df = df.withColumn("country", F.lit("China"))# 基于現有列計算
df = df.withColumn("total_price", F.col("price") * F.col("quantity"))# 將salary列轉換為千元單位
df = df.withColumn("salary", F.col("salary") / 1000)# 修改數據類型
df = df.withColumn("age", F.col("age").cast("integer"))# 添加部門內薪水排名
window_spec = Window.partitionBy("department").orderBy(F.desc("salary"))
df = df.withColumn("dept_rank", F.rank().over(window_spec))df = df.withColumn("salary_level", F.when(F.col("salary") > 10000, "high").when(F.col("salary") > 5000, "medium").otherwise("low"))

以下是窗口函數的應用:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType# 初始化SparkSession
spark = SparkSession.builder \.appName("WindowFunctionsDemo") \.getOrCreate()# 創建示例數據
data = [("Sales", "John", 5000, "2023-01-15"),("Sales", "Mike", 4500, "2023-01-10"),("Sales", "Lisa", 6000, "2023-01-20"),("IT", "Tom", 5500, "2023-01-12"),("IT", "Emma", 7000, "2023-01-18"),("IT", "Alex", 5200, "2023-01-05"),("HR", "Sarah", 4800, "2023-01-08"),("HR", "David", 5300, "2023-01-22")
]schema = StructType([StructField("department", StringType(), True),StructField("employee", StringType(), True),StructField("salary", IntegerType(), True),StructField("date", StringType(), True)
])df = spark.createDataFrame(data, schema)# 定義窗口規范 - 按部門分區并按薪水排序
window_spec_by_salary = Window.partitionBy("department").orderBy("salary")# 定義窗口規范 - 按部門分區并按日期排序
window_spec_by_date = Window.partitionBy("department").orderBy("date")# 定義窗口規范 - 按部門分區(不排序)
window_spec_partition_only = Window.partitionBy("department")# 定義行范圍窗口 - 當前行及其前后1行
window_spec_rows = Window.partitionBy("department").orderBy("salary").rowsBetween(-1, 1)# 定義無界窗口 - 從分區開始到當前行
window_spec_unbounded = Window.partitionBy("department").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)# ====================== 排名函數 ======================# 1. ROW_NUMBER() - 為每行分配唯一序號(相同值不同序號)
df = df.withColumn("row_num", F.row_number().over(window_spec_by_salary))# 2. RANK() - 為行分配排名(相同值相同排名,留空位)
df = df.withColumn("rank", F.rank().over(window_spec_by_salary))# 3. DENSE_RANK() - 為行分配排名(相同值相同排名,不留空位)
df = df.withColumn("dense_rank", F.dense_rank().over(window_spec_by_salary))# 4. PERCENT_RANK() - 計算行的相對排名(0到1之間)
df = df.withColumn("percent_rank", F.percent_rank().over(window_spec_by_salary))# 5. NTILE(n) - 將行分成n組并分配組號
df = df.withColumn("ntile_4", F.ntile(4).over(window_spec_by_salary))  # 分成4組# ====================== 分析函數 ======================# 6. LAG(col, offset, default) - 獲取前offset行的值
df = df.withColumn("prev_salary", F.lag("salary", 1).over(window_spec_by_date))# 7. LEAD(col, offset, default) - 獲取后offset行的值
df = df.withColumn("next_salary", F.lead("salary", 1).over(window_spec_by_date))# 8. FIRST_VALUE(col) - 獲取窗口中的第一個值
df = df.withColumn("first_salary", F.first_value("salary").over(window_spec_by_date))# 9. LAST_VALUE(col) - 獲取窗口中的最后一個值
df = df.withColumn("last_salary", F.last_value("salary").over(window_spec_by_date))# 10. CUME_DIST() - 計算行的累積分布(0到1之間)
df = df.withColumn("cume_dist", F.cume_dist().over(window_spec_by_salary))# ====================== 聚合函數 ======================# 11. SUM() OVER - 計算窗口內總和
df = df.withColumn("sum_salary", F.sum("salary").over(window_spec_partition_only))# 12. AVG() OVER - 計算窗口內平均值
df = df.withColumn("avg_salary", F.avg("salary").over(window_spec_partition_only))# 13. MIN() OVER - 找出窗口內最小值
df = df.withColumn("min_salary", F.min("salary").over(window_spec_partition_only))# 14. MAX() OVER - 找出窗口內最大值
df = df.withColumn("max_salary", F.max("salary").over(window_spec_partition_only))# 15. COUNT() OVER - 計算窗口內行數
df = df.withColumn("count_emp", F.count("employee").over(window_spec_partition_only))# ====================== 高級窗口函數 ======================# 16. 移動平均 - 當前行及其前后1行
df = df.withColumn("moving_avg", F.avg("salary").over(window_spec_rows))# 17. 累計總和 - 從分區開始到當前行
df = df.withColumn("running_total", F.sum("salary").over(window_spec_unbounded))# 18. 按值范圍計算 - 當前值±500的范圍
window_spec_range = Window.partitionBy("department").orderBy("salary").rangeBetween(-500, 500)
df = df.withColumn("range_avg", F.avg("salary").over(window_spec_range))# 顯示結果
df.orderBy("department", "salary").show(truncate=False)# 停止SparkSession
spark.stop()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/95685.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/95685.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/95685.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

PDF壓縮原理詳解:如何在不失真的前提下減小文件體積?

與直接刪除內容不同,良好的PDF壓縮能在大幅減小體積的同時,較好地保留原有文字清晰度和圖像質量,兼顧實用性與視覺效果。軟件操作十分直觀,僅需設置輸入文件與輸出路徑,點擊【開始壓縮】按鈕即可啟動處理。畫質壓縮等級…

從應用場景看國產化FPGA潛力,紫光同創研討會武漢·北京站回顧

八月,紫光同創 FPGA 技術研討會先后在武漢、北京舉行。作為紫光同創官方合作伙伴,ALINX 攜紫光同創 FPGA 開發板及行業解決方案亮相,與來自通信、工業控制、醫療、圖像視頻、消費電子等領域的近 200 位行業專家齊聚一堂,通過主題演…

安卓APK包體優化全攻略

目錄 正常默認打包流程(以Android平臺為例) 查看編輯器打包日志 壓縮圖片 壓縮網格模型 壓縮貼圖 壓縮音頻文件 只打64位包 最終大小 正常默認打包流程(以Android平臺為例) 準備工作: 確保已安裝最新版Unity H…

嵌入式學習日記(28)進程、線程

回收資源空間子進程回收策略1、wait阻塞回收:一般情況下父進程專門負責回收2、waitpid非阻塞回收:搭配輪詢方式回收3、不回收:子進程任務一致執行4、異步回收:子進程結束后通知父進程進行回收exec 函數族三種調用外部程序的方式#i…

測試用例的一些事項

為什么要寫測試用例?寫測試用例的原因是為了避免遺漏測試,我們要根據給的文檔將邏輯都表達出來,不能因為簡單而不寫,日后版本更新就知道自己哪些測了哪些沒測。在沒有文檔的時候測試用例該怎么寫?大家可以考慮安全測試…

當Java遇見AI:飛算驅動的個人博客介紹智能生成風暴

一、飛算JavaAI:重新定義個人開發的"智能魔法棒" 1.1 開發者需求變革:從"技術門檻"到"創意優先"的時代 在數字化浪潮席卷全球的今天,個人品牌建設已成為技術從業者、創業者乃至學生的剛需——無論是程序員分享…

小程序排名優化:用戶行為數據背后的提升密碼

用戶在小程序中的每一次點擊、每一次停留、每一次分享,都在產生著有價值的數據。這些看似零散的用戶行為數據,其實隱藏著提升小程序排名的密碼。平臺在判定小程序排名時,用戶行為數據是重要的參考依據,因為它直接反映了小程序對用…

【DSP28335 入門教程】深度解析中斷系統:三級架構與響應機制

大家好,歡迎來到我們的 DSP28335 深度解析系列。在之前的實戰中,我們通過 while(1) 循環和延時函數實現了各種控制,這種方式被稱為輪詢。但輪詢就像一個焦急的門衛,需要不停地去檢查每個門口是否有人,既浪費精力又效率…

代碼隨想錄二刷之“字符串”~GO

1.344. 反轉字符串 - 力扣&#xff08;LeetCode&#xff09; func reverseString(s []byte) {left : 0right : len(s)-1for left < right{s[left],s[right] s[right],s[left]leftright--}return } 感悟&#xff1a;還是go語法熟練程度的問題&#xff0c;需要注意的是&am…

(!萬字血書!)文本預處理:NLP 版 “給數據洗澡” 指南

好吧&#xff0c;我承認我是個標題黨&#xff01;(不這樣你會點進來享受這篇 通俗易懂 的好文章嗎&#xff1f;) 正經標題&#xff1a;文本預處理全流程:從基礎到實踐 &#xff08;屏幕前的你&#xff0c;帥氣低調有內涵&#xff0c;美麗大方很優雅… 所以&#xff0c;求…

最新chrome瀏覽器elasticsearch-head無法安裝使用問題

chrome瀏覽器網址欄復制粘貼以下內容輸入回車 chrome://flags/#allow-legacy-mv2-extensions 找到Allow legacy extension manifest versions項右側選擇Enabled啟用&#xff0c;重啟瀏覽器即可。

CSS aspect-ratio 屬性

aspect-ratio 是 CSS 中用于控制元素寬高比的屬性&#xff0c;通過一行代碼即可實現響應式比例布局&#xff0c;無需復雜計算。它確保元素在不同屏幕尺寸下保持固定比例&#xff0c;提升響應式設計效率。一、基本語法與取值selector {aspect-ratio: <width> / <height…

FreeRTOS多核支持

個人博客&#xff1a;blogs.wurp.top 簡介 1. 多核支持概述 在傳統的單核系統中&#xff0c;FreeRTOS 通常運行在一個 CPU 核心上&#xff0c;負責任務調度、中斷處理和資源管理。然而&#xff0c;在多核系統中&#xff0c;多個核心可以并行執行不同的任務或線程&#xff0c…

CUDA中的基本概念

要學習cuda的同學相信已經對其有一定的了解了&#xff0c;至少直到它是干什么的了。這篇文章主要是對cuda編程中的主要概念進行總結&#xff0c;有了一個大致的輪廓后就好入手了。 異構架構 異構架構即使用CPU和GPU共同進行計算。GPU不能作為一個獨立的運行平臺&#xff08;程序…

【LINUX網絡】HTTP協議基本結構、搭建自己的HTTP簡單服務器

目錄 1. 初識HTTP 2. URL 2.1 基本結構 2.2 URL中的?與urldecode\urlencode 易混淆&#xff1a;URL和HTTP傳輸請求兩者是什么關系&#xff1f; HTTP的宏觀結構 3. DEMO CODE loop模塊&#xff0c;核心邏輯 HttpServer 初代版本&#xff08;DEMO 0.0&#xff09; DEMO 1.0 DEMO…

Spring Boot 靜態函數無法自動注入 Bean?深入解析與解決方案

在 Spring Boot 項目中&#xff0c;開發者常遇到一個典型問題&#xff1a;在靜態方法或靜態變量中嘗試使用 Autowired 注入 Bean 時&#xff0c;始終得到 null 值。本文將深入剖析這一問題的根源&#xff0c;并提供多種可靠解決方案。問題重現&#xff1a;為什么注入失敗&#…

存儲過程作為系統邏輯核心的架構思考 —— 以 SaaS 系統為例

在企業級系統尤其是 SaaS 架構中&#xff0c;技術選型一旦確定&#xff0c;就意味著底層數據庫類型基本不會輕易更換。既然如此&#xff0c;我們可以更大膽地將數據庫能力本身納入系統設計的核心&#xff0c;而不僅僅把它當成一個被動的存儲引擎。存儲過程&#xff08;Stored P…

Ubuntu20.04下Remmina的VNC密碼忘記后重置

你遇到的錯誤&#xff1a; ** error creating password: /home/ysc/.vnc/passwd storepasswd: No such file or directory說明&#xff1a;x11vnc -storepasswd 無法創建密碼文件&#xff0c;因為 .vnc 目錄不存在。 雖然你可能以為路徑是對的&#xff0c;但系統找不到 /home/y…

從“存得對”到“存得準”:MySQL 數據類型與約束全景指南

目錄 一、為什么需要數據類型與約束&#xff1f; 二、MySQL 數據類型全覽 1. 數值類型&#xff1a;精確 VS 近似 2. 日期時間類型&#xff1a;別讓“0000-00-00”出現 3. 字符串類型&#xff1a;CHAR、VARCHAR、TEXT、BLOB 4. JSON 類型&#xff1a;文檔與關系共舞 5. 空…

Effective C++ 條款42:了解 typename 的雙重含義

Effective C 條款42&#xff1a;了解typename的雙重含義 核心思想&#xff1a;在模板聲明中&#xff0c;typename和class可互換使用&#xff0c;但在模板內部&#xff0c;typename必須用于顯式指明嵌套從屬類型名稱&#xff08;nested dependent type name&#xff09;&#xf…