使用WebSocket實時獲取印度股票數據源(無調用次數限制)實戰
一、前置準備
1. 獲取API密鑰
登錄 StockTV開發者平臺 → 聯系客服獲取測試Key(格式MY4b781f618e3f43c4b055f25fa61941ad
),該密鑰無調用次數限制且支持實時數據持續訂閱。
2. 安裝Python依賴
pip install websocket-client json pandas
二、核心代碼實現
1. 建立WebSocket連接
import websocket
import json
import timeAPI_KEY = "YOUR_API_KEY"
WS_URL = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"def on_message(ws, message):"""處理實時行情推送"""data = json.loads(message)if data.get('type') == 'stock':print(f"[{data['symbol']}] 價格: {data['last']} 漲跌幅: {data['pcp']}%")def on_error(ws, error):print(f"連接異常: {error}")def on_close(ws, close_status_code, close_msg):print(f"連接關閉: {close_msg}")def on_open(ws):"""連接成功后訂閱股票"""subscribe_msg = json.dumps({"action": "subscribe","symbols": ["RELIANCE", "NSEI"] # 印度信實工業/Nifty50指數})ws.send(subscribe_msg)print("訂閱成功,開始接收實時數據...")
2. 啟動實時監聽(含自動重連)
def start_websocket():while True:try:ws = websocket.WebSocketApp(WS_URL,on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever()except Exception as e:print(f"連接異常,5秒后重連: {str(e)}")time.sleep(5)# 啟動線程持續運行
import threading
threading.Thread(target=start_websocket, daemon=True).start()
3. 添加心跳機制(保持長連接)
def send_heartbeat(ws):"""每30秒發送心跳包"""while True:try:ws.send(json.dumps({"action": "ping"}))time.sleep(30)except Exception as e:break# 在on_open函數中啟動心跳線程
def on_open(ws):# ...原有訂閱代碼...threading.Thread(target=send_heartbeat, args=(ws,), daemon=True).start()
三、實時數據示例輸出
訂閱成功,開始接收實時數據...
[RELIANCE] 價格: 2856.15 漲跌幅: +1.23%
[NSEI] 價格: 22985.40 漲跌幅: +0.75%
[RELIANCE] 價格: 2857.80 漲跌幅: +1.35%
四、關鍵參數說明
字段 | 說明 | 示例值 |
---|---|---|
symbol | 股票/指數代碼 | RELIANCE, NSEI |
last | 最新成交價 | 2856.15 |
pcp | 漲跌幅百分比(自動帶±號) | +1.23% |
volume | 成交量(股) | 1254875 |
timestamp | 數據時間戳(Unix毫秒級) | 1725002394123 |
五、注意事項
-
連接穩定性
通過自動重連機制+心跳包保障7×24小時持續運行 -
數據時效性
印度市場交易時段為IST 9:15-15:30(北京時間11:45-18:00),非交易時段無實時數據推送 -
性能優化
建議使用異步處理框架(如asyncio
)避免數據堆積,實測單連接可承載100+標的實時推送