基于Netty的TCP Server端和Client端解決正向隔離網閘數據透傳問題

背景

因為安裝了正向隔離網閘,導致數據傳輸的時候僅支持TCP協議和UDP協議,因此需要開發TCP Client和Server服務來將數據透傳,當前環境是獲取的數據并將數據轉發到kafka

?1.引入依賴

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.84.Final</version>
</dependency>

2.編寫TCP?Server端?

TCP Server代碼

本代碼已經解決TCP的粘包和半包問題,需要通過固定的$符號進行數據分割,使得數據不會錯出現粘包和半包問題,可以根據數據大小制定一個不會超過發送消息長度的值

?

package com.huanyu.forward.tcp.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Slf4j
@Service("tcpServer")
@ConditionalOnExpression("#{'${spring.tcp-server.port:}'.length()>0}")
public class TcpNettyServer {@Value("${spring.tcp-server.port:22222}")private Integer port;public static void main(String[] args) throws Exception {new TcpNettyServer().server(22222);}@PostConstruct()public void initTcpServer() {try {log.info("start tcp server......");server(port);} catch (Exception e) {log.error("tcp server start failed");}}public void server(int port) throws Exception {//bossGroup就是parentGroup,是負責處理TCP/IP連接的EventLoopGroup bossGroup = new NioEventLoopGroup();//workerGroup就是childGroup,是負責處理Channel(通道)的I/O事件EventLoopGroup workerGroup = new NioEventLoopGroup();ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1, 1);buffer.writeByte('$');ServerBootstrap sb = new ServerBootstrap();//初始化服務端可連接隊列,指定了隊列的大小500sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)//保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true)// 綁定客戶端連接時候觸發操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sh) throws Exception {//handler是按順序執行的ChannelPipeline pipeline = sh.pipeline();//業務編碼 -解決 數據粘包和半包問題-pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
//                        pipeline.addLast(new LoggingHandler(LogLevel.WARN));pipeline.addLast(new TcpBizFlagHandler());//業務編碼//使用DataHandler類來處理接收到的消息pipeline.addLast(new TcpDataHandler());}});//綁定監聽端口,調用sync同步阻塞方法等待綁定操作完ChannelFuture future = sb.bind(port).sync();if (future.isSuccess()) {log.info("tcp server is listening on  :{}", port);} else {log.error("tcp server is failed ", future.cause());//關閉線程組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}//成功綁定到端口之后,給channel增加一個 管道關閉的監聽器并同步阻塞,直到channel關閉,線程才會往下執行,結束進程。
//        future.channel().closeFuture().await();}
}

?數據標志位接收代碼

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {public static final String BIZ_FLAG = "bizFlag";private static final String FLAG_PRE = "@@{";private static final String FLAG_SUF = "}##";private static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);private static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < FLAG_PREFIX.length + FLAG_SUFFIX.length) {log.warn("數據長度不夠");text(in);return;}int prefixIndex = in.readerIndex();if (!startsWith(in)) {text(in);// 忽略非標志位開頭的數據in.skipBytes(in.readableBytes());log.warn("數據不包含指定的前綴");return;}int suffixIndex = indexOf(in);if (suffixIndex == -1) {log.warn("數據不包含指定的某字符");text(in);return;}int flagLength = suffixIndex - prefixIndex + FLAG_SUFFIX.length;byte[] flagBytes = new byte[flagLength];in.readBytes(flagBytes); // 讀取標志位// 保留標志位的對象結構-以@@{開頭以}##結尾,形如@@{"k":"v"}##{"k":"v"}$,@@和##之間的數據為補充的對象參數JSON,$為換行符號String flag = new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2, StandardCharsets.UTF_8);// 保存標志位到 Channel 屬性中供后續使用ctx.channel().attr(AttributeKey.valueOf(BIZ_FLAG)).set(flag);// 剩余數據繼續傳遞給下一個 Handler 處理(透傳)out.add(in.readRetainedSlice(in.readableBytes()));}private static void text(ByteBuf in) {byte[] msgByte = new byte[in.readableBytes()];in.readBytes(msgByte);log.warn("數據:{}", new String(msgByte, StandardCharsets.UTF_8));}private boolean startsWith(ByteBuf buf) {for (int i = 0; i < TcpBizFlagHandler.FLAG_PREFIX.length; i++) {if (buf.getByte(buf.readerIndex() + i) != TcpBizFlagHandler.FLAG_PREFIX[i]) {return false;}}return true;}private int indexOf(ByteBuf buf) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();for (int i = 0; i <= readableBytes - TcpBizFlagHandler.FLAG_SUFFIX.length; i++) {boolean match = true;for (int j = 0; j < TcpBizFlagHandler.FLAG_SUFFIX.length; j++) {if (buf.getByte(readerIndex + i + j) != TcpBizFlagHandler.FLAG_SUFFIX[j]) {match = false;break;}}if (match) {return readerIndex + i;}}return -1;}
}

