SparkHiveSQL中Join操作的謂詞下推?

前言:

SparkSQL和HiveSQL的Join操作中也有謂詞下推?今天就通過大神的文章來了解下。同樣,如有冒犯,請聯系。

正文

上文簡要介紹了Join在大數據領域中的使用背景以及常用的幾種算法-broadcast hash join 、shuffle hash join以及sort merge join等,對每一種算法的核心應用場景也做了相關介紹,這里再重點說明一番:大表與小表進行join會使用broadcast hash join,一旦小表稍微大點不再適合廣播分發就會選擇shuffle hash join,最后,兩張大表的話無疑選擇sort merge join。
好了,問題來了,說是這么一說,但到底選擇哪種算法歸根結底是SQL執行引擎干的事情,按照上文邏輯,SQL執行引擎肯定要知道參與Join的兩表大小,才能選擇最優的算法嘍!那么斗膽問一句,怎么知道兩表大小?衡量兩表大小的是物理大小還是紀錄多少抑或兩者都有?其實,這是另一門學問-基于代價優化(Cost Based Optimization,簡稱CBO),它不僅能夠解釋Join算法的選擇問題,更重要的,它還能確定多表聯合Join場景下的Join順序問題。
是不是對CBO很期待呢?好吧,這里先刨個坑,下一個話題我們再聊。那今天要聊點什么呢?Join算法選擇、Join順序選擇確實對Join性能影響極大,但,還有一個很重要的因素對Join的性能至關重要,那就是Join算法優化!無論是broadcast hash join、shuffle hash join還是sort merge join,都是最基礎的join算法,有沒有什么優化方案呢?還真有,這就是今天要聊的主角-Runtime Filter(下文簡稱RF)

RF預備知識:bloom filter

RF說白了是使用bloomfilter對參與join的表進行過濾,減少實際參與join的數據量。為了下文詳細解釋整個流程,有必要先解釋一下bloomfilter這個數據結構(對之熟悉的看官可以繞道)。Bloom Filter使用位數組來實現過濾,初始狀態下位數組每一位都為0,如下圖所示:
在這里插入圖片描述
假如此時有一個集合S = {x1, x2, … xn},Bloom Filter使用k個獨立的hash函數,分別將集合中的每一個元素映射到{1,…,m}的范圍。對于任何一個元素,被映射到的數字作為對應的位數組的索引,該位會被置為1。比如元素x1被hash函數映射到數字8,那么位數組的第8位就會被置為1。下圖中集合S只有兩個元素x和y,分別被3個hash函數進行映射,映射到的位置分別為(0,3,6)和(4,7,10),對應的位會被置為1:
在這里插入圖片描述
現在假如要判斷另一個元素是否是在此集合中,只需要被這3個hash函數進行映射,查看對應的位置是否有0存在,如果有的話,表示此元素肯定不存在于這個集合,否則有可能存在。下圖所示就表示z肯定不在集合{x,y}中:
在這里插入圖片描述

RF算法理論

為了更好地說明整個過程,這里使用一個SQL示例對RF算法進行完整講解,SQL:

select item.name, order.* 
from order , item 
where order.item_id = item.id 
and item.category = ‘book’

,其中order為訂單表,item為商品表,兩張表根據商品id字段進行join,該SQL意為取出商品類別為書籍的所有訂單詳情。假設商品類型為書籍的商品并不多,join算法因此確定為broadcast hash join。整個流程如下圖所示:
在這里插入圖片描述
Step 1:將item表的join字段(item.id)經過多個hash函數映射處理為一個bloomfilter(如果對bloomfilter不了解,自行google)
Step 2:將映射好的bloomfilter分別廣播到order表的所有partition上,準備進行過濾
Step 3:以Partition2為例,存儲進程(比如DataNode進程)將order表中join列(order.item_id)數據一條一條讀出來,使用bloomfilter進行過濾。淘汰該訂單數據不是書籍相關商品的訂單,這條數據直接跳過;否則該條訂單數據有可能是待檢索訂單,將該行數據全部掃描出來。
Step 4:將所有未被bloomfilter過濾掉的訂單數據,通過本地socket通信發送到計算進程(impalad)。
Step 5:再將所有書籍商品數據廣播到所有Partition節點與step4所得訂單數據進行真正的hashjoin操作,得到最終的選擇結果。

RF算法分析

