Hadoop MapReduce 任務/輸入數據 分片 InputSplit 解析

InputSplit

InputSplit?是對 MapReduce 作業輸入數據的一種邏輯劃分。它并不直接包含數據本身,而是包含了處理一小塊數據所需的信息,比如數據的位置、起始點和長度。框架會為每個?InputSplit?創建一個?Mapper?任務。

InputSplit.java?的注釋中我們可以得到核心定義:

InputSplit?represents the data to be processed by an individual?Mapper. (InputSplit?代表了將被單個?Mapper?處理的數據。)

它與?InputFormat?和?RecordReader?的關系如下:

  • InputFormat: 負責驗證作業的輸入規范,并將輸入數據切分成多個邏輯上的?InputSplit
  • InputSplit: 定義了一個?Mapper?任務要處理的數據分片。它是一個對數據的引用,而不是數據本身。
  • RecordReader: 負責從?InputSplit?所指向的存儲中讀取數據,并將其解析成?<key, value>?鍵值對,然后喂給?Mapper

這個流程可以在?MapReduceTutorial.md?中找到清晰的描述:

MapReduceTutorial.md

// ... existing code ...
The MapReduce framework relies on the `InputFormat` of the job to:1.  Validate the input-specification of the job.2.  Split-up the input file(s) into logical `InputSplit` instances,each of which is then assigned to an individual `Mapper`.3.  Provide the `RecordReader` implementation used to glean inputrecords from the logical `InputSplit` for processing by the`Mapper`.
// ... existing code ...

InputSplit?抽象類分析

InputSplit?本身是一個抽象類,它定義了所有 "分片" 都必須遵守的契約。讓我們看看它的核心方法:

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {/*** Get the size of the split, so that the input splits can be sorted by size.* @return the number of bytes in the split* @throws IOException* @throws InterruptedException*/public abstract long getLength() throws IOException, InterruptedException;/*** Get the list of nodes by name where the data for the split would be local.* The locations do not need to be serialized.* * @return a new array of the node nodes.* @throws IOException* @throws InterruptedException*/public abstract String[] getLocations() throws IOException, InterruptedException;/*** Gets info about which nodes the input split is stored on and how it is* stored at each location.* * @return list of <code>SplitLocationInfo</code>s describing how the split*    data is stored at each location. A null value indicates that all the*    locations have the data stored on disk.* @throws IOException*/@Evolvingpublic SplitLocationInfo[] getLocationInfo() throws IOException {return null;}
}
  • public abstract long getLength(): 這個方法返回分片的邏輯大小(以字節為單位)。這個返回值非常重要,框架可以用它來對分片進行排序(例如,優先處理大的分片),或者用于推測任務的進度。

  • public abstract String[] getLocations(): 這是實現**數據本地性(Data Locality)**的核心。它返回一個字符串數組,其中包含了存儲這份數據分片的節點的 主機名/IP地址。MapReduce 的調度器會盡量將?Mapper?任務調度到?getLocations()?返回的節點之一上運行,這樣?Mapper?就可以從本地磁盤讀取數據,避免了昂貴的網絡I/O。

  • public SplitLocationInfo[] getLocationInfo(): 這是一個更現代、信息更豐富的?getLocations()?版本。它不僅能返回位置信息,還能告知數據在該位置的存儲狀態(例如,是在內存中還是在磁盤上)。這使得調度器可以做出更智能的決策,比如優先將任務調度到數據已在內存中的節點。

InputSplit?如何創建和使用?

開發者通常不直接創建?InputSplit。這個工作由?InputFormat?的?getSplits()?方法完成。以最常見的?FileInputFormat?為例,它負責將輸入文件切分成多個?FileSplit

FileInputFormat.java?中的?getSplits?方法展示了其核心邏輯:

FileInputFormat.java

// ... existing code .../** * Generate the list of files and make them into FileSplits.* @param job the job context* @throws IOException*/public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);
// ... existing code ...for (FileStatus file: files) {
// ... existing code ...Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}
// ... existing code ...} else { // not splittable
// ... existing code ...splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}
// ... existing code ...return splits;}
// ... existing code ...

這個過程大致如下:

  1. 獲取輸入路徑下的所有文件。
  2. 計算分片大小?splitSize,它通常與 HDFS 的塊大小(Block Size)有關。
  3. 遍歷每個文件,如果文件是可切分的(例如,未壓縮的文本文件),則根據?splitSize?將其切成一個或多個?InputSplit
  4. 在創建?InputSplit?時,會從?BlockLocation?中獲取數據塊所在的節點信息,并傳遞給?InputSplit,這就是?getLocations()?的數據來源。

InputSplit?的主要實現

