1.概述
Python API中文文檔
本文介紹在阿里云實時計算flink中使用python作業,把oss中的數據同步數據到阿里云selectdb的過程。python簡單的語法特性更適合flink作業的開發;
先說結論:
在實際開發中遇到了很多問題,導致python作業基本基本無法運行。最后放棄了;
- python作業中的標量函數的錯誤沒有日志,永遠是報這個錯誤:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具體問題;
- python作業中的用戶定義的標量函數基本無法運行。本地測試沒有問題的函數,提交到flink中就報錯。懷疑是環境中沒有flink-python.jar,自己上傳此jar和flink中的包不兼容(阿里云flink和開源版本flink有些jar包不一樣);
- 如果各位遇到些問題并且有解決方案,麻煩也告知我,非常感謝;
2.目標
把阿里云sls日志中的數據準實時同步到云服務selectdb;
源表 | flink | 結果表 |
---|---|---|
阿里云sls | 實時計算flink | 云服務selectdb |
3.步驟
3.1.搭建環境
#**創建虛擬環境essa-flink,pyhton版本為3.11.9
conda create -n essa-flink python=3.11.9#**安裝apache-flink-1.20版本。安裝的依賴比較大,指定國內的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
3.2.創建作業
作業代碼本身很簡單,逐行讀取sls的日志,進行轉換后保存到selectdb中。轉換函數為do_active_log,在本地測試過程中遇到了第一個問題后,很輕松愉快就通過了。部署在flink中出現了其它問題;
- 首先是阿里云提供sls連接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,報錯缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源碼,確實沒有定義此類。提工單后,建設使用低版本解決;
- 然后報錯缺少flink-python,不能執行python函數。于是把flink-python上傳,并在作業中引用依賴;
- 最后報錯ExceptionInChainedOperatorException: Could not forward element to next,無法執行。把作業中函數調用do_active_log刪除后正常。提工單后還是沒有解決。最后放棄,改用jar作業;
def do_active_log(row: Row) -> Row:'''用戶登錄日志處理'''logging.info('執行do_active_log函數...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''創建用戶登錄日志結果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''創建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加載依賴的jar包t_env.get_config().set("pipeline.jars", "依賴包.jar")#**創建sls源ds = get_soruce_datastream(t_env)#**用戶登錄日志處理#**讀取sls日志數據,然后使用自定義標量函數處理數據ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()