Example: persisting Spark Streaming word counts to MySQL
package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
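
// A minimal sketch of the ConnectPool helper used below; its real definition is not shown in this
// example, so this is only an assumption based on the ConnectPool.getConn(user, password, host, db)
// call: it simply opens a plain JDBC connection (a real pool would reuse connections).
object ConnectPool {
  import java.sql.{Connection, DriverManager}

  // load the MySQL JDBC driver (matching the mysql-connector-java 5.1.x jar passed to spark-submit)
  Class.forName("com.mysql.jdbc.Driver")

  // open a connection to the given database; assumes MySQL listens on the default port 3306
  def getConn(user: String, password: String, host: String, db: String): Connection =
    DriverManager.getConnection(s"jdbc:mysql://$host:3306/$db", user, password)
}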
/**
 *
 * @author root
 * Test steps:
 *   1. Start h15/h16/h17/h18, start ZooKeeper, then start the Hadoop cluster (start-all.sh), then start MySQL.
 *   2. On h15, create the directory wordcount_checkpoint, used for checkpointing.
 *      In the dg database of the MySQL instance on h15, create the table t_word.
 *   3. Launch this program from Eclipse and leave it waiting.
 *   4. In a terminal on h15, type words separated by spaces (port 9999 must be opened on h15 first: # nc -lk 9999).
 *   5. Check whether the t_word table in the dg database of the MySQL instance on h15 now contains data.
 *
 * Note: table creation statement
 *   mysql> show create table t_word;  // show the table's DDL
 *   CREATE TABLE t_word (
 *     id int(11) NOT NULL AUTO_INCREMENT,
 *     updated_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 *     word varchar(255) DEFAULT NULL,
 *     count int(11) DEFAULT NULL,
 *     PRIMARY KEY (id)
 *   );
 *
 * Test result: passed. Note ----> around line 74 no data was obtained at first, because no output
 * action was triggered at the end (the transformations were only defined, never executed); this has
 * since been resolved.
 *
 * sh spark-submit --master spark://de2:7077 --class <full class name> --driver-class-path /mysql-connector-java-5.1.26.jar sparkstreaming.jar
 * sh spark-submit --class com.day6.scala.my.PresistMysqlWordCount --master yarn-cluster --driver-class-path /home/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-java-5.1.31-bin.jar /home/spark-1.5.1-bin-hadoop2.4/sparkstreaming.jar
 *
 * $ bin/hadoop dfsadmin -safemode leave
 * That is, take Hadoop out of safe mode; this resolves the problem.
 */
object PresistMysqlWordCount {
  def main(args: Array[String]): Unit = {
    // create the StreamingContext; each 8-second batch interval is cut into one RDD
    val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))
    // set the checkpoint directory
    /**
     * Checkpointing retains the state computed for previous batches,
     * so the running counts can be updated incrementally.
     */
    sc.checkpoint("hdfs://hh15:8020/wordcount_checkpoint")
    // sc.checkpoint("hdfs://h15:8020/wordcount_checkpoint")
    // read words from the socket (or, alternatively, from an HDFS directory)
    // val lines = sc.textFileStream("hdfs://h15:8020/<directory>")  // monitor an HDFS directory for newly added files
    val lines = sc.socketTextStream("hh15", 9999)
    // val lines = sc.socketTextStream("h15", 9999)
    // split each line into words
    val words = lines.flatMap { x => x.split(" ") }
    // map each word to a (word, 1) pair
    val paris = words.map { (_, 1) }
    // update function that keeps the running count per key across batches
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      var newValue = prevValueState.getOrElse(0)
      // add this batch's values to the previous state
      for (value <- currValues) {
        newValue += value
      }
      Some(newValue)
    }
    // keep a running total per word across batches
    val totalWordCounts = paris.updateStateByKey[Int](addFunc)
    // persist each batch's totals to MySQL, opening one connection per partition
    totalWordCounts.foreachRDD(wd => wd.foreachPartition(
      data => {
        val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")
        // val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")
        // insert the data
        // conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()
        try {
          for (row <- data) {