【Easylive】視頻在線人數統計系統實現詳解 WebSocket 及其在在線人數統計中的應用

【Easylive】項目常見問題解答(自用&持續更新中…) 匯總版

視頻在線人數統計系統實現詳解

1. 系統架構概述

您實現的是一個基于Redis的視頻在線人數統計系統,主要包含以下組件:

  1. 心跳上報接口:客戶端定期調用以維持在線狀態
  2. Redis存儲結構:使用兩種鍵存儲在線信息
  3. 過期監聽機制:通過Redis的鍵過期事件自動減少在線人數
  4. 計數維護邏輯:確保在線人數的準確性

2. 核心實現細節

2.1 數據結構設計

系統使用了兩種Redis鍵:

  1. 用戶播放鍵 (userPlayOnlineKey)
    ? 格式:video:play:user:{fileId}:{deviceId}
    ? 作用:標記特定設備是否在線
    ? 過期時間:8秒

  2. 在線計數鍵 (playOnlineCountKey)
    ? 格式:video:play:online:{fileId}
    ? 作用:存儲當前視頻的在線人數
    ? 過期時間:10秒

2.2 心跳上報流程 (reportVideoPlayOnline)

public Integer reportVideoPlayOnline(String fileId, String deviceId) {// 構造Redis鍵String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 新用戶上線處理if (!redisUtils.keyExists(userPlayOnlineKey)) {// 設置用戶鍵(8秒過期)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加在線計數(10秒過期)return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 已有用戶續期處理redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 返回當前在線人數Integer count = (Integer) redisUtils.get(playOnlineCountKey);return count == null ? 1 : count;
}

工作流程

  1. 客戶端每5-7秒調用一次/reportVideoPlayOnline接口
  2. 服務端檢查用戶鍵是否存在:
    ? 不存在:創建用戶鍵(8秒過期),增加計數鍵(10秒過期)
    ? 存在:續期兩個鍵的過期時間
  3. 返回當前在線人數

2.3 過期監聽機制 (RedisKeyExpirationListener)

@Override
public void onMessage(Message message, byte[] pattern) {String key = message.toString();// 只處理用戶播放鍵的過期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 從key中提取fileIdInteger userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 減少對應視頻的在線計數redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));
}

工作流程

  1. Redis在用戶鍵(8秒)過期時發送通知
  2. 監聽器收到通知后:
    ? 驗證是否為用戶播放鍵
    ? 從鍵名中提取視頻ID(fileId)
    ? 減少對應視頻的在線計數

2.4 計數遞減邏輯 (decrementPlayOnlineCount)

public void decrementPlayOnlineCount(String key) {redisUtils.decrement(key);
}

作用:簡單地減少指定鍵的計數值

3. 關鍵設計原理

3.1 雙鍵設計的意義

  1. 用戶播放鍵
    ? 作為"心跳"存在的證據
    ? 過期時間(8秒)短于計數鍵(10秒),確保先檢測到用戶離線

  2. 在線計數鍵
    ? 集中存儲當前在線人數
    ? 稍長的過期時間防止誤刪

3.2 時間參數設計

? 8秒用戶鍵過期:假設客戶端每5-7秒上報一次,8秒確保能檢測到中斷
? 10秒計數鍵過期:比用戶鍵多2秒,防止競態條件
? 客戶端上報頻率:建議5-7秒一次,平衡準確性和服務器壓力

3.3 容錯機制

  1. 計數鍵續期:每次心跳都會延長計數鍵的過期時間
  2. 空值處理:當計數鍵不存在時返回1作為默認值
  3. 精確遞減:只在用戶鍵過期時才減少計數,避免重復遞減

4. 工作流程圖

Client Server Redis Listener 心跳上報(fileId, deviceId) 檢查user:{fileId}:{deviceId}存在? 不存在 設置user鍵(8s) 增加online計數(10s) 存在 續期user鍵(8s) 續期online鍵(10s) alt [新用戶] [已有用戶] 返回當前在線人數 user鍵過期通知 減少對應online計數 loop [過期監聽] Client Server Redis Listener

5. 系統優勢

  1. 實時性高:秒級檢測用戶離線
  2. 性能優異:完全基于Redis內存操作
  3. 擴展性強:輕松支持大量并發用戶
  4. 準確可靠:雙重驗證機制防止誤計數
  5. 資源節約:自動清理不活躍用戶的記錄

6. 潛在優化方向

  1. 批量上報:允許客戶端一次上報多個視頻的狀態
  2. 分布式鎖:在極高并發下保證計數準確
  3. 異常處理:增加Redis操作失敗的重試機制
  4. 監控指標:添加在線人數變化的監控和報警
  5. 動態過期:根據系統負載動態調整過期時間

