Spark,數據提取和保存

以下是使用 Spark 進行數據提取(讀取)和保存(寫入)的常見場景及代碼示例(基于 Scala/Java/Python,不含圖片操作):
?
一、數據提取(讀取)
?
1. 讀取文件數據(文本/CSV/JSON/Parquet 等)
?
Scala
?
scala ??
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
? .appName("Data Read")
? .getOrCreate()

// 讀取 CSV(含表頭)
val csvDf = spark.read.format("csv")
? .option("header", "true")
? .option("inferSchema", "true") // 自動推斷數據類型
? .load("path/to/csv/file.csv")

// 讀取 JSON
val jsonDf = spark.read.json("path/to/json/file.json")

// 讀取 Parquet(Spark 原生格式,高效)
val parquetDf = spark.read.parquet("path/to/parquet/dir")
?
?
Python
?
python ??
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Read").getOrCreate()

# 讀取 CSV
csv_df = spark.read.csv("path/to/csv/file.csv", header=True, inferSchema=True)

# 讀取 JSON
json_df = spark.read.json("path/to/json/file.json")

# 讀取 Parquet
parquet_df = spark.read.parquet("path/to/parquet/dir")
?
?
2. 讀取數據庫數據(如 MySQL/Hive)
?
Scala(以 MySQL 為例)
?
scala ??
val jdbcDf = spark.read.format("jdbc")
? .option("url", "jdbc:mysql://host:port/db?useSSL=false")
? .option("dbtable", "table_name")
? .option("user", "username")
? .option("password", "password")
? .load()
?
?
Python(以 Hive 為例,需啟用 Hive 支持)
?
python ??
# 讀取 Hive 表(需在 SparkSession 中啟用 Hive)
hive_df = spark.sql("SELECT * FROM hive_table")
?
?
二、數據保存(寫入)
?
1. 保存為文件(CSV/JSON/Parquet 等)
?
Scala
?
scala ??
// 保存為 CSV(覆蓋模式,含表頭)
csvDf.write.format("csv")
? .option("header", "true")
? .mode("overwrite") // 模式:overwrite/append/ignore/errorIfExists
? .save("output/csv_result")

// 保存為 Parquet(分區存儲,提升查詢性能)
parquetDf.write.partitionBy("category") // 按字段分區
? .mode("append")
? .parquet("output/parquet_result")
?
?
Python
?
python ??
# 保存為 JSON
json_df.write.json("output/json_result", mode="overwrite")

# 保存為 Parquet(指定壓縮格式)
parquet_df.write.parquet("output/parquet_result", compression="snappy")
?
?
2. 保存到數據庫(如 MySQL/Hive)
?
Scala(以 MySQL 為例)
?
scala ??
jdbcDf.write.format("jdbc")
? .option("url", "jdbc:mysql://host:port/db?useSSL=false")
? .option("dbtable", "target_table")
? .option("user", "username")
? .option("password", "password")
? .mode("append") // 追加模式
? .save()
?
?
Python(以 Hive 為例)
?
python ??
# 保存為 Hive 表(需啟用 Hive 支持)
hive_df.write.saveAsTable("hive_target_table", mode="overwrite")
?
?
三、關鍵參數說明
?
1.?讀取模式(文件)
?
- ?inferSchema?: 是否自動推斷數據類型(適用于 CSV/JSON,需讀取少量數據,影響性能)。
?
- ?header?: CSV 是否包含表頭(?true/false?)。
?
2.?寫入模式(?mode?)
?
- ?overwrite?: 覆蓋已有數據。
?
- ?append?: 追加到現有數據。
?
- ?ignore?: 忽略寫入(不報錯)。
?
- ?errorIfExists?: 存在則報錯(默認)。
?
3.?數據庫連接
?
- 需添加對應數據庫驅動(如 MySQL 的 ?mysql-connector-java?)。
?
- 對于大規模數據,建議使用分區并行寫入(如 ?option("numPartitions", "4")?)。
?
四、典型場景示例
?
場景:從 MySQL 讀取數據,清洗后保存為 Parquet
?
scala ??
// 讀取 MySQL 數據
val mysqlDf = spark.read.jdbc(
? url = "jdbc:mysql://host:port/source_db",
? dbtable = "source_table",
? properties = Map("user" -> "u", "password" -> "p")
)

