使用go实现一个简单的高性能RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式。RPC调用的原理其实很简单,它类似于三层构架的C/S系统,第三方的客户程序通过接口调用RPC内部的标准或自定义函数,获得函数返回的数据进行处理后显示或打印。

TinyRPC 是基于Go语言标准库 net/rpc 扩展的远程过程调用框架,它具有以下特性:

  • 基于TCP传输层协议
  • 支持多种压缩格式:gzip、snappy、zlib;
  • 基于二进制的 Protocol Buffer 序列化协议:具有协议编码小及高扩展性和跨平台性;
  • 支持生成工具:TinyRPC提供的 protoc-gen-tinyrpc 插件可以帮助开发者快速定义自己的服务;
  • 支持自定义序列化器

TinyRPC 的源代码仅有一千行左右,通过学习 TinyRPC ,开发者可以得到以下收获:

  • 代码简洁规范
  • 涵盖大多数 Go 语言基础用法和高级特性
  • 单元测试编写技巧
  • TCP流中处理数据包的技巧
  • RPC框架的设计理念

基于TCP的TinyRPC协议

在TinyRPC中,请求消息由TinyRPC客户端的应用程序发出,在TCP的字节流中,请求消息分为三部分:

  • 由可变长量编码的 uint 类型用来标识请求头的长度;
  • 基于自定义协议编码的请求头部信息
  • 基于 Protocol Buffer 协议编码的请求体,见图所示:

在TinyRPC中,响应消息由TinyRPC服务端的应用程序响应,在TCP的字节流中,响应消息分为三部分:

  • 由可变长量编码的 uint 类型用来标识响应头的长度;
  • 基于自定义协议编码的响应头部信息
  • 基于 Protocol Buffer 协议编码的响应体,见图所示:

其中ID为RPC调用的序号,以便在并发调用时,客户端根据响应的ID序号来判断RPC的调用结果;

Error message为调用时发生错误的消息,若该内容为空则表示未出现RPC调用错误;

在请求I/O流中,请求体(Request Body)表示RPC的参数内容;而在响应I/O流中,响应体(Response Body)则表示RPC调用的结果,这些Body在TinyRPC中均采用 Protocol Buffer 协议编码。

请求头部消息编码

由于TinyRPC的请求头部是自定义协议编码的,我们可以查看文件header/header.go了解它的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// CompressType type of compressions supported by rpc
type CompressType uint16

// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType | Method | ID | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// | uint16 | uvarint+string | uvarint | uvarint | uint32 |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
sync.RWMutex
CompressType CompressType // 它表示RPC的协议内容的压缩类型,TinyRPC支持四种压缩类型,Raw、Gzip、Snappy、Zlib
Method string // 方法名
ID uint64 // 请求ID
RequestLen uint32 // 请求体长度
Checksum uint32 // 请求体校验 使用CRC32摘要算法
}

其中 RequestHeader 的编解码过程如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

// Marshal will encode request header into a byte slice
func (r *RequestHeader) Marshal() []byte {
r.RLock()
defer r.RUnlock()
idx := 0
header := make([]byte, MaxHeaderSize+len(r.Method))
// 写入uint16类型的压缩类型
binary.LittleEndian.PutUint16(header[idx:], uint16(r.CompressType))
idx += Uint16Size

idx += writeString(header[idx:], r.Method)
idx += binary.PutUvarint(header[idx:], r.ID) // 写入uvarint类型的请求ID号
idx += binary.PutUvarint(header[idx:], uint64(r.RequestLen)) // 写入uvarint类型的请求体长度

binary.LittleEndian.PutUint32(header[idx:], r.Checksum) // 写入uvarint类型的校验码
idx += Uint32Size
return header[:idx]
}

// Unmarshal will decode request header into a byte slice
func (r *RequestHeader) Unmarshal(data []byte) (err error) {
r.Lock()
defer r.Unlock()
if len(data) == 0 {
return UnmarshalError
}

defer func() {
if r := recover(); r != nil {
err = UnmarshalError
}
}()
idx, size := 0, 0
r.CompressType = CompressType(binary.LittleEndian.Uint16(data[idx:]))
idx += Uint16Size // 读取uint16类型的压缩类型

r.Method, size = readString(data[idx:])
idx += size

r.ID, size = binary.Uvarint(data[idx:]) // 读取uvarint类型的请求ID号
idx += size

length, size := binary.Uvarint(data[idx:]) // 读取uvarint类型的请求体长度
r.RequestLen = uint32(length)
idx += size

r.Checksum = binary.LittleEndian.Uint32(data[idx:]) // 读取uvarint类型的校验码
return
}