這個實現很好地平衡了準確性、實時性和性能要求,是一個非常典型的在線人數統計解決方案。

WebSocket 及其在在線人數統計中的應用

WebSocket 基礎介紹

什么是 WebSocket?

WebSocket 是一種在單個 TCP 連接上進行全雙工通信的協議,它使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。

與傳統 HTTP 輪詢的區別

特性WebSocketHTTP 輪詢
連接方式持久化連接每次請求新建連接
通信方向全雙工半雙工
實時性毫秒級依賴輪詢間隔(通常秒級)
服務器推送支持不支持
資源消耗連接初期開銷大,后期開銷小每次請求都有完整HTTP開銷
適用場景高實時性應用實時性要求不高的應用

基于 WebSocket 的在線人數統計實現

系統架構設計

客戶端A ──┐├─── WebSocket 服務器 ─── Redis 集群
客戶端B ──┘           ││數據庫(持久化)

核心實現代碼

1. WebSocket 服務端實現 (Spring Boot)
@ServerEndpoint("/online/{videoId}")
@Component
public class VideoOnlineEndpoint {private static ConcurrentMap<String, Set<Session>> videoSessions = new ConcurrentHashMap<>();private static RedisTemplate<String, String> redisTemplate;@Autowiredpublic void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {VideoOnlineEndpoint.redisTemplate = redisTemplate;}@OnOpenpublic void onOpen(Session session, @PathParam("videoId") String videoId) {// 添加會話到視頻組videoSessions.computeIfAbsent(videoId, k -> ConcurrentHashMap.newKeySet()).add(session);// 更新Redis計數String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().increment(redisKey);redisTemplate.expire(redisKey, 10, TimeUnit.MINUTES);// 廣播更新后的在線人數broadcastOnlineCount(videoId);}@OnClosepublic void onClose(Session session, @PathParam("videoId") String videoId) {// 從視頻組移除會話Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.remove(session);// 更新Redis計數String redisKey = "video:online:" + videoId;redisTemplate.opsForValue().decrement(redisKey);// 廣播更新后的在線人數broadcastOnlineCount(videoId);}}@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}private void broadcastOnlineCount(String videoId) {String count = redisTemplate.opsForValue().get("video:online:" + videoId);String message = "ONLINE_COUNT:" + (count != null ? count : "0");Set<Session> sessions = videoSessions.get(videoId);if (sessions != null) {sessions.forEach(session -> {try {session.getBasicRemote().sendText(message);} catch (IOException e) {e.printStackTrace();}});}}
}
2. 客戶端實現 (JavaScript)
const videoId = '12345'; // 當前觀看的視頻ID
const socket = new WebSocket(`wss://yourdomain.com/online/${videoId}`);// 連接建立時
socket.onopen = function(e) {console.log("WebSocket連接已建立");
};// 接收消息
socket.onmessage = function(event) {if(event.data.startsWith("ONLINE_COUNT:")) {const count = event.data.split(":")[1];updateOnlineCountDisplay(count);}
};// 連接關閉時
socket.onclose = function(event) {if (event.wasClean) {console.log(`連接正常關閉,code=${event.code} reason=${event.reason}`);} else {console.log('連接異常斷開');// 嘗試重新連接setTimeout(() => connectWebSocket(), 5000);}
};// 錯誤處理
socket.onerror = function(error) {console.log(`WebSocket錯誤: ${error.message}`);
};function updateOnlineCountDisplay(count) {document.getElementById('online-count').innerText = count;
}
3. 心跳機制實現
// 客戶端心跳
setInterval(() => {if(socket.readyState === WebSocket.OPEN) {socket.send("HEARTBEAT");}
}, 30000); // 30秒發送一次心跳// 服務端心跳檢測 (Java)
@ServerEndpoint配置中添加:
@OnMessage
public void onMessage(Session session, String message) {if("HEARTBEAT".equals(message)) {session.getAsyncRemote().sendText("HEARTBEAT_ACK");}
}

方案優勢分析

  1. 實時性極佳
    ? 在線人數變化可實時推送到所有客戶端
    ? 無輪詢延遲,通常達到毫秒級更新

  2. 精確計數
    ? 基于實際連接狀態計數
    ? 避免Redis過期時間的估算誤差

  3. 擴展功能容易
    ? 可輕松擴展實現彈幕、實時評論等功能
    ? 支持復雜的互動場景

  4. 減少無效請求
    ? 相比HTTP輪詢減少90%以上的請求量
    ? 顯著降低服務器壓力

潛在挑戰與解決方案

1. 連接保持問題

問題:移動網絡不穩定導致頻繁斷開

解決方案
? 實現自動重連機制
? 使用心跳包檢測連接狀態
? 設置合理的超時時間

2. 大規模并發問題

問題:單視頻熱點導致連接數激增

解決方案
? 使用WebSocket集群
? 引入負載均衡(如Nginx)
? 實現連接分片策略

3. 狀態同步問題

問題:集群環境下狀態同步

解決方案
? 使用Redis Pub/Sub同步各節點狀態
? 采用一致性哈希分配連接
? 實現分布式會話管理

性能優化建議

  1. 協議優化
    ? 啟用WebSocket壓縮擴展
    ? 使用二進制協議替代文本協議

  2. 資源控制
    ? 實現連接數限制
    ? 設置單個IP連接限制

  3. 監控體系
    ? 建立連接數監控
    ? 實現異常連接報警

  4. 優雅降級
    ? WebSocket不可用時自動降級為長輪詢
    ? 提供兼容性方案

與傳統方案的對比

指標WebSocket方案Redis鍵過期方案
實時性毫秒級秒級(依賴過期時間)
精確度100%準確有1-2秒延遲
實現復雜度較高較低
服務器負載連接初期高,維持期低持續中等負載
擴展性容易擴展其他實時功能僅限于計數
客戶端兼容性需現代瀏覽器支持所有環境兼容
移動端表現可能因網絡切換斷開不受影響

適用場景建議

推薦使用WebSocket方案當:
? 需要實時顯示精確在線人數
? 已經使用或計劃使用WebSocket實現其他功能(如彈幕、聊天)
? 客戶端環境可控(如自己的APP或現代瀏覽器)
? 有足夠資源維護WebSocket基礎設施

推薦保持Redis方案當:
? 實時性要求不是極高(秒級可接受)
? 需要支持老舊客戶端
? 系統規模較小,希望簡單維護
? 主要關注計數而非實時交互

混合方案設計

結合兩種方案優勢的折中實現:

// WebSocket連接時更新精確計數
@OnOpen
public void onOpen(Session session, @PathParam("videoId") String videoId) {// 更新內存中的精確計數incrementLocalCount(videoId);// 每10秒同步到Redis一次if(needSyncToRedis(videoId)) {redisTemplate.opsForValue().set("video:online:" + videoId, getLocalCount(videoId).toString());}
}// 對外提供查詢接口
@GetMapping("/online/{videoId}")
public int getOnlineCount(@PathVariable String videoId) {// 優先返回本地精確計數Integer localCount = getLocalCount(videoId);if(localCount != null) {return localCount;}// 回退到Redis計數String count = redisTemplate.opsForValue().get("video:online:" + videoId);return count != null ? Integer.parseInt(count) : 0;
}

這種混合方案:
? 對WebSocket客戶端提供精確計數
? 對非WebSocket客戶端提供近似的Redis計數
? 平衡了精確性和兼容性

查看在線觀看人數

通過輪詢上報心跳,在服務端記錄設備有沒有不停地上報心跳,如果沒有上報心跳,通過 Redis 的 key 的失效,會有一個通知沒有再上報心跳,就會把在線人數 -1。

Redis在線人數統計實現詳解

以下是帶有詳細注釋的代碼實現,解釋了基于Redis的在線人數統計系統的工作原理:

/*** 客戶端上報心跳接口* @param fileId 視頻文件ID* @param deviceId 設備唯一標識* @return 當前在線人數*/
@RequestMapping("/reportVideoPlayOnline")
public ResponseVO reportVideoPlayOnline(@NotEmpty String fileId, @NotEmpty String deviceId){// 調用Redis組件處理心跳上報,并返回成功響應return getSuccessResponseVO(redisComponent.reportVideoPlayOnline(fileId, deviceId));
}/*** 處理視頻在線人數統計的核心方法* @param fileId 視頻文件ID* @param deviceId 設備唯一標識* @return 當前在線人數*/
public Integer reportVideoPlayOnline(String fileId, String deviceId){// 構建Redis鍵:用戶級別的鍵,用于標記特定設備是否在線String userPlayOnlineKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER, fileId, deviceId);// 構建Redis鍵:視頻級別的鍵,用于存儲當前視頻的總在線人數String playOnlineCountKey = String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId);// 檢查是否是新的觀看用戶(該設備首次上報或已過期)if (!redisUtils.keyExists(userPlayOnlineKey)) {// 設置用戶鍵,8秒后過期(如果8秒內沒有下次心跳,則認為離線)redisUtils.setex(userPlayOnlineKey, fileId, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 增加視頻的總在線人數計數,并設置10秒過期時間return redisUtils.incrementex(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10).intValue();}// 以下是已有用戶的處理邏輯:// 續期視頻的總在線人數鍵(10秒)redisUtils.expire(playOnlineCountKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 10);// 續期用戶級別的鍵(8秒)redisUtils.expire(userPlayOnlineKey, Constants.REDIS_KEY_EXPIRES_ONE_SECONDS * 8);// 獲取當前在線人數(防止并發問題導致的計數不準確)Integer count = (Integer) redisUtils.get(playOnlineCountKey);// 如果獲取不到計數(極端情況),默認返回1return count == null ? 1 : count;
}/*** 減少在線人數計數* @param key 需要減少計數的Redis鍵*/
public void decrementPlayOnlineCount(String key) {// 對指定鍵的值進行原子遞減redisUtils.decrement(key);
}/*** Redis鍵過期監聽器,用于處理用戶離線情況*/
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {@Resourceprivate RedisComponent redisComponent;public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}/*** 處理Redis鍵過期事件* @param message 過期消息* @param pattern 模式*/@Overridepublic void onMessage(Message message, byte[] pattern) {// 獲取過期的鍵名String key = message.toString();// 只處理用戶級別的在線狀態鍵過期事件if (!key.startsWith(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE_PREIFX + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX)) {return;}// 從鍵名中提取視頻ID// 計算用戶鍵前綴的長度Integer userKeyIndex = key.indexOf(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX) + Constants.REDIS_KEY_VIDEO_PLAY_COUNT_USER_PREFIX.length();// 截取視頻ID(假設ID長度為20)String fileId = key.substring(userKeyIndex, userKeyIndex + Constants.LENGTH_20);// 減少對應視頻的在線人數計數redisComponent.decrementPlayOnlineCount(String.format(Constants.REDIS_KEY_VIDEO_PLAY_COUNT_ONLINE, fileId));}
}

系統工作流程詳解

  1. 心跳上報機制
    ? 客戶端每隔5-7秒調用/reportVideoPlayOnline接口上報心跳
    ? 服務端通過Redis記錄設備最后一次活躍時間

  2. 雙鍵設計原理
    ? 用戶鍵(userPlayOnlineKey)
    ? 格式:video:play:user:{fileId}:{deviceId}
    ? 作用:標記特定設備是否在線
    ? 過期時間:8秒(如果8秒內沒有心跳則認為離線)
    ? 計數鍵(playOnlineCountKey)
    ? 格式:video:play:online:{fileId}
    ? 作用:存儲當前視頻的總在線人數
    ? 過期時間:10秒(比用戶鍵稍長,防止競態條件)

  3. 新用戶上線處理

    if (!redisUtils.keyExists(userPlayOnlineKey)) {redisUtils.setex(userPlayOnlineKey, fileId, 8);return redisUtils.incrementex(playOnlineCountKey, 10);
    }
    

    ? 當用戶鍵不存在時,創建用戶鍵并增加總計數

  4. 已有用戶續期處理

    redisUtils.expire(playOnlineCountKey, 10);
    redisUtils.expire(userPlayOnlineKey, 8);
    

    ? 續期兩個鍵的過期時間,保持活躍狀態

  5. 離線檢測機制
    ? 當用戶鍵8秒過期時,觸發RedisKeyExpirationListener
    ? 監聽器從鍵名提取videoId,減少對應視頻的在線計數

  6. 容錯處理

    Integer count = (Integer) redisUtils.get(playOnlineCountKey);
    return count == null ? 1 : count;
    

    ? 防止極端情況下計數鍵丟失,返回默認值1

設計優勢分析

  1. 精確計數:基于實際心跳而非估算,結果準確
  2. 自動清理:通過Redis過期機制自動清理不活躍用戶
  3. 低延遲:鍵過期通知機制實現秒級離線檢測
  4. 高性能:完全基于內存操作,無數據庫IO
  5. 可擴展:Redis集群支持橫向擴展

關鍵參數說明

參數說明
用戶鍵過期時間8秒客戶端應每5-7秒上報一次心跳
計數鍵過期時間10秒比用戶鍵稍長,防止競態條件
視頻ID長度20需與業務系統保持一致

這個實現方案在保證準確性的同時,具有優秀的性能和可擴展性,非常適合中小規模的實時在線人數統計場景。

自看

通過Redis計數器來給視頻的在線觀看人數進行增加和減少,也就是通過心跳來不停上報當前用戶是否正在觀看,當瀏覽器關閉時,該用戶就不會再持續上報心跳,此時該用戶的Redis Key則會失效,Redis Key失效的時候會發送消息通知,根據這個消息通知得知失效,再去減少在線觀看人數。

Netty與視頻在線人數統計的結合

Netty基礎介紹

Netty是一個異步事件驅動的網絡應用框架,用于快速開發高性能、高可靠性的網絡服務器和客戶端程序。它基于Java NIO(Non-blocking I/O)構建,主要特點包括:

  1. 高性能:支持百萬級并發連接
  2. 低延遲:非阻塞I/O模型減少等待時間
  3. 高擴展性:模塊化設計,可靈活擴展
  4. 協議支持:內置HTTP、WebSocket、TCP/UDP等協議支持

為什么考慮用Netty實現在線人數統計?

當前基于HTTP輪詢+Redis的實現存在以下可優化點:
? HTTP開銷大:每次輪詢都需要完整的HTTP請求/響應頭
? 實時性有限:依賴輪詢間隔(通常秒級)
? 服務器壓力:高并發時大量無效輪詢請求

Netty可以解決這些問題,提供真正的實時通信能力

基于Netty的在線人數統計設計

系統架構

客戶端App/Web ──? Netty服務器集群 ──? Redis集群││ (WebSocket/TCP長連接)▼
用戶行為數據(心跳、上下線)

核心組件實現

1. Netty服務器初始化
public class VideoOnlineServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 心跳檢測(15秒無讀寫則關閉連接)pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.SECONDS));// 自定義協議解碼/編碼pipeline.addLast("decoder", new OnlineMessageDecoder());pipeline.addLast("encoder", new OnlineMessageEncoder());// 業務邏輯處理器pipeline.addLast("handler", new OnlineMessageHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
2. 消息處理器實現
public class OnlineMessageHandler extends SimpleChannelInboundHandler<OnlineMessage> {// 視頻ID到Channel組的映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, OnlineMessage msg) {switch (msg.getType()) {case CONNECT: // 連接初始化handleConnect(ctx, msg.getVideoId(), msg.getDeviceId());break;case HEARTBEAT: // 心跳handleHeartbeat(ctx, msg.getVideoId(), msg.getDeviceId());break;case DISCONNECT: // 主動斷開handleDisconnect(ctx, msg.getVideoId(), msg.getDeviceId());break;}}private void handleConnect(ChannelHandlerContext ctx, String videoId, String deviceId) {// 加入視頻頻道組ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));group.add(ctx.channel());// 更新Redis計數long count = RedisUtils.increment("video:online:" + videoId);// 廣播新在線人數broadcastCount(videoId, count);}private void handleHeartbeat(ChannelHandlerContext ctx, String videoId, String deviceId) {// 更新設備最后活躍時間(Redis)RedisUtils.setex("device:active:" + videoId + ":" + deviceId, "1", 15); // 15秒過期// 可選擇性返回當前人數ctx.writeAndFlush(new OnlineMessage(HEARTBEAT_ACK, getOnlineCount(videoId)));}
}
3. 客戶端斷連處理
@Override
public void channelInactive(ChannelHandlerContext ctx) {// 從所有視頻組中移除該ChannelvideoGroups.values().forEach(group -> group.remove(ctx.channel()));// 更新Redis計數(需要維護設備到視頻ID的映射)String deviceId = getDeviceId(ctx.channel());String videoId = getVideoId(ctx.channel());long count = RedisUtils.decrement("video:online:" + videoId);// 廣播新人數broadcastCount(videoId, count);
}@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {// 處理空閑連接if (evt instanceof IdleStateEvent) {ctx.close(); // 關閉超時未心跳的連接}
}

