《深入理解Spark-核心思想與源碼分析》(四)第四章存儲體系

天行健,君子以自強不息;地勢坤,君子以厚德載物。——《易經》

本章導讀

  Spark的初始化階段、任務提交階段、執行階段,始終離不開存儲體系。

  Spark為了避免Hadoop讀寫磁盤的I/O操作成為性能瓶頸,優先將配置信息、計算結果等數據存入內存,極大的提升了系統的執行效率。

4.1 存儲體系的概述

4.1.1 塊管理器BlockManager的實現

  塊管理器BlockManager是Spark存儲體系中的核心組件,Driver Application和Executor都會創建BlockManager。

  BlockManager主要由以下部分組成:

  1.shuffle客戶端ShuffleClient;

  2.BlockManagerMaster(對存在于所有Executor上的BlockManager統一管理)

  3.磁盤塊管理器DiskBlockManager

  4.內存存儲MemoryStore

  5.磁盤存儲DiskStore

  6.Tachyon存儲TachyonStore

  7.非廣播Block清理器metadataCleaner和廣播Block清理器broadcastCleaner

  8.壓縮算法實現CompressionCodec

  

  BlockManager要生效,必須要初始化。

? ? ? 初始化代碼如下:

 def initialize(appId: String): Unit = {
  //blockTransferSevice的初始化blockTransferService.init(this)
  //shuffleClient的初始化
  //書中解釋ShuffleClient默認是BlockTransferService,當有外部的ShuffleService時,調用外部ShuffleService的初始化方法
shuffleClient.init(appId)blockReplicationPolicy = {val priorityClass = conf.get("spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)val clazz = Utils.classForName(priorityClass)val ret = clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy]logInfo(s"Using $priorityClass for block replication policy")ret}   //BlockManagerID的創建
val id =BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)val idFromMaster = master.registerBlockManager(id,maxOnHeapMemory,maxOffHeapMemory,slaveEndpoint)blockManagerId = if (idFromMaster != null) idFromMaster else id    //shuffleServerId的創建。當有外部的ShuffleService時,創建新的BlockManagerId,否則ShuffleServerId默認使用當前的BlockManager的BlockManagerIdshuffleServerId = if (externalShuffleServiceEnabled) {logInfo(s"external shuffle service port = $externalShuffleServicePort")BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)} else {blockManagerId}// Register Executors' configuration with the local shuffle service, if one should exist.
   //向BlockManagerMaster注冊BlockManagerId,當有外部的ShuffleService時,還需要BlockManagerMaster注冊ShuffleServerIdif (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}logInfo(s"Initialized BlockManager: $blockManagerId")}

  

4.1.2 Spark存儲體系架構

Spark存儲體系的架構關系說明:

  • 第一號表示Executor的BlockManager與Driver的BlockManager進行消息通信,例如,注冊BlockManager、更新BlockManager、獲取Block所在的BlockManager、刪除BlockExecutor等;
  • 第二號表示對BlockManager的讀操作和寫操作;
  • 第三號表示當MemoryStore的內存不足是,寫入DiskStore,而DiskStore實際上依賴于DiskBlockManager;
  • 第四號表示通過遠端節點的Executor的BlockManager的TransportServer提供的RPC服務下載或者上傳Block;
  • 第五號表示遠端節點的Executor的BlockManager訪問本地Executor的BlockManager中的TransportServer提供的RPC服務下載或者上傳Block;
  • 第六號表示當存儲體系選擇為Tachyon作為存儲時,對于BlockManager的讀寫操作實際上調用了TachyonStore的putBytes、putArray、putIterator、getBytes、getValues等;

?  Spark目前支持HDFS、Amazon S3兩種主流分布式存儲系統。

   Spark定義了抽象類BlockStore,用于制定所有存儲類型的規范。BlockStore的繼承體系如下:

?

4.2 shuffle服務與客服端

Spark分布式部署,每個Task最終運行在不同的機器節點上。map輸出與reduce任務極有可能不在同一機器上運行

所以要遠程下載map任務的中間輸出,因此將ShuffleClient放在存儲體系最為合適。

ShuffleClient將Shuffle文件上傳到其他Executor或者下載本地的客戶端,也提供被其他Executor訪問的shuffle的服務。

Spark與hadoop都是采用Netty作為shuffle server。當有外部的ShuffleClient時,新建ExternalShuffleClient,否則默認為BlockTransferService。

BlockTransferService只有在其init方法調用,即被初始化后才提供服務,以默認的NettyBlockTransferService的init方法為例。

NettyBlockTransferService的初始化步驟如下:

1>創建RpcServer

2>構造TransportContext

3>創建RPC客戶端工廠TransportClientFactory

4>創建Netty服務器TransportServer,可以修改屬性spark.blockManager.port(默認為0,表示隨機選擇)改變TransferServer的端口。

4.2.1 Block的RPC服務

當map任務與reduce任務處于不同節點時,reduce任務需要從遠端節點下載map任務的中間輸出,因此NettyBlockRpcServer提供打開,即下載Block文件的功能;

一些情況下,為了容錯,需要將Block的數據備份到其他節點上,所以NettyBlockRpcServer還提供了上傳Block文件的RPC服務。

NettyBlockRpcServer的實現代碼清單:

class NettyBlockRpcServer(appId: String,serializer: Serializer,blockManager: BlockDataManager)extends RpcHandler with Logging {private val streamManager = new OneForOneStreamManager()override def receive(client: TransportClient,rpcMessage: ByteBuffer,responseContext: RpcResponseCallback): Unit = {val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)logTrace(s"Received request: $message")message match {case openBlocks: OpenBlocks =>val blocksNum = openBlocks.blockIds.lengthval blocks = for (i <- (0 until blocksNum).view)yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)logTrace(s"Registered streamId $streamId with $blocksNum buffers")responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)case uploadBlock: UploadBlock =>// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.val (level: StorageLevel, classTag: ClassTag[_]) = {serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata)).asInstanceOf[(StorageLevel, ClassTag[_])]}val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))val blockId = BlockId(uploadBlock.blockId)logDebug(s"Receiving replicated block $blockId with level ${level} " +s"from ${client.getSocketAddress}")blockManager.putBlockData(blockId, data, level, classTag)responseContext.onSuccess(ByteBuffer.allocate(0))}}override def receiveStream(client: TransportClient,messageHeader: ByteBuffer,responseContext: RpcResponseCallback): StreamCallbackWithID = {val message =BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]val (level: StorageLevel, classTag: ClassTag[_]) = {serializer.newInstance().deserialize(ByteBuffer.wrap(message.metadata)).asInstanceOf[(StorageLevel, ClassTag[_])]}val blockId = BlockId(message.blockId)logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +s"from ${client.getSocketAddress}")// This will return immediately, but will setup a callback on streamData which will still// do all the processing in the netty thread.blockManager.putBlockDataAsStream(blockId, level, classTag)}override def getStreamManager(): StreamManager = streamManager
}

  

?4.2.2構造傳輸上下文TransportContext

TransportContext用于維護傳輸上下文。

  public TransportContext(TransportConf conf,RpcHandler rpcHandler,boolean closeIdleConnections) {this(conf, rpcHandler, closeIdleConnections, false);}

  

TransportContext即可以創建Netty服務,也可以創建Netty訪問客戶端。TransportContext的組成如下:

TransportConf:主要控制Netty框架提供的shuffle的I/O交互的客戶端和服務端線程數目。

RPCHandle:負責shuffle的I/O服務端在接收端到客戶端的RPC請求之后,提供打開Block或者上傳Block的RPC處理,此處即為NettyBlockRpcServer;

decoder:在shuffle的IO服務端對客戶端傳來的ByteBuf進行解析,防止丟包和解析錯誤。

encoder:在shuffle的IO客戶端對消息內容進行編碼,防止服務端丟包和解析錯誤。

?

一個探討:基于流傳輸的是一個字節隊列,要整理解析成更好理解的數據。

?

4.2.3RPC客戶端工廠TransportClientFactory

TransportClientFactory是創建Netty客戶端TransportClient的工廠類,TransportClient用于向Netty服務端發送RPC請求。

TransportContext的createClientFactory方法用于創建TransportClientFactory。

 public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {return new TransportClientFactory(this, bootstraps);}

  

?以下為TransportClientFactory的代碼:

?TransportClientFactory由以下幾個部分組成:

clientBootstraps:用于緩存客戶端列表。

connectionPool:用于緩存客戶端連接。

numConnectionPerPeer:節點之間去數據的連接數。

?

  public TransportClientFactory(TransportContext context,List<TransportClientBootstrap> clientBootstraps) {this.context = Preconditions.checkNotNull(context);this.conf = context.getConf();
//緩存客戶端列表this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));//緩存客戶端連接
  this.connectionPool = new ConcurrentHashMap<>();//節點之間取數據的連接數
  this.numConnectionsPerPeer = conf.numConnectionsPerPeer();this.rand = new Random();IOMode ioMode = IOMode.valueOf(conf.ioMode());//客戶端channel被創建時使用的類
  this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);//根據Netty的規范,客戶端只有Work組,所以此處創建workerGroup,實際上是NioEventLoopGroup
  this.workerGroup = NettyUtils.createEventLoop(ioMode,conf.clientThreads(),conf.getModuleName() + "-client");
  //匯集ByteBuf但對本地線程緩存禁用的分配器this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());this.metrics = new NettyMemoryMetrics(this.pooledAllocator, conf.getModuleName() + "-client", conf);}

  

?關于NIO,為所有的原始類型提供Buffer緩存支持;字符集編碼解碼解決方案;提供一個新的原始IO抽象Channel,

支持鎖和內存映射文件的文件訪問接口,提供多路非阻塞式的高伸縮性網絡IO。

?

?4.2.4 Netty服務器TransportServer

TransportServer提供Netty實現的服務器端,用于提供RPC服務(比如上傳、下載等)。

主要函數init函數,主要根據IP和端口號初始化。

  public TransportServer(TransportContext context,String hostToBind,int portToBind,RpcHandler appRpcHandler,List<TransportServerBootstrap> bootstraps) {this.context = context;this.conf = context.getConf();this.appRpcHandler = appRpcHandler;this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));boolean shouldClose = true;try {init(hostToBind, portToBind);shouldClose = false;} finally {if (shouldClose) {JavaUtils.closeQuietly(this);}}}

 Init函數的代碼情況:

?主要用于對TransportServer初始化,通過使用Netty框架的EventLoopGroup和ServerBootstrap等API創建shuffle的IO交互的客戶端。

  private void init(String hostToBind, int portToBind) {IOMode ioMode = IOMode.valueOf(conf.ioMode());EventLoopGroup bossGroup =NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");EventLoopGroup workerGroup = bossGroup;PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NettyUtils.getServerChannelClass(ioMode)).option(ChannelOption.ALLOCATOR, allocator).option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS).childOption(ChannelOption.ALLOCATOR, allocator);this.metrics = new NettyMemoryMetrics(allocator, conf.getModuleName() + "-server", conf);if (conf.backLog() > 0) {bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());}if (conf.receiveBuf() > 0) {bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());}if (conf.sendBuf() > 0) {bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());}bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {RpcHandler rpcHandler = appRpcHandler;for (TransportServerBootstrap bootstrap : bootstraps) {rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);}context.initializePipeline(ch, rpcHandler);}});InetSocketAddress address = hostToBind == null ?new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);channelFuture = bootstrap.bind(address);channelFuture.syncUninterruptibly();port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();logger.debug("Shuffle server started on port: {}", port);}

4.2.5獲取遠程shuffle文件 

NettyBlockTransferService的fetchBlocks方法用于獲取遠程shuffle文件,實際上是利用NettyBlockTransferService創建Netty服務。

?4.2.6 上傳shuffle文件

NettyBlockTransferService的uploadBlock方法用于上傳shuffle文件到遠程Executor,實際上也是利用NettyBlockTransferService中創建的Netty服務。

1>創建Netty服務的客戶端,客戶端連接的hostname和port正是我們隨機選擇的BlockManager的hostname和port。

2>將Block的存儲界別StorageLevel序列化。

3>將Block的ByteBuffer轉化為數組,便于序列化。

4>將appId、execId、blockId、序列化的StorageLevel、轉換位數組的Block封裝為UploadBlock,并將UploadBlock序列化為字節數組。

5>最終調用Netty客戶端的sendRpc方法將字節數組上傳,回調函數RpcResponseCallback,根據RPC的結果更改上傳狀態。

  override def uploadBlock(hostname: String,port: Int,execId: String,blockId: BlockId,blockData: ManagedBuffer,level: StorageLevel,classTag: ClassTag[_]): Future[Unit] = {val result = Promise[Unit]()val client = clientFactory.createClient(hostname, port)// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.// Everything else is encoded using our binary protocol.val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)val callback = new RpcResponseCallback {override def onSuccess(response: ByteBuffer): Unit = {logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")result.success((): Unit)}override def onFailure(e: Throwable): Unit = {logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)result.failure(e)}}if (asStream) {val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBufferclient.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback)} else {// Convert or copy nio buffer into array in order to serialize it.val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,callback)}result.future}

  

4.3 BlockManagerMaster對BlockManager的管理

?BlockManagerMaster的主要作用是什么?

?答案:Driver上的BlockManagerMaster對于存在于Executor上的BlockManager統一管理,比如Executor需要向Driver發送注冊BlockManager、

更新Executor上Block的最新信息、詢問所需要的Block目前所在的位置以及當Executor運行結束需要將次Executor移除等。

第二個問題:Driver與Executor位于不同機器中,該如何實現?

Driver上的BlockManagerMaster會持有BlockManagerMasterActor,所有Executor也會從ActorSystem中獲取BlockManagerMasterActor的引用。

4.3.1 BlockManagerMasterActor

BlockManagerMasterActor只存在于Driver上,Executor從ActorSystem獲取的BlockManagerMasterActor的引用,然后給BlockManagerMasterActor發送消息,

實現和Driver交互。

BlockManagerMasterActor維護的很對緩存數據結構。

blockManagerInfo:緩存所有的BlockManagerId以及BlockManager的信息。

blockManagerIdByExecutor:緩存executorId與其擁有的BlockManagerId之間的映射關系。

blockLocations:緩存Block與BlockManagerId的映射關系。

4.3.2 詢問Driver并獲取回復方法

4.3.3 向BlockManagerMaster注冊BlockManagerId

4.4 磁盤塊管理器DiskBlockManager

4.4.1 DiskBlockManager的構造過程。

BlockManager初始化會創建DiskBlockManager

4.4.2 獲取磁盤文件方法getFile

很多代碼中使用DiskBlockManager的getFile方法,獲取磁盤上的文件。

通過對于getFile的分析,能夠掌握Spark磁盤散列文件存儲的實現機制。

1>根據文件名計算哈希值。

2>根據哈希值與本地文件一級目錄的總數求余數。即為dirId

3>根據哈希值與本地文件一級目錄的總數求商數,此商數與二級 目錄的數目再求余數,即為subDirId

4>如果dirId/subDirId目錄存在,獲取dirId/subDirId

  def getFile(filename: String): File = {// Figure out which local directory it hashes to, and which subdirectory in thatval hash = Utils.nonNegativeHash(filename)val dirId = hash % localDirs.lengthval subDirId = (hash / localDirs.length) % subDirsPerLocalDir// Create the subdirectory if it doesn't already existval subDir = subDirs(dirId).synchronized {val old = subDirs(dirId)(subDirId)if (old != null) {old} else {val newDir = new File(localDirs(dirId), "%02x".format(subDirId))if (!newDir.exists() && !newDir.mkdir()) {throw new IOException(s"Failed to create local dir in $newDir.")}subDirs(dirId)(subDirId) = newDirnewDir}}new File(subDir, filename)}

 

4.4.3 臨時創建Block方法createTempShuffleBlock

當ShuffleMapTask運行結束需要把中間結果臨時保存,此時就調用createTempShuffleBlock方法創建臨時的Block,并返回TempShuffleBlockId與其文件的對偶。

4.5 磁盤存儲DiskStore

當MemoryStore沒有足夠空間時,就會使用DiskStore將塊存入磁盤,DiskStore繼承自BlockStore,并實現了getBytes、putBytes等方法。

4.5.1 NIO讀取getBytes

getBytes方法通過DiskBlockManager的getFile方法獲取文件。然后使用NIO將文件讀取到ByteBuffer。

  def getBytes(blockId: BlockId): BlockData = {val file = diskManager.getFile(blockId.name)val blockSize = getSize(blockId)securityManager.getIOEncryptionKey() match {case Some(key) =>// Encrypted blocks cannot be memory mapped; return a special object that does decryption// and provides InputStream / FileRegion implementations for reading the data.new EncryptedBlockData(file, blockSize, conf, key)case _ =>new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)}}

4.5.2 NIO寫入方法putBytes

putBytes方法的作用是通過DiskBlockManager的getFile方法獲取文件。然后使用NIO的channel將ByteBuffer寫入文件。

4.5.3 數組寫入方法putArray

4.5.4 Itetator寫入方法putIterator

4.6 內存存儲MemoryStore

MemoryStore負責將沒有序列化的Java對象數組或者序列化的ByteBuffer存儲到內存中。

先看MemoryStore的數據結構。

?

private[spark] class MemoryStore(conf: SparkConf,blockInfoManager: BlockInfoManager,serializerManager: SerializerManager,memoryManager: MemoryManager,blockEvictionHandler: BlockEvictionHandler)extends Logging {
...

 

  // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and// acquiring or releasing unroll memory, must be synchronized on `memoryManager`!private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)// All accesses of this map are assumed to have manually synchronized on `memoryManager`private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()// Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching// always stores serialized values.private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()// Initial memory to request before unrolling any blockprivate val unrollMemoryThreshold: Long =conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)/** Total amount of memory available for storage, in bytes. */private def maxMemory: Long = {memoryManager.maxOnHeapStorageMemory + memoryManager.maxOffHeapStorageMemory}if (maxMemory < unrollMemoryThreshold) {logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +s"memory. Please configure Spark with more memory.")}logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))/** Total storage memory used including unroll memory, in bytes. */private def memoryUsed: Long = memoryManager.storageMemoryUsed/*** Amount of storage memory, in bytes, used for caching blocks.* This does not include memory used for unrolling.*/private def blocksMemoryUsed: Long = memoryManager.synchronized {memoryUsed - currentUnrollMemory}def getSize(blockId: BlockId): Long = {entries.synchronized {entries.get(blockId).size}}

  源碼中可以看出

 第一個變量:

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true) 

整個的內存存儲分為兩個部分,一個部分被很多的MemoryEntry占據的內存currentMemory,這些內存是通過entries持有的。
另一部分是unrollMemoryMap通過占座方式占用內存currentUnrollMemory。

maxUnrollMemory:當前Driver或者Executor最多展開的Block所占用的內存。
maxMemory:當前Driver或者Executor的最大內存。
currentMemory:當前Driver或者Executor已經使用的內存。
private def memoryUsed: Long = memoryManager.storageMemoryUsed

4.6.1 數據存儲方法putBytes

4.6.2 Iterator寫入方法putIterator詳解

4.6.3 安全展開方法unrollSafely

為防止寫入內存的數據過大,導致內存溢出,Spark采用了一種優化方案:

正式寫入內存之前,先用邏輯的方式申請內存,如果申請成功,再寫入內存,這個過程稱為安全展開。

4.6.4確認空閑內存方法ensureFreeSpace

ensureFreeSpace方法用于確認是否具有足夠的內存,如果不足,會釋放MemoryEntry占用的內存。

4.6.5 內存寫入方法putArray

4.6.6嘗試寫入內存方法tryToput

4.6.7 獲取內存數據方法getBytes

用于從entries中獲取MemoryEntry。

4.6.8 獲取數據方法getValues

用于從entries中獲取MemoryEntry 并將blockId和value返回。

4.7 Tachyon存儲TachyonStrore

簡介:Tachyon是一個分布式內存文件系統,可以在集群里以訪問內存的速度來訪問存在tachyon里的文件。

把Tachyon是架構在最底層的分布式文件存儲和上層的各種計算框架之間的一種中間件。

主要職責是將那些不需要落地到DFS里的文件,落地到分布式內存文件系統中,來達到共享內存,從而提高效率。同時可以減少內存冗余,GC時間等。

?

使用Tachyon原因:

1、Spark的shuffleMapTask和ResultTask被劃分為不同的Stage,ShuffleMapTask執行完畢將中間結果輸出到本地磁盤文件系統中,下一個Stage中的ResultTask通過

shuffleClient下載shuffleMapTask輸出到本地磁盤文件系統,讀寫效率比較低。

2、Spark的計算引擎和存儲體系都位于Executor的同一個進程中,計算奔潰后數據丟失

3、不同的Spark可能訪問相同的和數據,例如都要訪問數據到內存中,重復加載到內存,對象太多導致Java GC問題。

4.7.1 Tachyon簡介?

?位于現有大數據計算框架和大數據存儲系統之間的獨立一層。

4.7.2 TachyonStore的使用

4.7.3 寫入Tachyon內存的方法putIntoTachyonStore

TachyonStore實現了BlockStore的getSize、putBytes、putArray等方法。

其中put方法實際調用了putIntoTachyonStore。putIntoTachyonStore用于將數據寫入Tachyon的分布式內存中。

4.7.4 獲取序列化數據方法getBytes

4.8 塊管理器BlockManager

已經介紹了BlockManager中的主要組件了,現在看看BlockManager的自身實現。

4.8.1 移除內存方法dropFromMemory

當內存不足,可能需要騰出部分內存空間。

4.8.2 狀態報告方法reportBlockStatus

reportBlockStatus用于向BlockManagerMasterActor報告Block的狀態并且重新注冊BlockManager.

4.8.3 單對象塊寫入方法putSingle

putSingle方法用于將一個對象構成的Block寫入存儲系統。

4.8.4 序列化字節塊寫入方法putBytes

putBytes方法將序列化字節組成的Block寫入存儲系統,實際上也是調用了doPut方法。

4.8.5 數據寫入方法doPut

4.8.6 數據塊備份方法replicate

4.8.7 創建DiskBlockObjectWriter的方法getDiskWriter

4.8.8 獲取本地Block數據方法getBlockData

getBlockData用于從本地獲取Block的數據

4.8.9 獲取本地shuffle數據方法doGetLocal

當reduce和map任務在同一個節點時,不需要遠程拉取,只需要調取doGetLocal方法從本地獲取中間處理結果。

1.如果Block允許使用內存,調用MemoryStore的getValues或者getBytes方法獲取。

2.如果Block允許使用Tachyon,調用TachyonStore的getBytes方法。

3.如果BLock允許使用DiskStore,調用DiskStore的getBytes方法。

4.8.10 遠程獲取Block數據方法doGetRemote

4.8.11 獲取Block數據方法get

先本地后遠程

4.8.12 數據流序列化方法dataSerializeStream

4.9 metadataCleaner和broadcastCleaner

為了有效利用磁盤空間和內存,metadataCleaner和broadcastCleaner分別用于清除blockinfo中很久不用的非廣播和廣播Block信息。

4.10 緩存管理器CacheManager

用于緩存RDD某個分區計算后的中間結果。

CacheManager只是BlockManager的代理,真正的緩存依然使用的是BlockManager。

4.11 壓縮算法

配置屬性:spark.io.compression.codec來確定要使用的壓縮算法。默認為snappy

4.12 磁盤寫入實現DiskBlockObjectWriter

被用于輸出Spark任務的中間計算結果。

4.13 塊索引shuffle管理器IndexShuffleBlockManager

通常用于獲取Block索引文件,并且根據索引文件讀取Block文件的數據。

4.14 shuffle內存管理器ShuffleMemeoryManager

用于為執行shuffle操作的線程分配內存池。

4.15 小結

目前主要有MemoryStore、DiskStore和TachyonStore三種組成

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?

?





?

?

?

?

 

?

?

?

?

  

  

?

轉載于:https://www.cnblogs.com/sunrunzhi/p/10321249.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/248583.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/248583.shtml
英文地址,請注明出處:http://en.pswp.cn/news/248583.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

看完這篇文章保你面試穩操勝券 ——(必考題)javaScript 篇

? 進大廠收藏這一系列就夠了,全方位搜集總結,為大家歸納出這篇面試寶典,面試途中祝你一臂之力!,共分為四個系列 ? 本 篇 為 《 看 完 這 篇 文 章 保 你 面 試 穩 操 勝 券 》 第 三 篇 ( j

Django打造大型企業官網-項目部署

Django打造大型企業官網-項目部署 一、準備工作 1、在開發機上的準備工作 1&#xff09;確認項目沒有bug。 2&#xff09;打開終端&#xff0c;進入虛擬環境&#xff0c;再 cd 到項目根目錄下&#xff0c;執行命令&#xff1a;pip freeze > requirements.txt&#xff0c;將…

2019.01.26 codeforces 1096G. Lucky Tickets(生成函數)

傳送門 題意簡述&#xff1a;現在有一些號碼由000~999中的某些數字組成&#xff08;會給出&#xff09;&#xff0c;號碼總長度為nnn&#xff0c;問有多少個號碼滿足前n2\frac n22n?個數碼的和等于后n2\frac n22n?個數碼的和&#xff08;保證nnn是偶數&#xff09;&#xff0…

看完這篇文章保你面試穩操勝券——小程序篇

? 進大廠收藏這一系列就夠了,全方位搜集總結,為大家歸納出這篇面試寶典,面試途中祝你一臂之力!,共分為四個系列 ? 本 篇 為 《 看 完 這 篇 文 章 保 你 面 試 穩 操 勝 券 》 第 四 篇 ( 微

17 | 如何正確地顯示隨機消息?

我在上一篇文章&#xff0c;為你講解完order by語句的幾種執行模式后&#xff0c;就想到了之前一個做英語學習App的朋友碰到過的一個性能問題。今天這篇文章&#xff0c;我就從這個性能問題說起&#xff0c;和你說說MySQL中的另外一種排序需求&#xff0c;希望能夠加深你對MySQ…

看完這篇文章保你面試穩操勝券——React篇

? 進大廠收藏這一系列就夠了,全方位搜集總結,為大家歸納出這篇面試寶典,面試途中祝你一臂之力!,共分為四個系列 ? 本 篇 為 《 看 完 這 篇 文 章 保 你 面 試 穩 操 勝 券 》 第 五 篇 ( r

HTML的footer置于頁面最底部

vue項目中&#xff0c;使用element-ui的布局&#xff0c;仍然出現footer不固定頁面底部的情況&#xff0c;網上找到的一個管用的 方法是&#xff1a;footer高度固定絕對定位 <html><head></head><body><div class"header">header</…

logstash異常

logstash異常 123Unrecognized VM option UseParNewGCError: Could not create the Java Virtual Machine.Error: A fatal exception has occurred. Program will exit.logstash的版本6.4.1&#xff0c;修改config/jvm.options&#xff0c;注釋掉-XX:UseParNewGC這個配置即可。…

QT+VS中使用qDebbug()打印調試信息無法顯示

首先右鍵點擊項目名稱&#xff0c;找到最后一項屬性 然后依次設置為如圖所示即可 再次編譯后&#xff0c;會彈出CMD窗口&#xff0c;出現qDebug的調試信息。 轉載于:https://www.cnblogs.com/WindSun/p/10328404.html

WebAPIs移動端特效——不看你就虧大了

Web APIs 本篇學習目標: ?能夠寫出移動端觸屏事件 ?能夠寫出常見的移動端特效 ?能夠使用移動端開發插件開發移動端特效 ?能夠使用移動端開發框架開發移動端特效 ?能夠寫出 sessionStorage 數據的存儲以及獲取 ?能夠寫出 localStorage 數據的存儲以及獲取 ?能夠說出它們兩…

MVC是一種用于表示層設計的復合設計模式

它們之間的交互有以下幾種&#xff1a;1.當用戶在視圖上做任何需要調用模型的操作時&#xff0c;它的請求將被控制器截獲。2.控制器按照自身指定的策略&#xff0c;將用戶行為翻譯成模型操作&#xff0c;調用模型相應邏輯實現。3.控制器可能會在接到視圖操作時&#xff0c;指定…

Centos7.2源碼安裝redis

1、下載redis包&#xff08;此處可到官網查看&#xff0c;有相應的命令&#xff09; wget http://download.redis.io/releases/redis-5.0.3.tar.gz 2、解壓之后&#xff0c;并進行make編譯 tar xzf redis-5.0.3.tar.gz -C /usr/local/cd /usr/local/redis-5.0.3/make如果出現如…

手擼移動端輪播圖(內含源碼)

移動輪播圖 移動端輪播圖與PC段輪播圖&#xff0c;在技術選擇上是有區別的&#xff0c;因為移動端的瀏覽器版本非常好&#xff0c;對于H5和CSS3的支持非常完美&#xff0c;所以很多效果可以CSS3的方式實現&#xff0c;比如可以使用 Transorm 屬性替代原來的動畫函數 可以自動…

原創jquery插件treeTable(轉)

由于工作需要&#xff0c;要直觀的看到某個業務是由那些子業務引起的異常&#xff0c;所以我需要用樹表的方式來展現各個層次的數據。 需求&#xff1a; 1、數據層次分明&#xff1b; 2、數據讀取慢、需要動態加載孩子節點&#xff1b; 3、支持默認展開多少層。 在網上找到了很…

初探Vue3

&#x1f31c;本篇文章目錄\textcolor{green}{本篇文章目錄}本篇文章目錄 &#x1f31b; &#x1f435; 新構建工具Vite\textcolor{blue}{新構建工具Vite}新構建工具Vite &#x1f435; CompositionAPI火爆來襲\textcolor{blue}{Composition API火爆來襲}CompositionAPI火爆來…

linux執行python命令后permission denied

linux下執行python后顯示被拒絕問題定位&#xff1a; 1、檢查下要執行的文件的權限是否存在執行權限&#xff0c;否則執行chmod命令賦予權限&#xff1b; 2、若賦予權限后仍然顯示沒有權限&#xff0c;檢查下執行的python文件是否有權限&#xff0c;否則執行chmod賦予執行權限。…

mysql zip 安裝

第一步下載mysql.zip https://dev.mysql.com/downloads/mysql/5.7.html#downloads 第二步&#xff1a;解壓文件后在其目錄下&#xff0c; 新建 my.ini 注意編碼為ansi&#xff0c;新建 data 空文件夾 my.ini內容為&#xff1a; [mysql]# 設置mysql客戶端默認字符集default…

Vue3的響應式原理解析

Vue3的響應式原理解析 Vue2響應式原理回顧 // 1.對象響應化&#xff1a;遍歷每個key&#xff0c;定義getter、setter // 2.數組響應化&#xff1a;覆蓋數組原型方法&#xff0c;額外增加通知邏輯 const originalProto Array.prototype const arrayProto Object.create(orig…

react Native 環境安裝配置——圖解版一目了然

?原創不易&#xff0c;還希望各位大佬支持一下\textcolor{blue}{原創不易&#xff0c;還希望各位大佬支持一下}原創不易&#xff0c;還希望各位大佬支持一下 &#x1f525; Flutter和reactNative的區別\textcolor{green}{Flutter和react Native的區別}Flutter和reactNative的…

第七章 字典和集合[DDT書本學習 小甲魚]【2】

7.1.2 字典的各種內置方法在序列里為不存在位置賦值&#xff0c;會出現錯誤&#xff1b;而在字典不存在得位置賦值&#xff0c;會創建。工廠函數&#xff08;類型&#xff09;以前學過 str(),int(),list(),tuple()....... 1.fromkeys() 用于創建和返回一個新的字典 不是修改 2…