Java通過Netty,實現Websocket消息推送只需要簡單幾步

前言

曾幾何時,不知道大家有沒有在項目里遇到過需要服務端給客戶端推送消息的需求,是否曾經苦惱過、糾結過,我們知道要想實現這樣的需求肯定離不開websocket長連接方式,那么到底是該選原生的websocket還是更加高級的netty框架呢?

在此我極力推薦netty,因為一款好的框架一般都是在原生的基礎上進行包裝成更好、更方便、更實用的東西,很多我們需要自己考慮的問題都基本可以不用去考慮,不過此文不會去講netty有多么的高深莫測,因為這些概念性的東西隨處可見,而是通過實戰來達到推送消息的目的。

實戰

一、邏輯架構圖

從圖中可以看出本次實戰的基本流程是客戶端A請求服務端核心模塊,核心模塊生產一條消息到消息隊列,然后服務端消息模塊消費消息,消費完之后就將消息推送給客戶端B,流程很簡單,沒有太多技巧,唯一的巧妙之處就在消息模塊這邊的處理上,本文的重點也主要講解消息模塊這一塊,主要包括netty server、netty client、channel的存儲等等。

二、代碼

1、添加依賴

<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.6.Finalversion>
dependency>

2、NettyServer類

@Service
public?class?NettyServer?{public?void?run(int?port){new?Thread(){public?void?run(){runServer(port);}}.start();}private?void?runServer(int?port){Print.info("===============Message服務端啟動===============");EventLoopGroup bossGroup =?new?NioEventLoopGroup();EventLoopGroup workerGroup =?new?NioEventLoopGroup();try?{ServerBootstrap b =?new?ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new?ChannelInitializer() {protected?void?initChannel(SocketChannel ch)?throws?Exception?{ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("codec-http",?new?HttpServerCodec());pipeline.addLast("aggregator",?new?HttpObjectAggregator(65536));pipeline.addLast("handler",?new?MyWebSocketServerHandler());}});Channel ch = b.bind(port).sync().channel();Print.info("Message服務器啟動成功:"?+ ch.toString());ch.closeFuture().sync();}?catch?(Exception e){Print.error("Message服務運行異常:"?+ e.getMessage());e.printStackTrace();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();Print.info("Message服務已關閉");}}
}

3、MyWebSocketServerHandler類

public?class?MyWebSocketServerHandler?extends?SimpleChannelInboundHandler<Object>{private?static?final?String WEBSOCKET_PATH =?"";private?WebSocketServerHandshaker handshaker;@Override????protected?void?channelRead0(ChannelHandlerContext ctx, Object msg)?throws?Exception?{if?(msg?instanceof?FullHttpRequest){//以http請求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);}else?if?(msg?instanceof??WebSocketFrame){//處理websocket客戶端的消息handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Override????public?void?channelReadComplete(ChannelHandlerContext ctx)?throws?Exception?{ctx.flush();}private?void?handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)?throws?Exception?{//要求Upgrade為websocket,過濾掉get/Postif?(!req.decoderResult().isSuccess()|| (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,則創建BAD_REQUEST的req,返回給客戶端sendHttpResponse(ctx, req,?new?DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory =?new?WebSocketServerHandshakerFactory("ws://localhost:9502/websocket",?null,?false);handshaker = wsFactory.newHandshaker(req);if?(handshaker ==?null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }?else?{handshaker.handshake(ctx.channel(), req); }}private?void?handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame)?{// Check for closing frame if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());?return; }if?(frame?instanceof?PingWebSocketFrame) {ctx.channel().write(new?PongWebSocketFrame(frame.content().retain()));?return; }if?(!(frame?instanceof?TextWebSocketFrame)) {Print.error("數據幀類型不支持!");?throw?new?UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服務器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ctx.channel().write(new?TextWebSocketFrame(request));?return; }JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token");?if?(Const.FRONT.equals(eventType)){Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else?if?(Const.BEHIND.equals(eventType)){Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken);?if?(null?== chan){Print.error("目標用戶不存在"); }else?{JSONObject jsonMsg =?new?JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken,?new?TextWebSocketFrame(jsonMsg.toString())); Print.info("向目標用戶發送成功"); }}else{Print.error("event type error"); }}private?static?void?sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res)?{// 返回應答給客戶端 if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }ChannelFuture f = ctx.channel().writeAndFlush(res);?// 如果是非Keep-Alive,關閉連接 if (!isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE); }}@Override????public?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace(); ctx.close(); }private?static?String?getWebSocketLocation(FullHttpRequest req)?{return?"ws://"?+ req.headers().get(HOST) + WEBSOCKET_PATH; }/** * 接收客戶端連接事件 */????@Override????public?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶端與服務端連接開啟:"?+ ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }/** * 接收客戶端關閉事件 */????@Override????public?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶端與服務端連接關閉:"?+ ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }}

4、ChannelSupervise類

public?class?ChannelSupervise?{private???static?ChannelGroup GlobalGroup =?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private??static?ConcurrentMapChannelMap =?new?ConcurrentHashMap();public??static?void?addChannel(String apiToken, Channel channel){GlobalGroup.add(channel);if?(null?!= apiToken) {ChannelMap.put(apiToken, channel.id());}}public?static?void?updateChannel(String apiToken, Channel channel){Channel chan = GlobalGroup.find(channel.id());if?(null?== chan){addChannel(apiToken, channel);}else?{ChannelMap.put(apiToken, channel.id());}}public?static?void?removeChannel(Channel channel){GlobalGroup.remove(channel);Collectionvalues = ChannelMap.values();values.remove(channel.id());}public?static?Channel?findChannel(String apiToken){ChannelId chanId = ChannelMap.get(apiToken);if?(null?== chanId){return?null;}return?GlobalGroup.find(ChannelMap.get(apiToken));}public?static?void?sendToAll(TextWebSocketFrame tws){GlobalGroup.writeAndFlush(tws);}public?static?void?sendToSimple(String apiToken, TextWebSocketFrame tws){GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);}
}

5、NettyClient類

@Servicepublic?class?NettyClient?{private?Channel channel;public?void?run(String strUri){new?Thread(){public?void?run(){runClient(strUri);}}.start();private?void?runClient(String strUri)?{EventLoopGroup?group?=?new?NioEventLoopGroup();try?{Bootstrap b =?new?Bootstrap();URI uri =?new?URI(strUri);String protocol = uri.getScheme();if?(!"ws".equals(protocol)) {throw?new?IllegalArgumentException("Unsupported protocol: "?+ protocol);}HttpHeaders customHeaders =?new?DefaultHttpHeaders();customHeaders.add("MyHeader",?"MyValue");// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.// If you change it to V00, ping is not supported and remember to change// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.final MyWebSocketClientHandler handler =new?MyWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13,?null,?false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new?ChannelInitializer() {@Overpublic?void?initChannel(SocketChannel ch) throws Exception?{ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec",?new?HttpClientCodec()); pipeline.addLast("aggregator",?new?HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }}); Print.info("===============Message客戶端啟動==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); }?catch?(Exception e){Print.error(e.getMessage()); }?finally?{group.shutdownGracefully(); }}

6、MyWebSocketClientHandler類

public?class?MyWebSocketClientHandler?extends?SimpleChannelInboundHandler<Object>?{private?final?WebSocketClientHandshaker handshaker;private?ChannelPromise handshakeFuture;public?MyWebSocketClientHandler(WebSocketClientHandshaker handshaker)?{this.handshaker = handshaker;}public?ChannelFuture?handshakeFuture()?{return?handshakeFuture;}@Overridepublic?void?handlerAdded(ChannelHandlerContext ctx)?throws?Exception?{handshakeFuture = ctx.newPromise();}@Overridepublic?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{handshaker.handshake(ctx.channel());}@Overridepublic?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("webSocket client disconnected!");}@Overridepublic?void?channelRead(ChannelHandlerContext ctx, Object msg)?throws?Exception?{Channel ch = ctx.channel();if?(!handshaker.isHandshakeComplete()) {handshaker.finishHandshake(ch, (FullHttpResponse) msg);Print.info("websocket client connected!");handshakeFuture.setSuccess();return;}if?(msg?instanceof?FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw?new?Exception("Unexpected FullHttpResponse (getStatus="?+ response.getStatus() +?", content="?+ response.content().toString(CharsetUtil.UTF_8) +?')');}WebSocketFrame frame = (WebSocketFrame) msg;if?(frame?instanceof?TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;Print.info("客戶端收到消息: "?+ textFrame.text());}?else?if?(frame?instanceof?PongWebSocketFrame) {Print.info("websocket client received pong");}?else?if?(frame?instanceof?CloseWebSocketFrame) {Print.info("websocket client received closing");ch.close();}}@Overrideprotected?void?channelRead0(ChannelHandlerContext channelHandlerContext, Object o)?throws?Exception?{}@Overridepublic?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace();if?(!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}}

7、啟動類

@SpringBootApplication
@Servicepublic
class MessageApplication {@Autowiredprivate NettyServer server;@Autowiredprivate NettyClient client;public?static?void?main(String[] args) {SpringApplication.run(MessageApplication.class, args);}@PostConstructpublic?void?initMessage(){server.run(9502);try?{Thread.sleep(1000);}?catch?(InterruptedException e) {e.printStackTrace();}client.run("ws://localhost:"?+?9502);}

8、客戶端B測試頁面

<html><head><meta?charset="UTF-8"><title>WebSocket Chattitle>head><body><script?type="text/javascript">var?socket;if?(!window.WebSocket) {window.WebSocket =?window.MozWebSocket;}if?(window.WebSocket) {socket =?new?WebSocket("ws://localhost:9502");socket.onmessage =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?'\n'?+ event.data};socket.onopen =?function(event)?{var?ta =?document.getElementById('responseText');ta.value =?"連接開啟!";};socket.onclose =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?"連接被關閉";};}?else?{alert("你的瀏覽器不支持 WebSocket!");}function?send(message)?{if?(!window.WebSocket) {return;}if?(socket.readyState == WebSocket.OPEN) {socket.send(message);}?else?{alert("連接沒有開啟.");}}script><form?onsubmit="return false;"><h3>WebSocket:h3><textarea?id="responseText"?style="width: 500px; height: 300px;">textarea><br><input?type="text"?name="message"??style="width: 300px"?value="1"><input?type="button"?value="發送消息"?onclick="send(this.form.message.value)"><input?type="button"?onclick="javascript:document.getElementById('responseText').value=''"?value="清空聊天記錄">form><br>body>
html>

三、測試

1、先運行啟動類,此時會先啟動netty服務器,然后啟動一個netty客戶端,然后過30s模擬客戶端A進行消息發送

2、打開測試頁面,在底下的輸入框輸入:{"event_type":"front", "api_token":"11111"},表示客戶端B連接上netty服務器

測試結果如下:

消息模塊:

客戶端B:

四、結束語

本文只是拋磚引玉,主要啟發有類似需求的朋友知道怎么去存儲channel,進而怎么給指定客戶推送消息,如果想要進行大型項目的高并發、可靠穩定地使用,還需進一步地改進。

作者:都市心聲

來源:toutiao.com/i6794445371457143307

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/271822.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/271822.shtml
英文地址,請注明出處:http://en.pswp.cn/news/271822.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

53.Maximum Subarray

/** 53.Maximum Subarray * 2016-5-7 by Mingyang * 如果我們從頭遍歷這個數組。對于數組中的其中一個元素&#xff0c;它只有兩個選擇&#xff1a; 1.* 要么加入之前的數組加和之中&#xff08;跟別人一組&#xff09; * 2. 要么自己單立一個數組&#xff08;自己單開一組&…

java 創建者設計模式_Java設計模式之創建者模式分享熱愛編程,程序人生

PS:今天的23中設計模式中的創建者方式&#xff0c;至此告一段落。我今天帶來的技術分享為創建者模式以及原型模式。當然在Java中這兩種方式很常見&#xff0c;只不過我們寫的次數確實有點低而已&#xff0c;但是這不是我不學它的借口&#xff01;&#xff01;&#xff01;創建者…

一文讀懂電感器的原理、結構、作用及分類

電感器是能夠把電能轉化為磁能而存儲起來的元件。電感器的結構類似于變壓器&#xff0c;但只有一個繞組。電感器具有一定的電感&#xff0c;它只阻礙電流的變化。 如果電感器在沒有電流通過的狀態下&#xff0c;電路接通時它將試圖阻礙電流流過它&#xff1b;如果電感器在有電流…

final關鍵字與static對比

final關鍵字與static對比 static關鍵字修飾變量時&#xff0c;會使該變量在類加載時就會被初始化&#xff0c;不會因為對象的創建再次被加載&#xff0c;當變量被static 修飾時就代表該變量只會被初始化一次 例如圖中所示&#xff0c;被static修飾的變量j&#xff0c;雖然創建…

juce中的BailOutChecker

界面庫中值得注意的一點就是對象響應事件的時候自身被刪除了&#xff0c;那么后續的訪問自然就會出問題&#xff0c;所以需要在響應事件之后先添加引用&#xff0c;相關處理之后再查看自身是否已經被刪除&#xff0c;如果已經被刪除那么就直接退出。juce中通過BailOutChecker來…

java quartz 跳過_Java Quartz計劃作業-禁止同時執行作業

我正在使用Quartz Job執行特定任務。我也在我的Main應用程序類中安排它的執行&#xff0c;而我試圖完成的工作是不允許同時執行此作業的實例。因此&#xff0c;調度程序僅應在其先前實例完成后才執行作業。這是我的工作班級&#xff1a;public class MainJob implements Job {s…

mac USB串口工具配置

安裝USB serial 驅動 我的usb serial芯片是 pl2303, 先到官網上下載對應驅動&#xff0c;并安裝。安裝完成之后會要求重啟。 http://www.prolific.com.tw/admin/Technology/GetFile.ashx?fileID238 安裝 minicom https://alioth.debian.org/projects/minicom/ 下載源碼&…

macpro生成公鑰并查看公鑰

打開macpro的終端輸入以下命令&#xff1a; $ cd ~/.ssh $ ls 此時發現沒有那個id_rsa.pub文件&#xff0c;沒有&#xff0c;就需要創建公鑰 用ssh-keygen創建公鑰 此時已經有了

java join 源碼_join on 和where 一起使用的細節

left join :左連接&#xff0c;返回左表中所有的記錄以及右表中連接字段相等的記錄。right join :右連接&#xff0c;返回右表中所有的記錄以及左表中連接字段相等的記錄。inner join: 內連接&#xff0c;又叫等值連接&#xff0c;只返回兩個表中連接字段相等的行。full join:外…

SSIS 學習之旅 FTP訪問類

這章把腳本任務訪問FTP的方法 全部給大家。 控件的使用大家如果有不懂得可以看下我之前的文章。第一章&#xff1a;SSIS 學習之旅 第一個SSIS 示例&#xff08;一&#xff09;&#xff08;上&#xff09; 第二章&#xff1a;SSIS 學習之旅 第一個SSIS 示例&#xff08;二&#…

Spring Cloud Feign 使用Apache的HTTP Client替換Feign原生httpclient

http 連接池能提升性能 http 的背景原理 a. 兩臺服務器建立 http 連接的過程是很復雜的一個過程&#xff0c;涉及到多個數據包的交換&#xff0c;并且也很耗時間。 b. Http 連接需要的 3 次握手 4 次分手開銷很大&#xff0c;這一開銷對于大量的比較小的 http 消息來說更大。…

Java容器坐標起點_Java的屏幕坐標是以像素為單位,容器的左下角被確定為坐標的起點...

【單選題】【單選題】【單選題】class A{ int x1; void func1(int x1){ this.x1 x1; } } 關于上述程序,說法錯誤的是( )【單選題】瀏覽器的作用是( )。【判斷題】構建大學生心理危機預警及干預工作機制,更好地幫助有嚴重心理問題的學生度過心理難關,及早預防、及時疏導、有效干…

自媒體工具:文本內容轉音頻文件實用小工具

目錄 ?編輯 1、軟件介紹 2、軟件技術框架 3、使用說明 4、核心代碼文件 5、注意事項 1、軟件介紹 文本內容轉轉音頻文件小工具&#xff0c;采用C#編程語言&#xff0c;基于Framework4.5開發&#xff0c;主要采用百度語音識別SDK&#xff0c;實現了在線文本內容轉音頻文件的功能…

IDEA 創建 SpringCloud項目-多項目方式

SpringCloud 雖然可以用多模塊化的方式來創建&#xff0c;但是&#xff0c;SpirngCloud本身就是為分布式而準備的&#xff0c;如果使用多模塊的話&#xff0c;那就是一個項目&#xff0c;偏離了分布式的概念。所以工程上還是常用多項目的方式&#xff0c;這樣才可以分開布署各個…

php位運算重要嗎,PHP位運算的用途

下面為大家帶來一篇PHP位運算的用途。現在就分享給大家&#xff0c;也給大家做個參考。一起過來看看吧在實際應用中可以做用戶權限的應用我這里說到的權限管理辦法是一個普遍采用的方法&#xff0c;主要是使用到”位運行符”操作&#xff0c;& 位與運算符、| 位或運行符。參…

盤點6款實用的文件對比工具,你都用過嗎?

??作者主頁&#xff1a;IT技術分享社區 ??作者簡介&#xff1a;大家好,我是IT技術分享社區的博主&#xff0c;從事C#、Java開發九年&#xff0c;對數據庫、C#、Java、前端、運維、電腦技巧等經驗豐富。 ??個人榮譽&#xff1a; 數據庫領域優質創作者&#x1f3c6;&#x…

aggregations 詳解1(概述)

aggregation分類 aggregations —— 聚合&#xff0c;提供了一種基于查詢條件來對數據進行分桶、計算的方法。有點類似于 SQL 中的 group by 再加一些函數方法的操作。 聚合可以嵌套&#xff0c;由此可以組成復雜的操作&#xff08;Bucketing聚合可以包含sub-aggregation&#…

IDEA開發中,類的頭位置生成作者時間信息

點擊 File > Settings > File and Code Templates > Class按照圖中步驟添加如下信息 #if (${PACKAGE_NAME} && ${PACKAGE_NAME} ! "")package ${PACKAGE_NAME};#end #parse("File Header.java") /** * Author WangZeyu * Date ${…

提現接口網站 php,API提現接口

>獲取提現積分的類型&#xff0c;在后臺可以設置某種積分可被提現&#xff0c;此處獲取的數據為可提現積分的類型~~~[api]get:/index.php/accounts/Apipoint/member_withdrawal_listint:type 0#是否智能限制提現積分類型&#xff0c;0&#xff1a;不智能&#xff0c;1&#…

數據庫:PostgreSQL 和 MySQL對比

比較版本&#xff1a;PostgreSQL 11 VS MySQL5.7&#xff08;innodb引擎&#xff09; Oracle官方社區版版權情況&#xff1a;PostgreSQL 11&#xff08;免費開源&#xff09;、MySQL5.7 Oracle官方社區版&#xff08;免費開源&#xff09; 1. CPU限制 PGSQL沒有CPU核心數限制&a…