與傳統方案的對比

特性Netty實現方案HTTP輪詢+Redis方案
實時性毫秒級依賴輪詢間隔(通常秒級)
協議開銷僅心跳數據(幾十字節)完整HTTP頭(通常幾百字節)
服務器壓力長連接維護,無重復握手每次輪詢都新建連接
并發能力單機支持10萬+連接受限于HTTP服務器性能
實現復雜度較高簡單
移動網絡適應性需處理頻繁重連天然適應

關鍵設計考慮

  1. 連接管理
    ? 使用ChannelGroup管理同視頻的用戶連接
    ? IdleStateHandler自動檢測空閑連接

  2. 狀態同步
    ? Redis存儲全局計數,避免Netty單點問題
    ? 定期同步內存與Redis的數據

  3. 消息協議設計

    message OnlineMessage {enum Type {CONNECT = 0;HEARTBEAT = 1;DISCONNECT = 2;}Type type = 1;string videoId = 2;string deviceId = 3;int64 count = 4; // 用于服務端返回當前人數
    }
    
  4. 彈性設計
    ? 客戶端實現自動重連
    ? 服務端優雅降級機制

性能優化技巧

  1. 對象池化:重用消息對象減少GC
  2. 零拷貝:使用CompositeByteBuf合并小數據包
  3. 事件循環:業務邏輯放入單獨線程池
  4. 批量操作:合并Redis操作減少網絡往返

