背景
因為安裝了正向隔離網閘,數據傳輸時僅支持TCP協議和UDP協議,因此需要開發TCP Client和TCP Server服務來透傳數據;當前環境中,Server端獲取到數據後再將數據轉發到Kafka
1.引入依賴
<!-- Netty all-in-one artifact; provides the io.netty.* classes used by the server and client code below. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.84.Final</version>
</dependency>
2.編寫TCP Server端
TCP Server代碼
本代碼已經解決TCP的粘包和半包問題:發送方需要以固定的$符號作為每條消息的結束分隔符,服務端據此切分數據,從而不會出現粘包和半包問題;解碼器的最大幀長度(代碼中為10MB)可以根據數據大小設定為一個不小於單條消息長度的值
package com.huanyu.forward.tcp.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Slf4j
@Service("tcpServer")
@ConditionalOnExpression("#{'${spring.tcp-server.port:}'.length()>0}")
public class TcpNettyServer {@Value("${spring.tcp-server.port:22222}")private Integer port;public static void main(String[] args) throws Exception {new TcpNettyServer().server(22222);}@PostConstruct()public void initTcpServer() {try {log.info("start tcp server......");server(port);} catch (Exception e) {log.error("tcp server start failed");}}public void server(int port) throws Exception {//bossGroup就是parentGroup,是負責處理TCP/IP連接的EventLoopGroup bossGroup = new NioEventLoopGroup();//workerGroup就是childGroup,是負責處理Channel(通道)的I/O事件EventLoopGroup workerGroup = new NioEventLoopGroup();ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1, 1);buffer.writeByte('$');ServerBootstrap sb = new ServerBootstrap();//初始化服務端可連接隊列,指定了隊列的大小500sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)//保持長連接.childOption(ChannelOption.SO_KEEPALIVE, true)// 綁定客戶端連接時候觸發操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sh) throws Exception {//handler是按順序執行的ChannelPipeline pipeline = sh.pipeline();//業務編碼 -解決 數據粘包和半包問題-pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
// pipeline.addLast(new LoggingHandler(LogLevel.WARN));pipeline.addLast(new TcpBizFlagHandler());//業務編碼//使用DataHandler類來處理接收到的消息pipeline.addLast(new TcpDataHandler());}});//綁定監聽端口,調用sync同步阻塞方法等待綁定操作完ChannelFuture future = sb.bind(port).sync();if (future.isSuccess()) {log.info("tcp server is listening on :{}", port);} else {log.error("tcp server is failed ", future.cause());//關閉線程組bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}//成功綁定到端口之后,給channel增加一個 管道關閉的監聽器并同步阻塞,直到channel關閉,線程才會往下執行,結束進程。
// future.channel().closeFuture().await();}
}
數據標志位接收代碼
package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {public static final String BIZ_FLAG = "bizFlag";private static final String FLAG_PRE = "@@{";private static final String FLAG_SUF = "}##";private static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);private static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < FLAG_PREFIX.length + FLAG_SUFFIX.length) {log.warn("數據長度不夠");text(in);return;}int prefixIndex = in.readerIndex();if (!startsWith(in)) {text(in);// 忽略非標志位開頭的數據in.skipBytes(in.readableBytes());log.warn("數據不包含指定的前綴");return;}int suffixIndex = indexOf(in);if (suffixIndex == -1) {log.warn("數據不包含指定的某字符");text(in);return;}int flagLength = suffixIndex - prefixIndex + FLAG_SUFFIX.length;byte[] flagBytes = new byte[flagLength];in.readBytes(flagBytes); // 讀取標志位// 保留標志位的對象結構-以@@{開頭以}##結尾,形如@@{"k":"v"}##{"k":"v"}$,@@和##之間的數據為補充的對象參數JSON,$為換行符號String flag = new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2, StandardCharsets.UTF_8);// 保存標志位到 Channel 屬性中供后續使用ctx.channel().attr(AttributeKey.valueOf(BIZ_FLAG)).set(flag);// 剩余數據繼續傳遞給下一個 Handler 處理(透傳)out.add(in.readRetainedSlice(in.readableBytes()));}private static void text(ByteBuf in) {byte[] msgByte = new byte[in.readableBytes()];in.readBytes(msgByte);log.warn("數據:{}", new String(msgByte, StandardCharsets.UTF_8));}private boolean startsWith(ByteBuf buf) {for (int i = 0; i < TcpBizFlagHandler.FLAG_PREFIX.length; i++) {if (buf.getByte(buf.readerIndex() + i) != TcpBizFlagHandler.FLAG_PREFIX[i]) {return false;}}return true;}private int indexOf(ByteBuf buf) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();for (int i = 0; i <= readableBytes - TcpBizFlagHandler.FLAG_SUFFIX.length; i++) {boolean match = true;for (int j = 0; j < TcpBizFlagHandler.FLAG_SUFFIX.length; 
j++) {if (buf.getByte(readerIndex + i + j) != TcpBizFlagHandler.FLAG_SUFFIX[j]) {match = false;break;}}if (match) {return readerIndex + i;}}return -1;}
}
業務轉發/解析代碼
package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;import static com.aimsphm.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;@Slf4j
@Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {// @Resourceprivate KafkaTemplate<String, Object> template;//接受client發送的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel channel = ctx.channel();// 獲取標志位String flag = (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();if (ObjectUtils.isEmpty(flag)) {log.warn("沒有業務標識");return;}ByteBuf buf = (ByteBuf) msg;byte[] msgByte = new byte[buf.readableBytes()];buf.readBytes(msgByte);
// template.send("haha.haha.ha", gbk.getBytes());log.info("bizFag:{},data: {}", flag, new String(msgByte));}//通知處理器最后的channelRead()是當前批處理中的最后一條消息時調用@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}//讀操作時捕獲到異常時調用@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}//客戶端去和服務端連接成功時觸發@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello client [你好,客戶端]".getBytes()));log.info("client 連接成功: {}", ctx.channel());}
}
3.編寫客戶端代碼
TCP Client 代碼
package com.huanyu.forward.tcp.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.util.stream.IntStream;@Getter
@Slf4j
public class TcpNettyClient {public static void main(String[] args) {extracted();}private static void extracted() {try {TcpNettyClient client = new TcpNettyClient("localhost", 4444);Channel channel = client.getChannel();IntStream.range(0, 1000).parallel().forEach(i -> {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();buf.writeBytes(("@@{\"cell-topic" + (i + 1) + "\":true}##{01#.01#\":\"data1\"}").getBytes());buf.writeByte('$');channel.writeAndFlush(buf);});} catch (Exception e) {log.error("出現異常:", e);}}private Channel channel;//連接服務端的端口號地址和端口號public TcpNettyClient(String host, int port) {tcpClient(host, port);}public void tcpClient(String host, int port) {try {final EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 使用NioSocketChannel來作為連接用的channel類.handler(new ChannelInitializer<SocketChannel>() { // 綁定連接初始化器@Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println("正在連接中...");ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new TcpClientHandler()); //客戶端處理類}});//發起異步連接請求,綁定連接端口和host信息final ChannelFuture future = b.connect(host, port).sync();future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture arg0) throws Exception {if (future.isSuccess()) {log.info("連接服務器成功:");} else {log.warn("連接服務器失敗:");System.out.println("連接服務器失敗");group.shutdownGracefully(); //關閉線程組}}});this.channel = future.channel();} catch (InterruptedException e) {log.error("TCP服務端啟動異常:", e);}}}
客戶端數據解析代碼
package com.huanyu.forward.tcp.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.Map;public class TcpClientHandler extends SimpleChannelInboundHandler<Map<String, ByteBuf>> {//處理服務端返回的數據@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Map<String, ByteBuf> data) throws Exception {ByteBuf msg = data.get("topic");byte[] msgByte = new byte[msg.readableBytes()];msg.readBytes(msgByte);System.out.println("接受到server響應數據: " + new String(msgByte));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello server 你好".getBytes()));super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
備注
1. 為了盡可能地降低性能消耗,數據以字節數組的形式發送
2. 業務字段以@@{"key":"value"}##的形式作為消息的頭部,由數據標志位處理器進行處理
3. 真實要傳送的數據並不解析出來,只以$結尾作為消息邊界,以解決粘包和半包問題
記錄備查