Spark和Spring整合處理離線數據

如果你比較熟悉JavaWeb應用開發,那么對Spring框架一定不陌生,并且JavaWeb通常是基于SSM搭起的架構,主要用Java語言開發。但是開發Spark程序,Scala語言往往必不可少。

眾所周知,Scala如同Java一樣,都是運行在JVM上的,所以它具有很多Java語言的特性,同時作為函數式編程語言,又具有自己獨特的特性,實際應用中除了要結合業務場景,還要對Scala語言的特性有深入了解。

如果想像使用Java語言一樣,使用Scala來利用Spring框架特性、并結合Spark來處理離線數據,應該怎么做呢?

本篇文章,通過詳細的示例代碼,介紹上述場景的具體實現,大家如果有類似需求,可以根據實際情況做調整。

1.定義一個程序啟動入口

object Bootstrap {private val log = LoggerFactory.getLogger(Bootstrap.getClass)//指定配置文件如log4j的路徑val ConfFileName = "conf"val ConfigurePath = new File("").getAbsolutePath.substring(0, if (new File("").getAbsolutePath.lastIndexOf("lib") == -1) 0else new File("").getAbsolutePath.lastIndexOf("lib")) + this.ConfFileName + File.separator//存放實現了StatsTask的離線程序處理的類private val TASK_MAP = Map("WordCount" -> classOf[WordCount])def main(args: Array[String]): Unit = {//傳入一些參數,比如要運行的離線處理程序類名、處理哪些時間的數據if (args.length < 1) {log.warn("args 參數異常!!!" + args.toBuffer)System.exit(1)}init(args)}def init(args: Array[String]) {try {SpringUtils.init(Array[String]("applicationContext.xml"))initLog4j()val className = args(0)// 實例化離線處理類val task = SpringUtils.getBean(TASK_MAP(className))args.length match {case 3 =>// 處理一段時間的每天離線數據val dtStart = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1))val dtEnd = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(2))val days = Days.daysBetween(dtStart, dtEnd).getDays + 1for (i <- 0 until days) {val etime = dtStart.plusDays(i).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")}case 2 =>// 處理指定的某天離線數據val etime = DateTimeFormat.forPattern("yyyy-MM-dd").parseDateTime(args(1)).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")case 1 =>// 處理前一天離線數據val etime = DateTime.now().minusDays(1).toString("yyyy-MM-dd")task.runTask(etime)log.info(s"JOB --> $className 已成功處理: $etime 的數據")case _ => println("執行失敗 args參數:" + args.toBuffer)}} catch {case e: Exception =>println("執行失敗 args參數:" + args.toBuffer)e.printStackTrace()}// 初始化log4jdef initLog4j() {val fileName = ConfigurePath + "log4j.properties"if (new File(fileName).exists) {PropertyConfigurator.configure(fileName)log.info("日志log4j已經啟動")}}}
}

2.加載Spring配置文件工具類

object SpringUtils {private var context: ClassPathXmlApplicationContext = _def getBean(name: String): Any = context.getBean(name)def getBean[T](name: String, classObj: Class[T]): T = context.getBean(name, classObj)def getBean[T](_class: Class[T]): T = context.getBean(_class)def init(springXml: Array[String]): Unit = {if (springXml == null || springXml.isEmpty) {trythrow new Exception("springXml 不可為空")catch {case e: Exception => e.printStackTrace()}}context = new ClassPathXmlApplicationContext(springXml(0))context.start()}}

3.Spring配置文件applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:context="http://www.springframework.org/schema/context"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsdhttp://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.0.xsd"><!-- 配置包掃描 --><context:component-scan base-package="com.bigdata.stats"/></beans>

4.定義一個trait,作為離線程序的公共"父類"

trait StatsTask extends Serializable {//"子類"繼承StatsTask重寫該方法實現自己的業務處理邏輯 def runTask(etime: String)
}
5.繼承StatsTask的離線處理類
//不要忘記添加 @Component ,否則無法利用Spring對WordCount進行實例化
@Component
class WordCount extends StatsTask {override def runTask(etime: String): Unit = {val sparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()import sparkSession.implicits._val words = sparkSession.read.textFile("/Users/BigData/Documents/data/wordcount.txt").flatMap(_.split(" ")).toDF("word")words.createOrReplaceTempView("wordcount")val df = sparkSession.sql("select word, count(*) count from wordcount group by word")df.show()}
}

