日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Thrift 异步模式

發布時間:2025/3/15 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Thrift 异步模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們廣泛使用thrift作為我們內部接口調用的RPC框架,而且基本上都是使用多線程請求等待應答的同步模式 。但是在一些情況下(例如大數據量同步),如果可以使用異步模式,可以優化程序結構和提高模塊性能。

thrift 有提供一套異步模式模式供我們使用,我們跟往常一樣來編寫一個thrift 協議文件。

namespace cpp example service Twitter {string sendString(1:string data); }不同的是,我們需要加入cpp:cob_type 來生成代碼。

thrift -r -strict –gen cpp:cob_style -o ./ test.thrift

生成的代碼文件表和之前的基本相同,但在Twitter.cpp 和Twitter.h 文件中增加了異步客戶端和異步服務器使用的類。

$ tree gen-cpp?
|– Test_constants.cpp?
|– Test_constants.h?
|– Test_types.cpp?
|– Test_types.h?
|– Twitter.cpp?
|– Twitter.h?
|–Twitter_server.skeleton.cpp?
|-Twitter_async_server.skeleton.cpp

用戶只要關心在Twitter.h 中的TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor這三個類。

Thrift 異步Client
異步客戶端代碼有TwitterCobClient 以及它繼承的類。

class TwitterCobClient : virtual public TwitterCobClIf {public:TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :channel_(channel),itrans_(new ::apache::thrift::transport::TMemoryBuffer()),otrans_(new ::apache::thrift::transport::TMemoryBuffer()),piprot_(protocolFactory->getProtocol(itrans_)),poprot_(protocolFactory->getProtocol(otrans_)) {iprot_ = piprot_.get();oprot_ = poprot_.get();}boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {return channel_;}virtual void completed__(bool /* success */) {}void sendString(tcxx::function<void(TwitterCobClient* client)> cob, const std::string& data);void send_sendString(const std::string& data);void recv_sendString(std::string& _return);protected:boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;::apache::thrift::protocol::TProtocol* iprot_;::apache::thrift::protocol::TProtocol* oprot_; };從源文件上看,通過類實現發現:

completed__(bool /* success */)是虛函數,用于通知用戶數據接收完成;
sendString函數帶有回調參數 function <void(TwitterCobClient* client)> cob,用于數據接收時回調,這是異步的特點;
send_sendString和recv_sendString分別用于寫數據到輸出緩存和從輸入緩存讀數據
列表內容擁有TAsyncChannel,異步功能的核心在于TAsyncChannel,它是用于回調函數注冊和異步收發數據;
Transport采用TMemoryBuffer,TMemoryBuffer是用于程序內部之間通信用的,在這里起到讀寫緩存作用

下面看看關鍵函數 sendString的實現
void TwitterCobClient::sendString(t cxx::function<v oid(TwitterCobClient* client)> cob, const std::string& data) {send_sendString(data);channel_->sendAndRecvMessage(tcxx::bind(cob, this), otrans_.get(), itrans_.get()); }send_sendString函數是想緩沖區(TMemoryBuffer)寫入數據, 而sendString 則通過調用TAsyncChannel的sendAndRecvMessage接口注冊回調函數。