func readString(data []byte) (string, int) {
idx := 0
length, size := binary.Uvarint(data) // 读取一个uvarint类型表示字符的长度
idx += size
str := string(data[idx : idx+int(length)])
idx += len(str)
return str, idx
}

func writeString(data []byte, str string) int {
idx := 0
idx += binary.PutUvarint(data, uint64(len(str))) // 写入一个uvarint类型表示字符长度
copy(data[idx:], str)
idx += len(str)
return idx
}

响应头部消息编码

由于TinyRPC的响应头部是自定义协议编码的,我们可以查看文件header/header.go了解它的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType | ID | Error | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// | uint16 | uvarint | uvarint+string | uvarint | uint32 |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
sync.RWMutex
CompressType CompressType // 压缩类型
ID uint64 // 响应ID号
Error string // 错误信息
ResponseLen uint32 // 响应体长度
Checksum uint32 // 响应体校验码
}

其中 ResponseHeader 的编解码过程如下所示,与RequestHeader 的编解码过程类似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

// Marshal will encode response header into a byte slice
func (r *ResponseHeader) Marshal() []byte {
r.RLock()
defer r.RUnlock()
idx := 0
header := make([]byte, MaxHeaderSize+len(r.Error))

binary.LittleEndian.PutUint16(header[idx:], uint16(r.CompressType))
idx += Uint16Size

idx += binary.PutUvarint(header[idx:], r.ID)
idx += writeString(header[idx:], r.Error)
idx += binary.PutUvarint(header[idx:], uint64(r.ResponseLen))

binary.LittleEndian.PutUint32(header[idx:], r.Checksum)
idx += Uint32Size
return header[:idx]
}

// Unmarshal will decode response header into a byte slice
func (r *ResponseHeader) Unmarshal(data []byte) (err error) {
r.Lock()
defer r.Unlock()
if len(data) == 0 {
return UnmarshalError
}

defer func() {
if r := recover(); r != nil {
err = UnmarshalError
}
}()
idx, size := 0, 0
r.CompressType = CompressType(binary.LittleEndian.Uint16(data[idx:]))
idx += Uint16Size

r.ID, size = binary.Uvarint(data[idx:])
idx += size

r.Error, size = readString(data[idx:])
idx += size

length, size := binary.Uvarint(data[idx:])
r.ResponseLen = uint32(length)
idx += size

r.Checksum = binary.LittleEndian.Uint32(data[idx:])
return
}

头部消息对象池

为了减少创建请求头部对象 RequestHeader 和响应头部对象 ResponseHeader 的次数我们通过为这两个结构体建立对象池,以便可以进行复用。

同时我们为 RequestHeaderResponseHeader 都实现了ResetHeader方法,当每次使用完这些对象时,我们调用ResetHeader让结构体内容初始化,随后再把它们丢回对象池里。

代码 header/pool.go 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

package header

import "sync"

var (
RequestPool sync.Pool
ResponsePool sync.Pool
)

func init() {
RequestPool = sync.Pool{New: func() any {
return &RequestHeader{}
}}
ResponsePool = sync.Pool{New: func() any {
return &ResponseHeader{}
}}
}

// ResetHeader reset request header
func (h *RequestHeader) ResetHeader() {
h.Id = 0
h.Checksum = 0
h.Method = ""
h.CompressType = 0
h.RequestLen = 0
}

// ResetHeader reset response header
func (h *ResponseHeader) ResetHeader() {
h.Error = ""
h.Id = 0
h.CompressType = 0
h.Checksum = 0
h.ResponseLen = 0
}

IO操作

TinyRPC的IO操作函数在codec/io.go中,其中 sendFrame 函数会向IO中写入uvarint类型的 size ,表示要发送数据的长度,随后将该字节slice类型的数据 data 写入IO流中。

  • 若写入数据的长度为 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 0 值;
  • 若写入数据的长度大于 0 ,此时sendFrame 函数会向IO流写入uvarint类型的 len(data) 值,随后将该字节串的数据 data 写入IO流中。

