2025/4/15
向?
一、什么是Netty
Netty 是 Java 中一個非常高性能的網絡通信框架,用來開發服務器和客戶端程序,主要用于處理 TCP/UDP 的網絡連接,比如:
-
聊天服務
-
實時推送
-
高并發網絡通信(比如游戲、IoT、金融系統)
你可以把 Netty 理解為一種比 Java 原生 Socket 更方便、性能更強的“網絡搭建工具”。再詳細了解Netty的工作原理之前,我們先來看一下Java中最簡單的客戶端和服務器之間的連接。
二、最簡單的 Java 網絡通信
2.1什么是“客戶端”和“服務端”?
我們先理解一個現實生活的比喻:奶茶店點單系統
-
服務端(Netty 服務):奶茶店(固定位置,等待別人來點單)
-
客戶端(瀏覽器、手機 App、Netty 客戶端):顧客(誰想喝奶茶誰來)
-
通信方式(TCP):電話(通過電話點單)
還可以更加省略一點來說就是?💬 一個人發送消息(客戶端) ? 另一個人接收并回復(服務端)
2.2服務端
import java.io.*;
import java.net.*;public class Server {public static void main(String[] args) throws Exception {ServerSocket serverSocket = new ServerSocket(8080); // 在8080端口等別人來找System.out.println("服務端啟動,等待客戶端連接...");Socket socket = serverSocket.accept(); // 有人來連接,就接收它System.out.println("客戶端連接進來了");// 輸入輸出流:用來讀寫數據BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 讀PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 寫String line;while ((line = in.readLine()) != null) {System.out.println("收到客戶端消息:" + line);out.println("我收到了:" + line); // 回給客戶端}socket.close(); // 關閉連接serverSocket.close();}
}
2.3客戶端
import java.io.*;
import java.net.*;public class Client {public static void main(String[] args) throws Exception {Socket socket = new Socket("127.0.0.1", 8080); // 連接本機服務端System.out.println("連接服務端成功!");// 輸入輸出BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in)); // 你鍵盤輸入PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發消息BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息String msg;while ((msg = userInput.readLine()) != null) {out.println(msg); // 發給服務端String reply = in.readLine(); // 讀取服務端返回System.out.println("服務端說:" + reply);}socket.close();}
}
2.4 服務端和客戶端之間的通信
首先是服務端先啟動,會有如下顯示,同時告訴顧客我家的店的端口號是8080。
服務端啟動,等待客戶端連接...
然后有顧客想買東西,通過 new Socket("127.0.0.1", 8080); // 連接本機服務端,即走進服務器店的大門8080。而在服務器這端,通過serverSocket.accept(); 看見有人來連接,就接收它,服務它。這時候客戶端會輸出如下
連接服務端成功!
服務端會輸出如下:
客戶端連接進來了
在客戶端通過控制臺輸入:hello后,即通過如下代碼接收到了你的輸入,并存放在userInput變量中。
BufferedReader userInput = new BufferedReader(new InputStreamReader(System.in));
客戶端通過out對象發消息?
PrintWriter out = new PrintWriter(socket.getOutputStream(), true); // 發消息
?客戶端通過in對象接受消息?
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 收消息
當 msg = userInput.readLine()) != null ,即當檢測到客戶端要發送消息就執行如下代碼:
out.println(msg); // 發給服務端
String reply = in.readLine(); // 讀取服務端返回
System.out.println("服務端說:" + reply);
out.println(msg)后,就將信息發送到了服務器端,服務器端就會輸出如下
收到客戶端消息:hello
同時在服務器端通過 out.println("我收到了:" + line); 回給客戶端,客戶端通過reply接收到消息,客戶端就會輸出
服務端說:我收到了:hello
2.5 客戶端和服務器端的關系如下:?
角色 | 作用 |
---|
Server | 永遠在等別人來(監聽端口) |
Client | 主動發起連接 |
Input/Output | 收發消息用的“通道” |
?二、為什么需要線程模型?(Thread Model)
理解了基礎的服務端和客戶端通信,我們可以繼續深入,了解一些稍微復雜一點的概念,即線程。
在前面那個簡單的服務端/客戶端例子中,服務端是“串行”的,意思是:
-
它在等待一個客戶端連接。
-
收到消息后再回復,接著等待下一個連接。
但是如果你有很多客戶端同時發消息,服務端就會變得很慢,因為它只能一個一個地處理請求。
所以,我們需要更高效的處理方式:并發編程并發編程意味著能夠同時處理多個任務,不等一個任務完成再開始下一個。而且每個任務都不會相互阻塞。這就是 線程池 和 事件循環模型 的價值所在。在 Netty 中:
-
線程池:多個線程可以同時處理多個連接。
-
事件循環模型:每個線程(事件循環)只負責自己的任務,它會不停地輪詢事件,比如客戶端連接、數據讀取等。
三、什么是“阻塞”和“非阻塞”?
? 阻塞:你去餐廳吃飯,服務員給你一個菜單,但你必須等著他們準備好菜才能吃,期間你不能干別的事。
? 非阻塞:你點菜后,服務員會告訴你“稍等一會兒”,然后你可以做其他事。只要菜做好了,服務員會告訴你,打斷你做其他事,給你菜。
TCP 通信中的阻塞和非阻塞:
-
阻塞:當你發起連接或請求時,程序會一直等待,直到連接建立或數據返回。
-
非阻塞:發起請求后,程序不再等待,會繼續執行其他任務。如果有返回結果,程序會處理返回。
Netty 默認就是 非阻塞 的,這樣它能同時處理很多連接,不會被一個請求堵住。
四、Netty 是如何處理高并發的?
Netty通過使用一個線程模型 EventLoop(事件循環)來處理高并發。EventLoopGroup:管理多個線程(可以理解為多個服務員),負責處理網絡事件。EventLoop:每個線程負責自己的一部分任務,比如處理某一個客戶端的請求。
舉例來看就是:
- 一個服務端線程,負責監聽連接(等待“顧客”進店)。
- 多個工作線程,負責實際的通信(幫“顧客”點單、做菜)。
4.1?EventLoop 和 NIO 的關系
Netty 使用了 NIO(非阻塞 IO) 模型。NIO 讓一個線程能處理多個連接。具體來說:
-
使用 Selector 輪詢(檢查)每個連接的狀態,看是否有數據到達。
-
使用 Channel 來表示網絡連接。
-
使用 Buffer 來讀取和寫入數據。
這個模型讓 Netty 在面對數千個并發連接時,也能保持高效。
總結來看,Netty的EventLoopGroup管理多個線程,每個線程只干特定的事情,假設某個線程只干連接客戶端這個事情,又由于Netty引入了NIO模型,所以又讓這個負責處理連接的線程具備了同時處理多個連接請求的能力。
五、實際的 Netty 服務端示例
public class EchoServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 負責接收連接EventLoopGroup workerGroup = new NioEventLoopGroup(); // 負責處理請求try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new EchoServerHandler());ChannelFuture f = b.bind(8080).sync(); // 綁定端口,開始監聽f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}// 處理客戶端發來的消息public static class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 接收到數據后直接寫回給客戶端System.out.println("收到消息:" + msg);ctx.writeAndFlush(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close(); // 出現異常關閉連接}}
}
- ServerBootstrap:是 Netty 中用于啟動服務端的核心類,啟動 Netty 服務端。
- bossGroup 和 workerGroup:管理事件循環,分別處理接收連接和處理數據的任務。
- EchoServerHandler:是我們自定義的業務處理邏輯,收到客戶端的消息就原封不動地回傳。
六、實際使用的Netty
6.1 NettyServer類
ServerBootstrap:Netty服務器啟動的核心類。
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ServerChannelInitializer(delimiter, maxFrameLength)).localAddress(socketAddress).option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true);
- .group(bossGroup, workGroup) 配置監聽線程和工作線程。
.channel(NioServerSocketChannel.class)
: 這里指定了服務端的 Channel 類型。NioServerSocketChannel
適用于 NIO(非阻塞 IO),這是一種處理高并發的方式。.childHandler(new ServerChannelInitializer(delimiter, maxFrameLength))
: 為每個連接配置一個ChannelInitializer
,在每個連接初始化時(每個客戶端連接時)會被調用。ServerChannelInitializer
是自定義的初始化類,配置如何處理數據的編解碼、業務邏輯等。.localAddress(socketAddress)
: 配置綁定的本地地址和端口.option(ChannelOption.SO_BACKLOG, 1024)
: 配置服務器端的連接隊列大小。隊列最大長度設置為 1024。.childOption(ChannelOption.SO_KEEPALIVE, true)
: 設置 TCP KeepAlive,確保連接在空閑時依然存活。
6.1.1啟動并綁定端口
ChannelFuture channelFuture = serverBootstrap.bind(socketAddress).sync();
-
.bind(socketAddress)
: 綁定到指定的socketAddress
,開始監聽客戶端的連接。 -
.sync()
: 阻塞方法,直到端口綁定成功并啟動后,才會繼續執行。ChannelFuture
用于獲取當前操作的結果(是否成功綁定)
6.2 SeverChannelInitializer類
在NettyServer類中,我們是調用了SeverChannelInitializer類的,我們使用SeverChannelInitializer類來配置如何處理數據的編解碼、業務邏輯等。當每個客戶端連接進來時,配置它的 Channel 的“流水線”——也就是這個連接收到/發送數據時,按什么順序怎么處理。可以把它理解為工廠生產線的“組裝說明書”。
package com....nettyService;import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LoggingHandler;public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {private String DELIMITER;private int MAXFRAMELENGTH;public ServerChannelInitializer(String delimiter, int maxFrameLength) {DELIMITER = delimiter;MAXFRAMELENGTH = maxFrameLength;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG"));socketChannel.pipeline().addLast("decoder", new HL7Decoder());socketChannel.pipeline().addLast("encoder", new HL7Encoder());socketChannel.pipeline().addLast(new NettyServerHandler());}}
SeverChannelInitializer首先繼承了ChannelInitializer<SocketChannel>,這樣沒有一個新的連接的時候Netty 就會調用 initChannel()
方法,給這個連接安裝一套“處理器組合”(pipeline)。
而這一套“處理器組合”當接收到客戶端發送的消息執行順序如下:
【客戶端】==> socketChannel
? ? ? ? ?↓
[LoggingHandler](打印日志)
? ? ? ? ?↓
[HL7Decoder](解碼消息)
? ? ? ? ?↓
[NettyServerHandler](業務處理)
當服務端要回復消息,其執行順序如下:
? ?NettyServerHandler.write()
? ? ? ? ?↓
? ? ? ?[HL7Encoder](編碼為字節)
? ? ? ? ?↓
? ? ? ? ?[LoggingHandler](打印)
? ? ? ? ?↓
? ? ? ? ? 【客戶端】
6.3 NettySeverHandler類
在SeverChannelInitializer類中,其寫好了業務處理順序,在處理業務時,其處理業務的核心是NettySeverHandler類來實現的
package com.....nettyService;import com...component.commons.utils.BeanUtils;
import com...emergency.service.impl.BS2800MPacketParse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);private BS2800MPacketParse bs2800MPacketParse = BeanUtils.getBean(BS2800MPacketParse.class);/*** 裝載所有客戶端channel的組*/private static final Map<String, Channel> ipChannelMap = new HashMap<>();/*** 客戶端連接過來會觸發*/@Overridepublic void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {Channel channel = channelHandlerContext.channel();ipChannelMap.put(channel.remoteAddress().toString(), channel);logger.info("客戶端連接:" + channelHandlerContext);}/*** 客戶端發消息過來會觸發*/@Overridepublic void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {Channel channel = channelHandlerContext.channel();logger.info("服務端接收到客戶端消息");
// logger.info("發送消息的客戶端地址:" + channel.remoteAddress());logger.info("發送消息的客戶端所發消息:" + msg);String result = msg;String msa = handleParams(channelHandlerContext, result);if (ObjectUtils.isNotEmpty(msa)) {channel.writeAndFlush(msa);}}@Overridepublic void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {super.channelReadComplete(channelHandlerContext);}@Overridepublic void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {Channel channel = channelHandlerContext.channel();// 當通道變為非活動狀態(斷開連接)時,將其從 ChannelGroup 中移除String ip = channel.remoteAddress().toString();if (ipChannelMap.containsKey(ip)) {ipChannelMap.remove(ip);if (!channel.isActive() || channel == null) {channelHandlerContext.close();}}logger.info("客戶端地址為:" + ip + "的連接已斷開");}/*** 發生異常觸發*/@Overridepublic void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) throws Exception {logger.warn(cause.toString());}/*** 處理接收報文消息*/public String handleParams(ChannelHandlerContext channelHandlerContext, String msg) {String msa = null;SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");Channel channel = channelHandlerContext.channel();if (channel.remoteAddress().toString().contains("10.10.51.213")) {if (ObjectUtils.isNotEmpty(msg)) {String result[] = msg.split("\r");if (ObjectUtils.isNotEmpty(result) && result.length > 0) {String msh = null;for (String string : result) {if (string.contains("MSH")) {msh = string;}}if (msh.contains("ORU^R01")) {Date date = new Date();String temp[] = msh.split("\\|", -1);if (ObjectUtils.isNotEmpty(temp) && temp.length > 9) {msa = "MSH|^~\\&|||||" + dateFormat.format(date) + "||ACK^R01|" + temp[9] + "|P|2.3.1||||0||ASCII|||";String str = "MSA|AA|" + temp[9] + "|Message accepted|||0|";msa = msa + "\r" + str;Map<String, String> paramMap = new HashMap<>();paramMap.put(temp[9], msg);bs2800MPacketParse.parse(msg);return msa;}}}}}return msa;}}
6.3.1繼承
NettyServerHandler繼承SimpleChannelInboundHandler<String> 每次接收到客戶端消息(已經是 String
類型,說明解碼器已完成解碼),就會觸發 channelRead0()
方法。我們可以在這里處理邏輯、保存數據、做回復等
6.3.2channelActive
有客戶端連接進來時,Netty 會自動調用這個方法。將客戶端的 Channel 保存到 ipChannelMap
中,方便后面用 IP 找到連接。同時打印客戶端連接信息。
6.3.3channelRead0
每當客戶端發一條消息過來,就會自動執行這里!先獲取當前的 Channel(對應客戶端)
Channel channel = channelHandlerContext.channel();
打印日志,方便調試看到收到的數據