最近想學學WebSocket做一個實時通訊的練手項目
主要用到的技術棧是WebSocket Netty Vue Pinia MySQL SpringBoot,實現一個持久化數據,單一群聊,支持多用戶的聊天界面
下面是實現的過程
后端
SpringBoot啟動的時候會占用一個端口,而Netty也會占用一個端口,這兩個端口不能重復,并且因為Netty啟動后會阻塞當前線程,因此需要另開一個線程防止阻塞住SpringBoot
1. 編寫Netty服務器
個人認為,Netty最關鍵的就是channel,可以代表一個客戶端
我在這使用的是@PostConstruct注解,在Bean初始化后調用里面的方法,新開一個線程運行Netty,因為希望Netty受Spring管理,所以加上了spring的注解,也可以直接在啟動類里注入Netty然后手動啟動
@Service
public class NettyService {private EventLoopGroup bossGroup = new NioEventLoopGroup(1);private EventLoopGroup workGroup = new NioEventLoopGroup();@Autowiredprivate WebSocketHandler webSocketHandler;@Autowiredprivate HeartBeatHandler heartBeatHandler;@PostConstructpublic void initNetty() throws BaseException {new Thread(()->{try {start();} catch (Exception e) {throw new RuntimeException(e);}}).start();}@PreDestroypublic void destroy() throws BaseException {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}@Asyncpublic void start() throws BaseException {try {ChannelFuture channelFuture = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline()
// http解碼編碼器.addLast(new HttpServerCodec())
// 處理完整的 HTTP 消息.addLast(new HttpObjectAggregator(64 * 1024))
// 心跳檢測時長.addLast(new IdleStateHandler(300, 0, 0, TimeUnit.SECONDS))
// 心跳檢測處理器.addLast(heartBeatHandler)
// 支持ws協議(自定義).addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000))
// ws請求處理器(自定義).addLast(webSocketHandler);}}).bind(8081).sync();System.out.println("Netty啟動成功");ChannelFuture future = channelFuture.channel().closeFuture().sync();}catch (InterruptedException e){throw new InterruptedException ();}finally {
//優雅關閉bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}
}
服務器類只是指明一些基本信息,包含處理器類,支持的協議等等,具體的處理邏輯需要再自定義類來實現
2. 心跳檢測處理器
心跳檢測是指 服務器無法主動確定客戶端的狀態(用戶可能關閉了網頁,但是服務端沒辦法知道),為了確定客戶端是否在線,需要客戶端定時發送一條消息,消息內容不重要,重要的是發送消息代表該客戶端仍然在線,當客戶端長時間沒有發送數據時,代表客戶端已經下線
package org.example.payroll_management.websocket.netty.handler;@Component
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelDuplexHandler {@Autowiredprivate ChannelContext channelContext;private static final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){// 心跳檢測超時IdleStateEvent e = (IdleStateEvent) evt;logger.info("心跳檢測超時");if (e.state() == IdleState.READER_IDLE){Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));Integer userId = attr.get();// 讀超時,當前已經下線,主動斷開連接ChannelContext.removeChannel(userId);ctx.close();} else if (e.state() == IdleState.WRITER_IDLE){ctx.writeAndFlush("心跳檢測");}}super.userEventTriggered(ctx, evt);}
}
3. webSocket處理器
當客戶端發送消息,消息的內容會發送當webSocket處理器中,可以對對應的方法進行處理,我這里偷懶了,就做了一個群組,全部用戶只能在同一群中聊天,不過創建多個群組,或單對單聊天也不復雜,只需要將群組的ID進行保存就可以
這里就產生第一個問題了,就是SpringMVC的攔截器不會攔截其他端口的請求,解決方法是將token放置到請求參數中,在userEventTriggered方法中重新進行一次token檢驗
第二個問題,我是在攔截器中通過ThreadLocal保存用戶ID,不走攔截器在其他地方拿不到用戶ID,解決方法是,在userEventTriggered方法中重新保存,或者channel中可以保存附件(自身攜帶的數據),直接將id保存到附件中
第三個問題,消息的持久化,當用戶重新打開界面時,肯定希望消息仍然存在,鑒于webSocket的實時性,數據持久化肯定不能在同一個線程中完成,我在這使用BlockingQueue+線程池完成對消息的異步保存,或者也可以用mq實現
不過用的Executors.newSingleThreadExecutor();可能會產生OOM的問題,后面可以自定義一個線程池,當任務滿了之后,指定拒絕策略為拋出異常,再通過全局異常捕捉拿到對應的數據保存到數據庫中,不過俺這種小項目應該不會產生這種問題
第四個問題,消息內容,這個需要前后端統一一下,確定一下傳輸格式就OK了,然后從JSON中取出數據處理
最后就是在線用戶統計,這個沒什么好說的,里面有對應的方法,當退出時,直接把channel踢出去就可以了
package org.example.payroll_management.websocket.netty.handler;@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Autowiredprivate ChannelContext channelContext;@Autowiredprivate MessageMapper messageMapper;@Autowiredprivate UserService userService;private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);private static final BlockingQueue<WebSocketMessageDto> blockingQueue = new ArrayBlockingQueue(1024 * 1024);private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();// 提交線程@PostConstructprivate void init(){EXECUTOR_SERVICE.submit(new MessageHandler());}private class MessageHandler implements Runnable{// 異步保存@Overridepublic void run() {while(true){WebSocketMessageDto message = null;try {message = blockingQueue.take();logger.info("消息持久化");} catch (InterruptedException e) {throw new RuntimeException(e);}Integer success = messageMapper.saveMessage(message);if (success < 1){try {throw new BaseException("保存信息失敗");} catch (BaseException e) {throw new RuntimeException(e);}}}}}// 當讀事件發生時(有客戶端發送消息)@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {Channel channel = channelHandlerContext.channel();// 收到的消息String text = textWebSocketFrame.text();Attribute<Integer> attr = channelHandlerContext.channel().attr(AttributeKey.valueOf(channelHandlerContext.channel().id().toString()));Integer userId = attr.get();logger.info("接收到用戶ID為 {} 的消息: {}",userId,text);// TODO 將text轉成JSON,提取里面的數據WebSocketMessageDto webSocketMessage = JSONUtil.toBean(text, WebSocketMessageDto.class);if (webSocketMessage.getType().equals("心跳檢測")){logger.info("{}發送心跳檢測",userId);}else if (webSocketMessage.getType().equals("群發")){ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);WebSocketMessageDto messageDto = JSONUtil.toBean(text, WebSocketMessageDto.class);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("群發");webSocketMessageDto.setText(messageDto.getText());webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender(String.valueOf(userId));webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));blockingQueue.add(webSocketMessageDto);channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonPrettyStr(webSocketMessageDto)));}else{channel.writeAndFlush("請發送正確的格式");}}// 建立連接后觸發(有客戶端建立連接請求)@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("建立連接");super.channelActive(ctx);}// 連接斷開后觸發(有客戶端關閉連接請求)@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Attribute<Integer> attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));Integer userId = attr.get();logger.info("用戶ID:{} 斷開連接",userId);ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);channelGroup.remove(ctx.channel());ChannelContext.removeChannel(userId);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("用戶變更");List<OnLineUserVo> onlineUser = userService.getOnlineUser();webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender("0");webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));super.channelInactive(ctx);}// 建立連接后觸發(客戶端完成連接)@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete){WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String uri = handshakeComplete.requestUri();logger.info("uri: {}",uri);String token = getToken(uri);if (token == null){logger.warn("Token校驗失敗");ctx.close();throw new BaseException("Token校驗失敗");}logger.info("token: {}",token);Integer userId = null;try{Claims claims = JwtUtil.extractClaims(token);userId = Integer.valueOf((String) claims.get("userId"));}catch (Exception e){logger.warn("Token校驗失敗");ctx.close();throw new BaseException("Token校驗失敗");}// 向channel中的附件中添加用戶IDchannelContext.addContext(userId,ctx.channel());ChannelContext.setChannel(userId,ctx.channel());ChannelContext.setChannelGroup(null,ctx.channel());ChannelGroup channelGroup = ChannelContext.getChannelGroup(null);WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto();webSocketMessageDto.setType("用戶變更");List<OnLineUserVo> onlineUser = userService.getOnlineUser();webSocketMessageDto.setText(JSONUtil.toJsonStr(onlineUser));webSocketMessageDto.setReceiver("all");webSocketMessageDto.setSender("0");webSocketMessageDto.setSendDate(TimeUtil.timeFormat("yyyy-MM-dd"));channelGroup.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(webSocketMessageDto)));}super.userEventTriggered(ctx, evt);}private String getToken(String uri){if (uri.isEmpty()){return null;}if(!uri.contains("token")){return null;}String[] split = uri.split("\\?");if (split.length!=2){return null;}String[] split1 = split[1].split("=");if (split1.length!=2){return null;}return split1[1];}
}
4. 工具類
主要用來保存用戶信息的
不要問我為什么又有static又有普通方法,問就是懶得改,這里我直接保存的同一個群組,如果需要多群組的話,就需要建立SQL數據了
package org.example.payroll_management.websocket;@Component
public class ChannelContext {private static final Map<Integer, Channel> USER_CHANNEL_MAP = new ConcurrentHashMap<>();private static final Map<Integer, ChannelGroup> USER_CHANNELGROUP_MAP = new ConcurrentHashMap<>();private static final Integer GROUP_ID = 10086;private static final Logger logger = LoggerFactory.getLogger(ChannelContext.class);public void addContext(Integer userId,Channel channel){String channelId = channel.id().toString();AttributeKey attributeKey = null;if (AttributeKey.exists(channelId)){attributeKey = AttributeKey.valueOf(channelId);} else{attributeKey = AttributeKey.newInstance(channelId);}channel.attr(attributeKey).set(userId);}public static List<Integer> getAllUserId(){return new ArrayList<>(USER_CHANNEL_MAP.keySet());}public static void setChannel(Integer userId,Channel channel){USER_CHANNEL_MAP.put(userId,channel);}public static Channel getChannel(Integer userId){return USER_CHANNEL_MAP.get(userId);}public static void removeChannel(Integer userId){USER_CHANNEL_MAP.remove(userId);}public static void setChannelGroup(Integer groupId,Channel channel){if(groupId == null){groupId = GROUP_ID;}ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId);if (channelGroup == null){channelGroup =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup);}if (channel == null){return ;}channelGroup.add(channel);logger.info("向group中添加channel,ChannelGroup已有Channel數量:{}",channelGroup.size());}public static ChannelGroup getChannelGroup(Integer groupId){if (groupId == null){groupId = GROUP_ID;}return USER_CHANNELGROUP_MAP.get(groupId);}public static void removeChannelGroup(Integer groupId){if (groupId == null){groupId = GROUP_ID;}USER_CHANNELGROUP_MAP.remove(groupId);}
}
寫到這里,Netty服務就搭建完成了,后面就可以等著前端的請求建立了
前端
前端我使用的vue,因為我希望當用戶登錄后自動建立ws連接,所以我在登錄成功后添加上了ws建立請求,然后我發現,如果用戶關閉網頁后重新打開,因為跳過了登錄界面,ws請求不會自動建立,所以需要一套全局的ws請求
不過我前端不是很好(其實后端也一般),所以很多地方肯定有更優的寫法
1. pinia
使用pinia保存ws請求,方便在其他組件中調用
定義WebSocket實例(ws)和一個請求建立判斷(wsConnect)
后面就可以通過ws接收服務的消息
import { defineStore } from 'pinia'export const useWebSocketStore = defineStore('webSocket', {state() {return {ws: null,wsConnect: false,}},actions: {wsInit() {if (this.ws === null) {const token = localStorage.getItem("token")if (token === null) return;this.ws = new WebSocket(`ws://localhost:8081/ws?token=${token}`)this.ws.onopen = () => {this.wsConnect = true;console.log("ws協議建立成功")// 發送心跳const intervalId = setInterval(() => {if (!this.wsConnect) {clearInterval(intervalId)}const webSocketMessageDto = {type: "心跳檢測"}this.sendMessage(JSON.stringify(webSocketMessageDto));}, 1000 * 3 * 60);}this.ws.onclose = () => {this.ws = null;this.wsConnect = false;}}},sendMessage(message) {if (message == null || message == '') {return;}if (!this.wsConnect) {console.log("ws協議沒有建立")this.wsInit();}this.ws.send(message);},wsClose() {if (this.wsConnect) {this.ws.close();this.wsConnect = false;}}}
})
然后再app.vue中循環建立連接(建立請求重試)
const wsConnect = function () {const token = localStorage.getItem("token")if (token === null) {return;}try {if (!webSocket.wsConnect) {console.log("嘗試建立ws請求")webSocket.wsInit();} else {return;}} catch {wsConnect();}}
2. 聊天組件
界面相信大伙都會畫,主要說一下我遇到的問題
第一個 上拉刷新,也就是加載歷史記錄的功能,我用的element-plus UI,也不知道是不是我的問題,UI里面的無限滾動不是重復發送請求就是無限發送請求,而且好像沒有上拉加載的功能。于是我用了IntersectionObserver來解決,在頁面底部加上一個div,當觀察到這個div時,觸發請求
第二個 滾動條到達頂部時,請求數據并放置數據,滾動條會自動滾動到頂部,并且由于觀察的元素始終在頂端導致無限請求,這個其實也不是什么大問題,因為聊天的消息是有限的,沒有數據之后我設置了停止觀察,主要是用戶體驗不是很好。這是我是添加了display: flex;?flex-direction: column-reverse;解決這個問題的(flex很神奇吧)。大致原理好像是垂直翻轉了(例如上面我將觀察元素放到div第一個子元素位置,添加flex后觀察元素會到最后一個子元素位置上),也就是說當滾動條在最底部時,添加數據后,滾動條會自動滾動到最底部,不過這樣體驗感非常的不錯
不要問我為什么數據要加 || 問就是數據懶得統一了
<style lang="scss" scoped>.chatBox {border-radius: 20px;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;width: 1200px;height: 600px;background-color: white;display: flex;.chat {width: 1000px;height: inherit;.chatBackground {height: 500px;overflow: auto;display: flex;flex-direction: column-reverse;.loading {text-align: center;font-size: 12px;margin-top: 20px;color: gray;}.chatItem {width: 100%;padding-bottom: 20px;.avatar {margin-left: 20px;display: flex;align-items: center;.username {margin-left: 10px;color: rgb(153, 153, 153);font-size: 13px;}}.chatItemMessage {margin-left: 60px;padding: 10px;font-size: 14px;width: 200px;word-break: break-all;max-width: 400px;line-height: 25px;width: fit-content;border-radius: 10px;height: auto;/* background-color: skyblue; */box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;}.sendDate {font-size: 12px;margin-top: 10px;margin-left: 60px;color: rgb(187, 187, 187);}}}.chatBottom {height: 100px;background-color: #F3F3F3;border-radius: 20px;display: flex;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;.messageInput {border-radius: 20px;width: 400px;height: 40px;}}}.userList {width: 200px;height: inherit;border-radius: 20px;box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;.user {width: inherit;height: 50px;line-height: 50px;text-indent: 2em;border-radius: 20px;transition: all 0.5s ease;}}}.user:hover {box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;transform: translateX(-5px) translateY(-5px);}
</style><template>{{hasMessage}}<div class="chatBox"><div class="chat"><div class="chatBackground" ref="chatBackgroundRef"><div class="chatItem" v-for="i in messageList"><div class="avatar"><el-avatar :size="40" :src="imageUrl" /><div class="username">{{i.username || i.userId}}</div></div><div class="chatItemMessage">{{i.text || i.content}}</div><div class="sendDate">{{i.date || i.sendDate}}</div></div><div class="loading" ref="loading">顯示更多內容</div></div><div class="chatBottom"><el-input class="messageInput" v-model="message" placeholder="消息內容"></el-input><el-button @click="sendMessage">發送消息</el-button></div></div><!-- 做成無限滾動 --><div class="userList"><div v-for="user in userList"><div class="user">{{user.userName}}</div></div></div></div>
</template><script setup>import { ref, onMounted, nextTick } from 'vue'import request from '@/utils/request.js'import { useWebSocketStore } from '@/stores/useWebSocketStore'import imageUrl from '@/assets/默認頭像.jpg'const webSocketStore = useWebSocketStore();const chatBackgroundRef = ref(null)const userList = ref([])const message = ref('')const messageList = ref([])const loading = ref(null)const page = ref(1);const size = 10;const hasMessage = ref(true);const observer = new IntersectionObserver((entries, observer) => {entries.forEach(async entry => {if (entry.isIntersecting) {observer.unobserve(entry.target)await pageQueryMessage();}})})onMounted(() => {observer.observe(loading.value)getOnlineUserList();if (!webSocketStore.wsConnect) {webSocketStore.wsInit();}const ws = webSocketStore.ws;ws.onmessage = async (e) => {// console.log(e);const webSocketMessage = JSON.parse(e.data);const messageObj = {username: webSocketMessage.sender,text: webSocketMessage.text,date: webSocketMessage.sendDate,type: webSocketMessage.type}console.log("###")// console.log(JSON.parse(messageObj.text))if (messageObj.type === "群發") {messageList.value.unshift(messageObj)} else if (messageObj.type === "用戶變更") {userList.value = JSON.parse(messageObj.text)}await nextTick();// 當發送新消息時,自動滾動到頁面最底部,可以替換成消息提示的樣式// chatBackgroundRef.value.scrollTop = chatBackgroundRef.value.scrollHeight;console.log(webSocketMessage)}})const pageQueryMessage = function () {request({url: '/api/message/pageQueryMessage',method: 'post',data: {page: page.value,size: size}}).then((res) => {console.log(res)if (res.data.data.length === 0) {hasMessage.value = false;}else {observer.observe(loading.value)page.value = page.value + 1;messageList.value.push(...res.data.data)}})}function getOnlineUserList() {request({url: '/api/user/getOnlineUser',method: 'get'}).then((res) => {console.log(res)userList.value = res.data.data;})}const sendMessage = function () {if (!webSocketStore.wsConnect) {webSocketStore.wsInit();}const webSocketMessageDto = {type: "群發",text: message.value}webSocketStore.sendMessage(JSON.stringify(webSocketMessageDto));}</script>
這樣就實現了一個簡易的聊天數據持久化,支持在線聊天的界面,總的來說WebSocket用起來還是十分方便的
后面我看看能不能做下上傳圖片,上傳文件之類的功能