InputSplit
InputSplit
?是對 MapReduce 作業輸入數據的一種邏輯劃分。它并不直接包含數據本身,而是包含了處理一小塊數據所需的信息,比如數據的位置、起始點和長度。框架會為每個?InputSplit
?創建一個?Mapper
?任務。
從InputSplit.java
?的注釋中我們可以得到核心定義:
InputSplit
?represents the data to be processed by an individual?Mapper
. (InputSplit
?代表了將被單個?Mapper
?處理的數據。)
它與?InputFormat
?和?RecordReader
?的關系如下:
InputFormat
: 負責驗證作業的輸入規范,并將輸入數據切分成多個邏輯上的?InputSplit
。InputSplit
: 定義了一個?Mapper
?任務要處理的數據分片。它是一個對數據的引用,而不是數據本身。RecordReader
: 負責從?InputSplit
?所指向的存儲中讀取數據,并將其解析成?<key, value>
?鍵值對,然后喂給?Mapper
。
這個流程可以在?MapReduceTutorial.md
?中找到清晰的描述:
MapReduceTutorial.md
// ... existing code ...
The MapReduce framework relies on the `InputFormat` of the job to:1. Validate the input-specification of the job.2. Split-up the input file(s) into logical `InputSplit` instances,each of which is then assigned to an individual `Mapper`.3. Provide the `RecordReader` implementation used to glean inputrecords from the logical `InputSplit` for processing by the`Mapper`.
// ... existing code ...
InputSplit
?抽象類分析
InputSplit
?本身是一個抽象類,它定義了所有 "分片" 都必須遵守的契約。讓我們看看它的核心方法:
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {/*** Get the size of the split, so that the input splits can be sorted by size.* @return the number of bytes in the split* @throws IOException* @throws InterruptedException*/public abstract long getLength() throws IOException, InterruptedException;/*** Get the list of nodes by name where the data for the split would be local.* The locations do not need to be serialized.* * @return a new array of the node nodes.* @throws IOException* @throws InterruptedException*/public abstract String[] getLocations() throws IOException, InterruptedException;/*** Gets info about which nodes the input split is stored on and how it is* stored at each location.* * @return list of <code>SplitLocationInfo</code>s describing how the split* data is stored at each location. A null value indicates that all the* locations have the data stored on disk.* @throws IOException*/@Evolvingpublic SplitLocationInfo[] getLocationInfo() throws IOException {return null;}
}
public abstract long getLength()
: 這個方法返回分片的邏輯大小(以字節為單位)。這個返回值非常重要,框架可以用它來對分片進行排序(例如,優先處理大的分片),或者用于推測任務的進度。public abstract String[] getLocations()
: 這是實現**數據本地性(Data Locality)**的核心。它返回一個字符串數組,其中包含了存儲這份數據分片的節點的 主機名/IP地址。MapReduce 的調度器會盡量將?Mapper
?任務調度到?getLocations()
?返回的節點之一上運行,這樣?Mapper
?就可以從本地磁盤讀取數據,避免了昂貴的網絡I/O。public SplitLocationInfo[] getLocationInfo()
: 這是一個更現代、信息更豐富的?getLocations()
?版本。它不僅能返回位置信息,還能告知數據在該位置的存儲狀態(例如,是在內存中還是在磁盤上)。這使得調度器可以做出更智能的決策,比如優先將任務調度到數據已在內存中的節點。
InputSplit
?如何創建和使用?
開發者通常不直接創建?InputSplit
。這個工作由?InputFormat
?的?getSplits()
?方法完成。以最常見的?FileInputFormat
?為例,它負責將輸入文件切分成多個?FileSplit
。
FileInputFormat.java
?中的?getSplits
?方法展示了其核心邏輯:
FileInputFormat.java
// ... existing code .../** * Generate the list of files and make them into FileSplits.* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);
// ... existing code ...for (FileStatus file: files) {
// ... existing code ...Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}
// ... existing code ...} else { // not splittable
// ... existing code ...splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}
// ... existing code ...return splits;}
// ... existing code ...
這個過程大致如下:
- 獲取輸入路徑下的所有文件。
- 計算分片大小?
splitSize
,它通常與 HDFS 的塊大小(Block Size)有關。 - 遍歷每個文件,如果文件是可切分的(例如,未壓縮的文本文件),則根據?
splitSize
?將其切成一個或多個?InputSplit
。 - 在創建?
InputSplit
?時,會從?BlockLocation
?中獲取數據塊所在的節點信息,并傳遞給?InputSplit
,這就是?getLocations()
?的數據來源。
InputSplit
?的主要實現
由于?InputSplit
?是抽象的,我們需要看它的具體實現才能更好地理解。
FileSplit
: 最常用的一種實現,代表一個文件的一部分。它由文件路徑、起始偏移量和長度定義。// ... existing code ... /** The file containing this split's data. */ public Path getPath() { return fs.getPath(); }/** The position of the first byte in the file to process. */ public long getStart() { return fs.getStart(); }/** The number of bytes in the file to process. */ public long getLength() { return fs.getLength(); } // ... existing code ... public String[] getLocations() throws IOException {return fs.getLocations(); } // ... existing code ...
CombineFileSplit
: 用于將多個小文件或小文件塊合并成一個單獨的?InputSplit
。這對于處理大量小文件的場景非常高效,因為它避免了為每個小文件啟動一個?Mapper
?任務所帶來的開銷。 在?TestCombineTextInputFormat.java
?測試中,我們可以看到它被明確使用:// ... existing code ...// we should have a single split as the length is comfortably smaller than// the block sizeassertEquals(1, splits.size(), "We got more than one splits!");InputSplit split = splits.get(0);assertEquals(CombineFileSplit.class, split.getClass(),"It should be CombineFileSplit"); // ... existing code ...
TaggedInputSplit
: 當使用?MultipleInputs
?功能,即針對不同的輸入路徑使用不同的?InputFormat
?或?Mapper
?時,TaggedInputSplit
?就派上用場了。它包裝了一個原始的?InputSplit
,并額外 "標記" 了應該由哪個?InputFormat
?和?Mapper
?類來處理它。// ... existing code ... public class TaggedInputSplit extends InputSplit implements Configurable, Writable {private Class<? extends InputSplit> inputSplitClass;private InputSplit inputSplit;private Class<? extends InputFormat<?, ?>> inputFormatClass;private Class<? extends Mapper<?, ?, ?, ?>> mapperClass;private Configuration conf; // ... existing code ...
CompositeInputSplit
: 用于?JOIN
?操作。它本身是?InputSplit
?的一個集合,代表了需要連接的多個數據源的相應分片。它的?getLocations()
?方法會聚合所有子分片的位置信息。// ... existing code .../*** Collect a set of hosts from all child InputSplits.*/public String[] getLocations() throws IOException, InterruptedException {HashSet<String> hosts = new HashSet<String>();for (InputSplit s : splits) {String[] hints = s.getLocations();if (hints != null && hints.length > 0) {for (String host : hints) {hosts.add(host);}}}return hosts.toArray(new String[hosts.size()]);} // ... existing code ...
總結
InputSplit
?是 MapReduce 并行計算模型的基礎。它通過將大數據集邏輯地切分為可管理的小數據塊,使得成百上千的?Mapper
?任務可以并行處理。同時,通過?getLocations()
?方法攜帶的位置信息,框架能夠將計算任務移動到數據所在的位置,從而實現了“計算向數據移動”的核心思想,極大地提升了處理效率。
FileInputFormat.getSplits
?方法的整體邏輯
這個方法的核心目標是將輸入的文件分割成多個?InputSplit
(輸入分片),每個?InputSplit
?會作為一個 Map 任務的輸入。理想情況下,分片的數量決定了 Map 任務的并行度。
下面是?getSplits
?方法的執行流程的詳細分解:
// ... existing code ...public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);boolean ignoreDirs = !getInputDirRecursive(job)&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);for (FileStatus file: files) {
// ... existing code ...if (length != 0) {
// ... existing code ...if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitable
// ... existing code ...splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}
// ... existing code ...return splits;}protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}
// ... existing code ...
1. 計算分片大小(Split Size)
方法的第一步是確定每個分片的大小范圍。這由三個值決定:
minSize
?(最小分片大小):- 代碼:?
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- 它取?
getFormatMinSplitSize()
?(默認為 1) 和?getMinSplitSize(job)
?的最大值。 getMinSplitSize(job)
?讀取 Hadoop 的配置參數?mapreduce.input.fileinputformat.split.minsize
?(舊參數為?mapred.min.split.size
)。- 作用: 保證每個分片不會小于這個值。
- 代碼:?
maxSize
?(最大分片大小):- 代碼:?
long maxSize = getMaxSplitSize(job);
- 它讀取 Hadoop 的配置參數?
mapreduce.input.fileinputformat.split.maxsize
?(舊參數為?mapred.max.split.size
)。 - 作用: 保證每個分片不會大于這個值。
- 代碼:?
blockSize
?(HDFS 塊大小):- 代碼:?
long blockSize = file.getBlockSize();
- 這是文件存儲在 HDFS 上的塊大小,由 HDFS 的配置?
dfs.blocksize
?決定。
- 代碼:?
最終的分片大小?splitSize
?通過?computeSplitSize
?方法計算得出:?splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
2. 遍歷所有輸入文件
代碼會獲取輸入路徑下的所有文件 (List<FileStatus> files = listStatus(job);
),然后逐一處理。
3. 處理單個文件
對于每個文件,程序會檢查它是否可以被切分 (isSplitable(job, path)
)。
可切分的文件 (Splittable):
- 大多數普通文本文件都是可切分的。
- 程序會根據計算出的?
splitSize
?對文件進行邏輯切分。 - 它會循環創建大小為?
splitSize
?的分片,直到剩余的字節數不足一個?splitSize
?的 1.1 倍 (SPLIT_SLOP
)。 - 最后剩余的部分會形成最后一個分片。
- 關鍵點: 這里的切分是邏輯上的。一個分片可能跨越多個 HDFS 塊,也可能一個 HDFS 塊包含多個分片。
不可切分的文件 (Not Splittable):
- 某些壓縮格式(如 Gzip)是無法從中間讀取的,因此它們是不可切分的。
- 對于這類文件,整個文件會形成一個單獨的分片,無論它有多大。這會導致該文件只能由一個 Map 任務處理,可能成為性能瓶頸。
4. 創建分片 (makeSplit
)
makeSplit
?方法會創建一個?FileSplit
?對象,它包含了分片所需的所有信息:文件路徑、起始位置、長度以及數據所在的節點位置(hosts
),以便 MapReduce 框架實現“數據本地性” (Data Locality),即盡量將計算任務調度到數據所在的節點上執行,減少網絡傳輸。
如何通過 Hadoop 參數控制 Split 數量?
理解了上面的流程,我們就可以知道如何通過調整參數來控制分片的數量了。核心就是控制?computeSplitSize
?的計算結果。
splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
讓我們來分析不同場景:
默認情況 (最常見)
mapreduce.input.fileinputformat.split.minsize
?(minSize) 默認為 1。mapreduce.input.fileinputformat.split.maxsize
?(maxSize) 默認為?Long.MAX_VALUE
?(一個非常大的數)。- 此時公式變為:?
splitSize = Math.max(1, Math.min(Long.MAX_VALUE, blockSize))
- 簡化后得到:?
splitSize = blockSize
- 結論: 在默認情況下,分片大小就等于 HDFS 的塊大小。一個 1GB 的文件,如果塊大小是 128MB,那么它會被切分為?
1024 / 128 = 8
?個分片(大約值)。
增加分片數量 (調小?
maxSize
)- 目的: 增加 Map 任務的并行度,適用于 CPU 密集型任務。
- 方法: 設置?
mapreduce.input.fileinputformat.split.maxsize
?為一個小于?blockSize
?的值。 - 示例:?
blockSize
?= 128MB。如果你設置?maxSize
?= 64MB。 - 公式變為:?
splitSize = Math.max(1, Math.min(64MB, 128MB))
- 簡化后得到:?
splitSize = 64MB
- 結論: 每個 128MB 的塊都會被切分為 2 個 64MB 的分片,分片的總數大約會翻倍。
減少分片數量 (調大?
minSize
)- 目的: 減少 Map 任務的數量,適用于處理大量小文件或者希望合并處理邏輯的場景。
- 方法: 設置?
mapreduce.input.fileinputformat.split.minsize
?為一個大于?blockSize
?的值。 - 示例:?
blockSize
?= 128MB。如果你設置?minSize
?= 256MB。 - 公式變為:?
splitSize = Math.max(256MB, Math.min(Long.MAX_VALUE, 128MB))
- 簡化后得到:?
splitSize = Math.max(256MB, 128MB)
?->?splitSize = 256MB
- 結論: 程序會嘗試創建 256MB 大小的分片,這會將兩個 128MB 的 HDFS 塊合并到一個分片中,從而使分片總數減少一半。
總結
參數 | 默認值 | 作用 | 如何影響分片數 |
---|---|---|---|
mapreduce.input.fileinputformat.split.minsize | 1 | 設置分片的最小尺寸 | 調大此值(大于blockSize)可以減少分片數。 |
mapreduce.input.fileinputformat.split.maxsize | Long.MAX_VALUE | 設置分片的最大尺寸 | 調小此值(小于blockSize)可以增加分片數。 |
dfs.blocksize | (HDFS配置, 如128MB) | HDFS塊大小 | 在默認情況下,分片大小等于塊大小,是決定分片數的基礎。 |
通過靈活配置這三個參數,你可以精確地控制 MapReduce 作業的并行度,以適應不同的業務場景和硬件資源,從而達到優化作業性能的目的。
CombineFileInputFormat 如何實現文件合并邏輯的
在 org.apache.hadoop.mapred.lib.CombineFileInputFormat
類中,getSplits
方法的實現如下:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =super.getSplits(Job.getInstance(job));InputSplit[] ret = new InputSplit[newStyleSplits.size()];for(int pos = 0; pos < newStyleSplits.size(); ++pos) {org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),newStyleSplit.getLocations());}return ret;}
這個方法實際上是調用了父類 org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
的 getSplits
方法,然后將返回的 org.apache.hadoop.mapreduce.InputSplit
列表轉換為 org.apache.hadoop.mapred.InputSplit
數組。
??父類 org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
的 getSplits
方法實現了 文件合并邏輯:??
??獲取文件信息 :??
通過
listStatus
方法獲取輸入路徑下的所有文件信息。對于每個文件,創建
OneFileInfo
對象,該對象會獲取文件的塊信息及其所在的主機和機架信息。
??處理文件池 :??
如果配置了文件池(pools),則先處理每個池中的文件,確保同一分片中的文件來自同一池。
??生成分片 :??
調用
getMoreSplits
方法處理具體的分片生成邏輯。在
getMoreSplits
方法中,為每個文件創建OneFileInfo
對象,并調用createSplits
方法。
??文件合并邏輯 (
createSplits
方法) :????節點級別合并 :??
優先在同一節點上合并塊,以提高數據本地性。
遍歷所有節點,對于每個節點上的塊,累加它們的大小。
如果累加的大小超過
maxSplitSize
,則調用addCreatedSplit
方法創建一個分片。如果剩余塊大小超過
minSplitSizeNode
,也會創建一個分片。
??機架級別合并 :??
對于無法在節點級別合并的塊,嘗試在機架級別進行合并。
遍歷所有機架,對于每個機架上的塊,累加它們的大小。
如果累加的大小超過
minSplitSizeRack
,則創建一個分片。
??處理剩余塊 :??
最后處理所有剩余的塊,將它們合并成最后的分片。
??創建分片 :??
addCreatedSplit
方法會創建CombineFileSplit
對象,該對象包含了合并后的文件信息(路徑、起始偏移量、長度等)。
??通過這種方式,getSplits
方法實現了我們之前描述的文件合并邏輯,將大量小文件合并成較大的分片,以減少 Map 任務的數量,提高 MapReduce 作業的執行效率。??
CombineFileInputFormat 中 getSplits 方法的實現分析
getSplits
方法是 CombineFileInputFormat
類的核心,負責將多個小文件合并成較少的輸入分片,以減少 Map 任務的數量,提高作業執行效率。
首先,該方法從作業配置中獲取分片大小的相關參數:
minSizeNode
: 節點級別的最小分片大小minSizeRack
: 機架級別的最小分片大小maxSize
: 分片的最大大小
這些參數可以通過 set 方法顯式設置,或者從配置中讀取。方法還會進行參數驗證,確保它們之間的關系合理。
分片生成 (getMoreSplits
方法)
getMoreSplits
方法是分片生成的核心:
??創建 OneFileInfo 對象:??
為每個文件創建
OneFileInfo
對象,該對象包含文件的所有塊信息。files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),rackToBlocks, blockToNodes, nodeToBlocks,rackToNodes, maxSize);
??文件塊處理:??
在
OneFileInfo
構造函數中,根據文件是否可分割,將文件拆分成多個塊 (OneBlockInfo
)。??不可分割文件:?? 創建一個包含整個文件的塊
??可分割文件:?? 根據
maxSize
參數將文件拆分成多個塊,使用啟發式方法避免創建過小的塊
??構建映射關系:??
通過
populateBlockInfo
方法建立塊與節點、機架之間的映射關系。??調用 createSplits:??
使用建立的映射關系創建實際的分片。
分片創建 (createSplits
方法)
createSplits
方法實現了基于節點和機架的分片合并策略:
??節點級合并:??
遍歷所有節點,嘗試將同一節點上的塊合并成一個分片
當累積大小達到
maxSize
時,創建一個分片如果剩余塊大小超過
minSizeNode
,也創建一個分片;否則將這些塊放回待處理池
??機架級合并:??
處理節點級合并后剩余的塊
嘗試將同一機架上的塊合并成一個分片
當累積大小達到
maxSize
時,創建一個分片如果剩余塊大小超過
minSizeRack
,也創建一個分片;否則放入overflowBlocks
??溢出塊處理:??
處理機架級合并后剩余的塊 (
overflowBlocks
)繼續累積直到達到
maxSize
,創建分片處理最后剩余的塊
populateBlockInfo建立的映射關系及Hadoop支持
populateBlockInfo 方法建立了以下三種關鍵的映射關系,這些關系是實現文件合并策略的基礎:
塊到節點的映射 (blockToNodes)
//?add?this?block?to?the?block?-->?node?locations?map blockToNodes.put(oneblock,?oneblock. hosts);
這個映射關系記錄了每個塊(OneBlockInfo)所在的節點(hosts)列表,用于實現節點級別的數據本地性優化。
機架到塊的映射 (rackToBlocks)
//?add?this?block?to?the?rack?-->?block?map for?(int?j?=?0;?j?<?racks.length;?j++)?{String?rack?=?racks[j];List<OneBlockInfo>?blklist?=?rackToBlocks.get(rack);if?(blklist?==?null)?{blklist?=?new?ArrayList<OneBlockInfo>();rackToBlocks.put(rack,?blklist);}blklist.add(oneblock); }
這個映射關系將每個機架(rack)與位于該機架上的所有塊關聯起來,用于實現機架級別的數據本地性優化。
節點到塊的映射 (nodeToBlocks)
//?add?this?block?to?the?node?-->?block?map for?(int?j?=?0;?j?<?oneblock.hosts. length;?j++)?{String?node?=?oneblock.hosts[j];Set<OneBlockInfo>?blklist?=?nodeToBlocks.get(node);if?(blklist?==?null)?{blklist?=?new?LinkedHashSet<OneBlockInfo>();nodeToBlocks.put(node,?blklist);}blklist.add(oneblock); }
這個映射關系記錄了每個節點(node)上存儲的所有塊,用于在創建分片時快速查找特定節點上的所有塊。
機架到節點的映射 (rackToNodes)
if?(!racks[j].equals(NetworkTopology. DEFAULT_RACK))?{//?Add?this?host?to?rackToNodes?mapaddHostToRack(rackToNodes,?racks[j],?oneblock.hosts[j]); }
這個映射關系記錄了每個機架上包含的節點列表,用于在機架級別合并時確定分片的位置信息。
Hadoop提供的支持信息
Hadoop通過以下機制為這些映射關系的建立提供了必要的信息支持:
BlockLocation API 在 OneFileInfo 構造函數中,通過調用 getFileBlockLocations 方法獲取文件塊的位置信息:
BlockLocation[]?locations; if?(stat?instanceof?LocatedFileStatus)?{locations?=?((LocatedFileStatus)?stat).getBlockLocations(); }?else?{FileSystem?fs?=?stat.getPath().getFileSystem(conf);locations?=?fs.getFileBlockLocations(stat,?0,?stat.getLen()); }
BlockLocation 類提供了以下關鍵信息:
getHosts() : 返回存儲該塊的所有節點的主機名
getTopologyPaths() : 返回節點的網絡拓撲路徑,包含機架信息
網絡拓撲結構 Hadoop使用網絡拓撲來組織集群中的節點,通過 NetworkTopology 類管理機架信息。在 OneBlockInfo 構造函數中,如果文件系統沒有提供機架信息,則會使用默認機架:
//?if?the?file?system?does?not?have?any?rack?information,?then //?use?dummy?rack?location. if?(topologyPaths.length?==?0)?{topologyPaths?=?new?String[hosts.length];for?(int?i?=?0;?i?<?topologyPaths.length;?i++)?{topologyPaths[i]?=?(new?NodeBase(hosts[i],?NetworkTopology.DEFAULT_RACK)).toString();} }//?The?topology?paths?have?the?host?name?included?as?the?last? //?component.?Strip?it. this.racks?=?new?String[topologyPaths.length]; for?(int?i?=?0;?i?<?topologyPaths.length;?i++)?{this.racks[i]?=?(new?NodeBase(topologyPaths[i])).getNetworkLocation(); }
通過這些機制,Hadoop能夠提供詳細的塊位置信息,包括塊存儲在哪些節點上以及這些節點屬于哪個機架,從而支持CombineFileInputFormat實現高效的數據本地性優化和文件合并策略。
InputFormat
InputFormat
?描述了 MapReduce 作業的輸入規范。它像一個數據處理的“總管”,負責在作業開始時,告訴框架數據在哪里、如何劃分,以及怎樣逐條讀取。
InputFormat.java
?的注釋中,清晰地定義了它的三大核心職責:
// ... existing code ...* <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the* job to:<p>* <ol>* <li>* Validate the input-specification of the job. * <li>* Split-up the input file(s) into logical {@link InputSplit}s, each of * which is then assigned to an individual {@link Mapper}.* </li>* <li>* Provide the {@link RecordReader} implementation to be used to glean* input records from the logical <code>InputSplit</code> for processing by * the {@link Mapper}.* </li>* </ol>
// ... existing code ...
- 驗證輸入規范 (Validate the input-specification):在作業運行前,檢查輸入配置是否合法。例如,對于文件輸入,它會檢查指定的輸入路徑是否存在。
- 切分輸入 (Split-up the input):這是?
InputFormat
?最核心的功能。它將輸入數據源(如一個或多個大文件)邏輯上切分成多個?InputSplit
。每個?InputSplit
?將由一個單獨的?Mapper
?任務來處理,這是實現并行計算的基礎。 - 提供?
RecordReader
?(Provide the RecordReader):InputFormat
?本身不讀取數據。它會創建一個?RecordReader
?的實例。RecordReader
?才是真正負責從?InputSplit
?指向的數據源中讀取數據,并將其解析成?<key, value>
?鍵值對,然后傳遞給?Mapper
。
InputFormat
?抽象方法分析
InputFormat
?是一個抽象類,它定義了所有子類必須實現的兩個核心方法:
// ... existing code ...
public abstract class InputFormat<K, V> {/** * Logically split the set of input files for the job. * ...*/public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;/*** Create a record reader for a given split. * ...*/public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;}
public abstract List<InputSplit> getSplits(JobContext context)
: 這個方法在客戶端提交作業時被調用。它的任務是根據輸入數據總量、文件塊大小、配置參數等,計算出應該如何對數據進行邏輯切分,并返回一個?InputSplit
?的列表。列表中的每個?InputSplit
?對象都定義了一個?Mapper
?任務的數據處理范圍。public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context)
: 這個方法在?Mapper
?任務所在的 TaskTracker 節點上被調用。當一個?Mapper
?任務被分配到一個?InputSplit
?后,框架會調用這個方法來創建一個?RecordReader
。這個?RecordReader
?將負責從該?InputSplit
?中讀取數據,并將其轉換成?Mapper
?能處理的?<Key, Value>
?對。
InputFormat
?的主要實現
Hadoop 提供了多種?InputFormat
?的實現,以適應不同的數據源和應用場景。
a.?FileInputFormat
這是所有基于文件的?InputFormat
?的基類,它實現了文件切分的通用邏輯。默認情況下,它會根據 HDFS 的塊大小(Block Size)來切分文件。你可以在?FileInputFormat.java
?中看到切分大小的計算邏輯,它會考慮文件的塊大小以及用戶設置的最小和最大分片大小。
// ... existing code .../*** Set the minimum input split size* @param job the job to modify* @param size the minimum size*/public static void setMinInputSplitSize(Job job,long size) {job.getConfiguration().setLong(SPLIT_MINSIZE, size);}
// ... existing code .../*** Set the maximum split size* @param job the job to modify* @param size the maximum split size*/public static void setMaxInputSplitSize(Job job,long size) {job.getConfiguration().setLong(SPLIT_MAXSIZE, size);}
// ... existing code ...
FileInputFormat
?還定義了一個重要的方法?isSplitable
,用于判斷一個文件是否可以被切分。例如,使用 Gzip 壓縮的文件是流式壓縮,無法從中間開始讀取,因此是不可切分的。
// ... existing code ...protected boolean isSplitable(JobContext context, Path file) {final CompressionCodec codec =new CompressionCodecFactory(context.getConfiguration()).getCodec(file);if (null == codec) {return true;}return codec instanceof SplittableCompressionCodec;}
// ... existing code ...
b.?TextInputFormat
這是 MapReduce 作業默認的?InputFormat
。它繼承自?FileInputFormat
,用于讀取純文本文件。它的?RecordReader
?(LineRecordReader
) 會將文件中的每一行解析為一個記錄。
- Key:?
LongWritable
?類型,表示該行在文件中的起始字節偏移量。 - Value:?
Text
?類型,表示該行的內容。
c.?CombineFileInputFormat
這個?InputFormat
?用于解決“小文件問題”。如果輸入中包含大量的小文件,使用默認的?FileInputFormat
?會為每個小文件啟動一個?Mapper
,造成巨大的系統開銷。CombineFileInputFormat
?可以將多個小文件(或文件塊)打包成一個?InputSplit
,從而由一個?Mapper
?處理,大大提高了效率。 在?TestCombineTextInputFormat.java
?中可以看到它的使用場景:
// ... existing code ...// we should have a single split as the length is comfortably smaller than// the block sizeassertEquals(1, splits.size(), "We got more than one splits!");InputSplit split = splits.get(0);assertEquals(CombineFileSplit.class, split.getClass(),"It should be CombineFileSplit");
// ... existing code ...
d.?NLineInputFormat
這個?InputFormat
?確保每個?InputSplit
?包含固定行數(N行)的輸入。這在某些需要按記錄數而不是按字節大小來均衡負載的場景下很有用。
e.?DBInputFormat
用于從關系型數據庫(如 MySQL)中讀取數據。它的?getSplits
?方法不是按文件大小切分,而是根據表的數據范圍(如主鍵范圍)生成查詢語句,每個?InputSplit
?對應一個?SELECT
?查詢。
f.?CompositeInputFormat
用于處理?JOIN
?操作。它可以將來自不同數據源、使用不同?InputFormat
?的數據組合在一起,進行連接操作。
// ... existing code .../*** Convenience method for constructing composite formats.* Given operation (op), Object class (inf), set of paths (p) return:* {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }*/public static String compose(String op, Class<? extends InputFormat> inf,String... path)
// ... existing code ...
如何使用?InputFormat
在編寫 MapReduce 作業時,你可以通過?Job
?對象來指定要使用的?InputFormat
:
Job job = Job.getInstance(conf);
// ...
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// ...
如果不指定,默認就會使用?TextInputFormat
。
總結
InputFormat
?是 MapReduce 數據輸入端的“指揮官”。它定義了數據如何被并行化處理的藍圖(通過?getSplits
),并提供了讀取這份藍圖的具體工具(通過?createRecordReader
)。從最基礎的文件切分到復雜的數據源連接,InputFormat
?的不同實現為 MapReduce 提供了強大的數據接入能力,使其能夠靈活高效地處理各種類型的數據。