本教程將指導您如何使用Java Websocket客戶端連接實時行情接口,并訂閱相關數據。
步驟1:配置您的項目
確保您的項目已引入以下依賴:
jakarta.websocket-api
jakarta.websocket-client-api
fastjson2
lombok
spring-context
(如果使用Spring框架)
步驟2:創建Websocket客戶端
創建一個Java類,例如 WebsocketExample
,并添加 @ClientEndpoint
和 @Component
注解。
package org.example.ws;import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;// 注冊獲取API KEY: www.infoway.io
// 官方對接文檔:docs.infoway.io@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {// 本地session通道private static Session session;// wss連接地址 business可以為stock、crypto、common;apikey為您的憑證// 申請API KEY: www.infoway.ioprivate static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";@PostConstructpublic void connectAll() {try {// 建立WEBSOCKET連接connect(WS_URL);// 開啟自動重連startReconnection(WS_URL);} catch (Exception e) {log.error("Failed to connect to " + WS_URL + ": " + e.getMessage());}}// 自動重連機制,會開啟一個定時線程判斷連接是否斷開,斷開自動重連private void startReconnection(String s) {ScheduledExecutorService usExecutor = Executors.newScheduledThreadPool(1);Runnable usTask = () -> {if (session == null || !session.isOpen()) {log.debug("reconnection...");connect(s);}};usExecutor.scheduleAtFixedRate(usTask, 1000, 10000, TimeUnit.MILLISECONDS);}// 建立WEBSOCKET連接具體實現private void connect(String s) {try {WebSocketContainer container = ContainerProvider.getWebSocketContainer();session = container.connectToServer(WebsocketExample.class, URI.create(s));} catch (DeploymentException | IOException e) {log.error("Failed to connect to the server: {}", e.getMessage());}}// WEBSOCKET連接建立成功后會執行下面的方法@OnOpenpublic void onOpen(Session session) throws IOException, InterruptedException {System.out.println("Connection opened: " + session.getId());// 訂閱實時成交明細 (代碼 10000)subscribeToData(session, 10000, "BTCUSDT", "trade_trace");// 訂閱實時盤口數據 (代碼 10003)Thread.sleep(5000); // 間隔一段時間subscribeToData(session, 10003, "BTCUSDT", "depth_trace");// 訂閱實時K線數據 (代碼 10006, 1分鐘K線)Thread.sleep(5000); // 間隔一段時間subscribeKlineData(session, "BTCUSDT", 1, "kline_trace");// 定時發送心跳 (每30秒)ScheduledExecutorService pingExecutor = Executors.newScheduledThreadPool(1);Runnable pingTask = WebsocketExample::ping;pingExecutor.scheduleAtFixedRate(pingTask, 30, 30, TimeUnit.SECONDS);}// 封裝訂閱數據請求private void subscribeToData(Session session, int code, String symbol, String trace) throws IOException {JSONObject sendObj = new JSONObject();sendObj.put("code", code);sendObj.put("trace", trace);JSONObject data = new JSONObject();data.put("codes", symbol);sendObj.put("data", data);session.getBasicRemote().sendText(sendObj.toJSONString());}// 封裝訂閱K線數據請求private void subscribeKlineData(Session session, String symbol, int klineType, String trace) throws IOException {JSONObject klineSendObj = new JSONObject();klineSendObj.put("code", 10006);klineSendObj.put("trace", trace);JSONObject klineData = new JSONObject();JSONArray klineDataArray = new JSONArray();JSONObject klineObj = new JSONObject();klineObj.put("type", klineType);klineObj.put("codes", symbol);klineDataArray.add(klineObj);klineData.put("arr", klineDataArray);klineSendObj.put("data", klineData);session.getBasicRemote().sendText(klineSendObj.toJSONString());}@OnMessagepublic void onMessage(String message, Session session) {// 處理接收到的消息,包含訂閱成功/失敗提示和行情數據System.out.println("Message received: " + message);}@OnClosepublic void onClose(Session session, CloseReason reason) {System.out.println("Connection closed: " + session.getId() + ", reason: " + reason);}@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}// 發送心跳請求public static void ping() {try {JSONObject jsonObject = new JSONObject();jsonObject.put("code", 10010);jsonObject.put("trace", "heartbeat_trace");session.getBasicRemote().sendText(jsonObject.toJSONString());} catch (IOException e) {throw new RuntimeException("Failed to send heartbeat: " + e.getMessage(), e);}}
}
步驟3:理解核心方法
WS_URL
: 這是WebSocket連接地址,您需要替換 yourApikey
為您的實際憑證。
business
參數指定業務類型:stock
(股票), crypto
(加密貨幣), common
(通用)。
@PostConstruct connectAll()
: Spring Boot應用啟動時會自動調用此方法,用于建立WebSocket連接和啟動自動重連機制。
startReconnection(String s)
: 實現自動重連的邏輯,每隔10秒檢查連接狀態,如果斷開則嘗試重新連接。
connect(String s)
: 建立WebSocket連接的具體實現。
@OnOpen onOpen(Session session)
: 連接成功建立后,此方法會被調用。您可以在這里發送訂閱請求。
訂閱請求: 通過構建JSON對象發送訂閱請求。code
字段是請求協議號,data
字段包含訂閱的具體內容(例如:交易對、K線類型)。
實時成交明細: code
為 10000
。
實時盤口數據: code
為 10003
。
實時K線數據: code
為 10006
。type
字段指定K線類型(例如:1代表1分鐘K線)。
心跳機制: ping()
方法會定時發送心跳請求 (code: 10010
),防止長時間不活躍導致連接斷開。
@OnMessage onMessage(String message, Session session)
: 當接收到服務端推送的消息時,此方法會被調用。您可以在這里解析并處理行情數據。
@OnClose onClose(Session session, CloseReason reason)
: 連接關閉時調用。
@OnError onError(Throwable error)
: 連接發生錯誤時調用。
步驟4:運行您的應用
如果您使用的是Spring Boot,直接運行您的主應用類即可。WebSocket客戶端會自動啟動并嘗試連接。
注意事項:
- API文檔: 詳細的
code
列表和data
格式請參考Infoway的WebSocket API文檔:https://docs.infoway.io/websocket-api/endpoints - 錯誤處理: 生產環境中,您應該對連接、發送和接收消息中的異常進行更詳細的捕獲和處理。
- 自定義Trace:
trace
字段是自定義標識,可用于追蹤請求響應。
通過以上步驟,您就能成功連接實時行情接口并開始接收數據了。