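Overview
The UCS object is a wrapper built on top of GFGoLite, focused on implementing the UCS specification.
Content
1 Functions
I have found pydantic to be remarkably handy. It keeps data reliable as it is passed between components, and it gives an explicit description of a data model.
Below, UCS provides brick mappings for ids and times, plus conversions for time strings. The convenience is that the local side barely needs to care how any of this is implemented: all it needs is a running service. In my use, performance has also been good enough.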
from typing import List, Optional
from pydantic import BaseModel
import requests as req


class UCS(BaseModel):
    __version__ = 1.1
    # Base URL of the GFGoLite service; each method POSTs JSON to <server><endpoint>/
    gfgo_lite_server: str = 'http://172.17.0.1:24090/'

    # id -> brick name
    def get_brick_name(self, some_id=None):
        some_dict = {}
        some_dict['rec_id'] = some_id
        url = self.gfgo_lite_server + 'get_brick_name/'
        res = req.post(url, json=some_dict).json()
        return res

    # list of ids -> list of brick names
    def get_brick_name_s(self, some_id_list=None):
        some_dict = {}
        some_dict['rec_id_list'] = some_id_list
        url = self.gfgo_lite_server + 'get_brick_name_s/'
        res = req.post(url, json=some_dict).json()
        return res

    # generic call: forward kwargs and pack_func to the gfgo/ endpoint
    def gfgo(self, kwargs=None, pack_func=None):
        url = self.gfgo_lite_server + 'gfgo/'
        return req.post(url, json={'kwargs': kwargs, 'pack_func': pack_func}).json()

    # datetime string -> timestamp
    def dt_str2num(self, some_dt_str=None):
        some_dict = {}
        some_dict['some_dt_str'] = some_dt_str
        url = self.gfgo_lite_server + 'str2num/'
        res = req.post(url, json=some_dict).json()
        return res

    # list of datetime strings -> list of timestamps
    def dt_str2num_s(self, some_dt_str_list=None):
        some_dict = {}
        some_dict['some_dt_str_list'] = some_dt_str_list
        url = self.gfgo_lite_server + 'str2num_s/'
        res = req.post(url, json=some_dict).json()
        return res

    # time brick names
    def get_time_brick_name(self, dt_str_or_ts=None):
        some_dict = {}
        some_dict['dt_str_or_ts'] = dt_str_or_ts
        url = self.gfgo_lite_server + 'get_time_brick_name/'
        res = req.post(url, json=some_dict).json()
        return res

    def get_time_brick_name_s(self, dt_str_or_ts_list=None):
        some_dict = {}
        some_dict['dt_str_or_ts_list'] = dt_str_or_ts_list
        url = self.gfgo_lite_server + 'get_time_brick_name_s/'
        res = req.post(url, json=some_dict).json()
        return res

    # 1.1 >>>
    def get_brick_list(self, start_brick_name=None, end_brick_name=None):
        some_dict = {}
        some_dict['start_brick_name'] = start_brick_name
        some_dict['end_brick_name'] = end_brick_name
        url = self.gfgo_lite_server + 'get_brick_list/'
        res = req.post(url, json=some_dict).json()
        return res

    def get_brick_bounds(self, brick_name=None):
        some_dict = {}
        some_dict['brick_name'] = brick_name
        url = self.gfgo_lite_server + 'get_brick_bounds/'
        res = req.post(url, json=some_dict).json()
        return res

    # 1.2 >>> time
    # get_time_brick_bounds_s - pending
    def get_time_brick_bounds_s(self, brick_name_list=None, char_or_num='char'):
        some_dict = {}
        some_dict['brick_name_list'] = brick_name_list
        some_dict['char_or_num'] = char_or_num
        url = self.gfgo_lite_server + 'get_time_brick_bounds_s/'
        res = req.post(url, json=some_dict).json()
        return res

    # get_time_brick_list
    def get_time_brick_list(self, start_brick_name=None, end_brick_name=None):
        some_dict = {}
        some_dict['start_brick_name'] = start_brick_name
        some_dict['end_brick_name'] = end_brick_name
        url = self.gfgo_lite_server + 'get_time_brick_list/'
        res = req.post(url, json=some_dict).json()
        return res
2 Applications
2.1 Time conversion
ucs = UCS(gfgo_lite_server = 'http://xxx:24090/')
ucs.dt_str2num('2024-01-31 11:11:11')
1706670671
ucs.dt_str2num_s(['2024-01-31 11:11:11', '2024-02-01 11:11:11'])
[1706670671, 1706757071]
The inverse conversion can be done with get_time_str1:
In [19]: get_time_str1(1706670671)
Out[19]: '2024-01-31 11:11:11'
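The sample values are consistent with Beijing time (UTC+8): 1706670671 is 2024-01-31 03:11:11 UTC. Assuming the service interprets naive datetime strings as UTC+8 (inferred from these numbers, not confirmed by the service), a local equivalent of dt_str2num and of the inverse get_time_str1 could be sketched with the standard library alone. The function names below are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List

# Assumption: naive strings are interpreted as Beijing time (UTC+8)
TZ_UTC8 = timezone(timedelta(hours=8))
FMT = '%Y-%m-%d %H:%M:%S'


def local_dt_str2num(dt_str: str) -> int:
    """Hypothetical local stand-in for ucs.dt_str2num."""
    dt = datetime.strptime(dt_str, FMT).replace(tzinfo=TZ_UTC8)
    return int(dt.timestamp())


def local_dt_str2num_s(dt_str_list: List[str]) -> List[int]:
    """Hypothetical local stand-in for ucs.dt_str2num_s."""
    return [local_dt_str2num(s) for s in dt_str_list]


def local_num2dt_str(ts: int) -> str:
    """Hypothetical inverse, playing the role of get_time_str1(ts)."""
    return datetime.fromtimestamp(ts, tz=TZ_UTC8).strftime(FMT)


if __name__ == '__main__':
    print(local_dt_str2num('2024-01-31 11:11:11'))  # 1706670671
    print(local_num2dt_str(1706670671))  # 2024-01-31 11:11:11
```

Using a fixed-offset timezone rather than a named zone avoids depending on tz database files; it is only correct if the service really uses a constant UTC+8 offset.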
2.2 UCS-time
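A simple application: I need to draw up a static data-synchronization plan that advances along the time axis, one time brick after another, in order. Concretely, this matters for both database migration (merging) and backtesting.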
tbrick1 = ucs.get_time_brick_name('2024-01-31 11:11:11')
'2024.01.31.11'
tbrick2 = ucs.get_time_brick_name('2024-02-10 11:11:11')
'2024.02.10.11'
tbrick_list = ucs.get_time_brick_list(start_brick_name=tbrick1, end_brick_name=tbrick2)
['2024.01.31.11', '2024.01.31.12', '2024.01.31.13', ...]
ucs.get_time_brick_bounds_s(tbrick_list[:3])
[['2024-01-31 11:00:00', '2024-01-31 12:00:00'],
 ['2024-01-31 12:00:00', '2024-01-31 13:00:00'],
 ['2024-01-31 13:00:00', '2024-01-31 14:00:00']]
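The output suggests hourly bricks named 'YYYY.MM.DD.HH', each covering the hour [start, start + 1h). Under that assumption (inferred from the sample output, not from any service documentation), get_time_brick_list and get_time_brick_bounds_s could be approximated locally. The helper names are hypothetical, and treating both ends of the list as inclusive is also an assumption.

```python
from datetime import datetime, timedelta
from typing import List

BRICK_FMT = '%Y.%m.%d.%H'      # assumed brick naming, e.g. '2024.01.31.11'
DT_FMT = '%Y-%m-%d %H:%M:%S'
STEP = timedelta(hours=1)       # assumed brick width: one hour


def local_time_brick_list(start_brick_name: str, end_brick_name: str) -> List[str]:
    """Hypothetical stand-in for ucs.get_time_brick_list (both ends inclusive)."""
    cur = datetime.strptime(start_brick_name, BRICK_FMT)
    end = datetime.strptime(end_brick_name, BRICK_FMT)
    out = []
    while cur <= end:
        out.append(cur.strftime(BRICK_FMT))
        cur += STEP
    return out


def local_time_brick_bounds_s(brick_name_list: List[str]) -> List[List[str]]:
    """Hypothetical stand-in for ucs.get_time_brick_bounds_s(..., char_or_num='char')."""
    bounds = []
    for name in brick_name_list:
        start = datetime.strptime(name, BRICK_FMT)
        bounds.append([start.strftime(DT_FMT), (start + STEP).strftime(DT_FMT)])
    return bounds
```

Stepping with timedelta rather than incrementing the hour field by hand lets day, month and year rollovers take care of themselves.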
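By the way, after exploring UCS-time, I found that, probably because our notion of time is so deeply ingrained (a time brick name is just a truncated datetime string), the conversion can actually be done much more freely: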
# Convert an ordinary datetime string to a UCS brick name
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1 = some_dt_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
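The string-splitting approach assumes the input is already zero-padded. With an input such as '2024-6-24 9:30:00' it returns '2024.6.24.9', and unpadded names no longer sort chronologically as strings ('2024.6.24.9' sorts after '2024.6.24.10'). A possible stricter variant, sketched here under the assumption that inputs use the '%Y-%m-%d %H:%M:%S' layout, parses first and then formats; the name dt_str2ucs_blockname_strict is hypothetical.

```python
from datetime import datetime


def dt_str2ucs_blockname(some_dt_str: str) -> str:
    # Original approach: normalize separators, keep year.month.day.hour
    s = some_dt_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(s.split('.')[:4])


def dt_str2ucs_blockname_strict(some_dt_str: str) -> str:
    # Hypothetical stricter variant: strptime raises ValueError on malformed input,
    # and strftime always zero-pads, so names sort in chronological order
    dt = datetime.strptime(some_dt_str, '%Y-%m-%d %H:%M:%S')
    return dt.strftime('%Y.%m.%d.%H')


if __name__ == '__main__':
    print(dt_str2ucs_blockname('2024-6-24 9:30:00'))         # 2024.6.24.9
    print(dt_str2ucs_blockname_strict('2024-6-24 9:30:00'))  # 2024.06.24.09
```

The trade-off is speed versus safety: plain string replacement is cheaper, while parsing catches bad input before it turns into a brick name that silently breaks ordering.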
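These functions have also been added to the basic function library. They can sometimes serve as helpers, especially when using GFGoLite is inconvenient.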
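A more concrete scenario, which I think is fairly general, is as follows:
- 1 The data has no sequential id, but it does have an ingestion time (create_time).
- 2 Ingestion times are not continuous: some periods are dense, much of the time is quiet, though rarely with no records at all.
- 3 On each run, the worker can always make effective progress on one brick.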
Procedure:
p01: Persist the required time bricks, in order, to local files, and store their bounds as a dict.
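p01 might look roughly like the following sketch. In the original workflow the brick list and bounds presumably come from the UCS service; here they are generated locally, assuming hourly bricks named 'YYYY.MM.DD.HH'. The text only shows from_pickle(name), so to_pickle and the exact from_pickle signature below (including the folder parameter and the .pkl suffix) are hypothetical.

```python
import pickle
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Tuple

BRICK_FMT = '%Y.%m.%d.%H'
DT_FMT = '%Y-%m-%d %H:%M:%S'


def to_pickle(obj: Any, name: str, folder: str = '.') -> None:
    # Hypothetical helper: write obj to <folder>/<name>.pkl
    with open(Path(folder) / f'{name}.pkl', 'wb') as f:
        pickle.dump(obj, f)


def from_pickle(name: str, folder: str = '.') -> Any:
    # Hypothetical counterpart of the from_pickle used in p03
    with open(Path(folder) / f'{name}.pkl', 'rb') as f:
        return pickle.load(f)


def build_time_bricks(start_dt: str, end_dt: str) -> Tuple[List[str], Dict[str, List[str]]]:
    # Assumes hourly bricks; the first brick is the hour containing start_dt
    cur = datetime.strptime(start_dt, DT_FMT).replace(minute=0, second=0)
    end = datetime.strptime(end_dt, DT_FMT)
    brick_list: List[str] = []
    bounds: Dict[str, List[str]] = {}
    while cur <= end:
        name = cur.strftime(BRICK_FMT)
        brick_list.append(name)
        bounds[name] = [cur.strftime(DT_FMT), (cur + timedelta(hours=1)).strftime(DT_FMT)]
        cur += timedelta(hours=1)
    return brick_list, bounds


if __name__ == '__main__':
    time_brick_list, time_bounds_dict = build_time_bricks('2024-01-01 00:00:00', '2024-12-31 23:59:59')
    to_pickle(time_brick_list, 'time_brick_list')
    to_pickle(time_bounds_dict, 'time_bounds_dict')
```

Computing the schedule once and persisting it keeps each worker run cheap: it only loads two small objects instead of calling the service again.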
p02: Get the time brick for the current time.
# Convert a standard datetime string to a brick name
def timestr2timeblock(some_time_str=None):
    x1 = some_time_str.replace('-', '.').replace(' ', '.').replace(':', '.')
    return '.'.join(x1.split('.')[:4])

the_time_now = get_time_str1()
the_time_brick = timestr2timeblock(the_time_now)
p03: Get the latest brick handled in the previous run from gb (a GlobalBuffer), and pin down the current position on the time axis.
worker_buffer_space = 'sp_worker.general'
tier1 = 'some_tier'
tier2 = 'ucs_time_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space, tier1, tier2]) + '.'
# ========================== Load
time_brick_list = from_pickle('time_brick_list')
time_bounds_dict = from_pickle('time_bounds_dict')
gb = GlobalBuffer()
watch_brick_pos = time_brick_list.index(the_time_brick)
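time_brick_list.index(the_time_brick) raises ValueError if the current hour is not in the persisted list, for example once the list built in p01 runs out. Because zero-padded 'YYYY.MM.DD.HH' names sort chronologically as strings, a binary search with the standard bisect module returns the same index when the brick is present and the insertion point otherwise. This is a sketch; locate_watch_pos is a hypothetical helper name and it assumes time_brick_list is sorted and free of duplicates.

```python
from bisect import bisect_left
from typing import List


def locate_watch_pos(time_brick_list: List[str], the_time_brick: str) -> int:
    """Index of the_time_brick, or the position it would occupy if absent.

    If the list ends before "now", this returns len(time_brick_list),
    so every persisted brick counts as lying before the current time.
    """
    return bisect_left(time_brick_list, the_time_brick)
```

Besides not raising, the lookup is O(log n) instead of the linear scan done by list.index, which matters little for a year of hourly bricks but costs nothing.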
p04: Select available_time_brick_list by the rule "after the last handled brick, before the brick currently being observed".
latest_data_brick = gb.getx(prefix +'last_brick_handled')
if latest_data_brick is None:
    # nothing handled yet: start_pos + 1 == 0, so the first brick is included
    start_pos = -1
else:
    start_pos = time_brick_list.index(latest_data_brick)
available_time_brick_list = time_brick_list[start_pos + 1:watch_brick_pos]
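The p04 rule can also be written as a small pure function, which makes the boundary cases easy to test: when nothing has been handled yet the very first brick must be included, and the brick containing "now" is excluded because it may still be receiving data. The function name is hypothetical.

```python
from typing import List, Optional


def select_available_bricks(time_brick_list: List[str],
                            last_brick_handled: Optional[str],
                            watch_brick_pos: int) -> List[str]:
    if last_brick_handled is None:
        # Nothing handled yet: start from the very first brick (index 0)
        start = 0
    else:
        # Resume right after the last handled brick
        start = time_brick_list.index(last_brick_handled) + 1
    # Stop before the brick that contains the current time
    return time_brick_list[start:watch_brick_pos]
```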
p05: Walk the time bricks in order, and break out of this run as soon as one valid (non-empty) brick has been handled.
# cur_data_brick = available_time_brick_list[0]
for cur_data_brick in available_time_brick_list:
    print(cur_data_brick)
    cur_bound = time_bounds_dict[cur_data_brick]
    print(cur_bound)
    resp = <fetch the data to be processed within cur_bound>
    if len(resp):
        res_df = pd.DataFrame(resp, columns=keep_cols)
        if ok_to_handle:
            # <delete the results in batches>
            listofdict2 = slice_list_by_batch2(xxx.to_dict(orient='records'), 1000)
            for some_listofdict in listofdict2:
                ...  # handle one batch of up to 1000 records
        # <update the handled-brick record>
        gb.setx(prefix + 'last_brick_handled', cur_data_brick, persist=True)
        # one valid brick has been handled: end this run
        break
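A self-contained sketch of p05, with its dependencies passed in as parameters so it can run without a database: fetch stands for the elided data query, handle_batch for the per-batch work, and a plain dict for the GlobalBuffer state. All of these names are hypothetical, and slice_by_batch is only assumed to behave like slice_list_by_batch2 (fixed-size chunks), which the text does not define.

```python
from typing import Any, Callable, Dict, Iterator, List, MutableMapping, Optional

Record = Dict[str, Any]


def slice_by_batch(items: List[Any], batch_size: int) -> Iterator[List[Any]]:
    # Assumed equivalent of slice_list_by_batch2: consecutive fixed-size chunks
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]


def run_worker_once(available_bricks: List[str],
                    bounds: Dict[str, List[str]],
                    fetch: Callable[[List[str]], List[Record]],
                    handle_batch: Callable[[List[Record]], None],
                    state: MutableMapping[str, str],
                    key: str = 'last_brick_handled',
                    batch_size: int = 1000) -> Optional[str]:
    """Handle the first non-empty brick in order; return its name, or None."""
    for brick in available_bricks:
        records = fetch(bounds[brick])       # records whose create_time is in this brick
        if not records:
            continue                         # empty brick: look at the next one
        for batch in slice_by_batch(records, batch_size):
            handle_batch(batch)
        state[key] = brick                   # stands in for gb.setx(..., persist=True)
        return brick                         # one effective brick per run
    return None
```

Recording the brick only after all of its batches succeed means a crash mid-brick leaves the marker untouched, so the next run retries that brick; the batch handler should therefore tolerate seeing the same records twice.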
Such a worker can then be invoked from a loop or a scheduled job.
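The loop variant can be as small as the following sketch, where step is any zero-argument callable that runs one worker pass and returns the handled brick name or None. run_periodically and its parameters are hypothetical; the scheduled-job variant would instead run a script that performs a single step per invocation.

```python
import time
from typing import Callable, Optional


def run_periodically(step: Callable[[], Optional[str]],
                     interval_s: float = 60.0,
                     max_runs: Optional[int] = None) -> int:
    """Call step() repeatedly, sleeping interval_s seconds between calls.

    max_runs=None loops forever; a finite value is convenient for testing.
    Returns how many runs handled a brick (step() returned non-None).
    """
    runs = handled = 0
    while max_runs is None or runs < max_runs:
        if step() is not None:
            handled += 1
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(interval_s)
    return handled
```

Because each pass advances at most one brick, a shorter interval lets the worker catch up faster after downtime, at the cost of more frequent queries against mostly empty bricks.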