protobuf是谷歌的Protocol Buffers的簡稱,用于結構化數據和字節碼之間互相轉換(序列化、反序列化),一般應用于網絡傳輸,可支持多種編程語言。
protobuf怎樣使用這里不再介紹,本文主要介紹在MINA、Netty、Twisted中怎樣使用protobuf,不了解protobuf的同學能夠去參考我的還有一篇博文。
在前面的一篇博文中。有介紹到一種用一個固定為4字節的前綴Header來指定Body的字節數的一種消息切割方式。在這里相同要使用到。
僅僅是當中Body的內容不再是字符串,而是protobuf字節碼。
在處理業務邏輯時,肯定不希望還要對數據進行序列化和反序列化。而是希望直接操作一個對象,那么就須要有對應的編碼器和解碼器。將序列化和反序列化的邏輯寫在編碼器和解碼器中。有關編碼器和解碼器的實現,上一篇博文中有介紹。
Netty包中已經自帶針對protobuf的編碼器和解碼器。那么就不用再自己去實現了。而MINA、Twisted還須要自己去實現protobuf的編碼器和解碼器。
這里定義一個protobuf數據結構,用于描寫敘述一個學生的信息。保存為StudentMsg.proto文件:
message Student {// IDrequired int32 id = 1; // 姓名required string name = 2;// emailoptional string email = 3;// 朋友repeated string friends = 4;
}
用StudentMsg.proto分別生成Java和Python代碼。將代碼加入到對應的項目中。
生成的代碼就不再貼上來了。
以下分別介紹在Netty、MINA、Twisted怎樣使用protobuf來傳輸Student信息。
Netty:
Netty自帶protobuf的編碼器和解碼器,各自是ProtobufEncoder和ProtobufDecoder。須要注意的是,ProtobufEncoder和ProtobufDecoder僅僅負責protobuf的序列化和反序列化,而處理消息Header前綴和消息切割的還須要LengthFieldBasedFrameDecoder和LengthFieldPrepender。LengthFieldBasedFrameDecoder即用于解析消息Header前綴。依據Header中指定的Body字節數截取Body,LengthFieldPrepender用于在wirte消息時在消息前面加入一個Header前綴來指定Body字節數。
public class TcpServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch)throws Exception {ChannelPipeline pipeline = ch.pipeline();// 負責通過4字節Header指定的Body長度將消息切割pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));// 負責將frameDecoder處理后的完整的一條消息的protobuf字節碼轉成Student對象pipeline.addLast("protobufDecoder",new ProtobufDecoder(StudentMsg.Student.getDefaultInstance()));// 負責將寫入的字節碼加上4字節Header前綴來指定Body長度pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));// 負責將Student對象轉成protobuf字節碼pipeline.addLast("protobufEncoder", new ProtobufEncoder());pipeline.addLast(new TcpServerHandler());}});ChannelFuture f = b.bind(8080).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
處理事件時,接收和發送的參數直接就是Student對象:
public class TcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 讀取client傳過來的Student對象StudentMsg.Student student = (StudentMsg.Student) msg;System.out.println("ID:" + student.getId());System.out.println("Name:" + student.getName());System.out.println("Email:" + student.getEmail());System.out.println("Friends:");List<String> friends = student.getFriendsList();for(String friend : friends) {System.out.println(friend);}// 新建一個Student對象傳到clientStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(9);builder.setName("server");builder.setEmail("123@abc.com");builder.addFriends("X");builder.addFriends("Y");StudentMsg.Student student2 = builder.build();ctx.writeAndFlush(student2);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}
}
MINA:
在MINA中沒有針對protobuf的編碼器和解碼器。可是能夠自己實現一個功能和Netty一樣的編碼器和解碼器。
編碼器:
public class MinaProtobufEncoder extends ProtocolEncoderAdapter {@Overridepublic void encode(IoSession session, Object message,ProtocolEncoderOutput out) throws Exception {StudentMsg.Student student = (StudentMsg.Student) message;byte[] bytes = student.toByteArray(); // Student對象轉為protobuf字節碼int length = bytes.length;IoBuffer buffer = IoBuffer.allocate(length + 4);buffer.putInt(length); // write headerbuffer.put(bytes); // write bodybuffer.flip();out.write(buffer);}
}
解碼器:
public class MinaProtobufDecoder extends CumulativeProtocolDecoder {@Overrideprotected boolean doDecode(IoSession session, IoBuffer in,ProtocolDecoderOutput out) throws Exception {// 假設沒有接收完Header部分(4字節)。直接返回falseif (in.remaining() < 4) {return false;} else {// 標記開始位置,假設一條消息沒傳輸完畢則返回到這個位置in.mark();// 讀取header部分,獲取body長度int bodyLength = in.getInt();// 假設body沒有接收完整,直接返回falseif (in.remaining() < bodyLength) {in.reset(); // IoBuffer position回到原來標記的地方return false;} else {byte[] bodyBytes = new byte[bodyLength];in.get(bodyBytes); // 讀取body部分StudentMsg.Student student = StudentMsg.Student.parseFrom(bodyBytes); // 將body中protobuf字節碼轉成Student對象out.write(student); // 解析出一條消息return true;}}}
}
MINAserver加入protobuf的編碼器和解碼器:
public class TcpServer {public static void main(String[] args) throws IOException {IoAcceptor acceptor = new NioSocketAcceptor();// 指定protobuf的編碼器和解碼器acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new MinaProtobufEncoder(), new MinaProtobufDecoder()));acceptor.setHandler(new TcpServerHandle());acceptor.bind(new InetSocketAddress(8080));}
}
這樣。在處理業務邏輯時,就和Netty一樣了:
public class TcpServerHandle extends IoHandlerAdapter {@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception {cause.printStackTrace();}@Overridepublic void messageReceived(IoSession session, Object message)throws Exception {// 讀取client傳過來的Student對象StudentMsg.Student student = (StudentMsg.Student) message;System.out.println("ID:" + student.getId());System.out.println("Name:" + student.getName());System.out.println("Email:" + student.getEmail());System.out.println("Friends:");List<String> friends = student.getFriendsList();for(String friend : friends) {System.out.println(friend);}// 新建一個Student對象傳到clientStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(9);builder.setName("server");builder.setEmail("123@abc.com");builder.addFriends("X");builder.addFriends("Y");StudentMsg.Student student2 = builder.build();session.write(student2);}
}
Twisted:
在Twisted中。首先定義一個ProtobufProtocol類,繼承Protocol類,充當編碼器和解碼器。處理業務邏輯的TcpServerHandle類再繼承ProtobufProtocol類。調用或重寫ProtobufProtocol提供的方法。
# -*- coding:utf-8 –*-from struct import pack, unpack
from twisted.internet.protocol import Factory
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
import StudentMsg_pb2# protobuf編碼、解碼器
class ProtobufProtocol(Protocol):# 用于臨時存放接收到的數據_buffer = b""def dataReceived(self, data):# 上次未處理的數據加上本次接收到的數據self._buffer = self._buffer + data# 一直循環直到新的消息沒有接收完整while True:# 假設header接收完整if len(self._buffer) >= 4:# header部分,按大字節序轉int,獲取body長度length, = unpack(">I", self._buffer[0:4])# 假設body接收完整if len(self._buffer) >= 4 + length:# body部分,protobuf字節碼packet = self._buffer[4:4 + length]# protobuf字節碼轉成Student對象student = StudentMsg_pb2.Student()student.ParseFromString(packet)# 調用protobufReceived傳入Student對象self.protobufReceived(student)# 去掉_buffer中已經處理的消息部分self._buffer = self._buffer[4 + length:]else:break;else:break;def protobufReceived(self, student):raise NotImplementedErrordef sendProtobuf(self, student):# Student對象轉為protobuf字節碼data = student.SerializeToString()# 加入Header前綴指定protobuf字節碼長度self.transport.write(pack(">I", len(data)) + data)# 邏輯代碼
class TcpServerHandle(ProtobufProtocol):# 實現ProtobufProtocol提供的protobufReceiveddef protobufReceived(self, student):# 將接收到的Student輸出print 'ID:' + str(student.id)print 'Name:' + student.nameprint 'Email:' + student.emailprint 'Friends:'for friend in student.friends:print friend# 創建一個Student并發送給clientstudent2 = StudentMsg_pb2.Student()student2.id = 9student2.name = 'server'.decode('UTF-8') # 中文須要轉成UTF-8字符串student2.email = '123@abc.com'student2.friends.append('X')student2.friends.append('Y')self.sendProtobuf(student2)factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()
以下是Java編寫的一個client測試程序:
public class TcpClient {public static void main(String[] args) throws IOException {Socket socket = null;DataOutputStream out = null;DataInputStream in = null;try {socket = new Socket("localhost", 8080);out = new DataOutputStream(socket.getOutputStream());in = new DataInputStream(socket.getInputStream());// 創建一個Student傳給serverStudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();builder.setId(1);builder.setName("client");builder.setEmail("xxg@163.com");builder.addFriends("A");builder.addFriends("B");StudentMsg.Student student = builder.build();byte[] outputBytes = student.toByteArray(); // Student轉成字節碼out.writeInt(outputBytes.length); // write headerout.write(outputBytes); // write bodyout.flush();// 獲取server傳過來的Studentint bodyLength = in.readInt(); // read headerbyte[] bodyBytes = new byte[bodyLength];in.readFully(bodyBytes); // read bodyStudentMsg.Student student2 = StudentMsg.Student.parseFrom(bodyBytes); // body字節碼解析成StudentSystem.out.println("Header:" + bodyLength);System.out.println("Body:");System.out.println("ID:" + student2.getId());System.out.println("Name:" + student2.getName());System.out.println("Email:" + student2.getEmail());System.out.println("Friends:");List<String> friends = student2.getFriendsList();for(String friend : friends) {System.out.println(friend);}} finally {// 關閉連接in.close();out.close();socket.close();}}
}
用client分別測試上面三個TCPserver:
server輸出:
ID:1
Name:client
Email:xxg@163.com
Friends:
A
B
client輸出:
Header:32
Body:
ID:9
Name:server
Email:123@abc.com
Friends:
X
Y
作者:叉叉哥 ? 轉載請注明出處:http://blog.csdn.net/xiao__gui/article/details/38864961
MINA、Netty、Twisted一起學系列
MINA、Netty、Twisted一起學(一):實現簡單的TCPserver
MINA、Netty、Twisted一起學(二):TCP消息邊界問題及按行切割消息
MINA、Netty、Twisted一起學(三):TCP消息固定大小的前綴(Header)
MINA、Netty、Twisted一起學(四):定制自己的協議
MINA、Netty、Twisted一起學(五):整合protobuf
MINA、Netty、Twisted一起學(六):session
MINA、Netty、Twisted一起學(七):公布/訂閱(Publish/Subscribe)
MINA、Netty、Twisted一起學(八):HTTPserver
MINA、Netty、Twisted一起學(九):異步IO和回調函數
MINA、Netty、Twisted一起學(十):線程模型
MINA、Netty、Twisted一起學(十一):SSL/TLS
MINA、Netty、Twisted一起學(十二):HTTPS
源代碼
https://github.com/wucao/mina-netty-twisted