前言
曾幾何時,不知道大家有沒有在項目里遇到過需要服務端給客戶端推送消息的需求,是否曾經苦惱過、糾結過,我們知道要想實現這樣的需求肯定離不開websocket長連接方式,那么到底是該選原生的websocket還是更加高級的netty框架呢?
在此我極力推薦netty,因為一款好的框架一般都是在原生的基礎上進行包裝成更好、更方便、更實用的東西,很多我們需要自己考慮的問題都基本可以不用去考慮,不過此文不會去講netty有多么的高深莫測,因為這些概念性的東西隨處可見,而是通過實戰來達到推送消息的目的。
實戰
一、邏輯架構圖
從圖中可以看出本次實戰的基本流程是客戶端A請求服務端核心模塊,核心模塊生產一條消息到消息隊列,然后服務端消息模塊消費消息,消費完之后就將消息推送給客戶端B,流程很簡單,沒有太多技巧,唯一的巧妙之處就在消息模塊這邊的處理上,本文的重點也主要講解消息模塊這一塊,主要包括netty server、netty client、channel的存儲等等。
二、代碼
1、添加依賴
<dependency><groupId>io.nettygroupId><artifactId>netty-allartifactId><version>4.1.6.Finalversion>
dependency>
2、NettyServer類
@Service
public?class?NettyServer?{public?void?run(int?port){new?Thread(){public?void?run(){runServer(port);}}.start();}private?void?runServer(int?port){Print.info("===============Message服務端啟動===============");EventLoopGroup bossGroup =?new?NioEventLoopGroup();EventLoopGroup workerGroup =?new?NioEventLoopGroup();try?{ServerBootstrap b =?new?ServerBootstrap();b.group(bossGroup, workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new?ChannelInitializer() {protected?void?initChannel(SocketChannel ch)?throws?Exception?{ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("codec-http",?new?HttpServerCodec());pipeline.addLast("aggregator",?new?HttpObjectAggregator(65536));pipeline.addLast("handler",?new?MyWebSocketServerHandler());}});Channel ch = b.bind(port).sync().channel();Print.info("Message服務器啟動成功:"?+ ch.toString());ch.closeFuture().sync();}?catch?(Exception e){Print.error("Message服務運行異常:"?+ e.getMessage());e.printStackTrace();}?finally?{bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();Print.info("Message服務已關閉");}}
}
3、MyWebSocketServerHandler類
public?class?MyWebSocketServerHandler?extends?SimpleChannelInboundHandler<Object>{private?static?final?String WEBSOCKET_PATH =?"";private?WebSocketServerHandshaker handshaker;@Override????protected?void?channelRead0(ChannelHandlerContext ctx, Object msg)?throws?Exception?{if?(msg?instanceof?FullHttpRequest){//以http請求形式接入,但是走的是websockethandleHttpRequest(ctx, (FullHttpRequest) msg);}else?if?(msg?instanceof??WebSocketFrame){//處理websocket客戶端的消息handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Override????public?void?channelReadComplete(ChannelHandlerContext ctx)?throws?Exception?{ctx.flush();}private?void?handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)?throws?Exception?{//要求Upgrade為websocket,過濾掉get/Postif?(!req.decoderResult().isSuccess()|| (!"websocket".equals(req.headers().get("Upgrade")))) {//若不是websocket方式,則創建BAD_REQUEST的req,返回給客戶端sendHttpResponse(ctx, req,?new?DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));return;}WebSocketServerHandshakerFactory wsFactory =?new?WebSocketServerHandshakerFactory("ws://localhost:9502/websocket",?null,?false);handshaker = wsFactory.newHandshaker(req);if?(handshaker ==?null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); }?else?{handshaker.handshake(ctx.channel(), req); }}private?void?handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame)?{// Check for closing frame if (frame instanceof CloseWebSocketFrame) {handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());?return; }if?(frame?instanceof?PingWebSocketFrame) {ctx.channel().write(new?PongWebSocketFrame(frame.content().retain()));?return; }if?(!(frame?instanceof?TextWebSocketFrame)) {Print.error("數據幀類型不支持!");?throw?new?UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); }// Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); Print.info("Netty服務器接收到的信息: " + request); if (request.equals(Const.HEARTBEAT)){ctx.channel().write(new?TextWebSocketFrame(request));?return; }JSONObject jsonData = JSONObject.parseObject(request); String eventType = jsonData.getString("event_type"); String apiToken = jsonData.getString("api_token");?if?(Const.FRONT.equals(eventType)){Print.info("front event"); ChannelSupervise.updateChannel(apiToken, ctx.channel()); }else?if?(Const.BEHIND.equals(eventType)){Print.info("behind event"); Channel chan = ChannelSupervise.findChannel(apiToken);?if?(null?== chan){Print.error("目標用戶不存在"); }else?{JSONObject jsonMsg =?new?JSONObject(); jsonMsg.put("type", jsonData.get("type")); jsonMsg.put("child_type", jsonData.get("child_type")); jsonMsg.put("title", jsonData.get("title")); jsonMsg.put("body", jsonData.get("body")); ChannelSupervise.sendToSimple(apiToken,?new?TextWebSocketFrame(jsonMsg.toString())); Print.info("向目標用戶發送成功"); }}else{Print.error("event type error"); }}private?static?void?sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res)?{// 返回應答給客戶端 if (res.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); }ChannelFuture f = ctx.channel().writeAndFlush(res);?// 如果是非Keep-Alive,關閉連接 if (!isKeepAlive(req) || res.status().code() != 200) {f.addListener(ChannelFutureListener.CLOSE); }}@Override????public?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace(); ctx.close(); }private?static?String?getWebSocketLocation(FullHttpRequest req)?{return?"ws://"?+ req.headers().get(HOST) + WEBSOCKET_PATH; }/** * 接收客戶端連接事件 */????@Override????public?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶端與服務端連接開啟:"?+ ctx.channel()); ChannelSupervise.addChannel(null, ctx.channel()); }/** * 接收客戶端關閉事件 */????@Override????public?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("客戶端與服務端連接關閉:"?+ ctx.channel()); ChannelSupervise.removeChannel(ctx.channel()); }}
4、ChannelSupervise類
public?class?ChannelSupervise?{private???static?ChannelGroup GlobalGroup =?new?DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private??static?ConcurrentMapChannelMap =?new?ConcurrentHashMap();public??static?void?addChannel(String apiToken, Channel channel){GlobalGroup.add(channel);if?(null?!= apiToken) {ChannelMap.put(apiToken, channel.id());}}public?static?void?updateChannel(String apiToken, Channel channel){Channel chan = GlobalGroup.find(channel.id());if?(null?== chan){addChannel(apiToken, channel);}else?{ChannelMap.put(apiToken, channel.id());}}public?static?void?removeChannel(Channel channel){GlobalGroup.remove(channel);Collectionvalues = ChannelMap.values();values.remove(channel.id());}public?static?Channel?findChannel(String apiToken){ChannelId chanId = ChannelMap.get(apiToken);if?(null?== chanId){return?null;}return?GlobalGroup.find(ChannelMap.get(apiToken));}public?static?void?sendToAll(TextWebSocketFrame tws){GlobalGroup.writeAndFlush(tws);}public?static?void?sendToSimple(String apiToken, TextWebSocketFrame tws){GlobalGroup.find(ChannelMap.get(apiToken)).writeAndFlush(tws);}
}
5、NettyClient類
@Servicepublic?class?NettyClient?{private?Channel channel;public?void?run(String strUri){new?Thread(){public?void?run(){runClient(strUri);}}.start();private?void?runClient(String strUri)?{EventLoopGroup?group?=?new?NioEventLoopGroup();try?{Bootstrap b =?new?Bootstrap();URI uri =?new?URI(strUri);String protocol = uri.getScheme();if?(!"ws".equals(protocol)) {throw?new?IllegalArgumentException("Unsupported protocol: "?+ protocol);}HttpHeaders customHeaders =?new?DefaultHttpHeaders();customHeaders.add("MyHeader",?"MyValue");// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.// If you change it to V00, ping is not supported and remember to change// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.final MyWebSocketClientHandler handler =new?MyWebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13,?null,?false, customHeaders)); b.group(group); b.channel(NioSocketChannel.class); b.handler(new?ChannelInitializer() {@Overpublic?void?initChannel(SocketChannel ch) throws Exception?{ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec",?new?HttpClientCodec()); pipeline.addLast("aggregator",?new?HttpObjectAggregator(8192)); pipeline.addLast("ws-handler", handler); }}); Print.info("===============Message客戶端啟動==============="); channel = b.connect(uri.getHost(), uri.getPort()).sync().channel(); handler.handshakeFuture().sync(); channel.closeFuture().sync(); }?catch?(Exception e){Print.error(e.getMessage()); }?finally?{group.shutdownGracefully(); }}
6、MyWebSocketClientHandler類
public?class?MyWebSocketClientHandler?extends?SimpleChannelInboundHandler<Object>?{private?final?WebSocketClientHandshaker handshaker;private?ChannelPromise handshakeFuture;public?MyWebSocketClientHandler(WebSocketClientHandshaker handshaker)?{this.handshaker = handshaker;}public?ChannelFuture?handshakeFuture()?{return?handshakeFuture;}@Overridepublic?void?handlerAdded(ChannelHandlerContext ctx)?throws?Exception?{handshakeFuture = ctx.newPromise();}@Overridepublic?void?channelActive(ChannelHandlerContext ctx)?throws?Exception?{handshaker.handshake(ctx.channel());}@Overridepublic?void?channelInactive(ChannelHandlerContext ctx)?throws?Exception?{Print.info("webSocket client disconnected!");}@Overridepublic?void?channelRead(ChannelHandlerContext ctx, Object msg)?throws?Exception?{Channel ch = ctx.channel();if?(!handshaker.isHandshakeComplete()) {handshaker.finishHandshake(ch, (FullHttpResponse) msg);Print.info("websocket client connected!");handshakeFuture.setSuccess();return;}if?(msg?instanceof?FullHttpResponse) {FullHttpResponse response = (FullHttpResponse) msg;throw?new?Exception("Unexpected FullHttpResponse (getStatus="?+ response.getStatus() +?", content="?+ response.content().toString(CharsetUtil.UTF_8) +?')');}WebSocketFrame frame = (WebSocketFrame) msg;if?(frame?instanceof?TextWebSocketFrame) {TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;Print.info("客戶端收到消息: "?+ textFrame.text());}?else?if?(frame?instanceof?PongWebSocketFrame) {Print.info("websocket client received pong");}?else?if?(frame?instanceof?CloseWebSocketFrame) {Print.info("websocket client received closing");ch.close();}}@Overrideprotected?void?channelRead0(ChannelHandlerContext channelHandlerContext, Object o)?throws?Exception?{}@Overridepublic?void?exceptionCaught(ChannelHandlerContext ctx, Throwable cause)?throws?Exception?{cause.printStackTrace();if?(!handshakeFuture.isDone()) {handshakeFuture.setFailure(cause);}ctx.close();}}
7、啟動類
@SpringBootApplication
@Servicepublic
class MessageApplication {@Autowiredprivate NettyServer server;@Autowiredprivate NettyClient client;public?static?void?main(String[] args) {SpringApplication.run(MessageApplication.class, args);}@PostConstructpublic?void?initMessage(){server.run(9502);try?{Thread.sleep(1000);}?catch?(InterruptedException e) {e.printStackTrace();}client.run("ws://localhost:"?+?9502);}
8、客戶端B測試頁面
<html><head><meta?charset="UTF-8"><title>WebSocket Chattitle>head><body><script?type="text/javascript">var?socket;if?(!window.WebSocket) {window.WebSocket =?window.MozWebSocket;}if?(window.WebSocket) {socket =?new?WebSocket("ws://localhost:9502");socket.onmessage =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?'\n'?+ event.data};socket.onopen =?function(event)?{var?ta =?document.getElementById('responseText');ta.value =?"連接開啟!";};socket.onclose =?function(event)?{var?ta =?document.getElementById('responseText');ta.value = ta.value +?"連接被關閉";};}?else?{alert("你的瀏覽器不支持 WebSocket!");}function?send(message)?{if?(!window.WebSocket) {return;}if?(socket.readyState == WebSocket.OPEN) {socket.send(message);}?else?{alert("連接沒有開啟.");}}script><form?onsubmit="return false;"><h3>WebSocket:h3><textarea?id="responseText"?style="width: 500px; height: 300px;">textarea><br><input?type="text"?name="message"??style="width: 300px"?value="1"><input?type="button"?value="發送消息"?onclick="send(this.form.message.value)"><input?type="button"?onclick="javascript:document.getElementById('responseText').value=''"?value="清空聊天記錄">form><br>body>
html>
三、測試
1、先運行啟動類,此時會先啟動netty服務器,然后啟動一個netty客戶端,然后過30s模擬客戶端A進行消息發送
2、打開測試頁面,在底下的輸入框輸入:{"event_type":"front", "api_token":"11111"},表示客戶端B連接上netty服務器
測試結果如下:
消息模塊:
客戶端B:
四、結束語
本文只是拋磚引玉,主要啟發有類似需求的朋友知道怎么去存儲channel,進而怎么給指定客戶推送消息,如果想要進行大型項目的高并發、可靠穩定地使用,還需進一步地改進。
作者:都市心聲
來源:toutiao.com/i6794445371457143307