上面通過一個SQL示例簡單演示了整個RF算法在broadcast hash join中的操作流程,根據流程對該算法進行一下理論層次分析:

  • RF本質:通過謂詞(
    bloomfilter)下推,在存儲層通過bloomfilter對數據進行過濾,可以從三個方面實現對Join的優化。其一,如果可以跳過很多記錄,就可以減少了數據IO掃描次數。這點需要重點解釋一下,許多朋友會有這樣的疑問:既然需要把數據掃描出來使用BloomFilter進行過濾,為什么還會減少IO掃描次數呢?這里需要關注一個事實:大多數表存儲行為都是列存,列之間獨立存儲,掃描過濾只需要掃描join列數據(而不是所有列),如果某一列被過濾掉了,其他對應的同一行的列就不需要掃描了,這樣減少IO掃描次數。其二,減少了數據從存儲層通過socket(甚至TPC)發送到計算層的開銷,其三,減少了最終hash
    join執行的開銷。
  • RF代價:對照未使用RF的Broadcast Hash
    Join來看,前者主要增加了bloomfilter的生成、廣播以及大表根據bloomfilter進行過濾這三個開銷。通常情況下,這幾個步驟在小表較小的情況下代價并不大,基本可以忽略。
  • RF優化效果:基本取決于bloomfilter的過濾效果,如果大量數據被過濾掉了,那么join的性能就會得到極大提升;否則性能提升就會有限。
  • RF實現:和常見的謂詞下推(’=‘,’>’,’<‘等)一樣,RF實現需要在計算層以及存儲層分別進行相關邏輯實現,計算層要構造bloomfilter并將bloomfilter下傳到存儲層,存儲層要實現使用該bloomfilter對指定數據進行過濾。

RF效果驗證

事實上,RF這個東東的優化效果是在組內同事何大神做impala on parquet以及impala on kudu的基準對比測試的時候分析發現的。實際測試中,impala on parquet 比之impala on kudu性能有明顯優勢,目測至少10倍性能提升。同一SQL解析引擎,不同存儲引擎,性能竟然天壤之別!為了分析具體原因,同事就使用impala的執行計劃分析工具對兩者的執行計劃分別進行了分析,才透過蛛絲馬跡發現前者使用了RF,而后者并沒有(當然可能還有其他因素,但RF肯定是原因之一)。
簡單復盤一下這次測試吧,基準測試使用TPCDS測試,數據規模為1T,本文使用測試過程中的一個典型SQL(Q40)作為示例對RF的神奇功效進行回放演示。下圖是Q40的對比性能,直觀上來看RF可以直接帶來40x的性能提升,40倍哎,這到底是怎么做到的?
在這里插入圖片描述
先來簡單看看Q40的SQL語句,如下所示,看起來比較復雜,核心涉及到3個表(catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item)的join操作:

