說明
為了將GlobalFunc推向正常的應用軌道,構造一個功能簡單的服務。
內容
1 工作模式
Server模式:以API服務方式執行
Worker模式: 以Worker方式執行。通過left/right文件夾和rsync方式執行任務并寫結果。
2 構造方法
重載和執行;從標準鏡像環境啟動服務,通過數據庫更新函數,然后保存為新的鏡像。
2.1 step0 選擇基礎鏡像建立容器gfgo_lite_build
docker run -it --name=gfgo_lite_build myregistry.domain.com:24052/worker.andy.globalfunc:v101 bash
退出,然后重啟
docker restart gfgo_lite_build
由于容器設置為退出不刪除,所以可以分步進行操作,每次執行
docker exec -it gfgo_lite_build bash
2.2 step1 從數據庫獲取數據并生成py
用最基礎的方式構造生成方法,先獲取三個基礎文件
- 1 RedisOrMongo_v100.py :從Redis或者Mongo中獲取數據
- 2 WMongo_V9000_012.py : 提供Mongo的操作, 是RedisOrMongo的依賴對象
- 3 worker_gen_file.py: 只提供兩個功能,將文本生成為文件,以及根絕py生成包的init文件
GFGoLite將會發布為服務,要可以通過接口實現文件的增/刪,所以要將這部分功能進行接口封裝。
為了不讓項目入口的文件看起來太亂,建立init_funcs文件夾,把這些文件放進去。
目前的源文件主要來自本地文件,先進行上傳
gfbase = GFBase(project_path = git_project_path,redis_agent_host = redis_agent_host,tier1 = tier1 )
# 2 掃描所有文件的信息
scan_dict = gfbase._get_scan_files()
# 選擇某個包的文件全部上傳
some_pack_list = [x for x in scan_dict.keys() if x .startswith('Base.')]
for some_file in some_pack_list:gfbase._save_a_file_rom(some_file)
第一次建立基礎的程序包,在入口位置(/workspace/
)執行程序
from init_funcs import create_file,generate_init_py,RedisOrMongo, Naiveimport os
import json
# 聲明空間# 在容器中啟動
redis_cfg = Naive()
redis_cfg.redis_agent_host = 'http://172.17.0.1:24021/'
redis_cfg.redis_connection_hash = None# 聲明當前要創建的程序文件夾:默認為funcs
target_folder = 'GlobalFunc'
tier1 = 'sp_GlobalFunc'
var_space_name = 'Base'
# 分支,一般默認為master
branch_name = 'master'
tier2 = '_'.join([var_space_name, branch_name])
the_space_name = '.'.join([tier1,tier2])target_folder = target_folder + '/' + var_space_name
os.makedirs(target_folder, exist_ok=True)rom = RedisOrMongo(the_space_name, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')# 這個一般根據需要,或者代碼中得來 --- 需要的列表項
func_list = [ 'from_pickle','to_pickle']
for some_name in func_list:# 獲取 meta,data : data就是代碼字符the_data = rom.getx(some_name)filename = the_data['meta']['name']filedata = the_data['data']create_file(target_folder, filename, filedata)# 生成初始化文件
generate_init_py(target_folder)
此時文件結構為
root@a1f87a061e62:/workspace# tree /workspace/
/workspace/
├── GFGo.py
├── GlobalFunc
│ └── Base
│ ├── from_pickle.py
│ ├── __init__.py
│ └── to_pickle.py
├── init_funcs
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-38.pyc
│ │ ├── RedisOrMongo_v100.cpython-38.pyc
│ │ ├── WMongo_V9000_012.cpython-38.pyc
│ │ └── worker_gen_file.cpython-38.pyc
│ ├── RedisOrMongo_v100.py
│ ├── WMongo_V9000_012.py
│ └── worker_gen_file.py
├── m7.24065.pkl
├── mymeta.pkl
└── __pycache__└── GFGo.cpython-38.pyc
再給GlobalFunc添加一個包的初始化文件(引入Base)
__init__.py
from . import Base
2.3 step2 建立server_funcs.py
接下來建立server_funcs.py
。在這個文件中,保留服務所依賴的基礎函數和配置,非常少。
# 保留,用于提供server.py的引入
# 【引入時處于和GlobalFunc同級別的位置】import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)# 【創建tornado所需問文件夾】
import os
# 如果路徑不存在則創建
def create_folder_if_notexist(somepath):if not os.path.exists(somepath):os.makedirs(somepath)return True
m_static = os.path.join(os.getcwd(),'m_static')
m_template = os.path.join(os.getcwd(),'m_template')create_folder_if_notexist(m_static)
create_folder_if_notexist(m_template)settings = {
'static_path':m_static,
'template_path':m_template
}
還有一些函數,需要依賴較大的離線文件,因此這部分函數也放在這個文件里,在服務啟動時一次性導入。
# 【創建時間軸】
from GlobalFunc import Base base_year=1970
next_years=100
time_zone=8time_axis_name = 'base_year{0}_next{1}_tz{2}'.format(base_year,next_years,time_zone)
if Base.is_file_exists(filename ='%s.pkl' % time_axis_name):print('時間軸文件 %s 已存在' % time_axis_name)
else:yymon_dict = Base.gen_time_axis(base_year=base_year, next_years=next_years,time_zone=time_zone)Base.to_pickle(data = yymon_dict, file_name=time_axis_name)yymon_dict = Base.from_pickle(time_axis_name)
at2 = Base.ATimer2(yymon_dict = yymon_dict)
# 樣例字符
some_dt_str = '2024-01-31 11:11:11'
# 字符轉數值
at2.char2num(some_dt_str)
# 數值轉字符
at2.num2char(the_ts=1706670671)
# 計算字符的時間差:可同比與datetime的delta(3.37 微秒)
at2.c_period_btw('2024-01-31 11:11:11', '2024-02-01 11:11:11')時間軸文件 base_year1970_next100_tz8 已存在
Out[1]: 86400
這部分知識為了引入時間軸對象,通過偏移/查找的方式快速進行時間的轉換和偏移計算。
3 服務
使用tornado建立服務,可以使用web進行簡單的應用。
3.1 設計
GlobalFunc本身有兩種類型的操作:
- 1 開發。
- 2 應用。
目前使用GFBase對象進行文件的操作,使用GFGo對象進行調用操作。
因為有些細節還沒有想清楚,但是又不能停下來,所以才會想創建GFGoLite:急用先行,邊做邊完善。到積累成熟時再創建完整版的GlobalFunc服務。
第一步,我想僅僅使用GFGo的內容。這種模式下,在用戶側【client use】使用API調用已經存在于數據庫中的函數;使用代碼編輯的方式在本地進行開發【server produce】,然后將文件覆蓋到數據庫。
3.2 GFGo
GFGo.py
提供了位置參數調用以及關鍵字調用兩種方式,需要注意的是,未來將規范所有的函數均采用關鍵字參數。
另外對象還提供了一個包的重載方法,主要對應在運行時函數包可能發生增改。
import subprocess
import importlib.utildef reload_package(package_name):try:# 檢查包是否已經被導入spec = importlib.util.find_spec(package_name)if spec is None:# 如果包未導入,先嘗試導入它print(f"警告:未找到包 '{package_name}',嘗試導入...")exec(f"import {package_name}")# 構造重新加載模塊的命令reload_command = f'python -c "import importlib; import {package_name}; importlib.reload({package_name}); print(\'成功重新加載模塊: {package_name}\')"'# 使用 subprocess 調用命令subprocess.run(reload_command, shell=True)except subprocess.CalledProcessError as e:print("重新加載模塊失敗:", e)# 用于GlobalFunc參數化獲取函數 func_name like
def get_func_by_para(some_func_pack, full_func_name):pack,funcname = full_func_name.split('.')the_pack = getattr(some_func_pack,pack)the_func = getattr(the_pack, funcname)return the_func# 在包的父目錄執行,把GlobalFunc當成總包 | 在編輯的時候,我們會沉入到內部操作,在應用時則要掛在最外層
# import GlobalFunc as funcs
# the_func = get_func_by_para(funcs, 'Base.to_pickle')class GFGo:def __init__(self):pass # 執行 pack_func ~ Base.to_pickle@staticmethoddef _exe_func_args(global_func = None, pack_func = None, args = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(*args)@staticmethoddef _exe_func_kwargs(global_func = None, pack_func = None, kwargs = None):exe_func = get_func_by_para(global_func, pack_func)return exe_func(**kwargs)# 重載包@staticmethoddef _reload_package(package_name):reload_package(package_name)
3.3 server.py
在這里會導入所有必須的函數、對象提供服務。
目前假設,該服務唯一會導致的數據變化是函數的增改,所以容器的提交僅僅意味著函數變多了。
from server_funcs import *
from GFGo import GFGo
import GlobalFunc as funcs import tornado.httpserver # http服務器
import tornado.ioloop # ?
import tornado.options # 指定服務端口和路徑解析
import tornado.web # web模塊
from tornado.options import define, options
import os.path # 獲取和生成template文件路徑# 增加GF的創建實例
gfgo = GFGo()app_list = []IndexHandler_path = r'/'
class IndexHandler(tornado.web.RequestHandler):def get(self):self.write('【GET】This is Website for Internal API System')self.write('Please Refer to API document')print('Get got a request test')# print(buffer_dict)def post(self):request_body = self.request.bodyprint('Trying Decode Json')some_dict = json.loads(request_body)print(some_dict)msg_dict = {}msg_dict['info'] = '【POST】This is Website for Internal API System'msg_dict['input_dict'] = some_dictself.write(json.dumps(msg_dict))print('Post got a request test')
IndexHandler_tuple = (IndexHandler_path,IndexHandler)
app_list.append(IndexHandler_tuple)GFGoHandler_path = r'/gfgo/'
class GFGoHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)kwargs = some_dict['kwargs']pack_func = some_dict['pack_func']res = gfgo._exe_func_kwargs(global_func =funcs,pack_func= pack_func, kwargs = kwargs)self.write(json.dumps(res))
gfgo_tuple = (GFGoHandler_path,GFGoHandler)
app_list.append(gfgo_tuple)# 時間軸服務作為基礎的服務
'''
1 start_dt 開始時間
2 end_dt 結束時間(選填),默認調用當前
3 time_unit 時間單位,默認秒
4 bias_hours 偏移時間(選填),默認采取東八區
'''# 單元素調用
TimeAxisHandler_path = r'/time_gap/'
class TimeAxisHandler(tornado.web.RequestHandler):def post(self):request_body = self.request.bodysome_dict = json.loads(request_body)start_dt = some_dict['start_dt']bias_hours = some_dict.get('bias_hours') or 8end_dt = some_dict.get('end_dt') or Base.get_time_str1(bias_hours = bias_hours)time_unit = some_dict['time_unit'] or 'seconds'time_gap_seconds = at2.c_period_btw(start_dt,end_dt)if time_unit.lower() =='seconds':res = time_gap_secondselif time_unit.lower() =='minutes':res = time_gap_seconds // 60elif time_unit.lower() =='hours':res = time_gap_seconds // 3600elif time_unit.lower() =='days':res = time_gap_seconds //86400else:res = Noneself.write(json.dumps(res))
timeaxis_tuple = (TimeAxisHandler_path,TimeAxisHandler)
app_list.append(timeaxis_tuple)# Next: 時間間隔的列表處理,以及
if __name__ == '__main__':#tornado.options.parse_command_line()apps = tornado.web.Application(app_list, **settings)http_server = tornado.httpserver.HTTPServer(apps)define('port', default=8000, help='run on the given port', type=int)http_server.listen(options.port)# 單核# 多核打開注釋# 0 是全部核# http_server.start(num_processes=10) # tornado將按照cpu核數來fork進程# ---啟動print('Server Started')tornado.ioloop.IOLoop.instance().start()root@a1f87a061e62:/workspace# python3 server.py
時間軸文件 base_year1970_next100_tz8 已存在
Server Started
[I 240303 09:05:47 web:2239] 200 POST /gfgo/ (127.0.0.1) 0.54ms
[I 240303 09:05:57 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.44ms
[I 240303 09:06:13 web:2239] 200 POST /time_gap/ (127.0.0.1) 0.57ms
功能測試
import requests as req # 測試1:調用Base包的函數
kwargs = {'ts':None, 'bias_hours':8}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
In [8]: res
Out[8]: '2024-03-03 16:30:53'# 測試2:調用基礎時間軸函數
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
res = req.post('http://127.0.0.1:8000/time_gap/', json = some_dict).json()In [14]: res
Out[14]: 17
3.4 部署
3.4.1 保存為鏡像
將容器進行提交就可以了
┌─root@m7:~
└─ $ docker commit gfgo_lite_build myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
sha256:d2791b11a8fd3b5d307c7ce7ccc0622b8f42a84970a0c5f877ff88da54caeb8a
┌─root@m7:~
└─ $ docker push myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100
The push refers to repository [myregistry.domain.com:24052/worker.andy.gfgo_lite_24090]
e83f4383e7b7: Pushed
f1db272a1809: Mounted from worker.andy.globalfunc
...
805802706667: Mounted from worker.andy.globalfunc
v100: digest: sha256:9bd2d0a8674a3b3acf843aed6ab5b4372190dcae55edc6ef7a24181295f83549 size: 6221
3.4.2 啟動服務
容器無需掛載任何數據卷啟動
docker run -d \--restart=always \--name=gfgo_lite_24090 \-v /etc/localtime:/etc/localtime \-v /etc/timezone:/etc/timezone\-v /etc/hostname:/etc/hostname\-e "LANG=C.UTF-8" \-w /workspace\-p 24090:8000\myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v100 \sh -c "python3 server.py"
在掛載了時區后,時間偏移參數就要置為0了
import requests as req # 測試1:調用Base包的函數
kwargs = {'ts':None, 'bias_hours':0}
pack_func = 'Base.get_time_str1'some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_funcres = req.post('http://127.0.0.1:24090/gfgo/', json = some_dict).json()
print(res)# 測試2:調用基礎時間軸函數
some_dict = {}
some_dict['start_dt'] = '2024-03-03 00:00:00'
some_dict['time_unit'] ='hours'
some_dict['bias_hours'] =-8
res = req.post('http://127.0.0.1:24090/time_gap/', json = some_dict).json()
print(res)
4 后續
一方面是函數的繼續增多,調用(例如重載)。
另一方面是函數的增改。
很多應用需要分為單個和批次調用,例如對時間的計算可以為單個執行,也可以輸入一個列表。