由于?InputSplit?是抽象的,我們需要看它的具體實現才能更好地理解。

  • FileSplit: 最常用的一種實現,代表一個文件的一部分。它由文件路徑、起始偏移量和長度定義。

    // ... existing code ...
    /** The file containing this split's data. */
    public Path getPath() { return fs.getPath(); }/** The position of the first byte in the file to process. */
    public long getStart() { return fs.getStart(); }/** The number of bytes in the file to process. */
    public long getLength() { return fs.getLength(); }
    // ... existing code ...
    public String[] getLocations() throws IOException {return fs.getLocations();
    }
    // ... existing code ...
    
  • CombineFileSplit: 用于將多個小文件或小文件塊合并成一個單獨的?InputSplit。這對于處理大量小文件的場景非常高效,因為它避免了為每個小文件啟動一個?Mapper?任務所帶來的開銷。 在?TestCombineTextInputFormat.java?測試中,我們可以看到它被明確使用:

    // ... existing code ...// we should have a single split as the length is comfortably smaller than// the block sizeassertEquals(1, splits.size(), "We got more than one splits!");InputSplit split = splits.get(0);assertEquals(CombineFileSplit.class, split.getClass(),"It should be CombineFileSplit");
    // ... existing code ...
    
  • TaggedInputSplit: 當使用?MultipleInputs?功能,即針對不同的輸入路徑使用不同的?InputFormat?或?Mapper?時,TaggedInputSplit?就派上用場了。它包裝了一個原始的?InputSplit,并額外 "標記" 了應該由哪個?InputFormat?和?Mapper?類來處理它。

    // ... existing code ...
    public class TaggedInputSplit extends InputSplit implements Configurable, Writable {private Class<? extends InputSplit> inputSplitClass;private InputSplit inputSplit;private Class<? extends InputFormat<?, ?>> inputFormatClass;private Class<? extends Mapper<?, ?, ?, ?>> mapperClass;private Configuration conf;
    // ... existing code ...
    
  • CompositeInputSplit: 用于?JOIN?操作。它本身是?InputSplit?的一個集合,代表了需要連接的多個數據源的相應分片。它的?getLocations()?方法會聚合所有子分片的位置信息。

    // ... existing code .../*** Collect a set of hosts from all child InputSplits.*/public String[] getLocations() throws IOException, InterruptedException {HashSet<String> hosts = new HashSet<String>();for (InputSplit s : splits) {String[] hints = s.getLocations();if (hints != null && hints.length > 0) {for (String host : hints) {hosts.add(host);}}}return hosts.toArray(new String[hosts.size()]);}
    // ... existing code ...
    

總結

InputSplit?是 MapReduce 并行計算模型的基礎。它通過將大數據集邏輯地切分為可管理的小數據塊,使得成百上千的?Mapper?任務可以并行處理。同時,通過?getLocations()?方法攜帶的位置信息,框架能夠將計算任務移動到數據所在的位置,從而實現了“計算向數據移動”的核心思想,極大地提升了處理效率。

FileInputFormat.getSplits?方法的整體邏輯

這個方法的核心目標是將輸入的文件分割成多個?InputSplit(輸入分片),每個?InputSplit?會作為一個 Map 任務的輸入。理想情況下,分片的數量決定了 Map 任務的并行度。

下面是?getSplits?方法的執行流程的詳細分解:

// ... existing code ...public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = new StopWatch().start();long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>();List<FileStatus> files = listStatus(job);boolean ignoreDirs = !getInputDirRecursive(job)&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);for (FileStatus file: files) {
// ... existing code ...if (length != 0) {
// ... existing code ...if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitable
// ... existing code ...splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}
// ... existing code ...return splits;}protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}
// ... existing code ...
1. 計算分片大小(Split Size)

方法的第一步是確定每個分片的大小范圍。這由三個值決定:

  • minSize?(最小分片大小):

    • 代碼:?long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    • 它取?getFormatMinSplitSize()?(默認為 1) 和?getMinSplitSize(job)?的最大值。
    • getMinSplitSize(job)?讀取 Hadoop 的配置參數?mapreduce.input.fileinputformat.split.minsize?(舊參數為?mapred.min.split.size)。
    • 作用: 保證每個分片不會小于這個值。
  • maxSize?(最大分片大小):

    • 代碼:?long maxSize = getMaxSplitSize(job);
    • 它讀取 Hadoop 的配置參數?mapreduce.input.fileinputformat.split.maxsize?(舊參數為?mapred.max.split.size)。
    • 作用: 保證每個分片不會大于這個值。
  • blockSize?(HDFS 塊大小):

    • 代碼:?long blockSize = file.getBlockSize();
    • 這是文件存儲在 HDFS 上的塊大小,由 HDFS 的配置?dfs.blocksize?決定。

最終的分片大小?splitSize?通過?computeSplitSize?方法計算得出:?splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

2. 遍歷所有輸入文件

代碼會獲取輸入路徑下的所有文件 (List<FileStatus> files = listStatus(job);),然后逐一處理。

3. 處理單個文件

對于每個文件,程序會檢查它是否可以被切分 (isSplitable(job, path))。

  • 可切分的文件 (Splittable):

    • 大多數普通文本文件都是可切分的。
    • 程序會根據計算出的?splitSize?對文件進行邏輯切分。
    • 它會循環創建大小為?splitSize?的分片,直到剩余的字節數不足一個?splitSize?的 1.1 倍 (SPLIT_SLOP)。
    • 最后剩余的部分會形成最后一個分片。
    • 關鍵點: 這里的切分是邏輯上的。一個分片可能跨越多個 HDFS 塊,也可能一個 HDFS 塊包含多個分片。
  • 不可切分的文件 (Not Splittable):

    • 某些壓縮格式(如 Gzip)是無法從中間讀取的,因此它們是不可切分的。
    • 對于這類文件,整個文件會形成一個單獨的分片,無論它有多大。這會導致該文件只能由一個 Map 任務處理,可能成為性能瓶頸。
4. 創建分片 (makeSplit)

makeSplit?方法會創建一個?FileSplit?對象,它包含了分片所需的所有信息:文件路徑、起始位置、長度以及數據所在的節點位置(hosts),以便 MapReduce 框架實現“數據本地性” (Data Locality),即盡量將計算任務調度到數據所在的節點上執行,減少網絡傳輸。

如何通過 Hadoop 參數控制 Split 數量?

理解了上面的流程,我們就可以知道如何通過調整參數來控制分片的數量了。核心就是控制?computeSplitSize?的計算結果。

splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

讓我們來分析不同場景:

  1. 默認情況 (最常見)

    • mapreduce.input.fileinputformat.split.minsize?(minSize) 默認為 1。
    • mapreduce.input.fileinputformat.split.maxsize?(maxSize) 默認為?Long.MAX_VALUE?(一個非常大的數)。
    • 此時公式變為:?splitSize = Math.max(1, Math.min(Long.MAX_VALUE, blockSize))
    • 簡化后得到:?splitSize = blockSize
    • 結論: 在默認情況下,分片大小就等于 HDFS 的塊大小。一個 1GB 的文件,如果塊大小是 128MB,那么它會被切分為?1024 / 128 = 8?個分片(大約值)。
  2. 增加分片數量 (調小?maxSize)

    • 目的: 增加 Map 任務的并行度,適用于 CPU 密集型任務。
    • 方法: 設置?mapreduce.input.fileinputformat.split.maxsize?為一個小于?blockSize?的值。
    • 示例:?blockSize?= 128MB。如果你設置?maxSize?= 64MB。
    • 公式變為:?splitSize = Math.max(1, Math.min(64MB, 128MB))
    • 簡化后得到:?splitSize = 64MB
    • 結論: 每個 128MB 的塊都會被切分為 2 個 64MB 的分片,分片的總數大約會翻倍。
  3. 減少分片數量 (調大?minSize)

    • 目的: 減少 Map 任務的數量,適用于處理大量小文件或者希望合并處理邏輯的場景。
    • 方法: 設置?mapreduce.input.fileinputformat.split.minsize?為一個大于?blockSize?的值。
    • 示例:?blockSize?= 128MB。如果你設置?minSize?= 256MB。
    • 公式變為:?splitSize = Math.max(256MB, Math.min(Long.MAX_VALUE, 128MB))
    • 簡化后得到:?splitSize = Math.max(256MB, 128MB)?->?splitSize = 256MB
    • 結論: 程序會嘗試創建 256MB 大小的分片,這會將兩個 128MB 的 HDFS 塊合并到一個分片中,從而使分片總數減少一半。

總結

參數

默認值

作用

如何影響分片數

mapreduce.input.fileinputformat.split.minsize

1

設置分片的最小尺寸

調大此值(大于blockSize)可以減少分片數。

mapreduce.input.fileinputformat.split.maxsize

Long.MAX_VALUE

設置分片的最大尺寸

調小此值(小于blockSize)可以增加分片數。

dfs.blocksize

(HDFS配置, 如128MB)

HDFS塊大小

在默認情況下,分片大小等于塊大小,是決定分片數的基礎。

通過靈活配置這三個參數,你可以精確地控制 MapReduce 作業的并行度,以適應不同的業務場景和硬件資源,從而達到優化作業性能的目的。


CombineFileInputFormat 如何實現文件合并邏輯的

org.apache.hadoop.mapred.lib.CombineFileInputFormat類中,getSplits方法的實現如下:

public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {List<org.apache.hadoop.mapreduce.InputSplit> newStyleSplits =super.getSplits(Job.getInstance(job));InputSplit[] ret = new InputSplit[newStyleSplits.size()];for(int pos = 0; pos < newStyleSplits.size(); ++pos) {org.apache.hadoop.mapreduce.lib.input.CombineFileSplit newStyleSplit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) newStyleSplits.get(pos);ret[pos] = new CombineFileSplit(job, newStyleSplit.getPaths(),newStyleSplit.getStartOffsets(), newStyleSplit.getLengths(),newStyleSplit.getLocations());}return ret;}

