日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

万物互联之~RPC专栏

發布時間:2023/12/9 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 万物互联之~RPC专栏 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

3.RPC引入

上篇回顧:萬物互聯之~深入篇

Code:https://github.com/lotapp/BaseCode/tree/master/python/6.net/6.rpc/

其他專欄最新篇:協程加強之~兼容答疑篇 | 聊聊數據庫~SQL環境篇

3.1.概念

RPC(Remote Procedure Call):分布式系統常見的一種通信方法(遠程過程調用),通俗講:可以一臺計算機的程序調用另一臺計算機的子程序(可以把它看成之前我們說的進程間通信,只不過這一次的進程不在同一臺PC上了)

PS:RPC的設計思想是力圖使遠程調用中的通訊細節對于使用者透明,調用雙方無需關心網絡通訊的具體實現

引用一張網上的圖:

和HTTP有點相似,你可以這樣理解:

  • 老版本的HTTP/1.0是短鏈接,而RPC是長連接進行通信
    • HTTP協議(header、body),RPC可以采取HTTP協議,也可以自定義二進制格式
  • 后來HTTP/1.1支持了長連接(Connection:keep-alive),基本上和RPC差不多了
    • keep-alive一般都限制有最長時間,或者最多處理的請求數,而RPC是基于長連接的,基本上沒有這個限制
  • 后來谷歌直接基于HTTP/2.0建立了gRPC,它們之間的基本上也就差不多了
    • 如果硬是要區分就是:HTTP-普通話RPC-方言的區別了
    • RPC高效而小眾,HTTP效率沒RPC高,但更通用
  • PS:RPC和HTTP調用不用經過中間件,而是端到端的直接數據交互
    • 網絡交互可以理解為基于Socket實現的(RPC、HTTP都是Socket的讀寫操作)
  • 簡單概括一下RPC的優缺點就是:

  • 優點:
  • 效率更高(可以自定義二進制格式)
  • 發起RPC調用的一方,在編寫代碼時可忽略RPC的具體實現(跟編寫本地函數調用一般
  • 缺點:
    • 通用性不如HTTP(方言普及程度肯定不如普通話),如果傳輸協議不是HTTP協議格式,調用雙方就需要專門實現通信庫
  • PS:HTTP更多是Client與Server的通訊;RPC更多是內部服務器間的通訊

    3.2.引入

    上面說這么多,可能還沒有來個案例實在,我們看個案例:

    本地調用sum()

    def sum(a, b):"""return a+b"""return a + bdef main():result = sum(1, 2)print(f"1+2={result}")if __name__ == "__main__":main()

    輸出:(這個大家都知道)

    1+2=3

    1.xmlrpc案例

    官方文檔:

    https://docs.python.org/3/library/xmlrpc.client.html https://docs.python.org/3/library/xmlrpc.server.html

    都說RPC用起來就像本地調用一樣,那么用起來啥樣呢?看個案例:

    服務端:(CentOS7:192.168.36.123:50051)

    from xmlrpc.server import SimpleXMLRPCServerdef sum(a, b):"""return a+b"""return a + b# PS:50051是gRPC默認端口 server = SimpleXMLRPCServer(('', 50051)) # 把函數注冊到RPC服務器中 server.register_function(sum) print("Server啟動ing,Port:50051") server.serve_forever()

    客戶端:(Win10:192.168.36.144

    from xmlrpc.client import ServerProxystub = ServerProxy("http://192.168.36.123:50051") result = stub.sum(1, 2) print(f"1+2={result}")

    輸出:(Client用起來是不是和本地差不多?就是通過代理訪問了下RPCServer而已)

    1+2=3

    PS:CentOS服務器不是你綁定個端口就一定能訪問的,如果不能記讓防火墻開放對應的端口

    這個之前在說MariaDB環境的時候有詳細說:https://www.cnblogs.com/dotnetcrazy/p/9887708.html#_map4

    # 添加 --permanent永久生效(沒有此參數重啟后失效) firewall-cmd --zone=public --add-port=80/tcp --permanent

    2.ZeroRPC案例:

    zeroRPC用起來和這個差不多,也簡單舉個例子吧:

    把服務的某個方法注冊到RPCServer中,供外部服務調用

    import zerorpcclass Test(object):def say_hi(self, name):return f"Hi,My Name is{name}"# 注冊一個Test的實例 server = zerorpc.Server(Test()) server.bind("tcp://0.0.0.0:50051") server.run()

    調用服務端代碼

    import zerorpcclient = zerorpc.Client("tcp://192.168.36.123:50051") result = client.say_hi("RPC") print(result)

    3.3.簡單版自定義RPC

    看了上面的引入案例,是不是感覺RPC不過如此?NoNoNo,要是真這么簡單也就談不上RPC架構了,上面兩個是最簡單的RPC服務了,可以這么說:生產環境基本上用不到,只能當案例練習罷了,對Python來說,最常用的RPC就兩個gRPC and Thrift

    PS:國產最出名的是Dubbo and Tars,Net最常用的是gRPC、Thrift、Surging

    1.RPC服務的流程

    要自己實現一個RPC Server那么就得了解整個流程了:

  • Client(調用者)以本地調用的方式發起調用
  • 通過RPC服務進行遠程過程調用(RPC的目標就是要把這些步驟都封裝起來,讓使用者感覺不到這個過程)
  • 客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議
  • 客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Server
  • 服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數
  • 服務端的RPC Proxy組件根據方法名和參數進行本地調用
  • RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy
  • 服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,并通過網絡發送給客戶端的RPC Proxy組件
  • 客戶端的RPC Proxy組件收到數據包后,進行拆包解碼,把數據返回給Client
  • Client(調用者)得到本次RPC調用的返回結果
  • 用一張時序圖來描述下整個過程:

    PS:RPC Proxy有時候也叫Stub(存根):(Client Stub,Server Stub)

    為屏蔽客戶調用遠程主機上的對象,必須提供某種方式來模擬本地對象,這種本地對象稱為存根(stub),存根負責接收本地方法調用,并將它們委派給各自的具體實現對象

    PRC服務實現的過程中其實就兩核心點:

  • 消息協議:客戶端調用的參數和服務端的返回值這些在網絡上傳輸的數據以何種方式打包編碼和拆包解碼
    • 經典代表:Protocol Buffers
  • 傳輸控制:在網絡中數據的收發傳輸控制具體如何實現(TCP/UDP/HTTP)
  • 2.手寫RPC

    下面我們就根據上面的流程來手寫一個簡單的RPC:

    1.Client調用:

    # client.py from client_stub import ClientStubdef main():stub = ClientStub(("192.168.36.144", 50051))result = stub.get("sum", (1, 2))print(f"1+2={result}")result = stub.get("sum", (1.1, 2))print(f"1.1+2={result}")time_str = stub.get("get_time")print(time_str)if __name__ == "__main__":main()

    輸出:

    1+2=3 1.1+2=3.1 Wed Jan 16 22

    2.Client Stub,客戶端存根:(主要有打包、解包、和RPC服務器通信的方法)

    # client_stub.py import socketclass ClientStub(object):def __init__(self, address):"""address ==> (ip,port)"""self.socket = socket.socket()self.socket.connect(address)def convert(self, obj):"""根據類型轉換成對應的類型編號"""if isinstance(obj, int):return 1if isinstance(obj, float):return 2if isinstance(obj, str):return 3def pack(self, func, args):"""打包:把方法和參數拼接成自定義的協議格式:func:函數名@params:類型-參數,類型2-參數2..."""result = f"func:{func}"if args:params = ""# params:類型-參數,類型2-參數2...for item in args:params += f"{self.convert(item)}-{item},"# 去除最后一個,result += f"@params:{params[:-1]}"# print(result) # log 輸出return result.encode("utf-8")def unpack(self, data):"""解包:獲取返回結果"""msg = data.decode("utf-8")# 格式應該是"data:xxxx"params = msg.split(":")if len(params) > 1:return params[1]return Nonedef get(self, func, args=None):"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""data = self.pack(func, args)# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Serverself.socket.send(data)# 等待服務端返回結果data = self.socket.recv(2048)if data:return self.unpack(data)return None

    簡要說明下:(我根據流程在Code里面標注了,看起來應該很輕松)

    之前有說到核心其實就是消息協議and傳輸控制,我客戶端存根的消息協議是自定義的格式(后面會說簡化方案):func:函數名@params:類型-參數,類型2-參數2...,傳輸我是基于TCP進行了簡單的封裝


    3.Server端:(實現很簡單)

    # server.py import socket from server_stub import ServerStubclass RPCServer(object):def __init__(self, address, mycode):self.mycode = mycode# 服務端存根(RPC Proxy)self.server_stub = ServerStub(mycode)# TCP Socketself.socket = socket.socket()# 端口復用self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 綁定端口self.socket.bind(address)def run(self):self.socket.listen()while True:# 等待客戶端連接client_socket, client_addr = self.socket.accept()print(f"來自{client_addr}的請求:\n")# 交給服務端存根(Server Proxy)處理self.server_stub.handle(client_socket, client_addr)if __name__ == "__main__":from server_code import MyCodeserver = RPCServer(('', 50051), MyCode())print("Server啟動ing,Port:50051")server.run()

    為了簡潔,服務端代碼我單獨放在了server_code.py中:

    # 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy class MyCode(object):def sum(self, a, b):return a + bdef get_time(self):import timereturn time.ctime()

    4.然后再看看重頭戲Server Stub:

    # server_stub.py import socketclass ServerStub(object):def __init__(self, mycode):self.mycode = mycodedef convert(self, num, obj):"""根據類型編號轉換類型"""if num == "1":obj = int(obj)if num == "2":obj = float(obj)if num == "3":obj = str(obj)return objdef unpack(self, data):"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""msg = data.decode("utf-8")# 格式應該是"格式:func:函數名@params:類型編號-參數,類型編號2-參數2..."array = msg.split("@")func = array[0].split(":")[1]if len(array) > 1:args = list()for item in array[1].split(":")[1].split(","):temps = item.split("-")# 類型轉換args.append(self.convert(temps[0], temps[1]))return (func, tuple(args)) # (func,args)return (func, )def pack(self, result):"""打包:把方法和參數拼接成自定義的協議"""# 格式:"data:返回值"return f"data:{result}".encode("utf-8")def exec(self, func, args=None):"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""# 如果沒有這個方法則返回Nonefunc = getattr(self.mycode, func, None)if args:return func(*args) # 解包else:return func() # 無參函數def handle(self, client_socket, client_addr):while True:# 獲取客戶端發送的數據包data = client_socket.recv(2048)if data:try:data = self.unpack(data) # 解包if len(data) == 1:data = self.exec(data[0]) # 執行無參函數elif len(data) > 1:data = self.exec(data[0], data[1]) # 執行帶參函數else:data = "RPC Server Error Code:500"except Exception as ex:data = "RPC Server Function Error"print(ex)# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,并通過網絡發送給客戶端的RPC Proxy組件data = self.pack(data) # 把函數執行結果按指定協議打包# 把處理過的數據發送給客戶端client_socket.send(data)else:print(f"客戶端:{client_addr}已斷開\n")break

    再簡要說明一下:里面方法其實主要就是解包、執行函數、返回值打包

    輸出圖示:

    再貼一下上面的時序圖:

    課外拓展:

    HTTP1.0、HTTP1.1 和 HTTP2.0 的區別 https://www.cnblogs.com/heluan/p/8620312.html簡述分布式RPC框架 https://blog.csdn.net/jamebing/article/details/79610994分布式基礎—RPC http://www.dataguru.cn/article-14244-1.html

    4.RPC簡化與提煉

    上篇回顧:萬物互聯之~RPC專欄 https://www.cnblogs.com/dunitian/p/10279946.html

    上節課解答

    之前有網友問,很多開源的RPC中都是使用路由表,這個怎么實現?

    其實路由表實現起來也簡單,代碼基本上不變化,就修改一下server_stub.py的__init__和exe兩個方法就可以了:

    class ServerStub(object):def __init__(self, mycode):self.func_dict = dict()# 初始化一個方法名和方法的字典({func_name:func})for item in mycode.__dir__():if not item.startswith("_"):self.func_dict[item] = getattr(mycode, item)def exec(self, func, args=None):"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""# 如果沒有這個方法則返回None# func = getattr(self.mycode, func, None)func = self.func_dict[func]if args:return func(*args) # 解包else:return func() # 無參函數

    4.1.Json序列化

    Python比較6的同志對上節課的Code肯定嗤之以鼻,上次自定義協議是同的通用方法,這節課我們先來簡化下代碼:

    再貼一下上節課的時序圖:

    1.Json知識點

    官方文檔:https://docs.python.org/3/library/json.html

    # 把字典對象轉換為Json字符串 json_str = json.dumps({"func": func, "args": args})# 把Json字符串重新變成字典對象 data = json.loads(data) func, args = data["func"], data["args"]

    需要注意的就是類型轉換了(eg:python tuple ==> json array)

    PythonJSON
    dictobject
    list, tuplearray
    strstring
    int, floatnumber
    Truetrue
    Falsefalse
    Nonenull

    PS:序列化:json.dumps(obj),反序列化:json.loads(json_str)

    2.消息協議采用Json格式

    在原有基礎上只需要修改下Stub的pack和unpack方法即可

    Client_Stub(類型轉換都省掉了)

    import json import socketclass ClientStub(object):def pack(self, func, args):"""打包:把方法和參數拼接成自定義的協議格式:{"func": "sum", "args": [1, 2]}"""json_str = json.dumps({"func": func, "args": args})# print(json_str) # log 輸出return json_str.encode("utf-8")def unpack(self, data):"""解包:獲取返回結果"""data = data.decode("utf-8")# 格式應該是"{data:xxxx}"data = json.loads(data)# 獲取不到就返回Nonereturn data.get("data", None)# 其他Code我沒有改變

    Server Stub()

    import json import socketclass ServerStub(object):def unpack(self, data):"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""data = data.decode("utf-8")# 格式應該是"格式:{"func": "sum", "args": [1, 2]}"data = json.loads(data)func, args = data["func"], data["args"]if args:return (func, tuple(args)) # (func,args)return (func, )def pack(self, result):"""打包:把方法和參數拼接成自定義的協議"""# 格式:"data:返回值"json_str = json.dumps({"data": result})return json_str.encode("utf-8")# 其他Code我沒有改變

    輸出圖示:

    4.2.Buffer序列化

    RPC其實更多的是二進制的序列化方式,這邊簡單介紹下

    1.pickle知識點

    官方文檔:https://docs.python.org/3/library/pickle.html

    用法和Json類似,PS:序列化:pickle.dumps(obj),反序列化:pickle.loads(buffer)

    2.簡單案例

    和Json案例類似,也只是改了pack和unpack,我這邊就貼一下完整代碼(防止被吐槽)

    1.Client

    # 和上一節一樣 from client_stub import ClientStubdef main():stub = ClientStub(("192.168.36.144", 50051))result = stub.get("sum", (1, 2))print(f"1+2={result}")result = stub.get("sum", (1.1, 2))print(f"1.1+2={result}")time_str = stub.get("get_time")print(time_str)if __name__ == "__main__":main()

    2.ClientStub

    import socket import pickleclass ClientStub(object):def __init__(self, address):"""address ==> (ip,port)"""self.socket = socket.socket()self.socket.connect(address)def pack(self, func, args):"""打包:把方法和參數拼接成自定義的協議"""return pickle.dumps((func, args))def unpack(self, data):"""解包:獲取返回結果"""return pickle.loads(data)def get(self, func, args=None):"""1.客戶端的RPC Proxy組件收到調用后,負責將被調用的方法名、參數等打包編碼成自定義的協議"""data = self.pack(func, args)# 2.客戶端的RPC Proxy組件在打包完成后通過網絡把數據包發送給RPC Serverself.socket.send(data)# 等待服務端返回結果data = self.socket.recv(2048)if data:return self.unpack(data)return None

    3.Server

    # 和上一節一樣 import socket from server_stub import ServerStubclass RPCServer(object):def __init__(self, address, mycode):self.mycode = mycode# 服務端存根(RPC Proxy)self.server_stub = ServerStub(mycode)# TCP Socketself.socket = socket.socket()# 端口復用self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 綁定端口self.socket.bind(address)def run(self):self.socket.listen()while True:# 等待客戶端連接client_socket, client_addr = self.socket.accept()print(f"來自{client_addr}的請求:\n")try:# 交給服務端存根(Server Proxy)處理self.server_stub.handle(client_socket, client_addr)except Exception as ex:print(ex)if __name__ == "__main__":from server_code import MyCodeserver = RPCServer(('', 50051), MyCode())print("Server啟動ing,Port:50051")server.run()

    4.ServerCode

    # 和上一節一樣 # 5.RPC Server(被調用者)本地執行后將結果返回給服務端的RPC Proxy class MyCode(object):def sum(self, a, b):return a + bdef get_time(self):import timereturn time.ctime()

    5.ServerStub

    import socket import pickleclass ServerStub(object):def __init__(self, mycode):self.mycode = mycodedef unpack(self, data):"""3.服務端的RPC Proxy組件把通過網絡接收到的數據包按照相應格式進行拆包解碼,獲取方法名和參數"""func, args = pickle.loads(data)if args:return (func, args) # (func,args)return (func, )def pack(self, result):"""打包:把方法和參數拼接成自定義的協議"""return pickle.dumps(result)def exec(self, func, args=None):"""4.服務端的RPC Proxy組件根據方法名和參數進行本地調用"""# 如果沒有這個方法則返回Nonefunc = getattr(self.mycode, func)if args:return func(*args) # 解包else:return func() # 無參函數def handle(self, client_socket, client_addr):while True:# 獲取客戶端發送的數據包data = client_socket.recv(2048)if data:try:data = self.unpack(data) # 解包if len(data) == 1:data = self.exec(data[0]) # 執行無參函數elif len(data) > 1:data = self.exec(data[0], data[1]) # 執行帶參函數else:data = "RPC Server Error Code:500"except Exception as ex:data = "RPC Server Function Error"print(ex)# 6.服務端的RPC Proxy組件將返回值打包編碼成自定義的協議數據包,并通過網絡發送給客戶端的RPC Proxy組件data = self.pack(data) # 把函數執行結果按指定協議打包# 把處理過的數據發送給客戶端client_socket.send(data)else:print(f"客戶端:{client_addr}已斷開\n")break

    輸出圖示:

    然后關于RPC高級的內容(會涉及到注冊中心),咱們后面說架構的時候繼續,網絡這邊就說到這

    轉載于:https://www.cnblogs.com/dunitian/p/10279946.html

    總結

    以上是生活随笔為你收集整理的万物互联之~RPC专栏的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。