TAsyncChannel作為接口類定義了三個接口函數。
/*** Send a message over the channel.*/virtual void sendMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* message) = 0;/*** Receive a message from the channel.*/virtual void recvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* message) = 0;/*** Send a message over the channel and receive a response.*/virtual void sendAndRecvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* sendBuf,apache::thrift::transport::TMemoryBuffer* recvBuf);TAsyncChannel目前為止(0.9.1版本)只有一種客戶端實現類TEvhttpClientChannel,顧名思義它是基于libevent和http協議實現的。 使用libevent的方法就不在這里累贅了,主要看下sendAndRecvMessage的實現。
void TEvhttpClientChannel::sendAndRecvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* sendBuf,apache::thrift::transport::TMemoryBuffer* recvBuf) {cob_ = cob;// 綁定回調函數recvBuf_ = recvBuf;struct evhttp_request* req = evhttp_request_new(response, this);uint8_t* obuf;uint32_t sz;sendBuf->getBuffer(&obuf, &sz);rv = evbuffer_add(req->output_buffer, obuf, sz);rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());// 發送http 請求 }
從sendAndRecvMessage實現可看出,TEvhttpClientChannel是用采用http協議來與服務器通信,后面介紹異步server時會發現,同樣采用是http協議,它們使用的http庫是libevent庫的evhttp。


通過向evhttp_request中注冊相應回調函數respones和傳入回調實例本身的指針,在相應時候回調函數中調用TEvhttpClientChannel實例的finish接口完成數據接收,并寫入緩存中,供應用層獲取使用。?
看下回調函數response 的實現:
/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;try {self->finish(req);} catch (std::exception& e) {// don't propagate a C++ exception in C code (e.g. libevent)std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what()<< std::endl;} }Thrift 異步server
異步server關心另外兩個類:TwitterCobSvIf和TwitterAsyncProcessor。很明顯TwitterCobSvIf是要用戶繼承實現的,它與同步TwitterSvIf不同的地方是成員函數多一個cob回調函數,在實現TwitterSvIf時,需要調用cob。示例如下:
class TwitterCobSvNull : virtual public TwitterCobSvIf {public:virtual ~TwitterCobSvNull() {}void sendString(tcxx::function<void(std::string const& _return)> cob, const std::string& /* data */) {std::string _return;return cob(_return);} };


那么這個cob是什么函數,哪里注冊的?這在thrift lib庫里的TEvhttpServer和TAsyncProtocolProcessor類里可找到答案,其中TEvhttpServer是異步server,傳輸是采用http協議,與異步client對上。


先看看TEvhttpServer實現,同樣采用event_base來異步收發數據,收到數據時,回調request函數。

void TEvhttpServer::request(struct evhttp_request* req, void* self) {try {static_cast<TEvhttpServer*>(self)->process(req);} catch(std::exception& e) {evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);} } void TEvhttpServer::process(struct evhttp_request* req) {RequestContext* ctx = new RequestContext(req);return processor_->process( // 這里的processor_正是TAsyncProtocolProcessorstd::tr1::bind(&TEvhttpServer::complete, // 注冊completethis,ctx,std::tr1::placeholders::_1),ctx->ibuf,ctx->obuf); }void TEvhttpServer::complete(RequestContext* ctx, bool success) {std::auto_ptr<RequestContext> ptr(ctx);int code = success ? 200 : 400;const char* reason = success ? "OK" : "Bad Request";int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");struct evbuffer* buf = evbuffer_new();uint8_t* obuf;uint32_t sz;ctx->obuf->getBuffer(&obuf, &sz); // 從輸出緩沖讀數據int ret = evbuffer_add(buf, obuf, sz);evhttp_send_reply(ctx->req, code, reason, buf); // 發送數據}接著看TAsyncProtocolProcessor的process實現
void TAsyncProtocolProcessor::process(std::tr1::function<void(bool healthy)> _return,boost::shared_ptr<TBufferBase> ibuf,boost::shared_ptr<TBufferBase> obuf) {boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));return underlying_->process( // underlying_是生成代碼里的TwitterAsyncProcessorstd::tr1::bind(&TAsyncProtocolProcessor::finish, _return, // compere函數oprot,std::tr1::placeholders::_1),iprot, oprot); }/* static */ void TAsyncProtocolProcessor::finish(std::tr1::function<void(bool healthy)> _return,boost::shared_ptr<TProtocol> oprot,bool healthy) {(void) oprot;// This is a stub function to hold a reference to oprot.return _return(healthy); // 回調compere函數 }最后看TwitterAsyncProcessor::process,它先寫fname,mtype, seqid然后調用process_fn,process_fn選擇調用合理的處理函數(如process_sendString),看process_sendString實現:

void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return) =&TwitterAsyncProcessor::return_sendString; // return_sendString正是我們要找的cob函數iface_->sendString( // iface_是TwitterCobSvIf的具體類,用戶實現的std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1), // cob 是 finish函數args.data);}上面return_sendString是我們要找的cob函數,該函數將用戶處理的結果寫入輸出沖緩,并發送給client。

下面實現了一個異步客戶端和異步服務端?
采用異步時,必須采用http 傳輸層。


異步客戶端的實現
demo_async_client.cc
#include <string> #include "boost/shared_ptr.hpp" #include <thrift/Thrift.h> #include <thrift/protocol/TProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/concurrency/ThreadManager.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/server/TServer.h> #include <thrift/async/TAsyncChannel.h> #include <thrift/async/TEvhttpClientChannel.h> #include "common/thrift/Twitter.h" #include "boost/function.hpp" #include "boost/bind.hpp" #include <event.h> #include <stdio.h> using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using std::string; using boost::shared_ptr; using namespace example; using namespace apache::thrift::async;class testClient : public TwitterCobClient { public:testClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, TProtocolFactory* protocolFactory): TwitterCobClient(channel, protocolFactory){ };virtual void completed__(bool success){if (success){printf("respone : %s \n", res.c_str()); // 輸出返回結果}else{printf("failed to respone\n");}fflush(0);};string res; };//callback function static void my_recv_sendString(TwitterCobClient *client){client->recv_sendString(dynamic_cast<testClient*>(client)->res); }static void sendString(testClient & client){ printf("snedstring start\n"); std::function<void(TwitterCobClient*client)>cob = bind(&my_recv_sendString,_1); client.sendString(cob,"Hello"); printf("sendstring end\n"); }static void DoSimpleTest(const std::string & host, int port){printf("running SimpleTset(%s, %d)..\n", host.c_str(),port);event_base* base = event_base_new();boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel1( new TEvhttpClientChannel( host, "/", host.c_str(), port, base ) );testClient client1( channel1, new TBinaryProtocolFactory() );sendString(client1); // 發送第一個請求boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel2( new TEvhttpClientChannel( host, "/", host.c_str(), port, base ) );testClient client2( channel2, new TBinaryProtocolFactory() );sendString(client2); // 發送第二個請求event_base_dispatch(base);event_base_free(base);printf( "done DoSimpleTest().\n" ); }int main( int argc, char* argv[] ) {DoSimpleTest( "localhost", 14488 );return 0;}
異步服務端的實現
demo_async_serv.cc
#include <string> #include "boost/shared_ptr.hpp" #include <thrift/Thrift.h> #include <thrift/protocol/TProtocol.h> #include <thrift/transport/TSocket.h> #include <thrift/transport/TTransportUtils.h> #include <thrift/concurrency/ThreadManager.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/server/TServer.h> #include <thrift/async/TAsyncChannel.h> #include <thrift/async/TEvhttpClientChannel.h> #include "common/thrift/Twitter.h" #include <thrift/async/TAsyncProtocolProcessor.h> #include <thrift/async/TEvhttpServer.h>using namespace apache::thrift; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using std::string; using namespace boost; using namespace example; using namespace apache::thrift::async;class TwitterAsyncHandler : public TwitterCobSvIf {public:TwitterAsyncHandler() {// Your initialization goes here}void sendString(std::function<void(std::string const& _return)> cob, const std::string& data) {printf("sendString rec:%s\n", data.c_str()); // 輸出收到的數據std::string _return = "world"; // 返回world字符串給客戶端return cob(_return);}};int main(int argc, char **argv) {shared_ptr<TAsyncProcessor> underlying_pro(new TwitterAsyncProcessor( shared_ptr<TwitterCobSvIf>(new TwitterAsyncHandler()) ) );shared_ptr<TAsyncBufferProcessor> processor( new TAsyncProtocolProcessor( underlying_pro, shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()) ) );TEvhttpServer server(processor, 14488);server.serve();return 0; }
參考
http://blog.csdn.net/whycold/article/details/10973?
http://tech.uc.cn/?p=2668553

總結

以上是生活随笔為你收集整理的Thrift 异步模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。