golang-rpc

概述

RPC(Remote Procedure Call),即远程过程调用。RPC最常见的一种方式是基于http协议的RESTful API,常用于微服务架构中。

RPC 工作原理

RPC的工作原理如图:
RPC调用图
RPC中client中的本地进程可以通过约定的RPC接口调用server的远端进程。其基本步骤为:

  1. 本地进程将请求数据编码为约定的数据类型(json,xml,二进制等等)
  2. 再通过约定的网络协议进行传输(http,http2,tcp,udp等等)
  3. 到了server后,将收到的数据进行解码,然后分发给对应的进程
  4. 对应进程处理完后将响应信息编码
  5. 编码后的响应信息通过约定的网络协议进行传输
  6. client接收到响应后,将响应信息解码
  7. 将解码的响应信息发送给调用的本地进程

RPC 使用场景

RPC的使用可以让不同系统之间进行通信,不同服务间进行通信,横向扩展了计算能力。

golang 的RPC包

RPC包提供对通过网络或其他I / O连接的对象的导出方法的访问。服务器注册一个对象,使其作为服务可见,并具有对象类型的名称。注册后,可以远程访问对象的导出方法。服务器可以注册不同类型的多个对象(服务),但是注册相同类型的多个对象是错误的。
只有满足一定的标准才能够用于远端访问:

  • 导出方法的类型
  • 导出方法
  • 该方法有两个参数
  • 方法的第二个参数是指针
  • 该方法具有返回类型错误

总的来说,方法应该类似func (t *T) MethodName(argType T1, replyType *T2) error,T1和T2可以被encoding/gob编码。即使使用不同的编解码器,这些要求也适用。将来,这些要求可能会因自定义编解码器而变得柔和。
方法的第一个参数表示调用者提供的参数;第二个参数是要返回给调用者的结果参数;方法返回值,如果为空,将作为客户端看到的字符串传回,就好像由errors.New创建的,如果返回错误,则不会将reply参数发送给客户端。
服务器可以通过调用ServeConn来处理单个连接上的请求。更典型的是,它将创建网络侦听器并调用Accept,或者对于HTTP侦听器,调用HandleHTTP和http.Serve。
希望使用该服务的客户端建立连接,然后在连接上调用NewClient。便捷功能Dial(DialHTTP)执行原始网络连接(HTTP连接)的两个步骤。结果Client对象有两个方法,Call和Go,指定要调用的服务和方法,包含参数的指针和接收结果参数的指针。
Call方法等待远程调用完成,而Go方法异步启动调用并使用Call结构的Done通道发出完成信号。除非设置了显式编解码器,否则使用encoding/gob包来传输数据。

Server

Server 结构体

1
2
3
4
5
6
7
8
// Server represents an RPC Server.
type Server struct {
4serviceMap sync.Map // map[string]*service
4reqLock sync.Mutex // protects freeReq
4freeReq *Request
4respLock sync.Mutex // protects freeResp
4freeResp *Response
}

Server 方法

NewServer

1
2
3
4
5
6
7
// NewServer returns a new Server.
func NewServer() *Server {
4return &Server{}
}

// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()

Register

Register 在服务器中发布满足以下条件的接收器值的方法集:

  • 导出类型导出方法
  • 两个参数都是导出类型
  • 第二个参数是指针
  • 返回值为error,如果接受方不是导出类型或没有合适的方法,则返回错误,它还使用包日志记录错误。

客户端使用“Type.Method”形式的字符串访问每个方法,其中Type是接收者的具体类型。

1
2
3
func (server *Server) Register(rcvr interface{}) error {
return server.register(rcvr, "", false)
}

RegisterName

1
2
3
func (server *Server) RegisterName(name string, rcvr interface{}) error {
4return server.register(rcvr, name, true)
}

register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
4s := new(service)
4s.typ = reflect.TypeOf(rcvr)
4s.rcvr = reflect.ValueOf(rcvr)
4sname := reflect.Indirect(s.rcvr).Type().Name()
4if useName {
44sname = name
4}
4if sname == "" {
44s := "rpc.Register: no service name for type " + s.typ.String()
44log.Print(s)
44return errors.New(s)
4}
4if !isExported(sname) && !useName {
44s := "rpc.Register: type " + sname + " is not exported"
44log.Print(s)
44return errors.New(s)
4}
4s.name = sname

