最近,小编一直在研究 RPC 的原理及实现方式。在本篇文章中将通过用 300 行纯 Golang 编写简单的 RPC 框架来解释 RPC。希望能帮助大家梳理 RPC 相关知识点。
我们通过从头开始在 Golang 中构建一个简单的 RPC 框架来学习 RPC 基础构成。
1 什么是 RPC
简单地说,服务 A 想调用服务 B 的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。
因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。
让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。
type User struct {
Name string
Age int
}
var userDB = map[int]User{
1: User{"Ankur", 85},
9: User{"Anand", 25},
8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
if u, ok := userDB[id]; ok {
return u, nil
}
return User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
u , err := QueryUser(8)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)
}
现在我们如何在网络上进行相同的函数调用
客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{“Name”, id}, nil。
2 网络传输数据格式
我们将采用 TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍
在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。
这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL 定义了服务端和客户端都能理解的内容)。
因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。
另外,让我们约定第二个返回值的类型为 error,表示 RPC 调用结果。
// RPC数据传输格式
type RPCdata struct {
Name string // name of the function
Args []interface{} // request's or response's body expect error.
Err string // Error any executing remote server
}
现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。
// be sent over the network.
func Encode(data RPCdata) ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(data); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Decode the binary data into the Go struct
func Decode(b []byte) (RPCdata, error) {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)
var data RPCdata
if err := decoder.Decode(&data); err != nil {
return Data{}, err
}
return data, nil
}
3 网络传输
选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。
// Transport will use TLV protocol
type Transport struct {
conn net.Conn // Conn is a generic stream-oriented network connection.
}
// NewTransport creates a Transport
func NewTransport(conn net.Conn) *Transport {
return &Transport{conn}
}
// Send TLV data over the network
func (t *Transport) Send(data []byte) error {
// we will need 4 more byte then the len of data
// as TLV header is 4bytes and in this header
// we will encode how much byte of data
// we are sending for this request.
buf := make([]byte, 4+len(data))
binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
copy(buf[4:], data)
_, err := t.conn.Write(buf)
if err != nil {
return err
}
return nil
}
// Read TLV sent over the wire
func (t *Transport) Read() ([]byte, error) {
header := make([]byte, 4)
_, err := io.ReadFull(t.conn, header)
if err != nil {
return nil, err
}
dataLen := binary.BigEndian.Uint32(header)
data := make([]byte, dataLen)
_, err = io.ReadFull(t.conn, data)
if err != nil {
return nil, err
}
return data, nil
}
现在我们已经定义了数据格式和传输协议。下面我们还需要 RPC 服务器和 RPC 客户端的实现。
4 RPC 服务器
RPC 服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数
// RPCServer ...
type RPCServer struct {
addr string
funcs map[string] reflect.Value
}
// Register the name of the function and its entries
func (s *RPCServer) Register(fnName string, fFunc interface{}) {
if _,ok := s.funcs[fnName]; ok {
return
}
s.funcs[fnName] = reflect.ValueOf(fFunc)
}
现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的 func 的名称是否存在。然后执行相应的操作
// Execute the given function if present
func (s *RPCServer) Execute(req RPCdata) RPCdata {
// get method by name
f, ok := s.funcs[req.Name]
if !ok {
// since method is not present
e := fmt.Sprintf("func %s not Registered", req.Name)
log.Println(e)
return RPCdata{Name: req.Name, Args: nil, Err: e}
}
log.Printf("func %s is called\n", req.Name)
// unpackage request arguments
inArgs := make([]reflect.Value, len(req.Args))
for i := range req.Args {
inArgs[i] = reflect.ValueOf(req.Args[i])
}
// invoke requested method
out := f.Call(inArgs)
// now since we have followed the function signature style where last argument will be an error
// so we will pack the response arguments expect error.
resArgs := make([]interface{}, len(out) - 1)
for i := 0; i < len(out) - 1; i ++ {
// Interface returns the constant value stored in v as an interface{}.
resArgs[i] = out[i].Interface()
}
// pack error argument
var er string
if e, ok := out[len(out) - 1].Interface().(error); ok {
// convert the error into error string value
er = e.Error()
}
return RPCdata{Name: req.Name, Args: resArgs, Err: er}
}
5 RPC 客户端
由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。
func (c *Client) callRPC(rpcName string, fPtr interface{}) {
container := reflect.ValueOf(fPtr).Elem()
f := func(req []reflect.Value) []reflect.Value {
cReqTransport := NewTransport(c.conn)
errorHandler := func(err error) []reflect.Value {
outArgs := make([]reflect.Value, container.Type().NumOut())
for i := 0; i < len(outArgs)-1; i++ {
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()
return outArgs
}
// Process input parameters
inArgs := make([]interface{}, 0, len(req))
for _, arg := range req {
inArgs = append(inArgs, arg.Interface())
}
// ReqRPC
reqRPC := RPCdata{Name: rpcName, Args: inArgs}
b, err := Encode(reqRPC)
if err != nil {
panic(err)
}
err = cReqTransport.Send(b)
if err != nil {
return errorHandler(err)
}
// receive response from server
rsp, err := cReqTransport.Read()
if err != nil { // local network error or decode error
return errorHandler(err)
}
rspDecode, _ := Decode(rsp)
if rspDecode.Err != "" { // remote server error
return errorHandler(errors.New(rspDecode.Err))
}
if len(rspDecode.Args) == 0 {
rspDecode.Args = make([]interface{}, container.Type().NumOut())
}
// unpackage response arguments
numOut := container.Type().NumOut()
outArgs := make([]reflect.Value, numOut)
for i := 0; i < numOut; i++ {
if i != numOut-1 { // unpackage arguments (except error)
if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value
outArgs[i] = reflect.Zero(container.Type().Out(i))
} else {
outArgs[i] = reflect.ValueOf(rspDecode.Args[i])
}
} else { // unpackage error argument
outArgs[i] = reflect.Zero(container.Type().Out(i))
}
}
return outArgs
}
container.Set(reflect.MakeFunc(container.Type(), f))
}
6 测试一下我们的框架
package main
import (
"encoding/gob"
"fmt"
"net"
)
type User struct {
Name string
Age int
}
var userDB = map[int]User{
1: User{"Ankur", 85},
9: User{"Anand", 25},
8: User{"Ankur Anand", 27},
}
func QueryUser(id int) (User, error) {
if u, ok := userDB[id]; ok {
return u, nil
}
return User{}, fmt.Errorf("id %d not in user db", id)
}
func main() {
// new Type needs to be registered
gob.Register(User{})
addr := "localhost:3212"
srv := NewServer(addr)
// start server
srv.Register("QueryUser", QueryUser)
go srv.Run()
// wait for server to start.
time.Sleep(1 * time.Second)
// start client
conn, err := net.Dial("tcp", addr)
if err != nil {
panic(err)
}
cli := NewClient(conn)
var Query func(int) (User, error)
cli.callRPC("QueryUser", &Query)
u, err := Query(1)
if err != nil {
panic(err)
}
fmt.Println(u)
u2, err := Query(8)
if err != nil {
panic(err)
}
fmt.Println(u2)
}
执行:go run main.go
输出内容
2019/07/23 20:26:18 func QueryUser is called
{Ankur 85}
2019/07/23 20:26:18 func QueryUser is called
{Ankur Anand 27}
总结
致此我们简单的 RPC 框架就实现完成了,旨在帮大家理解 RPC 的原理及上手简单实践。如果大家对这篇文章中所讲内容有异议,或者想进一步讨论,请留言回复。
本文转载自公众号 360 云计算(ID:hulktalk)。
原文链接:
https://mp.weixin.qq.com/s/fxrocOMLX7kqUH9lP94JpA
更多内容推荐
Andriod 网络框架 OkHttp 源码解析,总结一下
除了使用上面的直接实例化一个 OkHttp 客户端的方式,我们也可以使用 OkHttpClient 的构建者 OkHttpClient.Builder 来创建 OkHttp 客户端。
2021-11-08
【云计算】企业上云后需要避免的几个错误
随着云计算的快速发展,越来越多的企业选择了上云。但企业上云后,存在一定的风险,很多刚上云的企业往往是不知的。这里我们结合网络上相关信息,给大家汇总了几个企业上云后需要避免的错误,希望帮助更多企业降低企业上云风险。
2022-07-13
云生态大会,随“峰”而来!
7月11日消息,“第五届数字中国建设峰会•云生态大会”将于7月24日在福建福州海峡国际会展中心举办。作为峰会重要组成部分,云生态大会以“共铸国云 智领未来”为主题,旨在助力建设网络强国、数字中国和数字社会,夯实数字技术新基础,激发数字经济新活力,
2022-07-20
14|基于 Flask 的推荐服务:搭建基础的 Flask 服务
这节课我们先来用Flask搭建一个简单的推荐服务。我们会深入地认识Flask,学习如何使用Flask框架来搭建一个简单的Web服务。
2023-05-17
UART
通用异步收发传输器(Universal Asynchronous Receiver/Transmitter,通常称为UART)是一种异步收发传输器,是电脑硬件的一部分,将数据透过串列通信进行传输。UART通常用在与其他通信接口(如EIA RS-232)的连接上。
2022-07-24
Modbus 协议通信异常
@[toc]
2022-07-02
API 请求失败后发生了什么?
当一个API请求失败时,到底发生了什么。
34|服务注册与监听:Worker 节点与 etcd 交互
这节课,让我们将Worker节点变为一个支持GRPC与HTTP协议访问的服务,让它最终可以被Master服务和外部服务直接访问。
2022-12-27
一文理解分布式开发中的服务治理
我们在分布式开发中经常听到的一个词就是“服务治理”。在理解“服务治理”的概念之前让我们先理解什么是分布式系统,分布式系统之间如何通过RPC(Remote Procedure Call,远程过程调用)方式通信,以及如何解决RPC框架存在的问题,这样才能真正地理解服务治
2022-07-22
22|协议编解码:接口调用的数据是如何发到网络中的?
梳理帧格式的概念,借助 client 向 server 发送数据的简单模型案例,引出了 client 与 server 约定了三种类型的协议格式,分别是固定长度、分隔符、定长+变长。
2023-02-06
10 分钟带你彻底搞懂 Dubbo 远程调用本地化的实现原理
对于 RPC 而言,它最基本的功能就是实现远程调用。Dubbo 作为一款非常主流的 RPC 框架,在实现远程调用过程中采用了巧妙的设计方法,确保开发人员能够像使用本地方法一样,来对远程服务发起请求并获取响应结果。那么,Dubbo 是如何做到这一点的呢?这一节课,我将和你来详细分析一下 Dubbo 远程调用的执行流程和实现原理。 讲师介绍 萧亦然,资深技术专家、TGO 鲲鹏会会员、阿里云 MVP、腾讯云 TVP。 内容看点 远程调用与代理机制之间的交互关系 Dubbo 中,远程调用的执行过程
2022-08-19
网络入侵检测系统之 Suricata(十)--ICMP 实现详解
本文介绍了suricata终于icmp协议的解析,icmp攻击防护的实现
2022-09-17
什么是光网络,几张图就可以很好的解释!
定义:光网络是一种通信网络,用于通过光纤电缆在一端到另一端之间交换信息,它是用于数据通信的最快网络之一。
2022-10-05
Java 基础(五)| 方法的定义、调用及重载
⭐本专栏旨在对JAVA的基础语法及知识点进行全面且详细的讲解,完成从0到1的java学习,面向零基础及入门的学习者,通过专栏的学习可以熟练掌握JAVA编程,同时为后续的框架学习,进阶开发的代码能力打下坚实的基础。
2022-10-14
【6.10-6.16】写作社区精彩技术博文回顾
Hello 大家好呀,为了让更多的优质的内容和创作者被看见,我们决定不定时地向大家推荐近期优质的社区文章和新入驻的创作者们。
2022-06-17
鉴释加入龙蜥社区,助力开源生态建设
近日,鉴释科技(深圳)有限公司(以下简称:“鉴释”)签署了 CLA(Contribution License Agreement,贡献者许可协议),正式加入龙蜥社区(OpenAnolis)。
2022-01-26
34|WebSocket:如何在消息队列内核中支持 WebSocket?
WebSocket 是一种实时协议,它在单个 TCP 连接上提供持久的全双工通信。
2023-09-06
4. 并发编程、文件操作与泛型
2023-09-25
推荐阅读
09. 高效沟通的第三只拦路虎——表达不清
2023-10-17
通过 TCP/IP 每分钟发送数十亿条消息
「Go 框架」bind 函数:gin 框架中是如何绑定请求数据的?
2023-03-22
3、RestTemplate 实现原理剖析
2023-09-26
sip 中继的内容介绍
2023-10-30
02|初出茅庐:构造一个极简的 HttpServer
2023-12-11
为什么你的下一个 API 应该是 GraphQL 而不是 REST
电子书
大厂实战PPT下载
换一换 娄一翎 博士 | 复旦大学 计算机科学技术学院 青年副研究员
吴丹武(魁武) | 阿里巴巴 大淘宝泛端架构部 前端开发技术专家
李刚 | 阿里巴巴 高级技术专家
评论 1 条评论