這是一個策略:
來自Mapper的
:發出每個記錄的三個副本并使用二級排序:
((復合鍵),值)=
((消息小時 - 一小時,當前消息的精確時間),消息)
((消息小時,消息的準確時間),消息)
((消息小時1小時,消息的準確時間),消息)
現在:您需要標準的二級排序:
setPartitioner只到密鑰的前半部分(消息的小時)
setGroupingComparator只到鍵的前半部分(消息的小時)
setSortingComparator to(消息小時,消息的準確時間)
在reducer中:每個reducer組接收消息精確時間內/ - 60到120分鐘內的所有消息 . reducer以排序順序查看"precise time of message"的所有內容 . 因此,您可以在每個減速器中保留過去60分鐘內查看的所有消息的滑動窗口
NOTE 以上假設60分鐘消息的數據可以放在單個reducer任務的內存中 . 否則,您將需要求助于將數據寫入磁盤作為窗口函數的一部分 .
Update OP要求進一步澄清窗口,所以我們走了 .
從Mapper發出的密鑰的角度考慮:每個輸入記錄有三個密鑰 . 現在在Reducer上,這意味著每個輸入記錄都出現在三個不同的組中 . 原因是我們需要針對每個輸入記錄考慮前導和滯后記錄 . 因此,現在我們讓每個組都可以訪問所有輸入記錄,這些記錄可能在最早記錄的60分鐘內以及最新記錄的60分鐘內 . 由于記錄按每小時最早的秒數分組:這意味著-60(分鐘)到120(最大)對比屬于給定小時組的任何記錄 .