Spark-SQL3

Spark-SQL

一.Spark-SQL核心編程(四)

1.數據加載與保存:

1)通用方式:

SparkSQL 提供了通用的保存數據和數據加載的方式。這里的通用指的是使用相同的API,根據不同的參數讀取和保存不同格式的數據,SparkSQL 默認讀取和保存的文件格式為parquet。

2)加載數據:

spark.read.load 是加載數據的通用方法。如果讀取不同格式的數據,可以對不同的數據格式進行設定。

spark.read.format("…")[.option("…")].load("…")

format("…"):指定加載的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要傳入加載數據的路徑。

option("…"):在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbtable

我們前面都是使用 read API 先把文件加載到 DataFrame 然后再查詢,其實,我們也可以直接在文件上進行查詢: 文件格式.`文件路徑`

spark.sql("select * from json.’ Spark-SQL/input/user.json’").show

3)保存數據:

df.write.save 是保存數據的通用方法。如果保存不同格式的數據,可以對不同的數據格式進行設定。

df.write.format("…")[.option("…")].save("…")

format("…"):指定保存的數據類型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"。

save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要傳入保存數據的路徑。

option("…"):在"jdbc"格式下需要傳入 JDBC 相應參數,url、user、password 和 dbtable

保存操作可以使用 SaveMode, 用來指明如何處理數據,使用 mode()方法來設置。

?

例如:df.write.mode("append").json("Spark-SQL/output")

?

?

2.Parquet

Spark SQL 的默認數據源為 Parquet 格式。Parquet 是一種能夠有效存儲嵌套數據的列式

存儲格式。數據源為 Parquet 文件時,Spark SQL 可以方便的執行所有的操作,不需要使用 format。修改配置項 spark.sql.sources.default,可修改默認數據源格式。

1)加載數據:

Val df =s park.read.load("examples/src/main/resources/users.parquet")

2)保存數據:

var df = spark.read.json("/opt/module/data/input/people.json")

df.write.mode("append").save("/opt/module/data/output")

3.JSON

Spark SQL 能夠自動推測 JSON 數據集的結構,并將它加載為一個 Dataset[Row]. 可以

通過 SparkSession.read.json()去加載 JSON 文件。

注意:Spark 讀取的 JSON 文件不是傳統的 JSON 文件,每一行都應該是一個 JSON 串

?

加載json文件

val path = "/opt/module/spark-local/people.json"?

val peopleDF = spark.read.json(path)

創建臨時表

peopleDF.createOrReplaceTempView("people")

數據查詢

val resDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")

4.CSV

Spark SQL 可以配置 CSV 文件的列表信息,讀取 CSV 文件,CSV 文件的第一行設置為

數據列。

spark.read.format("csv").option("sep",";").option("inferSchema","true")

.option("header", "true").load("data/user.csv")

5.MySQL

Spark SQL 可以通過 JDBC 從關系型數據庫中讀取數據的方式創建 DataFrame,通過對

DataFrame 一系列的計算后,還可以將數據再寫回關系型數據庫中。

IDEA通過JDBC對MySQL進行操作:

1)導入依賴

<dependency>

? ? <groupId>mysql</groupId>

? ? <artifactId>mysql-connector-java</artifactId>

? ? <version>5.1.27</version>

</dependency>

MySQL8 <version>8.0.11</version>

?

2)讀取數據

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

?

import spark.implicits._

//通用的load方式讀取

?

spark.read.format("jdbc")

? .option("url","jdbc:mysql://localhost:3306/system")

? .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver

? .option("user","root")

? .option("password","123456")

? .option("dbtable","user")

? .load().show()

?

spark.stop()

?

?

?

?

//通用的load方法的另一種形式

spark.read.format("jdbc")

? .options(

? ? Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))

? .load().show()

?

?

//通過JDBC

val pros :Properties = new Properties()

pros.setProperty("user","root")

pros.setProperty("password","123456")

val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)

df.show()

?

1)寫入數據

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

?

import spark.implicits._

val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),

? Stu("zs", 30)))

val ds:Dataset[Stu] = rdd.toDS()

?

ds.write.format("jdbc")

? .option("url","jdbc:mysql://localhost:3306/system")

? .option("driver","com.mysql.jdbc.Driver")

? .option("user","root")

? .option("password","123456")

? .option("dbtable","user2")

? .mode(SaveMode.Append)

? .save()

?

spark.stop()

?