適用場景建議

推薦使用Netty當:
? 需要真正的實時互動(如直播彈幕)
? 預期有超高并發(萬級同時在線)
? 已經需要維護長連接(如游戲、IM)

保持當前方案當:
? 實時性要求不高
? 開發資源有限
? 客戶端環境復雜(如需要支持老舊瀏覽器)

Netty方案雖然實現復雜度較高,但能為視頻平臺提供更實時、更高效的在線人數統計能力,并為未來擴展實時互動功能奠定基礎。

Netty與WebSocket的關系及在實時統計中的應用

Netty和WebSocket是不同層次的技術,但它們可以緊密結合來構建高性能的實時通信系統。以下是它們的核心關系和在視頻在線人數統計中的應用分析:

1. Netty與WebSocket的基礎關系

維度NettyWebSocket二者關系
定位網絡應用框架通信協議Netty是實現WebSocket協議的底層框架之一
層級傳輸層/應用層框架應用層協議Netty提供了對WebSocket協議的支持
功能處理TCP/UDP連接、編解碼、并發等提供全雙工通信能力Netty幫助高效實現WebSocket的通信能力
典型使用可作為WebSocket服務器的基礎實現運行在Netty等框架之上開發者通過Netty API構建WebSocket服務

2. 技術棧組合原理

