<AcWing>-thrift
catalog
- Basic
- 匹配-服務1
- main.cpp
- 游戲-服務1
- client.py
- 匹配-服務2
- C++線程
- 內存池
- Version1 代碼
- 匹配結果 存入DB
- 多線程
Basic
一個應用, 包含有 多個 服務, 不同的服務 在不同的服務器上 (也可以在同個服務器)
比如說一個游戲, 一些玩家可以匹配到一起, 然后開始游戲.
(游戲 主服務) (匹配系統 服務)
主服務里, 一個玩家 點擊開始匹配, 那么此時: 主服務 會調用 匹配服務的 一個函數add_user函數, 將這個玩家 加入到 匹配系統里
然后比如, 這個玩家 又點擊了取消匹配, 就對應為: 主服務 會調用 匹配服務的 remove_user函數, 將這個玩家 從 匹配系統中, 移除掉
比如, 匹配系統, 成功的匹配了一些人, 然后調用 數據庫系統 服務 的 save_match函數, 將匹配的結果 存到數據庫里, 做一些log記錄
thrift, 就是上面的這些函數接口 add_user, remove_user, save_match, 即 服務間的 通信
也就是: A服務器的x進程, 想要調用, B服務器的y進程 AB可以是同個服務器, 這就是thrift
而且, 不同的服務, 使用的語言 可以是不同的! 比如, 游戲服務是 python語言, 匹配系統是 c++語言
thrift是: rpc框架 remote procedure call 遠程函數調用
在這里, 比如A 調用了 B的一個函數add_user, 那么: A稱為client端, B稱為:server端
匹配-服務1
創建 ~/thrift_lession文件夾, 表示, 我們整個項目的工作區, 進入該工作區
vim readme.md mkdir game ' 游戲 服務 'mkdir match ' 匹配 服務 'mkdir thrift ' 放一些 thrift的函數接口 'vim match.thrift ' 文件名是match, 說明這是 (針對 match匹配服務的 接口) '
內容如下 這是thrift語法:
namespace cpp match_service ' 表示, 如果這個thrift去生成一個cpp文件, 則你這里定義的(結構體/函數), 都是在(match_service)的命名空間下的 'struct User{ ' 一個結構體 '1: i32 id,2: string name,3: i32 score }service Match{ ' 函數; match服務, 向外提供的接口 'i32 add_user(1: User user, 2: string info),i32 remove_user(1: User user, 2: string info), }' 進入 ~/thrift_lession/match 文件夾, 即當前是在 匹配服務(也就是一個項目)'mkdir src ' 任何一個項目, 最好都要有一個src 存源文件 'cd src/ thrift -r --gen cpp ~/thrift_lesson/thrift/match.thrift ' 根據之前寫的thrift接口, 生成對應的cpp代碼 '此時, 在match/src/下, 有一個gen-cpp/的文件夾 ' 這就是上面生成的cpp代碼 ' 里面有: 'Match.cpp Match.h Match_server.skeleton.cpp match_types.cpp match_types.h'mv gen-cpp/ match_server ' 改個名字. match_server表示: 這個是服務端的接口. 即是當前match 提供給外界來使用的接口 'mv match_server/Match_server.skeleton.cpp main.cpp 此時src/里是: |-- main.cpp |-- match_server |-- Match.cpp |-- Match.h |-- match_types.cpp |-- match_types.h
main.cpp
#include "Match.h"int32_t add_user(const User& user, const std::string& info) { // Your implementation goes here printf("add_user\n"); } int32_t remove_user(const User& user, const std::string& info) { // Your implementation goes here printf("remove_user\n"); }int main(){serve();return 0; }可以發現, 我們的這些接口的實現, 并沒有返回值; 手動把函數加上返回值.
由于, 我們將main.cpp, 放到了src下, 自然#include "Match.h"也是錯的, 需要加一個match_server/目錄
在main函數里, 寫一個: ::std::cout<< "test" << ::std::endl;, 注意, 要寫上endl
因為, main里有個serve函數, 程序是一直運行的. cout的緩沖區, 刷新到屏幕, 要么是程序結束, 要么你手動的調用endl. 否則, 你看不到cout的輸出
先編譯下, 讓項目正常運行
g++ -c main.cpp ./match_server/*.cpp: 編譯, 生成.o
此時, src/下, 有很多的.o
g++ *.o -o main -lthrift (鏈接dll)
./main 執行
假如你修改了某個cpp, 比如修改了main.cpp. 無需g++ *.cpp -o main
因為, 其他的.o文件 已經有了.
最好是: g++ main.cpp -c, 單獨去生成main.o. 然后再: g++ *.o -o main -lthrift
在/src目錄下:
git add .
git restore --staged *.o
git restore --staged main
此時, 所有非.o文件 非執行文件, 就都在 暫存區里了 最好只是把一些源文件 放到git里, 把編譯結果文件放進去 不太好
游戲-服務1
' 進入 ~/thrift_lession/game 文件夾 'mkdir src ' 同樣生成src, 存儲源文件 'thrift -r --gen py ../../thrift/match.thrift ' 根據接口, 生成python代碼; 因為, game也需要調用這些接口 ' ' 此時, 當前目錄下多了: gen-py文件夾 'mv gen-py/ match_client ' 改個名字. 表示, 這是客戶端要使用的接口 'tree ' 這是: src/match_client/ 的目錄 ' |-- __init__.py |-- match|-- Match-remote ' 刪除這個文件; 他是提供遠程服務的, 而此時是客戶端, 不是服務端; 刪不刪都可以 ' |-- Match.py|-- __init__.py|-- constants.py|-- ttypes.pyclient.py
' 在game/src目錄下 (此時, 這個目錄下 有一個match_client): 'vim client.py' client.py 如下: ' def main():# Make sockettransport = TSocket.TSocket('127.0.0.1', 9090)# Buffering is critical. Raw sockets are very slowtransport = TTransport.TBufferedTransport(transport)# Wrap in a protocolprotocol = TBinaryProtocol.TBinaryProtocol(transport)# Create a client to use the protocol encoderclient = Match.Client(protocol)# Connect!transport.open()user = User(1, 'wchang', 15000)client.add_user( user, "me") if __name__ == "__main__":main()此時, 先將我們的match服務里的 之前生成的./main 可執行文件 給運行起來. 他會一直在運行, 在監聽
然后, 在這里game 服務里, 執行這個client.py文件: python3 client.py
對應的, 在match服務里, 這個正在運行的./main程序, 會輸出執行了add_user函數
即, game 服務 調用了 match 服務里的add_user函數
匹配-服務2
對于匹配系統:
- 他一邊要: 接收新的玩家進來 即一個死循環
- 同時還要: 進行匹配算法 只要有玩家, 就進行算法; 其實也是個死循環
兩個工作, 需要并行的進行.
兩個死循環 都需要同步的進行, 即涉及到線程的概念;
即一個線程 不斷的接收進來的玩家, 一個線程 不停的進行匹配算法
C++線程
一個程序, 里面可能會有很多的線程; 上百個都有可能
這么多個線程, 分為2大類: 生產者: 產生任務Task 消費者: 處理任務Task
以我們這個match匹配系統為例, add_user函數, 會產生一個: 添加玩家的Task; remove_user函數, 會產生一個: 刪除玩家的Task;
即, 這兩個函數, 都會產生任務 比如任務1是: 刪除某個玩家, 任務2是: 添加某個玩家
然后需要一個內存池, 存所有的任務Task
生產者 和 消費者, 這2者之間 需要進行通信, 實現方式有: 消息隊列
實現消息隊列時, 需要用到: 鎖mutex #include <mutex>
比如, 生產者 和 消費者 多個線程, 同時去 操作一個內存, 如果無法保證原子性 就會發生沖突;
比如, 一個線程 在執行消息隊列的push操作: message_queue.push(..);,而一個push函數, 他是對應有多個機器指令: 1 2 3 4 5 6 7,
一旦發生: a線程執行了push, 當機器指令執行到4時, 執行權突然跳到了b線程, b線程也執行了push操作, 這是非常危險的.
因為此時b線程, 面對的message_queue 是未知的. 因為push操作沒有完成
所以, 必須保證: push()函數 (其對應的機器執行), 是完整執行結束的; 在執行push函數的過程中, 執行權 不能被剝奪
所以, 有些操作 不能同時去執行; 這就是鎖mutex的用處, 必須保證, 對內存的操作, 同一時間 只能有1個線程在操作他
鎖有2個操作:
- p操作: 去爭取這個鎖; 一旦爭取到了這個鎖, 就可以去進行操作了; 而在進行操作時, 會保證 其他線程不會并行的執行這一段代碼
即, 我拿到這個鎖, 其他鎖就阻塞掉了; 比如同時有100個線程在爭取這個鎖, 一旦一個線程拿到了這個鎖, 那么99個線程 就會被阻塞 - v操作: 等拿到鎖的人, 執行完他的操作后, 執行v操作 釋放鎖
#include <condition_variable> c++里有個 條件變量, 他是對mutex鎖 進行了一個封裝
struct Message_Queue{queue< Task> task_queue;mutex mutex_;condition_varible condition_var; }message_queue;void func1(){std::unique_lock< mutex> _lock( message_queue.mutex_);message_queue.task_queue.push(...); } void func2(){std::unique_lock< mutex> _lock( message_queue.mutex_);message_queue.task_queue.push(...); }
這個_lock, 就是對消息隊列的 互斥量, 進行上鎖; 等到這個_lock 摧毀時, 就會自動的 解鎖
即使func1 和 func2同時執行, 比如func1先 獲取到了鎖, 那么func2的lock 就會被阻塞, 獲取不到鎖 , 即, 保證了: func1的 push操作, 是原子性的
void consume(){ ' 消費者, 監測 消息隊列 'while( true){unique_lock< mutex> lock( message_queue.mutex); // 注意位置!! 是在while里面if( message_queue.empty()){}else{auto task = message_queue.task_queue.top();message_queue.task_queue.pop();lock.unlock();' 處理這個task '}} }
操作完message_queue后, 必須立刻unlock; 否則會導致: 在處理task時 這是個耗時的過程, 其他的線程 無法操作message_queue
這是不應該的, 因為在處理task時, consume線程 不會用到message_queue, 為什么不讓別人使用呢?
一旦使用完共享變量后, 一定要及時解鎖
這里有一個問題: 如果消息隊列不是空的, 由于會訪問消息隊列里的元素, 此時上鎖是應該的.
但是, 如果當消息隊列是空的, 從上面的代碼, 他還是會上鎖 解鎖, 每次的while死循環, 都是在重復(上鎖 解鎖), 就會陷入死循環 cpu占用達到100%
即此時這個consume線程, 他一直在運行態: 不斷的上鎖解鎖, 導致其他線程無法使用消息隊列.
這是不應該的. 因為你在執行NULL語句, 沒有使用消息隊列 反而還不讓其他線程使用隊列!!
正確的做法是: 當消息隊列是空時, 這個線程應該被阻塞住 不能讓他一直占用cpu 反而一直執行NULL語句, 讓線程卡住
讓consume線程堵塞住, 讓其他線程去執行; 知道消息隊列不是空時, consume線程才能恢復運行 -> 這就需要用到: 條件變量 condition_varible
要寫成:
unique_lock< mutex> lock( message_queue.mutex); if( message_queue.empty()){message_queue.condition_var.wait( lock); }wait的作用是: 將鎖lock 給釋放掉, 然后當前consume線程 就被卡住了 (即堵塞);
當外界將這個條件變量condition_var給喚醒后, 當前被卡住的consume線程, 才解除卡住
即, 一旦消息隊列是空, 該consume線程 就會被message_queue里的 condition_var 這個條件變量 所卡住 (即當前線程 被堵塞)
喚醒條件變量, 喚醒了message_queue里的 condition_var 這個條件變量, 也就是: 喚醒了 `正在被堵塞了的 consume線程
' 其他線程 執行的函數func: 'void func(){message_queue.push( ...);' 執行完push后, 隊列已經不空了! 也就是, consume線程, 應該被喚醒; 'message_queue.contion_var.notify_all();' notify_all(): 通知所有被(message_queue.contion_var)卡住的線程, 喚醒他 ' }內存池
把所有的Task, 存到一個pool里, 然后當pool滿足一定容量后 進行匹配.
Version1 代碼
struct Task{ 26 public:27 User user;28 ::std::string type; // 任務的類型: add_user 還是 remove_user?29 };30 31 struct Message_Queue{32 public:33 ::std::queue< Task> task_queue;34 ::std::mutex mutex_;35 ::std::condition_variable condition_var;36 }Message_queue;37 38 class Pool{39 public:40 void add( const User & _u){41 ::std::cout << "pool - add: (id: )" << _u.id << ::std::endl;42 users.push_back( _u);43 }44 void remove( const User & _u){45 ::std::cout << "pool - remove: (id: )" << _u.id << ::std::endl;46 for( uint32_t i = 0; i < users.size(); ++i){47 if( users.at(i).id == _u.id){48 users.erase( users.begin() + i);49 break;50 }51 }52 }void save_matched_result( const User & _a, const User _b){54 ::std::cout << "matched: id(" << _a.id << ", " << _b.id << ")" << ::std::endl;55 56 }57 58 void match(){59 while( users.size() >= 2){60 ::std::cout << "matched succ" << ::std::endl;61 User a = users.at( 0);62 User b = users.at( 1);63 users.erase( users.begin(), users.begin() + 2);64 save_matched_result( a, b);65 } 66 } 67 68 private:69 ::std::vector< User> users;70 }pool; 71 72 void Consume_task(){73 74 while( true){75 ::std::unique_lock< ::std::mutex> _lock( Message_queue.mutex_);' 注意這里, lock 是在while里面的!!! 76 if( Message_queue.task_queue.empty()){77 Message_queue.condition_var.wait( _lock);78 // continue;79 }80 else{81 auto task = Message_queue.task_queue.front(); 82 Message_queue.task_queue.pop();83 _lock.unlock();84 85 // todo task86 if( task.type == "add"){87 pool.add( task.user);88 }89 else if( task.type == "remove"){90 pool.remove( task.user);91 }92 93 pool.match();94 } 95 } 96 } 97 98 class MatchHandler : virtual public MatchIf {99 public: 100 MatchHandler() { 101 // Your initialization goes here 102 } 103 104 int32_t add_user(const User& user, const std::string& info) { 105 // Your implementation goes here 106 printf("add_user (id: %d)\n", user.id); 107 108 // thread :: begin 109 ::std::unique_lock< ::std::mutex> _lock( Message_queue.mutex_); 111 Message_queue.task_queue.push( {user, "add"}); 112 113 Message_queue.condition_var.notify_all(); 114 // thread :: end 115 116 117 return 0; 118 } 119 120 int32_t remove_user(const User& user, const std::string& info) { 121 // Your implementation goes here 122 printf("remove_user\n"); 123 124 // thread :: begin 125 ::std::unique_lock< ::std::mutex> _lock( Message_queue.mutex_); 126 127 Message_queue.task_queue.push( {user, "remove"}); 128 129 Message_queue.condition_var.notify_all(); 130 // thread :: end 131 132 return 0; 133 } 134 135 }; 137 int main(int argc, char **argv) { 138 139 ::std::cout << "(match_system): enter main function" << ::std::endl; 140 141 int port = 9090; 142 ::std::shared_ptr<MatchHandler> handler(new MatchHandler()); 143 ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler)); 144 ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); 145 ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); 146 ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); 147 148 ::std::thread match_thread( Consume_task); 151 152 TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory); 153 server.serve(); 154 return 0; 155 }匹配結果 存入DB
此時我們要新加一個功能: 即當兩名玩家成功匹配后, 我們將這個匹配結果 存到db里
[match服務] <-> [db服務]
' 在thrift/下, 添加: save.thrift '1 namespace cpp save_service2 3 service Save{4 5 # username: myserver服務器的名稱 acs_23576 # passwork: myserver服務器密碼的MD5sum7 # 密碼驗證失敗 返回1, 成功會返回0 且匹配信息結果在存在myserver:homework/lesson_6/result.txt里8 i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id) 9 }
這是我們的接口 (服務端接口的實現即db服務的代碼 不用我們實現, 已經實現好了 )
我們只需要, 實現: 客戶端接口的實現 即match服務里, 實現去連接 [db服務]
到match_system/src/下, 執行thrift -r --gen cpp ../../thrift/save.thrift, 此時在當前文件夾下, 就生成了: gen-cpp, 將他改名為: save_client
進入save_client/, 他里面有: Save.cpp Save.h Save_server.skeleton.cpp save_types.h
其中, Save_server.xx是 服務端的代碼, 我們這里是客戶端, 所以要把他給delete掉
回到match_sysytem/src/main.cpp里,
#include "./save_client/Save.h" // 剛生成的 [與db服務]通信的接口 的cpp實現' 之前這個函數是空的, 此時在這里, 成功匹配后, 將他的結果 存到[db服務]里 ' void save_matched_result( const User & _a, const User _b){' [db服務]的 服務器的 ip號'::std::shared_ptr< TTransport> socket( new TSocket("123.57.47.211", 9090)); ::std::shared_ptr< TTransport> transport( new TBufferedTransport( socket)); ::std::shared_ptr< TProtocol> protocol( new TBinaryProtocol(transport)); ' 這個, 就是./save_client/Save.h里, 生成的一個類 ' SaveClient client( protocol);try{ transport->open(); ' 登錄密碼, 是md5sum后的 'auto ret = client.save_data( "acs_2357", "580fafec", _a.id, _b.id); ::std::cout << ret << ::std::endl; ' 0為成功 'transport->close(); }catch( TException & _e){ ::std::cout << "(ERROR): " << _e.what() << ::std::endl; } }g++ ./save_client/Save.cpp -c ' 這里很重要!!! 與[db服務]的接口實現是在: save_client文件夾下 ' ' 這個文件夾里有: Save.cpp Save.h save_types.h; 一定要記得編譯這個cpp, 否則一會main.cpp 會就找不到實現了'g++ ./main.cpp -c ' [匹配服務] 的項目 'g++ *.o -o main -lthrift -pthread
然后就可以: ./main 啟動匹配服務, ipython3 ./client.py 啟動游戲服務
匹配成功后: ssh myServer 進入[db服務器]: cat ~/homework/lesson_6/resulte.txt
md5sum
根據 MD5值, 反過來去求 原串, 幾乎不可能
' 輸入md5sum命令: ' 902976bc ' 輸入你的文本, 然后: 回車, ctrl+d ' 580fafec24c5922541e06f2539e830cb ' md5值 '多線程
match_system里, 此時的服務端 (即當外界調用add_user/remove_user時, 此時的match_system就是[服務端]) 是單線程的
即, 生產者, 是單線程的
如果我們想要提高并發量的話, 可以使用 多線程服務器; 即外部每調用一個函數, 就開一個線程
160 class MatchCloneFactory : virtual public MatchIfFactory{ 161 public: 162 ~MatchCloneFactory() override = default; 163 MatchIf * getHandler( const ::apache::thrift::TConnectionInfo & connInfo) override{' 外部每調用一個: add_user/remove_user, 就會進入這里, 開一個線程 ' 164 ::std::shared_ptr< TSocket> sock = ::std::dynamic_pointer_cast< TSocket>( connInfo.transport); 165 ::std::cout << " geted connection \n"; 166 ::std::cout << "Socket Info: "<< sock->getSocketInfo() << "\n"; 167 ::std::cout << "PeerHost: " << sock->getPeerHost() << "\n"; 168 ------------ 169 ::std::cout << "Peer Addr: " << sock->getPeerAddress() << "\n"; 170 ::std::cout << "Peer Port: " << sock->getPeerPort() << "\n"; 171 172 return new MatchHandler; 173 } 174 void releaseHandler( MatchIf * handler) override{ 175 delete handler; 176 } 177 };void main(){ 183 TThreadedServer server( 184 ::std::make_shared< MatchProcessorFactory>( ::std::make_shared< MatchCloneFactory>()), 185 ::std::make_shared<TServerSocket>( 9090), 186 ::std::make_shared<TBufferedTransportFactory>(), 187 ::std::make_shared<TBinaryProtocolFactory>()); ' 把之前 (單線程的 TSimpleServer), 改為, (多線程的 TThreadedServer) ' 188 189 // 開一個線程, 處理(所有的任務: add_user 和 remove_user) 190 ::std::thread match_thread( Consume_task); ' 這是一個線程 ' 191 192 server.serve(); ' 這也是個線程 '
總結
以上是生活随笔為你收集整理的<AcWing>-thrift的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: andrioid项目开发之教程 桌面
- 下一篇: Birdwatching