初識Spark2.0之Spark SQL

內存計算平臺Spark在今年6月份的時候正式發布了spark2.0,相比上一版本的spark1.6版本,在內存優化,數據組織,流計算等方面都做出了較大的改變,同時更加注重基于DataFrame數據組織的MLlib,更加注重機器學習整個過程的管道化。

當然,作為使用者,特別是需要運用到線上的系統,大部分廠家還是會繼續選擇已經穩定的spark1.6版本,并且在spark2.0逐漸成熟之后才會開始考慮系統組件的升級。作為開發者,還是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鑒一些spark2.0做出某些改進的思路。

首先,為了調用spark API 來完成我們的計算,需要先創建一個sparkContext:

 String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用戶的當前工作目錄
SparkConf conf = new SparkConf().setAppName("spark sql test")  .set("spark.sql.warehouse.dir", warehouseLocation)  .setMaster("local[3]");
  SparkSession spark = SparkSession  .builder()  .config(conf)  .getOrCreate();

上述代碼主要有三點:

    • 使用spark sql時需要指定數據庫的文件地址,這里使用了一個本地的目錄
    • spark配置,指定spark app的名稱和數據庫地址,master url為local 3核
    • 使用SparkSession,取代了原本的SQLContext與HiveContext。對于DataFrame API的用戶來說,Spark常見的混亂源頭來自于使用哪個“context”。現在你可以使用SparkSession了,它作為單個入口可以兼容兩者。注意原本的SQLContext與HiveContext仍然保留,以支持向下兼容。這是spark2.0的一個較大的改變,對用戶更加友好。

下面開始體驗spark sql:

 //===========================================1 spark SQL===================  //數據導入方式  Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");  //查看表  
        df.show();  //查看表結構  
        df.printSchema();  //查看某一列 類似于MySQL: select name from people  df.select("name").show();  //查看多列并作計算 類似于MySQL: select name ,age+1 from people  df.select(col("name"), col("age").plus(1)).show();  //設置過濾條件 類似于MySQL:select * from people where age>21  df.filter(col("age").gt(21)).show();  //做聚合操作 類似于MySQL:select age,count(*) from people group by age  df.groupBy("age").count().show();  //上述多個條件進行組合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age  df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();  //直接使用spark SQL進行查詢  //先注冊為臨時表  df.createOrReplaceTempView("people");  Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");  sqlDF.show();

主要關注以下幾點:

  • 數據來源:spark可以直接導入json格式的文件數據,people.json是我從spark安裝包下拷貝的測試數據。
  • spark sql:sparkSql語法和用法和mysql有一定的相似性,可以查看表、表結構、查詢、聚合等操作。用戶可以使用sparkSql的API接口做聚合查詢等操作或者用類SQL語句實現(但是必須將DataSet注冊為臨時表)
  • DataSet:DataSet是spark2.0i引入的一個新的特性(在spark1.6中屬于alpha版本)。DataSet結合了RDD和DataFrame的優點, 并帶來的一個新的概念Encoder當序列化數據時,,Encoder產生字節碼與off-heap進行交互,,能夠達到按需訪問數據的效果,而不用反序列化整個對象。
我們可以為自定義的對象創建DataSet,首先創建一個JavaBeans:
/** * 一個描述人屬性的JavaBeans * A JavaBean is a Java object that satisfies certain programming conventions: The JavaBean class must implement either Serializable or Externalizable The JavaBean class must have a no-arg constructor All JavaBean properties must have public setter and getter methods All JavaBean instance variables should be private */  public static class Person implements Serializable {  private String name;  private int age;  public String getName() {  return name;  }  public void setName(String name) {  this.name = name;  }  public int getAge() {  return age;  }  public void setAge(int age) {  this.age = age;  }  }

接下來,就可以為該類的對象創建DataSet了,并像操作表一樣操作自定義對象的DataSet了:

   //為自定義的對象創建Dataset  List<Person> personpList = new ArrayList<Person>();  Person person1 = new Person();  person1.setName("Andy");  person1.setAge(32);  Person person2 = new Person();  person2.setName("Justin");  person2.setAge(19);  personpList.add(person1);  personpList.add(person2);  Encoder<Person> personEncoder = Encoders.bean(Person.class);  Dataset<Person> javaBeanDS = spark.createDataset(  personpList,  personEncoder  );  javaBeanDS.show();

