日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

高效的序列化/反序列化数据方式 Protobuf

發布時間:2023/12/19 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 高效的序列化/反序列化数据方式 Protobuf 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

高效的序列化/反序列化數據方式 Protobuf

github地址

目錄

  • protocolBuffers 序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • protocolBuffers 反序列化
    • Int32
    • String
    • Map
    • slice
    • 序列化小結
  • 序列化/反序列化性能
  • 最后

protocolBuffers序列化

上篇文章中其實已經講過了 encode 的過程,這篇文章以 golang 為例,從代碼實現的層面講講序列化和反序列化的過程。

舉個 go 使用 protobuf 進行數據序列化和反序列化的例子,本篇文章從這個例子開始。

先新建一個 example 的 message :

syntax = "proto2"; package example;enum FOO { X = 17; };message Test {required string label = 1;optional int32 type = 2 [default=77];repeated int64 reps = 3;optional group OptionalGroup = 4 {required string RequiredField = 5;} }

利用 protoc-gen-go 生成對應的 get/set 方法。代碼中就可以用生成的代碼進行序列化和反序列化了。

package mainimport ("log""github.com/golang/protobuf/proto""path/to/example" )func main() {test := &example.Test {Label: proto.String("hello"),Type: proto.Int32(17),Reps: []int64{1, 2, 3},Optionalgroup: &example.Test_OptionalGroup {RequiredField: proto.String("good bye"),},}data, err := proto.Marshal(test)if err != nil {log.Fatal("marshaling error: ", err)}newTest := &example.Test{}err = proto.Unmarshal(data, newTest)if err != nil {log.Fatal("unmarshaling error: ", err)}// Now test and newTest contain the same data.if test.GetLabel() != newTest.GetLabel() {log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())}// etc. }

上面代碼中 proto.Marshal() 是序列化過程。proto.Unmarshal() 是反序列化過程。這一章節先看看序列化過程的實現,下一章節再分析反序列化過程的實現。

// Marshal takes the protocol buffer // and encodes it into the wire format, returning the data. func Marshal(pb Message) ([]byte, error) {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {return m.Marshal()}p := NewBuffer(nil)err := p.Marshal(pb)if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil}return p.buf, err }

序列化函數一進來,會先調用 message 對象自身的實現的序列化方法。

// Marshaler is the interface representing objects that can marshal themselves. type Marshaler interface {Marshal() ([]byte, error) }

Marshaler 是一個 interface ,這個接口是專門留給對象自定義序列化的。如果有實現,就 return 自己實現的方法。如果沒有,接下來就進行默認序列化方式。

p := NewBuffer(nil) err := p.Marshal(pb) if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil }

新建一個 Buffer ,調用 Buffer 的 Marshal() 方法。message 經過序列化以后,數據流會放到 Buffer 的 buf 字節流中。序列化最終返回 buf 字節流即可。

