Spark Notes 16: BlockManager

First, some background reading on the storage module: http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/ . Also note that Spark's storage uses the concept of a segmented file (http://en.wikipedia.org/wiki/Segmented_file_transfer): in short, a file is divided into multiple segments stored on different servers, and a reader fetches from all of those servers at once (this is also the basis of BitTorrent).
The earlier analysis of the shuffle call chain already covered much of the BlockManager flow, but it is still worth a systematic pass over its code.
getLocalFromDisk is where the earlier ShuffleManager walkthrough ended, and it is where BlockManager begins. Even fetching a block from a remote node boils down to sending a message to the remote server, running getLocalFromDisk there, and sending the result back.

============================BlockManager============================
-> BlockManager::getLocalFromDisk
->diskStore.getValues(blockId, serializer)
->getBytes(blockId).map(bytes => blockManager.dataDeserialize(blockId, bytes, serializer))
->val segment = diskManager.getBlockLocation(blockId) -- DiskBlockManager method: locate the block as a segment (file, offset, length)
->if blockId.isShuffle and env.shuffleManager.isInstanceOf[SortShuffleManager] -- if this is a sort-based shuffle
->sortShuffleManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId], this) -- for sort-based shuffle, let it figure out its blocks
->else if blockId.isShuffle and shuffleBlockManager.consolidateShuffleFiles -- consolidated-files mode
->shuffleBlockManager.getBlockLocation(blockId.asInstanceOf[ShuffleBlockId]) -- for hash-based shuffle with consolidated files
->val shuffleState = shuffleStates(id.shuffleId)
->for (fileGroup <- shuffleState.allFileGroups)
->val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) -- this function is analyzed separately below
->if (segment.isDefined) { return segment.get }
->else
->val file = getFile(blockId.name)--getFile(filename: String): File
->val hash = Utils.nonNegativeHash(filename)
->val dirId = hash % localDirs.length
->val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
->var subDir = subDirs(dirId)(subDirId)
->new File(subDir, filename)
->new FileSegment(file, 0, file.length())
->val channel = new RandomAccessFile(segment.file, "r").getChannel
->if (segment.length < minMemoryMapBytes)
->channel.position(segment.offset)
->channel.read(buf)
->return buf
->else
->return Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length))
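The last two steps above, hashing a block name to an on-disk file and then reading its segment, condense into a short runnable sketch. This is an illustrative reconstruction, not Spark's code: localDirs, subDirsPerLocalDir, and minMemoryMapBytes are assumed values, and nonNegativeHash stands in for Utils.nonNegativeHash.

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object DiskReadSketch {
  val localDirs = Array(new File("/tmp/spark-local-0"), new File("/tmp/spark-local-1"))
  val subDirsPerLocalDir = 64
  val minMemoryMapBytes = 2 * 1024 * 1024L  // small segments are read, large ones mmap-ed

  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Mirrors DiskBlockManager.getFile's hashing: block name -> dir -> subdir -> file.
  def getFile(filename: String): File = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % localDirs.length                            // which local dir
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir  // which subdir in it
    new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
  }

  // Reads [offset, offset+length) of a file, memory-mapping large segments.
  def readSegment(file: File, offset: Long, length: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(length.toInt)
        channel.position(offset)
        channel.read(buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, offset, length)  // the mapping outlives the channel
      }
    } finally {
      channel.close()
    }
  }
}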

ShuffleFileGroup: how getFileSegmentFor(mapId, reduceId) locates data inside the ShuffleBlockManager:
->use reduceId to pick the reducer's file handle fd from ShuffleFileGroup's val files: Array[File]
->use mapId to look up index in mapIdToIndex
->use reduceId to pick the blockOffset vector and the blockLen vector
->use index to read offset and len out of those vectors
->finally read the needed data from fd, starting at offset, for len bytes
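A self-contained toy version of these five steps, with plain Scala collections standing in for Spark's PrimitiveVector and PrimitiveKeyOpenHashMap (ShuffleFileGroup itself appears later in this note); ToyFileGroup and all its members are illustrative names, not Spark's:

import java.io.File
import scala.collection.mutable

object FileGroupToy {
  case class FileSegment(file: File, offset: Long, length: Long)

  class ToyFileGroup(files: Array[File]) {
    private var numBlocks = 0
    private val mapIdToIndex = mutable.Map.empty[Int, Int]
    private val offsetsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])
    private val lengthsByReducer = Array.fill(files.length)(mutable.ArrayBuffer.empty[Long])

    // One offset/length per reducer file is recorded for each completed map.
    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]): Unit = {
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (r <- files.indices) {
        offsetsByReducer(r) += offsets(r)
        lengthsByReducer(r) += lengths(r)
      }
    }

    // The five steps: pick fd by reduceId, index by mapId, then offset/len by index.
    def getFileSegmentFor(mapId: Int, reduceId: Int): Option[FileSegment] =
      mapIdToIndex.get(mapId).map { i =>
        FileSegment(files(reduceId), offsetsByReducer(reduceId)(i), lengthsByReducer(reduceId)(i))
      }
  }

  def main(args: Array[String]): Unit = {
    val group = new ToyFileGroup(Array(new File("reduce_0"), new File("reduce_1")))
    group.recordMapOutput(5, Array(0L, 0L), Array(10L, 20L))   // map 5 writes first
    group.recordMapOutput(7, Array(10L, 20L), Array(4L, 8L))   // map 7 appends after it
    println(group.getFileSegmentFor(7, 1))  // Some(FileSegment(reduce_1,20,8))
  }
}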

Fetching data locally
->BlockManager::doGetLocal
->val info = blockInfo.get(blockId).orNull
->val level = info.level
->if (level.useMemory) --Look for the block in memory
->val result = if (asBlockResult)
->memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
->else
->memoryStore.getBytes(blockId)
->if (level.useOffHeap) -- Look for the block in Tachyon
->tachyonStore.getBytes(blockId)
->if (level.useDisk)
->val bytes: ByteBuffer = diskStore.getBytes(blockId)
->if (!level.useMemory) // If the block shouldn't be stored in memory, we can just return it
->if (asBlockResult)
->return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk, info.size))
->else
->return Some(bytes)
->else -- otherwise, we also have to store something in the memory store
->if (!level.deserialized || !asBlockResult) -- store the raw bytes in memory if the level is "memory serialized", or if the caller asked for bytes rather than deserialized objects
->val copyForMemory = ByteBuffer.allocate(bytes.limit)
->copyForMemory.put(bytes)
->memoryStore.putBytes(blockId, copyForMemory, level)
->if (!asBlockResult)
->return Some(bytes)
->else -- deserialize first, then cache the values in memory
->val values = dataDeserialize(blockId, bytes)
->if (level.deserialized) // Cache the values before returning them
->val putResult = memoryStore.putIterator(blockId, values, level, returnValues = true, allowPersistToDisk = false)
->putResult.data match case Left(it) return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
->else
->return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
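The branching above condenses to: try memory, then off-heap (Tachyon), then disk; on a disk hit with a memory-enabled storage level, copy the bytes back into the memory store before returning. A minimal sketch, assuming one-method stores in place of MemoryStore/TachyonStore/DiskStore (Level and Store here are simplified stand-ins, not Spark's API):

import java.nio.ByteBuffer

object LocalGetSketch {
  case class Level(useMemory: Boolean, useOffHeap: Boolean, useDisk: Boolean)
  trait Store {
    def getBytes(blockId: String): Option[ByteBuffer]
    def putBytes(blockId: String, bytes: ByteBuffer): Unit = ()
  }

  def doGetLocal(blockId: String, level: Level,
                 memory: Store, tachyon: Store, disk: Store): Option[ByteBuffer] = {
    if (level.useMemory) {
      val hit = memory.getBytes(blockId)
      if (hit.isDefined) return hit                  // fastest path: already in memory
    }
    if (level.useOffHeap) {
      val hit = tachyon.getBytes(blockId)
      if (hit.isDefined) return hit                  // off-heap (Tachyon) path
    }
    if (level.useDisk) {
      disk.getBytes(blockId).map { bytes =>
        if (level.useMemory) {                       // level includes memory: re-cache
          val copy = ByteBuffer.allocate(bytes.limit())
          copy.put(bytes)
          copy.flip()
          memory.putBytes(blockId, copy)
          bytes.rewind()                             // caller still gets the original bytes
        }
        bytes
      }
    } else None
  }
}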
Fetching data from a remote node
->BlockManager::doGetRemote
->val locations = Random.shuffle(master.getLocations(blockId)) -- randomize the order of the replica locations
->for (loc <- locations) -- try every location in turn
->val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
->val blockMessage = BlockMessage.fromGetBlock(msg)
->val newBlockMessage = new BlockMessage()
->newBlockMessage.set(getBlock)
->typ = BlockMessage.TYPE_GET_BLOCK
->id = getBlock.id
->val blockMessageArray = new BlockMessageArray(blockMessage)
-> val responseMessage = Try(Await.result(connectionManager.sendMessageReliably(toConnManagerId, blockMessageArray.toBufferMessage), Duration.Inf))
->responseMessage match {case Success(message) => val bufferMessage = message.asInstanceOf[BufferMessage]
->logDebug("Response message received " + bufferMessage)
->BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage =>
->logDebug("Found " + blockMessage)
->return blockMessage.getData
->return Some(data)
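The shape of this loop is simple: shuffle the replicas for load balancing and return the first successful fetch. A sketch, where fetchFrom is a hypothetical stand-in for the BlockManagerWorker.syncGetBlock round trip:

import scala.util.Random

object RemoteGetSketch {
  def doGetRemote(blockId: String,
                  locations: Seq[String],
                  fetchFrom: (String, String) => Option[Array[Byte]]): Option[Array[Byte]] = {
    for (loc <- Random.shuffle(locations)) {
      fetchFrom(loc, blockId) match {
        case Some(data) => return Some(data)  // first replica that answers wins
        case None       =>                    // fetch failed; try the next location
      }
    }
    None                                      // no location had the block
  }
}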

===========================end=================================
Referencing this diagram again: several map tasks can share one file, each map owning certain segments of it, which cuts down the number of shuffle files.
[Figure: the Spark shuffle consolidation process]
(image source: http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/)
The data structure returned when block data is fetched:
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
val data: Iterator[Any],
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
}

private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
val master: BlockManagerMaster,
defaultSerializer: Serializer,
maxMemory: Long,
val conf: SparkConf,
securityManager: SecurityManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager)
extends BlockDataProvider with Logging {
ShuffleState mainly holds two properties, unusedFileGroups and allFileGroups, tracking the pool of currently unused ShuffleFileGroups and all groups created for the shuffle:
/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
* ShuffleFileGroups, as well as all ShuffleFileGroups that have been created for the shuffle.
*/
private class ShuffleState(val numBuckets: Int) {
val nextFileId = new AtomicInteger(0)
val unusedFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()
val allFileGroups = new ConcurrentLinkedQueue[ShuffleFileGroup]()

/**
* The mapIds of all map tasks completed on this Executor for this shuffle.
* NB: This is only populated if consolidateShuffleFiles is FALSE. We don't need it otherwise.
*/
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}
shuffleStates is a timestamp-based hash table:
private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]

private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
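The idea behind TimeStampedHashMap plus MetadataCleaner: each entry records when it was inserted, and a periodic cleaner drops entries older than a threshold. A minimal sketch of that idea (not Spark's actual implementation):

import scala.collection.concurrent.TrieMap

class TimestampedMap[K, V] {
  private val underlying = TrieMap.empty[K, (V, Long)]

  def putIfAbsent(k: K, v: V): Unit = {
    underlying.putIfAbsent(k, (v, System.currentTimeMillis))
  }

  def apply(k: K): V = underlying(k)._1

  // Drop entries inserted before threshTime; the cleaner thread calls this periodically.
  def clearOldValues(threshTime: Long): Unit = {
    for ((k, (_, t)) <- underlying if t < threshTime) underlying.remove(k)
  }
}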
Used by sort-based shuffle: when a shuffle map task completes, it registers its result in shuffleStates:
/**
* Register a completed map without getting a ShuffleWriterGroup. Used by sort-based shuffle
* because it just writes a single file by itself.
*/
def addCompletedMap(shuffleId: Int, mapId: Int, numBuckets: Int): Unit = {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
val shuffleState = shuffleStates(shuffleId)
shuffleState.completedMapTasks.add(mapId)
}
Register itself with the master:
/**
* Initialize the BlockManager. Register to the BlockManagerMaster, and start the
* BlockManagerWorker actor.
*/
private def initialize(): Unit = {
master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
BlockManagerWorker.startBlockManagerWorker(this)
}
Fetch a block's data straight from local disk, as a convenience short-circuit:
/**
* A short-circuited method to get blocks directly from disk. This is used for getting
* shuffle blocks. It is safe to do so without a lock on block info since disk store
* never deletes (recent) items.
*/
def getLocalFromDisk(blockId: BlockId, serializer: Serializer): Option[Iterator[Any]] = {
diskStore.getValues(blockId, serializer).orElse {
throw new BlockException(blockId, s"Block $blockId not found on disk, though it should be")
}
}

ShuffleWriterGroup: each ShuffleMapTask gets a group of shuffle writers, one writer per reducer. Currently only hash-based shuffle uses it; the sole instantiation is returned by forMapTask and consumed through HashShuffleWriter's shuffle field (a sketch of this write path follows the trait below):
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
val writers: Array[BlockObjectWriter]

/** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
def releaseWriters(success: Boolean)
}
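A hypothetical, much simplified picture of that write path: one writer per reduce bucket, each record routed to its bucket by the partitioner, then all writers released. BucketWriter stands in for BlockObjectWriter, and the real forMapTask additionally handles file-group pooling:

import java.io.{File, FileWriter, PrintWriter}

object WriterGroupSketch {
  class BucketWriter(file: File) {
    private val out = new PrintWriter(new FileWriter(file, true))
    def write(kv: (Any, Any)): Unit = out.println(s"${kv._1}\t${kv._2}")
    def close(): Unit = out.close()
  }

  def writeMapOutput(records: Iterator[(Int, String)], numBuckets: Int, dir: File): Unit = {
    val writers = Array.tabulate(numBuckets)(r => new BucketWriter(new File(dir, s"bucket_$r")))
    try {
      for ((k, v) <- records) {
        val bucket = math.abs(k.hashCode) % numBuckets  // partitioner: key -> reducer bucket
        writers(bucket).write((k, v))
      }
    } finally {
      writers.foreach(_.close())  // analogous to releaseWriters(success = true)
    }
  }
}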

/**
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one file
* per reducer (this set of files is called a ShuffleFileGroup).
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
* files, it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 3-tuple:
* - shuffleId: The unique id given to the entire shuffle stage.
* - bucketId: The id of the output partition (i.e., reducer id)
* - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
* time owns a particular fileId, and this id is returned to a pool when the task finishes.
* Each shuffle file is then mapped to a FileSegment, which is a 3-tuple (file, offset, length)
* that specifies where in a given file the actual block data is located.
*
* Shuffle file metadata is stored in a space-efficient manner. Rather than simply mapping
* ShuffleBlockIds directly to FileSegments, each ShuffleFileGroup maintains a list of offsets for
* each block stored in each file. In order to find the location of a shuffle block, we search the
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// TODO: Factor this into a separate class for each ShuffleManager implementation
private[spark]
class ShuffleBlockManager(blockManager: BlockManager,
shuffleManager: ShuffleManager) extends Logging {
A ShuffleFileGroup is a group of files, one per reducer. Each map task is assigned one such group, and several maps may share a group; when they do, each map writes its own segment of every file. getFileSegmentFor(mapId, reduceId) recovers the segment that holds a given block's contents.
private[spark] object ShuffleBlockManager {
  /**
   * A group of shuffle files, one per reducer.
   * A particular mapper will be assigned a single ShuffleFileGroup to write its output to.
   */
  private class ShuffleFileGroup(val shuffleId: Int, val fileId: Int, val files: Array[File]) {
    private var numBlocks: Int = 0

    // For instance, if mapId 5 is the first block in each file, mapIdToIndex(5) = 0.
    private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()

    // Consecutive offsets and lengths of blocks in each reducer file, ordered by position.
    private val blockOffsetsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())
    private val blockLengthsByReducer = Array.fill(files.length)(new PrimitiveVector[Long]())

    def apply(bucketId: Int) = files(bucketId)

    def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
      assert(offsets.length == lengths.length)
      mapIdToIndex(mapId) = numBlocks
      numBlocks += 1
      for (i <- 0 until offsets.length) {
        blockOffsetsByReducer(i) += offsets(i)
        blockLengthsByReducer(i) += lengths(i)
      }
    }

    /** Returns the FileSegment associated with the given map task, or None if no entry exists. */
    def getFileSegmentFor(mapId: Int, reducerId: Int): Option[FileSegment] = {
      val index = mapIdToIndex.getOrElse(mapId, -1)
      if (index >= 0) {
        Some(new FileSegment(files(reducerId),
          blockOffsetsByReducer(reducerId)(index), blockLengthsByReducer(reducerId)(index)))
      } else None
    }
  }
}
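Note the space saving this layout buys: instead of mapping every ShuffleBlockId straight to a FileSegment, a group stores one offset and one length per (map, reducer) pair in primitive vectors, so each block costs only two longs plus a single mapIdToIndex entry.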












1、安裝git 安裝文件&#xff1a;Git-2.12.0-64-bit.exe 下載地址&#xff1a;https://github.com/git-for-windows/git/releases/download/v2.12.0.windows.1/Git-2.12.0-64-bit.exe 在git bash中配置&#xff0c;git bash命令行中執行&#xff08;只有使用到egit時使用&…