天行健,君子以自強不息;地勢坤,君子以厚德載物。——《易經》
本章導讀
Spark的初始化階段、任務提交階段、執行階段,始終離不開存儲體系。
Spark為了避免Hadoop讀寫磁盤的I/O操作成為性能瓶頸,優先將配置信息、計算結果等數據存入內存,極大的提升了系統的執行效率。
4.1 存儲體系的概述
4.1.1 塊管理器BlockManager的實現
塊管理器BlockManager是Spark存儲體系中的核心組件,Driver Application和Executor都會創建BlockManager。
BlockManager主要由以下部分組成:
1.shuffle客戶端ShuffleClient;
2.BlockManagerMaster(對存在于所有Executor上的BlockManager統一管理)
3.磁盤塊管理器DiskBlockManager
4.內存存儲MemoryStore
5.磁盤存儲DiskStore
6.Tachyon存儲TachyonStore
7.非廣播Block清理器metadataCleaner和廣播Block清理器broadcastCleaner
8.壓縮算法實現CompressionCodec
BlockManager要生效,必須要初始化。
? ? ? 初始化代碼如下:
def initialize(appId: String): Unit = {
//blockTransferSevice的初始化blockTransferService.init(this)
//shuffleClient的初始化
//書中解釋ShuffleClient默認是BlockTransferService,當有外部的ShuffleService時,調用外部ShuffleService的初始化方法
shuffleClient.init(appId)blockReplicationPolicy = {val priorityClass = conf.get("spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)val clazz = Utils.classForName(priorityClass)val ret = clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy]logInfo(s"Using $priorityClass for block replication policy")ret} //BlockManagerID的創建
val id =BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)val idFromMaster = master.registerBlockManager(id,maxOnHeapMemory,maxOffHeapMemory,slaveEndpoint)blockManagerId = if (idFromMaster != null) idFromMaster else id //shuffleServerId的創建。當有外部的ShuffleService時,創建新的BlockManagerId,否則ShuffleServerId默認使用當前的BlockManager的BlockManagerIdshuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}// Register Executors' configuration with the local shuffle service, if one should exist.
//向BlockManagerMaster注冊BlockManagerId,當有外部的ShuffleService時,還需要BlockManagerMaster注冊ShuffleServerIdif (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}logInfo(s"Initialized BlockManager: $blockManagerId")}
4.1.2 Spark存儲體系架構
Spark存儲體系的架構關系說明:
- 第一號表示Executor的BlockManager與Driver的BlockManager進行消息通信,例如,注冊BlockManager、更新BlockManager、獲取Block所在的BlockManager、刪除BlockExecutor等;
- 第二號表示對BlockManager的讀操作和寫操作;
- 第三號表示當MemoryStore的內存不足是,寫入DiskStore,而DiskStore實際上依賴于DiskBlockManager;
- 第四號表示通過遠端節點的Executor的BlockManager的TransportServer提供的RPC服務下載或者上傳Block;
- 第五號表示遠端節點的Executor的BlockManager訪問本地Executor的BlockManager中的TransportServer提供的RPC服務下載或者上傳Block;
- 第六號表示當存儲體系選擇為Tachyon作為存儲時,對于BlockManager的讀寫操作實際上調用了TachyonStore的putBytes、putArray、putIterator、getBytes、getValues等;
? Spark目前支持HDFS、Amazon S3兩種主流分布式存儲系統。
Spark定義了抽象類BlockStore,用于制定所有存儲類型的規范。BlockStore的繼承體系如下:
?
4.2 shuffle服務與客服端
Spark分布式部署,每個Task最終運行在不同的機器節點上。map輸出與reduce任務極有可能不在同一機器上運行
所以要遠程下載map任務的中間輸出,因此將ShuffleClient放在存儲體系最為合適。
ShuffleClient將Shuffle文件上傳到其他Executor或者下載本地的客戶端,也提供被其他Executor訪問的shuffle的服務。
Spark與hadoop都是采用Netty作為shuffle server。當有外部的ShuffleClient時,新建ExternalShuffleClient,否則默認為BlockTransferService。
BlockTransferService只有在其init方法調用,即被初始化后才提供服務,以默認的NettyBlockTransferService的init方法為例。
NettyBlockTransferService的初始化步驟如下:
1>創建RpcServer
2>構造TransportContext
3>創建RPC客戶端工廠TransportClientFactory
4>創建Netty服務器TransportServer,可以修改屬性spark.blockManager.port(默認為0,表示隨機選擇)改變TransferServer的端口。
4.2.1 Block的RPC服務
當map任務與reduce任務處于不同節點時,reduce任務需要從遠端節點下載map任務的中間輸出,因此NettyBlockRpcServer提供打開,即下載Block文件的功能;
一些情況下,為了容錯,需要將Block的數據備份到其他節點上,所以NettyBlockRpcServer還提供了上傳Block文件的RPC服務。
NettyBlockRpcServer的實現代碼清單:
class NettyBlockRpcServer(appId: String,serializer: Serializer,blockManager: BlockDataManager)extends RpcHandler with Logging {private val streamManager = new OneForOneStreamManager()override def receive(client: TransportClient,rpcMessage: ByteBuffer,responseContext: RpcResponseCallback): Unit = {val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)logTrace(s"Received request: $message")message match {case openBlocks: OpenBlocks =>val blocksNum = openBlocks.blockIds.lengthval blocks = for (i <- (0 until blocksNum).view)yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)logTrace(s"Registered streamId $streamId with $blocksNum buffers")responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)case uploadBlock: UploadBlock =>// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.val (level: StorageLevel, classTag: ClassTag[_]) = {serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata)).asInstanceOf[(StorageLevel, ClassTag[_])]}val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))val blockId = BlockId(uploadBlock.blockId)logDebug(s"Receiving replicated block $blockId with level ${level} " +s"from ${client.getSocketAddress}")blockManager.putBlockData(blockId, data, level, classTag)responseContext.onSuccess(ByteBuffer.allocate(0))}}override def receiveStream(client: TransportClient,messageHeader: ByteBuffer,responseContext: RpcResponseCallback): StreamCallbackWithID = {val message =BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]val (level: StorageLevel, classTag: ClassTag[_]) = {serializer.newInstance().deserialize(ByteBuffer.wrap(message.metadata)).asInstanceOf[(StorageLevel, ClassTag[_])]}val blockId = BlockId(message.blockId)logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +s"from ${client.getSocketAddress}")// This will return immediately, but will setup a callback on streamData which will still// do all the processing in the netty thread.blockManager.putBlockDataAsStream(blockId, level, classTag)}override def getStreamManager(): StreamManager = streamManager }
?4.2.2構造傳輸上下文TransportContext
TransportContext用于維護傳輸上下文。
public TransportContext(TransportConf conf,RpcHandler rpcHandler,boolean closeIdleConnections) {this(conf, rpcHandler, closeIdleConnections, false);}
TransportContext即可以創建Netty服務,也可以創建Netty訪問客戶端。TransportContext的組成如下:
TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客戶端和服務端線程數目。
RPCHandle:負責shuffle的I/O服務端在接收端到客戶端的RPC請求之后,提供打開Block或者上傳Block的RPC處理,此處即為NettyBlockRpcServer;
decoder:在shuffle的IO服務端對客戶端傳來的ByteBuf進行解析,防止丟包和解析錯誤。
encoder:在shuffle的IO客戶端對消息內容進行編碼,防止服務端丟包和解析錯誤。
?
一個探討:基于流傳輸的是一個字節隊列,要整理解析成更好理解的數據。
?
4.2.3RPC客戶端工廠TransportClientFactory
TransportClientFactory是創建Netty客戶端TransportClient的工廠類,TransportClient用于向Netty服務端發送RPC請求。
TransportContext的createClientFactory方法用于創建TransportClientFactory。
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {return new TransportClientFactory(this, bootstraps);}
?以下為TransportClientFactory的代碼:
?TransportClientFactory由以下幾個部分組成:
clientBootstraps:用于緩存客戶端列表。
connectionPool:用于緩存客戶端連接。
numConnectionPerPeer:節點之間去數據的連接數。
?
public TransportClientFactory(TransportContext context,List<TransportClientBootstrap> clientBootstraps) {this.context = Preconditions.checkNotNull(context);this.conf = context.getConf();
//緩存客戶端列表this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));//緩存客戶端連接
this.connectionPool = new ConcurrentHashMap<>();//節點之間取數據的連接數
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();this.rand = new Random();IOMode ioMode = IOMode.valueOf(conf.ioMode());//客戶端channel被創建時使用的類
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);//根據Netty的規范,客戶端只有Work組,所以此處創建workerGroup,實際上是NioEventLoopGroup
this.workerGroup = NettyUtils.createEventLoop(ioMode,conf.clientThreads(),conf.getModuleName() + "-client");
//匯集ByteBuf但對本地線程緩存禁用的分配器this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());this.metrics = new NettyMemoryMetrics(this.pooledAllocator, conf.getModuleName() + "-client", conf);}
?關于NIO,為所有的原始類型提供Buffer緩存支持;字符集編碼解碼解決方案;提供一個新的原始IO抽象Channel,
支持鎖和內存映射文件的文件訪問接口,提供多路非阻塞式的高伸縮性網絡IO。
?
?4.2.4 Netty服務器TransportServer
TransportServer提供Netty實現的服務器端,用于提供RPC服務(比如上傳、下載等)。
主要函數init函數,主要根據IP和端口號初始化。
public TransportServer(TransportContext context,String hostToBind,int portToBind,RpcHandler appRpcHandler,List<TransportServerBootstrap> bootstraps) {this.context = context;this.conf = context.getConf();this.appRpcHandler = appRpcHandler;this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));boolean shouldClose = true;try {init(hostToBind, portToBind);shouldClose = false;} finally {if (shouldClose) {JavaUtils.closeQuietly(this);}}}
Init函數的代碼情況:
?主要用于對TransportServer初始化,通過使用Netty框架的EventLoopGroup和ServerBootstrap等API創建shuffle的IO交互的客戶端。
private void init(String hostToBind, int portToBind) {IOMode ioMode = IOMode.valueOf(conf.ioMode());EventLoopGroup bossGroup =NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");EventLoopGroup workerGroup = bossGroup;PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NettyUtils.getServerChannelClass(ioMode)).option(ChannelOption.ALLOCATOR, allocator).option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS).childOption(ChannelOption.ALLOCATOR, allocator);this.metrics = new NettyMemoryMetrics(allocator, conf.getModuleName() + "-server", conf);if (conf.backLog() > 0) {bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());}if (conf.receiveBuf() > 0) {bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());}if (conf.sendBuf() > 0) {bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());}bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {RpcHandler rpcHandler = appRpcHandler;for (TransportServerBootstrap bootstrap : bootstraps) {rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);}context.initializePipeline(ch, rpcHandler);}});InetSocketAddress address = hostToBind == null ?new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);channelFuture = bootstrap.bind(address);channelFuture.syncUninterruptibly();port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();logger.debug("Shuffle server started on port: {}", port);}
4.2.5獲取遠程shuffle文件
NettyBlockTransferService的fetchBlocks方法用于獲取遠程shuffle文件,實際上是利用NettyBlockTransferService創建Netty服務。
?4.2.6 上傳shuffle文件
NettyBlockTransferService的uploadBlock方法用于上傳shuffle文件到遠程Executor,實際上也是利用NettyBlockTransferService中創建的Netty服務。
1>創建Netty服務的客戶端,客戶端連接的hostname和port正是我們隨機選擇的BlockManager的hostname和port。
2>將Block的存儲界別StorageLevel序列化。
3>將Block的ByteBuffer轉化為數組,便于序列化。
4>將appId、execId、blockId、序列化的StorageLevel、轉換位數組的Block封裝為UploadBlock,并將UploadBlock序列化為字節數組。
5>最終調用Netty客戶端的sendRpc方法將字節數組上傳,回調函數RpcResponseCallback,根據RPC的結果更改上傳狀態。
override def uploadBlock(hostname: String,port: Int,execId: String,blockId: BlockId,blockData: ManagedBuffer,level: StorageLevel,classTag: ClassTag[_]): Future[Unit] = {val result = Promise[Unit]()val client = clientFactory.createClient(hostname, port)// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.// Everything else is encoded using our binary protocol.val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)val callback = new RpcResponseCallback {override def onSuccess(response: ByteBuffer): Unit = {logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")result.success((): Unit)}override def onFailure(e: Throwable): Unit = {logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)result.failure(e)}}if (asStream) {val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBufferclient.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback)} else {// Convert or copy nio buffer into array in order to serialize it.val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,callback)}result.future}
4.3 BlockManagerMaster對BlockManager的管理
?BlockManagerMaster的主要作用是什么?
?答案:Driver上的BlockManagerMaster對于存在于Executor上的BlockManager統一管理,比如Executor需要向Driver發送注冊BlockManager、
更新Executor上Block的最新信息、詢問所需要的Block目前所在的位置以及當Executor運行結束需要將次Executor移除等。
第二個問題:Driver與Executor位于不同機器中,該如何實現?
Driver上的BlockManagerMaster會持有BlockManagerMasterActor,所有Executor也會從ActorSystem中獲取BlockManagerMasterActor的引用。
4.3.1 BlockManagerMasterActor
BlockManagerMasterActor只存在于Driver上,Executor從ActorSystem獲取的BlockManagerMasterActor的引用,然后給BlockManagerMasterActor發送消息,
實現和Driver交互。
BlockManagerMasterActor維護的很對緩存數據結構。
blockManagerInfo:緩存所有的BlockManagerId以及BlockManager的信息。
blockManagerIdByExecutor:緩存executorId與其擁有的BlockManagerId之間的映射關系。
blockLocations:緩存Block與BlockManagerId的映射關系。
4.3.2 詢問Driver并獲取回復方法
4.3.3 向BlockManagerMaster注冊BlockManagerId
4.4 磁盤塊管理器DiskBlockManager
4.4.1 DiskBlockManager的構造過程。
BlockManager初始化會創建DiskBlockManager
4.4.2 獲取磁盤文件方法getFile
很多代碼中使用DiskBlockManager的getFile方法,獲取磁盤上的文件。
通過對于getFile的分析,能夠掌握Spark磁盤散列文件存儲的實現機制。
1>根據文件名計算哈希值。
2>根據哈希值與本地文件一級目錄的總數求余數。即為dirId
3>根據哈希值與本地文件一級目錄的總數求商數,此商數與二級 目錄的數目再求余數,即為subDirId
4>如果dirId/subDirId目錄存在,獲取dirId/subDirId
def getFile(filename: String): File = {// Figure out which local directory it hashes to, and which subdirectory in thatval hash = Utils.nonNegativeHash(filename)val dirId = hash % localDirs.lengthval subDirId = (hash / localDirs.length) % subDirsPerLocalDir// Create the subdirectory if it doesn't already existval subDir = subDirs(dirId).synchronized {val old = subDirs(dirId)(subDirId)if (old != null) {old} else {val newDir = new File(localDirs(dirId), "%02x".format(subDirId))if (!newDir.exists() && !newDir.mkdir()) {throw new IOException(s"Failed to create local dir in $newDir.")}subDirs(dirId)(subDirId) = newDirnewDir}}new File(subDir, filename)}
4.4.3 臨時創建Block方法createTempShuffleBlock
當ShuffleMapTask運行結束需要把中間結果臨時保存,此時就調用createTempShuffleBlock方法創建臨時的Block,并返回TempShuffleBlockId與其文件的對偶。
4.5 磁盤存儲DiskStore
當MemoryStore沒有足夠空間時,就會使用DiskStore將塊存入磁盤,DiskStore繼承自BlockStore,并實現了getBytes、putBytes等方法。
4.5.1 NIO讀取getBytes
getBytes方法通過DiskBlockManager的getFile方法獲取文件。然后使用NIO將文件讀取到ByteBuffer。
def getBytes(blockId: BlockId): BlockData = {val file = diskManager.getFile(blockId.name)val blockSize = getSize(blockId)securityManager.getIOEncryptionKey() match {case Some(key) =>// Encrypted blocks cannot be memory mapped; return a special object that does decryption// and provides InputStream / FileRegion implementations for reading the data.new EncryptedBlockData(file, blockSize, conf, key)case _ =>new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)}}
4.5.2 NIO寫入方法putBytes
putBytes方法的作用是通過DiskBlockManager的getFile方法獲取文件。然后使用NIO的channel將ByteBuffer寫入文件。
4.5.3 數組寫入方法putArray
4.5.4 Itetator寫入方法putIterator
4.6 內存存儲MemoryStore
MemoryStore負責將沒有序列化的Java對象數組或者序列化的ByteBuffer存儲到內存中。
先看MemoryStore的數據結構。
?
private[spark] class MemoryStore(conf: SparkConf,blockInfoManager: BlockInfoManager,serializerManager: SerializerManager,memoryManager: MemoryManager,blockEvictionHandler: BlockEvictionHandler)extends Logging { ...
// Note: all changes to memory allocations, notably putting blocks, evicting blocks, and// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)// All accesses of this map are assumed to have manually synchronized on `memoryManager`private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()// Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching// always stores serialized values.private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()// Initial memory to request before unrolling any blockprivate val unrollMemoryThreshold: Long =conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)/** Total amount of memory available for storage, in bytes. */private def maxMemory: Long = {memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory}if (maxMemory < unrollMemoryThreshold) {logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +s"memory. Please configure Spark with more memory.")}logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))/** Total storage memory used including unroll memory, in bytes. */private def memoryUsed: Long = memoryManager.storageMemoryUsed/*** Amount of storage memory, in bytes, used for caching blocks.* This does not include memory used for unrolling.*/private def blocksMemoryUsed: Long = memoryManager.synchronized {memoryUsed - currentUnrollMemory}def getSize(blockId: BlockId): Long = {entries.synchronized {entries.get(blockId).size}}
源碼中可以看出
第一個變量:
private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
整個的內存存儲分為兩個部分,一個部分被很多的MemoryEntry占據的內存currentMemory,這些內存是通過entries持有的。
另一部分是unrollMemoryMap通過占座方式占用內存currentUnrollMemory。
maxUnrollMemory:當前Driver或者Executor最多展開的Block所占用的內存。
maxMemory:當前Driver或者Executor的最大內存。
currentMemory:當前Driver或者Executor已經使用的內存。
private def memoryUsed: Long = memoryManager.storageMemoryUsed
4.6.1 數據存儲方法putBytes
4.6.2 Iterator寫入方法putIterator詳解
4.6.3 安全展開方法unrollSafely
為防止寫入內存的數據過大,導致內存溢出,Spark采用了一種優化方案:
正式寫入內存之前,先用邏輯的方式申請內存,如果申請成功,再寫入內存,這個過程稱為安全展開。
4.6.4確認空閑內存方法ensureFreeSpace
ensureFreeSpace方法用于確認是否具有足夠的內存,如果不足,會釋放MemoryEntry占用的內存。
4.6.5 內存寫入方法putArray
4.6.6嘗試寫入內存方法tryToput
4.6.7 獲取內存數據方法getBytes
用于從entries中獲取MemoryEntry。
4.6.8 獲取數據方法getValues
用于從entries中獲取MemoryEntry 并將blockId和value返回。
4.7 Tachyon存儲TachyonStrore
簡介:Tachyon是一個分布式內存文件系統,可以在集群里以訪問內存的速度來訪問存在tachyon里的文件。
把Tachyon是架構在最底層的分布式文件存儲和上層的各種計算框架之間的一種中間件。
主要職責是將那些不需要落地到DFS里的文件,落地到分布式內存文件系統中,來達到共享內存,從而提高效率。同時可以減少內存冗余,GC時間等。
?
使用Tachyon原因:
1、Spark的shuffleMapTask和ResultTask被劃分為不同的Stage,ShuffleMapTask執行完畢將中間結果輸出到本地磁盤文件系統中,下一個Stage中的ResultTask通過
shuffleClient下載shuffleMapTask輸出到本地磁盤文件系統,讀寫效率比較低。
2、Spark的計算引擎和存儲體系都位于Executor的同一個進程中,計算奔潰后數據丟失
3、不同的Spark可能訪問相同的和數據,例如都要訪問數據到內存中,重復加載到內存,對象太多導致Java GC問題。
4.7.1 Tachyon簡介?
?位于現有大數據計算框架和大數據存儲系統之間的獨立一層。
4.7.2 TachyonStore的使用
4.7.3 寫入Tachyon內存的方法putIntoTachyonStore
TachyonStore實現了BlockStore的getSize、putBytes、putArray等方法。
其中put方法實際調用了putIntoTachyonStore。putIntoTachyonStore用于將數據寫入Tachyon的分布式內存中。
4.7.4 獲取序列化數據方法getBytes
4.8 塊管理器BlockManager
已經介紹了BlockManager中的主要組件了,現在看看BlockManager的自身實現。
4.8.1 移除內存方法dropFromMemory
當內存不足,可能需要騰出部分內存空間。
4.8.2 狀態報告方法reportBlockStatus
reportBlockStatus用于向BlockManagerMasterActor報告Block的狀態并且重新注冊BlockManager.
4.8.3 單對象塊寫入方法putSingle
putSingle方法用于將一個對象構成的Block寫入存儲系統。
4.8.4 序列化字節塊寫入方法putBytes
putBytes方法將序列化字節組成的Block寫入存儲系統,實際上也是調用了doPut方法。
4.8.5 數據寫入方法doPut
4.8.6 數據塊備份方法replicate
4.8.7 創建DiskBlockObjectWriter的方法getDiskWriter
4.8.8 獲取本地Block數據方法getBlockData
getBlockData用于從本地獲取Block的數據
4.8.9 獲取本地shuffle數據方法doGetLocal
當reduce和map任務在同一個節點時,不需要遠程拉取,只需要調取doGetLocal方法從本地獲取中間處理結果。
1.如果Block允許使用內存,調用MemoryStore的getValues或者getBytes方法獲取。
2.如果Block允許使用Tachyon,調用TachyonStore的getBytes方法。
3.如果BLock允許使用DiskStore,調用DiskStore的getBytes方法。
4.8.10 遠程獲取Block數據方法doGetRemote
4.8.11 獲取Block數據方法get
先本地后遠程
4.8.12 數據流序列化方法dataSerializeStream
4.9 metadataCleaner和broadcastCleaner
為了有效利用磁盤空間和內存,metadataCleaner和broadcastCleaner分別用于清除blockinfo中很久不用的非廣播和廣播Block信息。
4.10 緩存管理器CacheManager
用于緩存RDD某個分區計算后的中間結果。
CacheManager只是BlockManager的代理,真正的緩存依然使用的是BlockManager。
4.11 壓縮算法
配置屬性:spark.io.compression.codec來確定要使用的壓縮算法。默認為snappy
4.12 磁盤寫入實現DiskBlockObjectWriter
被用于輸出Spark任務的中間計算結果。
4.13 塊索引shuffle管理器IndexShuffleBlockManager
通常用于獲取Block索引文件,并且根據索引文件讀取Block文件的數據。
4.14 shuffle內存管理器ShuffleMemeoryManager
用于為執行shuffle操作的線程分配內存池。
4.15 小結
目前主要有MemoryStore、DiskStore和TachyonStore三種組成
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?