[WebSocket客戶端] ←WebSocket協議→ [Netty WebSocket服務端] ←TCP→ [操作系統網絡棧]
  1. 協議支持
    ? Netty內置WebSocketServerProtocolHandler等組件
    ? 自動處理WebSocket握手、幀編解碼等底層細節

  2. 性能優勢
    ? Netty的Reactor線程模型優化WebSocket連接管理
    ? 零拷貝技術提升WebSocket數據傳輸效率

  3. 擴展能力
    ? 在WebSocket之上可添加自定義協議
    ? 方便集成SSL/TLS等安全層

3. 在視頻在線統計中的聯合實現

基于Netty的WebSocket服務端示例

public class VideoWebSocketServer {public void start(int port) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// HTTP編解碼器(用于WebSocket握手)pipeline.addLast(new HttpServerCodec());// 聚合HTTP請求pipeline.addLast(new HttpObjectAggregator(65536));// WebSocket協議處理器pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 自定義業務處理器pipeline.addLast(new OnlineStatsHandler());}});ChannelFuture f = b.bind(port).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}

在線統計業務處理器

public class OnlineStatsHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 視頻頻道組映射private static Map<String, ChannelGroup> videoGroups = new ConcurrentHashMap<>();@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {// 解析JSON消息:{"action":"heartbeat","videoId":"123"}JsonObject json = parseJson(msg.text());String videoId = json.getString("videoId");ChannelGroup group = videoGroups.computeIfAbsent(videoId, k -> new DefaultChannelGroup(ctx.executor()));switch (json.getString("action")) {case "join":group.add(ctx.channel());broadcastCount(videoId, group.size());break;case "heartbeat":// 更新Redis活躍記錄redis.incr("active:" + videoId + ":" + ctx.channel().id());break;}}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {// 從所有組中移除并更新計數videoGroups.values().forEach(group -> {if (group.remove(ctx.channel())) {broadcastCount(getVideoId(ctx), group.size());}});}
}

4. 與傳統HTTP輪詢方案的對比

特性Netty+WebSocketHTTP輪詢
連接方式1個持久連接頻繁新建連接
頭部開銷握手后無冗余頭每次請求都帶完整HTTP頭
實時性毫秒級依賴輪詢間隔(通常秒級)
服務器壓力連接數×心跳頻率請求數×輪詢頻率
移動網絡適應需處理網絡切換天然適應
實現復雜度較高簡單

5. 典型消息流程