代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func sendFrame(w io.Writer, data []byte) (err error) {
var size [binary.MaxVarintLen64]byte

if data == nil || len(data) == 0 {
n := binary.PutUvarint(size[:], uint64(0))
if err = write(w, size[:n]); err != nil {
return
}
return
}

n := binary.PutUvarint(size[:], uint64(len(data)))
if err = write(w, size[:n]); err != nil {
return
}
if err = write(w, data); err != nil {
return
}
return
}

func write(w io.Writer, data []byte) error {
for index := 0; index < len(data); {
n, err := w.Write(data[index:])
if _, ok := err.(net.Error); !ok {
return err
}
index += n
}
return nil
}

recvFrame 函数与sendFrame 函数类似,首先会向IO中读入uvarint类型的 size ,表示要接收数据的长度,随后将该从IO流中读取该 size 长度字节串。

注意,由于 codec 层会传入一个bufio类型的结构体,bufio类型实现了有缓冲的IO操作,以便减少IO在用户态与内核态拷贝的次数。

  • recvFrame 函数从IO流读取uvarint类型的 size 值大于0,随后 recvFrame 将该从IO流中读取该 size 长度字节串。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func recvFrame(r io.Reader) (data []byte, err error) {
size, err := binary.ReadUvarint(r.(io.ByteReader))
if err != nil {
return nil, err
}
if size != 0 {
data = make([]byte, size)
if err = read(r, data); err != nil {
return nil, err
}
}
return data, nil
}

func read(r io.Reader, data []byte) error {
for index := 0; index < len(data); {
n, err := r.Read(data[index:])
if err != nil {
if _, ok := err.(net.Error); !ok {
return err
}
}
index += n
}
return nil
}

TinyRPC的压缩器

TinyRPC的压缩器代码部分很短,RawCompressor、GzipCompressor、SnappyCompressor、ZlibCompressor压缩器均实现了Compressor 接口,代码compressor/compressor.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type CompressType int32

const (
Raw CompressType = iota
Gzip
Snappy
Zlib
)
// Compressors 四种压缩器的实现
var Compressors = map[CompressType]Compressor{
Raw: RawCompressor{},
Gzip: GzipCompressor{},
Snappy: SnappyCompressor{},
Zlib: ZlibCompressor{},
}
// Compressor 压缩器接口
type Compressor interface {
Zip([]byte) ([]byte, error)
Unzip([]byte) ([]byte, error)
}

TinyRPC的序列化器

TinyRPC的序列化器的代码部分也很短,ProtoSerializer实现了Serializer接口,它是基于Protocol Buffer的序列化协议,代码serializer/serializer.go

1
2
3
4
5
6
type SerializeType int32

type Serializer interface {
Marshal(message any) ([]byte, error)
Unmarshal(data []byte, message any) error
}

实现ClientCodec接口

由于TinyRPC是基于标准库net/rpc扩展的,所以TinyRPC在codec层需要实现net/rpcClientCodec接口,我们先看看ClientCodec的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 文件 src/net/rpc/server.go

type ClientCodec interface {
WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
ReadResponseBody(any) error

Close() error
}
// Request 标准库里的请求体结构
type Request struct {
ServiceMethod string
Seq uint64
next *Request
}
// Response 标准库里的响应结构
type Response struct {
ServiceMethod string
Seq uint64
Error string
next *Response
}

其中ClientCodec接口包括写请求读响应头部读响应体,我们建立一个clientCode的结构体用来实现ClientCodec接口:

代码 codec/client.go 如下:

1
2
3
4
5
6
7
8
9
10
11
type clientCodec struct {
r io.Reader
w io.Writer
c io.Closer

compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
serializer serializer.Serializer
response header.ResponseHeader // rpc response header
mutex sync.Mutex // protect pending map
pending map[uint64]string
}

其中 compressor 表示压缩类型,serializer 表示使用的序列化器,response 是响应的头部,mutex 是用于保护 pending 的互斥锁;

1
2
3
4
5
6
7
8
9
10
11
12
13
// NewClientCodec Create a new client codec
func NewClientCodec(conn io.ReadWriteCloser,
compressType compressor.CompressType, serializer serializer.Serializer) rpc.ClientCodec {

return &clientCodec{
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
c: conn,
compressor: compressType,
serializer: serializer,
pending: make(map[uint64]string),
}
}

