日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【基础】利用thrift实现一个非阻塞带有回调机制的客户端

發(fā)布時(shí)間:2025/3/21 编程问答 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【基础】利用thrift实现一个非阻塞带有回调机制的客户端 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

假設(shè)讀者對(duì)thrift有一定了解。

?

  客戶端有時(shí)需要非阻塞的去發(fā)送請(qǐng)求,給定服務(wù)端一個(gè)請(qǐng)求,要求其返回一個(gè)計(jì)算結(jié)果。但是客戶端不想等待服務(wù)端處理完,而是想發(fā)送完這個(gè)指令后自己去做其他事情,當(dāng)結(jié)果返回時(shí)自動(dòng)的去處理。

  比如舉個(gè)形象點(diǎn)的例子:飯店的Boss讓小弟A把本周店里的欠條收集起來放到自己桌子上,然后又告訴自己的小秘書坐在自己辦公室等著小弟A把欠條拿過來,然后統(tǒng)計(jì)一下一共有多少,然后Boss自己出去半點(diǎn)事兒。

  Boss相當(dāng)于client,小弟A相當(dāng)于server,而小秘書相當(dāng)于client端的回調(diào)函數(shù)(callback)。怎么講呢?Boss不想等待小弟處理完,因?yàn)樗先思夜珓?wù)繁忙,還要去干別的呢。于是他把接下來處理欠條的任務(wù)托管給了小秘書,于是自己一個(gè)人出去了。

  OK,那么我們基本了解了整個(gè)工作流程,來看看實(shí)現(xiàn)的方法。thrift去實(shí)現(xiàn)client異步+回調(diào)的方法關(guān)鍵點(diǎn)在于:thrift生成的client中有個(gè)send_XXX()和recv_XXX()方法。send_XXX()相當(dāng)于告知server去處理東西,可以立即返回;而調(diào)用recv_XXX就是個(gè)阻塞的方法了,直到server返回結(jié)果。所以,我們可以在主線程調(diào)用完send_XXX()之后,然后另開一個(gè)線程去調(diào)用send_XXX(),該線程在等到server回復(fù)后自動(dòng)調(diào)用callback方法,對(duì)結(jié)果進(jìn)行一些處理(當(dāng)然callback在修改client狀態(tài)時(shí)需要進(jìn)行同步操作)。這樣的模式下,我們可以做很多事情,比如分布式環(huán)境下的觀察者模式。當(dāng)然了需要注意的一點(diǎn)就是,各個(gè)線程接受到結(jié)果的順序跟請(qǐng)求順序不一定一樣,因?yàn)閟erver處理不通請(qǐng)求時(shí)間不通或者網(wǎng)絡(luò)環(huán)境的影響都可能導(dǎo)致這種情形。所以如果你對(duì)接受這些結(jié)果時(shí)不是冪等操作時(shí)需要注意一下。

thrift腳本:

//只有一個(gè)方法,client發(fā)送一個(gè)消息,server換回一個(gè)消息 service TestServ{string ping(1: string message), }

server端采用TNBlockingServer實(shí)現(xiàn)

1 #include "TestServ.h" 2 3 #include <iostream> 4 5 #include <thrift/protocol/TBinaryProtocol.h> 6 #include <thrift/server/TNonblockingServer.h> 7 #include <thrift/transport/TServerSocket.h> 8 #include <thrift/transport/TBufferTransports.h> 9 #include <thrift/concurrency/PosixThreadFactory.h> 10 11 using namespace std; 12 13 using namespace ::apache::thrift; 14 using namespace ::apache::thrift::protocol; 15 using namespace ::apache::thrift::transport; 16 using namespace ::apache::thrift::server; 17 using namespace ::apache::thrift::concurrency; 18 19 using boost::shared_ptr; 20 21 class TestServHandler : virtual public TestServIf { 22 public: 23 TestServHandler() { 24 // Your initialization goes here 25 } 26 27 void ping(std::string& _return, const std::string& message) { 28 _return = "hello, i am server! "; 29 sleep(3);// do something time-consuming/ 這里我們?cè)趕erver端加一些耗時(shí)的操作 30 cout<<"Request from client: "<<message<<endl; 31 } 32 33 }; 34 35 int main(int argc, char **argv) { 36 int port = 9090; 37 38 shared_ptr<TestServHandler> handler(new TestServHandler()); 39 shared_ptr<TProcessor> processor(new TestServProcessor(handler)); 40 shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); 41 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15); 42 shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory > (new PosixThreadFactory()); 43 threadManager->threadFactory(threadFactory); 44 threadManager->start(); 45 TNonblockingServer server(processor, protocolFactory, port, threadManager); 46 server.serve(); 47 return 0; 48 }

