BuildHashTable函數細節步驟
該函數位置處于hash_join_iterator.cc
403 ~ 560行
step1:如果被驅動表迭代器沒有更多的行數,更新m_state為EOR,然后返回false,表明創建hash表失敗
if (!m_build_iterator_has_more_rows) {m_state = State::END_OF_ROWS;return false;
}
**step2:**還原插入行緩沖區的最后一行。如果構建輸入是一個嵌套循環,內部有一個過濾器,那么這是必需的。這里還不是很理解
if (m_row_buffer.Initialized() &&m_row_buffer.LastRowStored() != m_row_buffer.end()) {hash_join_buffer::LoadIntoTableBuffers(m_build_input_tables, m_row_buffer.LastRowStored()->second);
}
step3:清除行buffer并且將多有迭代器重新指向它。如果初始化成功,直接返回true。
if (InitRowBuffer()) {return true;
}
step4:初始化了兩個變量
reject_duplicate_keys
和store_rows_with_null_in_join_key
const bool reject_duplicate_keys = RejectDuplicateKeys();
const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
RejectDuplicateKeys()
函數返回值為true的話,說明拒絕哈希表中的重復鍵。當遇到半連接或反連接等相同鍵值只需要返回一條結果,不需要返回extra情況。
對于反連接與半連接可以參考:半連接&反連接
指明當前jointype為外連接JoinType::OUTER
step5:將被驅動表輸入的SetNullRowFlag
清除。這是為了防止hashjoin用于獨立子查詢時init被調用多次的情況,不然這個標志將被之前執行的hashjoin操作污染。
m_build_input->SetNullRowFlag(/*is_null_row=*/false);
step6:開始通過迭代器從m_build_input循環讀數據,
1、如果線程被kill的話,返回true。
2、當build input為空,內連接和半連接結果也會為空,然而反連接的輸出將是probe input的所有行
3、當讀到build 迭代器的最后一行,這說明我們不會再去在probe 迭代器中讀取數據了。這時候需要我們禁止probe row保存數據
PFSBatchMode batch_mode(m_build_input.get());
for (;;) { // Termination condition within loop.int res = m_build_input->Read();if (res == 1) {DBUG_ASSERT(thd()->is_error() ||thd()->killed); // my_error should have been called.return true;}if (res == -1) {m_build_iterator_has_more_rows = false;// If the build input was empty, the result of inner joins and semijoins// will also be empty. However, if the build input was empty, the output// of antijoins will be all the rows from the probe input.if (m_row_buffer.empty() && m_join_type != JoinType::ANTI &&m_join_type != JoinType::OUTER) {m_state = State::END_OF_ROWS;return false;}// As we managed to read to the end of the build iterator, this is the// last time we will read from the probe iterator. Thus, we can disable// probe row saving again (it was enabled if the hash table ran out of// memory _and_ we were not allowed to spill to disk).m_write_to_probe_row_saving = false;SetReadingProbeRowState();return false;}
step7 :
1、請求所有表的行ID
2、存儲當前位于表記錄緩沖區中的行,將其放到store_row_result中
3、根據store_row_result狀態進行處理
-
如果是
*ROW_STORED*
,說明已經存儲完畢,直接breakcase hash_join_buffer::StoreRowResult::ROW_STORED:break;
-
如果是
BUFFER_FULL
,說明緩存區已經滿了.如果允許的話,向磁盤操作。如果不允許向磁盤操作,就繼續從probe 迭代器中讀取數據,并且開啟probe row保存,這樣沒有匹配的probe rows將被寫到saving file中。在下一次refill hash表的時候,從saving file中讀取probe row。
if (!m_allow_spill_to_disk) {if (m_join_type != JoinType::INNER) {// Enable probe row saving, so that unmatched probe rows are written// to the probe row saving file. After the next refill of the hash// table, we will read rows from the probe row saving file, ensuring// that we only read unmatched probe rows.InitWritingToProbeRowSavingFile();}SetReadingProbeRowState();return false; } // If we are not allowed to spill to disk, just go on to reading from// the probe iterator. if (!m_allow_spill_to_disk) {if (m_join_type != JoinType::INNER) {// Enable probe row saving, so that unmatched probe rows are written// to the probe row saving file. After the next refill of the hash// table, we will read rows from the probe row saving file, ensuring// that we only read unmatched probe rows.InitWritingToProbeRowSavingFile();}SetReadingProbeRowState();return false; }
初始化兩個input的hashjoinchunk。估計需要多少chunks,planner會事先給出一個數,這里會重新計算得到每個塊都合適的磁盤塊。
if (InitializeChunkFiles(m_estimated_build_rows, m_row_buffer.size(), kMaxChunks,m_probe_input_tables, m_build_input_tables,/*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,&m_chunk_files_on_disk)) {DBUG_ASSERT(thd()->is_error()); // my_error should have been called.return true; }
將迭代器上剩余的數據寫到磁盤的chunk file上,如果出現IO錯誤的話,返回true
if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,m_join_conditions, kChunkPartitioningHashSeed,&m_chunk_files_on_disk,true /* write_to_build_chunks */,false /* write_rows_with_null_in_join_key */,m_tables_to_get_rowid_for,&m_temporary_row_and_join_key_buffer)) {DBUG_ASSERT(thd()->is_error() ||thd()->killed); // my_error should have been called.return true; }
從build input起始地方刷新并定位所有chunk files。
for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {if (chunk_pair.build_chunk.Rewind()) {DBUG_ASSERT(thd()->is_error() ||thd()->killed); // my_error should have been called.return true;} } SetReadingProbeRowState(); return false; }
-
如果狀態為FATAL_ERROR,說明出現意料之外的錯誤,可能是malloc失敗。返回true。
case hash_join_buffer::StoreRowResult::FATAL_ERROR:// An unrecoverable error. Most likely, malloc failed, so report OOM.// Note that we cannot say for sure how much memory we tried to allocate// when failing, so just report 'join_buffer_size' as the amount of// memory we tried to allocate.my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),thd()->variables.join_buff_size);return true;}