同時,可以利用Java反射的特性,來從其他數據集中創建DataSet對象:

 //spark支持使用java 反射機制推斷表結構  //1 首先創建一個存儲person對象的RDD  JavaRDD<Person> peopleRDD = spark.read()  .textFile("..\\sparkTestData\\people.txt")  .javaRDD()  .map(new Function<String, Person>() {  public Person call(String line) throws Exception {  String[] parts = line.split(",");  Person person = new Person();  person.setName(parts[0]);  person.setAge(Integer.parseInt(parts[1].trim()));  return person;  }  });  //2 表結構推斷  Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);  peopleDF.createOrReplaceTempView("people");  //3 定義map 這里對每個元素做序列化操作  Encoder<String> stringEncoder = Encoders.STRING();  Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {  public String call(Row row) throws Exception {  return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));  }  }, stringEncoder);  peopleSerDF.show();  //==============================================3 從RDD創建Dataset StructType對象的使用  JavaRDD<String> peopleRDD2 = spark.sparkContext()  .textFile("..\\sparkTestData\\people.txt", 1)  .toJavaRDD();  // 創建一個描述表結構的schema  String schemaString = "name age";  List<StructField> fields = new ArrayList<StructField>();  for (String fieldName : schemaString.split(" ")) {  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);  fields.add(field);  }  StructType schema = DataTypes.createStructType(fields);  // Convert records of the RDD (people) to Rows  JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {  //@Override  public Row call(String record) throws Exception {  String[] attributes = record.split(",");  return RowFactory.create(attributes[0], attributes[1].trim());  }  });  // Apply the schema to the RDD  Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);  // Creates a temporary view using the DataFrame  peopleDataFrame.createOrReplaceTempView("people");  peopleDataFrame.show();

主要關注以下幾點:

  • RDD:從普通文本文件中解析數據,并創建結構化數據結構的RDD。
  • 表結構推斷的方式創建DataSet:利用Java類反射特性將RDD轉換為DataSet。
  • 指定表結構的方式創建DataSet:我們可以使用StructType來明確定義我們的表結構,完成DataSet的創建
如何將自己的數據/文本導入spark并創建spark的數據對象,對新手來說顯得尤為關鍵,對自己的數據表達好了之后,才有機會去嘗試spark的其他API ,完成我們的目標。一般數據源在經過我們其他程序的前處理之后,存儲成行形式的文本/json格式或者本身存儲的hive/mysql數據庫中,spark對這些數據源的調用都是比較方便的。
?
介紹完了spark-sql的數據導入及數據表達后,我們來完成一個比較簡單的數據統計任務。一般在工作生活中對某些數據按一定的周期進行統計分析是一個比較常見的任務了。下面,我們就以股票統計的例子為例。我們使用spark的窗口統計功能,來對某一公司的股票在2016年6月份的各個星期的均值做統計。
 //在Spark 2.0中,window API內置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常類似。  Dataset<Row> stocksDF = spark.read().option("header","true").  option("inferSchema","true").  csv("..\\sparkTestData\\stocks.csv");  //stocksDF.show();  
  Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").  filter("month(Date)==6");  stocks201606.show(100,false);

首先讀入了csv格式的數據文件,同時將2016年6月份的數據過濾出來,并以不截斷的方式輸出前面100條記錄,運行的結果為:

調用window接口做窗口統計:

  //window一般在group by語句中使用。window方法的第一個參數指定了時間所在的列;  //第二個參數指定了窗口的持續時間(duration),它的單位可以是seconds、minutes、hours、days或者weeks。  Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).  agg(avg("Close").as("weekly_average"));  tumblingWindowDS.show(100,false);  tumblingWindowDS.sort("window.start").  select("window.start","window.end","weekly_average").  show(false);

其運行結果為:

由于沒有指定窗口的開始時間,因此統計的開始時間為2016-05-26,并且不是從0點開始的。通常情況下,這樣統計就顯得有點不對了,因此我們需要指定其開始的日期和時間,但是遺憾的是spark并沒有接口/參數讓我們明確的指定統計窗口的開始時間。好在提供了另外一種方式,指定偏移時間,上述時間(2016-05-26 08:00:00)做一個時間偏移,也可以得到我們想要的開始時間(2016-06-01 00:00:00)。

 //在前面的示例中,我們使用的是tumbling window。為了能夠指定開始時間,我們需要使用sliding window(滑動窗口)。  //到目前為止,沒有相關API來創建帶有開始時間的tumbling window,但是我們可以通過將窗口時間(window duration)  //和滑動時間(slide duration)設置成一樣來創建帶有開始時間的tumbling window。代碼如下:  Dataset<Row>  windowWithStartTime = stocks201606.  groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).  agg(avg("Close").as("weekly_average"));  //6 days參數就是開始時間的偏移量;前兩個參數分別代表窗口時間和滑動時間,我們打印出這個窗口的內容:  windowWithStartTime.sort("window.start").  select("window.start","window.end","weekly_average").  show(false);

