go使用grpc实现异步_(python、go)基于ETCD的gRPC分布式服务器实现详解
1-概述
gRPC框架是一個(gè)性能很好的rpc框架,但框架中沒(méi)有實(shí)現(xiàn)分布式服務(wù)器負(fù)載均衡的方法,只是給出實(shí)現(xiàn)方案,需要我們自己實(shí)現(xiàn)。官方推薦是客戶端負(fù)載均衡的方案,也就是由客戶端主動(dòng)選擇路由,這樣的好處是不用給代理服務(wù)器造成壓力。
分布式場(chǎng)景下一般采用etcd、consul、zookeeper等分布式系統(tǒng),這里采用etcd,etcd是go語(yǔ)言實(shí)現(xiàn),在github可看到開(kāi)源的全部實(shí)現(xiàn)代碼。
目前網(wǎng)上都是go語(yǔ)言寫的關(guān)于基于etcd的grpc分布式服務(wù)器的實(shí)現(xiàn),幾乎沒(méi)有python版的grpc服務(wù)器實(shí)現(xiàn),于是本人就寫了一個(gè)python版的服務(wù)器和不同客戶端(python、go)版的全套實(shí)現(xiàn)方案。
注:全部完整代碼已上傳至github:Zartenc/grpc_etcd_ms_py
2-實(shí)現(xiàn)思想
2-1-服務(wù)器端實(shí)現(xiàn)思想
服務(wù)器端思想主旨是:每個(gè)gRPC服務(wù)器啟動(dòng)(上線)都會(huì)在etcd的key中注冊(cè)自己在本機(jī)中對(duì)外暴露的ip-port,一旦gRPC服務(wù)停止(下線、意外掛掉、租約到期等)會(huì)在etcd中注銷掉自己的信息。
2-2-客戶端實(shí)現(xiàn)思想
客戶端思想主旨是:只需連接到etcd服務(wù)器產(chǎn)生一個(gè)etcdClient對(duì)象,長(zhǎng)期維護(hù)這個(gè)對(duì)象即可,通過(guò)這個(gè)對(duì)象的負(fù)載均衡策略可獲取眾多gRPC服務(wù)器中的其中一個(gè)進(jìn)行連接。在這個(gè)etcdClient對(duì)象內(nèi)部會(huì)對(duì)etcd的key添加一個(gè)監(jiān)視(watch)并維護(hù)一個(gè)可用gRPC服務(wù)器信息的集合。
3-具體實(shí)現(xiàn)核心代碼
前提已部署并啟動(dòng)etcd服務(wù)器,若沒(méi)部署請(qǐng)參考這里。
3-1-服務(wù)器端實(shí)現(xiàn)核心代碼
服務(wù)器端采用python編寫。
由于采用etcd分布式框架,這里首先實(shí)現(xiàn)一個(gè)etcd客戶端的類EtcdClient,類方法有:
- get_values_by_key():通過(guò)etcd的key獲取服務(wù)器信息。
- put_values_by_key():添加服務(wù)器信息到etcd的key中。
再實(shí)現(xiàn)一個(gè)此服務(wù)器對(duì)etcd操作的類EtcdHandleServ,類方法有:
register_service():注冊(cè)本機(jī)信息到etcd的key中。
logout_service:從etcd的key中注銷本機(jī)信息。
class EtcdHandleServ():def __init__(self, service_port, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefix# service_ip = get_outside_ip()service_ip = '127.0.0.1' #在本機(jī)機(jī)器作實(shí)驗(yàn)使用self.endpoint = f'{service_ip}:{service_port}'def register_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint not in value_list:value_list.append(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)def logout_service(self):etcd_client = EtcdClient(host=self.etcd_ip, port=self.etcd_port)key_name = f'{self.etcd_prefix}/grpc'with etcd_client.lock(key_name):value_list = etcd_client.get_values_by_key(key_name)if self.endpoint in value_list:value_list.remove(self.endpoint)etcd_client.put_values_by_key(key_name, value_list)最后在主函數(shù)中進(jìn)行相關(guān)的注冊(cè)和注銷操作并監(jiān)控程序停止信號(hào)。
*注意:在docker啟動(dòng)的操作事項(xiàng)
在下面代碼中,若服務(wù)器要在docker中啟動(dòng)需要考慮2個(gè)問(wèn)題:
- 1.在docker run命令啟動(dòng)時(shí)如何將參數(shù)從外部傳到容器內(nèi)?
- 2.docker stop命令停止時(shí)如何程序內(nèi)部如何接收停止信號(hào)?
上面2個(gè)問(wèn)題共同之處是在Dockerfile文件中的ENTRYPOINT命令:
ENTRYPOINT ["python3", "-u", "main.py"]在Dockerfile文件中啟動(dòng)命名使用ENTRYPOINT的exec模式,這樣程序在容器內(nèi)為1號(hào)進(jìn)程,可接收停止信號(hào)(若為shell模式,也可處理,但麻煩一些,后面給出解決方案)。
docker啟動(dòng)時(shí)從外部傳參,只需在后面跟上需要傳入的參數(shù)即可:
docker run -d -p 65510:65510 zartenImage:v ----service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten上面給出了2個(gè)問(wèn)題的解決方案,具體更全面的方案,請(qǐng)參考本人之前寫的一篇docker使用詳解的文章,并在文章最后的常見(jiàn)問(wèn)題中有提到解決方案。
def main(service_ip, service_port, etcd_ip, etcd_port, etcd_prefix):print('***service is starting...')grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=500))zarten_pb2_grpc.add_ZartenServicer_to_server(ZartenServ(), grpc_server)grpc_server.add_insecure_port(f'{service_ip}:{service_port}')grpc_server.start()etcd_handle_serv = EtcdHandleServ(service_port=service_port, etcd_ip=etcd_ip, etcd_port=etcd_port, etcd_prefix=etcd_prefix)etcd_handle_serv.register_service()event = threading.Event()def signal_handler(*args):etcd_handle_serv.logout_service()event.set()signal.signal(signal.SIGINT, signal_handler)signal.signal(signal.SIGTERM, signal_handler)print("***serveice started")try:while True:time.sleep(60 * 60 * 24)except KeyboardInterrupt:etcd_handle_serv.logout_service()grpc_server.stop(0)最后采用命令方式啟動(dòng)。
# python main.py --service_port 65510 --etcd_ip xxxx --etcd_prefix /zarten if __name__ == '__main__':parser = get_arguments_parser()args = parser.parse_args()main(args.service_ip, args.service_port, args.etcd_ip, args.etcd_port, args.etcd_prefix)3-2-客戶端實(shí)現(xiàn)核心代碼
客戶端采用python和go編寫。也可采用其他語(yǔ)言實(shí)現(xiàn),這里就不作展示了,知道實(shí)現(xiàn)思想即可自己實(shí)現(xiàn)。
gRPC客戶端中的負(fù)載均衡是以每次調(diào)用為基礎(chǔ)而不是以每個(gè)連接為基礎(chǔ),即只需維護(hù)一個(gè)連接對(duì)象,每次調(diào)用都是連接不同gRPC的服務(wù)器。官方文檔是這樣描述的:
“It is worth noting that load-balancing within gRPC happens on a per-call basis, not a per-connection basis. In other words, even if all requests come from a single client, we still want them to be load-balanced across all servers.”
3-2-1-python客戶端
首先同樣是一個(gè)連接etcd的客戶端類,跟服務(wù)器端代碼差不多。
class EtcdClient(etcd3.Etcd3Client):def get_values_by_key(self, key, **kwargs):values, _ = self.get(key, **kwargs)values_list = []if values is not None:try:values_list = json.loads(values.decode('utf-8'))if not isinstance(values_list, list):raise TypeError()except:raise Exception()return values_listdef put_values_by_key(self, key, values):if not isinstance(values, list):raise Exception()self.put(key, json.dumps(values))其次是此客戶端對(duì)etcd的操作類,采用單例模式,主要方法就是監(jiān)視etcd并進(jìn)行回調(diào)處理。
class EtcdHandleClient(EtcdClient):_singleton = Nonedef __new__(cls, *args, **kwargs):if not cls._singleton:cls._singleton = super().__new__(cls)return cls._singletondef __init__(self, etcd_ip, etcd_port, etcd_prefix):self.etcd_ip = etcd_ipself.etcd_port = etcd_portself.etcd_prefix = etcd_prefixsuper().__init__(host=etcd_ip, port=etcd_port)self.endpoints_list = self.get_values_by_key(f'{self.etcd_prefix}/grpc')self.watched_id = self.add_watch_callback(key=f'{self.etcd_prefix}/grpc', callback=self._update_endpoints)def __del__(self):self.cancel_watch(self.watched_id)def get_grpc_serv_ip(self):endpoints_nums = len(self.endpoints_list)if endpoints_nums <= 0:raise RuntimeError('No grpc services are available.Please notify the administrator to start the grpc service')select_id = random.randint(0, len(self.endpoints_list)-1)return self.endpoints_list[select_id]def _update_endpoints(self, watched_response):watched_event = watched_response.events[0]try:update_endpoint_list = json.loads(watched_event.value)if not isinstance(update_endpoint_list, list):raise TypeErrorexcept Exception as e:print(e)returnself.endpoints_list = update_endpoint_list最后main函數(shù)中只需長(zhǎng)期維護(hù)一個(gè)EtcdHandleClient對(duì)象即可。
def main():etcd_client = EtcdHandleClient(etcd_ip='xxxx', etcd_port=2379, etcd_prefix='/zarten')endpoint = etcd_client.get_grpc_serv_ip()print('endpoint:', endpoint)with grpc.insecure_channel(endpoint) as channel:stub = zarten_pb2_grpc.ZartenStub(channel)response = stub.GetInfo(zarten_pb2.ZartenRequest(zhihu_name='Zarten123'))print(f'receive response: {response}')3-2-2-go客戶端
go語(yǔ)言客戶思想跟python一樣,只是代碼不同而已。
首先定義一個(gè)GrpcClient結(jié)構(gòu)體,包括一個(gè)etcd連接對(duì)象和一個(gè)可用gRPC服務(wù)器信息數(shù)組。
type GrpcClient struct {Etcd3Client *clientv3.ClientGrpcEndpoints []string }其次是初始化GrpcClient結(jié)構(gòu)體的函數(shù)NewGrpcClient(),此函數(shù)中會(huì)調(diào)用一個(gè)協(xié)程來(lái)監(jiān)視etcd的變動(dòng)。
GrpcClient結(jié)構(gòu)體的方法只有一個(gè)GetRrpcServIp()對(duì)外開(kāi)放函數(shù)來(lái)獲取某個(gè)gRPC服務(wù)器的信息。
func NewGrpcClient(EtcdIp string, EtcdPort int, EtcdPrefix string) *GrpcClient{keyName := EtcdPrefix+"/grpc"grpcClient := new(GrpcClient)cli, err := clientv3.New(clientv3.Config{Endpoints: []string{EtcdIp + ":" + strconv.Itoa(EtcdPort)},DialTimeout: 10 * time.Second,})if err != nil {log.Fatal(err)}res, err := cli.Get(context.Background(), keyName)if err != nil{log.Fatal(err)}for _, ev := range res.Kvs {endPoints := ev.Valueerr := json.Unmarshal(endPoints, &grpcClient.GrpcEndpoints)if err != nil{log.Fatal(err)}break}if len(grpcClient.GrpcEndpoints) <= 0{log.Fatal("No grpc services are available.Please notify the administrator to start the grpc service")}grpcClient.Etcd3Client = clirch := cli.Watch(context.Background(), keyName)go func() {for wresp := range rch {for _, ev := range wresp.Events {fmt.Printf("%s %q : %qn", ev.Type, ev.Kv.Key, ev.Kv.Value)mu.Lock()err := json.Unmarshal(ev.Kv.Value, &grpcClient.GrpcEndpoints)mu.Unlock()if err != nil{log.Fatal(err)}fmt.Println(grpcClient.GrpcEndpoints)}}}()return grpcClient }func (g *GrpcClient) GetRrpcServIp() string{rand.Seed(time.Now().Unix())n := len(g.GrpcEndpoints)return g.GrpcEndpoints[rand.Intn(n)] }main函數(shù)中只需維護(hù)一個(gè)NewGrpcClient對(duì)象即可。
func main() {grpcClient := client_center.NewGrpcClient("xxxx", 2379, "/zarten")ip := grpcClient.GetRrpcServIp()fmt.Println(ip)conn, err := grpc.Dial(grpcClient.GetRrpcServIp(), grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := go_protoc.NewZartenClient(conn)res, _ := client.GetInfo(context.Background(), &go_protoc.ZartenRequest{ZhihuName:"zarten456"})fmt.Println(res.Name)fmt.Println(res.Homepage)}4-演示結(jié)果
在本地機(jī)器啟動(dòng)3個(gè)服務(wù)器端,端口分別為65510、65511、65512.
多次調(diào)用python和go版的客戶端,都是使用不同gRPC服務(wù)器并成功返回信息。部分截圖如下所示:
5-總結(jié)
主旨思想是通過(guò)etcd來(lái)進(jìn)行交互來(lái)共享所有的gRPC服務(wù)器信息。
在服務(wù)器端的實(shí)現(xiàn)為了能更加健全,還需考慮加入租約功能,此功能主要用于服務(wù)器端自身掛掉后無(wú)法及時(shí)通知etcd來(lái)注銷掉自己信息。此功能以后有時(shí)間會(huì)在github中更新,敬請(qǐng)期待!
總結(jié)
以上是生活随笔為你收集整理的go使用grpc实现异步_(python、go)基于ETCD的gRPC分布式服务器实现详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: copyof java_死磕 java集
- 下一篇: pdf转word python_Pyt