高效的序列化/反序列化數據方式 Protobuf
github地址
目錄
- protocolBuffers 序列化
- Int32
- String
- Map
- slice
- 序列化小結
- protocolBuffers 反序列化
- Int32
- String
- Map
- slice
- 序列化小結
- 序列化/反序列化性能
- 最后

protocolBuffers序列化
上篇文章中其實已經講過了 encode
的過程,這篇文章以 golang
為例,從代碼實現的層面講講序列化和反序列化的過程。
舉個 go
使用 protobuf
進行數據序列化和反序列化的例子,本篇文章從這個例子開始。
先新建一個 example
的 message
:
syntax = "proto2";
package example;enum FOO { X = 17; };message Test {required string label = 1;optional int32 type = 2 [default=77];repeated int64 reps = 3;optional group OptionalGroup = 4 {required string RequiredField = 5;}
}
利用 protoc-gen-go
生成對應的 get/set
方法。代碼中就可以用生成的代碼進行序列化和反序列化了。
package mainimport ("log""github.com/golang/protobuf/proto""path/to/example"
)func main() {test := &example.Test {Label: proto.String("hello"),Type: proto.Int32(17),Reps: []int64{1, 2, 3},Optionalgroup: &example.Test_OptionalGroup {RequiredField: proto.String("good bye"),},}data, err := proto.Marshal(test)if err != nil {log.Fatal("marshaling error: ", err)}newTest := &example.Test{}err = proto.Unmarshal(data, newTest)if err != nil {log.Fatal("unmarshaling error: ", err)}// Now test and newTest contain the same data.if test.GetLabel() != newTest.GetLabel() {log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())}// etc.
}
上面代碼中 proto.Marshal()
是序列化過程。proto.Unmarshal()
是反序列化過程。這一章節先看看序列化過程的實現,下一章節再分析反序列化過程的實現。
// Marshal takes the protocol buffer
// and encodes it into the wire format, returning the data.
func Marshal(pb Message) ([]byte, error) {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {return m.Marshal()}p := NewBuffer(nil)err := p.Marshal(pb)if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil}return p.buf, err
}
序列化函數一進來,會先調用 message
對象自身的實現的序列化方法。
// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {Marshal() ([]byte, error)
}
Marshaler
是一個 interface
,這個接口是專門留給對象自定義序列化的。如果有實現,就 return
自己實現的方法。如果沒有,接下來就進行默認序列化方式。
p := NewBuffer(nil)
err := p.Marshal(pb)
if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil
}
新建一個 Buffer
,調用 Buffer
的 Marshal()
方法。message
經過序列化以后,數據流會放到 Buffer
的 buf
字節流中。序列化最終返回 buf
字節流即可。
type Buffer struct {buf []byte // encode/decode byte streamindex int // read point// pools of basic types to amortize allocation.bools []booluint32s []uint32uint64s []uint64// extra pools, only used with pointer_reflect.goint32s []int32int64s []int64float32s []float32float64s []float64
}
Buffer
的數據結構如上, Buffer
是用于序列化和反序列化 protocol buffers
的緩沖區管理器。它可以在調用的時候重用以減少內存使用量。內部維護了 7 個 pool
,3 個基礎數據類型的 pool
,4 個只能被 pointer_reflect
使用的 pool
。
func (p *Buffer) Marshal(pb Message) error {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {data, err := m.Marshal()p.buf = append(p.buf, data...)return err}t, base, err := getbase(pb)// 異常處理if structPointer_IsNil(base) {return ErrNil}if err == nil {err = p.enc_struct(GetProperties(t.Elem()), base)}// 用來統計 Encode 次數的if collectStats {(stats).Encode++ // Parens are to work around a goimports bug.}// maxMarshalSize = 1<<31 - 1,這個值是 protobuf 可以 encoded 的最大值。if len(p.buf) > maxMarshalSize {return ErrTooLarge}return err
}
Buffer
的 Marshal()
方法依舊先調用一下對象是否實現了 Marshal()
接口,如果實現了,還是讓它自己序列化,序列化之后的二進制數據流加入到 buf
數據流中。
func getbase(pb Message) (t reflect.Type, b structPointer, err error) {if pb == nil {err = ErrNilreturn}// get the reflect type of the pointer to the struct.t = reflect.TypeOf(pb)// get the address of the struct.value := reflect.ValueOf(pb)b = toStructPointer(value)return
}
getbase
方法通過 reflect
方法拿到了 message
的類型和對應 value
的結構體指針。拿到結構體指針先做異常處理。
所以序列化最核心的代碼其實就一句,p.enc_struct(GetProperties(t.Elem()), base)
// Encode a struct.
func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {var state errorState// Encode fields in tag order so that decoders may use optimizations// that depend on the ordering.// https://developers.google.com/protocol-buffers/docs/encoding#orderfor _, i := range prop.order {p := prop.Prop[i]if p.enc != nil {err := p.enc(o, p, base)if err != nil {if err == ErrNil {if p.Required && state.err == nil {state.err = &RequiredNotSetError{p.Name}}} else if err == errRepeatedHasNil {// Give more context to nil values in repeated fields.return errors.New("repeated field " + p.OrigName + " has nil element")} else if !state.shouldContinue(err, p) {return err}}if len(o.buf) > maxMarshalSize {return ErrTooLarge}}}// Do oneof fields.if prop.oneofMarshaler != nil {m := structPointer_Interface(base, prop.stype).(Message)if err := prop.oneofMarshaler(m, o); err == ErrNil {return errOneofHasNil} else if err != nil {return err}}// Add unrecognized fields at the end.if prop.unrecField.IsValid() {v := *structPointer_Bytes(base, prop.unrecField)if len(o.buf)+len(v) > maxMarshalSize {return ErrTooLarge}if len(v) > 0 {o.buf = append(o.buf, v...)}}return state.err
}
上面代碼中可以看到,除去 oneof fields
和 unrecognized fields
是單獨最后處理的,其他類型都是調用的 p.enc(o, p, base)
進行序列化的。
Properties
的數據結構定義如下:
type Properties struct {Name string // name of the field, for error messagesOrigName string // original name before protocol compiler (always set)JSONName string // name to use for JSON; determined by protocWire stringWireType intTag intRequired boolOptional boolRepeated boolPacked bool // relevant for repeated primitives onlyEnum string // set for enum types onlyproto3 bool // whether this is known to be a proto3 field; set for []byte onlyoneof bool // whether this is a oneof fieldDefault string // default valueHasDefault bool // whether an explicit default was providedCustomType stringStdTime boolStdDuration boolenc encodervalEnc valueEncoder // set for bool and numeric types onlyfield fieldtagcode []byte // encoding of EncodeVarint((Tag<<3)|WireType)tagbuf [8]bytestype reflect.Type // set for struct types onlysstype reflect.Type // set for slices of structs types onlyctype reflect.Type // set for custom types onlysprop * StructProperties // set for struct types onlyisMarshaler boolisUnmarshaler boolmtype reflect.Type // set for map types onlymkeyprop * Properties // set for map types onlymvalprop * Properties // set for map types onlysize sizervalSize valueSizer // set for bool and numeric types onlydec decodervalDec valueDecoder // set for bool and numeric types only// If this is a packable field, this will be the decoder for the packed version of the field.packedDec decoder
}
在 Properties
這個結構體中,定義了名為 enc
的 encoder
和名為 dec
的 decoder
。
encoder
和 decoder
函數定義是完全一樣的。
type encoder func(p *Buffer, prop *Properties, base structPointer) error
type decoder func(p *Buffer, prop *Properties, base structPointer) error
encoder
和 decoder
函數初始化是在 Properties
中:
// Initialize the fields for encoding and decoding.
func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {// 下面代碼有刪減,類似的部分省略了// proto3 scalar typescase reflect.Int32:if p.proto3 {p.enc = (*Buffer).enc_proto3_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_int32} else {p.enc = (*Buffer).enc_ref_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_ref_int32}case reflect.Uint32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_ref_uint32}case reflect.Float32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_ref_uint32}case reflect.String:if p.proto3 {p.enc = (*Buffer).enc_proto3_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_proto3_string} else {p.enc = (*Buffer).enc_ref_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_ref_string}case reflect.Slice:switch t2 := t1.Elem(); t2.Kind() {default:logNoSliceEnc(t1, t2)breakcase reflect.Int32:if p.Packed {p.enc = (*Buffer).enc_slice_packed_int32p.size = size_slice_packed_int32} else {p.enc = (*Buffer).enc_slice_int32p.size = size_slice_int32}p.dec = (*Buffer).dec_slice_int32p.packedDec = (*Buffer).dec_slice_packed_int32default:logNoSliceEnc(t1, t2)break}}case reflect.Map:p.enc = (*Buffer).enc_new_mapp.dec = (*Buffer).dec_new_mapp.size = size_new_mapp.mtype = t1p.mkeyprop = &Properties{}p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)p.mvalprop = &Properties{}vtype := p.mtype.Elem()if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {// The value type is not a message (*T) or bytes ([]byte),// so we need encoders for the pointer to this type.vtype = reflect.PtrTo(vtype)}p.mvalprop.CustomType = p.CustomTypep.mvalprop.StdDuration = p.StdDurationp.mvalprop.StdTime = p.StdTimep.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)}p.setTag(lockGetProp)
}
上面代碼中,分別把各個類型都進行 switch - case
枚舉,每種情況都設置對應的 encode
編碼器,decode
解碼器,size
大小。proto2
和 proto3
有區別的地方也分成2種不同的情況進行處理。
有以下幾種類型,reflect.Bool
、reflect.Int32
、reflect.Uint32
、reflect.Int64
、reflect.Uint64
、reflect.Float32
、reflect.Float64
、reflect.String
、reflect.Struct
、reflect.Ptr、reflect.Slice
、reflect.Map
共 12 種大的分類。
下面主要挑 3 類,Int32
、String
、Map
代碼實現進行分析。
Int32
func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {v := structPointer_Word32Val(base, p.field)x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit rangeif x == 0 {return ErrNil}o.buf = append(o.buf, p.tagcode...)p.valEnc(o, uint64(x))return nil
}
處理 Int32
代碼比較簡單,先把 tagcode
放進 buf
二進制數據流緩沖區,接著序列化 Int32
,序列化以后緊接著 tagcode
后面放進緩沖區。
// EncodeVarint writes a varint-encoded integer to the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) EncodeVarint(x uint64) error {for x >= 1<<7 {p.buf = append(p.buf, uint8(x&0x7f|0x80))x >>= 7}p.buf = append(p.buf, uint8(x))return nil
}
Int32
的編碼處理方法在上篇里面講過,用的 Varint
處理方法。上面這個函數同樣適用于處理 int32
, int64
, uint32
, uint64
, bool
, enum
。
順道也可以看看 sint32
、Fixed32
的具體代碼實現。
// EncodeZigzag32 writes a zigzag-encoded 32-bit integer
// to the Buffer.
// This is the format used for the sint32 protocol buffer type.
func (p *Buffer) EncodeZigzag32(x uint64) error {// use signed number to get arithmetic right shift.return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31))))
}
針對有符號的 sint32
,采取的是先 Zigzag
,然后在 Varint
的處理方式。
// EncodeFixed32 writes a 32-bit integer to the Buffer.
// This is the format for the
// fixed32, sfixed32, and float protocol buffer types.
func (p *Buffer) EncodeFixed32(x uint64) error {p.buf = append(p.buf,uint8(x),uint8(x>>8),uint8(x>>16),uint8(x>>24))return nil
}
對于 Fixed32
的處理,僅僅只是位移操作,并沒有做什么壓縮操作。
String
func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {v := *structPointer_StringVal(base, p.field)if v == "" {return ErrNil}o.buf = append(o.buf, p.tagcode...)o.EncodeStringBytes(v)return nil
}
序列化字符串也分2步,先把 tagcode
放進去,然后再序列化數據。
// EncodeStringBytes writes an encoded string to the Buffer.
// This is the format used for the proto2 string type.
func (p *Buffer) EncodeStringBytes(s string) error {p.EncodeVarint(uint64(len(s)))p.buf = append(p.buf, s...)return nil
}
序列化字符串的時候,會先把字符串的長度通過編碼 Varint
的方式,寫到 buf
中。長度后面再緊跟著 string
。這也就是 tag - length - value
的實現。
Map
// Encode a map field.
func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {var state errorState // XXX: or do we need to plumb this through?v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]Vif v.Len() == 0 {return nil}keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)enc := func() error {if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {return err}if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {return err}return nil}// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.for _, key := range v.MapKeys() {val := v.MapIndex(key)keycopy.Set(key)valcopy.Set(val)o.buf = append(o.buf, p.tagcode...)if err := o.enc_len_thing(enc, &state); err != nil {return err}}return nil
}
上述代碼也可以序列化字典數組,例如:
map<key_type, value_type> map_field = N;
轉換成對應的 repeated message
形式再進行序列化。
message MapFieldEntry {key_type key = 1;value_type value = 2;
}
repeated MapFieldEntry map_field = N;
map
序列化是針對每個 k-v
,都先放入 tagcode
,然后再序列化 k-v
。這里需要化未知長度的結構體的時候需要調用 enc_len_thing()
方法。
// Encode something, preceded by its encoded length (as a varint).
func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {iLen := len(o.buf)o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for lengthiMsg := len(o.buf)err := enc()if err != nil && !state.shouldContinue(err, nil) {return err}lMsg := len(o.buf) - iMsglLen := sizeVarint(uint64(lMsg))switch x := lLen - (iMsg - iLen); {case x > 0: // actual length is x bytes larger than the space we reserved// Move msg x bytes right.o.buf = append(o.buf, zeroes[:x]...)copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])case x < 0: // actual length is x bytes smaller than the space we reserved// Move msg x bytes left.copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])o.buf = o.buf[:len(o.buf)+x] // x is negative}// Encode the length in the reserved space.o.buf = o.buf[:iLen]o.EncodeVarint(uint64(lMsg))o.buf = o.buf[:len(o.buf)+lMsg]return state.err
}
enc_len_thing()
方法會先預存 4 個字節的長度空位。序列化以后算出長度。如果長度比 4 個字節還要長,則右移序列化的二進制數據,把長度填到 tagcode
和數據之間。如果長度小于 4 個字節,相應的要左移。
slice
最后再舉一個數組的例子。以 []int32
為例。
// Encode a slice of int32s ([]int32) in packed format.
func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {s := structPointer_Word32Slice(base, p.field)l := s.Len()if l == 0 {return ErrNil}// TODO: Reuse a Buffer.buf := NewBuffer(nil)for i := 0; i < l; i++ {x := int32(s.Index(i)) // permit sign extension to use full 64-bit rangep.valEnc(buf, uint64(x))}o.buf = append(o.buf, p.tagcode...)o.EncodeVarint(uint64(len(buf.buf)))o.buf = append(o.buf, buf.buf...)return nil
}
序列化這個數組,分3步,先把 tagcode
放進去,然后再序列化整個數組的長度,最后把數組的每個數據都序列化放在后面。最后形成 tag - length - value - value - value
的形式。
上述就是 Protocol Buffer
序列化的過程。
序列化小結
Protocol Buffer
序列化采用 Varint、Zigzag
方法,壓縮 int
型整數和帶符號的整數。對浮點型數字不做壓縮(這里可以進一步的壓縮,Protocol Buffer
還有提升空間)。編碼 .proto
文件,會對 option
和 repeated
字段進行檢查,若 optional
或 repeated
字段沒有被設置字段值,那么該字段在序列化時的數據中是完全不存在的,即不進行序列化(少編碼一個字段)。
上面這兩點做到了壓縮數據,序列化工作量減少。
序列化的過程都是二進制的位移,速度非常快。數據都以 tag - length - value
(或者 tag - value
)的形式存在二進制數據流中。采用了 TLV
結構存儲數據以后,也擺脫了 JSON
中的 {、}、; 、
這些分隔符,沒有這些分隔符也算是再一次減少了一部分數據。
這一點做到了序列化速度非常快。
回到頂部
protocolBuffers反序列化
反序列化的實現完全是序列化實現的逆過程。
func Unmarshal(buf []byte, pb Message) error {pb.Reset()return UnmarshalMerge(buf, pb)
}
在反序列化開始之前,先重置一下緩沖區。
func (p *Buffer) Reset() {p.buf = p.buf[0:0] // for reading/writingp.index = 0 // for reading
}
清空 buf
中的所有數據,并且重置 index
。
func UnmarshalMerge(buf []byte, pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {return u.Unmarshal(buf)}return NewBuffer(buf).Unmarshal(pb)
}
反序列化數據的開始從上面這個函數開始,如果傳進來的 message
的結果和 buf
結果不匹配,最終得到的結果是不可預知的。反序列化之前,同樣會先調用一下對應自己身自定義的 Unmarshal()
方法。
type Unmarshaler interface {Unmarshal([]byte) error
}
Unmarshal()
是一個可以自己實現的接口。
UnmarshalMerge
中會調用 Unmarshal(pb Message)
方法。
func (p *Buffer) Unmarshal(pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {err := u.Unmarshal(p.buf[p.index:])p.index = len(p.buf)return err}typ, base, err := getbase(pb)if err != nil {return err}err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)if collectStats {stats.Decode++}return err
}
Unmarshal(pb Message)
這個函數只有一個入參,和 proto.Unmarshal()
方法函數簽名不同(前面的函數只有 1 個入參,后面的有 2 個入參)。兩者的區別在于,1 個入參的函數實現里面并不會重置 buf
緩沖區,二個入參的會先重置 buf
緩沖區。
這兩個函數最終都會調用 unmarshalType()
方法,這個函數是最終支持反序列化的函數。
func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {var state errorStaterequired, reqFields := prop.reqCount, uint64(0)var err errorfor err == nil && o.index < len(o.buf) {oi := o.indexvar u uint64u, err = o.DecodeVarint()if err != nil {break}wire := int(u & 0x7)// 下面代碼有省略dec := p.dec// 中間代碼有省略decErr := dec(o, p, base)if decErr != nil && !state.shouldContinue(decErr, p) {err = decErr}if err == nil && p.Required {// Successfully decoded a required field.if tag <= 64 {// use bitmap for fields 1-64 to catch field reuse.var mask uint64 = 1 << uint64(tag-1)if reqFields&mask == 0 {// new required fieldreqFields |= maskrequired--}} else {// This is imprecise. It can be fooled by a required field// with a tag > 64 that is encoded twice; that's very rare.// A fully correct implementation would require allocating// a data structure, which we would like to avoid.required--}}}if err == nil {if is_group {return io.ErrUnexpectedEOF}if state.err != nil {return state.err}if required > 0 {// Not enough information to determine the exact field. If we use extra// CPU, we could determine the field only if the missing required field// has a tag <= 64 and we check reqFields.return &RequiredNotSetError{"{Unknown}"}}}return err
}
unmarshalType()
函數比較長,里面處理的情況比較多,有 oneof,WireEndGroup
。真正處理反序列化的函數在 decErr := dec(o, p, base)
這一行。
dec
函數在 Properties
的 setEncAndDec()
函數中進行了初始化。上面序列化的時候談到過那個函數了,這里就不再贅述了。dec()
函數針對每個不同類型都有對應的反序列化函數。
同樣的,接下來也舉 4 個例子,看看反序列化的實際代碼實現。
Int32
func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {u, err := p.valDec(o)if err != nil {return err}word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))return nil
}
反序列化 Int32
代碼比較簡單,原理是按照 encode
的逆過程,還原原來的數據。
func (p *Buffer) DecodeVarint() (x uint64, err error) {i := p.indexbuf := p.bufif i >= len(buf) {return 0, io.ErrUnexpectedEOF} else if buf[i] < 0x80 {p.index++return uint64(buf[i]), nil} else if len(buf)-i < 10 {return p.decodeVarintSlow()}var b uint64// we already checked the first bytex = uint64(buf[i]) - 0x80i++b = uint64(buf[i])i++x += b << 7if b&0x80 == 0 {goto done}x -= 0x80 << 7b = uint64(buf[i])i++x += b << 14if b&0x80 == 0 {goto done}x -= 0x80 << 14b = uint64(buf[i])i++x += b << 21if b&0x80 == 0 {goto done}x -= 0x80 << 21b = uint64(buf[i])i++x += b << 28if b&0x80 == 0 {goto done}x -= 0x80 << 28b = uint64(buf[i])i++x += b << 35if b&0x80 == 0 {goto done}x -= 0x80 << 35b = uint64(buf[i])i++x += b << 42if b&0x80 == 0 {goto done}x -= 0x80 << 42b = uint64(buf[i])i++x += b << 49if b&0x80 == 0 {goto done}x -= 0x80 << 49b = uint64(buf[i])i++x += b << 56if b&0x80 == 0 {goto done}x -= 0x80 << 56b = uint64(buf[i])i++x += b << 63if b&0x80 == 0 {goto done}// x -= 0x80 << 63 // Always zero.return 0, errOverflowdone:p.index = ireturn x, nil
}
Int32
序列化之后,第一個字節一定是 0x80
,那么除去這個字節以后,后面的每個二進制字節都是數據,剩下的步驟就是通過位移操作把每個數字都加起來。上面這個反序列化的函數同樣適用于 int32
, int64
, uint32
, uint64
, bool
, 和 enum
。
順道也可以看看 sint32
、Fixed32
的反序列化具體代碼實現。
func (p *Buffer) DecodeZigzag32() (x uint64, err error) {x, err = p.DecodeVarint()if err != nil {return}x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))return
}
針對有符號的 sint32
,反序列化的過程就是先反序列 Varint
,再反序列化 Zigzag
。
func (p *Buffer) DecodeFixed32() (x uint64, err error) {// x, err already 0i := p.index + 4if i < 0 || i > len(p.buf) {err = io.ErrUnexpectedEOFreturn}p.index = ix = uint64(p.buf[i-4])x |= uint64(p.buf[i-3]) << 8x |= uint64(p.buf[i-2]) << 16x |= uint64(p.buf[i-1]) << 24return
}
Fixed32
反序列化的過程也是通過位移,每個字節的內容都累加,就可以還原出原先的數據。注意這里也要先跳過 tag
的位置。
String
func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {n, err := p.DecodeVarint()if err != nil {return nil, err}nb := int(n)if nb < 0 {return nil, fmt.Errorf("proto: bad byte length %d", nb)}end := p.index + nbif end < p.index || end > len(p.buf) {return nil, io.ErrUnexpectedEOF}if !alloc {// todo: check if can get more uses of alloc=falsebuf = p.buf[p.index:end]p.index += nbreturn}buf = make([]byte, nb)copy(buf, p.buf[p.index:])p.index += nbreturn
}
反序列化 string
先把 length
序列化出來,通過 DecodeVarint
的方式。拿到 length
以后,剩下的就是直接拷貝的過程。在上篇 encode
中,我們知道字符串是不做處理,直接放到二進制流里面的,所以反序列化直接取出即可。
Map
func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {raw, err := o.DecodeRawBytes(false)if err != nil {return err}oi := o.index // index at the end of this map entryo.index -= len(raw) // move buffer back to start of map entrymptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]Vif mptr.Elem().IsNil() {mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))}v := mptr.Elem() // map[K]V// 這里省略一些代碼,主要是為了 key - value 準備的一些可以雙重間接尋址的占位符,具體原因可以見序列化代碼里面的 enc_new_map 函數// Decode.// This parses a restricted wire format, namely the encoding of a message// with two fields. See enc_new_map for the format.for o.index < oi {// tagcode for key and value properties are always a single byte// because they have tags 1 and 2.tagcode := o.buf[o.index]o.index++switch tagcode {case p.mkeyprop.tagcode[0]:if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {return err}case p.mvalprop.tagcode[0]:if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {return err}default:// TODO: Should we silently skip this instead?return fmt.Errorf("proto: bad map data tag %d", raw[0])}}keyelem, valelem := keyptr.Elem(), valptr.Elem()if !keyelem.IsValid() {keyelem = reflect.Zero(p.mtype.Key())}if !valelem.IsValid() {valelem = reflect.Zero(p.mtype.Elem())}v.SetMapIndex(keyelem, valelem)return nil
}
反序列化 map
需要把每個 tag
取出來,然后緊接著反序列化每個 key - value
。最后會判斷 keyelem
和 valelem
是否為零值,如果是零值要分別調用 reflect.Zero
處理零值的情況。
slice
最后還是舉一個數組的例子。以 []int32
為例。
func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {v := structPointer_Word32Slice(base, p.field)nn, err := o.DecodeVarint()if err != nil {return err}nb := int(nn) // number of bytes of encoded int32sfin := o.index + nbif fin < o.index {return errOverflow}for o.index < fin {u, err := p.valDec(o)if err != nil {return err}v.Append(uint32(u))}return nil
}
反序列化這個數組,分2步,跳過 tagcode
拿到 length
,反序列化 length
。在 length
這個長度中依次反序列化各個 value
。
上述就是 Protocol Buffer
反序列化的過程。
序列化小結
Protocol Buffer
反序列化直接讀取二進制字節數據流,反序列化就是 encode
的反過程,同樣是一些二進制操作。反序列化的時候,通常只需要用到 length
。tag
值只是用來標識類型的,Properties
的 setEncAndDec()
方法里面已經把每個類型對應的 decode
解碼器初始化好了,所以反序列化的時候,tag
值可以直接跳過,從 length
開始處理。
XML
的解析過程就復雜一些。XML
需要從文件中讀取出字符串,再轉換為 XML
文檔對象結構模型。之后,再從 XML
文檔對象結構模型中讀取指定節點的字符串,最后再將這個字符串轉換成指定類型的變量。這個過程非常復雜,其中將 XML
文件轉換為文檔對象結構模型的過程通常需要完成詞法文法分析等大量消耗 CPU
的復雜計算。
回到頂部
序列化/反序列化性能
Protocol Buffer
一直被人們認為是高性能的存在。也有很多人做過實現,驗證了這一說法。例如這個鏈接里面的實驗 jvm-serializers。
在看數據之前,我們可以先理性的分析一下 Protocol Buffer
和 JSON
、XML
這些比有哪些優勢:
Protobuf
采用了 Varint
、Zigzag
大幅的壓縮了整數類型,也沒有 JSON
里面的 {、}、;、
這些數據分隔符,有 option
字段標識的,沒有數據的時候不會進行反序列化。這幾個措施導致 pb
的數據量整體的就比 JSON
少很多。
Protobuf
采取的是 TLV
的形式,JSON
這些都是字符串的形式。字符串比對應該比基于數字的字段 tag
更耗時。Protobuf
在正文前有一個大小或者長度的標記,而 JSON
必須全文掃描無法跳過不需要的字段。
下面這張圖來自參考鏈接里面的 《Protobuf有沒有比JSON快5倍?用代碼來擊破pb性能神話》:

從這個實驗來看,確實 Protobuf
在序列化數字這方面性能是非常強悍的。
序列化 / 反序列化數字確實是 Protobuf
針對 JSON
和 XML
的優勢,但是它也存在一些沒有優勢的地方。比如字符串。字符串在 Protobuf
中基本沒有處理,除了前面加了 tag - length
。在序列化 / 反序列化字符串的過程中,字符串拷貝的速度反而決定的真正的速度。

從上圖可以看到 encode
字符串的時候,速度基本和 JSON
相差無幾。
回到頂部
最后
至此,關于 protocol buffers
的所有,讀者應該了然于胸了。
protocol buffers
誕生之初也并不是為了傳輸數據存在的,只是為了解決服務器多版本協議兼容的問題。實質其實是發明了一個新的跨語言無歧義的 IDL (Interface description language)
。只不過人們后來發現用它來傳輸數據也不錯,才開始用 protocol buffers
。
想用 protocol buffers
替換 JSON
,可能是考慮到:
-
protocol buffers
相同數據,傳輸的數據量比JSON
小,gzip
或者7zip
壓縮以后,網絡傳輸消耗較少。 -
protocol buffers
不是自我描述的,在缺少.proto
文件以后,有一定的加密性,數據傳輸過程中都是二進制流,并不是明文。 -
protocol buffers
提供了一套工具,自動化生成代碼也非常方便。 -
protocol buffers
具有向后兼容性,改變了數據結構以后,對老的版本沒有影響。 -
protocol buffers
原生完美兼容RPC
調用。
如果很少用到整型數字,浮點型數字,全部都是字符串數據,那么 JSON 和 protocol buffers
性能不會差太多。純前端之間交互的話,選擇 JSON
或者 protocol buffers
差別不是很大。
與后端交互過程中,用到 protocol buffers
比較多,筆者認為選擇 protocol buffers
除了性能強以外,完美兼容 RPC
調用也是一個重要因素。
回到頂部