更多干貨搶先看: 世界格局的演變:一場“熱鬧非凡”的歷史大戲

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/95571.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/95571.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/95571.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

智能高效內存分配器測試報告

一、項目背景 這個項目是為了學習和實現一個高性能、特別是高并發場景下的內存分配器。這個項目是基于谷歌開源項目tcmalloc(Thread-Caching Malloc)實現的。tcmalloc 的核心目標就是替代系統默認的 malloc/free&#xff0c;在多線程環境下提供更高效的內存管理。C/C的malloc雖…

吱吱企業通訊軟件以安全為核心,構建高效溝通與協作一體化平臺

隨著即時通訊工具日益普及&#xff0c;企業面臨一個嚴峻的挑戰&#xff1a;如何在保障通訊數據安全的前提下&#xff0c;提升辦公效率&#xff1f;為解決此問題&#xff0c;吱吱企業通訊軟件誕生&#xff0c;通過私有化部署和深度集成的辦公系統&#xff0c;為企業打造一個既可…

校企合作| 長春大學旅游學院副董事長張海濤率隊到訪卓翼智能,共繪無人機技術賦能“AI+文旅”發展新藍圖

為積極響應國務院《關于深入實施“人工智能”行動的意見》&#xff08;國發〔2025〕11號&#xff09;號召&#xff0c;扎實推進學校“旅游”與“人工智能”雙輪驅動的學科發展戰略&#xff0c;加快無人機技術在文旅領域的創新應用&#xff0c;近日長春大學旅游學院副董事長張海…

為什么要用 MarkItDown?以及如何使用它

在處理大量文檔時&#xff0c;尤其是在構建知識庫、進行文檔分析或訓練大語言模型&#xff08;LLM&#xff09;時&#xff0c;將各種格式的文件&#xff08;如 PDF、Word、Excel、PPT、HTML 等&#xff09;轉換為統一的 Markdown 格式&#xff0c;能夠顯著提高處理效率和兼容性…

LVGL9.3 vscode 模擬環境搭建

1、git 克隆&#xff1a; git clone -b release/v9.3 https://github.com/lvgl/lv_port_pc_vscode.git 2、cmake 和 mingw 環境搭建 cmake&#xff1a; https://blog.csdn.net/qq_51355375/article/details/139186681?spm1011.2415.3001.5331 mingw&#xff1a; https://bl…

投影矩陣:計算機圖形學中的三維到二維轉換

投影矩陣是計算機圖形學中的核心概念之一&#xff0c;它負責將三維場景中的幾何數據投影到二維屏幕上&#xff0c;從而實現三維到二維的轉換。無論是游戲開發、虛擬現實&#xff0c;還是3D建模&#xff0c;投影矩陣都扮演著不可或缺的角色。本文將深入探討投影矩陣的基本原理、…

10.2 工程學中的矩陣(2)

十、例題 【例3】求由彈簧連接的 100100100 個質點的位移 u(1),u(2),...,u(100)u(1),u(2),...,u(100)u(1),u(2),...,u(100), 彈性系數均為 c1c 1c1, 每個質點受到的外力均為 f(i)0.01f(i)0.01f(i)0.01. 畫出兩端固定和固定-自由這兩種情形 u 的圖形。 解&#xff1a; % 參數設…

Mysql主從復制之延時同步

1.延時同步概念通過人為配置從庫和主庫延時N小時可以實現延時同步&#xff0c;延時同步可以解決數據庫故障出現的數據丟失問題(物理損壞如直接使用rm刪除數據庫數據和邏輯損壞如使用drop命令刪除數據庫)2.延時同步實操2.1先配置從庫延時同步&#xff0c;并且設置sql線程300秒后…

【QT特性技術講解】QPrinter、QPdf

前言 QT對打印和PDF應用場景&#xff0c;做了簡單的封裝&#xff0c;復雜的功能還是得用第三方庫&#xff0c;打印功能簡單的文本可以不用PDF&#xff0c;涉及圖形的基本都要用到PDF。 Linux打印 隨著國產信創項目替換基于Linux的桌面系統國產信創系統&#xff0c;Linux桌面系…

【大數據技術實戰】Flink+DS+Dinky 自動化構建數倉平臺