select  w_state  ,i_item_id  ,
sum(case when (cast(d_date as date) < cast (1998-04-08as date))  then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_before  ,
sum(case when (cast(d_date as date) >= cast (1998-04-08as date))  then cs_sales_price – coalesce(cr_refunded_cash,0) else 0 end) as sales_after  
from  catalog_sales 
left outer join catalog_returns on  (catalog_sales.cs_order_number = catalog_returns.cr_order_number  
and catalog_sales.cs_item_sk = catalog_returns.cr_item_sk)  ,
warehouse  ,item  ,date_dim  
where  i_current_price between 0.99 and 1.49  
and item.i_item_sk = catalog_sales.cs_item_sk  
and catalog_sales.cs_warehouse_sk = warehouse.w_warehouse_sk  
and catalog_sales.cs_sold_date_sk = date_dim.d_date_sk  
and date_dim.d_date between1998-03-09and1998-05-08group by  w_state,i_item_id  
order by w_state,i_item_id  limit 100;

典型的星型結構,其中catalog_sales是事實表,其他表為緯度表。本次分析選擇其中catalog_sales join item這個緯度的join。因為對比測試中兩者的SQL解析引擎都是使用impala,所以SQL執行計劃基本都相同。在此基礎上,來看看執行計劃中單個執行節點在執行catalog_sales join item操作時由先到后的主要階段耗時,其中只貼出來重要耗時階段(Q40中Join算法為shuffle hash join,與上文所舉broadcast hash join示例略有不同,不過不影響結論):

實驗項目impala on kudu(without runtime filter)impala on kudu(without runtime filter)
total time43s996ms2s385ms
bloomfilter生成Filter 0 arrival: 857ms
Filter 1 arrival: 879ms
Filter 2 arrival: 939ms
大表scan掃描HDFS_SCAN_NODE (id=0):(Total: 3s479ms)
– RowsRead: 72.01M
RowsReturned: 72.01M
– RowsReturnedRate: 20.69 M/s
HDFS_SCAN_NODE (id=0):(Total: 2s011ms)
– RowsRead: 72.01M
RowsReturned: 35.92K
– RowsReturnedRate: 17.86 K/sec
Filter 0 (1.00 MB):
– Rows processed: 72.01M
– Rows rejected: 71.43M
– Rows total: 72.01M
Filter 1 (1.00 MB):
– Rows processed: 49.15K
– Rows rejected: 126
– Rows total: 49.15K
Filter 2 (1.00 MB):
– Rows processed: 584.38K
– Rows rejected: 548.46K
– Rows total: 584.38K
數據加載計算進程內存DataStreamSender (dst_id=11):(Total: 15s984ms)
– NetworkThroughput(*): 298.78 MB/sec
– OverallThroughput: 100.85 MB/sec
– RowsReturned: 72.01M– SerializeBatchTime: 10s567ms
TransmitDataRPCTime: 5s395ms
DataStreamSender (dst_id=11):(Total: 10.725ms)
– NetworkThroughput(*): 244.06 MB/sec
– OverallThroughput: 71.23 MB/sec
– RowsReturned: 35.92K
SerializeBatchTime: 7.544ms
TransmitDataRPCTime: 3.130ms
Hash JoinHASH_JOIN_NODE (id=5): (Total: 19s104ms
– BuildPartitionTime: 862.560ms
– BuildRows: 8.99M
– BuildRowsPartitioned: 8.99M
– BuildTime: 373.855ms
– ……
– ProbeRows: 90.00M
– ProbeRowsPartitioned: 0 (0)
ProbeTime: 17s628ms
– RowsReturned: 90.00M
– RowsReturnedRate: 985.85 K/s
– SpilledPartitions: 0 (0)
– UnpinTime: 960.000ns
HASH_JOIN_NODE (id=6): (Total: 21.707ms)
– BuildPartitionTime: 3.487ms
– BuildRows: 18.81K (18814)
– BuildRowsPartitioned: 18.81K
– BuildTime: 646.817us
– ……
– ProbeRows: 85.28K (85278)
– ProbeRowsPartitioned: 0 (0)
ProbeTime: 6.396ms
– RowsReturned: 85.27K
– RowsReturnedRate: 38.88 K/s
– SpilledPartitions: 0 (0)
– UnpinTime: 915.000ns

經過對兩種場景執行計劃的解析,可以基本驗證上文所做的基本理論結果:
1. 確認經過RF之后大表的數據量得到大量濾除,只剩下少量數據參與最終的HashJoin。參見第二行大表scan掃描結果,未使用rf的返回結果有7千萬行+紀錄,而經過RF過濾之后滿足條件的只有3w+紀錄。3萬相比7千萬,性能優化效果自然不言而喻。
2. 經過RF濾除之后,少量數據經過網絡從存儲進程加載到計算進程內存的網絡耗時大量減少。參見第三行“數據加載到計算進程內存”,前者耗時15s,后者耗時僅僅11ms。主要耗時分為兩部分,其中數據序列化時間占到2/3-10s左右,數據經過RPC傳輸時間占另外1/3 -5s左右。
3. 最后,經過RF濾除之后,參與到最終Hash Join的數據量大幅減少,Hash Join耗時前者是19s,后者是21ms左右。主要耗時在于大表Probe Time,前者消耗了17s左右,而后者僅需6ms。

說好的謂詞下推呢?
講真,剛開始接觸RF的時候覺得這簡直是一個實實在在的神器,崇拜之情溢于言表。然而,經過一段時間的探索消化,直至把這篇文章寫完,也就是此時此刻,忽然覺得它并不高深莫測,說白了就是一個謂詞下推,不同的是這里的謂詞稍微奇怪一點,是一個bloomfilter而已。

提到謂詞下推,這里再引申一下下。以前經常滿大街聽到謂詞下推,然而對謂詞下推卻總感覺懵懵懂懂,并不明白的很真切。經過RF的洗禮,現在確信有了更進一步的理解。這里拿出來和大家交流交流。個人認為謂詞下推有兩個層面的理解:

  • 其一是邏輯執行計劃優化層面的說法,比如SQL語句:select * from order ,item where item.id =order.item_id and item.category =‘book’,正常情況語法解析之后應該是先執行Join操作,再執行Filter操作。通過謂詞下推,可以將Filter操作下推到Join操作之前執行。即將where item.category = ‘book’下推到 item.id = order.item_id之前先行執行。

  • 其二是真正實現層面的說法,謂詞下推是將過濾條件從計算進程下推到存儲進程先行執行,注意這里有兩種類型進程:計算進程以及存儲進程。計算與存儲分離思想,這在大數據領域相當常見,比如最常見的計算進程有SparkSQL、Hive、impala等,負責SQL解析優化、數據計算聚合等,存儲進程有HDFS(DataNode)、Kudu、HBase,負責數據存儲。正常情況下應該是將所有數據從存儲進程加載到計算進程,再進行過濾計算。謂詞下推是說將一些過濾條件下推到存儲進程,直接讓存儲進程將數據過濾掉。這樣的好處顯而易見,過濾的越早,數據量越少,序列化開銷、網絡開銷、計算開銷這一系列都會減少,性能自然會提高。

寫到這里,忽然意識到筆者在上文出現了一個很嚴重的認知錯誤:RF機制并不僅僅是一個簡單的謂詞下推,它的精髓在于提出了一個重要的謂詞-bloomfilter。當前對RF支持的系統并不多,筆者只知道目前唯有Impala on Parquet進行了支持。Impala on Kudu雖說Impala支持,但Kudu并不支持。SparkSQL on Parqeut中雖有存儲系統支持,無奈計算引擎-SparkSQL目前還不支持。

轉自:http://hbasefly.com/2017/04/10/bigdata-join-2/

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/456344.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/456344.shtml
英文地址,請注明出處:http://en.pswp.cn/news/456344.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【轉載】通過金礦模型介紹動態規劃 (動態規劃入門)

先附上原文地址&#xff1a;http://www.cnblogs.com/sdjl/articles/1274312.html 通過金礦模型介紹動態規劃 對于動態規劃&#xff0c;每個剛接觸的人都需要一段時間來理解&#xff0c;特別是第一次接觸的時候總是想不通為什么這種方法可行&#xff0c;這篇文章就是為了…

flask模型中【外鍵】relationship的使用筆記

模型中relationship的使用筆記 模型.PY class User(db.Model):# __tablename__ user1 #定義表名id db.Column(db.Integer, primary_keyTrue, autoincrementTrue)username db.Column(db.String(10), nullableTrue)password db.Column(db.String(64), nullableTrue)phone …

六種方式實現生產者消費者(未完)

2019獨角獸企業重金招聘Python工程師標準>>> 一、利用Object對象是wait和notify\notifyAll package com.jv.parallel.consumerandproducer.objectwait;public class Car {private volatile int flag 0;public void showConsumer(){System.out.println("I am a…

SQL中基于代價的優化

還記得筆者在上篇文章無意中挖的一個坑么&#xff1f;如若不知&#xff0c;強烈建議看官先行閱讀前面兩文&#xff0d;《SparkSQL Join原理》和《Join中竟然也有謂詞下推?》 第一篇文章主要分析了大數據領域Join的三種基礎算法以及各自的適用場景&#xff0c;第二篇文章在第一…

git如何解決沖突(代碼托管在coding)

分支A提交合并請求到分支B&#xff0c;有沖突git fetch code 拉取遠程倉庫的其他分支代碼&#xff08;我拉代碼是remote add code所以這里是code,可以用git remote查看&#xff09;git checkout 分支A 切換到分支Agit pull code 分支A 拉取分支A代碼git checkout 分支B 切換到分…

cookie和session之會話機制: ? http 協議? ---》 無狀態協議

設置cookie&#xff1a; 通過response對象&#xff1a; response make_response() response.set_cookie(key,value,max_age(單位second),expires(要求是detetime類型)) expires datetime(year2018,month11,day5) #expires是這么設置的 expires datetime.n…

Java Map 怎樣實現Key 的唯一性?

大家都知道。在Map和Set不可存在反復元素&#xff1f; 可是對于內部的細節我們并不了解。今天我們就一塊來 探討一下&#xff01; 1 對于 HashMap HashSet 他們的底層數據結構的實現是&#xff1a;維護了一張 HashTable 。容器中的元素所有存儲在Hashtable 中。他們再加入…

win10下安裝pyspark及碰到的問題

文章目錄前言安裝過程Q1總結&#xff1a;前言 最近由于工作需要&#xff0c;需要了解下pyspark&#xff0c;所以就在win10環境下裝了下&#xff0c;然后在pycharm中使用的時候碰到了一些問題。整個過程可謂是一波三折。下面一一道來。 安裝過程 安裝過程就不詳細說了&#x…

解決AttributeError AttributeError: 'NoneType' object has no attribute 'filename'

原因忘記上傳文件 表單需要加屬性 enctype"multipart/form-data" 否則報錯&#xff01;AttributeError AttributeError: NoneType object has no attribute filename enctype"multipart/form-data是設置表單的MIME編碼。默認情況&#xff0c;這個編碼格式是ap…

SQLAlchemy()分頁器paginate方法

Flask的數據分頁示例 用法&#xff1a; 1&#xff0c;首先寫數據獲取的視圖函數&#xff0c;就像這樣&#xff1a; # 首頁 blog_bp.route(/, endpointindex) def index():#獲取頁數page request.args.get(page,1)paginate Article.query.paginate(pageint(page),per_page3)…

開源中國 2014 年源創會年度計劃

時光總是從敲代碼的指尖不經意地滑過&#xff0c;轉眼2014年已快過去一半&#xff0c;OSC依然心懷著最初的夢想。 源創會&#xff0c;oscer的線下快樂大本營&#xff0c;我們仍在繼續...... 聆聽技術大牛講解最前沿的技術&#xff0c;和同道中人切磋IT秘籍&#xff0c;吃點心侃…

互聯網金融行業申請評分卡(A卡)簡介

文章目錄前言基本概念1、信用違約風險的基本概念什么是信用違約風險&#xff1a;組成部分違約的主體個貸中常用的違約定義M0&#xff0c;M1&#xff0c;M2的定義2、申請評分卡的重要性和特性信貸場景中的評分卡申請評分卡的概念為什么要開發申請評分卡評分卡的特性 &#xff08…

Flask的csrf_token的用法

在flask當中&#xff0c;flask-wtf模塊時攜帶csrf校驗的&#xff0c;只是需要開啟&#xff1b; 如果不開啟校驗就不需要校驗&#xff0c;但是那樣不安全。 Csrf是針對與post請求的跨域限制&#xff0c;get請求沒有作用 csrf_token的開啟 在flask中開啟csrf保護 from flask_…

dotty編譯器語法特性之一交叉類型,聯合類型和文本單例類型

2019獨角獸企業重金招聘Python工程師標準>>> ###翻譯&#xff1a;http://dotty.epfl.ch/docs/reference/intersection-types.html #交叉類型 trait Resettable {def reset(): this.type } trait Growable[T] {def add(x: T): this.type } def f(x: Resettable &…

【轉】Zookeeper 安裝和配置

轉自&#xff1a;http://coolxing.iteye.com/blog/1871009 Zookeeper的安裝和配置十分簡單, 既可以配置成單機模式, 也可以配置成集群模式. 下面將分別進行介紹. 單機模式 1. 配置 點擊這里下載zookeeper的安裝包之后, 解壓到合適目錄. 進入zookeeper目錄下的conf子目錄, 創建z…

一分鐘精通Flask-Bootstrap的使用

要想在程序中集成Bootstrap&#xff0c;顯然要對模板做所有必要的改動。不過&#xff0c;更簡單的方法是使用一個名為Flask-Bootstrap 的Flask 擴展&#xff0c;簡化集成的過程。 安裝&#xff1a; Flask-Bootstrap 使用pip安裝&#xff1a; pip install flask_bootstrap Fl…

linux生產環境下安裝anaconda總結

前言&#xff1a; 工作中&#xff0c;常常要在新的linux生產服務器中安裝自己的集成python環境&#xff0c;這種情況下有一點需要注意&#xff1a;不能覆蓋生產服務器中的python環境&#xff08;也就是自己的python環境要和系統的python環境分開&#xff09;。一般情況下系統自…

Delphi TScrollBar 用于滾動窗口、組件內容

滾動條組件&#xff08;TScrollBar&#xff09;此組件是一個Windows滾動條&#xff0c;用于滾動窗口、組件內容。許多控制有滾動條屬性&#xff0c;它們把滾動條作為自己的一部分&#xff0c;對于沒有完整滾動條的控制&#xff0c;TScrollBar組件提供了一個附加的功能&#xff…

FSF 稱 DRM 被用于鎖定、控制和監視用戶

自由軟件基金會正在督促美國政府廢除DMCA中保護DRM的反規避條款。DMCA的1201條款禁止繞過DRM保護的內容和設備。 自由軟件基金會的Donald Robertson在致函美國版權辦公室的信&#xff08;PDF&#xff09;中指出&#xff0c;技術保護措施和數字限制管理&#xff08;即DRM&#x…

改數據庫表結構類型兩種方法

alter table user change password password varchar(128) not null; alter table user modify column password varchar(128) not null;