這個方法實際上是調用了父類 org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormatgetSplits方法,然后將返回的 org.apache.hadoop.mapreduce.InputSplit列表轉換為 org.apache.hadoop.mapred.InputSplit數組。

??父類 org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormatgetSplits方法實現了 文件合并邏輯:??

  1. ??獲取文件信息 :??

    • 通過 listStatus方法獲取輸入路徑下的所有文件信息。

    • 對于每個文件,創建 OneFileInfo對象,該對象會獲取文件的塊信息及其所在的主機和機架信息。

  2. ??處理文件池 :??

    • 如果配置了文件池(pools),則先處理每個池中的文件,確保同一分片中的文件來自同一池。

  3. ??生成分片 :??

    • 調用 getMoreSplits方法處理具體的分片生成邏輯。

    • getMoreSplits方法中,為每個文件創建 OneFileInfo對象,并調用 createSplits方法。

  4. ??文件合并邏輯 (createSplits方法) :??

    • ??節點級別合并 :??

      • 優先在同一節點上合并塊,以提高數據本地性。

      • 遍歷所有節點,對于每個節點上的塊,累加它們的大小。

      • 如果累加的大小超過 maxSplitSize,則調用 addCreatedSplit方法創建一個分片。

      • 如果剩余塊大小超過 minSplitSizeNode,也會創建一個分片。

    • ??機架級別合并 :??

      • 對于無法在節點級別合并的塊,嘗試在機架級別進行合并。

      • 遍歷所有機架,對于每個機架上的塊,累加它們的大小。

      • 如果累加的大小超過 minSplitSizeRack,則創建一個分片。

    • ??處理剩余塊 :??

      • 最后處理所有剩余的塊,將它們合并成最后的分片。

  5. ??創建分片 :??

    • addCreatedSplit方法會創建 CombineFileSplit對象,該對象包含了合并后的文件信息(路徑、起始偏移量、長度等)。

