ASIO协程彻底转变你的思维
生活随笔
收集整理的這篇文章主要介紹了
ASIO协程彻底转变你的思维
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
avbot 發布了許久了, 最近突然有個用戶跑來說,希望能增加個調用 “外部腳本” 的功能,方便擴展。
我一向對設計一個 plugin 機制極力的避免,不喜歡動態載入的模塊擴展程序本身的功能。何況 avbot 是 c++開發的,調用腳本并不是容易的事情。(好吧,真實的原因是我被 mingw (VC 不支持 utf8源碼,我已經拋棄了) 折騰怕了,不想再搞個 python 。windows實在是恐怖的平臺,寫點程序麻煩的要死,編譯麻煩的要死。可是 avbot 又必須跨平臺,結果是我一天寫好的東西要在 windows (虛擬機) 里折騰好幾天,累死人 )
于是我決定提供一個??JSON 接口,內置一個簡單的 HTTP Server, 用腳本(python應該 HTTP JSON 模塊有的是,對吧)連接到 avbot ,然后 avbot 將發生的每條消息以 json 的形式返回給 外部腳本。
另外,默認使用 HTTP 的connection: keep-alive 模式,所以保持一個長連接即可。
那么,avbot 需要支持 不確定數目的消息接收方了。
對于鏈接到 avbot 的客戶端而言, avbot 并不保留之前的所有消息,而是從連接上的那一刻開始,后續的消息才能通知到。
一個很明顯的思路就是,將鏈接上的客戶端做成一個鏈表/列隊, avbot 收到消息后,遍歷這個列隊執行消息發送。
這個思路很簡單,可是如果要求 : 必須單線程異步呢?
avbot 是一個純粹的單線程程序,絕對不允許多線程化。所有的邏輯必須使用異步處理。
那么,這個問題就復雜化了, “avbot 收到消息后,遍歷這個列隊執行消息發送” 這個做法,不可避免的帶來了阻塞。好吧,異步遍歷吧。
要是異步遍歷還沒遍歷完,又來一個消息呢? 考慮這個問題,你會發瘋的。因為異步,太多的細節需要考慮了。真的。
好吧,又有個好主意了,為每個客戶端建立一個列隊,每次遍歷就是把要發送的消息掛入列隊即可。這樣也不需要異步遍歷了,同步就可以。解決了異步遍歷的時候又來一個消息導致的痛苦的調度。
然后細分,考慮每個客戶端,就是等待 “發送列隊” 不為空!等等,一直這么等待也不行,如果客戶斷開了鏈接呢? 所以要 “同時等待發送列隊不為空&&客戶正常在線,并且已經發送了 HTTP 請求頭部”
好繞口,不過也只能如此了。
avbot 因為默認使用了 keep-alive , 所以發送是一個死循環,知道客戶端主動斷開鏈接或者網絡發生錯誤。如果 客戶端死了,那么,發送列隊興許會出現 爆隊 的情況。所以要限制發送列隊的大小。不是滿了就不發送,而是滿了后就把早的消息踢掉,也就是讓 客戶端發生“暫時性卡死”后,還能繼續處理最后的幾條信息。
誒,復雜的邏輯終于理清了,代碼呢?!
啊累?
靠,這么復雜的 邏輯,得寫一長段代碼,調試幾百年了吧?
錯,我只花了幾個小時, 不到 100 行的代碼就輕松實現了全部要求。
!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!
這種功能不可能不用個千把行代碼的吧?!
如果使用以前的老辦法,確實如此。
可是,自從發現了 ASIO 后,我被 ASIO 爸爸發明的協程深深的 震驚了!
利用 ASIO 爸爸提出的協程思想,我只用了不到 100行代碼就全部完成了以上復雜的邏輯,而且,全部都是異步的哦~ 。
好,廢話不多,先貼代碼。然后解釋。// avbot_rpc_server 由 acceptor_server 這個輔助類調用
// 為其構造函數傳入一個 m_socket, 是 shared_ptr 的.
class avbot_rpc_server
{
public:
? ? ? ? typedef boost::signals2::signal<
? ? ? ? ? ? ? ? void( std::string protocol, std::string room, std::string who,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? std::string message, sender_flags )
? ? ? ? > on_message_signal_type;
? ? ? ? static on_message_signal_type on_message;
? ? ? ? typedef boost::asio::ip::tcp Protocol;
? ? ? ? typedef boost::asio::basic_stream_socket<Protocol> socket_type;
? ? ? ? typedef void result_type;
? ? ? ? avbot_rpc_server(boost::shared_ptr<socket_type> _socket)
? ? ? ?? ?: m_socket(_socket)
? ? ? ?? ?, m_request(new boost::asio::streambuf)
? ? ? ?? ?, m_responses(new boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> >(20) )
? ? ? ? {
? ? ? ? ? ? ? ? m_socket->get_io_service().post(
? ? ? ? ? ? ? ? ? ? ? ? boost::asio::detail::bind_handler(*this, boost::coro::coroutine(), boost::system::error_code(), 0)
? ? ? ? ? ? ? ? );
? ? ? ? }
? ? ? ? // 數據操作跑這里,嘻嘻.
? ? ? ? void operator()(boost::coro::coroutine coro, boost::system::error_code ec, std::size_t bytestransfered)
? ? ? ? {
? ? ? ? ? ? ? ? boost::shared_ptr<boost::asio::streambuf>? ? ? ? sendbuf;
? ? ? ? ? ? ? ? if (ec){
? ? ? ? ? ? ? ? ? ? ? ? m_socket->close(ec);
? ? ? ? ? ? ? ? ? ? ? ? // 看來不是 HTTP 客戶端,誒,滾蛋啊!
? ? ? ? ? ? ? ? ? ? ? ? // 沉默,直接關閉鏈接. 取消信號注冊.
? ? ? ? ? ? ? ???? ? ? ? if (m_connect && m_connect->connected())
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? m_connect->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? CORO_REENTER(&coro)
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? do{
? ? ? ? ? ? ? ? ? ? ? ? // 發起 HTTP 處理操作.
? ? ? ? ? ? ? ? ? ? ? ? _yield boost::asio::async_read_until(*m_socket, *m_request, "\r\n\r\n", boost::bind(*this, coro, _1, _2));
? ? ? ? ? ? ? ? ? ? ? ? m_request->consume(bytestransfered);
? ? ? ? ? ? ? ? ? ? ? ? // 解析 HTTP
? ? ? ? ? ? ? ? ? ? ? ? // 等待消息.
? ? ? ? ? ? ? ? ? ? ? ? if (m_responses->empty())
? ? ? ? ? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!m_connect){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 將自己注冊到 avbot 的 signal 去
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 等 有消息的時候,on_message 被調用,也就是下面的 operator() 被調用.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? _yield m_connect = boost::make_shared<boost::signals2::connection>
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (on_message.connect(boost::bind(*this, coro, _1, _2, _3, _4, _5)));
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 就這么退出了,但是消息來的時候,om_message 被調用,然后下面的那個
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // operator() 就被調用了,那個 operator() 接著就會重新回調本 operator()
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 結果就是隨著 coroutine 的作用,代碼進入這一行,然后退出??if 判定
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 然后進入發送過程.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 如果已經注冊,直接返回。時候如果消息來了,on_message 被調用,也就
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 是下面的 operator() 被調用. 結果就是隨著 coroutine 的作用,代碼
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // 進入上面那行,然后退出??if 判定。然后進入發送過程.
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? // signals2 回調的時候會進入到這一行.
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? // 進入發送過程
? ? ? ? ? ? ? ? ? ? ? ? sendbuf = m_responses->front();
? ? ? ? ? ? ? ? ? ? ? ? _yield boost::asio::async_write(*m_socket, *sendbuf, boost::bind(*this, coro, _1, _2) );
? ? ? ? ? ? ? ? ? ? ? ? m_responses->pop_front();
? ? ? ? ? ? ? ? ? ? ? ? // 寫好了,重新開始我們的處理吧!
? ? ? ? ? ? ? ? }while(1);
? ? ? ? ? ? ? ? }
? ? ? ? }
? ? ? ? // signal 的回調到了這里, 這里我們要區分對方是不是用了 keep-alive 呢.
? ? ? ? void operator()(boost::coro::coroutine coro, std::string protocol, std::string room, std::string who, std::string message, sender_flags)
? ? ? ? {
? ? ? ? ? ? ? ? pt::ptree jsonmessage;
? ? ? ? ? ? ? ? boost::shared_ptr<boost::asio::streambuf> buf(new boost::asio::streambuf);
? ? ? ? ? ? ? ? std::ostream? ? ? ? stream(buf.get());
? ? ? ? ? ? ? ? std::stringstream? ? ? ? teststream;
? ? ? ? ? ? ? ? jsonmessage.put("protocol", protocol);
? ? ? ? ? ? ? ? jsonmessage.put("root", room);
? ? ? ? ? ? ? ? jsonmessage.put("who", who);
? ? ? ? ? ? ? ? jsonmessage.put("msg", message);
? ? ? ? ? ? ? ? js::write_json(teststream,??jsonmessage);
? ? ? ? ? ? ? ? // 直接寫入 json 格式的消息吧!
? ? ? ? ? ? ? ? stream <<??"HTTP/1.1 200 OK\r\n" <<??"Content-type: application/json\r\n";
? ? ? ? ? ? ? ? stream <<??"connection: keep-alive\r\n" <<??"Content-length: ";
? ? ? ? ? ? ? ? stream << teststream.str().length() <<??"\r\n\r\n";
? ? ? ? ? ? ? ? js::write_json(stream, jsonmessage);
? ? ? ? ? ? ? ? // 檢查 發送緩沖區.
? ? ? ? ? ? ? ? if (m_responses->empty()){
? ? ? ? ? ? ? ? ? ? ? ? // 打通仁督脈.
? ? ? ? ? ? ? ? ? ? ? ? m_socket->get_io_service().post(boost::asio::detail::bind_handler(*this, coro, boost::system::error_code(), 0));
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 寫入 m_responses
? ? ? ? ? ? ? ? m_responses->push_back(buf);
? ? ? ? }
private:
? ? ? ? boost::shared_ptr<socket_type> m_socket;
? ? ? ? boost::shared_ptr<boost::signals2::connection> m_connect;
? ? ? ? boost::shared_ptr<boost::asio::streambuf>? ? ? ? m_request;
? ? ? ? boost::shared_ptr<boost::circular_buffer_space_optimized<boost::shared_ptr<boost::asio::streambuf> > >? ? ? ? m_responses;
};
} 復制代碼
首先這個 avbot_rpc_server 由一個 acceptor_service 輔助類調用。 acceptor_service 是一個模板,大家可以去 acceptor_server.hpp 膜拜。
acceptor_service 以 Protocol 和一個 處理類 為模板。在 main.cpp里,我以 asio::ip::tcp 作為 Protocl 的參數 avbot_rpc_server為 ProtocolProcesser的參數 調用acceptor_service。acceptor_service 進入一個死循環(協程的)不停的 accept , 然后將 accept 到的 socket 交給 ProtocolProcesser,也就是 avbot_rpc_server 。
avbot_rpc_server 處理一下客戶的請求頭,然后把自己注冊到 on_message 信號處理。
然后,然后就沒然后了。
on_message 在 avbot 接收到消息的時候發出。結果就是 avbot_rpc_server 的 第二個 operator() 被調用。然后就繼續發送了。
當然,并不是每一個 on_message 都會導致 avbot_rpc_server 的 第二個 operator() 被調用的,必須是列隊為空的時候。不為空的時候就不需要調用。發送循環會繼續循環的,避免競爭出現
我一向對設計一個 plugin 機制極力的避免,不喜歡動態載入的模塊擴展程序本身的功能。何況 avbot 是 c++開發的,調用腳本并不是容易的事情。(好吧,真實的原因是我被 mingw (VC 不支持 utf8源碼,我已經拋棄了) 折騰怕了,不想再搞個 python 。windows實在是恐怖的平臺,寫點程序麻煩的要死,編譯麻煩的要死。可是 avbot 又必須跨平臺,結果是我一天寫好的東西要在 windows (虛擬機) 里折騰好幾天,累死人 )
于是我決定提供一個??JSON 接口,內置一個簡單的 HTTP Server, 用腳本(python應該 HTTP JSON 模塊有的是,對吧)連接到 avbot ,然后 avbot 將發生的每條消息以 json 的形式返回給 外部腳本。
另外,默認使用 HTTP 的connection: keep-alive 模式,所以保持一個長連接即可。
那么,avbot 需要支持 不確定數目的消息接收方了。
對于鏈接到 avbot 的客戶端而言, avbot 并不保留之前的所有消息,而是從連接上的那一刻開始,后續的消息才能通知到。
一個很明顯的思路就是,將鏈接上的客戶端做成一個鏈表/列隊, avbot 收到消息后,遍歷這個列隊執行消息發送。
這個思路很簡單,可是如果要求 : 必須單線程異步呢?
avbot 是一個純粹的單線程程序,絕對不允許多線程化。所有的邏輯必須使用異步處理。
那么,這個問題就復雜化了, “avbot 收到消息后,遍歷這個列隊執行消息發送” 這個做法,不可避免的帶來了阻塞。好吧,異步遍歷吧。
要是異步遍歷還沒遍歷完,又來一個消息呢? 考慮這個問題,你會發瘋的。因為異步,太多的細節需要考慮了。真的。
好吧,又有個好主意了,為每個客戶端建立一個列隊,每次遍歷就是把要發送的消息掛入列隊即可。這樣也不需要異步遍歷了,同步就可以。解決了異步遍歷的時候又來一個消息導致的痛苦的調度。
然后細分,考慮每個客戶端,就是等待 “發送列隊” 不為空!等等,一直這么等待也不行,如果客戶斷開了鏈接呢? 所以要 “同時等待發送列隊不為空&&客戶正常在線,并且已經發送了 HTTP 請求頭部”
好繞口,不過也只能如此了。
avbot 因為默認使用了 keep-alive , 所以發送是一個死循環,知道客戶端主動斷開鏈接或者網絡發生錯誤。如果 客戶端死了,那么,發送列隊興許會出現 爆隊 的情況。所以要限制發送列隊的大小。不是滿了就不發送,而是滿了后就把早的消息踢掉,也就是讓 客戶端發生“暫時性卡死”后,還能繼續處理最后的幾條信息。
誒,復雜的邏輯終于理清了,代碼呢?!
啊累?
靠,這么復雜的 邏輯,得寫一長段代碼,調試幾百年了吧?
錯,我只花了幾個小時, 不到 100 行的代碼就輕松實現了全部要求。
!!!!!!!!!!!!!!!!!!! WHAT !!!!!!!!!!!!!!!!!!!
這種功能不可能不用個千把行代碼的吧?!
如果使用以前的老辦法,確實如此。
可是,自從發現了 ASIO 后,我被 ASIO 爸爸發明的協程深深的 震驚了!
利用 ASIO 爸爸提出的協程思想,我只用了不到 100行代碼就全部完成了以上復雜的邏輯,而且,全部都是異步的哦~ 。
好,廢話不多,先貼代碼。然后解釋。
acceptor_service 以 Protocol 和一個 處理類 為模板。在 main.cpp里,我以 asio::ip::tcp 作為 Protocl 的參數 avbot_rpc_server為 ProtocolProcesser的參數 調用acceptor_service。acceptor_service 進入一個死循環(協程的)不停的 accept , 然后將 accept 到的 socket 交給 ProtocolProcesser,也就是 avbot_rpc_server 。
avbot_rpc_server 處理一下客戶的請求頭,然后把自己注冊到 on_message 信號處理。
然后,然后就沒然后了。
on_message 在 avbot 接收到消息的時候發出。結果就是 avbot_rpc_server 的 第二個 operator() 被調用。然后就繼續發送了。
當然,并不是每一個 on_message 都會導致 avbot_rpc_server 的 第二個 operator() 被調用的,必須是列隊為空的時候。不為空的時候就不需要調用。發送循環會繼續循環的,避免競爭出現
總結
以上是生活随笔為你收集整理的ASIO协程彻底转变你的思维的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 协程,又称微线程和纤程
- 下一篇: c面试题总结(含答案)