// 數據清洗(示例:過濾空值)
val cleanedDf = mysqlDf.na.drop("any")

// 保存為 Parquet(按日期分區)
cleanedDf.write.partitionBy("date")
? .parquet("output/cleaned_data")
?
?
通過以上方法,可靈活使用 Spark 完成數據提取和保存任務,支持多種數據源和格式。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/906070.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/906070.shtml
英文地址,請注明出處:http://en.pswp.cn/news/906070.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何用mockito+junit測試代碼

Mockito 是一個流行的 Java 模擬測試框架,用于創建和管理測試中的模擬對象(mock objects)。它可以幫助開發者編寫干凈、可維護的單元測試,特別是在需要隔離被測組件與其他依賴項時。 目錄 核心概念 1. 模擬對象(Mock Objects) 2. 打樁(Stubbing) 3. 驗…

最新缺陷檢測模型:EPSC-YOLO(YOLOV9改進)

目錄 引言:工業缺陷檢測的挑戰與突破 一、EPSC-YOLO整體架構解析 二、核心模塊技術解析 1. EMA多尺度注意力模塊:讓模型"看得更全面" 2. PyConv金字塔卷積:多尺度特征提取利器 3. CISBA模塊:通道-空間注意力再進化 4. Soft-NMS:更智能的重疊框處理 三、實…

【Linux網絡與網絡編程】12.NAT技術內網穿透代理服務

1. NAT技術 之前我們說到過 IPv4 協議中IP 地址數量不充足的問題可以使用 NAT 技術來解決。還提到過本地主機向公網中的一個服務器發起了一個網絡請求,服務器是怎么將應答返回到該本地主機呢?(如何進行內網轉發?) 這就…

uniapp的適配方式