  1. 連接建立

    客戶端 → HTTP Upgrade請求 → Netty(完成WebSocket握手) → 建立持久連接
    
  2. 心跳維持

    // 客戶端每10秒發送
    {"action":"heartbeat","videoId":"123","timestamp":1620000000}// 服務端響應
    {"type":"ack","online":1524}
    
  3. 人數推送

    // 服務端主動推送
    {"type":"stats","videoId":"123","online":1525,"change":1}
    

6. 性能優化關鍵點

  1. 連接管理
    ? 使用ChannelGroup管理視頻房間的訂閱者
    ? 配置合理的IdleStateHandler檢測死連接

  2. 序列化優化

    // 使用二進制協議代替JSON
    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
    pipeline.addLast(new ProtobufEncoder());
    
  3. 集群擴展

    // 使用Redis Pub/Sub同步各節點狀態
    redis.subscribe("video:123", (channel, message) -> {broadcastToLocalClients(message);
    });
    
  4. 監控指標
    ? 跟蹤每個視頻頻道的連接數
    ? 監控消息吞吐量和延遲

Netty與WebSocket的結合為實時統計提供了高并發、低延遲的解決方案,特別適合需要精確到毫秒級的在線人數統計場景,同時為未來擴展實時彈幕、即時消息等功能奠定了基礎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/75819.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/75819.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/75819.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux 高級命令與常見操作:文本處理、系統管理與網絡調試

下面是一份針對已經熟悉 Linux 基礎命令的用戶所整理的「高級命令與常見操作」筆記&#xff0c;涵蓋文本處理、系統管理、網絡調試與其他常用的進階技巧。請你審核下面筆記&#xff0c;檢查是否有過時的內容&#xff0c;如有請進行替換&#xff0c;確保其符合現代化需求&#x…

使用MFC ActiveX開發KingScada控件(OCX)

最近有個需求&#xff0c;要在KingScada上面開發一個控件。 原來是用的WinCC&#xff0c;WinCC本身是支持調用.net控件&#xff0c;就是winform控件的&#xff0c;winform控件開發簡單&#xff0c;相對功能也更豐富。奈何WinCC不是國產的。 話說KingScada&#xff0c;國產組態軟…

QScrollArea 內部滾動條 QSS 樣式失效問題及解決方案

在使用 Qt 進行 UI 開發時,我們經常希望通過 QSS(Qt Style Sheets)自定義控件的外觀,比如為 QScrollArea 的內部滾動條設置特定的樣式。然而,有開發者遇到了這樣的問題:在 UI 設計器中預覽 QSS 顯示效果正常,但程序運行時卻顯示為系統默認樣式。經過反復測試和調試,最終…

使用OpenSceneGraph生成3D數據格式文件

OpenSceneGraph (OSG) 提供了多種方式來生成和導出3D數據格式文件。以下是詳細的生成方法和示例代碼&#xff1a; 一、基本文件生成方法 1. 使用osgDB::writeNodeFile函數 這是最直接的生成方式&#xff0c;支持多種格式&#xff1a; #include <osgDB/WriteFile>osg:…

JMeter接口性能測試從入門到精通

前言&#xff1a; 本文主要介紹了如何利用jmter進行接口的性能測試 1.在測試計劃中添加線程組 1.1.線程組界面中元素含義 如果點擊循環次數為永遠&#xff1a; 2.添加HTTP取樣器 2.1.填寫登錄接口的各個參數 2.2.在線程組下面增加查看結果樹 請求成功的情況&#xff1a; 請求…

C++抽卡模擬器

近日在學校無聊&#xff0c;寫了個抽卡模擬器供大家娛樂。 代碼實現以下功能&#xff1a;抽卡界面&#xff0c;抽卡判定、動畫播放、存檔。 1.抽卡界面及判定 技術有限&#xff0c;不可能做的和原神一樣精致。代碼如下&#xff08;注&#xff1a;這不是完整代碼&#xff0c;…

詳解相機的內參和外參,以及內外參的標定方法

1 四個坐標系 要想深入搞清楚相機的內參和外參含義&#xff0c; 首先得清楚以下4個坐標系的定義&#xff1a; 世界坐標系&#xff1a; 名字看著很唬人&#xff0c; 其實沒什么大不了的&#xff0c; 這個就是你自己定義的某一個坐標系。 比如&#xff0c; 你把房間的某一個點定…

學透Spring Boot — 011. 一篇文章學會Spring Test

系列文章目錄 這是學透Spring Boot的第11篇文章。更多系列文章請關注 CSDN postnull 用戶的專欄 文章目錄 系列文章目錄Spring Test的依賴Spring Test的核心功能SpringBootTest 加載Spring上下文依賴注入有問題時Spring配置有問題時 WebMvcTest 測試Web層&#xff08;Controll…

Mysql 數據庫編程技術01

一、數據庫基礎 1.1 認識數據庫 為什么學習數據庫 瞬時數據&#xff1a;比如內存中的數據&#xff0c;是不能永久保存的。持久化數據&#xff1a;比如持久化至數據庫中或者文檔中&#xff0c;能夠長久保存。 數據庫是“按照數據結構來組織、存儲和管理數據的倉庫”。是一個長…

新一代AI架構實踐:數字大腦AI+智能調度MCP+領域執行APP的黃金金字塔體系

新一代AI架構實踐&#xff1a;數字大腦智能調度領域執行的黃金金字塔體系 一、架構本質的三層穿透性認知 1.1 核心范式轉變&#xff08;CPS理論升級&#xff09; 傳統算法架構&#xff1a;數據驅動 → 特征工程 → 模型訓練 → 業務應用 新一代AI架構&#xff1a;物理規律建…

macOS可視化桌面配置docker加速器

macOS可視化桌面配置docker加速器 在鏡像settings->docker Engine改為國內鏡像修改為國內鏡像重啟docker(可視化界面啟動或者使用命令行)使用命令重啟可視化界面重啟 在鏡像settings->docker Engine改為國內鏡像 修改為國內鏡像 {"registry-mirrors": ["…

Nginx 基礎使用(2025)

一、Nginx目錄結構 [rootlocalhost ~]# tree /usr/local/nginx /usr/local/nginx ├── client_body_temp # POST 大文件暫存目錄 ├── conf # Nginx所有配置文件的目錄 │ ├── fastcgi.conf # fastcgi相…

用spring-webmvc包實現AI(Deepseek)事件流(SSE)推送

前后端&#xff1a; Spring Boot Angular spring-webmvc-5.2.2包 代碼片段如下&#xff1a; 控制層&#xff1a; GetMapping(value "/realtime/page/ai/sse", produces MediaType.TEXT_EVENT_STREAM_VALUE)ApiOperation(value "獲取告警記錄進行AI分析…

基于Python的招聘推薦數據可視化分析系統

【Python】基于Python的招聘推薦數據可視化分析系統&#xff08;完整系統源碼開發筆記詳細部署教程&#xff09;? 目錄 一、項目簡介二、項目界面展示三、項目視頻展示 一、項目簡介 &#x1f680;&#x1f31f; 基于Python的招聘推薦數據可視化分析系統&#xff01;&#x1…

使用注解開發springMVC

引言 在學習過第一個springMVC項目建造過后&#xff0c;讓我們直接進入真實開發中所必需的注解開發&#xff0c; 是何等的簡潔高效&#xff01;&#xff01; 注&#xff1a;由于Maven可能存在資源過濾的問題&#xff0c;在maven依賴中加入 <build><resources>&l…

linux專題3-----禁止SSH的密碼登錄

要在linux系統中禁止密碼登錄&#xff0c;您可以通過修改 SSH 配置來實現。請按照以下步驟操作(此處以 Ubuntu為例)&#xff1a; 1、SSH 登錄到您的服務器&#xff08;或直接在命令行模式下&#xff09;。 2、備份 SSH 配置文件&#xff1a; 在終端中運行以下命令以備份現有的…

基于LangChain和通義(Tongyi)實現NL2SQL的智能檢索(無需訓練)

在數據驅動的時代,如何高效地從數據庫中獲取信息成為了一個重要的挑戰。自然語言到SQL(NL2SQL)技術提供了一種便捷的解決方案,使用戶能夠用自然語言查詢數據庫,而無需深入了解SQL語法。本文將探討如何利用LangChain和通義(Tongyi)實現NL2SQL的智能檢索,具體步驟如下: …

深度學習處理文本(10)

保存自定義層 在編寫自定義層時&#xff0c;一定要實現get_config()方法&#xff1a;這樣我們可以利用config字典將該層重新實例化&#xff0c;這對保存和加載模型很有用。該方法返回一個Python字典&#xff0c;其中包含用于創建該層的構造函數的參數值。所有Keras層都可以被序…

機器視覺3D中激光偏鏡的優點

機器視覺的3D應用中,激光偏鏡(如偏振片、波片、偏振分束器等)通過其獨特的偏振控制能力,顯著提升了系統的測量精度、抗干擾能力和適應性。以下是其核心優點: 1. 提升3D成像精度 抑制環境光干擾:偏振片可濾除非偏振的環境雜光(如日光、室內照明),僅保留激光偏振信號,大…

線程同步的學習與應用

1.多線程并發 1).多線程并發引例 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <pthread.h>int wg0; void *fun(void *arg) {for(int i0;i<1000;i){wg;printf("wg%d\n",wg);} } i…