??通過這種方式,getSplits方法實現了我們之前描述的文件合并邏輯,將大量小文件合并成較大的分片,以減少 Map 任務的數量,提高 MapReduce 作業的執行效率。??


CombineFileInputFormat 中 getSplits 方法的實現分析

getSplits方法是 CombineFileInputFormat類的核心,負責將多個小文件合并成較少的輸入分片,以減少 Map 任務的數量,提高作業執行效率。

首先,該方法從作業配置中獲取分片大小的相關參數:

  • minSizeNode: 節點級別的最小分片大小

  • minSizeRack: 機架級別的最小分片大小

  • maxSize: 分片的最大大小

這些參數可以通過 set 方法顯式設置,或者從配置中讀取。方法還會進行參數驗證,確保它們之間的關系合理。

分片生成 (getMoreSplits方法)

getMoreSplits方法是分片生成的核心:

  1. ??創建 OneFileInfo 對象:??

    為每個文件創建 OneFileInfo對象,該對象包含文件的所有塊信息。

    files[i] = new OneFileInfo(stat, conf, 
    isSplitable(job, stat.getPath()),rackToBlocks, blockToNodes, nodeToBlocks,rackToNodes, maxSize);
  2. ??文件塊處理:??

    OneFileInfo構造函數中,根據文件是否可分割,將文件拆分成多個塊 (OneBlockInfo)。

    • ??不可分割文件:?? 創建一個包含整個文件的塊

    • ??可分割文件:?? 根據 maxSize參數將文件拆分成多個塊,使用啟發式方法避免創建過小的塊

  3. ??構建映射關系:??

    通過 populateBlockInfo方法建立塊與節點、機架之間的映射關系。

  4. ??調用 createSplits:??

    使用建立的映射關系創建實際的分片。

分片創建 (createSplits方法)

createSplits方法實現了基于節點和機架的分片合并策略:

  1. ??節點級合并:??

    • 遍歷所有節點,嘗試將同一節點上的塊合并成一個分片

    • 當累積大小達到 maxSize時,創建一個分片

    • 如果剩余塊大小超過 minSizeNode,也創建一個分片;否則將這些塊放回待處理池

  2. ??機架級合并:??

    • 處理節點級合并后剩余的塊

    • 嘗試將同一機架上的塊合并成一個分片

    • 當累積大小達到 maxSize時,創建一個分片

    • 如果剩余塊大小超過 minSizeRack,也創建一個分片;否則放入 overflowBlocks

  3. ??溢出塊處理:??

    • 處理機架級合并后剩余的塊 (overflowBlocks)

    • 繼續累積直到達到 maxSize,創建分片

    • 處理最后剩余的塊

populateBlockInfo建立的映射關系及Hadoop支持

