一、問題引出
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode with all available cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc   = new SparkContext(conf)

    val lines = sc.makeRDD(List("hello world", "hello spark"))
    val words = lines.flatMap(_.split(" "))

    // The marker println makes every evaluation of this map visible,
    // so you can observe the RDD being recomputed by each action below.
    val pairs = words.map { word =>
      println("@@@@@@@@@@")
      (word, 1)
    }

    // First action: runs the full lineage.
    pairs.reduceByKey(_ + _).collect().foreach(println)

    println("**********")

    // Second action: without persistence, `pairs` is recomputed from scratch.
    pairs.groupByKey().collect().foreach(println)
  }
}
二、RDD Cache
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode with all available cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc   = new SparkContext(conf)

    val lines = sc.makeRDD(List("hello world", "hello spark"))
    val words = lines.flatMap(_.split(" "))

    // The marker println shows how many times this map actually runs.
    val pairs = words.map { word =>
      println("@@@@@@@@@@")
      (word, 1)
    }

    // Cache the RDD (no-arg persist() defaults to MEMORY_ONLY), so the
    // second action below reuses the cached data instead of recomputing.
    pairs.persist()

    // First action: computes the lineage and populates the cache.
    pairs.reduceByKey(_ + _).collect().foreach(println)

    println("**********")

    // Second action: reads `pairs` from the cache.
    pairs.groupByKey().collect().foreach(println)
  }
}
object StorageLevel {
  // Constructor flags (per Spark's StorageLevel API):
  //   new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
  // The `_2` variants replicate each partition on two nodes.
  val NONE                  = new StorageLevel(false, false, false, false)
  val DISK_ONLY             = new StorageLevel(true,  false, false, false)
  val DISK_ONLY_2           = new StorageLevel(true,  false, false, false, 2)
  val MEMORY_ONLY           = new StorageLevel(false, true,  false, true)
  val MEMORY_ONLY_2         = new StorageLevel(false, true,  false, true,  2)
  val MEMORY_ONLY_SER       = new StorageLevel(false, true,  false, false)
  val MEMORY_ONLY_SER_2     = new StorageLevel(false, true,  false, false, 2)
  val MEMORY_AND_DISK       = new StorageLevel(true,  true,  false, true)
  val MEMORY_AND_DISK_2     = new StorageLevel(true,  true,  false, true,  2)
  val MEMORY_AND_DISK_SER   = new StorageLevel(true,  true,  false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true,  true,  false, false, 2)
  val OFF_HEAP              = new StorageLevel(true,  true,  true,  false, 1)
}
三、RDD CheckPoint
object TestRDDPersist {
  def main(args: Array[String]): Unit = {
    // Local mode with all available cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
    val sc = new SparkContext(conf)
    // Directory where checkpoint data is written (a reliable/shared
    // filesystem such as HDFS in a real cluster).
    sc.setCheckpointDir("checkpoint")

    val rdd = sc.makeRDD(List("hello world", "hello spark"))
    val flatRdd = rdd.flatMap(_.split(" "))

    // The marker println shows every evaluation of this map.
    val mapRdd = flatRdd.map(word => {
      println("@@@@@@@@@@")
      (word, 1)
    })

    // FIX: cache before checkpointing. checkpoint() launches a separate
    // job after the first action; without a cache that job would recompute
    // mapRdd from the data source a second time. With the cache in place,
    // the checkpoint job just reads the cached partitions.
    mapRdd.cache()
    mapRdd.checkpoint()

    // First action: computes the lineage, fills the cache, and triggers
    // the checkpoint job.
    val reduceRdd = mapRdd.reduceByKey(_ + _)
    reduceRdd.collect().foreach(println)

    println("**********")

    // Second action: reads from the checkpoint/cache, not the source.
    val groupRdd = mapRdd.groupByKey()
    groupRdd.collect().foreach(println)
  }
}
四、緩存和檢查點區別
cache 和 persist 會在原有的血緣關系中添加新的依賴,一旦數據出錯可以從頭讀取數據;checkpoint 檢查點會切斷原有的血緣關系,重新建立新的血緣關系,相當于改變數據源。cache 是將數據臨時存儲在 JVM 堆內存中,性能較高,但安全性低;persist 可以指定存儲級別,將數據臨時存儲在磁盤文件中,涉及到 IO,性能較低,作業執行完畢后臨時文件會被刪除。checkpoint 是將數據長久地存儲在分布式文件系統中,安全性較高,但涉及 IO 且會獨立開啟一個作業從數據源開始獲取數據,所以性能較低;一般在 checkpoint 前先進行 cache,當 checkpoint 時 job 只需從緩存中讀取數據即可,可以提高性能。