高效的序列化/反序列化數據方式 Protobuf

高效的序列化/反序列化數據方式 Protobuf

github地址

目錄

  • protocolBuffers 序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • protocolBuffers 反序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • 序列化/反序列化性能
  • 最后

protocolBuffers序列化

上篇文章中其實已經講過了 encode 的過程,這篇文章以 golang 為例,從代碼實現的層面講講序列化和反序列化的過程。

舉個 go 使用 protobuf 進行數據序列化和反序列化的例子,本篇文章從這個例子開始。

先新建一個 examplemessage

syntax = "proto2";
package example;enum FOO { X = 17; };message Test {required string label = 1;optional int32 type = 2 [default=77];repeated int64 reps = 3;optional group OptionalGroup = 4 {required string RequiredField = 5;}
}

利用 protoc-gen-go 生成對應的 get/set 方法。代碼中就可以用生成的代碼進行序列化和反序列化了。

package mainimport ("log""github.com/golang/protobuf/proto""path/to/example"
)func main() {test := &example.Test {Label: proto.String("hello"),Type:  proto.Int32(17),Reps:  []int64{1, 2, 3},Optionalgroup: &example.Test_OptionalGroup {RequiredField: proto.String("good bye"),},}data, err := proto.Marshal(test)if err != nil {log.Fatal("marshaling error: ", err)}newTest := &example.Test{}err = proto.Unmarshal(data, newTest)if err != nil {log.Fatal("unmarshaling error: ", err)}// Now test and newTest contain the same data.if test.GetLabel() != newTest.GetLabel() {log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())}// etc.
}

上面代碼中 proto.Marshal() 是序列化過程。proto.Unmarshal() 是反序列化過程。這一章節先看看序列化過程的實現,下一章節再分析反序列化過程的實現。

// Marshal takes the protocol buffer
// and encodes it into the wire format, returning the data.
func Marshal(pb Message) ([]byte, error) {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {return m.Marshal()}p := NewBuffer(nil)err := p.Marshal(pb)if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil}return p.buf, err
}

序列化函數一進來,會先調用 message 對象自身的實現的序列化方法。

// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {Marshal() ([]byte, error)
}

Marshaler 是一個 interface ,這個接口是專門留給對象自定義序列化的。如果有實現,就 return 自己實現的方法。如果沒有,接下來就進行默認序列化方式。

p := NewBuffer(nil)
err := p.Marshal(pb)
if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil
}

新建一個 Buffer ,調用 BufferMarshal() 方法。message 經過序列化以后,數據流會放到 Bufferbuf 字節流中。序列化最終返回 buf 字節流即可。

type Buffer struct {buf   []byte // encode/decode byte streamindex int    // read point// pools of basic types to amortize allocation.bools   []booluint32s []uint32uint64s []uint64// extra pools, only used with pointer_reflect.goint32s   []int32int64s   []int64float32s []float32float64s []float64
}

Buffer 的數據結構如上, Buffer 是用于序列化和反序列化 protocol buffers 的緩沖區管理器。它可以在調用的時候重用以減少內存使用量。內部維護了 7 個 pool ,3 個基礎數據類型的 pool ,4 個只能被 pointer_reflect 使用的 pool

func (p *Buffer) Marshal(pb Message) error {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {data, err := m.Marshal()p.buf = append(p.buf, data...)return err}t, base, err := getbase(pb)// 異常處理if structPointer_IsNil(base) {return ErrNil}if err == nil {err = p.enc_struct(GetProperties(t.Elem()), base)}// 用來統計 Encode 次數的if collectStats {(stats).Encode++ // Parens are to work around a goimports bug.}// maxMarshalSize = 1<<31 - 1,這個值是 protobuf 可以 encoded 的最大值。if len(p.buf) > maxMarshalSize {return ErrTooLarge}return err
}

BufferMarshal() 方法依舊先調用一下對象是否實現了 Marshal() 接口,如果實現了,還是讓它自己序列化,序列化之后的二進制數據流加入到 buf 數據流中。

func getbase(pb Message) (t reflect.Type, b structPointer, err error) {if pb == nil {err = ErrNilreturn}// get the reflect type of the pointer to the struct.t = reflect.TypeOf(pb)// get the address of the struct.value := reflect.ValueOf(pb)b = toStructPointer(value)return
}

getbase 方法通過 reflect 方法拿到了 message 的類型和對應 value 的結構體指針。拿到結構體指針先做異常處理。

所以序列化最核心的代碼其實就一句,p.enc_struct(GetProperties(t.Elem()), base)