populateBlockInfo 方法建立了以下三種關鍵的映射關系,這些關系是實現文件合并策略的基礎:

    1. 塊到節點的映射 (blockToNodes)

    //?add?this?block?to?the?block?-->?node?locations?map
    blockToNodes.put(oneblock,?oneblock.
    hosts);

    這個映射關系記錄了每個塊(OneBlockInfo)所在的節點(hosts)列表,用于實現節點級別的數據本地性優化。

    1. 機架到塊的映射 (rackToBlocks)

    //?add?this?block?to?the?rack?-->?block?map
    for?(int?j?=?0;?j?<?racks.length;?j++)?{String?rack?=?racks[j];List<OneBlockInfo>?blklist?=?rackToBlocks.get(rack);if?(blklist?==?null)?{blklist?=?new?ArrayList<OneBlockInfo>();rackToBlocks.put(rack,?blklist);}blklist.add(oneblock);
    }

    這個映射關系將每個機架(rack)與位于該機架上的所有塊關聯起來,用于實現機架級別的數據本地性優化。

    1. 節點到塊的映射 (nodeToBlocks)

    //?add?this?block?to?the?node?-->?block?map
    for?(int?j?=?0;?j?<?oneblock.hosts.
    length;?j++)?{String?node?=?oneblock.hosts[j];Set<OneBlockInfo>?blklist?=?nodeToBlocks.get(node);if?(blklist?==?null)?{blklist?=?new?LinkedHashSet<OneBlockInfo>();nodeToBlocks.put(node,?blklist);}blklist.add(oneblock);
    }

    這個映射關系記錄了每個節點(node)上存儲的所有塊,用于在創建分片時快速查找特定節點上的所有塊。

    1. 機架到節點的映射 (rackToNodes)

    if?(!racks[j].equals(NetworkTopology.
    DEFAULT_RACK))?{//?Add?this?host?to?rackToNodes?mapaddHostToRack(rackToNodes,?racks[j],?oneblock.hosts[j]);
    }

    這個映射關系記錄了每個機架上包含的節點列表,用于在機架級別合并時確定分片的位置信息。

Hadoop提供的支持信息

Hadoop通過以下機制為這些映射關系的建立提供了必要的信息支持:

    1. BlockLocation API 在 OneFileInfo 構造函數中,通過調用 getFileBlockLocations 方法獲取文件塊的位置信息:

    BlockLocation[]?locations;
    if?(stat?instanceof?LocatedFileStatus)?{locations?=?((LocatedFileStatus)?stat).getBlockLocations();
    }?else?{FileSystem?fs?=?stat.getPath().getFileSystem(conf);locations?=?fs.getFileBlockLocations(stat,?0,?stat.getLen());
    }

    BlockLocation 類提供了以下關鍵信息:

    • getHosts() : 返回存儲該塊的所有節點的主機名

    • getTopologyPaths() : 返回節點的網絡拓撲路徑,包含機架信息

    1. 網絡拓撲結構 Hadoop使用網絡拓撲來組織集群中的節點,通過 NetworkTopology 類管理機架信息。在 OneBlockInfo 構造函數中,如果文件系統沒有提供機架信息,則會使用默認機架:

    //?if?the?file?system?does?not?have?any?rack?information,?then
    //?use?dummy?rack?location.
    if?(topologyPaths.length?==?0)?{topologyPaths?=?new?String[hosts.length];for?(int?i?=?0;?i?<?topologyPaths.length;?i++)?{topologyPaths[i]?=?(new?NodeBase(hosts[i],?NetworkTopology.DEFAULT_RACK)).toString();}
    }//?The?topology?paths?have?the?host?name?included?as?the?last?
    //?component.?Strip?it.
    this.racks?=?new?String[topologyPaths.length];
    for?(int?i?=?0;?i?<?topologyPaths.length;?i++)?{this.racks[i]?=?(new?NodeBase(topologyPaths[i])).getNetworkLocation();
    }

通過這些機制,Hadoop能夠提供詳細的塊位置信息,包括塊存儲在哪些節點上以及這些節點屬于哪個機架,從而支持CombineFileInputFormat實現高效的數據本地性優化和文件合并策略。

InputFormat

InputFormat?描述了 MapReduce 作業的輸入規范。它像一個數據處理的“總管”,負責在作業開始時,告訴框架數據在哪里、如何劃分,以及怎樣逐條讀取。

InputFormat.java?的注釋中,清晰地定義了它的三大核心職責:

// ... existing code ...* <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the* job to:<p>* <ol>*   <li>*   Validate the input-specification of the job. *   <li>*   Split-up the input file(s) into logical {@link InputSplit}s, each of *   which is then assigned to an individual {@link Mapper}.*   </li>*   <li>*   Provide the {@link RecordReader} implementation to be used to glean*   input records from the logical <code>InputSplit</code> for processing by *   the {@link Mapper}.*   </li>* </ol>
// ... existing code ...
  1. 驗證輸入規范 (Validate the input-specification):在作業運行前,檢查輸入配置是否合法。例如,對于文件輸入,它會檢查指定的輸入路徑是否存在。
  2. 切分輸入 (Split-up the input):這是?InputFormat?最核心的功能。它將輸入數據源(如一個或多個大文件)邏輯上切分成多個?InputSplit。每個?InputSplit?將由一個單獨的?Mapper?任務來處理,這是實現并行計算的基礎。
  3. 提供?RecordReader?(Provide the RecordReader)InputFormat?本身不讀取數據。它會創建一個?RecordReader?的實例。RecordReader?才是真正負責從?InputSplit?指向的數據源中讀取數據,并將其解析成?<key, value>?鍵值對,然后傳遞給?Mapper