二.Spark-SQL核心編程(五)

1.Spark-SQL連接Hive

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL 編譯時可以包含 Hive 支持,也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表訪問、UDF (用戶自定義函數)、Hive 查詢語言(HQL)等。需要強調的一點是,如果要在 Spark SQL 中包含Hive 的庫,并不需要事先安裝 Hive。一般來說,最好還是在編譯 Spark SQL 時引入 Hive支持,這樣就可以使用這些特性了。

使用方式分為內嵌Hive、外部Hive、Spark-SQL CLI、Spark beeline 以及代碼操作。

1)內嵌的 HIVE

如果使用 Spark 內嵌的 Hive, 則什么都不用做, 直接使用即可。但是在實際生產活動當中,幾乎沒有人去使用內嵌Hive這一模式。

2)外部的 HIVE

在虛擬機中下載以下配置文件:

?

如果想在spark-shell中連接外部已經部署好的 Hive,需要通過以下幾個步驟:

Spark 要接管 Hive 需要把 hive-site.xml 拷貝到 conf/目錄下,并將url中的localhost改為node01

?

?

?

把 MySQL 的驅動 copy 到 jars/目錄下

?

把 core-site.xml 和 hdfs-site.xml 拷貝到 conf/目錄下

重啟 spark-shell

?

?

3)運行 Spark beeline(了解)

Spark Thrift Server 是 Spark 社區基于 HiveServer2 實現的一個 Thrift 服務。旨在無縫兼容HiveServer2。因為 Spark Thrift Server 的接口和協議都和 HiveServer2 完全一致,因此我們部署好 Spark Thrift Server 后,可以直接使用 hive 的 beeline 訪問 Spark Thrift Server 執行相關語句。Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進行交互,獲取到 hive 的元數據。

如果想連接 Thrift Server,需要通過以下幾個步驟:

Spark 要接管 Hive 需要把 hive-site.xml 拷貝到 conf/目錄下

把 Mysql 的驅動 copy 到 jars/目錄下

把 core-site.xml 和 hdfs-site.xml 拷貝到 conf/目錄下

啟動 Thrift Server

使用 beeline 連接 Thrift Server

beeline -u jdbc:hive2://node01:10000 -n root

4)運行Spark-SQL CLI

Spark SQL CLI 可以很方便的在本地運行 Hive 元數據服務以及從命令行執行查詢任務。在 Spark 目錄下執行如下命令啟動 Spark SQL CLI,直接執行 SQL 語句,類似于 Hive 窗口。

操作步驟:

1.將mysql的驅動放入jars/當中;

2.將hive-site.xml文件放入conf/當中;

3.運行bin/目錄下的spark-sql.cmd 或者打開cmd,在D:\spark\spark-3.0.0-bin-hadoop3.2\bin當中直接運行spark-sql

?

可以直接運行SQL語句,如下所示:

5)代碼操作Hive

1.導入依賴。

<dependency>

? ? <groupId>org.apache.spark</groupId>

? ? <artifactId>spark-hive_2.12</artifactId>

? ? <version>3.0.0</version>

</dependency>

<dependency>

? ? <groupId>org.apache.hive</groupId>

? ? <artifactId>hive-exec</artifactId>

? ? <version>2.3.3</version>

</dependency>

?

可能出現下載jar包的問題:

D:\maven\repository\org\pentaho\pentaho-aggdesigner-algorithm\5.1.5-jhyde

?

2. 將hive-site.xml 文件拷貝到項目的 resources 目錄中。

3.代碼實現。

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("hive")

val spark:SparkSession = SparkSession.builder()

? .enableHiveSupport()

? .config(sparkConf)

? .getOrCreate()

?

spark.sql("show databases").show()

spark.sql("create database spark_sql")

spark.sql("show databases").show()

?

?

注意:

1.如果在執行操作時,出現如下錯誤:

?

可以在代碼最前面增加如下代碼解決:

System.setProperty("HADOOP_USER_NAME", "node01")

此處的 node01 改為自己的 hadoop 用戶名稱

2.在開發工具中創建數據庫默認是在本地倉庫,通過參數修改數據庫倉庫的地址: config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse")

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/77610.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/77610.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/77610.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

DeepSeek與Napkin:信息可視化領域的創新利器