// Encode a struct.
func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {var state errorState// Encode fields in tag order so that decoders may use optimizations// that depend on the ordering.// https://developers.google.com/protocol-buffers/docs/encoding#orderfor _, i := range prop.order {p := prop.Prop[i]if p.enc != nil {err := p.enc(o, p, base)if err != nil {if err == ErrNil {if p.Required && state.err == nil {state.err = &RequiredNotSetError{p.Name}}} else if err == errRepeatedHasNil {// Give more context to nil values in repeated fields.return errors.New("repeated field " + p.OrigName + " has nil element")} else if !state.shouldContinue(err, p) {return err}}if len(o.buf) > maxMarshalSize {return ErrTooLarge}}}// Do oneof fields.if prop.oneofMarshaler != nil {m := structPointer_Interface(base, prop.stype).(Message)if err := prop.oneofMarshaler(m, o); err == ErrNil {return errOneofHasNil} else if err != nil {return err}}// Add unrecognized fields at the end.if prop.unrecField.IsValid() {v := *structPointer_Bytes(base, prop.unrecField)if len(o.buf)+len(v) > maxMarshalSize {return ErrTooLarge}if len(v) > 0 {o.buf = append(o.buf, v...)}}return state.err
}

上面代碼中可以看到,除去 oneof fieldsunrecognized fields 是單獨最后處理的,其他類型都是調用的 p.enc(o, p, base) 進行序列化的。

Properties 的數據結構定義如下:

type Properties struct {Name     string // name of the field, for error messagesOrigName string // original name before protocol compiler (always set)JSONName string // name to use for JSON; determined by protocWire     stringWireType intTag      intRequired boolOptional boolRepeated boolPacked   bool   // relevant for repeated primitives onlyEnum     string // set for enum types onlyproto3   bool   // whether this is known to be a proto3 field; set for []byte onlyoneof    bool   // whether this is a oneof fieldDefault     string // default valueHasDefault  bool   // whether an explicit default was providedCustomType  stringStdTime     boolStdDuration boolenc           encodervalEnc        valueEncoder // set for bool and numeric types onlyfield         fieldtagcode       []byte // encoding of EncodeVarint((Tag<<3)|WireType)tagbuf        [8]bytestype         reflect.Type      // set for struct types onlysstype        reflect.Type      // set for slices of structs types onlyctype         reflect.Type      // set for custom types onlysprop         * StructProperties // set for struct types onlyisMarshaler   boolisUnmarshaler boolmtype    reflect.Type // set for map types onlymkeyprop * Properties  // set for map types onlymvalprop * Properties  // set for map types onlysize    sizervalSize valueSizer // set for bool and numeric types onlydec    decodervalDec valueDecoder // set for bool and numeric types only// If this is a packable field, this will be the decoder for the packed version of the field.packedDec decoder
}

Properties 這個結構體中,定義了名為 encencoder 和名為 decdecoder

encoderdecoder 函數定義是完全一樣的。

type encoder func(p *Buffer, prop *Properties, base structPointer) error
type decoder func(p *Buffer, prop *Properties, base structPointer) error

encoderdecoder 函數初始化是在 Properties 中:

// Initialize the fields for encoding and decoding.
func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {// 下面代碼有刪減,類似的部分省略了// proto3 scalar typescase reflect.Int32:if p.proto3 {p.enc = (*Buffer).enc_proto3_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_int32} else {p.enc = (*Buffer).enc_ref_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_ref_int32}case reflect.Uint32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_ref_uint32}case reflect.Float32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_ref_uint32}case reflect.String:if p.proto3 {p.enc = (*Buffer).enc_proto3_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_proto3_string} else {p.enc = (*Buffer).enc_ref_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_ref_string}case reflect.Slice:switch t2 := t1.Elem(); t2.Kind() {default:logNoSliceEnc(t1, t2)breakcase reflect.Int32:if p.Packed {p.enc = (*Buffer).enc_slice_packed_int32p.size = size_slice_packed_int32} else {p.enc = (*Buffer).enc_slice_int32p.size = size_slice_int32}p.dec = (*Buffer).dec_slice_int32p.packedDec = (*Buffer).dec_slice_packed_int32default:logNoSliceEnc(t1, t2)break}}case reflect.Map:p.enc = (*Buffer).enc_new_mapp.dec = (*Buffer).dec_new_mapp.size = size_new_mapp.mtype = t1p.mkeyprop = &Properties{}p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)p.mvalprop = &Properties{}vtype := p.mtype.Elem()if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {// The value type is not a message (*T) or bytes ([]byte),// so we need encoders for the pointer to this type.vtype = reflect.PtrTo(vtype)}p.mvalprop.CustomType = p.CustomTypep.mvalprop.StdDuration = p.StdDurationp.mvalprop.StdTime = p.StdTimep.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)}p.setTag(lockGetProp)
}

