函數式Function是云計算里最近幾年流行起來的新的架構和模式,因為它不依賴云主機,非常輕量,按需使用,甚至是免費使用,特別適合哪種數據同步,數據轉發,本身不需要保存數據的業務場景,或者通過一個輸入轉換或者變化,處理以后流轉到下一個輸出。
我們剛好有這樣的業務,云云對接,把一個儲能云的數據,經過接口查詢以后,轉換后保存到我們的私有云平臺上(內部是微軟提供的IoT平臺底座)。
但是在實際開發中發現了有兩個痛點,
?觸發方式,云廠商一般會提供2個方式,
1.基于事件的調用,比如Python語言的固定式的函數,比如
def handler(event, context)
,函數名和內存占用,網絡設置等配置,這里有一個重要的設置就是觸發器,
一般最常用的是定時觸發器:
當你編寫好以后,在云平臺上直接點擊運行測試,云平臺會調用你的這個handler,你也可以通過參數event, context 獲取運行時的一些數據。如果是簡單任務,只需要在這里調用你的業務代碼就開發完成。
一切都很自然,這里的定時器,相當于linux里的cronjob, 最后運行的python ss.py 這樣的方式會導致每次啟動都會把代碼重新執行一遍,但是如果你的業務要求嚴格的話,希望這個程序運行了以后一直不退出,在程序內部運行Schedule,這樣的好處會節省調用方的資源,比如代碼需要訪問MQ,每隔3分鐘程序重新執行的時候,會頻繁得創建TCP連接。會給MQ broker的連接創造一定的壓力,我剛好遇到了對方投訴我們占用了他們的連接資源。所以我需要改造程序,讓程序一直不退出的方式,復用創建好的連接。一共經歷了3次優化
- 修改代碼,創建一個全局的Connection對象,然后在handler里直接寫while True的方式執行,發現云平臺給的handler函數,有個限制,如果超時沒有返回,平臺強制終止程序,可以在日志里查看到,程序超時被終止。這條路走不通
- 不改架構使用異步,handler里啟動一個線程,在線程里while True,后來發現執行一段時間后,效果并不好。
通過Schedule包,順便說一下,Python竟然沒有一個特別強大的定時器,非常不好用,想這樣, import schedule schedule.every(3).minutes.do(job1)需要設置好了以后,還需要 if __name __ =="__main__": ????????schedule.run_pending()為了防止程序退出還有在主線程里寫上while True: time.sleep(5) ? 或者類似這樣的代碼
import time
import threadingdef background_task():print("后臺任務開始")time.sleep(5) # 模擬長時間任務print("后臺任務完成")def handler(event, context):print("收到請求")def delayed_run():time.sleep(0.1)background_task()thread = threading.Thread(target=delayed_run)thread.start()# 主線程故意睡一會兒,防止函數提前退出time.sleep(6)return {"statusCode": 200, "body": "已嘗試啟動后臺任務"}
這樣惡心的代碼,非常的不優雅。
3. 改動架構,使用徹底異步
- 使用redis的發布訂閱模式(或者其他MQ),把啟動和執行分開。在執行同步的代碼里,handler里把連接建立好,就等待redis的消息通知,代碼如下:
- def handler(event, context):
- ? ? localtime = time.asctime(time.localtime(time.time()))
- ? ? print(f"execution at {localtime}.")
- ? ? startConn()
- ? ? start_redis_listener()
- ? ? print("??",client)
- ? ? return {"statusCode": 200, "isBase64Encoded": False, "headers": {"Content-Type": "application/json;charset=UTF-8"},
- ? ? ? ? ? ? "body": "sucess"}
這樣非常優雅,如下:- def sync_data():
- ? ? print("開始執行耗時的數據同步任務...") ? ?
- ? ? print(client) ?
- ? ? try:
- ? ? ? ? while True:
- ? ? ? ? ? ? time.sleep(5)
- ? ? ? ? ? ? print("開始處理站點列表")
- ? ? ? ? ? ? for conf in configLs:
- ? ? ? ? ? ? ? ? print(f"開始處理站點: {conf.station_name} 數據")
- ? ? ? ? ? ? ? ? site = AccessSite(conf)
- ? ? ? ? ? ? ? ? site.publish_mqtt()
- ? ? ? ? ? ? ? ? # msg = msg + f"{conf.station_name}"
- ? ? ? ? ? ? ? ? print("成功處理站點數據至能源平臺!")
- ? ? except Exception as e:
- ? ? ? ? print("同步出錯:", str(e))
- # Redis 訂閱客戶端
- def start_redis_listener():
- ? ?
- ? ? pubsub = redisClient.pubsub()
- ? ? pubsub.subscribe(["stored_energy_sync_channel"])
- ? ? print("等待 Redis 消息...")
- ? ? for message in pubsub.listen():
- ? ? ? ? if message['type'] == 'message':
- ? ? ? ? ? ? print(f"收到消息: {message['data'].decode()}")
- ? ? ? ? ? ? if message['data'].decode() == "start_sync":
- ? ? ? ? ? ? ? ? sync_data()
至于程序何時啟動,只需要在另外一個FG里,通過調用redis的pub接口,把這個sync_data()給啟動。
非常的完美,容易維護。而第二個版本,使用線程和使用定時器都無法取得滿意的效果。
注意這里的redis還可以替換成Kafka, RocketMQ,或者其他通知SMN, SQS。
2. 還有一個方式是基于http或者https的方式啟動任務,方式大同小異,都是一次執行完成以后,需要再次觸發才能繼續執行。如下圖: