
最近,小编一直在研究 RPC 的原理及实现方式。在本篇文章中将通过用 300 行纯 Golang 编写简单的 RPC 框架来解释 RPC。希望能帮助大家梳理 RPC 相关知识点。
我们通过从头开始在 Golang 中构建一个简单的 RPC 框架来学习 RPC 基础构成。
1 什么是 RPC
简单地说,服务 A 想调用服务 B 的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。
因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。
让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。
type User struct { Name string Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}
func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}
func main() { u , err := QueryUser(8) if err != nil { fmt.Println(err) return }
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)}
现在我们如何在网络上进行相同的函数调用
客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{“Name”, id}, nil。
2 网络传输数据格式
我们将采用 TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍
在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。
这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL 定义了服务端和客户端都能理解的内容)。
因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。
另外,让我们约定第二个返回值的类型为 error,表示 RPC 调用结果。
// RPC数据传输格式type RPCdata struct { Name string // name of the function Args []interface{} // request's or response's body expect error. Err string // Error any executing remote server}
现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。
// be sent over the network.func Encode(data RPCdata) ([]byte, error) { var buf bytes.Buffer encoder := gob.NewEncoder(&buf) if err := encoder.Encode(data); err != nil { return nil, err } return buf.Bytes(), nil}
// Decode the binary data into the Go structfunc Decode(b []byte) (RPCdata, error) { buf := bytes.NewBuffer(b) decoder := gob.NewDecoder(buf) var data RPCdata if err := decoder.Decode(&data); err != nil { return Data{}, err } return data, nil}
3 网络传输
选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。
// Transport will use TLV protocoltype Transport struct { conn net.Conn // Conn is a generic stream-oriented network connection.}
// NewTransport creates a Transportfunc NewTransport(conn net.Conn) *Transport { return &Transport{conn}}
// Send TLV data over the networkfunc (t *Transport) Send(data []byte) error { // we will need 4 more byte then the len of data // as TLV header is 4bytes and in this header // we will encode how much byte of data // we are sending for this request. buf := make([]byte, 4+len(data)) binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) copy(buf[4:], data) _, err := t.conn.Write(buf) if err != nil { return err } return nil}
// Read TLV sent over the wirefunc (t *Transport) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(t.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(t.conn, data) if err != nil { return nil, err } return data, nil}
现在我们已经定义了数据格式和传输协议。下面我们还需要 RPC 服务器和 RPC 客户端的实现。
4 RPC 服务器
RPC 服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数
// RPCServer ...type RPCServer struct { addr string funcs map[string] reflect.Value}
// Register the name of the function and its entriesfunc (s *RPCServer) Register(fnName string, fFunc interface{}) { if _,ok := s.funcs[fnName]; ok { return }
s.funcs[fnName] = reflect.ValueOf(fFunc)}
现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的 func 的名称是否存在。然后执行相应的操作
// Execute the given function if presentfunc (s *RPCServer) Execute(req RPCdata) RPCdata { // get method by name f, ok := s.funcs[req.Name] if !ok { // since method is not present e := fmt.Sprintf("func %s not Registered", req.Name) log.Println(e) return RPCdata{Name: req.Name, Args: nil, Err: e} }
log.Printf("func %s is called\n", req.Name) // unpackage request arguments inArgs := make([]reflect.Value, len(req.Args)) for i := range req.Args { inArgs[i] = reflect.ValueOf(req.Args[i]) }
// invoke requested method out := f.Call(inArgs) // now since we have followed the function signature style where last argument will be an error // so we will pack the response arguments expect error. resArgs := make([]interface{}, len(out) - 1) for i := 0; i < len(out) - 1; i ++ { // Interface returns the constant value stored in v as an interface{}. resArgs[i] = out[i].Interface() }
// pack error argument var er string if e, ok := out[len(out) - 1].Interface().(error); ok { // convert the error into error string value er = e.Error() } return RPCdata{Name: req.Name, Args: resArgs, Err: er}}
5 RPC 客户端
由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。
func (c *Client) callRPC(rpcName string, fPtr interface{}) { container := reflect.ValueOf(fPtr).Elem() f := func(req []reflect.Value) []reflect.Value { cReqTransport := NewTransport(c.conn) errorHandler := func(err error) []reflect.Value { outArgs := make([]reflect.Value, container.Type().NumOut()) for i := 0; i < len(outArgs)-1; i++ { outArgs[i] = reflect.Zero(container.Type().Out(i)) } outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem() return outArgs }
// Process input parameters inArgs := make([]interface{}, 0, len(req)) for _, arg := range req { inArgs = append(inArgs, arg.Interface()) }
// ReqRPC reqRPC := RPCdata{Name: rpcName, Args: inArgs} b, err := Encode(reqRPC) if err != nil { panic(err) } err = cReqTransport.Send(b) if err != nil { return errorHandler(err) } // receive response from server rsp, err := cReqTransport.Read() if err != nil { // local network error or decode error return errorHandler(err) } rspDecode, _ := Decode(rsp) if rspDecode.Err != "" { // remote server error return errorHandler(errors.New(rspDecode.Err)) }
if len(rspDecode.Args) == 0 { rspDecode.Args = make([]interface{}, container.Type().NumOut()) } // unpackage response arguments numOut := container.Type().NumOut() outArgs := make([]reflect.Value, numOut) for i := 0; i < numOut; i++ { if i != numOut-1 { // unpackage arguments (except error) if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value outArgs[i] = reflect.Zero(container.Type().Out(i)) } else { outArgs[i] = reflect.ValueOf(rspDecode.Args[i]) } } else { // unpackage error argument outArgs[i] = reflect.Zero(container.Type().Out(i)) } }
return outArgs } container.Set(reflect.MakeFunc(container.Type(), f))}
6 测试一下我们的框架
package main
import ( "encoding/gob" "fmt" "net")
type User struct { Name string Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}
func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}
func main() { // new Type needs to be registered gob.Register(User{}) addr := "localhost:3212" srv := NewServer(addr)
// start server srv.Register("QueryUser", QueryUser) go srv.Run()
// wait for server to start. time.Sleep(1 * time.Second)
// start client conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } cli := NewClient(conn)
var Query func(int) (User, error) cli.callRPC("QueryUser", &Query)
u, err := Query(1) if err != nil { panic(err) } fmt.Println(u)
u2, err := Query(8) if err != nil { panic(err) } fmt.Println(u2)}
执行:go run main.go
输出内容
2019/07/23 20:26:18 func QueryUser is called{Ankur 85}2019/07/23 20:26:18 func QueryUser is called{Ankur Anand 27}
总结
致此我们简单的 RPC 框架就实现完成了,旨在帮大家理解 RPC 的原理及上手简单实践。如果大家对这篇文章中所讲内容有异议,或者想进一步讨论,请留言回复。
本文转载自公众号 360 云计算(ID:hulktalk)。
原文链接:
https://mp.weixin.qq.com/s/fxrocOMLX7kqUH9lP94JpA
更多内容推荐
Echo Server 实战
2022-09-08
2022 第十四届南京国际智慧工地展览会|智慧工地展
2022第十四届南京国际智慧工地展览会|智慧工地展
2022-07-20
42|RESTful Web Services(6):如何处理 JAX-RS 定义的异常类?
欢迎把你的项目代码分享出来。相信经过你的思考与实操,学习效果会更好!
2022-06-14
其它流程控制语句
2022-12-29
兆骑科创创新创业大赛,双创活动,赛事承办,三招三引
兆骑科创创新创业大赛,双创活动,赛事承办,三招三引
2022-08-17
45|RESTful Web Services(9):通过伦敦学派得到的测试风格是什么样的?
伦敦学派与经典学派的差异点在哪里?
2022-06-21
14|基于 Flask 的推荐服务:搭建基础的 Flask 服务
这节课我们先来用Flask搭建一个简单的推荐服务。我们会深入地认识Flask,学习如何使用Flask框架来搭建一个简单的Web服务。
2023-05-17
【云计算】企业上云后需要避免的几个错误
随着云计算的快速发展,越来越多的企业选择了上云。但企业上云后,存在一定的风险,很多刚上云的企业往往是不知的。这里我们结合网络上相关信息,给大家汇总了几个企业上云后需要避免的错误,希望帮助更多企业降低企业上云风险。
2022-07-13
2023 深圳高交会|第二十五届中国国际高新技术展览会
2023深圳高交会|第二十五届中国国际高新技术展览会
2023-07-25
Wallys/3×3/2×2 MIMO/ 802.11ac/ Mini PCIe /2,4GHz / 5GHz QCA 9880
QCA9880 802.11ac Dual bandQCA9880 2x2 2.4G/5G FCC/CE /3×3
2022-07-20
数据通信网络之 OSPFv3 基础
数据通信网络之OSPFv3基础
2023-09-11
golang 中的字符串
在go中rune是一个unicode编码点。 我们都知道UTF-8将字符编码为1-4个字节,比如我们常用的汉字,UTF-8编码为3个字节。所以rune也是int32的别名。
2022-10-22
什么是光网络,几张图就可以很好的解释!
定义:光网络是一种通信网络,用于通过光纤电缆在一端到另一端之间交换信息,它是用于数据通信的最快网络之一。
2022-10-05
云生态大会,随“峰”而来!
7月11日消息,“第五届数字中国建设峰会•云生态大会”将于7月24日在福建福州海峡国际会展中心举办。作为峰会重要组成部分,云生态大会以“共铸国云 智领未来”为主题,旨在助力建设网络强国、数字中国和数字社会,夯实数字技术新基础,激发数字经济新活力,
2022-07-20
低代码开发:助力企业高效实现数字转型的一大利器
随着互联网、移动互联网、物联网等技术的迅速普及和应用,数字经济时代的到来,人们的生产、消费和生活方式都发生了巨大的变化,而传统企业也面临着巨大的挑战和机遇。
2023-03-27
抢先预约 | 阿里云无影云应用线上发布会预约开启
无影云应用即将重磅发布,精彩不容错过!
2022-05-16
34|服务注册与监听:Worker 节点与 etcd 交互
这节课,让我们将Worker节点变为一个支持GRPC与HTTP协议访问的服务,让它最终可以被Master服务和外部服务直接访问。
2022-12-27
如何赋能企业数字化转型?华为云有妙招
近年来,全球疫情的蔓延加速了企业数字化转型的步伐,数字化的浪潮席卷了各行各业。作为一种新型的生产组织方式,“云时代”正在改变人们对传统行业的认知。许多企业在逆流中寻求新的办公和管理模式,混合办公和泛办公模式越来越普遍,每个人的工作、学习和生
2023-09-12
推荐阅读
5.1 快速制作图表方法与图表创意设计(一)
2023-10-17
ETL+BI 结合的数据集成工具
2023-12-08
现身说法:2023 中级程序员进阶之路
2023-11-20
21. 巨大的下沉市场红利 - 以拼多多数字化战略为例
2023-10-17
10. FlinkSQL 的自定义 UDF、UDTF 函数
2023-09-08
降本增效背景下,金融业如何重新审视投入与产出 |InfoQ《超级连麦》
C++ 使用 getline() 从文件中读取一行字符串
后端
电子书

大厂实战PPT下载
换一换 
张雁飞 | Datafuse Labs 联合创始人
王院生 | Apache APISIX PMC 成员
杨军 | 腾讯 IEG 技术运营部 SRE 总监






评论 1 条评论