上面代碼中,分別把各個類型都進行 switch - case 枚舉,每種情況都設置對應的 encode 編碼器,decode 解碼器,size 大小。proto2proto3 有區別的地方也分成2種不同的情況進行處理。

有以下幾種類型,reflect.Boolreflect.Int32reflect.Uint32reflect.Int64reflect.Uint64reflect.Float32reflect.Float64reflect.Stringreflect.Structreflect.Ptr、reflect.Slicereflect.Map 共 12 種大的分類。

下面主要挑 3 類,Int32StringMap 代碼實現進行分析。

Int32

func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {v := structPointer_Word32Val(base, p.field)x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit rangeif x == 0 {return ErrNil}o.buf = append(o.buf, p.tagcode...)p.valEnc(o, uint64(x))return nil
}

處理 Int32 代碼比較簡單,先把 tagcode 放進 buf 二進制數據流緩沖區,接著序列化 Int32 ,序列化以后緊接著 tagcode 后面放進緩沖區。

// EncodeVarint writes a varint-encoded integer to the Buffer.
// This is the format for the
// int32, int64, uint32, uint64, bool, and enum
// protocol buffer types.
func (p *Buffer) EncodeVarint(x uint64) error {for x >= 1<<7 {p.buf = append(p.buf, uint8(x&0x7f|0x80))x >>= 7}p.buf = append(p.buf, uint8(x))return nil
}

Int32 的編碼處理方法在上篇里面講過,用的 Varint 處理方法。上面這個函數同樣適用于處理 int32, int64, uint32, uint64, bool, enum

順道也可以看看 sint32Fixed32 的具體代碼實現。

// EncodeZigzag32 writes a zigzag-encoded 32-bit integer
// to the Buffer.
// This is the format used for the sint32 protocol buffer type.
func (p *Buffer) EncodeZigzag32(x uint64) error {// use signed number to get arithmetic right shift.return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31))))
}

針對有符號的 sint32 ,采取的是先 Zigzag ,然后在 Varint 的處理方式。

// EncodeFixed32 writes a 32-bit integer to the Buffer.
// This is the format for the
// fixed32, sfixed32, and float protocol buffer types.
func (p *Buffer) EncodeFixed32(x uint64) error {p.buf = append(p.buf,uint8(x),uint8(x>>8),uint8(x>>16),uint8(x>>24))return nil
}

對于 Fixed32 的處理,僅僅只是位移操作,并沒有做什么壓縮操作。

String

func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {v := *structPointer_StringVal(base, p.field)if v == "" {return ErrNil}o.buf = append(o.buf, p.tagcode...)o.EncodeStringBytes(v)return nil
}

序列化字符串也分2步,先把 tagcode 放進去,然后再序列化數據。

// EncodeStringBytes writes an encoded string to the Buffer.
// This is the format used for the proto2 string type.
func (p *Buffer) EncodeStringBytes(s string) error {p.EncodeVarint(uint64(len(s)))p.buf = append(p.buf, s...)return nil
}

序列化字符串的時候,會先把字符串的長度通過編碼 Varint 的方式,寫到 buf 中。長度后面再緊跟著 string 。這也就是 tag - length - value 的實現。

Map

// Encode a map field.
func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {var state errorState // XXX: or do we need to plumb this through?v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]Vif v.Len() == 0 {return nil}keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)enc := func() error {if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {return err}if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {return err}return nil}// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.for _, key := range v.MapKeys() {val := v.MapIndex(key)keycopy.Set(key)valcopy.Set(val)o.buf = append(o.buf, p.tagcode...)if err := o.enc_len_thing(enc, &state); err != nil {return err}}return nil
}

上述代碼也可以序列化字典數組,例如:

map<key_type, value_type> map_field = N;

轉換成對應的 repeated message 形式再進行序列化。

message MapFieldEntry {key_type key = 1;value_type value = 2;
}
repeated MapFieldEntry map_field = N;

map 序列化是針對每個 k-v ,都先放入 tagcode ,然后再序列化 k-v 。這里需要化未知長度的結構體的時候需要調用 enc_len_thing() 方法。