一、背景&#xff1a;企業數倉建設的現狀與挑戰在數字化轉型進入深水區的今天&#xff0c;數據已成為企業核心生產要素&#xff0c;而實時數倉作為 “數據驅動決策” 的關鍵載體&#xff0c;其建設水平直接決定企業在市場競爭中的響應速度與決策精度。根據 IDC《2024 年全球大數…

Python開篇:撬動未來的萬能鑰匙 —— 從入門到架構的全鏈路指南

Python&#xff1a;撬動未來的萬能鑰匙——從入門到架構的全鏈路指南 在技術的星空中&#xff0c;Python 是那顆永不隕落的超新星——它用簡潔的語法點燃創造之火&#xff0c;以龐大的生態鋪就革新之路。無論你身處哪個領域&#xff0c;這把鑰匙正在打開下一個時代的大門。2024…

【QT隨筆】事件過濾器(installEventFilter 和 eventFilter 的組合)之生命周期管理詳解

【QT隨筆】事件過濾器(installEventFilter 和 eventFilter 的組合)之生命周期管理詳解 上一章節中提到事件過濾器(Event Filter),用于處理特定事件。其中第二小節中提到了事件過濾器生命周期管理。本文將詳細解析事件過濾器生命周期管理這一部分的內容。 (關注不迷路哈!…

關于linux軟件編程12——網絡編程3

一、單循環服務器 特點:1.可以處理多個客戶端 (不能同時)2.效率不高//單循環服務器: socket bind listen while (1) {connfd accept();//通信 }特點:簡單 可以處理多客戶端 不能同時 二、并發服務器 --- 同時可以處理多個客戶端1、設置一個選項(開啟一個功能) ---讓地址重…

thinkphp6通過workerman使用websocket

安裝workerman依賴 composer require topthink/think-worker composer require topthink/think-worker1.0.* # 指定兼容版本?:ml-citation{ref"1,7" data"citationList"}config配置 config/worker.php <?php return [// 擴展自身需要的配置host …

Rust SQLx 開發指南:利用 Tokio 進行性能優化

在當今高并發的應用開發環境中&#xff0c;數據庫操作往往是性能瓶頸的主要來源之一。SQLx 作為一個純 Rust 編寫的異步 SQL 客戶端庫&#xff0c;通過與 Tokio 運行時深度集成&#xff0c;為開發者提供了處理數據庫 I/O 密集型操作的強大工具。本文將帶您深入了解如何利用這兩…

嵌入式硬件電路分析---AD采集電路

文章目錄摘要AD采集電路1AD采集電路2R77的真正作用是什么&#xff1f;理想與現實&#xff1a;為什么通常可以忽略R77的影響&#xff1f;摘要 AD采集 AD采集電路1 這是個人畫的簡化后的AD采集電路 這是一個AD檢測電路&#xff0c;R1是一個可變電阻&#xff0c;R2是根據R1的常用…

Python爬取nc數據

1、單文件爬取爬取該網站下的crupre.nc數據&#xff0c;如下使用requests庫&#xff0c;然后填寫網站的url&#xff1a;"http://clima-dods.ictp.it/regcm4/CLM45/crudata/"和需要下載的文件名&#xff1a;"crupre.nc"import requests import osdef downlo…

策略模式 + 工廠模式

策略模式&#xff1a;簡單來說解決的行為的封裝與選擇。如HandlerMapping&#xff0c;將 HTTP 請求映射到對應的處理器&#xff08;Controller 或方法&#xff09;。工廠模式&#xff1a;解決的是具有相同屬性的對象創建問題&#xff0c;如BeanFactory創建bean對象。解決的代碼…

Diamond基礎3:在線邏輯分析儀Reveal的使用

文章目錄1. 與ILA的區別2. 使用Reveal步驟3.Reveal注意事項4.傳送門1. 與ILA的區別 Reveal是Lattice Diamond集成開發環境用于在線監測信號的工具&#xff0c;ILA是xilinx的Vivado集成開發工具的在線邏輯分析儀&#xff0c;同Reveal一樣&#xff0c;均可以在項目運行過程中&am…

超適合程序員做知識整理的 AI 網站

這次要給大家分享一個超適合程序員做知識整理的 AI 網站 ——Notion AI&#xff0c;網址是Notion&#xff0c;它能把你隨手記的雜亂筆記、代碼片段、技術文檔&#xff0c;一鍵梳理成邏輯清晰的結構化內容&#xff0c;小索奇我用它整理 “Python 爬蟲知識點” 時&#xff0c;原本…