InputFormat?抽象方法分析

InputFormat?是一個抽象類,它定義了所有子類必須實現的兩個核心方法:

// ... existing code ...
public abstract class InputFormat<K, V> {/** * Logically split the set of input files for the job.  * ...*/public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;/*** Create a record reader for a given split. * ...*/public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;}
  • public abstract List<InputSplit> getSplits(JobContext context): 這個方法在客戶端提交作業時被調用。它的任務是根據輸入數據總量、文件塊大小、配置參數等,計算出應該如何對數據進行邏輯切分,并返回一個?InputSplit?的列表。列表中的每個?InputSplit?對象都定義了一個?Mapper?任務的數據處理范圍。

  • public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context): 這個方法在?Mapper?任務所在的 TaskTracker 節點上被調用。當一個?Mapper?任務被分配到一個?InputSplit?后,框架會調用這個方法來創建一個?RecordReader。這個?RecordReader?將負責從該?InputSplit?中讀取數據,并將其轉換成?Mapper?能處理的?<Key, Value>?對。

InputFormat?的主要實現

Hadoop 提供了多種?InputFormat?的實現,以適應不同的數據源和應用場景。

a.?FileInputFormat

這是所有基于文件的?InputFormat?的基類,它實現了文件切分的通用邏輯。默認情況下,它會根據 HDFS 的塊大小(Block Size)來切分文件。你可以在?FileInputFormat.java?中看到切分大小的計算邏輯,它會考慮文件的塊大小以及用戶設置的最小和最大分片大小。

// ... existing code .../*** Set the minimum input split size* @param job the job to modify* @param size the minimum size*/public static void setMinInputSplitSize(Job job,long size) {job.getConfiguration().setLong(SPLIT_MINSIZE, size);}
// ... existing code .../*** Set the maximum split size* @param job the job to modify* @param size the maximum split size*/public static void setMaxInputSplitSize(Job job,long size) {job.getConfiguration().setLong(SPLIT_MAXSIZE, size);}
// ... existing code ...

FileInputFormat?還定義了一個重要的方法?isSplitable,用于判斷一個文件是否可以被切分。例如,使用 Gzip 壓縮的文件是流式壓縮,無法從中間開始讀取,因此是不可切分的。

// ... existing code ...protected boolean isSplitable(JobContext context, Path file) {final CompressionCodec codec =new CompressionCodecFactory(context.getConfiguration()).getCodec(file);if (null == codec) {return true;}return codec instanceof SplittableCompressionCodec;}
// ... existing code ...
b.?TextInputFormat

這是 MapReduce 作業默認的?InputFormat。它繼承自?FileInputFormat,用于讀取純文本文件。它的?RecordReader?(LineRecordReader) 會將文件中的每一行解析為一個記錄。

  • Key:?LongWritable?類型,表示該行在文件中的起始字節偏移量。
  • Value:?Text?類型,表示該行的內容。
c.?CombineFileInputFormat

這個?InputFormat?用于解決“小文件問題”。如果輸入中包含大量的小文件,使用默認的?FileInputFormat?會為每個小文件啟動一個?Mapper,造成巨大的系統開銷。CombineFileInputFormat?可以將多個小文件(或文件塊)打包成一個?InputSplit,從而由一個?Mapper?處理,大大提高了效率。 在?TestCombineTextInputFormat.java?中可以看到它的使用場景:

// ... existing code ...// we should have a single split as the length is comfortably smaller than// the block sizeassertEquals(1, splits.size(), "We got more than one splits!");InputSplit split = splits.get(0);assertEquals(CombineFileSplit.class, split.getClass(),"It should be CombineFileSplit");
// ... existing code ...
d.?NLineInputFormat

這個?InputFormat?確保每個?InputSplit?包含固定行數(N行)的輸入。這在某些需要按記錄數而不是按字節大小來均衡負載的場景下很有用。

e.?DBInputFormat

用于從關系型數據庫(如 MySQL)中讀取數據。它的?getSplits?方法不是按文件大小切分,而是根據表的數據范圍(如主鍵范圍)生成查詢語句,每個?InputSplit?對應一個?SELECT?查詢。

f.?CompositeInputFormat

用于處理?JOIN?操作。它可以將來自不同數據源、使用不同?InputFormat?的數據組合在一起,進行連接操作。