// Encode something, preceded by its encoded length (as a varint).
func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {iLen := len(o.buf)o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for lengthiMsg := len(o.buf)err := enc()if err != nil && !state.shouldContinue(err, nil) {return err}lMsg := len(o.buf) - iMsglLen := sizeVarint(uint64(lMsg))switch x := lLen - (iMsg - iLen); {case x > 0: // actual length is x bytes larger than the space we reserved// Move msg x bytes right.o.buf = append(o.buf, zeroes[:x]...)copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])case x < 0: // actual length is x bytes smaller than the space we reserved// Move msg x bytes left.copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])o.buf = o.buf[:len(o.buf)+x] // x is negative}// Encode the length in the reserved space.o.buf = o.buf[:iLen]o.EncodeVarint(uint64(lMsg))o.buf = o.buf[:len(o.buf)+lMsg]return state.err
}

enc_len_thing() 方法會先預存 4 個字節的長度空位。序列化以后算出長度。如果長度比 4 個字節還要長,則右移序列化的二進制數據,把長度填到 tagcode 和數據之間。如果長度小于 4 個字節,相應的要左移。

slice

最后再舉一個數組的例子。以 []int32 為例。

// Encode a slice of int32s ([]int32) in packed format.
func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {s := structPointer_Word32Slice(base, p.field)l := s.Len()if l == 0 {return ErrNil}// TODO: Reuse a Buffer.buf := NewBuffer(nil)for i := 0; i < l; i++ {x := int32(s.Index(i)) // permit sign extension to use full 64-bit rangep.valEnc(buf, uint64(x))}o.buf = append(o.buf, p.tagcode...)o.EncodeVarint(uint64(len(buf.buf)))o.buf = append(o.buf, buf.buf...)return nil
}

序列化這個數組,分3步,先把 tagcode 放進去,然后再序列化整個數組的長度,最后把數組的每個數據都序列化放在后面。最后形成 tag - length - value - value - value 的形式。

上述就是 Protocol Buffer 序列化的過程。

序列化小結

Protocol Buffer 序列化采用 Varint、Zigzag 方法,壓縮 int 型整數和帶符號的整數。對浮點型數字不做壓縮(這里可以進一步的壓縮,Protocol Buffer 還有提升空間)。編碼 .proto 文件,會對 optionrepeated 字段進行檢查,若 optionalrepeated 字段沒有被設置字段值,那么該字段在序列化時的數據中是完全不存在的,即不進行序列化(少編碼一個字段)。

上面這兩點做到了壓縮數據,序列化工作量減少。

序列化的過程都是二進制的位移,速度非常快。數據都以 tag - length - value (或者 tag - value)的形式存在二進制數據流中。采用了 TLV 結構存儲數據以后,也擺脫了 JSON 中的 {、}、; 、這些分隔符,沒有這些分隔符也算是再一次減少了一部分數據。

這一點做到了序列化速度非常快。

回到頂部

protocolBuffers反序列化

反序列化的實現完全是序列化實現的逆過程。

func Unmarshal(buf []byte, pb Message) error {pb.Reset()return UnmarshalMerge(buf, pb)
}

在反序列化開始之前,先重置一下緩沖區。

func (p *Buffer) Reset() {p.buf = p.buf[0:0] // for reading/writingp.index = 0        // for reading
}

清空 buf 中的所有數據,并且重置 index

func UnmarshalMerge(buf []byte, pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {return u.Unmarshal(buf)}return NewBuffer(buf).Unmarshal(pb)
}

反序列化數據的開始從上面這個函數開始,如果傳進來的 message 的結果和 buf 結果不匹配,最終得到的結果是不可預知的。反序列化之前,同樣會先調用一下對應自己身自定義的 Unmarshal() 方法。

type Unmarshaler interface {Unmarshal([]byte) error
}

Unmarshal() 是一個可以自己實現的接口。

UnmarshalMerge 中會調用 Unmarshal(pb Message) 方法。

func (p *Buffer) Unmarshal(pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {err := u.Unmarshal(p.buf[p.index:])p.index = len(p.buf)return err}typ, base, err := getbase(pb)if err != nil {return err}err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)if collectStats {stats.Decode++}return err
}

Unmarshal(pb Message) 這個函數只有一個入參,和 proto.Unmarshal() 方法函數簽名不同(前面的函數只有 1 個入參,后面的有 2 個入參)。兩者的區別在于,1 個入參的函數實現里面并不會重置 buf 緩沖區,二個入參的會先重置 buf 緩沖區。

這兩個函數最終都會調用 unmarshalType() 方法,這個函數是最終支持反序列化的函數。

