3.阿里云flinkselectdb-py作業

1.概述

Python API中文文檔
本文介紹在阿里云實時計算flink中使用python作業,把oss中的數據同步數據到阿里云selectdb的過程。python簡單的語法特性更適合flink作業的開發;
先說結論:
在實際開發中遇到了很多問題,導致python作業基本基本無法運行。最后放棄了;

  • python作業中的標量函數的錯誤沒有日志,永遠是報這個錯誤:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具體問題;
  • python作業中的用戶定義的標量函數基本無法運行。本地測試沒有問題的函數,提交到flink中就報錯。懷疑是環境中沒有flink-python.jar,自己上傳此jar和flink中的包不兼容(阿里云flink和開源版本flink有些jar包不一樣);
  • 如果各位遇到些問題并且有解決方案,麻煩也告知我,非常感謝;

2.目標

把阿里云sls日志中的數據準實時同步到云服務selectdb;

源表flink結果表
阿里云sls實時計算flink云服務selectdb

3.步驟

3.1.搭建環境

#**創建虛擬環境essa-flink,pyhton版本為3.11.9
conda create -n essa-flink python=3.11.9#**安裝apache-flink-1.20版本。安裝的依賴比較大,指定國內的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.創建作業

作業代碼本身很簡單,逐行讀取sls的日志,進行轉換后保存到selectdb中。轉換函數為do_active_log,在本地測試過程中遇到了第一個問題后,很輕松愉快就通過了。部署在flink中出現了其它問題;

  • 首先是阿里云提供sls連接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,報錯缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源碼,確實沒有定義此類。提工單后,建設使用低版本解決;
  • 然后報錯缺少flink-python,不能執行python函數。于是把flink-python上傳,并在作業中引用依賴;
  • 最后報錯ExceptionInChainedOperatorException: Could not forward element to next,無法執行。把作業中函數調用do_active_log刪除后正常。提工單后還是沒有解決。最后放棄,改用jar作業;