文章目錄 前言? 一、核心適配方式對比📏 二、rpx 單位:uni-app 的核心適配機制🧱 三、默認設計稿適配(750寬)🔁 四、字體 & 屏幕密度適配🛠 五、特殊平臺適配(底部安全區、劉海…

JAVA EE(進階)_進階的開端

別放棄浸透淚水的昨天,晨光已為明天掀開新篇 ——陳長生. ?主頁:陳長生.-CSDN博客? 📕上一篇:JAVA EE_HTTP-CSDN博客 1.什么是Java EE Java EE(Java Pla…

SQL腳本規范

主要作用:數據庫的備份和遷移 SQL腳本規范 每一個sql語句必須與;結束 腳本結構: { 刪庫,建庫 刪表,建表 插入初始數據 } 建庫語法: CREATE DATABASE 數據庫名CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; CHARA…

std::ratio<1,1000> 是什么意思?

author: hjjdebug date: 2025年 05月 14日 星期三 09:45:24 CST description: std::ratio<1,1000> 是什么意思&#xff1f; 文章目錄 1. 它是一種數值嗎&#xff1f;2. 它是一種類型嗎&#xff1f;3. std:ratio 是什么呢&#xff1f;4. 分析一個展開后的模板函數5.小結: …

測試--測試分類 (白盒 黑盒 單元 集成)

一、按照測試目標分類&#xff08;測試目的是什么&#xff09; 主類別細分說明1. 界面測試UI內容完整性、一致性、準確性、友好性&#xff0c;布局排版合理性&#xff0c;控件可用性等2. 功能測試檢查軟件功能是否符合需求說明書&#xff0c;常用黑盒方法&#xff1a;邊界值、…

整理了 2009 - 2025 年的【199 管綜真題 + 解析】PDF,全套共 34 份文件

每年真題原卷 ? 每年詳細解析 ? &#x1f4c2;【管綜真題 2009-2025】 &#x1f4c2;【管綜解析 2009-2025】 目錄樹&#xff1a; ├── 2009-2025管綜真題 PDF │ ├── 2009年199管綜真題.pdf │ ├── 2010年199管綜真題.pdf │ ├── 2011年199管綜真題.pd…

用golang實現二叉搜索樹(BST)

目錄 一、概念、性質二、二叉搜索樹的實現1. 結構2. 查找3. 插入4. 刪除5. 中序遍歷 中序前驅/后繼結點 一、概念、性質 二叉搜索樹&#xff08;Binary Search Tree&#xff09;&#xff0c;簡寫BST&#xff0c;又稱為二叉查找樹 它滿足&#xff1a; 空樹是一顆二叉搜索樹對…

自動化:批量文件重命名

自動化&#xff1a;批量文件重命名 1、前言 2、效果圖 3、源碼 一、前言 今天來分享一款好玩的自動化腳&#xff1a;批量文件重命名 有時候呢&#xff0c;你的文件被下載下來文件名都是亂七八糟毫無規律&#xff0c;但是當時你下載的時候沒辦法重名或者你又不想另存為重新重…

VueUse/Core:提升Vue開發效率的實用工具庫

文章目錄 引言什么是VueUse/Core&#xff1f;為什么選擇VueUse/Core&#xff1f;核心功能詳解1. 狀態管理2. 元素操作3. 實用工具函數4. 瀏覽器API封裝5. 傳感器相關 實戰示例&#xff1a;構建一個拖拽上傳組件性能優化技巧與原生實現對比常見問題解答總結 引言 在現代前端開發…

stm32 ADC單通道轉換

stm32c8t6僅有12位分辨率 1、單次轉換 非掃描 1、初始化 void Ad_Init() {RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE);RCC_APB2PeriphClockCmd(RCC_APB2Periph_ADC1, ENABLE);//配置ADCCLK時鐘分頻,ADC的輸入時鐘不得超過14MHzRCC_ADCCLKConfig(RCC_PCLK2_Div6);G…

2KW壓縮機驅動參考設計【SCH篇】

實物展示&#xff1a; ACDC: VAC和VAC-為交流電壓檢測&#xff1a; 1.C33 C34作為Y電容走線寬度要求&#xff1a; Y電容一般用于L/N到地之間&#xff08;L-PE 或 N-PE&#xff09;&#xff0c;主要作用是抑制共模干擾。其走線的電流非常小&#xff0c;推薦使用 ≥ 1mm 寬的走…

python05——循環結構

1、while循環 n0 #初始條件 while n<5: #判斷print(hello python) #要重復執行的代碼print(n) #注意同級代碼縮進相同n1 #計數器結果&#xff1a; hello python 0 hello python 1 hello python 2 hello python 3 hello python 4 hello python 5 #求階乘和 sum0 n1 whil…

LINUX編譯、運行、測試lowcoder_CN

參考 二者沒有太大差異。 LINUX編譯、運行、測試lowcoder-CSDN博客 下載 git clone https://github.com/mousheng/lowcoder_CN 或 git clone https://gitcode.com/gh_mirrors/lo/lowcoder_CNcd lowcoder_CN三個模塊 node-service api-service client 每個模塊都有自己的…

Python 基礎之函數命名

幾個問題 使用描述性蛇形命名法&#xff08;snake_case&#xff09;Python函數名應使用什么大小寫格式&#xff1f;為什么函數名要具有描述性&#xff1f;方法的命名規范是什么&#xff1f;函數、變量和類的命名有何區別&#xff1f; Python函數的命名有一些不可違背的硬性規…

redis 命令大全整理

http://doc.redisfans.com/ 原網址 Redis 命令分類 Key(鍵) Key(鍵)命令 exists/del/keys/type/scanobject/move/dump/migratettl/pttl/persist/expireat/pexpireat/expire/pexpirerename/renamenxsort/randomkey/restoreexists 語法:exists key [key ...] 檢查一個或多…

React中useDeferredValue與useTransition終極對比。

文章目錄 前言一、核心差異對比二、代碼示例對比1. useDeferredValue&#xff1a;延遲搜索結果更新2. useTransition&#xff1a;延遲路由切換 三、應用場景總結四、注意事項五、原理剖析1. 核心機制對比2. 關鍵差異3. 代碼實現原理 總結 前言 在React的并發模式下&#xff0c…

高并發內存池|定長內存池的設計

二、定長內存池的設計 設計一個定長的內存池&#xff0c;這個內存池的定長在于&#xff0c;當剩余空間使用完畢后&#xff0c;總是開辟相同長度的新空間來使用。我們會使用到一個指針來切割劃分大空間為小空間。大空間是內存池向系統申請的內存大小&#xff0c;而小空間是程序…