func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {var state errorStaterequired, reqFields := prop.reqCount, uint64(0)var err errorfor err == nil && o.index < len(o.buf) {oi := o.indexvar u uint64u, err = o.DecodeVarint()if err != nil {break}wire := int(u & 0x7)// 下面代碼有省略dec := p.dec// 中間代碼有省略decErr := dec(o, p, base)if decErr != nil && !state.shouldContinue(decErr, p) {err = decErr}if err == nil && p.Required {// Successfully decoded a required field.if tag <= 64 {// use bitmap for fields 1-64 to catch field reuse.var mask uint64 = 1 << uint64(tag-1)if reqFields&mask == 0 {// new required fieldreqFields |= maskrequired--}} else {// This is imprecise. It can be fooled by a required field// with a tag > 64 that is encoded twice; that's very rare.// A fully correct implementation would require allocating// a data structure, which we would like to avoid.required--}}}if err == nil {if is_group {return io.ErrUnexpectedEOF}if state.err != nil {return state.err}if required > 0 {// Not enough information to determine the exact field. If we use extra// CPU, we could determine the field only if the missing required field// has a tag <= 64 and we check reqFields.return &RequiredNotSetError{"{Unknown}"}}}return err
}

unmarshalType() 函數比較長,里面處理的情況比較多,有 oneof,WireEndGroup 。真正處理反序列化的函數在 decErr := dec(o, p, base) 這一行。

dec 函數在 PropertiessetEncAndDec() 函數中進行了初始化。上面序列化的時候談到過那個函數了,這里就不再贅述了。dec() 函數針對每個不同類型都有對應的反序列化函數。

同樣的,接下來也舉 4 個例子,看看反序列化的實際代碼實現。

Int32

func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {u, err := p.valDec(o)if err != nil {return err}word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))return nil
}

反序列化 Int32 代碼比較簡單,原理是按照 encode 的逆過程,還原原來的數據。

func (p *Buffer) DecodeVarint() (x uint64, err error) {i := p.indexbuf := p.bufif i >= len(buf) {return 0, io.ErrUnexpectedEOF} else if buf[i] < 0x80 {p.index++return uint64(buf[i]), nil} else if len(buf)-i < 10 {return p.decodeVarintSlow()}var b uint64// we already checked the first bytex = uint64(buf[i]) - 0x80i++b = uint64(buf[i])i++x += b << 7if b&0x80 == 0 {goto done}x -= 0x80 << 7b = uint64(buf[i])i++x += b << 14if b&0x80 == 0 {goto done}x -= 0x80 << 14b = uint64(buf[i])i++x += b << 21if b&0x80 == 0 {goto done}x -= 0x80 << 21b = uint64(buf[i])i++x += b << 28if b&0x80 == 0 {goto done}x -= 0x80 << 28b = uint64(buf[i])i++x += b << 35if b&0x80 == 0 {goto done}x -= 0x80 << 35b = uint64(buf[i])i++x += b << 42if b&0x80 == 0 {goto done}x -= 0x80 << 42b = uint64(buf[i])i++x += b << 49if b&0x80 == 0 {goto done}x -= 0x80 << 49b = uint64(buf[i])i++x += b << 56if b&0x80 == 0 {goto done}x -= 0x80 << 56b = uint64(buf[i])i++x += b << 63if b&0x80 == 0 {goto done}// x -= 0x80 << 63 // Always zero.return 0, errOverflowdone:p.index = ireturn x, nil
}

Int32 序列化之后,第一個字節一定是 0x80 ,那么除去這個字節以后,后面的每個二進制字節都是數據,剩下的步驟就是通過位移操作把每個數字都加起來。上面這個反序列化的函數同樣適用于 int32 , int64 , uint32 , uint64 , bool , 和 enum

順道也可以看看 sint32Fixed32 的反序列化具體代碼實現。

func (p *Buffer) DecodeZigzag32() (x uint64, err error) {x, err = p.DecodeVarint()if err != nil {return}x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))return
}

針對有符號的 sint32 ,反序列化的過程就是先反序列 Varint ,再反序列化 Zigzag

func (p *Buffer) DecodeFixed32() (x uint64, err error) {// x, err already 0i := p.index + 4if i < 0 || i > len(p.buf) {err = io.ErrUnexpectedEOFreturn}p.index = ix = uint64(p.buf[i-4])x |= uint64(p.buf[i-3]) << 8x |= uint64(p.buf[i-2]) << 16x |= uint64(p.buf[i-1]) << 24return
}

Fixed32 反序列化的過程也是通過位移,每個字節的內容都累加,就可以還原出原先的數據。注意這里也要先跳過 tag 的位置。

String

