如果你比較熟悉JavaWeb應用開發,那么對Spring框架一定不陌生,并且JavaWeb通常是基于SSM搭起的架構,主要用Java語言開發。但是開發Spark程序,Scala語言往往必不可少。
眾所周知,Scala如同Java一樣,都是運行在JVM上的,所以它具有很多Java語言的特性,同時作為函數式編程語言,又具有自己獨特的特性,實際應用中除了要結合業務場景,還要對Scala語言的特性有深入了解。
如果想像使用Java語言一樣,使用Scala來利用Spring框架特性、并結合Spark來處理離線數據,應該怎么做呢?
本篇文章,通過詳細的示例代碼,介紹上述場景的具體實現,大家如果有類似需求,可以根據實際情況做調整。
1.定義一個程序啟動入口
object Bootstrap {private val log = LoggerFactory.getLogger(Bootstrap.getClass)//指定配置文件如log4j的路徑val ConfFileName = "conf"val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator//存放實現了StatsTask的離線程序處理的類private val TASK_MAP = Map("WordCount" -> classOf[WordCount])def main(args: Array[String]): Unit = {//傳入一些參數,比如要運行的離線處理程序類名、處理哪些時間的數據if (args.length < 1) {log.warn("args 參數異常!!!" + args.toBuffer)System.exit(1)}init(args)}def init(args: Array[String]) {try {SpringUtils.init(Array[String]("applicationContext.xml"))initLog4j()val className = args(0)// 實例化離線處理類val task = SpringUtils.getBean(TASK_MAP(className))args.length match {case 3 =>// 處理一段時間的每天離線數據val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))val days = Days.daysBetween(dtStart, dtEnd).getDays + 1for (i <- 0 until days) {val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")}case 2 =>// 處理指定的某天離線數據val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")case 1 =>// 處理前一天離線數據val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")case _ => println("執行失敗 args參數:" + args.toBuffer)}} catch {case e: Exception =>println("執行失敗 args參數:" + args.toBuffer)e.printStackTrace()}// 初始化log4jdef initLog4j() {val fileName = ConfigurePath + "log4j.properties"if (new File(fileName).exists) {PropertyConfigurator.configure(fileName)log.info("日志log4j已經啟動")}}}
}
2.加載Spring配置文件工具類
object SpringUtils {private var context: ClassPathXmlApplicationContext = _def getBean(name: String): Any = context.getBean(name)def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)def getBean[T](_class: Class[T]): T = context.getBean(_class)def init(springXml: Array[String]): Unit = {if (springXml == null || springXml.isEmpty) {trythrow new Exception("springXml 不可為空")catch {case e: Exception => e.printStackTrace()}}context = new ClassPathXmlApplicationContext(springXml(0))context.start()}}
3.Spring配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"><!-- 配置包掃描 --><context:component-scan base-package="com.bigdata.stats"/></beans>
4.定義一個trait,作為離線程序的公共"父類"
trait StatsTask extends Serializable {//"子類"繼承StatsTask重寫該方法實現自己的業務處理邏輯 def runTask(etime: String)
}
5.繼承StatsTask的離線處理類
//不要忘記添加 @Component ,否則無法利用Spring對WordCount進行實例化
@Component
class WordCount extends StatsTask {override def runTask(etime: String): Unit = {val sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()import sparkSession.implicits._val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" ")).toDF("word")words.createOrReplaceTempView("wordcount")val df = sparkSession.sql("select word, count(*) count from wordcount group by word")df.show()}
}
更多干貨搶先看: 世界格局的演變:一場“熱鬧非凡”的歷史大戲