寫入的簡單流程:
1.生成ExternalSorter對象
2.將消息都是插入ExternalSorter對象中
3.獲取到mapOutputWriter,將中間產生的臨時文件合并到一個臨時文件
4.生成最后的data文件和index文件
可以看到寫入的重點類是ExternalSorter對象
ExternalSorter
基本功能:對(k,v)進行排序,中間可能存在合并操作,最后生成(k,c)。
- 使用partitioner對key進行分區
- 在每個分區中使用Comparator進行排序
- 輸出一個單獨的文件,每個分區對應這個文件中的一段范圍。
如果禁用了合并操作,類型C必須等于V
這個類的工作流程如下:
- 使用數據,反復填充內存緩沖區。如果是可以合并的數據,則使用PartitionedAppendOnlyMap;如果不合并,則使用PartitionedPairBuffer。在這些緩沖區中,我們首先按分區ID對元素進行排序,然后可能還會按鍵進行排序。為了避免對每個鍵多次調用分區器,我們將分區ID與每條記錄一同存儲。
- 當每個緩沖區達到內存限制時,會將其spill到文件中。這個文件首先按分區ID排序,如果需要做聚合操作,其次可能按鍵或鍵的哈希碼排序。對于每個文件,跟蹤每個分區在內存中的對象數量,因此不必為每個元素都寫出分區ID。
- 當用戶請求迭代器或文件輸出時,溢寫的文件會與任何剩余的內存數據一起被合并,使用上述定義的相同排序順序(除非排序和聚合都被禁用)。如果需要按鍵進行聚合,我們或者從ordering參數中使用全序,或者讀取具有相同哈希碼的鍵并相互比較它們的相等性來合并值。
- 用戶在結束時應調用stop()方法來刪除所有中間文件。
緩存buffer:PartitionedAppendOnlyMap、PartitionedPairBuffer
關鍵方法:insertAll、maybeSpillCollection、spill、writePartitionedMapOutput
PartitionedPairBuffer
capacity 容量
curSize 當前放入的數據量
data 數組,存儲的數據,(k,v)占用數組的兩個位置
insert
如果容量達到瓶頸就進行擴容。
先存key,再存value。再調用afterUpdate
afterUpdate
numUpdates數據插入/更新次數
nextSampleNum下一次采樣的次數
更新numUpdates,如果達到采樣次數,執行采樣takeSample
takeSample
samples中只存兩個樣品數據,用來計算每次更新的差值。
采樣的時候要移除多余的數據。更新下一次采樣的數據量。
estimateSize
預估大小。
最后一個樣品的lastSize+bytesPerUpdate*新增的更新次數。
resetSamples
重新進行采樣。
growArray
擴容2倍容量,遷移數據,重啟采樣
partitionedDestructiveSortedIterator
生成比較器comparator,調用sort對緩存的數據進行排序。
sorter是使用TimSort進行排序的。
TimSort介紹: https://zhuanlan.zhihu.com/p/695042849
iterator
用pos計算剩余量。
data(2 * pos)為key,data(2 * pos+1) 為value
PartitionedAppendOnlyMap
存儲數據用的數組data,里面的元素是key0, value0, key1, value1, key2, value2…
changeValue
PartitionedAppendOnlyMap插入數據不再是追加,而是有一個相同key合并值的過程。
- key是null,返回null,不進行存儲
- key首次插入,更新data中的對應的kv值
- key非首次插入,更新data中合并的的新value
- key發生哈希沖突,就向后加1,直到不沖突
update
跟changeValue類似。
growTable
比較簡單,就是容量擴大兩倍,將舊的kv值重新計算hash插入到新的數組中,如果發生hash沖突就不斷向后移動一位。
iterator
核心方法是nextValue,在nextValue中,遍歷data數組的對應key值,要求不是null,表明這個位置是有值的。
如果有key為null,要求pos=-1且haveNullValue=true
partitionedDestructiveSortedIterator
調用destructiveSortedIterator方法
destructiveSortedIterator
data數組中元素是分散的,首先將數組中的元素都集中到數組的前面。后面就跟PartitionedPairBuffer的partitionedDestructiveSortedIterator方法一樣使用TimeSort進行排序。
采樣相關方法
跟上面的PartitionedPairBuffer的采樣相關方法一樣。
spill相關方法
入口方法是maybeSpillCollection
maybeSpillCollection
不論使用的數據結構是buffer還是map,都是計算消耗的容量,再調用maybeSpill方法,最后重新初始化化對應數據結構。可以想到maybeSpill中就將緩存的數據放到了本地。
maybeSpill
每32條數據就進行一次內存使用情況判斷。如果當前使用內存超過了限制,就先申請新的內存,按照兩倍的內存使用量申請,不一定申請到足量的內存。申請后還是內存使用超過了限制,就進行spill,調用spill方法,同時調用releaseMemory釋放內存。
releaseMemory
spill
調用destructiveSortedWritablePartitionedIterator方法返回排好序的分區迭代器。
調用spillMemoryIteratorToDisk將數據溢寫到磁盤上
最后將生成的文件記錄到spills中
destructiveSortedWritablePartitionedIterator
調用對應數據結構的partitionedDestructiveSortedIterator方法返回排序的迭代器。
就是上面的PartitionedPairBuffer和PartitionedAppendOnlyMap的partitionedDestructiveSortedIterator方法。
spillMemoryIteratorToDisk
創建臨時文件,生成對應的writer
遍歷將數據寫入的文件中,每10000條進行一次flush。
如果失敗了,調用revertPartialWritesAndClose進行回滾。
revertPartialWritesAndClose
如果這次寫入出現問題,使用這個方法。回滾寫入,只保留截止到上一次寫入的內容。
writePartitionedMapOutput
將排好序的緩存和文件合并成一個文件輸出。
spills為空,即沒有產生排序文件。將緩存中數據生成排好序的迭代器,遍歷寫入到文件中。
存在排好序的文件。則需要調用partitionedIterator方法將文件數據和緩存的數據進行合并,再遍歷輸出。
partitionedIterator
調用merge方法合并內存和文件數據
merge
merge的第一個參數是spilled文件,第二個參數是內存緩存的數據。
流程是遍歷分區,取出對應分區的spilled文件中和緩存中的數據。
根據情況進行聚合或者排序等操作后輸出合并后的排好序的文件。
mergeSort
使用堆排序,但是heap中存放的是已經排好序的iterator。
最小值就是heap中首個iterator中的第一個元素。
mergeWithAggregation
有總排序,這樣相同的key會在一起。
調用mergeSort將iterators合并成一個排好序的iterator。
next方法就是遍歷key出來全部的值,進行合并后輸出,因為是全局有序,不需要遍歷iterator全部數據。
沒有總排序
跟上面流程類似,先得到合并的iterator,但是它不是全局有序的。存在不同的key在comparator比較下相等,如使用hash進行比較,因此存在 aaabaaa 這種情況的key分布。
在獲取相同key對應的值的時候需要遍歷iterator的使用comparator和equal進行比較數據,再進行合并。返回值是一個comparator相同有可能key不同的key組成的iterator