4// Install the methods
4s.method = suitableMethods(s.typ, true)

4if len(s.method) == 0 {
44str := ""

44// To help the user, see if a pointer receiver would work.
44method := suitableMethods(reflect.PtrTo(s.typ), false)
44if len(method) != 0 {
444str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
44} else {
444str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
44}
44log.Print(str)
44return errors.New(str)
4}

4if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
44return errors.New("rpc: service already defined: " + sname)
4}
4return nil
}

suitableMethods

suitableMethods 返回适当的典型RPC方法,如果reportErr为true,他将使用log报告错误。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
4methods := make(map[string]*methodType)
4for m := 0; m < typ.NumMethod(); m++ {
44method := typ.Method(m)
44mtype := method.Type
44mname := method.Name
44// Method must be exported.
44if method.PkgPath != "" {
444continue
44}
44// Method needs three ins: receiver, *args, *reply.
44if mtype.NumIn() != 3 {
444if reportErr {
4444log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
444}
444continue
44}
44// First arg need not be a pointer.
44argType := mtype.In(1)
44if !isExportedOrBuiltinType(argType) {
444if reportErr {
4444log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
444}
444continue
44}
44// Second arg must be a pointer.
44replyType := mtype.In(2)
44if replyType.Kind() != reflect.Ptr {
444if reportErr {
4444log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
444}
444continue
44}
44// Reply type must be exported.
44if !isExportedOrBuiltinType(replyType) {
444if reportErr {
4444log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
444}
444continue
44}
44// Method needs one out.
44if mtype.NumOut() != 1 {
444if reportErr {
4444log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
444}
444continue
44}
44// The return type of the method must be error.
44if returnType := mtype.Out(0); returnType != typeOfError {
444if reportErr {
4444log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
444}
444continue
44}
44methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
4}
4return methods
}

sendResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
4resp := server.getResponse()
4// Encode the response header
4resp.ServiceMethod = req.ServiceMethod
4if errmsg != "" {
44resp.Error = errmsg
44reply = invalidRequest
4}
4resp.Seq = req.Seq
4sending.Lock()
4err := codec.WriteResponse(resp, reply)
4if debugLog && err != nil {
44log.Println("rpc: writing response:", err)
4}
4sending.Unlock()
4server.freeResponse(resp)
}

ServeConn

1
2
3
4
5
6
7
8
9
10
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
4buf := bufio.NewWriter(conn)
4srv := &gobServerCodec{
44rwc: conn,
44dec: gob.NewDecoder(conn),
44enc: gob.NewEncoder(buf),
44encBuf: buf,
4}
4server.ServeCodec(srv)
}

ServeCodec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (server *Server) ServeCodec(codec ServerCodec) {
4sending := new(sync.Mutex)
4wg := new(sync.WaitGroup)
4for {
44service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
44if err != nil {
444if debugLog && err != io.EOF {
4444log.Println("rpc:", err)
444}
444if !keepReading {
4444break
444}
444// send a response if we actually managed to read a header.
444if req != nil {
4444server.sendResponse(sending, req, invalidRequest, codec, err.Error())
4444server.freeRequest(req)
444}
444continue
44}
44wg.Add(1)
44go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
4}
4// We've seen that there are no more requests.
4// Wait for responses to be sent before closing codec.
4wg.Wait()
4codec.Close()
}

ServeRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (server *Server) ServeRequest(codec ServerCodec) error {
4sending := new(sync.Mutex)
4service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
4if err != nil {
44if !keepReading {
444return err
44}
44// send a response if we actually managed to read a header.
44if req != nil {
444server.sendResponse(sending, req, invalidRequest, codec, err.Error())
444server.freeRequest(req)
44}
44return err
4}
4service.call(server, sending, nil, mtype, req, argv, replyv, codec)
4return nil
}

Client

jsonrpc