// ... existing code .../*** Convenience method for constructing composite formats.* Given operation (op), Object class (inf), set of paths (p) return:* {@code <op>(tbl(<inf>,<p1>),tbl(<inf>,<p2>),...,tbl(<inf>,<pn>)) }*/public static String compose(String op, Class<? extends InputFormat> inf,String... path)
// ... existing code ...

如何使用?InputFormat

在編寫 MapReduce 作業時,你可以通過?Job?對象來指定要使用的?InputFormat

Job job = Job.getInstance(conf);
// ...
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// ...

如果不指定,默認就會使用?TextInputFormat

總結

InputFormat?是 MapReduce 數據輸入端的“指揮官”。它定義了數據如何被并行化處理的藍圖(通過?getSplits),并提供了讀取這份藍圖的具體工具(通過?createRecordReader)。從最基礎的文件切分到復雜的數據源連接,InputFormat?的不同實現為 MapReduce 提供了強大的數據接入能力,使其能夠靈活高效地處理各種類型的數據。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/94632.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/94632.shtml
英文地址,請注明出處:http://en.pswp.cn/web/94632.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【力扣】面試經典150題總結04-區間/棧

1.匯總區間&#xff08;簡單&#xff09;判斷連續的范圍&#xff0c;記錄每個區間的首尾&#xff0c;存進list。2.合并區間&#xff08;中等&#xff09;先按照左端點排序&#xff0c;然后判斷右端點是否和下個區間的左端點重合&#xff0c;重疊進行合并。3.插入區間&#xff0…

SpringBoot 常用跨域處理方案

1.什么是跨域&#xff1f; 跨域是瀏覽器為了保障安全而遵循的一種規則&#xff0c;是同源策略的一部分。 同源&#xff1a;要求協議、域名、端口三者完全相同。 跨域&#xff1a;只要協議、域名、端口中有任何一個不同&#xff0c;瀏覽器就會判定為跨域請求。 跨域&#xf…

Springboot框架的“上海迪士尼”旅游管理網站設計與開發(代碼+數據庫+LW)

摘 要 隨著旅游行業的不斷發展&#xff0c;特別是主題公園的快速增長&#xff0c;如何通過現代信息技術提升旅游服務質量與管理效率成為了行業的重要課題。上海迪士尼作為中國乃至全球知名的主題公園&#xff0c;其旅游管理網站的設計與開發&#xff0c;不只需要滿足游客對信…

后臺管理系統-16-vue3之動態路由的實現

