1、簡介
在上一節中已經介紹過 JDFrame,文章鏈接stream流太難用了看看JDFrame 沒看過的朋友可以先看看,
這次主要講講窗口函數相關API的使用
在各種數據庫mysql, hive、spark中都有非常好用的開窗函數使用, 但是java卻沒好用的JVM層級的窗口函數使用,于是乎寫了這個,如果能熟練使用開窗函數相信能在業務代碼中大大減少我們的統計計算邏輯代碼。
本文不會介紹每個開窗函數是什么,它的語義與其他語言的窗口函數一模一樣,在這里僅作簡單介紹,后續會出相關實戰的數據分析案例。
2、Maven依賴
<dependency><groupId>io.github.burukeyou</groupId><artifactId>jdframe</artifactId><version>0.0.4</version>
</dependency>
3、窗口函數的API使用
測試代碼
static List<WebPvDto> dataList = new ArrayList<>();static {dataList.add(new WebPvDto("a",0,1));dataList.add(new WebPvDto("a",1,5));dataList.add(new WebPvDto("a",2,7));dataList.add(new WebPvDto("a",3,3));dataList.add(new WebPvDto("a",4,2));dataList.add(new WebPvDto("a",5,4));dataList.add(new WebPvDto("a",6,4));dataList.add(new WebPvDto("b",7,1));dataList.add(new WebPvDto("b",8,4));dataList.add(new WebPvDto("b",7,6));dataList.add(new WebPvDto("b",8,2));
}@Data
public static class WebPvDto {private String type;private Integer score;private Integer pvCount;public Object value;
}
ROW_NUMBER 窗口函數
生成行號,從1開始
// 等價于 select ROW_NUMBER() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overRowNumberS(WebPvDto::setValue).show(30);
輸出結果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
RANK 窗口函數
生成排名號,相同值排名一樣,排名不連續 。 如: 1 2 2 2 5 6 7
// 等價于 select rank() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overRankS(WebPvDto::setValue).show(30);
輸出結果
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
DENSE_RANK 窗口函數
生成排名號,相同值排名一樣,排名連續 如 1 2 2 2 3 4 5
// 等價于 select DENSE_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overDenseRankS(WebPvDto::setValue).show(30);
輸出結果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 3
a 3 3 4
a 4 2 5
a 0 1 6
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
PERCENT_RANK 窗口函數
// 等價于 select PERCENT_RANK() over(partition by type order pv_count desc)
SDFrame.read(dataList).defaultScale(6).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overPercentRankS(WebPvDto::setValue).show(30);
輸出結果
type score pvCount value
a 2 7 0
a 1 5 0.166667
a 5 4 0.333333
a 6 4 0.333333
a 3 3 0.666667
a 4 2 0.833333
a 0 1 1.000000
b 7 6 0
b 8 4 0.333333
b 8 2 0.666667
b 7 1 1.000000
Count窗口函數
// 等價于SQL: select count(*) over(partition by type order by pv_count desc rows between UNBOUNDED PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount).roundStartRow2CurrentRow()).overCountS(WebPvDto::setValue).show(30);
輸出結果:
type score pvCount value
a 2 7 1
a 1 5 2
a 5 4 3
a 6 4 4
a 3 3 5
a 4 2 6
a 0 1 7
b 7 6 1
b 8 4 2
b 8 2 3
b 7 1 4
Sum窗口函數
// 等價于 select sum(pv_count) over(rows between 1 PRECEDING and 2 FOLLOWING)
JDFrame.read(dataList).window(Window.roundBetweenBy(Range.BEFORE(1),Range.AFTER(2))).overSumS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
輸出結果:
type score pvCount value
a 0 1 13
a 1 5 16
a 2 7 17
a 3 3 16
a 4 2 13
a 5 4 11
a 6 4 13
b 7 1 15
b 8 4 13
b 7 6 12
b 8 2 8
Avg窗口函數
// 等價于 select avg(pv_count) over(partition by type )
SDFrame.read(dataList).defaultScale(4).window(Window.groupBy(WebPvDto::getType)).overAvgS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
輸出結果
type score pvCount value
a 0 1 3.7143
a 1 5 3.7143
a 2 7 3.7143
a 3 3 3.7143
a 4 2 3.7143
a 5 4 3.7143
a 6 4 3.7143
b 7 1 3.2500
b 8 4 3.2500
b 7 6 3.2500
b 8 2 3.2500
Max窗口函數
// 等價于 select max(pv_count) over(partition by type order pv_count asc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortAsc(WebPvDto::getPvCount)).overMaxValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
輸出結果:
type score pvCount value
a 0 1 7
a 4 2 7
a 3 3 7
a 5 4 7
a 6 4 7
a 1 5 7
a 2 7 7
b 7 1 6
b 8 2 6
b 8 4 6
b 7 6 6
Min窗口函數
// 等價于 select min(pv_count) over(rows between CURRENT ROW and 2 FOLLOWING)
SDFrame.read(dataList).window(Window.roundCurrentRow2AfterBy(2)).overMinValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
type score pvCount value
a 0 1 1
a 1 5 3
a 2 7 2
a 3 3 2
a 4 2 2
a 5 4 1
a 6 4 1
b 7 1 1
b 8 4 2
b 7 6 2
b 8 2 2
Lag窗口函數
獲取當前行的前N行數據
// 等價于 select lag(pv_count,2) over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overLagS(WebPvDto::setValue,WebPvDto::getPvCount,2).show(30);
輸出結果:
type score pvCount value
a 2 7
a 1 5
a 5 4 7
a 6 4 5
a 3 3 4
a 4 2 4
a 0 1 3
b 7 6
b 8 4
b 8 2 6
b 7 1 4
Lead窗口函數
獲取當前行的后N行數據
// 等價于 select lead(pv_count,3) over()
SDFrame.read(dataList).window().overLeadS(WebPvDto::setValue,WebPvDto::getPvCount,3).show(30);
輸出結果:
type score pvCount value
a 0 1 3
a 1 5 2
a 2 7 4
a 3 3 4
a 4 2 1
a 5 4 4
a 6 4 6
b 7 1 2
b 8 4
b 7 6
b 8 2
NthValue 窗口函數
獲取窗口范圍內的第N行數據
// 等價于 select NTH_VALUE(pv_count,2) over(rows between 1 PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.roundBefore2CurrentRowBy(3)).overNthValueS(WebPvDto::setValue,WebPvDto::getPvCount,2).show(30);
輸出結果:
type score pvCount value
a 0 1
a 1 5 5
a 2 7 5
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
FirstValue 窗口函數
獲取窗口范圍內的第1行數據
// 等價于 select FIRST_VALUE(pv_count) over(rows between 2 PRECEDING and CURRENT ROW)
SDFrame.read(dataList).window(Window.roundBetweenBy(Range.BEFORE(2), Range.CURRENT_ROW)).overFirstValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 5
a 4 2 7
a 5 4 3
a 6 4 2
b 7 1 4
b 8 4 4
b 7 6 1
b 8 2 4
LastValue 窗口函數
獲取窗口范圍內的最后一行數據
// 等價于 select LAST_VALUE(pv_count) over(rows between 2 PRECEDING and 2 FOLLOWING)
SDFrame.read(dataList).window(Window.roundBeforeAfterBy(2,2)).overLastValueS(WebPvDto::setValue,WebPvDto::getPvCount).show(30);
輸出結果
type score pvCount value
a 0 1 7
a 1 5 3
a 2 7 2
a 3 3 4
a 4 2 4
a 5 4 1
a 6 4 4
b 7 1 6
b 8 4 2
b 7 6
b 8 2
Ntile 窗口函數
給窗口盡量均勻的分成N個桶, 每個桶的編號從1開始, 如果分布不均勻,則優先分配給最小的桶,桶之間的大小差值最多不超過1
// 等價于 select Ntile(3) over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType)).overNtileS(WebPvDto::setValue,3).show(30);
輸出結果:
type score pvCount value
a 0 1 1
a 1 5 1
a 2 7 1
a 3 3 1
a 4 2 2
a 5 4 2
a 6 4 2
b 7 1 2
b 8 4 3
b 7 6 3
b 8 2 3
Cume_Dist 窗口函數
累積分布值, 統計的是 (小于等于當前排名號的行數 / 窗口行數) 的比率
// select cume_dist() over(partition by type order pv_count desc)
SDFrame.read(dataList).window(Window.groupBy(WebPvDto::getType).sortDesc(WebPvDto::getPvCount)).overCumeDistS(WebPvDto::setValue).show(30);
輸出結果
type score pvCount value
a 2 7 0.14
a 1 5 0.29
a 5 4 0.57
a 6 4 0.57
a 3 3 0.71
a 4 2 0.86
a 0 1 1.00
b 7 6 0.25
b 8 4 0.50
b 8 2 0.75
b 7 1 1.00
4 窗口
主要是通過Window對象去構建開窗的信息,包括窗口的分區情況,窗口的排序情況,還有窗口范圍。
窗口范圍可以通過 Range對象去枚舉指定。
如果不指定窗口信息默認窗口范圍就是全部行。 眾所周知而在 mysql中如果使用了order默認窗口范圍就是 rows between UNBOUNDED PRECEDING and CURRENT ROW
, 如果沒有使用order也沒指定rows between
, 默認窗口范圍才是全部。 這點要注意區分
5 最后
1、窗口函數的計算結果的存儲有兩種方式,一種是直接返回到FI2里, 一種是可以通過指定SetFunction 進行存儲, 所有后綴帶S的方法就是通過后者的方式的存儲, 之所以帶S后綴是為了以便于區分,并且是放到第一個方法參數里。
2、除了可以通過單獨的window()的方法去指定窗口信息,在每個over方法也可以了單獨設置。 沒單獨設置就使用window()方法里指定的窗口信息
3、在不同窗口范圍內的數據計算目前用的是各種滑動窗口算法,時間復雜度基本在O(N)左右
代碼地址
Maven依賴地址