这里的读写IO分别使用 bufio.NewReaderbufio.NewWriter 构造,通过缓冲IO来提高RPC的读写性能;

首先 clientCode 结构体实现了 ClientCodec 接口的WriteRequest 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (c *clientCodec) WriteRequest(r *rpc.Request, param any) error {
c.mutex.Lock()
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
// 判断压缩器是否存在
if _, ok := compressor.Compressors[c.compressor]; !ok {
return NotFoundCompressorError
}
reqBody, err := c.serializer.Marshal(param) // 用序列化器进行编码
if err != nil {
return err
}
// 压缩
compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
if err != nil {
return err
}
// 从请求头部对象池取出请求头
h := header.RequestPool.Get().(*header.RequestHeader)
defer func() {
h.ResetHeader()
header.RequestPool.Put(h)
}()
h.ID = r.Seq
h.Method = r.ServiceMethod
h.RequestLen = uint32(len(compressedReqBody))
h.CompressType = header.CompressType(c.compressor)
h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
// 发送请求头
if err := sendFrame(c.w, h.Marshal()); err != nil {
return err
}
// 发送请求体
if err := write(c.w, compressedReqBody); err != nil {
return err
}

c.w.(*bufio.Writer).Flush()
return nil
}

实现 *ClientCodec* 接口的 *ReadResponseHeader* 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
c.response.ResetHeader() // 重置clientCodec的响应头部
data, err := recvFrame(c.r) // 读取请求头字节串
if err != nil {
return err
}
err = c.response.Unmarshal(data) // 用序列化器继续解码
if err != nil {
return err
}
c.mutex.Lock()
r.Seq = c.response.ID // 填充 r.Seq
r.Error = c.response.Error // 填充 r.Error
r.ServiceMethod = c.pending[r.Seq] // 根据序号填充 r.ServiceMethod
delete(c.pending, r.Seq) // 删除pending里的序号
c.mutex.Unlock()
return nil
}

实现 *ClientCodec* 接口的 *ReadResponseBody* 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

func (c *clientCodec) ReadResponseBody(param any) error {
if param == nil {
if c.response.ResponseLen != 0 { // 废弃多余部分
if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
return err
}
}
return nil
}
// 根据响应体长度,读取该长度的字节串
respBody := make([]byte, c.response.ResponseLen)
err := read(c.r, respBody)
if err != nil {
return err
}
// 校验
if c.response.Checksum != 0 {
if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
return UnexpectedChecksumError
}
}
// 判断压缩器是否存在
if _, ok := compressor.Compressors[c.response.GetCompressType()]; !ok {
return NotFoundCompressorError
}
// 解压
resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
if err != nil {
return err
}
// 反序列化
return c.serializer.Unmarshal(resp, param)
}

实现ServerCodec接口

TinyRPC在codec层还需要实现net/rpcServerCodec接口

ServerCodec 的接口和 ClientCodec 接口十分类似:

1
2
3
4
5
6
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(any) error
WriteResponse(*Response, any) error
Close() error
}

其中 ServerCodec 接口包括写响应读请求头部读请求体,我们建立一个 serverCodec 的结构体用来实现 ServerCodec 接口,代码codec/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type serverCodec struct {
r io.Reader
w io.Writer
c io.Closer

request header.RequestHeader
serializer serializer.Serializer
mutex sync.Mutex // protects seq, pending
seq uint64
pending map[uint64]uint64
}

// NewServerCodec Create a new server codec
func NewServerCodec(conn io.ReadWriteCloser, serializer serializer.Serializer) rpc.ServerCodec {
return &serverCodec{
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
c: conn,
serializer: serializer,
pending: make(map[uint64]uint64),
}
}

是不是和刚才的 clientCode 结构体神似

首先, serverCodec 结构体实现了 ServerCodec 接口的 ReadRequestHeader方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
s.request.ResetHeader() // 重置serverCodec结构体的请求头部
data, err := recvFrame(s.r) // 读取请求头部字节串
if err != nil {
return err
}
err = s.request.Unmarshal(data) //将字节串反序列化
if err != nil {
return err
}
s.mutex.Lock()
s.seq++ // 序号自增
s.pending[s.seq] = s.request.ID // 自增序号与请求头部的ID进行绑定
r.ServiceMethod = s.request.Method // 填充 r.ServiceMethod
r.Seq = s.seq// 填充 r.Seq
s.mutex.Unlock()
return nil
}

