這算是之前一直想做的一件事,趁周末趕快做了。
業務意義:現實中有大量的輿情,這對我們的決策會有比較重要的作用
技術依賴:
- 1 模型基礎能力
- 2 消息隊列
- 3 異步獲取消息
- 4 時間序列庫
1 模型基礎能力
大模型發展到現在,已經有了很大的變化。過去,在本地執行的大模型能力是完全不夠的,deepseek14b出來之后我測了一下,就基礎能力還是已經ok,但是推理過程我不知道是不是模型執行本身的代價,但其實我是不需要的。
本地模型能力夠了之后,那么就很有意思。一方面,它可以比較節約成本。對于一些研究性質的任務應該是夠的,比如我這次就打算用4060TI(16G)和Mac MiniM4(16G)來運行模型。另一方面,由于數據不出去,所以對數據隱私保護就比較好,這就可以嘗試更多重要的任務。
另外就是模型的尺寸問題了,我粗略的感覺了一下
尺寸 | 能力 | 輸入上限 |
---|---|---|
1.5b | 也許可以做一些特別特別簡單的任務 | 2k |
8b | 可以做簡單任務 | 上8k |
14b | 可以做中等復雜的任務 | 8k |
32b | 可以做較復雜任務 | |
100b + | 可以直接商業化 |
目前我覺得14b可能是性價比最高的模型尺寸。
小模型有一個明顯的問題,就是可能無法嚴格按照指令執行任務。以生成json為例,可能會出現更高的失敗率。當然尺寸模型也會碰到同樣的問題。所以,對于大模型應用來說,應該用一些封裝方式轉為可靠節點。
小模型,很像以前集成學習中的弱分類器,它的使用并不會那么直接。需要有一套體系來進行校驗和評估,有意義的是,這套校驗和評估方法同樣適合更大尺寸的模型,尤其是面對現在層出不同的大模型時。
標準化封裝,然后使用體系進行歸納校驗
2 消息隊列
消息隊列非常重要,緩沖和回放是最核心的功能,kafka這方面做的真挺不錯。
像類似新聞的摘要數據到來后進入隊列,一方面是緩沖,另一方面也是進行按時間的自然截斷。如果覺得有必要,是可以讓這些數據進入持久隊列的,我給自己準備的存儲空間非常多,可能未來還會更多(火力不足恐懼癥)
由于處理不是唯一的,所以需要有多個出隊列,這時候就要用回放了。例如,我先在有 input_topic1,然后我需要進行下一步etl,可能我先想到了方法1 :只提取新聞的標題和正文,滿足了現在的需求,于是寫入 tier2_topic_etl1。可能過一陣我又想到方法2:再提取評論連接。這時候就會寫入tier2_topic_etl2。最后也許會放棄方法1,又或者放棄方法2,但這個就比較簡單了。
類似的,或者不可避免的,我會采用多個不同的模型去處理相同的數據,這時候就又可以繼續往后分叉了。可以非常方便的服務于多種探索,只要換一個消費組id就行了。這些不同的分支,可以在不同層級的部分進行合并,這方面mongo又非常合適。所以生成唯一主鍵是必要的。
當然,最后如果結論是采納的,一定會在時間序列庫中表現為事件。到了這個層級,才是(基本)可決策的。
3 異步獲取消息
對于大量的數據獲取和流轉,一定要采用異步/協程的方式。
從去年開始,我才深深體會到這個問題。以前很多時候是批量傳入,批量處理,并沒有感覺。在大量實時性的任務中,一定會有數量龐大,但是數據又很小的任務,且會受到網絡波動。沒有異步和有異步效率可能差百倍。
在大模型時代,很多請求是獨立到來的,比如用戶的一個問題。我粗略估算過,大模型時代單條數據處理的成本可能是以前一百萬倍。所以,必須要能夠服務好單條的請求。
目前我的大部分服務已經轉向了異步和微批次,在worker端重新搭建了fastapi+aps+celery,主要是發現其他的一些成熟異步框架也是這樣的,所以還不如自己搭一下。搭好有一陣子了,之前還沒想好怎么去進行統一化調度,所以也沒有立即啟用。我想接著這個周末,需要將這個推入試產狀態。
4 時間序列庫
技術和工具只是一個表象和約束,關鍵是背后的邏輯。我覺得line protocal挺好的,不僅約定了一條簡潔的時間數據從技術上怎么定義的,同時也反過來促使業務人員思考:
- 1 什么時候要建立一個bucket (db level)
- 2 可能會有哪些measurement (collection level)
- 3 對于某條數據來說,哪些是tags,哪些是field
這種結構會對數據的最終處理帶來影響,不是唯一的,但是是非常有用的。最近在看clickhouse的書,里面也是做了一些取舍(不支持update,delete, 不太支持單條精確查詢等),然后利用一些新的有利特性(CPU的SIMD),從而達到令人驚嘆的性能(極高效取數、極高效統計和極少磁盤占用)。
終于可以言歸正傳:現在的 point是我能每分鐘拿到一些公開新聞摘要,我需要從中間提取事件,然后找出一類特定事件(比如打擊證券行業犯罪等)。所以這里有兩部分:①提取數據 ②判定事件
因為獲取到的數據結構性非常好,所以我會先用簡單的正則將數據提取出來,校驗后就可以送到下一步。
然后在下一步對事件進行判斷,然后給到類別的判定和置信度,這里用到deepseek-r1-14b。
數據的規律是以時間開頭的固定元組。我簡單寫了一個清洗邏輯:
import re
def is_valid_time(text= None):pattern = r'^([01]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$'return bool(re.match(pattern, text))import redef is_has_brackets(text = None):pattern = r'【[^】]*】'return bool(re.search(pattern, text))
some_msg.split('\n')
s = pd.Series(some_msg.split('\n'))
s_list = list(s)
# 1 標記
time_tag = s.apply(is_valid_time)
import numpy as np
time_tag_arr = time_tag.valuestime_tag_pos = np.argwhere(time_tag_arr ==True).ravel()
time_tag_pos1 = np.append(time_tag_pos, time_tag_pos[-1] + 6)
# time_tag_pos_s = pd.Series(time_tag_pos)
# time_tag_pos_s1 = time_tag_pos_s.diff(1)time_tag_pos_start = time_tag_pos1[:-1]
time_tag_pos_end = time_tag_pos1[1:]# 將起始點和結束點組合成一個二維數組
time_tag_pos2 = np.column_stack((time_tag_pos_start, time_tag_pos_end))candidate_tuple_list = []
for tem_tuple in time_tag_pos2:tem_list = s_list[tem_tuple[0]:tem_tuple[1]]if is_has_brackets(tem_list[1]):tem_dict = {}tem_dict['event_time'] = tem_list[0]tem_dict['title'] = tem_list[1].replace('*','')for j in range(2,len(tem_list)):tem_v = tem_list[j]if tem_v.startswith('閱'):tem_dict['read'] = tem_v elif tem_v.startswith('[評論'):tem_dict['comments'] = tem_v elif tem_v.startswith('分享'):tem_dict['share'] = tem_velse:pass candidate_tuple_list.append(tem_dict)
理解原文結構和寫代碼大約花了我1.5個小時,主要是在怎么通過下標快速從列表中取出子列表這個問題上糾結了很久。如果只是為了實現邏輯的話,不會超過0.5個小時。
然后隨之而來是一個問題:我相信在這個case中,數據的格式一般不會有大變化 。但是未來肯定是會變的,所以這種固定解析邏輯比較不那么靠譜,得要加一個校驗。怎么樣才能有更好的效率和效果呢?
如果我能抽象出其中一定有的元素,或者我需要的元素,這個可以比較抽象;然后讓大模型去完成這個解析顯然會更具有通用性。
這種方案后續再試,很多工具都在變,而且我這種非結構化爬取的內容應該不會很多。原則上,應該還是原始數據- 初篩 - 精篩。初篩是一定需要的,先把相關的提出來,或者把不相關的過濾掉。
好了,這一步就算etl,我先做掉。
做之前,需要先訂立一些元數據,我會同步寫往rocks和mongo。平時主要用rocks,除非數據掛了才用mongo。
數據清洗完了之后就可以入庫了,第一次可以將全量的數據寫入。這次我沒有寫原始數據,之后可能需要備份一批,否則kafka只有7天 ,嗯,似乎也夠了,新聞太老意義不大。
接下來是deepseek時間:對數據進行事件的判定
簡單封裝一下:
調用函數
import time
from openai import OpenAI
def call_local_deepseek(api_key ='ollama',base_url="http://172.17.0.1:11434/v1",model="deepseek-r1:14b",system_prompt = '', user_prompt = ''):tick1 = time.time()client = OpenAI(base_url=base_url, # 替換為遠端服務器的 IP 或域名api_key=api_key, # 必填項,但會被忽略)response = client.chat.completions.create(model=model, # 替換為你下載的模型名稱messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=0 # 設置 temperature 參數,這里設置為 0.7,你可以根據需要調整)print('call_local_deepseek takes %.2f' %(time.time() - tick1))# 輸出結果return response.choices[0].message.content
# 測試
call_local_deepseek(user_prompt ='你是誰')
結果的解析函數
import re
import json
def extract_json_from_response(response):"""從大模型的回復中提取 JSON 部分。:param response: 大模型的回復內容:return: 解析后的 JSON 數據(字典形式),若未找到則返回空字典"""# 定義正則表達式模式,用于匹配 JSON 內容json_pattern = r'```json\s*([\s\S]*?)\s*```'# 查找匹配的 JSON 內容match = re.search(json_pattern, response)if match:# 提取 JSON 字符串json_str = match.group(1)try:# 解析 JSON 字符串為 Python 字典json_data = json.loads(json_str)return json_dataexcept json.JSONDecodeError:print("JSON 解析失敗,請檢查模型回復的 JSON 格式是否正確。")return None
# 以下是一個 Python 函數,用于提取大模型回復中指定標簽內的內容。如果未找到指定標簽,函數將返回空字符串。import re
def extract_tag_content(response, tag):"""從大模型的回復中提取指定標簽內的內容。:param response: 大模型的回復內容:param tag: 要提取的標簽,例如 'think':return: 標簽內的內容,如果未找到則返回空字符串"""# 構建正則表達式模式,用于匹配指定標簽內的內容pattern = fr'<{tag}>([\s\S]*?)</{tag}>'# 查找匹配的內容match = re.search(pattern, response)if match:# 提取標簽內的內容return match.group(1)return None
看效果
用函數提取出內容:效果是不錯的。不過r1廢話多的特點導致了每條數據的處理時間太長了。4060ti有點頂不住啊,哈哈
In [11]: extract_json_from_response(res_content)
Out[11]:
{'market_punishment': 65,'market_down': 0,'explanation': '新聞標題明確提到了對兩家私募機構的違規操作進行了處罰,這與A股市場的監管和處罰相關。閱讀量較高(52.3萬)和分享量(47次)也表明了該事件的關注度。因此,這條新聞的相關性評分較高,但考慮到涉及的是小規模私募,整體影響可能有限,所以評分為65分。'}
我試試并發,據說ollama允許4個,至少3個看起來是好的
In [16]: keyword_args_list = [{'system_prompt':system_prompt, 'user_prompt' : str(sample_data1)},...: {'system_prompt':system_prompt, 'user_prompt' : str(sample_data2)},...: {'system_prompt':system_prompt, 'user_prompt' : str(sample_data3)}...:...: ]...:...:...: tick1 = time.time()...: res_list = thread_concurrent_run(call_local_deepseek, keyword_args_list = keyword_args_list, max_workers= 3)...: tick2 = time.time()...:...:
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.80
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.92
2025-03-22 23:04:52 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 21.00
解析出來
In [18]: res_list1 = [extract_json_from_response(x) for x in res_list]In [19]: res_list1
Out[19]:
[{'market_punishment': 0,'market_down': 10,'explanation': '新聞標題提到市場一片大好,沒有涉及A股市場的處罰信息,因此市場處罰評分為0。雖然整體情緒積極,但并未明確提及市場下跌或投資者損失,故市場下跌評分較低為10分。'},{'market_punishment': 0,'market_down': 90,'explanation': '新聞標題明確提到A股市場大跌1.5%,屬于A股市場整體下跌的情況,相關性很高。'},{'market_punishment': 60,'market_down': 10,'explanation': '新聞標題提到對兩家小規模私募機構的違規操作進行了處罰,這直接關聯到A股市場的監管和整治,因此屬于A股市場處罰類別。雖然沒有提到具體的市場下跌情況,但處罰通常可能會影響市場情緒,所以相關性較高。閱讀量和分享量也顯示了一定的關注度。'}]
然后很奇怪的是,單發和多發的情況有點不一樣。對于第一條,是穩定這樣的。重復跑3條也是穩定的。暫時只能理解量大質量就下滑了。有點搞笑。之前我發現在批量實體識別的時候也有類似的問題。
{'market_punishment': 0,'market_down': 0,'explanation': '新聞標題提到市場一片大好,沒有涉及A股市場的處罰或下跌內容,因此兩個類別的相關性評分為0。'}
然后我試了下8b,就這個基礎問題好像也還行,速度快了一倍。
最后我試了下1.5b,就完全bbq
因為mac可能是需要設置一下網絡開放,速度感覺和4060ti也差不多。
實際跑了一個一分鐘的數據,哈哈,有點尷尬,趕不上數據的速度,就先科研一下吧。