摘要 在數字化信息爆炸的時代&#xff0c;如何高效地組織思路并將其轉化為直觀、清晰的可視化圖表&#xff0c;成為眾多領域面臨的關鍵問題。本文深入剖析了DeepSeek與Napkin這兩款工具&#xff0c;詳細探討它們在信息處理與可視化過程中的功能特性、協同工作機制、應用場景、…

conda 創建、激活、退出、刪除環境命令

參考博客&#xff1a;Anaconda創建環境、刪除環境、激活環境、退出環境 使用起來覺得有些不方便可以改進&#xff0c;故寫此文。 1. 創建環境 使用 -y 跳過確認 conda create -n 你的環境名 -y 也可以直接選擇特定版本 python 安裝&#xff0c;以 3.10 為例&#xff1a; co…

嵌入式芯片中的 低功耗模式 內容細講

電源域與運行級別概述 電源域&#xff08;Power Domain&#xff09; 核心域&#xff08;Core Domain&#xff09;&#xff1a;包括 CPU 核心和關鍵架構模塊&#xff08;如 NVIC、CPU 內核寄存器&#xff09;。 外設域&#xff08;Peripheral Domain&#xff09;&#xff1a;…

Java中常見的鎖synchronized、ReentrantLock、ReentrantReadWriteLock、StampedLock

在Java中&#xff0c;鎖是實現多線程同步的核心機制。不同的鎖適用于不同的場景&#xff0c;理解其實現原理和使用方法對優化性能和避免并發問題至關重要。 一、隱式鎖&#xff1a;synchronized 關鍵字 實現原理 基于對象監視器&#xff08;Monitor&#xff09;&#xff1a;每…

@JsonView + 單一 DTO:如何實現多場景 JSON 字段動態渲染

JsonView 單一 DTO&#xff1a;如何實現多場景 JSON 字段動態渲染 JsonView 單一 DTO&#xff1a;如何實現多場景 JSON 字段動態渲染1、JsonView 注解產生的背景2、為了滿足不同場景下返回對應的屬性的做法有哪些&#xff1f;2.1 最快速的實現則是針對不同場景新建不同的 DTO…

Etcd 壓縮整理

etcd數據存儲 在實際生產中使用 ETCD 存儲元數據&#xff0c;起初集群規模不大的時候元數據信息不多沒有發現什么問題。隨著集群規模越來越大&#xff0c;可能引發存儲問題。 —auto-compaction-retention 由于ETCD數據存儲多版本數據&#xff0c;隨著寫入的主鍵增加歷史版本需…

【更新完畢】2025媽媽杯C題 mathercup數學建模挑戰賽C題數學建模思路代碼文章教學:音頻文件的高質量讀寫與去噪優化

完整內容請看文章最下面的推廣群 我將先給出文章、代碼、結果的完整展示, 再給出四個問題詳細的模型 面向音頻質量優化與存儲效率提升的自適應編碼與去噪模型研究 摘 要 隨著數字媒體技術的迅速發展&#xff0c;音頻處理技術在信息時代的應用愈加廣泛&#xff0c;特別是在存儲…

React-請勿在循環或者條件語句中使用hooks

這是React Hooks的首要規則&#xff0c;這是因為React Hooks 是以單向循環鏈表的形式存儲&#xff0c;即是有序的。循環是為了從最后一個節點移到一個節點的時候&#xff0c;只需通過next一步就可以拿到第一個節點&#xff0c;而不需要一層層回溯。React Hooks的執行&#xff0…

【大模型】 LangChain框架 -LangChain實現問答系統

LangChain 介紹與使用方法 1. 什么是 LangChain&#xff1f;2. LangChain 的主要功能3. 如何使用 LangChain&#xff1f;3.1 環境準備3.2 基本使用示例3.2.1 簡單的問答系統3.2.2 結合外部工具 3.3 高級用法 4. 常見問題及解決方法4.1 安裝問題4.2 運行問題4.3 性能問題 5. 實戰…

企業級HAProxy高可用離線部署實戰(附Kubernetes APIServer負載均衡配置)

企業級HAProxy高可用離線部署實戰&#xff08;附Kubernetes APIServer負載均衡配置&#xff09; 摘要&#xff1a;本文深入講解在離線環境下部署HAProxy 3.1.1的全流程&#xff0c;涵蓋源碼編譯、系統服務封裝、K8S APIServer四層負載配置等核心環節&#xff0c;并提供生產級高…

Python網絡爬蟲設計(一)