实现 ServerCodec 接口的 ReadRequestBody 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (s *serverCodec) ReadRequestBody(x any) error {
if param == nil {
if s.request.RequestLen != 0 { // 废弃多余部分
if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
return err
}
}
return nil
}

reqBody := make([]byte, s.request.RequestLen)
// 根据请求体的大小,读取该大小的字节串
err := read(s.r, reqBody)
if err != nil {
return err
}
// 校验
if s.request.Checksum != 0 {
if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
return UnexpectedChecksumError
}
}
// 判断压缩器是否存在
if _, ok := compressor.
Compressors[s.request.GetCompressType()]; !ok {
return NotFoundCompressorError
}
// 解压
req, err := compressor.
Compressors[s.request.GetCompressType()].Unzip(reqBody)
if err != nil {
return err
}
// 把字节串反序列化
return s.serializer.Unmarshal(req, param)
}

实现 ServerCodec 接口的 WriteResponse 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
func (s *serverCodec) WriteResponse(r *rpc.Response, param any) error {
s.mutex.Lock()
id, ok := s.pending[r.Seq]
if !ok {
s.mutex.Unlock()
return InvalidSequenceError
}
delete(s.pending, r.Seq)
s.mutex.Unlock()

if r.Error != "" { // 如果RPC调用结果有误,把param置为nil
param = nil
}
// 判断压缩器是否存在
if _, ok := compressor.
Compressors[s.request.GetCompressType()]; !ok {
return NotFoundCompressorError
}

var respBody []byte
var err error
if param != nil {
respBody, err = s.serializer.Marshal(param) // 反序列化
if err != nil {
return err
}
}
// 压缩
compressedRespBody, err := compressor.
Compressors[s.request.GetCompressType()].Zip(respBody)
if err != nil {
return err
}
h := header.ResponsePool.Get().(*header.ResponseHeader)
defer func() {
h.ResetHeader()
header.ResponsePool.Put(h)
}()
h.ID = id
h.Error = r.Error
h.ResponseLen = uint32(len(compressedRespBody))
h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
h.CompressType = s.request.CompressType
// 发送响应头
if err = sendFrame(s.w, h.Marshal()); err != nil {
return err
}
// 发送响应体
if err = write(s.w, compressedRespBody); err != nil {
return err
}
s.w.(*bufio.Writer).Flush()
return nil
}

TinyRPC的Server

TinyRPC的服务端非常简单,把标准库 net/rpcServer 结构包装了一层,其中 ServeCodec 使用的是TinyRPC的编解码器,代码server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Server struct {
*rpc.Server
serializer.Serializer
}

...

func (s *Server) Serve(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("tinyrpc.Serve: accept:", err.Error())
return
}
go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))) // 使用TinyRPC的解码器
}
}

TinyRPC的Client

TinyRPC的客户端也很简单,把标准库 net/rpcClient 结构包装了一层,其中 ClientCodec 使用的是TinyRPC的编解码器,代码client.go

注意:TinyRPC Client使用一种Go语言常用的设计模式:功能选项模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Client rpc client based on net/rpc implementation
type Client struct {
*rpc.Client
}

//Option provides options for rpc
type Option func(o *options)

type options struct {
compressType compressor.CompressType
serializer serializer.Serializer
}

// WithCompress set client compression format
func WithCompress(c compressor.CompressType) Option {
return func(o *options) {
o.compressType = c
}
}

// WithSerializer set client serializer
func WithSerializer(serializer serializer.Serializer) Option {
return func(o *options) {
o.serializer = serializer
}
}

// NewClient Create a new rpc client
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
options := options{
compressType: compressor.Raw,
serializer: serializer.Proto,
}
for _, option := range opts {
option(&options)
}
return &Client{rpc.NewClientWithCodec(
codec.NewClientCodec(conn, options.compressType, options.serializer))}
}

// Call synchronously calls the rpc function
func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
return c.Client.Call(serviceMethod, args, reply)
}

// AsyncCall asynchronously calls the rpc function and returns a channel of *rpc.Call
func (c *Client) AsyncCall(serviceMethod string, args interface{}, reply interface{}) chan *rpc.Call {
return c.Go(serviceMethod, args, reply, nil).Done
}

作者:马丸子
链接:https://zhuanlan.zhihu.com/p/499098284
来源:知乎