RPC是远程过程调用(Remote Procedure Call )的缩写形式。RPC调用的原理其实很简单,它类似于三层构架的C/S系统,第三方的客户程序通过接口调用RPC内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。
TinyRPC 是基于Go语言标准库 net/rpc 扩展的远程过程调用框架,它具有以下特性:
基于TCP传输层协议
支持多种压缩格式 :gzip、snappy、zlib;
基于二进制的 Protocol Buffer 序列化协议:具有协议编码小及高扩展性和跨平台性;
支持生成工具:TinyRPC提供的 protoc-gen-tinyrpc 插件可以帮助开发者快速定义自己的服务;
支持自定义序列化器
TinyRPC 的源代码仅有一千行左右,通过学习 TinyRPC ,开发者可以得到以下收获:
代码简洁规范
涵盖大多数 Go 语言基础用法和高级特性
单元测试编写技巧
TCP流中处理数据包的技巧
RPC框架的设计理念
基于TCP的TinyRPC协议 在TinyRPC中,请求消息由TinyRPC客户端的应用程序发出,在TCP的字节流中,请求消息分为三部分:
由可变长量编码的 uint 类型 用来标识请求头的长度;
基于自定义协议编码的请求头部信息
基于 Protocol Buffer 协议编码的请求体,见图所示:
在TinyRPC中,响应消息由TinyRPC服务端的应用程序响应,在TCP的字节流中,响应消息分为三部分:
由可变长量编码的 uint 类型 用来标识响应头的长度;
基于自定义协议编码的响应头部信息
基于 Protocol Buffer 协议编码的响应体,见图所示:
其中ID为RPC调用的序号,以便在并发调用时,客户端根据响应的ID序号来判断RPC的调用结果;
Error message为调用时发生错误的消息,若该内容为空则表示未出现RPC调用错误;
在请求I/O流中,请求体(Request Body)表示RPC的参数内容;而在响应I/O流中,响应体(Response Body)则表示RPC调用的结果,这些Body在TinyRPC中均采用 Protocol Buffer 协议编码。
请求头部消息编码 由于TinyRPC 的请求头部是自定义协议编码的,我们可以查看文件header/header.go 了解它的细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type CompressType uint16 type RequestHeader struct { sync.RWMutex CompressType CompressType Method string ID uint64 RequestLen uint32 Checksum uint32 }
其中 RequestHeader 的编解码过程如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 func (r *RequestHeader) Marshal() []byte { r.RLock() defer r.RUnlock() idx := 0 header := make ([]byte , MaxHeaderSize+len (r.Method)) binary.LittleEndian.PutUint16(header[idx:], uint16 (r.CompressType)) idx += Uint16Size idx += writeString(header[idx:], r.Method) idx += binary.PutUvarint(header[idx:], r.ID) idx += binary.PutUvarint(header[idx:], uint64 (r.RequestLen)) binary.LittleEndian.PutUint32(header[idx:], r.Checksum) idx += Uint32Size return header[:idx] } func (r *RequestHeader) Unmarshal(data []byte ) (err error ) { r.Lock() defer r.Unlock() if len (data) == 0 { return UnmarshalError } defer func () { if r := recover (); r != nil { err = UnmarshalError } }() idx, size := 0 , 0 r.CompressType = CompressType(binary.LittleEndian.Uint16(data[idx:])) idx += Uint16Size r.Method, size = readString(data[idx:]) idx += size r.ID, size = binary.Uvarint(data[idx:]) idx += size length, size := binary.Uvarint(data[idx:]) r.RequestLen = uint32 (length) idx += size r.Checksum = binary.LittleEndian.Uint32(data[idx:]) return } func readString (data []byte ) (string , int ) { idx := 0 length, size := binary.Uvarint(data) idx += size str := string (data[idx : idx+int (length)]) idx += len (str) return str, idx } func writeString (data []byte , str string ) int { idx := 0 idx += binary.PutUvarint(data, uint64 (len (str))) copy (data[idx:], str) idx += len (str) return idx }
响应头部消息编码 由于TinyRPC 的响应头部是自定义协议编码的,我们可以查看文件header/header.go 了解它的细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type ResponseHeader struct { sync.RWMutex CompressType CompressType ID uint64 Error string ResponseLen uint32 Checksum uint32 }
其中 ResponseHeader 的编解码过程如下所示,与RequestHeader 的编解码过程类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 func (r *ResponseHeader) Marshal() []byte { r.RLock() defer r.RUnlock() idx := 0 header := make ([]byte , MaxHeaderSize+len (r.Error)) binary.LittleEndian.PutUint16(header[idx:], uint16 (r.CompressType)) idx += Uint16Size idx += binary.PutUvarint(header[idx:], r.ID) idx += writeString(header[idx:], r.Error) idx += binary.PutUvarint(header[idx:], uint64 (r.ResponseLen)) binary.LittleEndian.PutUint32(header[idx:], r.Checksum) idx += Uint32Size return header[:idx] } func (r *ResponseHeader) Unmarshal(data []byte ) (err error ) { r.Lock() defer r.Unlock() if len (data) == 0 { return UnmarshalError } defer func () { if r := recover (); r != nil { err = UnmarshalError } }() idx, size := 0 , 0 r.CompressType = CompressType(binary.LittleEndian.Uint16(data[idx:])) idx += Uint16Size r.ID, size = binary.Uvarint(data[idx:]) idx += size r.Error, size = readString(data[idx:]) idx += size length, size := binary.Uvarint(data[idx:]) r.ResponseLen = uint32 (length) idx += size r.Checksum = binary.LittleEndian.Uint32(data[idx:]) return }
头部消息对象池 为了减少创建请求头部对象 RequestHeader 和响应头部对象 ResponseHeader 的次数, 我们通过为这两个结构体建立对象池,以便可以进行复用。
同时我们为 RequestHeader 和 ResponseHeader 都实现了ResetHeader方法,当每次使用完这些对象时,我们调用ResetHeader让结构体内容初始化,随后再把它们丢回对象池里。
代码 header/pool.go 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package headerimport "sync" var ( RequestPool sync.Pool ResponsePool sync.Pool ) func init () { RequestPool = sync.Pool{New: func () any { return &RequestHeader{} }} ResponsePool = sync.Pool{New: func () any { return &ResponseHeader{} }} } func (h *RequestHeader) ResetHeader() { h.Id = 0 h.Checksum = 0 h.Method = "" h.CompressType = 0 h.RequestLen = 0 } func (h *ResponseHeader) ResetHeader() { h.Error = "" h.Id = 0 h.CompressType = 0 h.Checksum = 0 h.ResponseLen = 0 }
IO操作 TinyRPC的IO操作函数在codec/io.go 中,其中 sendFrame 函数会向IO中写入uvarint 类型的 size ,表示要发送数据的长度,随后将该字节slice类型的数据 data 写入IO流中。
若写入数据的长度为 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 0 值;
若写入数据的长度大于 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 len(data) 值,随后将该字节串的数据 data 写入IO流中。
代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func sendFrame (w io.Writer, data []byte ) (err error ) { var size [binary.MaxVarintLen64]byte if data == nil || len (data) == 0 { n := binary.PutUvarint(size[:], uint64 (0 )) if err = write(w, size[:n]); err != nil { return } return } n := binary.PutUvarint(size[:], uint64 (len (data))) if err = write(w, size[:n]); err != nil { return } if err = write(w, data); err != nil { return } return } func write (w io.Writer, data []byte ) error { for index := 0 ; index < len (data); { n, err := w.Write(data[index:]) if _, ok := err.(net.Error); !ok { return err } index += n } return nil }
recvFrame 函数与sendFrame 函数类似,首先会向IO中读入uvarint 类型的 size ,表示要接收数据的长度,随后将该从IO流中读取该 size 长度字节串。
注意,由于 codec 层会传入一个bufio 类型的结构体,bufio 类型实现了有缓冲的IO操作,以便减少IO在用户态与内核态拷贝的次数。
若 recvFrame 函数从IO流读取uvarint 类型的 size 值大于0,随后 recvFrame 将该从IO流中读取该 size 长度字节串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func recvFrame (r io.Reader) (data []byte , err error ) { size, err := binary.ReadUvarint(r.(io.ByteReader)) if err != nil { return nil , err } if size != 0 { data = make ([]byte , size) if err = read(r, data); err != nil { return nil , err } } return data, nil } func read (r io.Reader, data []byte ) error { for index := 0 ; index < len (data); { n, err := r.Read(data[index:]) if err != nil { if _, ok := err.(net.Error); !ok { return err } } index += n } return nil }
TinyRPC的压缩器 TinyRPC的压缩器代码部分很短,RawCompressor、GzipCompressor、SnappyCompressor、ZlibCompressor压缩器均实现了Compressor 接口,代码compressor/compressor.go :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type CompressType int32 const ( Raw CompressType = iota Gzip Snappy Zlib ) var Compressors = map [CompressType]Compressor{ Raw: RawCompressor{}, Gzip: GzipCompressor{}, Snappy: SnappyCompressor{}, Zlib: ZlibCompressor{}, } type Compressor interface { Zip([]byte ) ([]byte , error ) Unzip([]byte ) ([]byte , error ) }
TinyRPC的序列化器 TinyRPC的序列化器的代码部分也很短,ProtoSerializer实现了Serializer接口,它是基于Protocol Buffer的序列化协议,代码serializer/serializer.go :
1 2 3 4 5 6 type SerializeType int32 type Serializer interface { Marshal(message any) ([]byte , error ) Unmarshal(data []byte , message any) error }
实现ClientCodec接口 由于TinyRPC是基于标准库net/rpc 扩展的,所以TinyRPC在codec层需要实现net/rpc 的ClientCodec 接口,我们先看看ClientCodec 的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type ClientCodec interface { WriteRequest(*Request, any) error ReadResponseHeader(*Response) error ReadResponseBody(any) error Close() error } type Request struct { ServiceMethod string Seq uint64 next *Request } type Response struct { ServiceMethod string Seq uint64 Error string next *Response }
其中ClientCodec接口包括写请求 、读响应头部 和读响应体 ,我们建立一个clientCode的结构体用来实现ClientCodec接口:
代码 codec/client.go 如下:
1 2 3 4 5 6 7 8 9 10 11 type clientCodec struct { r io.Reader w io.Writer c io.Closer compressor compressor.CompressType serializer serializer.Serializer response header.ResponseHeader mutex sync.Mutex pending map [uint64 ]string }
其中 compressor 表示压缩类型,serializer 表示使用的序列化器,response 是响应的头部,mutex 是用于保护 pending 的互斥锁;
1 2 3 4 5 6 7 8 9 10 11 12 13 func NewClientCodec (conn io.ReadWriteCloser, compressType compressor.CompressType, serializer serializer.Serializer) rpc.ClientCodec { return &clientCodec{ r: bufio.NewReader(conn), w: bufio.NewWriter(conn), c: conn, compressor: compressType, serializer: serializer, pending: make (map [uint64 ]string ), } }
这里的读写IO分别使用 bufio.NewReader 和 bufio.NewWriter 构造,通过缓冲IO来提高RPC的读写性能;
首先 clientCode 结构体实现了 ClientCodec 接口的WriteRequest 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (c *clientCodec) WriteRequest(r *rpc.Request, param any) error {c.mutex.Lock() c.pending[r.Seq] = r.ServiceMethod c.mutex.Unlock() if _, ok := compressor.Compressors[c.compressor]; !ok { return NotFoundCompressorError } reqBody, err := c.serializer.Marshal(param) if err != nil { return err } compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody) if err != nil { return err } h := header.RequestPool.Get().(*header.RequestHeader) defer func () { h.ResetHeader() header.RequestPool.Put(h) }() h.ID = r.Seq h.Method = r.ServiceMethod h.RequestLen = uint32 (len (compressedReqBody)) h.CompressType = header.CompressType(c.compressor) h.Checksum = crc32.ChecksumIEEE(compressedReqBody) if err := sendFrame(c.w, h.Marshal()); err != nil { return err } if err := write(c.w, compressedReqBody); err != nil { return err } c.w.(*bufio.Writer).Flush() return nil }
实现 *ClientCodec* 接口的 *ReadResponseHeader* 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error { c.response.ResetHeader() data, err := recvFrame(c.r) if err != nil { return err } err = c.response.Unmarshal(data) if err != nil { return err } c.mutex.Lock() r.Seq = c.response.ID r.Error = c.response.Error r.ServiceMethod = c.pending[r.Seq] delete (c.pending, r.Seq) c.mutex.Unlock() return nil }
实现 *ClientCodec* 接口的 *ReadResponseBody* 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (c *clientCodec) ReadResponseBody(param any) error { if param == nil { if c.response.ResponseLen != 0 { if err := read(c.r, make ([]byte , c.response.ResponseLen)); err != nil { return err } } return nil } respBody := make ([]byte , c.response.ResponseLen) err := read(c.r, respBody) if err != nil { return err } if c.response.Checksum != 0 { if crc32.ChecksumIEEE(respBody) != c.response.Checksum { return UnexpectedChecksumError } } if _, ok := compressor.Compressors[c.response.GetCompressType()]; !ok { return NotFoundCompressorError } resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody) if err != nil { return err } return c.serializer.Unmarshal(resp, param) }
实现ServerCodec接口 TinyRPC在codec层还需要实现net/rpc 的ServerCodec 接口
ServerCodec 的接口和 ClientCodec 接口十分类似:
1 2 3 4 5 6 type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(any) error WriteResponse(*Response, any) error Close() error }
其中 ServerCodec 接口包括写响应 、读请求头部 和读请求体 ,我们建立一个 serverCodec 的结构体用来实现 ServerCodec 接口,代码codec/server.go :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type serverCodec struct { r io.Reader w io.Writer c io.Closer request header.RequestHeader serializer serializer.Serializer mutex sync.Mutex seq uint64 pending map [uint64 ]uint64 } func NewServerCodec (conn io.ReadWriteCloser, serializer serializer.Serializer) rpc.ServerCodec { return &serverCodec{ r: bufio.NewReader(conn), w: bufio.NewWriter(conn), c: conn, serializer: serializer, pending: make (map [uint64 ]uint64 ), } }
是不是和刚才的 clientCode 结构体神似 ?
首先, serverCodec 结构体实现了 ServerCodec 接口的 ReadRequestHeader 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error { s.request.ResetHeader() data, err := recvFrame(s.r) if err != nil { return err } err = s.request.Unmarshal(data) if err != nil { return err } s.mutex.Lock() s.seq++ s.pending[s.seq] = s.request.ID r.ServiceMethod = s.request.Method r.Seq = s.seq s.mutex.Unlock() return nil }
实现 ServerCodec 接口的 ReadRequestBody 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (s *serverCodec) ReadRequestBody(x any) error { if param == nil { if s.request.RequestLen != 0 { if err := read(s.r, make ([]byte , s.request.RequestLen)); err != nil { return err } } return nil } reqBody := make ([]byte , s.request.RequestLen) err := read(s.r, reqBody) if err != nil { return err } if s.request.Checksum != 0 { if crc32.ChecksumIEEE(reqBody) != s.request.Checksum { return UnexpectedChecksumError } } if _, ok := compressor. Compressors[s.request.GetCompressType()]; !ok { return NotFoundCompressorError } req, err := compressor. Compressors[s.request.GetCompressType()].Unzip(reqBody) if err != nil { return err } return s.serializer.Unmarshal(req, param) }
实现 ServerCodec 接口的 WriteResponse 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 func (s *serverCodec) WriteResponse(r *rpc.Response, param any) error { s.mutex.Lock() id, ok := s.pending[r.Seq] if !ok { s.mutex.Unlock() return InvalidSequenceError } delete (s.pending, r.Seq) s.mutex.Unlock() if r.Error != "" { param = nil } if _, ok := compressor. Compressors[s.request.GetCompressType()]; !ok { return NotFoundCompressorError } var respBody []byte var err error if param != nil { respBody, err = s.serializer.Marshal(param) if err != nil { return err } } compressedRespBody, err := compressor. Compressors[s.request.GetCompressType()].Zip(respBody) if err != nil { return err } h := header.ResponsePool.Get().(*header.ResponseHeader) defer func () { h.ResetHeader() header.ResponsePool.Put(h) }() h.ID = id h.Error = r.Error h.ResponseLen = uint32 (len (compressedRespBody)) h.Checksum = crc32.ChecksumIEEE(compressedRespBody) h.CompressType = s.request.CompressType if err = sendFrame(s.w, h.Marshal()); err != nil { return err } if err = write(s.w, compressedRespBody); err != nil { return err } s.w.(*bufio.Writer).Flush() return nil }
TinyRPC的Server TinyRPC的服务端非常简单,把标准库 net/rpc 的 Server 结构包装了一层,其中 ServeCodec 使用的是TinyRPC的编解码器,代码server.go :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type Server struct { *rpc.Server serializer.Serializer } ... func (s *Server) Serve(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { log.Print("tinyrpc.Serve: accept:" , err.Error()) return } go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))) } }
TinyRPC的Client TinyRPC的客户端也很简单,把标准库 net/rpc 的 Client 结构包装了一层,其中 ClientCodec 使用的是TinyRPC的编解码器,代码client.go :
注意:TinyRPC Client使用一种Go语言常用的设计模式:功能选项模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 type Client struct { *rpc.Client } type Option func (o *options) type options struct { compressType compressor.CompressType serializer serializer.Serializer } func WithCompress (c compressor.CompressType) Option { return func (o *options) { o.compressType = c } } func WithSerializer (serializer serializer.Serializer) Option { return func (o *options) { o.serializer = serializer } } func NewClient (conn io.ReadWriteCloser, opts ...Option) *Client { options := options{ compressType: compressor.Raw, serializer: serializer.Proto, } for _, option := range opts { option(&options) } return &Client{rpc.NewClientWithCodec( codec.NewClientCodec(conn, options.compressType, options.serializer))} } func (c *Client) Call(serviceMethod string , args interface {}, reply interface {}) error { return c.Client.Call(serviceMethod, args, reply) } func (c *Client) AsyncCall(serviceMethod string , args interface {}, reply interface {}) chan *rpc.Call { return c.Go(serviceMethod, args, reply, nil ).Done }
作者:马丸子 链接:https://zhuanlan.zhihu.com/p/499098284 来源:知乎