type Buffer struct {buf []byte // encode/decode byte streamindex int // read point// pools of basic types to amortize allocation.bools []booluint32s []uint32uint64s []uint64// extra pools, only used with pointer_reflect.goint32s []int32int64s []int64float32s []float32float64s []float64 }

Buffer 的數據結構如上, Buffer 是用于序列化和反序列化 protocol buffers 的緩沖區管理器。它可以在調用的時候重用以減少內存使用量。內部維護了 7 個 pool ,3 個基礎數據類型的 pool ,4 個只能被 pointer_reflect 使用的 pool 。

func (p *Buffer) Marshal(pb Message) error {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {data, err := m.Marshal()p.buf = append(p.buf, data...)return err}t, base, err := getbase(pb)// 異常處理if structPointer_IsNil(base) {return ErrNil}if err == nil {err = p.enc_struct(GetProperties(t.Elem()), base)}// 用來統計 Encode 次數的if collectStats {(stats).Encode++ // Parens are to work around a goimports bug.}// maxMarshalSize = 1<<31 - 1,這個值是 protobuf 可以 encoded 的最大值。if len(p.buf) > maxMarshalSize {return ErrTooLarge}return err }

Buffer 的 Marshal() 方法依舊先調用一下對象是否實現了 Marshal() 接口,如果實現了,還是讓它自己序列化,序列化之后的二進制數據流加入到 buf 數據流中。

func getbase(pb Message) (t reflect.Type, b structPointer, err error) {if pb == nil {err = ErrNilreturn}// get the reflect type of the pointer to the struct.t = reflect.TypeOf(pb)// get the address of the struct.value := reflect.ValueOf(pb)b = toStructPointer(value)return }

getbase 方法通過 reflect 方法拿到了 message 的類型和對應 value 的結構體指針。拿到結構體指針先做異常處理。

所以序列化最核心的代碼其實就一句,p.enc_struct(GetProperties(t.Elem()), base)

// Encode a struct. func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {var state errorState// Encode fields in tag order so that decoders may use optimizations// that depend on the ordering.// https://developers.google.com/protocol-buffers/docs/encoding#orderfor _, i := range prop.order {p := prop.Prop[i]if p.enc != nil {err := p.enc(o, p, base)if err != nil {if err == ErrNil {if p.Required && state.err == nil {state.err = &RequiredNotSetError{p.Name}}} else if err == errRepeatedHasNil {// Give more context to nil values in repeated fields.return errors.New("repeated field " + p.OrigName + " has nil element")} else if !state.shouldContinue(err, p) {return err}}if len(o.buf) > maxMarshalSize {return ErrTooLarge}}}// Do oneof fields.if prop.oneofMarshaler != nil {m := structPointer_Interface(base, prop.stype).(Message)if err := prop.oneofMarshaler(m, o); err == ErrNil {return errOneofHasNil} else if err != nil {return err}}// Add unrecognized fields at the end.if prop.unrecField.IsValid() {v := *structPointer_Bytes(base, prop.unrecField)if len(o.buf)+len(v) > maxMarshalSize {return ErrTooLarge}if len(v) > 0 {o.buf = append(o.buf, v...)}}return state.err }

上面代碼中可以看到,除去 oneof fields 和 unrecognized fields 是單獨最后處理的,其他類型都是調用的 p.enc(o, p, base) 進行序列化的。

Properties 的數據結構定義如下:

type Properties struct {Name string // name of the field, for error messagesOrigName string // original name before protocol compiler (always set)JSONName string // name to use for JSON; determined by protocWire stringWireType intTag intRequired boolOptional boolRepeated boolPacked bool // relevant for repeated primitives onlyEnum string // set for enum types onlyproto3 bool // whether this is known to be a proto3 field; set for []byte onlyoneof bool // whether this is a oneof fieldDefault string // default valueHasDefault bool // whether an explicit default was providedCustomType stringStdTime boolStdDuration boolenc encodervalEnc valueEncoder // set for bool and numeric types onlyfield fieldtagcode []byte // encoding of EncodeVarint((Tag<<3)|WireType)tagbuf [8]bytestype reflect.Type // set for struct types onlysstype reflect.Type // set for slices of structs types onlyctype reflect.Type // set for custom types onlysprop * StructProperties // set for struct types onlyisMarshaler boolisUnmarshaler boolmtype reflect.Type // set for map types onlymkeyprop * Properties // set for map types onlymvalprop * Properties // set for map types onlysize sizervalSize valueSizer // set for bool and numeric types onlydec decodervalDec valueDecoder // set for bool and numeric types only// If this is a packable field, this will be the decoder for the packed version of the field.packedDec decoder }

在 Properties 這個結構體中,定義了名為 enc 的 encoder 和名為 dec 的 decoder 。

encoder 和 decoder 函數定義是完全一樣的。

type encoder func(p *Buffer, prop *Properties, base structPointer) error type decoder func(p *Buffer, prop *Properties, base structPointer) error

encoder 和 decoder 函數初始化是在 Properties 中:

// Initialize the fields for encoding and decoding. func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {// 下面代碼有刪減,類似的部分省略了// proto3 scalar typescase reflect.Int32:if p.proto3 {p.enc = (*Buffer).enc_proto3_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_int32} else {p.enc = (*Buffer).enc_ref_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_ref_int32}case reflect.Uint32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_ref_uint32}case reflect.Float32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_ref_uint32}case reflect.String:if p.proto3 {p.enc = (*Buffer).enc_proto3_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_proto3_string} else {p.enc = (*Buffer).enc_ref_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_ref_string}case reflect.Slice:switch t2 := t1.Elem(); t2.Kind() {default:logNoSliceEnc(t1, t2)breakcase reflect.Int32:if p.Packed {p.enc = (*Buffer).enc_slice_packed_int32p.size = size_slice_packed_int32} else {p.enc = (*Buffer).enc_slice_int32p.size = size_slice_int32}p.dec = (*Buffer).dec_slice_int32p.packedDec = (*Buffer).dec_slice_packed_int32default:logNoSliceEnc(t1, t2)break}}case reflect.Map:p.enc = (*Buffer).enc_new_mapp.dec = (*Buffer).dec_new_mapp.size = size_new_mapp.mtype = t1p.mkeyprop = &Properties{}p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)p.mvalprop = &Properties{}vtype := p.mtype.Elem()if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {// The value type is not a message (*T) or bytes ([]byte),// so we need encoders for the pointer to this type.vtype = reflect.PtrTo(vtype)}p.mvalprop.CustomType = p.CustomTypep.mvalprop.StdDuration = p.StdDurationp.mvalprop.StdTime = p.StdTimep.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)}p.setTag(lockGetProp) }

上面代碼中,分別把各個類型都進行 switch - case 枚舉,每種情況都設置對應的 encode 編碼器,decode 解碼器,size 大小。proto2 和 proto3 有區別的地方也分成2種不同的情況進行處理。

有以下幾種類型,reflect.Bool、reflect.Int32、reflect.Uint32、reflect.Int64、reflect.Uint64、reflect.Float32、reflect.Float64、reflect.String、reflect.Struct、reflect.Ptr、reflect.Slice、reflect.Map 共 12 種大的分類。

下面主要挑 3 類,Int32 、String 、Map 代碼實現進行分析。

Int32

func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {v := structPointer_Word32Val(base, p.field)x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit rangeif x == 0 {return ErrNil}o.buf = append(o.buf, p.tagcode...)p.valEnc(o, uint64(x))return nil }

處理 Int32 代碼比較簡單,先把 tagcode 放進 buf 二進制數據流緩沖區,接著序列化 Int32 ,序列化以后緊接著 tagcode 后面放進緩沖區。

// EncodeVarint writes a varint-encoded integer to the Buffer. // This is the format for the // int32, int64, uint32, uint64, bool, and enum // protocol buffer types. func (p *Buffer) EncodeVarint(x uint64) error {for x >= 1<<7 {p.buf = append(p.buf, uint8(x&0x7f|0x80))x >>= 7}p.buf = append(p.buf, uint8(x))return nil }

Int32 的編碼處理方法在上篇里面講過,用的 Varint 處理方法。上面這個函數同樣適用于處理 int32, int64, uint32, uint64, bool, enum 。

順道也可以看看 sint32、Fixed32 的具體代碼實現。

// EncodeZigzag32 writes a zigzag-encoded 32-bit integer // to the Buffer. // This is the format used for the sint32 protocol buffer type. func (p *Buffer) EncodeZigzag32(x uint64) error {// use signed number to get arithmetic right shift.return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31)))) }

針對有符號的 sint32 ,采取的是先 Zigzag ,然后在 Varint 的處理方式。

// EncodeFixed32 writes a 32-bit integer to the Buffer. // This is the format for the // fixed32, sfixed32, and float protocol buffer types. func (p *Buffer) EncodeFixed32(x uint64) error {p.buf = append(p.buf,uint8(x),uint8(x>>8),uint8(x>>16),uint8(x>>24))return nil }

對于 Fixed32 的處理,僅僅只是位移操作,并沒有做什么壓縮操作。

String

func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {v := *structPointer_StringVal(base, p.field)if v == "" {return ErrNil}o.buf = append(o.buf, p.tagcode...)o.EncodeStringBytes(v)return nil }

序列化字符串也分2步,先把 tagcode 放進去,然后再序列化數據。

// EncodeStringBytes writes an encoded string to the Buffer. // This is the format used for the proto2 string type. func (p *Buffer) EncodeStringBytes(s string) error {p.EncodeVarint(uint64(len(s)))p.buf = append(p.buf, s...)return nil }

序列化字符串的時候,會先把字符串的長度通過編碼 Varint 的方式,寫到 buf 中。長度后面再緊跟著 string 。這也就是 tag - length - value 的實現。

Map

// Encode a map field. func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {var state errorState // XXX: or do we need to plumb this through?v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]Vif v.Len() == 0 {return nil}keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)enc := func() error {if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {return err}if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {return err}return nil}// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.for _, key := range v.MapKeys() {val := v.MapIndex(key)keycopy.Set(key)valcopy.Set(val)o.buf = append(o.buf, p.tagcode...)if err := o.enc_len_thing(enc, &state); err != nil {return err}}return nil }

上述代碼也可以序列化字典數組,例如:

map<key_type, value_type> map_field = N;

轉換成對應的 repeated message 形式再進行序列化。

message MapFieldEntry {key_type key = 1;value_type value = 2; } repeated MapFieldEntry map_field = N;

map 序列化是針對每個 k-v ,都先放入 tagcode ,然后再序列化 k-v 。這里需要化未知長度的結構體的時候需要調用 enc_len_thing() 方法。

// Encode something, preceded by its encoded length (as a varint). func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {iLen := len(o.buf)o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for lengthiMsg := len(o.buf)err := enc()if err != nil && !state.shouldContinue(err, nil) {return err}lMsg := len(o.buf) - iMsglLen := sizeVarint(uint64(lMsg))switch x := lLen - (iMsg - iLen); {case x > 0: // actual length is x bytes larger than the space we reserved// Move msg x bytes right.o.buf = append(o.buf, zeroes[:x]...)copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])case x < 0: // actual length is x bytes smaller than the space we reserved// Move msg x bytes left.copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])o.buf = o.buf[:len(o.buf)+x] // x is negative}// Encode the length in the reserved space.o.buf = o.buf[:iLen]o.EncodeVarint(uint64(lMsg))o.buf = o.buf[:len(o.buf)+lMsg]return state.err }

enc_len_thing() 方法會先預存 4 個字節的長度空位。序列化以后算出長度。如果長度比 4 個字節還要長,則右移序列化的二進制數據,把長度填到 tagcode 和數據之間。如果長度小于 4 個字節,相應的要左移。

slice

最后再舉一個數組的例子。以 []int32 為例。

// Encode a slice of int32s ([]int32) in packed format. func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {s := structPointer_Word32Slice(base, p.field)l := s.Len()if l == 0 {return ErrNil}// TODO: Reuse a Buffer.buf := NewBuffer(nil)for i := 0; i < l; i++ {x := int32(s.Index(i)) // permit sign extension to use full 64-bit rangep.valEnc(buf, uint64(x))}o.buf = append(o.buf, p.tagcode...)o.EncodeVarint(uint64(len(buf.buf)))o.buf = append(o.buf, buf.buf...)return nil }

序列化這個數組,分3步,先把 tagcode 放進去,然后再序列化整個數組的長度,最后把數組的每個數據都序列化放在后面。最后形成 tag - length - value - value - value 的形式。

上述就是 Protocol Buffer 序列化的過程。

序列化小結

Protocol Buffer 序列化采用 Varint、Zigzag 方法,壓縮 int 型整數和帶符號的整數。對浮點型數字不做壓縮(這里可以進一步的壓縮,Protocol Buffer 還有提升空間)。編碼 .proto 文件,會對 option 和 repeated 字段進行檢查,若 optional 或 repeated 字段沒有被設置字段值,那么該字段在序列化時的數據中是完全不存在的,即不進行序列化(少編碼一個字段)。

上面這兩點做到了壓縮數據,序列化工作量減少。

序列化的過程都是二進制的位移,速度非常快。數據都以 tag - length - value (或者 tag - value)的形式存在二進制數據流中。采用了 TLV 結構存儲數據以后,也擺脫了 JSON 中的 {、}、; 、這些分隔符,沒有這些分隔符也算是再一次減少了一部分數據。

這一點做到了序列化速度非常快。

回到頂部

protocolBuffers反序列化

反序列化的實現完全是序列化實現的逆過程。

func Unmarshal(buf []byte, pb Message) error {pb.Reset()return UnmarshalMerge(buf, pb) }

在反序列化開始之前,先重置一下緩沖區。

func (p *Buffer) Reset() {p.buf = p.buf[0:0] // for reading/writingp.index = 0 // for reading }

清空 buf 中的所有數據,并且重置 index 。

func UnmarshalMerge(buf []byte, pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {return u.Unmarshal(buf)}return NewBuffer(buf).Unmarshal(pb) }

反序列化數據的開始從上面這個函數開始,如果傳進來的 message 的結果和 buf 結果不匹配,最終得到的結果是不可預知的。反序列化之前,同樣會先調用一下對應自己身自定義的 Unmarshal() 方法。

type Unmarshaler interface {Unmarshal([]byte) error }

Unmarshal() 是一個可以自己實現的接口。

UnmarshalMerge 中會調用 Unmarshal(pb Message) 方法。

func (p *Buffer) Unmarshal(pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {err := u.Unmarshal(p.buf[p.index:])p.index = len(p.buf)return err}typ, base, err := getbase(pb)if err != nil {return err}err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)if collectStats {stats.Decode++}return err }

Unmarshal(pb Message) 這個函數只有一個入參,和 proto.Unmarshal() 方法函數簽名不同(前面的函數只有 1 個入參,后面的有 2 個入參)。兩者的區別在于,1 個入參的函數實現里面并不會重置 buf 緩沖區,二個入參的會先重置 buf 緩沖區。

這兩個函數最終都會調用 unmarshalType() 方法,這個函數是最終支持反序列化的函數。

func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {var state errorStaterequired, reqFields := prop.reqCount, uint64(0)var err errorfor err == nil && o.index < len(o.buf) {oi := o.indexvar u uint64u, err = o.DecodeVarint()if err != nil {break}wire := int(u & 0x7)// 下面代碼有省略dec := p.dec// 中間代碼有省略decErr := dec(o, p, base)if decErr != nil && !state.shouldContinue(decErr, p) {err = decErr}if err == nil && p.Required {// Successfully decoded a required field.if tag <= 64 {// use bitmap for fields 1-64 to catch field reuse.var mask uint64 = 1 << uint64(tag-1)if reqFields&mask == 0 {// new required fieldreqFields |= maskrequired--}} else {// This is imprecise. It can be fooled by a required field// with a tag > 64 that is encoded twice; that's very rare.// A fully correct implementation would require allocating// a data structure, which we would like to avoid.required--}}}if err == nil {if is_group {return io.ErrUnexpectedEOF}if state.err != nil {return state.err}if required > 0 {// Not enough information to determine the exact field. If we use extra// CPU, we could determine the field only if the missing required field// has a tag <= 64 and we check reqFields.return &RequiredNotSetError{"{Unknown}"}}}return err }

unmarshalType() 函數比較長,里面處理的情況比較多,有 oneof,WireEndGroup 。真正處理反序列化的函數在 decErr := dec(o, p, base) 這一行。

dec 函數在 Properties 的 setEncAndDec() 函數中進行了初始化。上面序列化的時候談到過那個函數了,這里就不再贅述了。dec() 函數針對每個不同類型都有對應的反序列化函數。

同樣的,接下來也舉 4 個例子,看看反序列化的實際代碼實現。

Int32

func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {u, err := p.valDec(o)if err != nil {return err}word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))return nil }

反序列化 Int32 代碼比較簡單,原理是按照 encode 的逆過程,還原原來的數據。

func (p *Buffer) DecodeVarint() (x uint64, err error) {i := p.indexbuf := p.bufif i >= len(buf) {return 0, io.ErrUnexpectedEOF} else if buf[i] < 0x80 {p.index++return uint64(buf[i]), nil} else if len(buf)-i < 10 {return p.decodeVarintSlow()}var b uint64// we already checked the first bytex = uint64(buf[i]) - 0x80i++b = uint64(buf[i])i++x += b << 7if b&0x80 == 0 {goto done}x -= 0x80 << 7b = uint64(buf[i])i++x += b << 14if b&0x80 == 0 {goto done}x -= 0x80 << 14b = uint64(buf[i])i++x += b << 21if b&0x80 == 0 {goto done}x -= 0x80 << 21b = uint64(buf[i])i++x += b << 28if b&0x80 == 0 {goto done}x -= 0x80 << 28b = uint64(buf[i])i++x += b << 35if b&0x80 == 0 {goto done}x -= 0x80 << 35b = uint64(buf[i])i++x += b << 42if b&0x80 == 0 {goto done}x -= 0x80 << 42b = uint64(buf[i])i++x += b << 49if b&0x80 == 0 {goto done}x -= 0x80 << 49b = uint64(buf[i])i++x += b << 56if b&0x80 == 0 {goto done}x -= 0x80 << 56b = uint64(buf[i])i++x += b << 63if b&0x80 == 0 {goto done}// x -= 0x80 << 63 // Always zero.return 0, errOverflowdone:p.index = ireturn x, nil }

Int32 序列化之后,第一個字節一定是 0x80 ,那么除去這個字節以后,后面的每個二進制字節都是數據,剩下的步驟就是通過位移操作把每個數字都加起來。上面這個反序列化的函數同樣適用于 int32 , int64 , uint32 , uint64 , bool , 和 enum。

順道也可以看看 sint32 、Fixed32 的反序列化具體代碼實現。

func (p *Buffer) DecodeZigzag32() (x uint64, err error) {x, err = p.DecodeVarint()if err != nil {return}x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))return }

針對有符號的 sint32 ,反序列化的過程就是先反序列 Varint ,再反序列化 Zigzag 。

func (p *Buffer) DecodeFixed32() (x uint64, err error) {// x, err already 0i := p.index + 4if i < 0 || i > len(p.buf) {err = io.ErrUnexpectedEOFreturn}p.index = ix = uint64(p.buf[i-4])x |= uint64(p.buf[i-3]) << 8x |= uint64(p.buf[i-2]) << 16x |= uint64(p.buf[i-1]) << 24return }

Fixed32 反序列化的過程也是通過位移,每個字節的內容都累加,就可以還原出原先的數據。注意這里也要先跳過 tag 的位置。

String

func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {n, err := p.DecodeVarint()if err != nil {return nil, err}nb := int(n)if nb < 0 {return nil, fmt.Errorf("proto: bad byte length %d", nb)}end := p.index + nbif end < p.index || end > len(p.buf) {return nil, io.ErrUnexpectedEOF}if !alloc {// todo: check if can get more uses of alloc=falsebuf = p.buf[p.index:end]p.index += nbreturn}buf = make([]byte, nb)copy(buf, p.buf[p.index:])p.index += nbreturn }

反序列化 string 先把 length 序列化出來,通過 DecodeVarint 的方式。拿到 length 以后,剩下的就是直接拷貝的過程。在上篇 encode 中,我們知道字符串是不做處理,直接放到二進制流里面的,所以反序列化直接取出即可。

Map

func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {raw, err := o.DecodeRawBytes(false)if err != nil {return err}oi := o.index // index at the end of this map entryo.index -= len(raw) // move buffer back to start of map entrymptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]Vif mptr.Elem().IsNil() {mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))}v := mptr.Elem() // map[K]V// 這里省略一些代碼,主要是為了 key - value 準備的一些可以雙重間接尋址的占位符,具體原因可以見序列化代碼里面的 enc_new_map 函數// Decode.// This parses a restricted wire format, namely the encoding of a message// with two fields. See enc_new_map for the format.for o.index < oi {// tagcode for key and value properties are always a single byte// because they have tags 1 and 2.tagcode := o.buf[o.index]o.index++switch tagcode {case p.mkeyprop.tagcode[0]:if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {return err}case p.mvalprop.tagcode[0]:if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {return err}default:// TODO: Should we silently skip this instead?return fmt.Errorf("proto: bad map data tag %d", raw[0])}}keyelem, valelem := keyptr.Elem(), valptr.Elem()if !keyelem.IsValid() {keyelem = reflect.Zero(p.mtype.Key())}if !valelem.IsValid() {valelem = reflect.Zero(p.mtype.Elem())}v.SetMapIndex(keyelem, valelem)return nil }

反序列化 map 需要把每個 tag 取出來,然后緊接著反序列化每個 key - value 。最后會判斷 keyelem 和 valelem 是否為零值,如果是零值要分別調用 reflect.Zero 處理零值的情況。

slice

最后還是舉一個數組的例子。以 []int32 為例。

func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {v := structPointer_Word32Slice(base, p.field)nn, err := o.DecodeVarint()if err != nil {return err}nb := int(nn) // number of bytes of encoded int32sfin := o.index + nbif fin < o.index {return errOverflow}for o.index < fin {u, err := p.valDec(o)if err != nil {return err}v.Append(uint32(u))}return nil }

反序列化這個數組,分2步,跳過 tagcode 拿到 length ,反序列化 length 。在 length 這個長度中依次反序列化各個 value 。

上述就是 Protocol Buffer 反序列化的過程。

序列化小結

Protocol Buffer 反序列化直接讀取二進制字節數據流,反序列化就是 encode 的反過程,同樣是一些二進制操作。反序列化的時候,通常只需要用到 length 。tag 值只是用來標識類型的,Properties 的 setEncAndDec() 方法里面已經把每個類型對應的 decode 解碼器初始化好了,所以反序列化的時候,tag 值可以直接跳過,從 length 開始處理。

XML 的解析過程就復雜一些。XML 需要從文件中讀取出字符串,再轉換為 XML 文檔對象結構模型。之后,再從 XML 文檔對象結構模型中讀取指定節點的字符串,最后再將這個字符串轉換成指定類型的變量。這個過程非常復雜,其中將 XML 文件轉換為文檔對象結構模型的過程通常需要完成詞法文法分析等大量消耗 CPU 的復雜計算。

回到頂部

序列化/反序列化性能

Protocol Buffer 一直被人們認為是高性能的存在。也有很多人做過實現,驗證了這一說法。例如這個鏈接里面的實驗 jvm-serializers。

在看數據之前,我們可以先理性的分析一下 Protocol Buffer 和 JSON 、XML 這些比有哪些優勢:

Protobuf 采用了 Varint 、Zigzag 大幅的壓縮了整數類型,也沒有 JSON 里面的 {、}、;、這些數據分隔符,有 option 字段標識的,沒有數據的時候不會進行反序列化。這幾個措施導致 pb 的數據量整體的就比 JSON 少很多。

Protobuf 采取的是 TLV 的形式,JSON 這些都是字符串的形式。字符串比對應該比基于數字的字段 tag 更耗時。Protobuf 在正文前有一個大小或者長度的標記,而 JSON 必須全文掃描無法跳過不需要的字段。
下面這張圖來自參考鏈接里面的 《Protobuf有沒有比JSON快5倍?用代碼來擊破pb性能神話》:


從這個實驗來看,確實 Protobuf 在序列化數字這方面性能是非常強悍的。

序列化 / 反序列化數字確實是 Protobuf 針對 JSON 和 XML 的優勢,但是它也存在一些沒有優勢的地方。比如字符串。字符串在 Protobuf 中基本沒有處理,除了前面加了 tag - length 。在序列化 / 反序列化字符串的過程中,字符串拷貝的速度反而決定的真正的速度。


從上圖可以看到 encode 字符串的時候,速度基本和 JSON 相差無幾。

回到頂部

最后

至此,關于 protocol buffers 的所有,讀者應該了然于胸了。

protocol buffers 誕生之初也并不是為了傳輸數據存在的,只是為了解決服務器多版本協議兼容的問題。實質其實是發明了一個新的跨語言無歧義的 IDL (Interface description language) 。只不過人們后來發現用它來傳輸數據也不錯,才開始用 protocol buffers 。

想用 protocol buffers 替換 JSON ,可能是考慮到:

  • protocol buffers 相同數據,傳輸的數據量比 JSON 小,gzip 或者 7zip 壓縮以后,網絡傳輸消耗較少。

  • protocol buffers 不是自我描述的,在缺少 .proto 文件以后,有一定的加密性,數據傳輸過程中都是二進制流,并不是明文。

  • protocol buffers 提供了一套工具,自動化生成代碼也非常方便。

  • protocol buffers 具有向后兼容性,改變了數據結構以后,對老的版本沒有影響。

  • protocol buffers 原生完美兼容 RPC 調用。

如果很少用到整型數字,浮點型數字,全部都是字符串數據,那么 JSON 和 protocol buffers 性能不會差太多。純前端之間交互的話,選擇 JSON 或者 protocol buffers 差別不是很大。

與后端交互過程中,用到 protocol buffers 比較多,筆者認為選擇 protocol buffers 除了性能強以外,完美兼容 RPC 調用也是一個重要因素。

回到頂部

總結

以上是生活随笔為你收集整理的高效的序列化/反序列化数据方式 Protobuf的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。