目錄 一、網絡爬蟲 1、基本的爬蟲 2、獲取URL 3、查找網頁源碼關鍵字 4、代碼實現 二、requests庫 1、requests的優勢和劣勢 2、獲取網頁的其他庫 &#xff08;1&#xff09;selenium庫 &#xff08;2&#xff09;pyppeteer庫 三、pyppeteer庫 1、pyppeteer庫的來歷…

BR_頻譜20dB 帶寬(RF/TRM/CA/BV-05-C [TX Output Spectrum – 20 dB Bandwidth])

目錄 一、規范要求 1、協議章節 2、測試目的 二、測試方法 1、樣機初值條件&#xff1a; 2、測試步驟: 方法一&#xff1a;頻譜儀 方法二&#xff1a;綜測儀CMW500 3、預期結果 一、規范要求 1、協議章節 4.5.5 RF/TRM/CA/BV-05-C [TX Output Spectrum – 20 dB Ba…

【橘子大模型】初探rag知識庫的構建

一、簡介 我們在實現了一系列功能之后&#xff0c;終于來到了rag的部分&#xff0c;下面我們將基于langchain來實現一個rag檢索。 關于rag方面的知識&#xff0c;可以查看這兩篇文章&#xff1a; 大模型應用之RAG詳解 什么是 RAG&#xff08;檢索增強生成&#xff09; 或者是去…

CentOS7執行yum命令報錯 Could not retrieve mirrorlist http://mirrorlist.centos.org

CentOS7執行yum命令報錯 引更新yum源備份原有源創建新的源文件清理并重建緩存 引 CentOS 7 系統無法連接到 CentOS 的官方鏡像站點。這通常是由于網絡問題或 CentOS 7 已停止維護導致的&#xff08;2024年6月30日后 CentOS 7 已進入 EOL&#xff09; 報錯明細&#xff1a; 已…

VSCode安裝與環境配置(Mac環境)

20250419 - 概述 大概是非常久之前了&#xff0c;裝了VSCode&#xff0c;估計都得21的時候了&#xff0c;電腦上也沒更新過。當時安裝也直接裝上就完事了。這次把版本更新一下&#xff0c;同時記錄一下這個安裝過程。 安裝 mac下安裝非常簡單&#xff0c;直接從官網下載&am…

QML動畫--ParallelAnimation和SequentialAnimation

一、ParallelAnimation ParallelAnimation 是 QML 中用于并行執行多個動畫的容器動畫類型&#xff0c;可以同時運行多個子動畫。 基本用法 qml import QtQuick 2.15Rectangle {id: rectwidth: 100; height: 100color: "red"x: 0; y: 0; opacity: 1.0ParallelAnim…

NLP高頻面試題(四十三)——什么是人類偏好對齊中的「對齊稅」(Alignment Tax)?如何緩解?

一、什么是「對齊稅」(Alignment Tax)? 所謂「對齊稅」(Alignment Tax),指的是在使人工智能系統符合人類偏好的過程中,所不可避免付出的性能損失或代價。換句話說,當我們迫使AI遵循人類價值觀和規范時,AI系統往往無法達到其最大理論性能。這種性能上的妥協和折衷,就…

速查手冊:TA-Lib 超過150種量化技術指標計算全解 - 1. Overlap Studies(重疊指標)

速查手冊&#xff1a;TA-Lib 超過150種量化技術指標計算全解 - 1. Overlap Studies&#xff08;重疊指標&#xff09; TA-Lib&#xff08;Technical Analysis Library&#xff09;是廣泛使用的金融技術分析庫&#xff0c;實現了超過150種技術指標計算函數&#xff0c;適用于股票…

重構未來智能:Anthropic 解碼Agent設計哲學三重奏

第一章 智能體進化論&#xff1a;從工具到自主體的認知躍遷 1.1 LLM應用范式演進圖譜 階段技術形態應用特征代表場景初級階段單功能模型硬編碼規則執行文本摘要/分類進階階段工作流編排多模型協同調度跨語言翻譯流水線高級階段自主智能體動態決策交互編程調試/客服對話 1.1.…

Git 中修改某個特定的commit提交內容

在 Git 中修改某個特定的提交&#xff08;commit&#xff09;通常需要使用 交互式變基&#xff08;Interactive Rebase&#xff09; 或 修改提交&#xff08;Commit Amend&#xff09;。以下是不同場景下的具體操作步驟&#xff1a; 一、修改最近的提交&#xff08;最新提交&am…