業務轉發/解析代碼?

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;import static com.aimsphm.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;@Slf4j
@Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {//    @Resourceprivate KafkaTemplate<String, Object> template;//接受client發送的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel channel = ctx.channel();// 獲取標志位String flag = (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();if (ObjectUtils.isEmpty(flag)) {log.warn("沒有業務標識");return;}ByteBuf buf = (ByteBuf) msg;byte[] msgByte = new byte[buf.readableBytes()];buf.readBytes(msgByte);
//        template.send("haha.haha.ha", gbk.getBytes());log.info("bizFag:{},data: {}", flag, new String(msgByte));}//通知處理器最后的channelRead()是當前批處理中的最后一條消息時調用@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}//讀操作時捕獲到異常時調用@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}//客戶端去和服務端連接成功時觸發@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client [你好,客戶端]".getBytes()));log.info("client 連接成功: {}", ctx.channel());}
}

3.編寫客戶端代碼

TCP Client?代碼

package com.huanyu.forward.tcp.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.util.stream.IntStream;@Getter
@Slf4j
public class TcpNettyClient {public static void main(String[] args) {extracted();}private static void extracted() {try {TcpNettyClient client = new TcpNettyClient("localhost", 4444);Channel channel = client.getChannel();IntStream.range(0, 1000).parallel().forEach(i -> {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();buf.writeBytes(("@@{\"cell-topic" + (i + 1) + "\":true}##{01#.01#\":\"data1\"}").getBytes());buf.writeByte('$');channel.writeAndFlush(buf);});} catch (Exception e) {log.error("出現異常:", e);}}private Channel channel;//連接服務端的端口號地址和端口號public TcpNettyClient(String host, int port) {tcpClient(host, port);}public void tcpClient(String host, int port) {try {final EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class)  // 使用NioSocketChannel來作為連接用的channel類.handler(new ChannelInitializer<SocketChannel>() { // 綁定連接初始化器@Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println("正在連接中...");ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new TcpClientHandler()); //客戶端處理類}});//發起異步連接請求,綁定連接端口和host信息final ChannelFuture future = b.connect(host, port).sync();future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture arg0) throws Exception {if (future.isSuccess()) {log.info("連接服務器成功:");} else {log.warn("連接服務器失敗:");System.out.println("連接服務器失敗");group.shutdownGracefully(); //關閉線程組}}});this.channel = future.channel();} catch (InterruptedException e) {log.error("TCP服務端啟動異常:", e);}}}

?客戶端數據解析代碼

package com.huanyu.forward.tcp.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.Map;public class TcpClientHandler extends SimpleChannelInboundHandler<Map<String, ByteBuf>> {//處理服務端返回的數據@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Map<String, ByteBuf> data) throws Exception {ByteBuf msg = data.get("topic");byte[] msgByte = new byte[msg.readableBytes()];msg.readBytes(msgByte);System.out.println("接受到server響應數據: " + new String(msgByte));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello server 你好".getBytes()));super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

?備注

1.?為了盡可能的降低性能消耗,數據以字節數組的形式發送

2.?業務字段通過@@{"key":"value"}##作為消息的頭部,用數據標志位處理器進行處理

3.?真實要傳送的數據,并不解析出來,并以$結尾,解決粘包和半包問題

記錄備查

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/909370.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/909370.shtml
英文地址,請注明出處:http://en.pswp.cn/news/909370.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Cursor鏈接遠程服務器實現項目部署

想獲取更多高質量的Java技術文章&#xff1f;歡迎訪問Java技術小館官網&#xff0c;持續更新優質內容&#xff0c;助力技術成長 技術小館官網 在軟件開發過程中&#xff0c;遠程服務器開發是一種常見的工作模式。通過遠程連接服務器進行代碼編寫和環境配置&#xff0c;可以充分…

Redis集群模式之Redis Cluster(3)

上篇文章我們講解了Redis Cluster的狀態監測與恢復過程&#xff0c;這篇文章我們來進行Redis Cluster內容的收尾&#xff0c;將其擴容和縮容的過程進行講解&#xff0c;并分析RedisCluster的優缺點。 擴容和縮容 當集群中出現容量限制或者其他一些原因需要擴容時&#xff0c;R…

Cursor ReAct Agent技術架構

一、架構核心思想 “零熵操作交給AI”理念 Cursor通過ReAct模式實現編程中重復性工作的自動化&#xff1a; 零熵操作&#xff1a;機械性任務&#xff08;代碼補全/格式化/重構/語法修復/導入管理&#xff09; Tab-away機制&#xff1a;一鍵接受AI建議&#xff0c;保持思維連續…

國學IP行業實戰洞察:聚焦創客匠人,解鎖創始人IP與知識變現新路徑

國學行業正經歷“文化價值”與“商業變現”的深度融合&#xff0c;2023年市場規模突破千億大關&#xff0c;年增速超 10%。在“IP化數字化”浪潮中&#xff0c;創客匠人作為垂直領域技術服務商&#xff0c;以全鏈路工具矩陣為支點&#xff0c;撬動國學創始人IP從內容生產到商業…

R語言開發入門完整指南

R語言開發入門完整指南 目錄 R語言簡介環境配置包管理基本語法數據類型和結構數據操作統計分析數據可視化編程結構實用技巧學習資源 R語言簡介 R是一種專為統計計算和圖形設計的編程語言&#xff0c;廣泛應用于數據分析、統計建模、機器學習和數據可視化。R語言具有以下特點…

ObservedV2裝飾器和Trace裝飾器

為了對嵌套類對象屬性變化直接觀測&#xff0c;華為提供了ObservedV2和Trace裝飾器。這兩個裝飾器必須搭配使用&#xff0c;單獨使用任何一個都不會起任何作用&#xff1b;在繼承類中也可監測&#xff1b;ObservedV2的類實例目前不支持使用JSON.stringify進行序列化&#xff0c…

6月計算機新書:深度學習、大模型、DeepSeek

六月&#xff0c;這個充滿活力與希望的季節&#xff0c;三本重磅新書《深度學習&#xff1a;基礎與概念》、《MCP極簡開發&#xff1a;輕松打造高效智能體》與《大模型應用開發&#xff1a;RAG實戰課》翩然而至&#xff0c;為我們開啟了一場探索科技前沿的奇妙之旅。一起來看詳…

扁平風格職場商務通用PPT模版分享

扁平風格PPT模版&#xff0c;創意卡通扁平化通用PPT模版&#xff0c;創意扁平化勵志論文答辯PPT模版&#xff0c;卡通職場商務PPT模版&#xff0c;職場培訓&#xff0c;項目策劃&#xff0c;工作總結類PPT模版&#xff0c;互聯網電子商務PPT模版 扁平風格職場商務通用PPT模版分…

jupyter內核崩潰

最近在做用k-mer評估基因組規模的任務&#xff0c;其中一個局部程序&#xff0c;想偷懶&#xff0c;直接在jupyter中跑了下結果&#xff0c;想看看這一小步處理如何&#xff0c;結果沒想到內核崩潰了&#xff01; 這一步我的草稿代碼如下&#xff1a; import pandas as pd imp…

Java企業技術趨勢分析:AI應用的落地實踐與未來展望

Java企業技術趨勢分析&#xff1a;AI應用的落地實踐與未來展望 開篇&#xff1a;技術趨勢與市場需求 在當前快速發展的數字化時代&#xff0c;人工智能&#xff08;AI&#xff09;已經成為推動企業創新和效率提升的關鍵力量。Java作為企業級應用開發的主流語言&#xff0c;正…

每日Prompt:Steve Winter風格插畫

提示詞 世界攝影大師杰作&#xff0c;極簡主義&#xff0c;Steve Winter風格&#xff0c;6只不同顏色的布偶貓圍成一圈&#xff0c;看向鏡頭中心&#xff0c;仰天視角&#xff0c;天空背景&#xff0c;高品質細節&#xff0c;超精細CG&#xff0c;高分辨率&#xff0c;最佳品質…

Vue3 + Element Plus 獲取表格列信息

在 Vue 3 和 Element Plus 中&#xff0c;可以通過以下步驟獲取表格的列信息&#xff1a; 實現步驟&#xff1a; 使用 ref 綁定表格實例 通過表格實例的 store.states.columns 獲取列數據 處理列信息&#xff08;過濾隱藏列、處理嵌套表頭等&#xff09; 示例代碼&#xf…

logger2js - JavaScript日志與調試工具庫

logger2js - JavaScript日志與調試工具庫 logger2js是一個功能強大的前端JavaScript日志與調試工具庫&#xff0c;提供了豐富的日志輸出、性能測試和代碼調試功能。該庫支持配置化引入&#xff0c;包含5種皮膚風格和豐富的API接口&#xff0c;如 a l e r t 增強方法、 alert增…

Stone 3D使用RemoteMesh組件極大的縮小工程文件尺寸

Stone 3D的工程文件tsp默認包含了場景中所有的對象和數據&#xff0c;這樣的好處是tsp可以單獨離線保存&#xff0c;但壞處是tsp文件通常偏大。 解決這個問題的方法是把外部glb模型文件通過RemoteMesh組件來加載。 首先創建一個空實體&#xff0c;然后給該空實體添加RemoteMe…

【深入剖析】攻克 Java 并發的基石:Java 內存模型 (JMM) 原理與實踐指南

0.引言 理解 JMM (Java Memory Model - JMM) 是掌握 Java 并發編程的關鍵&#xff0c;它定義了多線程環境下&#xff0c;線程如何與主內存以及彼此之間交互內存數據。 核心目標&#xff1a; JMM 旨在解決多線程編程中的三個核心問題&#xff1a; 原子性 (Atomicity)&#xf…

【Three.js】初識 Three.js

Threejs介紹 我們開發 webgl 主要是使用 threejs 這個庫&#xff0c;因為 webGL太難用&#xff0c;太復雜&#xff01;但是現代瀏覽器都支持WebGL&#xff0c;這樣我們就不必使用Flash、Java等插件就能在瀏覽器中創建三維圖形。 threejs 它提供-一個很簡單的關于WebGL特性的J…

【經驗總結】ECU休眠后連續發送NM報文3S后ECU網絡才被喚醒問題分析

目錄 前言 正文 1.問題描述 2.問題分析 3.驗證猜想 4.總結 前言 ECU的上下電/休眠喚醒在ECU開發設計過程中最容易出問題且都為嚴重問題,最近在項目開發過程中遇到ECU休眠狀態下連續發送NM報文3S后才能喚醒CAN網絡的問題,解決問題比較順利,但分析過程中涉及到的網絡休…

企業架構框架深入解析:TOGAF、Zachman Framework、FEAF與Gartner EA Framework

執行摘要 企業架構&#xff08;EA&#xff09;是一項至關重要的實踐&#xff0c;它使組織能夠協調其業務戰略、運營流程和技術基礎設施&#xff0c;以實現整體戰略目標。企業架構框架作為結構化的方法論和綜合性工具&#xff0c;旨在管理企業級系統的固有復雜性&#xff0c;提…

數字化動態ID隨機水印和ID跑馬燈實現教育視頻防錄屏

摘要&#xff1a;數字化動態ID隨機水印和ID跑馬燈技術可以有效保護數字教育資源。動態水印將用戶信息隨機顯示在視頻上且不可去除&#xff0c;能追蹤錄屏者并震懾盜版行為。ID跑馬燈則自定義顯示觀看者信息&#xff0c;便于追蹤盜版源頭并提供法律證據。這些技術大幅增加盜版成…

< 自用文兒 騰訊云 VPS > Ubuntu 24 系統,基本設置

前言&#xff1a; 3 月份買的騰訊云的這臺 VPS&#xff0c;剛發現現在退款&#xff0c;只能返回 0 元。測試應用已經遷移到JD&#xff0c;清除內容太麻煩&#xff0c;重裝更簡單。 因為配合政策&#xff0c;國內的云主機都有兩個 IP 地址&#xff0c;一個內網&#xff0c;一個…