高效的序列化/反序列化数据方式 Protobuf
高效的序列化/反序列化數(shù)據(jù)方式 Protobuf
github地址
目錄
- protocolBuffers 序列化
- Int32
- String
- Map
- slice
- 序列化小結(jié)
- protocolBuffers 反序列化
- Int32
- String
- Map
- slice
- 序列化小結(jié)
- 序列化/反序列化性能
- 最后
protocolBuffers序列化
上篇文章中其實(shí)已經(jīng)講過(guò)了 encode 的過(guò)程,這篇文章以 golang 為例,從代碼實(shí)現(xiàn)的層面講講序列化和反序列化的過(guò)程。
舉個(gè) go 使用 protobuf 進(jìn)行數(shù)據(jù)序列化和反序列化的例子,本篇文章從這個(gè)例子開(kāi)始。
先新建一個(gè) example 的 message :
syntax = "proto2"; package example;enum FOO { X = 17; };message Test {required string label = 1;optional int32 type = 2 [default=77];repeated int64 reps = 3;optional group OptionalGroup = 4 {required string RequiredField = 5;} }利用 protoc-gen-go 生成對(duì)應(yīng)的 get/set 方法。代碼中就可以用生成的代碼進(jìn)行序列化和反序列化了。
package mainimport ("log""github.com/golang/protobuf/proto""path/to/example" )func main() {test := &example.Test {Label: proto.String("hello"),Type: proto.Int32(17),Reps: []int64{1, 2, 3},Optionalgroup: &example.Test_OptionalGroup {RequiredField: proto.String("good bye"),},}data, err := proto.Marshal(test)if err != nil {log.Fatal("marshaling error: ", err)}newTest := &example.Test{}err = proto.Unmarshal(data, newTest)if err != nil {log.Fatal("unmarshaling error: ", err)}// Now test and newTest contain the same data.if test.GetLabel() != newTest.GetLabel() {log.Fatalf("data mismatch %q != %q", test.GetLabel(), newTest.GetLabel())}// etc. }上面代碼中 proto.Marshal() 是序列化過(guò)程。proto.Unmarshal() 是反序列化過(guò)程。這一章節(jié)先看看序列化過(guò)程的實(shí)現(xiàn),下一章節(jié)再分析反序列化過(guò)程的實(shí)現(xiàn)。
// Marshal takes the protocol buffer // and encodes it into the wire format, returning the data. func Marshal(pb Message) ([]byte, error) {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {return m.Marshal()}p := NewBuffer(nil)err := p.Marshal(pb)if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil}return p.buf, err }序列化函數(shù)一進(jìn)來(lái),會(huì)先調(diào)用 message 對(duì)象自身的實(shí)現(xiàn)的序列化方法。
// Marshaler is the interface representing objects that can marshal themselves. type Marshaler interface {Marshal() ([]byte, error) }Marshaler 是一個(gè) interface ,這個(gè)接口是專(zhuān)門(mén)留給對(duì)象自定義序列化的。如果有實(shí)現(xiàn),就 return 自己實(shí)現(xiàn)的方法。如果沒(méi)有,接下來(lái)就進(jìn)行默認(rèn)序列化方式。
p := NewBuffer(nil) err := p.Marshal(pb) if p.buf == nil && err == nil {// Return a non-nil slice on success.return []byte{}, nil }新建一個(gè) Buffer ,調(diào)用 Buffer 的 Marshal() 方法。message 經(jīng)過(guò)序列化以后,數(shù)據(jù)流會(huì)放到 Buffer 的 buf 字節(jié)流中。序列化最終返回 buf 字節(jié)流即可。
type Buffer struct {buf []byte // encode/decode byte streamindex int // read point// pools of basic types to amortize allocation.bools []booluint32s []uint32uint64s []uint64// extra pools, only used with pointer_reflect.goint32s []int32int64s []int64float32s []float32float64s []float64 }Buffer 的數(shù)據(jù)結(jié)構(gòu)如上, Buffer 是用于序列化和反序列化 protocol buffers 的緩沖區(qū)管理器。它可以在調(diào)用的時(shí)候重用以減少內(nèi)存使用量。內(nèi)部維護(hù)了 7 個(gè) pool ,3 個(gè)基礎(chǔ)數(shù)據(jù)類(lèi)型的 pool ,4 個(gè)只能被 pointer_reflect 使用的 pool 。
func (p *Buffer) Marshal(pb Message) error {// Can the object marshal itself?if m, ok := pb.(Marshaler); ok {data, err := m.Marshal()p.buf = append(p.buf, data...)return err}t, base, err := getbase(pb)// 異常處理if structPointer_IsNil(base) {return ErrNil}if err == nil {err = p.enc_struct(GetProperties(t.Elem()), base)}// 用來(lái)統(tǒng)計(jì) Encode 次數(shù)的if collectStats {(stats).Encode++ // Parens are to work around a goimports bug.}// maxMarshalSize = 1<<31 - 1,這個(gè)值是 protobuf 可以 encoded 的最大值。if len(p.buf) > maxMarshalSize {return ErrTooLarge}return err }Buffer 的 Marshal() 方法依舊先調(diào)用一下對(duì)象是否實(shí)現(xiàn)了 Marshal() 接口,如果實(shí)現(xiàn)了,還是讓它自己序列化,序列化之后的二進(jìn)制數(shù)據(jù)流加入到 buf 數(shù)據(jù)流中。
func getbase(pb Message) (t reflect.Type, b structPointer, err error) {if pb == nil {err = ErrNilreturn}// get the reflect type of the pointer to the struct.t = reflect.TypeOf(pb)// get the address of the struct.value := reflect.ValueOf(pb)b = toStructPointer(value)return }getbase 方法通過(guò) reflect 方法拿到了 message 的類(lèi)型和對(duì)應(yīng) value 的結(jié)構(gòu)體指針。拿到結(jié)構(gòu)體指針先做異常處理。
所以序列化最核心的代碼其實(shí)就一句,p.enc_struct(GetProperties(t.Elem()), base)
// Encode a struct. func (o *Buffer) enc_struct(prop *StructProperties, base structPointer) error {var state errorState// Encode fields in tag order so that decoders may use optimizations// that depend on the ordering.// https://developers.google.com/protocol-buffers/docs/encoding#orderfor _, i := range prop.order {p := prop.Prop[i]if p.enc != nil {err := p.enc(o, p, base)if err != nil {if err == ErrNil {if p.Required && state.err == nil {state.err = &RequiredNotSetError{p.Name}}} else if err == errRepeatedHasNil {// Give more context to nil values in repeated fields.return errors.New("repeated field " + p.OrigName + " has nil element")} else if !state.shouldContinue(err, p) {return err}}if len(o.buf) > maxMarshalSize {return ErrTooLarge}}}// Do oneof fields.if prop.oneofMarshaler != nil {m := structPointer_Interface(base, prop.stype).(Message)if err := prop.oneofMarshaler(m, o); err == ErrNil {return errOneofHasNil} else if err != nil {return err}}// Add unrecognized fields at the end.if prop.unrecField.IsValid() {v := *structPointer_Bytes(base, prop.unrecField)if len(o.buf)+len(v) > maxMarshalSize {return ErrTooLarge}if len(v) > 0 {o.buf = append(o.buf, v...)}}return state.err }上面代碼中可以看到,除去 oneof fields 和 unrecognized fields 是單獨(dú)最后處理的,其他類(lèi)型都是調(diào)用的 p.enc(o, p, base) 進(jìn)行序列化的。
Properties 的數(shù)據(jù)結(jié)構(gòu)定義如下:
type Properties struct {Name string // name of the field, for error messagesOrigName string // original name before protocol compiler (always set)JSONName string // name to use for JSON; determined by protocWire stringWireType intTag intRequired boolOptional boolRepeated boolPacked bool // relevant for repeated primitives onlyEnum string // set for enum types onlyproto3 bool // whether this is known to be a proto3 field; set for []byte onlyoneof bool // whether this is a oneof fieldDefault string // default valueHasDefault bool // whether an explicit default was providedCustomType stringStdTime boolStdDuration boolenc encodervalEnc valueEncoder // set for bool and numeric types onlyfield fieldtagcode []byte // encoding of EncodeVarint((Tag<<3)|WireType)tagbuf [8]bytestype reflect.Type // set for struct types onlysstype reflect.Type // set for slices of structs types onlyctype reflect.Type // set for custom types onlysprop * StructProperties // set for struct types onlyisMarshaler boolisUnmarshaler boolmtype reflect.Type // set for map types onlymkeyprop * Properties // set for map types onlymvalprop * Properties // set for map types onlysize sizervalSize valueSizer // set for bool and numeric types onlydec decodervalDec valueDecoder // set for bool and numeric types only// If this is a packable field, this will be the decoder for the packed version of the field.packedDec decoder }在 Properties 這個(gè)結(jié)構(gòu)體中,定義了名為 enc 的 encoder 和名為 dec 的 decoder 。
encoder 和 decoder 函數(shù)定義是完全一樣的。
type encoder func(p *Buffer, prop *Properties, base structPointer) error type decoder func(p *Buffer, prop *Properties, base structPointer) errorencoder 和 decoder 函數(shù)初始化是在 Properties 中:
// Initialize the fields for encoding and decoding. func (p *Properties) setEncAndDec(typ reflect.Type, f *reflect.StructField, lockGetProp bool) {// 下面代碼有刪減,類(lèi)似的部分省略了// proto3 scalar typescase reflect.Int32:if p.proto3 {p.enc = (*Buffer).enc_proto3_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_int32} else {p.enc = (*Buffer).enc_ref_int32p.dec = (*Buffer).dec_proto3_int32p.size = size_ref_int32}case reflect.Uint32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32p.dec = (*Buffer).dec_proto3_int32 // can reusep.size = size_ref_uint32}case reflect.Float32:if p.proto3 {p.enc = (*Buffer).enc_proto3_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_proto3_uint32} else {p.enc = (*Buffer).enc_ref_uint32 // can just treat them as bitsp.dec = (*Buffer).dec_proto3_int32p.size = size_ref_uint32}case reflect.String:if p.proto3 {p.enc = (*Buffer).enc_proto3_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_proto3_string} else {p.enc = (*Buffer).enc_ref_stringp.dec = (*Buffer).dec_proto3_stringp.size = size_ref_string}case reflect.Slice:switch t2 := t1.Elem(); t2.Kind() {default:logNoSliceEnc(t1, t2)breakcase reflect.Int32:if p.Packed {p.enc = (*Buffer).enc_slice_packed_int32p.size = size_slice_packed_int32} else {p.enc = (*Buffer).enc_slice_int32p.size = size_slice_int32}p.dec = (*Buffer).dec_slice_int32p.packedDec = (*Buffer).dec_slice_packed_int32default:logNoSliceEnc(t1, t2)break}}case reflect.Map:p.enc = (*Buffer).enc_new_mapp.dec = (*Buffer).dec_new_mapp.size = size_new_mapp.mtype = t1p.mkeyprop = &Properties{}p.mkeyprop.init(reflect.PtrTo(p.mtype.Key()), "Key", f.Tag.Get("protobuf_key"), nil, lockGetProp)p.mvalprop = &Properties{}vtype := p.mtype.Elem()if vtype.Kind() != reflect.Ptr && vtype.Kind() != reflect.Slice {// The value type is not a message (*T) or bytes ([]byte),// so we need encoders for the pointer to this type.vtype = reflect.PtrTo(vtype)}p.mvalprop.CustomType = p.CustomTypep.mvalprop.StdDuration = p.StdDurationp.mvalprop.StdTime = p.StdTimep.mvalprop.init(vtype, "Value", f.Tag.Get("protobuf_val"), nil, lockGetProp)}p.setTag(lockGetProp) }上面代碼中,分別把各個(gè)類(lèi)型都進(jìn)行 switch - case 枚舉,每種情況都設(shè)置對(duì)應(yīng)的 encode 編碼器,decode 解碼器,size 大小。proto2 和 proto3 有區(qū)別的地方也分成2種不同的情況進(jìn)行處理。
有以下幾種類(lèi)型,reflect.Bool、reflect.Int32、reflect.Uint32、reflect.Int64、reflect.Uint64、reflect.Float32、reflect.Float64、reflect.String、reflect.Struct、reflect.Ptr、reflect.Slice、reflect.Map 共 12 種大的分類(lèi)。
下面主要挑 3 類(lèi),Int32 、String 、Map 代碼實(shí)現(xiàn)進(jìn)行分析。
Int32
func (o *Buffer) enc_proto3_int32(p *Properties, base structPointer) error {v := structPointer_Word32Val(base, p.field)x := int32(word32Val_Get(v)) // permit sign extension to use full 64-bit rangeif x == 0 {return ErrNil}o.buf = append(o.buf, p.tagcode...)p.valEnc(o, uint64(x))return nil }處理 Int32 代碼比較簡(jiǎn)單,先把 tagcode 放進(jìn) buf 二進(jìn)制數(shù)據(jù)流緩沖區(qū),接著序列化 Int32 ,序列化以后緊接著 tagcode 后面放進(jìn)緩沖區(qū)。
// EncodeVarint writes a varint-encoded integer to the Buffer. // This is the format for the // int32, int64, uint32, uint64, bool, and enum // protocol buffer types. func (p *Buffer) EncodeVarint(x uint64) error {for x >= 1<<7 {p.buf = append(p.buf, uint8(x&0x7f|0x80))x >>= 7}p.buf = append(p.buf, uint8(x))return nil }Int32 的編碼處理方法在上篇里面講過(guò),用的 Varint 處理方法。上面這個(gè)函數(shù)同樣適用于處理 int32, int64, uint32, uint64, bool, enum 。
順道也可以看看 sint32、Fixed32 的具體代碼實(shí)現(xiàn)。
// EncodeZigzag32 writes a zigzag-encoded 32-bit integer // to the Buffer. // This is the format used for the sint32 protocol buffer type. func (p *Buffer) EncodeZigzag32(x uint64) error {// use signed number to get arithmetic right shift.return p.EncodeVarint(uint64((uint32(x) << 1) ^ uint32((int32(x) >> 31)))) }針對(duì)有符號(hào)的 sint32 ,采取的是先 Zigzag ,然后在 Varint 的處理方式。
// EncodeFixed32 writes a 32-bit integer to the Buffer. // This is the format for the // fixed32, sfixed32, and float protocol buffer types. func (p *Buffer) EncodeFixed32(x uint64) error {p.buf = append(p.buf,uint8(x),uint8(x>>8),uint8(x>>16),uint8(x>>24))return nil }對(duì)于 Fixed32 的處理,僅僅只是位移操作,并沒(méi)有做什么壓縮操作。
String
func (o *Buffer) enc_proto3_string(p *Properties, base structPointer) error {v := *structPointer_StringVal(base, p.field)if v == "" {return ErrNil}o.buf = append(o.buf, p.tagcode...)o.EncodeStringBytes(v)return nil }序列化字符串也分2步,先把 tagcode 放進(jìn)去,然后再序列化數(shù)據(jù)。
// EncodeStringBytes writes an encoded string to the Buffer. // This is the format used for the proto2 string type. func (p *Buffer) EncodeStringBytes(s string) error {p.EncodeVarint(uint64(len(s)))p.buf = append(p.buf, s...)return nil }序列化字符串的時(shí)候,會(huì)先把字符串的長(zhǎng)度通過(guò)編碼 Varint 的方式,寫(xiě)到 buf 中。長(zhǎng)度后面再緊跟著 string 。這也就是 tag - length - value 的實(shí)現(xiàn)。
Map
// Encode a map field. func (o *Buffer) enc_new_map(p *Properties, base structPointer) error {var state errorState // XXX: or do we need to plumb this through?v := structPointer_NewAt(base, p.field, p.mtype).Elem() // map[K]Vif v.Len() == 0 {return nil}keycopy, valcopy, keybase, valbase := mapEncodeScratch(p.mtype)enc := func() error {if err := p.mkeyprop.enc(o, p.mkeyprop, keybase); err != nil {return err}if err := p.mvalprop.enc(o, p.mvalprop, valbase); err != nil && err != ErrNil {return err}return nil}// Don't sort map keys. It is not required by the spec, and C++ doesn't do it.for _, key := range v.MapKeys() {val := v.MapIndex(key)keycopy.Set(key)valcopy.Set(val)o.buf = append(o.buf, p.tagcode...)if err := o.enc_len_thing(enc, &state); err != nil {return err}}return nil }上述代碼也可以序列化字典數(shù)組,例如:
map<key_type, value_type> map_field = N;轉(zhuǎn)換成對(duì)應(yīng)的 repeated message 形式再進(jìn)行序列化。
message MapFieldEntry {key_type key = 1;value_type value = 2; } repeated MapFieldEntry map_field = N;map 序列化是針對(duì)每個(gè) k-v ,都先放入 tagcode ,然后再序列化 k-v 。這里需要化未知長(zhǎng)度的結(jié)構(gòu)體的時(shí)候需要調(diào)用 enc_len_thing() 方法。
// Encode something, preceded by its encoded length (as a varint). func (o *Buffer) enc_len_thing(enc func() error, state *errorState) error {iLen := len(o.buf)o.buf = append(o.buf, 0, 0, 0, 0) // reserve four bytes for lengthiMsg := len(o.buf)err := enc()if err != nil && !state.shouldContinue(err, nil) {return err}lMsg := len(o.buf) - iMsglLen := sizeVarint(uint64(lMsg))switch x := lLen - (iMsg - iLen); {case x > 0: // actual length is x bytes larger than the space we reserved// Move msg x bytes right.o.buf = append(o.buf, zeroes[:x]...)copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])case x < 0: // actual length is x bytes smaller than the space we reserved// Move msg x bytes left.copy(o.buf[iMsg+x:], o.buf[iMsg:iMsg+lMsg])o.buf = o.buf[:len(o.buf)+x] // x is negative}// Encode the length in the reserved space.o.buf = o.buf[:iLen]o.EncodeVarint(uint64(lMsg))o.buf = o.buf[:len(o.buf)+lMsg]return state.err }enc_len_thing() 方法會(huì)先預(yù)存 4 個(gè)字節(jié)的長(zhǎng)度空位。序列化以后算出長(zhǎng)度。如果長(zhǎng)度比 4 個(gè)字節(jié)還要長(zhǎng),則右移序列化的二進(jìn)制數(shù)據(jù),把長(zhǎng)度填到 tagcode 和數(shù)據(jù)之間。如果長(zhǎng)度小于 4 個(gè)字節(jié),相應(yīng)的要左移。
slice
最后再舉一個(gè)數(shù)組的例子。以 []int32 為例。
// Encode a slice of int32s ([]int32) in packed format. func (o *Buffer) enc_slice_packed_int32(p *Properties, base structPointer) error {s := structPointer_Word32Slice(base, p.field)l := s.Len()if l == 0 {return ErrNil}// TODO: Reuse a Buffer.buf := NewBuffer(nil)for i := 0; i < l; i++ {x := int32(s.Index(i)) // permit sign extension to use full 64-bit rangep.valEnc(buf, uint64(x))}o.buf = append(o.buf, p.tagcode...)o.EncodeVarint(uint64(len(buf.buf)))o.buf = append(o.buf, buf.buf...)return nil }序列化這個(gè)數(shù)組,分3步,先把 tagcode 放進(jìn)去,然后再序列化整個(gè)數(shù)組的長(zhǎng)度,最后把數(shù)組的每個(gè)數(shù)據(jù)都序列化放在后面。最后形成 tag - length - value - value - value 的形式。
上述就是 Protocol Buffer 序列化的過(guò)程。
序列化小結(jié)
Protocol Buffer 序列化采用 Varint、Zigzag 方法,壓縮 int 型整數(shù)和帶符號(hào)的整數(shù)。對(duì)浮點(diǎn)型數(shù)字不做壓縮(這里可以進(jìn)一步的壓縮,Protocol Buffer 還有提升空間)。編碼 .proto 文件,會(huì)對(duì) option 和 repeated 字段進(jìn)行檢查,若 optional 或 repeated 字段沒(méi)有被設(shè)置字段值,那么該字段在序列化時(shí)的數(shù)據(jù)中是完全不存在的,即不進(jìn)行序列化(少編碼一個(gè)字段)。
上面這兩點(diǎn)做到了壓縮數(shù)據(jù),序列化工作量減少。
序列化的過(guò)程都是二進(jìn)制的位移,速度非??臁?shù)據(jù)都以 tag - length - value (或者 tag - value)的形式存在二進(jìn)制數(shù)據(jù)流中。采用了 TLV 結(jié)構(gòu)存儲(chǔ)數(shù)據(jù)以后,也擺脫了 JSON 中的 {、}、; 、這些分隔符,沒(méi)有這些分隔符也算是再一次減少了一部分?jǐn)?shù)據(jù)。
這一點(diǎn)做到了序列化速度非???。
回到頂部
protocolBuffers反序列化
反序列化的實(shí)現(xiàn)完全是序列化實(shí)現(xiàn)的逆過(guò)程。
func Unmarshal(buf []byte, pb Message) error {pb.Reset()return UnmarshalMerge(buf, pb) }在反序列化開(kāi)始之前,先重置一下緩沖區(qū)。
func (p *Buffer) Reset() {p.buf = p.buf[0:0] // for reading/writingp.index = 0 // for reading }清空 buf 中的所有數(shù)據(jù),并且重置 index 。
func UnmarshalMerge(buf []byte, pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {return u.Unmarshal(buf)}return NewBuffer(buf).Unmarshal(pb) }反序列化數(shù)據(jù)的開(kāi)始從上面這個(gè)函數(shù)開(kāi)始,如果傳進(jìn)來(lái)的 message 的結(jié)果和 buf 結(jié)果不匹配,最終得到的結(jié)果是不可預(yù)知的。反序列化之前,同樣會(huì)先調(diào)用一下對(duì)應(yīng)自己身自定義的 Unmarshal() 方法。
type Unmarshaler interface {Unmarshal([]byte) error }Unmarshal() 是一個(gè)可以自己實(shí)現(xiàn)的接口。
UnmarshalMerge 中會(huì)調(diào)用 Unmarshal(pb Message) 方法。
func (p *Buffer) Unmarshal(pb Message) error {// If the object can unmarshal itself, let it.if u, ok := pb.(Unmarshaler); ok {err := u.Unmarshal(p.buf[p.index:])p.index = len(p.buf)return err}typ, base, err := getbase(pb)if err != nil {return err}err = p.unmarshalType(typ.Elem(), GetProperties(typ.Elem()), false, base)if collectStats {stats.Decode++}return err }Unmarshal(pb Message) 這個(gè)函數(shù)只有一個(gè)入?yún)?#xff0c;和 proto.Unmarshal() 方法函數(shù)簽名不同(前面的函數(shù)只有 1 個(gè)入?yún)?#xff0c;后面的有 2 個(gè)入?yún)?。兩者的區(qū)別在于,1 個(gè)入?yún)⒌暮瘮?shù)實(shí)現(xiàn)里面并不會(huì)重置 buf 緩沖區(qū),二個(gè)入?yún)⒌臅?huì)先重置 buf 緩沖區(qū)。
這兩個(gè)函數(shù)最終都會(huì)調(diào)用 unmarshalType() 方法,這個(gè)函數(shù)是最終支持反序列化的函數(shù)。
func (o *Buffer) unmarshalType(st reflect.Type, prop *StructProperties, is_group bool, base structPointer) error {var state errorStaterequired, reqFields := prop.reqCount, uint64(0)var err errorfor err == nil && o.index < len(o.buf) {oi := o.indexvar u uint64u, err = o.DecodeVarint()if err != nil {break}wire := int(u & 0x7)// 下面代碼有省略dec := p.dec// 中間代碼有省略decErr := dec(o, p, base)if decErr != nil && !state.shouldContinue(decErr, p) {err = decErr}if err == nil && p.Required {// Successfully decoded a required field.if tag <= 64 {// use bitmap for fields 1-64 to catch field reuse.var mask uint64 = 1 << uint64(tag-1)if reqFields&mask == 0 {// new required fieldreqFields |= maskrequired--}} else {// This is imprecise. It can be fooled by a required field// with a tag > 64 that is encoded twice; that's very rare.// A fully correct implementation would require allocating// a data structure, which we would like to avoid.required--}}}if err == nil {if is_group {return io.ErrUnexpectedEOF}if state.err != nil {return state.err}if required > 0 {// Not enough information to determine the exact field. If we use extra// CPU, we could determine the field only if the missing required field// has a tag <= 64 and we check reqFields.return &RequiredNotSetError{"{Unknown}"}}}return err }unmarshalType() 函數(shù)比較長(zhǎng),里面處理的情況比較多,有 oneof,WireEndGroup 。真正處理反序列化的函數(shù)在 decErr := dec(o, p, base) 這一行。
dec 函數(shù)在 Properties 的 setEncAndDec() 函數(shù)中進(jìn)行了初始化。上面序列化的時(shí)候談到過(guò)那個(gè)函數(shù)了,這里就不再贅述了。dec() 函數(shù)針對(duì)每個(gè)不同類(lèi)型都有對(duì)應(yīng)的反序列化函數(shù)。
同樣的,接下來(lái)也舉 4 個(gè)例子,看看反序列化的實(shí)際代碼實(shí)現(xiàn)。
Int32
func (o *Buffer) dec_proto3_int32(p *Properties, base structPointer) error {u, err := p.valDec(o)if err != nil {return err}word32Val_Set(structPointer_Word32Val(base, p.field), uint32(u))return nil }反序列化 Int32 代碼比較簡(jiǎn)單,原理是按照 encode 的逆過(guò)程,還原原來(lái)的數(shù)據(jù)。
func (p *Buffer) DecodeVarint() (x uint64, err error) {i := p.indexbuf := p.bufif i >= len(buf) {return 0, io.ErrUnexpectedEOF} else if buf[i] < 0x80 {p.index++return uint64(buf[i]), nil} else if len(buf)-i < 10 {return p.decodeVarintSlow()}var b uint64// we already checked the first bytex = uint64(buf[i]) - 0x80i++b = uint64(buf[i])i++x += b << 7if b&0x80 == 0 {goto done}x -= 0x80 << 7b = uint64(buf[i])i++x += b << 14if b&0x80 == 0 {goto done}x -= 0x80 << 14b = uint64(buf[i])i++x += b << 21if b&0x80 == 0 {goto done}x -= 0x80 << 21b = uint64(buf[i])i++x += b << 28if b&0x80 == 0 {goto done}x -= 0x80 << 28b = uint64(buf[i])i++x += b << 35if b&0x80 == 0 {goto done}x -= 0x80 << 35b = uint64(buf[i])i++x += b << 42if b&0x80 == 0 {goto done}x -= 0x80 << 42b = uint64(buf[i])i++x += b << 49if b&0x80 == 0 {goto done}x -= 0x80 << 49b = uint64(buf[i])i++x += b << 56if b&0x80 == 0 {goto done}x -= 0x80 << 56b = uint64(buf[i])i++x += b << 63if b&0x80 == 0 {goto done}// x -= 0x80 << 63 // Always zero.return 0, errOverflowdone:p.index = ireturn x, nil }Int32 序列化之后,第一個(gè)字節(jié)一定是 0x80 ,那么除去這個(gè)字節(jié)以后,后面的每個(gè)二進(jìn)制字節(jié)都是數(shù)據(jù),剩下的步驟就是通過(guò)位移操作把每個(gè)數(shù)字都加起來(lái)。上面這個(gè)反序列化的函數(shù)同樣適用于 int32 , int64 , uint32 , uint64 , bool , 和 enum。
順道也可以看看 sint32 、Fixed32 的反序列化具體代碼實(shí)現(xiàn)。
func (p *Buffer) DecodeZigzag32() (x uint64, err error) {x, err = p.DecodeVarint()if err != nil {return}x = uint64((uint32(x) >> 1) ^ uint32((int32(x&1)<<31)>>31))return }針對(duì)有符號(hào)的 sint32 ,反序列化的過(guò)程就是先反序列 Varint ,再反序列化 Zigzag 。
func (p *Buffer) DecodeFixed32() (x uint64, err error) {// x, err already 0i := p.index + 4if i < 0 || i > len(p.buf) {err = io.ErrUnexpectedEOFreturn}p.index = ix = uint64(p.buf[i-4])x |= uint64(p.buf[i-3]) << 8x |= uint64(p.buf[i-2]) << 16x |= uint64(p.buf[i-1]) << 24return }Fixed32 反序列化的過(guò)程也是通過(guò)位移,每個(gè)字節(jié)的內(nèi)容都累加,就可以還原出原先的數(shù)據(jù)。注意這里也要先跳過(guò) tag 的位置。
String
func (p *Buffer) DecodeRawBytes(alloc bool) (buf []byte, err error) {n, err := p.DecodeVarint()if err != nil {return nil, err}nb := int(n)if nb < 0 {return nil, fmt.Errorf("proto: bad byte length %d", nb)}end := p.index + nbif end < p.index || end > len(p.buf) {return nil, io.ErrUnexpectedEOF}if !alloc {// todo: check if can get more uses of alloc=falsebuf = p.buf[p.index:end]p.index += nbreturn}buf = make([]byte, nb)copy(buf, p.buf[p.index:])p.index += nbreturn }反序列化 string 先把 length 序列化出來(lái),通過(guò) DecodeVarint 的方式。拿到 length 以后,剩下的就是直接拷貝的過(guò)程。在上篇 encode 中,我們知道字符串是不做處理,直接放到二進(jìn)制流里面的,所以反序列化直接取出即可。
Map
func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {raw, err := o.DecodeRawBytes(false)if err != nil {return err}oi := o.index // index at the end of this map entryo.index -= len(raw) // move buffer back to start of map entrymptr := structPointer_NewAt(base, p.field, p.mtype) // *map[K]Vif mptr.Elem().IsNil() {mptr.Elem().Set(reflect.MakeMap(mptr.Type().Elem()))}v := mptr.Elem() // map[K]V// 這里省略一些代碼,主要是為了 key - value 準(zhǔn)備的一些可以雙重間接尋址的占位符,具體原因可以見(jiàn)序列化代碼里面的 enc_new_map 函數(shù)// Decode.// This parses a restricted wire format, namely the encoding of a message// with two fields. See enc_new_map for the format.for o.index < oi {// tagcode for key and value properties are always a single byte// because they have tags 1 and 2.tagcode := o.buf[o.index]o.index++switch tagcode {case p.mkeyprop.tagcode[0]:if err := p.mkeyprop.dec(o, p.mkeyprop, keybase); err != nil {return err}case p.mvalprop.tagcode[0]:if err := p.mvalprop.dec(o, p.mvalprop, valbase); err != nil {return err}default:// TODO: Should we silently skip this instead?return fmt.Errorf("proto: bad map data tag %d", raw[0])}}keyelem, valelem := keyptr.Elem(), valptr.Elem()if !keyelem.IsValid() {keyelem = reflect.Zero(p.mtype.Key())}if !valelem.IsValid() {valelem = reflect.Zero(p.mtype.Elem())}v.SetMapIndex(keyelem, valelem)return nil }反序列化 map 需要把每個(gè) tag 取出來(lái),然后緊接著反序列化每個(gè) key - value 。最后會(huì)判斷 keyelem 和 valelem 是否為零值,如果是零值要分別調(diào)用 reflect.Zero 處理零值的情況。
slice
最后還是舉一個(gè)數(shù)組的例子。以 []int32 為例。
func (o *Buffer) dec_slice_packed_int32(p *Properties, base structPointer) error {v := structPointer_Word32Slice(base, p.field)nn, err := o.DecodeVarint()if err != nil {return err}nb := int(nn) // number of bytes of encoded int32sfin := o.index + nbif fin < o.index {return errOverflow}for o.index < fin {u, err := p.valDec(o)if err != nil {return err}v.Append(uint32(u))}return nil }反序列化這個(gè)數(shù)組,分2步,跳過(guò) tagcode 拿到 length ,反序列化 length 。在 length 這個(gè)長(zhǎng)度中依次反序列化各個(gè) value 。
上述就是 Protocol Buffer 反序列化的過(guò)程。
序列化小結(jié)
Protocol Buffer 反序列化直接讀取二進(jìn)制字節(jié)數(shù)據(jù)流,反序列化就是 encode 的反過(guò)程,同樣是一些二進(jìn)制操作。反序列化的時(shí)候,通常只需要用到 length 。tag 值只是用來(lái)標(biāo)識(shí)類(lèi)型的,Properties 的 setEncAndDec() 方法里面已經(jīng)把每個(gè)類(lèi)型對(duì)應(yīng)的 decode 解碼器初始化好了,所以反序列化的時(shí)候,tag 值可以直接跳過(guò),從 length 開(kāi)始處理。
XML 的解析過(guò)程就復(fù)雜一些。XML 需要從文件中讀取出字符串,再轉(zhuǎn)換為 XML 文檔對(duì)象結(jié)構(gòu)模型。之后,再?gòu)?XML 文檔對(duì)象結(jié)構(gòu)模型中讀取指定節(jié)點(diǎn)的字符串,最后再將這個(gè)字符串轉(zhuǎn)換成指定類(lèi)型的變量。這個(gè)過(guò)程非常復(fù)雜,其中將 XML 文件轉(zhuǎn)換為文檔對(duì)象結(jié)構(gòu)模型的過(guò)程通常需要完成詞法文法分析等大量消耗 CPU 的復(fù)雜計(jì)算。
回到頂部
序列化/反序列化性能
Protocol Buffer 一直被人們認(rèn)為是高性能的存在。也有很多人做過(guò)實(shí)現(xiàn),驗(yàn)證了這一說(shuō)法。例如這個(gè)鏈接里面的實(shí)驗(yàn) jvm-serializers。
在看數(shù)據(jù)之前,我們可以先理性的分析一下 Protocol Buffer 和 JSON 、XML 這些比有哪些優(yōu)勢(shì):
Protobuf 采用了 Varint 、Zigzag 大幅的壓縮了整數(shù)類(lèi)型,也沒(méi)有 JSON 里面的 {、}、;、這些數(shù)據(jù)分隔符,有 option 字段標(biāo)識(shí)的,沒(méi)有數(shù)據(jù)的時(shí)候不會(huì)進(jìn)行反序列化。這幾個(gè)措施導(dǎo)致 pb 的數(shù)據(jù)量整體的就比 JSON 少很多。
Protobuf 采取的是 TLV 的形式,JSON 這些都是字符串的形式。字符串比對(duì)應(yīng)該比基于數(shù)字的字段 tag 更耗時(shí)。Protobuf 在正文前有一個(gè)大小或者長(zhǎng)度的標(biāo)記,而 JSON 必須全文掃描無(wú)法跳過(guò)不需要的字段。
下面這張圖來(lái)自參考鏈接里面的 《Protobuf有沒(méi)有比JSON快5倍?用代碼來(lái)?yè)羝苝b性能神話》:
從這個(gè)實(shí)驗(yàn)來(lái)看,確實(shí) Protobuf 在序列化數(shù)字這方面性能是非常強(qiáng)悍的。
序列化 / 反序列化數(shù)字確實(shí)是 Protobuf 針對(duì) JSON 和 XML 的優(yōu)勢(shì),但是它也存在一些沒(méi)有優(yōu)勢(shì)的地方。比如字符串。字符串在 Protobuf 中基本沒(méi)有處理,除了前面加了 tag - length 。在序列化 / 反序列化字符串的過(guò)程中,字符串拷貝的速度反而決定的真正的速度。
從上圖可以看到 encode 字符串的時(shí)候,速度基本和 JSON 相差無(wú)幾。
回到頂部
最后
至此,關(guān)于 protocol buffers 的所有,讀者應(yīng)該了然于胸了。
protocol buffers 誕生之初也并不是為了傳輸數(shù)據(jù)存在的,只是為了解決服務(wù)器多版本協(xié)議兼容的問(wèn)題。實(shí)質(zhì)其實(shí)是發(fā)明了一個(gè)新的跨語(yǔ)言無(wú)歧義的 IDL (Interface description language) 。只不過(guò)人們后來(lái)發(fā)現(xiàn)用它來(lái)傳輸數(shù)據(jù)也不錯(cuò),才開(kāi)始用 protocol buffers 。
想用 protocol buffers 替換 JSON ,可能是考慮到:
-
protocol buffers 相同數(shù)據(jù),傳輸?shù)臄?shù)據(jù)量比 JSON 小,gzip 或者 7zip 壓縮以后,網(wǎng)絡(luò)傳輸消耗較少。
-
protocol buffers 不是自我描述的,在缺少 .proto 文件以后,有一定的加密性,數(shù)據(jù)傳輸過(guò)程中都是二進(jìn)制流,并不是明文。
-
protocol buffers 提供了一套工具,自動(dòng)化生成代碼也非常方便。
-
protocol buffers 具有向后兼容性,改變了數(shù)據(jù)結(jié)構(gòu)以后,對(duì)老的版本沒(méi)有影響。
-
protocol buffers 原生完美兼容 RPC 調(diào)用。
如果很少用到整型數(shù)字,浮點(diǎn)型數(shù)字,全部都是字符串?dāng)?shù)據(jù),那么 JSON 和 protocol buffers 性能不會(huì)差太多。純前端之間交互的話,選擇 JSON 或者 protocol buffers 差別不是很大。
與后端交互過(guò)程中,用到 protocol buffers 比較多,筆者認(rèn)為選擇 protocol buffers 除了性能強(qiáng)以外,完美兼容 RPC 調(diào)用也是一個(gè)重要因素。
回到頂部
總結(jié)
以上是生活随笔為你收集整理的高效的序列化/反序列化数据方式 Protobuf的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: sdk没有登录什么意思_检查肝功能没有空
- 下一篇: native react 图片多选_re