def do_active_log(row: Row) -> Row:'''用戶登錄日志處理'''logging.info('執行do_active_log函數...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''創建用戶登錄日志結果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''創建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加載依賴的jar包t_env.get_config().set("pipeline.jars", "依賴包.jar")#**創建sls源ds = get_soruce_datastream(t_env)#**用戶登錄日志處理#**讀取sls日志數據,然后使用自定義標量函數處理數據ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/64198.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/64198.shtml
英文地址,請注明出處:http://en.pswp.cn/web/64198.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

互聯網視頻云平臺EasyDSS無人機推流直播技術如何助力野生動植物保護工作?

在當今社會,隨著科技的飛速發展,無人機技術已經廣泛應用于各個領域,為我們的生活帶來了諸多便利。而在動植物保護工作中,無人機的應用更是為這一領域注入了新的活力。EasyDSS,作為一款集視頻處理、分發、存儲于一體的綜…

51c視覺~YOLO~合集8

我自己的原文哦~ https://blog.51cto.com/whaosoft/12897680 1、Yolo9 1.1、YOLOv9SAM實現動態目標檢測和分割 主要介紹基于YOLOv9SAM實現動態目標檢測和分割 背景介紹 在本文中,我們使用YOLOv9SAM在RF100 Construction-Safety-2 數據集上實現自定義對象檢測模…

Docker Container 可觀測性最佳實踐

Docker Container 介紹 Docker Container( Docker 容器)是一種輕量級、可移植的、自給自足的軟件運行環境,它在 Docker 引擎的宿主機上運行。容器在許多方面類似于虛擬機,但它們更輕量,因為它們不需要模擬整個操作系統…

氣相色譜-質譜聯用分析方法中的常用部件,分流平板更換

分流平板,是氣相色譜-質譜聯用分析方法中的一個常用部件,它可以實現氣相色譜柱流與MS檢測器流的分離和分流。常見的氣質聯用儀分流平板有很多種,如單層T型分流平板、雙層T型分流平板、螺旋分流平板等等。 操作視頻http://www.spcctech.com/v…

易基因: BS+ChIP-seq揭示DNA甲基化調控非編碼RNA(VIM-AS1)抑制腫瘤侵襲性|Exp Mol Med

大家好,這里是專注表觀組學十余年,領跑多組學科研服務的易基因。 肝細胞癌(hepatocellular carcinoma,HCC)早期復發仍然是一個具有挑戰性的領域,其中涉及的機制尚未完全被理解。盡管微血管侵犯&#xff08…

鴻蒙系統文件管理基礎服務的設計背景和設計目標

有一定經驗的開發者通常對文件管理相關的api應用或者底層邏輯都比較熟悉,但是關于文件管理服務的設計背景和設計目標可能了解得不那么清楚,本文旨在分享文件管理服務的設計背景及目標,方便廣大開發者更好地理解鴻蒙系統文件管理服務。 1 鴻蒙…

如何配置 Java 環境變量:設置 JAVA_HOME 和 PATH

目錄 一、什么是 Java 環境變量? 二、配置 Java 環境變量 1. 下載并安裝 JDK 2. 配置 JAVA_HOME Windows 系統 Linux / macOS 系統 3. 配置 PATH Windows 系統 Linux / macOS 系統 4. 驗證配置 三、常見問題與解決方案 1. 無法識別 java 或 javac 命令 …

Doris 數據庫外部表-JDBC 外表,Oracle to Doris

簡介 提供了 Doris 通過數據庫訪問的標準接口 (JDBC) 來訪問外部表,外部表省去了繁瑣的數據導入工作,讓 Doris 可以具有了訪問各式數據庫的能力,并借助 Doris 本身的 OLAP 的能力來解決外部表的數據分析問題: 支持各種數據源接入…

分布式 IO 模塊助力沖壓機械臂產線實現智能控制

在當今制造業蓬勃發展的浪潮中,沖壓機械臂產線的智能化控制已然成為提升生產效率、保障產品質量以及增強企業競爭力的關鍵所在。而分布式 IO 模塊的應用,正如同為這條產線注入了一股強大的智能動力,開啟了全新的高效生產篇章。 傳統挑戰 沖壓…

CSS系列(37)-- Overscroll Behavior詳解

前端技術探索系列:CSS Overscroll Behavior詳解 📱 致讀者:探索滾動交互的藝術 👋 前端開發者們, 今天我們將深入探討 CSS Overscroll Behavior,這個強大的滾動行為控制特性。 基礎概念 🚀 …

深度學習中的并行策略概述:4 Tensor Parallelism

深度學習中的并行策略概述:4 Tensor Parallelism 使用 PyTorch 實現 Tensor Parallelism 。首先定義了一個簡單的模型 SimpleModel,它包含兩個全連接層。然后,本文使用 torch.distributed.device_mesh 初始化了一個設備網格,這代…

企業銷售人員培訓系統|Java|SSM|VUE| 前后端分離

【技術棧】 1??:架構: B/S、MVC 2??:系統環境:Windowsh/Mac 3??:開發環境:IDEA、JDK1.8、Maven、Mysql5.7 4??:技術棧:Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html 5??數據庫…

vue 本地自測iframe通訊

使用 postMessage API 來實現跨窗口(跨域)的消息傳遞。postMessage 允許你安全地發送消息到其他窗口,包括嵌套的 iframe,而不需要擔心同源策略的問題。 發送消息(父應用) 1. 父應用:發送消息給…

Linux:code:network:devinet_sysctl_forward;IN_DEV_FORWARD

文章目錄 簡介sysctl 設置使用,arp_process間接使用IN_DEV_RX_REDIRECTSdev_disable_lro簡介 最近在看Linux里的forwarding的功能。順便在這里總結一下。有些詳細代碼邏輯,如果可以記錄一下,會好一點。 sysctl 設置 這個函數在查看的時候需要注意的問題:變量名起的有點簡…

自然語言處理與知識圖譜的融合與應用

目錄 前言1. 知識圖譜與自然語言處理的關系1.1 知識圖譜的定義與特點1.2 自然語言處理的核心任務1.3 二者的互補性 2. NLP在知識圖譜構建中的應用2.1 信息抽取2.1.1 實體識別2.1.2 關系抽取2.1.3 屬性抽取 2.2 知識融合2.3 知識推理 3. NLP與知識圖譜融合的實際應用3.1 智能問答…

PHP 數組

PHP 數組 PHP 是一種流行的服務器端編程語言,它提供了強大的數組處理能力。PHP 數組是一種數據結構,用于存儲相同類型或不同類型的多個值。在 PHP 中,數組可以分為一維數組、二維數組和多維數組。本文將詳細介紹 PHP 數組的各種操作&#xf…

CSS(三)盒子模型

目錄 Content Padding Border Margin 盒子模型計算方式 使用 box-sizing 屬性控制盒子模型的計算 所有的HTML元素都可以看作像下圖這樣一個矩形盒子: 這個模型包括了四個區域:content(內容區域)、padding(內邊距…

基于NodeMCU的物聯網窗簾控制系統設計

最終效果 基于NodeMCU的物聯網窗簾控制系統設計 項目介紹 該項目是“物聯網實驗室監測控制系統設計(仿智能家居)”項目中的“家電控制設計”中的“窗簾控制”子項目,最前者還包括“物聯網設計”、“環境監測設計”、“門禁系統設計計”和“小…

有沒有免費提取音頻的軟件?音頻編輯軟件介紹!

出于工作和生活娛樂等原因,有時候我們需要把音頻單獨提取出來(比如歌曲伴奏、人聲清唱等、樂器獨奏等)。要提取音頻必須借助音頻處理軟件,那么有沒有免費提取音頻的軟件呢?下面我們將為大家介紹幾款免費軟件&#xff0…

WPF自定義窗口 輸入驗證不生效

WPF自定義窗口 輸入驗證不生效 WPF ValidationRule 不生效 WPF ValidationRule 不生效 解決方案&#xff1a;在WindowStyle的Template中添加AdornerDecorator標簽。 <Style x:Key"WindowStyle1" TargetType"{x:Type Window}"><Setter Property&…