運行結果為:

這就得到了我們需要的統計結果了。

關于spark2.0的sparkSql部分,基本就介紹這么多了。

?

?

?

?

?

轉載于:https://www.cnblogs.com/itboys/p/6676858.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/284002.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/284002.shtml
英文地址,請注明出處:http://en.pswp.cn/news/284002.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

webpack開發Vue配置

一直以來使用webpack都是用的別人的配置&#xff0c;這幾天自己學習了一下。 項目地址&#xff1a;https://github.com/donghaohao... 新建整個工程 npm init安裝依賴&#xff0c;這里我們開發vue項目&#xff0c;npm install vue --save&#xff0c;然后是開發時的依賴npm ins…

ABP詳細教程——模塊類

概述模塊化是ABP vNext的最大亮點&#xff0c;也是ABP vNext框架的核心&#xff0c;而模塊類是ABP vNext框架模塊化的核心要素。這一章節&#xff0c;我就從模塊類的用法、運行機制、源代碼等層面&#xff0c;帶大家詳細了解ABP vNext的模塊類。用法在ABP的約定中&#xff0c;每…

[轉]Eureka工作原理

目錄 Eureka 工作原理 Eureka 核心概念 自我保護機制 Eureka 集群原理 Eurka 工作流程 總結 Eureka 工作原理 上節內容為大家介紹了&#xff0c;注冊中心 Eureka 產品的使用&#xff0c;以及如何利用 Eureka 搭建單臺和集群的注冊中心。這節課我們來繼續學習 Eureka&…

centos7下別名(alias)的特殊用法

版權聲明&#xff1a;轉載請注明出處:http://blog.csdn.net/dajitui2024 https://blog.csdn.net/dajitui2024/article/details/79438200 參考&#xff1a;https://www.cyberciti.biz/faq/bash-bypass-alias-command-on-linux-macos-unix/ 正常情況下&#xff0c;定義過的別名&a…

解決WDCP3環境gbk網站編碼程序亂碼問題

因為默認WDCP V3版本環境編碼格式是UTF-8版本&#xff0c;如果我們程序采用的是GBK編碼肯定都會有亂碼問題。 我們到WDCP后臺&#xff0c;"網站管理"-"PHP設置"&#xff0c;看到上圖所示&#xff0c;準備直接在線編輯PHP.INI文件。 這里我們找到"defa…

重談聯想5G編碼投票事件

此前&#xff0c;司馬南談了聯想好幾個問題&#xff0c;其中最尖銳的要屬國有資產流失&#xff0c;這是聯想管理層無法回避的死穴。不過&#xff0c;司馬南批判聯想5G投票背刺H公司&#xff0c;這基本就是造謠了。當年&#xff0c;媒體把編碼投票炒作的很厲害&#xff0c;抨擊聯…

JStorm2.1.1集群的安裝和使用

為什么80%的碼農都做不了架構師&#xff1f;>>> JStorm2.1.1集群的安裝和使用 Storm是一個免費開源、分布式、高容錯的實時計算系統&#xff0c;而JStorm是阿里巴巴開源的基于Storm采用Java重寫的一套分布式實時流計算框架&#xff0c;在性能和支持的集群規模上做了…

Hystrix 原理

Hystrix是什么&#xff1f; Hystrix是Netflix開源庫&#xff0c;這是一個針對分布式系統的延遲和容錯庫。 Hystrix 供分布式系統使用&#xff0c;提供延遲和容錯功能&#xff0c;隔離遠程系統、訪問和第三方程序庫的訪問點&#xff0c;防止級聯失敗&#xff0c;保證復雜的分布…

「深度」無人機實名制政策特稿|市場看好、資本關注,“反黑飛”正在崛起

從政策和需求來看&#xff0c;“反黑飛”越來越重要&#xff0c;市場也正在不斷崛起。 對于大多數人來說&#xff0c;今天是最適合明目張膽“裝嫩”的六一兒童節。不過&#xff0c;在無人機廠商和無人機玩家的眼里&#xff0c;今天是無人機實名制政策正式實施的日子。 近年來&…

在navicat中新建數據庫