文章目錄 1 動態路由 1.1 stores/index.js(動態添加路由函數) 1.1.1 獲取菜單數據 1.1.2 動態導入組件 1.1.3 處理菜單項 1.1.4 添加路由 1.1.5 整體代碼 1.2 router/index.js(移除子路由) 1.3 Login.vue(登錄頁面) 2 多賬號bug問題 2.1 問題復現 2.2 解決多賬號路由問題(store…

社群團購平臺與定制開發開源AI智能名片S2B2C商城小程序的融合創新研究

摘要&#xff1a;本文聚焦于社群團購平臺這一在移動互聯網背景下興起的電子商務運營機構&#xff0c;深入剖析其依托移動互聯網滿足消費者多元購物需求的特點。同時&#xff0c;引入定制開發開源AI智能名片S2B2C商城小程序這一關鍵元素&#xff0c;探討二者融合所帶來的創新模式…

模型交互中的會話狀態管理實踐

模型交互中的會話狀態管理實踐 目錄 引言會話狀態的手動管理構建多輪對話消息序列追加歷史響應實現上下文共享API支持的自動會話狀態管理利用 previous_response_id 實現線程式對話模型響應數據保存與計費說明上下文窗口管理與令牌限制令牌計算與窗口溢出風險令牌工具輔助統計…

基于Java+Springboot的船舶運維系統

源碼編號&#xff1a;sy23源碼名稱&#xff1a;基于Springboot的船舶運維系統用戶類型&#xff1a;多角色&#xff0c;船員、維修人員、管理員數據庫表數量&#xff1a;9 張表主要技術&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven運行環境&#xff1a;Windows/Mac、J…

零基礎也能照做的WordPress網站安全漏洞修復 + 高級優化保姆級教程。

建議先在**暫存環境&#xff08;Staging&#xff09;**演練后再動正式站&#xff0c;避免線上故障。下面第一部分就教你“備份暫存還原演練”。 總覽導航&#xff08;按順序完成&#xff09; 備份與還原演練&#xff08;UpdraftPlus 寶塔/阿里/騰訊/七牛&#xff09;高危加固…

HI3516DV500/HI3519DV500 Docker開發環境配置

目錄一、拉取Ubuntu 18.04 docker鏡像二、查看已有鏡像三、基于鏡像創建容器1. 創建容器2. 退出容器3. 查看容器4. 啟動容器5. 進入容器6. 更新容器內部軟件源四、安裝CANN包1. 安裝基礎依賴環境2. 安裝并配置python 3.7.5配置環境變量安裝vim添加使環境變量生效檢查python版本…

實體店轉型破局之道:新零售社區商城小程序開發重構經營生態

在數字化浪潮的席卷下&#xff0c;實體店經營正經歷著前所未有的變革與挑戰。客戶進店率持續走低、同行競爭白熱化、經營成本不斷攀升、電商平臺沖擊加劇……這些痛點如同達摩克利斯之劍&#xff0c;懸在傳統實體商家的頭頂。然而&#xff0c;危機往往與機遇并存&#xff0c;新…

前端-如何將前端頁面輸出為PDF并打包的壓縮包中

需要引入的依賴&#xff1a;import * as utils from ../../utils/utils import html2canvas from "html2canvas"; import JSZip from "jszip"; import JSPDF from "jspdf"; import FileSaver from "file-saver"import { Loading } fro…

LabVIEW 頻譜分析應用

LabVIEW 頻譜分析程序廣泛應用于聲學、振動、電力電子等領域&#xff0c;用于噪聲頻譜分析、設備故障診斷、電能質量評估等。通過模塊化 VI 組合&#xff0c;可快速搭建 "信號模擬 - 采集&#xff08;或緩存&#xff09;- 頻譜分析 - 結果展示" 完整流程&#xff0c;…

北斗導航 | 基于MCMC粒子濾波的接收機自主完好性監測(RAIM)算法(附matlab代碼)

詳細闡述基于MCMC粒子濾波的接收機自主完好性監測(RAIM)算法的原理、理論和實現方法,并提供完整的MATLAB代碼示例。 1. 原理與理論 1.1 接收機自主完好性監測 (RAIM) 簡介 RAIM是一種完全由GPS接收機內部實現的算法,用于在不依賴外部系統的情況下,監測GPS信號的完好性(…

【機器學習】4 Gaussian models

本章目錄 4 Gaussian models 97 4.1 Introduction 97 4.1.1 Notation 97 4.1.2 Basics 97 4.1.3 MLE for an MVN 99 4.1.4 Maximum entropy derivation of the Gaussian * 101 4.2 Gaussian discriminant analysis 101 4.2.1 Quadratic discriminant analysis (QDA) 102 4.2.2…

Ruoyi-Vue 靜態資源權限鑒權:非登錄不可訪問

一. 背景 移除/profile下靜態資源訪問權限后&#xff0c;富文本等組件中的圖片加載失敗!!! 使用ruoyi-vue3.8.9過程中發現上傳的在ruoyi.profile下的文件未登錄直接使用鏈接就可以訪問下載&#xff0c;感覺這樣不太安全&#xff0c;所以想對其進行鑒權限制&#xff0c;修改為只…

關于窗口關閉釋放內存,主窗口下的子窗口關閉釋放不用等到主窗口關閉>setAttribute(Qt::WA_DeleteOnClose);而且無需手動釋放

?QWidget重寫closeEvent后&#xff0c;點擊關閉時釋放內存會調用析構函數?&#xff0c;但需注意內存釋放的時機和方式。 關閉事件與析構函數的關系 重寫closeEvent時&#xff0c;若在事件處理中調用deleteLater()或手動刪除對象&#xff0c;析構函數會被觸發。但需注意&#…

C# 簡單工廠模式(構建簡單工廠)

構建簡單工廠 現在很容易給出簡單工廠類。只檢測逗號是否存在&#xff0c;然后返回其中的一個類的實例。 public class NameFactory {public NameFactory(){}public Namer getName(string name){int iname.IndexOf(",");if(i>0)return new LastFirst(name);else{r…

uniappx與uniapp的區別

uniappx與uniapp的定位差異uniappx是DCloud推出的擴展版框架&#xff0c;基于uniapp進行功能增強&#xff0c;主要面向需要更復雜原生交互或跨平臺深度定制的場景。uniapp則是標準版&#xff0c;適用于常規的跨平臺應用開發&#xff0c;強調開發效率和代碼復用。功能擴展性unia…

vue實現拖拉拽效果,類似于禪道首頁可拖拽排布展示內容(插件-Grid Layout)

vue實現拖拉拽效果&#xff08;插件-Grid Layout&#xff09; 這個是類似與禪道那種首頁有多個指標模塊&#xff0c;允許用戶自己拼裝內容的那種感覺。 實現效果 插件資料 vue3版本 如果項目是vue3 的話使用的是 Grid Layout Plus。 官網&#xff1a;https://grid-layout-pl…

在Excel和WPS表格中打印時加上行號和列標

在電腦中查看excel和WPS表格的工作表時&#xff0c;能看到行號&#xff08;12345.....&#xff09;和列標&#xff08;ABCDE...&#xff09;&#xff0c;但是打印出來以后默認是沒有行號和列標的&#xff0c;如果要讓打印&#xff08;或者轉為PDF&#xff09;出來以后仍能看到行…