func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {n, err := p.DecodeVarint()if err != nil {return nil, err}nb := int(n)if nb < 0 {return nil, fmt.Errorf("proto: bad byte length %d", nb)}end := p.index + nbif end < p.index || end > len(p.buf) {return nil, io.ErrUnexpectedEOF}if !alloc {// todo: check if can get more uses of alloc=falsebuf = p.buf[p.index:end]p.index += nbreturn}buf = make([]byte, nb)copy(buf, p.buf[p.index:])p.index += nbreturn
}

反序列化 string 先把 length 序列化出來,通過 DecodeVarint 的方式。拿到 length 以后,剩下的就是直接拷貝的過程。在上篇 encode 中,我們知道字符串是不做處理,直接放到二進制流里面的,所以反序列化直接取出即可。

Map

func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {raw, err := o.DecodeRawBytes(false)if err != nil {return err}oi := o.index       // index at the end of this map entryo.index -= len(raw) // move buffer back to start of map entrymptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]Vif mptr.Elem().IsNil() {mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))}v := mptr.Elem() // map[K]V// 這里省略一些代碼,主要是為了 key - value 準備的一些可以雙重間接尋址的占位符,具體原因可以見序列化代碼里面的 enc_new_map 函數// Decode.// This parses a restricted wire format, namely the encoding of a message// with two fields. See enc_new_map for the format.for o.index < oi {// tagcode for key and value properties are always a single byte// because they have tags 1 and 2.tagcode := o.buf[o.index]o.index++switch tagcode {case p.mkeyprop.tagcode[0]:if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {return err}case p.mvalprop.tagcode[0]:if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {return err}default:// TODO: Should we silently skip this instead?return fmt.Errorf("proto: bad map data tag %d", raw[0])}}keyelem, valelem := keyptr.Elem(), valptr.Elem()if !keyelem.IsValid() {keyelem = reflect.Zero(p.mtype.Key())}if !valelem.IsValid() {valelem = reflect.Zero(p.mtype.Elem())}v.SetMapIndex(keyelem, valelem)return nil
}

反序列化 map 需要把每個 tag 取出來,然后緊接著反序列化每個 key - value 。最后會判斷 keyelemvalelem 是否為零值,如果是零值要分別調用 reflect.Zero 處理零值的情況。

slice

最后還是舉一個數組的例子。以 []int32 為例。

func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {v := structPointer_Word32Slice(base, p.field)nn, err := o.DecodeVarint()if err != nil {return err}nb := int(nn) // number of bytes of encoded int32sfin := o.index + nbif fin < o.index {return errOverflow}for o.index < fin {u, err := p.valDec(o)if err != nil {return err}v.Append(uint32(u))}return nil
}

反序列化這個數組,分2步,跳過 tagcode 拿到 length ,反序列化 length 。在 length 這個長度中依次反序列化各個 value

上述就是 Protocol Buffer 反序列化的過程。

序列化小結

Protocol Buffer 反序列化直接讀取二進制字節數據流,反序列化就是 encode 的反過程,同樣是一些二進制操作。反序列化的時候,通常只需要用到 lengthtag 值只是用來標識類型的,PropertiessetEncAndDec() 方法里面已經把每個類型對應的 decode 解碼器初始化好了,所以反序列化的時候,tag 值可以直接跳過,從 length 開始處理。

XML 的解析過程就復雜一些。XML 需要從文件中讀取出字符串,再轉換為 XML 文檔對象結構模型。之后,再從 XML 文檔對象結構模型中讀取指定節點的字符串,最后再將這個字符串轉換成指定類型的變量。這個過程非常復雜,其中將 XML 文件轉換為文檔對象結構模型的過程通常需要完成詞法文法分析等大量消耗 CPU 的復雜計算。

回到頂部

序列化/反序列化性能

Protocol Buffer 一直被人們認為是高性能的存在。也有很多人做過實現,驗證了這一說法。例如這個鏈接里面的實驗 jvm-serializers。

在看數據之前,我們可以先理性的分析一下 Protocol BufferJSONXML 這些比有哪些優勢:

Protobuf 采用了 VarintZigzag 大幅的壓縮了整數類型,也沒有 JSON 里面的 {、}、;、這些數據分隔符,有 option 字段標識的,沒有數據的時候不會進行反序列化。這幾個措施導致 pb 的數據量整體的就比 JSON 少很多。

Protobuf 采取的是 TLV 的形式,JSON 這些都是字符串的形式。字符串比對應該比基于數字的字段 tag 更耗時。Protobuf 在正文前有一個大小或者長度的標記,而 JSON 必須全文掃描無法跳過不需要的字段。
下面這張圖來自參考鏈接里面的 《Protobuf有沒有比JSON快5倍?用代碼來擊破pb性能神話》:


從這個實驗來看,確實 Protobuf 在序列化數字這方面性能是非常強悍的。

序列化 / 反序列化數字確實是 Protobuf 針對 JSONXML 的優勢,但是它也存在一些沒有優勢的地方。比如字符串。字符串在 Protobuf 中基本沒有處理,除了前面加了 tag - length 。在序列化 / 反序列化字符串的過程中,字符串拷貝的速度反而決定的真正的速度。


從上圖可以看到 encode 字符串的時候,速度基本和 JSON 相差無幾。

回到頂部

最后

至此,關于 protocol buffers 的所有,讀者應該了然于胸了。

protocol buffers 誕生之初也并不是為了傳輸數據存在的,只是為了解決服務器多版本協議兼容的問題。實質其實是發明了一個新的跨語言無歧義的 IDL (Interface description language) 。只不過人們后來發現用它來傳輸數據也不錯,才開始用 protocol buffers

想用 protocol buffers 替換 JSON ,可能是考慮到:

  • protocol buffers 相同數據,傳輸的數據量比 JSON 小,gzip 或者 7zip 壓縮以后,網絡傳輸消耗較少。

  • protocol buffers 不是自我描述的,在缺少 .proto 文件以后,有一定的加密性,數據傳輸過程中都是二進制流,并不是明文。

  • protocol buffers 提供了一套工具,自動化生成代碼也非常方便。

  • protocol buffers 具有向后兼容性,改變了數據結構以后,對老的版本沒有影響。

  • protocol buffers 原生完美兼容 RPC 調用。

如果很少用到整型數字,浮點型數字,全部都是字符串數據,那么 JSON 和 protocol buffers 性能不會差太多。純前端之間交互的話,選擇 JSON 或者 protocol buffers 差別不是很大。

與后端交互過程中,用到 protocol buffers 比較多,筆者認為選擇 protocol buffers 除了性能強以外,完美兼容 RPC 調用也是一個重要因素。

回到頂部

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/448086.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/448086.shtml
英文地址,請注明出處:http://en.pswp.cn/news/448086.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何配置一個Oracle服務

1、網絡服務名&#xff1a;即填寫OracleTNS的值&#xff0c;如OracleTNSorcl_192.168.1.125&#xff0c;填寫orcl_192.168.1.1252、主機名&#xff1a;192.168.1.1253、服務名&#xff1a;orcl4、測試成功即可。 轉載于:https://www.cnblogs.com/dengshiwei/p/4258719.html

解決 VUE前端項目報錯: Uncaught ReferenceError : initPage is not defined (initPage 方法是有的,依舊報錯找不到)

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 明明代碼中定義了 initPage 這個方法&#xff0c;&#xff0c;卻一直報找不到這個方法&#xff1a; Uncaught ReferenceError: init…

掌握新手學車技巧對于新手來說是非常重要的

剛開始學車的時候對于新手來說很多操作不知道從哪里下手&#xff0c;這個時候&#xff0c;如果按照相關的學車技巧來學習的話&#xff0c;對于新手來說是非常有好處的。下面我們就來學習一下讓新手們可以快速進入開車狀態的學車技巧吧&#xff01;基本上駕校的教練都會教學員把…

iView學習筆記(三):表格搜索,過濾及隱藏列操作

iView學習筆記(三)&#xff1a;表格搜索&#xff0c;過濾及隱藏某列操作 1.后端準備工作 環境說明 python版本&#xff1a;3.6.6 Django版本&#xff1a;1.11.8 數據庫&#xff1a;MariaDB 5.5.60 新建Django項目&#xff0c;在項目中新建app&#xff0c;配置好數據庫 api_test…

Jenkins自動編譯庫并上傳服務器

Jenkins自動編譯庫并上傳服務器 github地址 首先添加 git 地址&#xff1a; 再添加定時構建&#xff0c;每天夜里構建一次&#xff1a; 執行 shell 腳本進行構建 cd networklayerecho "build json x86" cmake -S . -B cmake-build-release -DCMAKE_BUILD_TYPERele…

解決:The “data“ option should be a function that returns a per-instance value in component definitions

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 只是想定義一個變量&#xff0c;方便頁面上調用 。 報錯&#xff1a; The "data" option should be a function that re…

科目三考試里面的會車,調頭,靠邊停車通過標準

科目三會車&#xff1a;減速靠道路的右側邊緣線行駛&#xff0c;速度要減到20km/h以下&#xff0c;靠右以不壓右側邊緣線為基準盡量靠右。會車結束指令發出后向左打方向回到道路中央。考點&#xff1a;1.速度要降到20km/h&#xff0c;有時考官故意刁難&#xff0c;會在直線行駛…

Esxi直通板載Sata

Esxi安裝好后&#xff0c;打開SSH。 解決方法如下&#xff1a; shell下執行&#xff1a; lspci -v | grep "Class 0106"-B 1&#xff0c;查看是否有如下顯示&#xff1a;0000:00:1f.2 SATAcontroller Mass storage controller: Intel Corporation Lynx Point AHCICon…

gdb 調試 TuMediaService

gdb 調試 TuMediaService github地址 起因 首先需要有 armgdb 環境運行 ./armgdb ./TuMediaService 進入 gdb 模式b hi3531_trcod_interface.cc:98 打斷點r 運行程序print this->vdec_config_path_ 打印關鍵值 在這里我們關注的值已經被修改&#xff0c;由于程序中沒有刻…

jackson 的注解:@JsonProperty、@JsonIgnore、@JsonFormat 用法說明

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 導包&#xff1a; <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-data…

科目三-變更車道,直線行駛和超車的考試標準

直線行駛&#xff1a;這是唯一一個可以提前操作的項目&#xff0c;當聽到“下一項考試為直線行駛......”的指令時&#xff0c;可以立即把車身擺正。放在道路的正中間&#xff0c;并踩油門&#xff0c;把速度提至30----50km/h&#xff0c;最好保持在35---40km/h&#xff0c;因為…

PyQt安裝和環境配置

PyQt安裝和環境配置 github地址 首先安裝Pycharm 新建一個空的 python 工程&#xff0c;找到 setting 安裝第三方模塊 PyQT5 , 點加號&#xff0c;先安 PyQT5 , 再安裝 pyqt5-tools &#xff0c;后面包含 qtdesinger 以上模塊都安完&#xff0c;設置擴展工具的參數找到 sett…

HZOJ 大佬(kat)

及其水水水的假期望&#xff08;然而我已經被期望嚇怕了……&#xff09;。 數據范圍及其沙雕導致丟掉5分…… 因為其實每天的期望是一樣的&#xff0c;考慮分開。 f[i][j]表示做k道題&#xff0c;難度最大為j的概率。 則f[i][j](f[i-1][j])*(j-1)*temq[j]*tem;q為前綴和&#…

F12 界面:請求響應內容 Preview 和 Response 不一致、接口返回數據和 jsp 解析到的內容不一致

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1. 情況描述&#xff1a; 我有一個接口只是簡單的查詢列表數據并返回給前端作一個表格展示。 接口返回的 userId 數據為&#xff1a;…

為什么新手開車起步總是熄火

最近&#xff0c;深圳市民陳小姐年前考完駕照就買了一輛新車&#xff0c;在過完年后上班的第一天&#xff0c;幾乎每次等紅綠燈的路口起步時汽車都會熄火&#xff0c;導致身后的司機非常不滿狂按車喇叭催她“別擋路”&#xff0c;陳小姐自己也急得冒汗。就像陳小姐這樣的新手很…

TDD實例

TDD實例 github地址 項目中對于 TDD 的實戰&#xff0c;依賴的是 GoogleTest 框架 我負責編碼單元對中控提供 設置編碼單元設置視頻源設置視頻輸出狀態檢測開啟通道關閉通道 這 6 個接口&#xff0c;中控通過 http 調用編碼單元接口&#xff0c;為了解耦和方便進行 TDD 測…

修改Sql server中列的屬性腳本

alter tablename alter column columnname varchar(100) not null 轉載于:https://www.cnblogs.com/pw/archive/2007/01/08/615062.html

推薦 21 個頂級的 Vue UI 庫

前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到教程。 1、Vuetify Star 數為 11K&#xff0c;提供了 80 多個 Vue.js 組件&#xff0c;這些組件是根據谷歌 Material Design 指南實現的。Vuet…

MSCRM日志配置

之前有很多人問我在MSCRM上日志怎么做&#xff0c;具體的如&#xff08;登錄日志&#xff0c;操作日志&#xff09;。個人認為操作日志確實比較難做&#xff08;不過我可以給一個思路可以用觸發器或者plugin來實現&#xff0c;不過比較麻煩&#xff0c;對系統壓力也比較大&…

機動車駕駛人科目三考試項目及合格標準

機動車駕駛人科目三考試項目及合格標準 &#xff08;2013年道路考試智能評判&#xff09; 科目三考試綜合評判標準 一般規定&#xff1a;道路駕駛技能滿分為100分&#xff0c;成績達到90分為合格。 道路駕駛技能通用評判 不合格情形&#xff1a;考試時出現下列情形之一的&#…