前言&#xff1a; 在本地新建一個名為editor的數據庫&#xff1b; 過程&#xff1a; 1.&#xff1b; 2.選擇&#xff1a;utf8mb4 -- UTF-8 Unicode字符集&#xff0c;原因在于&#xff1a;utf8mb4兼容utf8&#xff0c;且比utf8能表示更多的字符。&#xff0c;而且它支持表情符號…

MASA Stack 第三期社區例會

MASA Blazor 0.5.0發版內容功能Autocomplete&#xff1a;支持通過設置AutoSelectFirst參數開啟自動選擇第一項的功能&#xff0c;支持CacheItems參數&#xff0c;增強使用上下鍵的用戶體驗。BottomNavigation&#xff1a;&#xff1a;一個替代側邊欄的新組件。它主要用于移動應…

MySQL添加用戶、刪除用戶與授權

MySql中添加用戶,新建數據庫,用戶授權,刪除用戶,修改密碼(注意每行后邊都跟個;表示一個命令語句結束): 1.新建用戶 1.1 登錄MYSQL&#xff1a; >mysql -u root -p >密碼 1.2 創建用戶&#xff1a; mysql> insert into mysql.user(Host,User,Password) values("lo…

[轉]高并發架構設計之--「服務降級」、「服務限流」與「服務熔斷」

目錄 服務降級 1 、簡介 2 、使用場景 3 、核心設計 3.1 分布式開關 3.2 自動降級分類 3.3 配置中心 3.4 處理策略 3.5 降級分類 3.6 服務降級要考慮的問題 4 、高級特性 4.1 分級降級 4.2 降級權值 5 、總結與展望 服務限流 一、為什么要做服務限流設計&…

【Linux】【Services】【nfs】nfs安裝與配置

1. 概念 1.1. NFS&#xff1a;Network File System&#xff0c;傳統意義上&#xff0c;文件系統在內核中實現。 1.2. RPC&#xff1a;Remote Procedure Call protocol&#xff0c;遠程過程調用&#xff0c;函數調用&#xff08;遠程主機上的函數&#xff09; 1.3. 端口&#xf…

SpringBoot獲取ApplicationContext

2019獨角獸企業重金招聘Python工程師標準>>> 有兩種方法&#xff1a; 創建Component實現ApplicationContextAware接口&#xff0c;SpringBoot會自動調用這個類的setApplicationConext()方法。鼓勵使用這種方式。SpringApplication.run(MyApplication.class, args)這…

SkiaSharp 之 WPF 自繪 投籃小游戲(案例版)

此案例主要是針對光線投影法碰撞檢測功能的示例&#xff0c;順便做成了一個小游戲&#xff0c;很簡單&#xff0c;但是&#xff0c;效果卻很不錯。投籃小游戲規則&#xff0c;點擊投籃目標點&#xff0c;就會有一個球沿著相關拋物線&#xff0c;然后&#xff0c;判斷是否進入籃…

zuul集成ribbon完成服務通信和負載均衡

目錄 Zuul2服務通信 超時相關 默認超時配置 自定義超時配置 負載均衡 Zuul2服務通信 描述&#xff1a;zuul2通過Ribbon完成客戶端負載均衡以及與服務器群集進行通信。 zuul2的通信是集成Ribbon實現的&#xff0c;在Origin中集成Ribbon基本配置&#xff08;例如IClientCo…

時任上海來伊份互聯網事業群總裁王戈鈞 :傳統企業(線上+線下)移動互聯網改造...

2017年12月22日-23日&#xff0c;第13屆信息化領袖峰會暨2017中國數字化貢獻人物頒獎盛典在上海盛大開幕。本次峰會由上海市經濟和信息化委員會指導&#xff0c;上海市國有資產信息中心、上海市計算機用戶協會、上海市信息服務業行業協會、上海大數據聯盟、上海市高等教育學會支…

Linux系統時間\硬件時間(date、tzselect、clock、hwclock、ntpdate)

1、系統時間和硬件時間 在Linux中有硬件時鐘與系統時鐘兩種時鐘。硬件時鐘是指主機板上的時鐘設備&#xff0c;也就是通常可在BIOS畫面設定的時鐘。系統時鐘則是指kernel中的時鐘。所有Linux相關指令與函數都是讀取系統時鐘的設定。因為存在兩種不同的時鐘&#xff0c;那么它們…

C#------如何判斷輸入的是否為純數字

private void Btn_OK_Click(object sender, EventArgs e){IDormitoryAdminCardService aservice new DormitoryAdminCardService();string text this.CardNoEdit.Text;//判斷是否輸入的是純數字string pattern "^[0-9]*$";Regex regex new Regex(pattern);if (re…