client端實(shí)現(xiàn):

1 #include "TestServ.h" 2 3 #include <iostream> 4 #include <thrift/protocol/TBinaryProtocol.h> 5 #include <thrift/transport/TSocket.h> 6 #include <thrift/transport/TBufferTransports.h> 7 8 #include "test_constants.h" 9 10 using namespace std; 11 using namespace ::apache::thrift; 12 using namespace ::apache::thrift::protocol; 13 using namespace ::apache::thrift::transport; 14 using boost::shared_ptr; 15 16 class AsynTestClient; 17 void * wait_recv(void * parg ); 18 struct PARG { 19 AsynTestClient * pthis; 20 string message; 21 }; 22 23 class AsynTestClient { 24 private: 25 unsigned int d_cnt_recv;//< 客戶端接受到server響應(yīng)次數(shù)的計(jì)數(shù)器. 26 27 pthread_rwlock_t m_cnt_recv;//< 計(jì)數(shù)器的讀寫鎖. 28 vector<pthread_t> m_ids; 29 30 public: 31 TestServClient * d_client; 32 void call_back(string & _return){ 33 //輸出服務(wù)器返回信息并把返回計(jì)數(shù)加1 34 cout<<"server msg: "<<_return<<endl; 35 pthread_rwlock_wrlock( &m_cnt_recv ); 36 d_cnt_recv ++; 37 pthread_rwlock_unlock( &m_cnt_recv ); 38 } 39 explicit AsynTestClient(boost::shared_ptr<TProtocol> & protocol){ 40 pthread_rwlock_init( &m_cnt_recv, NULL ); 41 d_cnt_recv = 0; 42 d_client = new TestServClient( protocol ); 43 } 44 45 ~AsynTestClient(){ 46 delete d_client; 47 pthread_rwlock_destroy( &m_cnt_recv ); 48 } 49 50 void asyn_ping( const string & message) { 51 //發(fā)送請(qǐng)求 52 d_client->send_ping(message); 53 //初始化每個(gè)等待回調(diào)線程的參數(shù) 54 PARG * parg = new PARG; 55 parg->pthis = this; 56 parg->message = message; 57 //把新生成的線程id放入全局?jǐn)?shù)組維護(hù) 58 pthread_t m_id; 59 m_ids.push_back(m_id); 60 //啟動(dòng)線程,從此只要接受到服務(wù)器的返回結(jié)果就調(diào)用回調(diào)函數(shù)。 61 if( 0 != pthread_create( &m_id, NULL, wait_recv, reinterpret_cast< void * > (parg) ) ) { 62 return; 63 } 64 } 65 }; 66 int main(int argc, char **argv) { 67 68 boost::shared_ptr<TSocket> socket(new TSocket("localhost", 9090)); 69 boost::shared_ptr<TTransport> transport(new TFramedTransport(socket)); 70 boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); 71 72 //TestServClient client(protocol); 73 74 transport->open(); 75 AsynTestClient client(protocol); 76 string message = "hello, i am client! "; 77 client.asyn_ping(message); 78 79 while(true){ 80 sleep(1);//這里相當(dāng)于client去做別的事情了 81 } 82 83 transport->close(); 84 return 0; 85 } 86 void * wait_recv(void * parg ) { 87 PARG * t_parg = reinterpret_cast< PARG * >(parg);//強(qiáng)制轉(zhuǎn)化線程參數(shù) 88 string _return; 89 t_parg->pthis->d_client->recv_ping(_return); 90 t_parg->pthis->call_back(_return); 91 }

  其實(shí)大家可以注意到,我并沒有使用asyn_ping(const string & message, void(*)call_back(void));這種方式去定義它,這是因?yàn)閍syn_ping本身可以獲取callback函數(shù)的指針。回調(diào)的本質(zhì)是任務(wù)的托管、時(shí)間的復(fù)用,也就是說等待結(jié)果返回后自動(dòng)去調(diào)用一段代碼而已,所以本質(zhì)上上面就是回調(diào)機(jī)制。如果你想使用傳函數(shù)指針的方式,也可以實(shí)現(xiàn)出來。

  注意:編譯時(shí)需要-L$(LIB_DIR) -lthrift -lthriftnb -levent。

轉(zhuǎn)載于:https://www.cnblogs.com/colorfulkoala/p/3487948.html

總結(jié)

以上是生活随笔為你收集整理的【基础】利用thrift实现一个非阻塞带有回调机制的客户端的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。