客戶端:
// Client.cpp : 定義控制臺應用程序的入口點。
//#include "stdafx.h"
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>using boost::asio::ip::tcp;
using boost::asio::ip::address;class session: public boost::enable_shared_from_this<session> {
public:session(boost::asio::io_service &io_service, tcp::endpoint &endpoint): io_service_(io_service), socket_(io_service), endpoint_(endpoint){ } void start() {socket_.async_connect(endpoint_,boost::bind(&session::handle_connect, shared_from_this(), boost::asio::placeholders::error));} private:void handle_connect(const boost::system::error_code &error) {if (error) {if (error.value() != boost::system::errc::operation_canceled) {std::cerr << boost::system::system_error(error).what() << std::endl;} socket_.close();return;} static tcp::no_delay option(true);socket_.set_option(option);strcpy(buf, "Hello World!\n");boost::asio::async_write(socket_, boost::asio::buffer(buf, strlen(buf)),boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {memset(buf, sizeof(buf), 0); boost::asio::async_read_until(socket_, sbuf,"\n",boost::bind(&session::handle_read,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));}void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {std::cout << buf << std::endl;}private:boost::asio::io_service &io_service_;tcp::socket socket_;tcp::endpoint &endpoint_;char buf[1024];boost::asio::streambuf sbuf;
};typedef boost::shared_ptr<session> session_ptr;int main(int argc, char* argv[])
{boost::asio::io_service io_service;tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);session_ptr new_session(new session(io_service, endpoint));new_session->start();io_service.run();return 0;
}
服務器:
#include <string.h>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>using boost::asio::ip::tcp;
using boost::asio::ip::address;class session: public boost::enable_shared_from_this<session> {
public:session(boost::asio::io_service &io_service): socket_(io_service){ } void start() {static tcp::no_delay option(true);socket_.set_option(option);boost::asio::async_read_until(socket_, sbuf_,"\n",boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} tcp::socket &socket() {return socket_;} private:void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {boost::asio::async_read_until(socket_, sbuf_,"\n",boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));} void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {boost::asio::async_write(socket_, sbuf_,boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred));std::ostream os(&sbuf_);printf("%s \n",sbuf_.data());}private:tcp::socket socket_;boost::asio::streambuf sbuf_;
};typedef boost::shared_ptr<session> session_ptr;class server {
public:server(boost::asio::io_service &io_service, tcp::endpoint &endpoint): io_service_(io_service), acceptor_(io_service, endpoint){session_ptr new_session(new session(io_service_));acceptor_.async_accept(new_session->socket(),boost::bind(&server::handle_accept,this,new_session,boost::asio::placeholders::error));}void handle_accept(session_ptr new_session, const boost::system::error_code& error) {if (error) {return;}new_session->start();new_session.reset(new session(io_service_));acceptor_.async_accept(new_session->socket(),boost::bind(&server::handle_accept,this,new_session,boost::asio::placeholders::error));}void run() {io_service_.run();}private:boost::asio::io_service &io_service_;tcp::acceptor acceptor_;
};int main(int argc, char* argv[])
{boost::asio::io_service io_service;//tcp::endpoint endpoint(address::from_string("172.16.6.70"), 10028);tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 10028);server s(io_service, endpoint);s.run();return 0;
}
編譯:
g++ -Wall -o client client.cpp -lboost_system
g++ -Wall -o server server.cpp -lboost_system
這里需要注意的是: async_write, async_read, async_read_until 都是需要達到特定條件才會調用回調函數,
在調用回調函數之前, 不能再次調用, 否則接收到的數據很可能是亂的. 所以, 在實際代碼當中, 會有一個寫緩沖隊列, 當需要write的時候, 先放到隊列中, 如果隊列個數為1, 則調用async_write, 否則等待函數回調, 當函數回調時將首個元素從隊列中移除, 然后接著發送下一個, 直到隊列為空.
對于client, 由于is_open在async_connect之后就是true狀態了, 因此在async_connect回調返回之前沒有方法知道是否已經連接成功, 實際代碼當中一般會增加一個變量以表示該套接字是否已經允許發送數據.
============================================================================
服務器:
[cpp]?view plain
?copy ? #define?????????PACK_MAX_SIZE??????????16*1024?? class?TCPConnection:?public?boost::enable_shared_from_this<TCPConnection>?{?? public:?? ????static?boost::shared_ptr<TCPConnection>?create(IoService&?ioService);?? ????tcp::socket&?getSocket();?? ????void?start();?? ?? private:?? ????TCPConnection(IoService&?ioService);?? ????void?handleRead(const?boost::system::error_code&?e,size_t?bytesTransferred);?? ????void?handleWrite(const?boost::system::error_code&?e,size_t?bytesTransferred);?? ?? ?????? ????tcp::socket?socket;?? ?????? ????char?m_buffer[PACK_MAX_SIZE];?? ????size_t?m_bytesReceived;?? };?? typedef?boost::shared_ptr<TCPConnection>?pointer;?? ?? class?TCPServer?{?? public:?? ????TCPServer(IoService&?ioService,?int?port);?? ?? public:?? ?????? private:?? ????void?startAccept();?? ????void?handleAccept(boost::shared_ptr<TCPConnection>?newConnection,?? ????????????const?boost::system::error_code&?error);?? ?? private:?? ????tcp::acceptor?acceptor;?? };??
[cpp]?view plain
?copy ? TCPConnection::TCPConnection(IoService&?ioService)?:socket(ioService)?{?? ????m_bytesReceived?=?0;?? }?? ?? boost::shared_ptr<TCPConnection>?TCPConnection::create(IoService&?ioService)?{?? ????return?pointer(new?TCPConnection(ioService));?? }?? ?? tcp::socket&?TCPConnection::getSocket()?{?? ????return?socket;?? }?? ?? void?TCPConnection::start()?{?? ????static?tcp::no_delay?option(?true?);?? ????socket.set_option(?option?);?? ????????socket.set_option(boost::asio::socket_base::keep_alive(true));???? ????memset(m_buffer,?0x0,?2048);?? ?????? ????socket.async_read_some(boost::asio::buffer(m_buffer),?? ????????????boost::bind(&TCPConnection::handleRead,?shared_from_this(),?? ????????????????????boost::asio::placeholders::error,?? ????????????????????boost::asio::placeholders::bytes_transferred));?? }?? ?? void?TCPConnection::sendTermMessage(?long?phoneNum,?unsigned?short?serialNum,?unsigned?char?cmd,?void*?rawData?)?? {?? ????socket.async_write_some(?? ????????boost::asio::buffer(?resp->mem,?resp->size?),?? ????boost::bind(?? ????????&TCPConnection::handleWrite,?? ????????shared_from_this(),?? ????????boost::asio::placeholders::error,?? ????????boost::asio::placeholders::bytes_transferred?? ????????)?? ????);?? }?? ?? void?TCPConnection::handleRead(const?boost::system::error_code&?error,?size_t?bytesTransferred)?? {?? ?????? ?? ????if(?error?!=?0?||?bytesTransferred?==?0?)?? ????{?? ????????std::cout?<<?"!!!?network?exception:?err("?<<?error.value()?<<?"),?"?<<?boost::system::system_error(?error?).what()?<<?"?!!!"?<<?std::endl;?? ????????socket.close();?? ????????CmdQueue::getInstance()->cancelSubscribe(?this?);?? ????????return;?? ????}?? ?????? ????std::cout?<<?"received?some?from?remote?serivce";?? ????time_t?nowTime?=?time(NULL);?? ????std::cout?<<?":(time:"<<nowTime<<")"?<<?SQ::Utils::bytes2HexString(?(SQ::Byte?*)?m_buffer,?bytesTransferred<=40?bytesTransferred:40);?? ????if?(bytesTransferred<=40)?? ????{?? ????????std::cout<<?std::endl;?? ????}?? 
????else?? ????{?? ????????cout?<<?"..."?<<endl;?? ????}?? ?????? ?? ?????? ????size_t?m_bytesReceived?=?0;?? ????m_bytesReceived?+=?bytesTransferred;?? ????size_t?dealDataSize?=?0;?? ?? ????if?(m_bytesReceived>=sizeof(TCPHead))?? ????{?? ????????int?offset?=?0;?? ????????while?(true)?? ????????{?? ?????????????? ????????????offset?=?dealDataSize;?? ????????????if?(m_bytesReceived-offset?<?sizeof(TCPHead))?break;?? ?? ????????????TCPHead*?head?=?(TCPHead*)(m_buffer?+?offset);?? ?? ????????????unsigned?long?phoneNum?=?SQ::Utils::BCD2long(?head->terminalID,?PHONENUM_LENGTH?);?? ????????????offset?+=?PHONENUM_LENGTH;?? ?? ????????????unsigned?short?serialNum?=?ntohs(?head->serialNum?);?? ????????????offset?+=?2;?? ?? ????????????unsigned?char?cmd?=?head->cmd?;?? ????????????offset?+=?1;?? ?? ????????????int?bodyLen?=?ntohl(?head->bodyLen?);?? ????????????offset?+=?4;?? ?? ????????????if?(m_bytesReceived-offset?<?bodyLen)?break;?? ????????????SQ::Byte?*pRawData?=?(SQ::Byte?*)m_buffer?+?offset;?? ????????????sendUDPPackToTerminal(phoneNum,?pRawData,?bodyLen);?? ????????????dealDataSize?+=?(sizeof(TCPHead)+bodyLen);?? ????????}?? ????}?? ?? ?????? ????if?(dealDataSize?>?0)?? ????{?? ????????memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);?? ????????m_bytesReceived?-=?dealDataSize;?? ????}?? ?????? ????socket.async_read_some(boost::asio::buffer(m_buffer?+?m_bytesReceived,?PACK_MAX_SIZE-m_bytesReceived),?? ????????????boost::bind(&TCPConnection::handleRead,?shared_from_this(),?? ????????????????????boost::asio::placeholders::error,?? ????????????????????boost::asio::placeholders::bytes_transferred));?? ????return;?? }?? void?TCPConnection::handleWrite(?const?boost::system::error_code&?error,?size_t?bytesTransferred)?? {?? ????if(?error?!=?0?)?? ????{?? ????????std::cout?<<?"write?error."?<<?std::endl;?? ????}?? }?? TCPServer::TCPServer(IoService&?ioService,?int?port)?:?? ????????acceptor(ioService,?tcp::endpoint(tcp::v4(),?port))?{?? 
????startAccept();?? }?? void?TCPServer::startAccept()?{?? ????boost::shared_ptr<TCPConnection>?newConnection?=?TCPConnection::create(?? ????????????acceptor.get_io_service());?? ?? ????acceptor.async_accept(newConnection->getSocket(),?? ????????????boost::bind(&TCPServer::handleAccept,?this,?newConnection,?? ????????????????????boost::asio::placeholders::error));?? }?? void?TCPServer::handleAccept(boost::shared_ptr<TCPConnection>?newConnection,?? ????????const?boost::system::error_code&?error)?{?? ????if?(!error)?{?? ????????newConnection->start();?? ????????startAccept();?? ????}?? }??
客戶端:
[cpp]?view plain
?copy ? class?CommonSession?:?public?boost::enable_shared_from_this<CommonSession>?? {?? ?????? public:?? ????CommonSession(?boost::asio::io_service?*io_service,?const?std::string?&ip,?short?port);?? ????void?start();?? ????void?sendDataBytes(SQ::Byte?*pByte,?int?len);?? ????bool?getConnectState()?{return?m_isConnected;}?? ?? ?????? protected:?? ????void?onConnect(?const?boost::system::error_code?&error?);?? ????void?onWrite(?const?boost::system::error_code&?error,?size_t?bytes_transferred?);?? ????void?onRead(?const?boost::system::error_code&?error,?size_t?bytes_transferred?);?? ?? private:?? ????tcp::socket??????????????????????????????m_socket;???????????? ????char?????????????????????????????????????m_buffer[PACK_MAX_SIZE];????? ????bool?????????????????????m_isConnected;??????????? ????std::string??????????????????????????????m_ip;???????????????? ????short????????????????????????????????????m_port;?????????????? ????size_t???????????????????????????????????m_bytesReceived;????????? ????unsigned?short???????????????????????????m_serialNum;????????????? ?? ????boost::recursive_mutex???????????????????m_mutex;????????????? ????boost::asio::io_service*?????????????????m_io_service;???????????? };??
[cpp]?view plain
?copy ? CommonSession::CommonSession(?boost::asio::io_service?*io_service,?const?std::string?&ip,?short?port?)?? :?m_io_service(?io_service?),?m_socket(?*io_service?),?m_ip(?ip?),?m_port(?port?)?? {?? ????m_bytesReceived?=?0;?? ????m_isConnected???=?false;?? ????haveLog?????????=?false;?? }?? ?? void?CommonSession::onConnect(?const?boost::system::error_code?&?error?)?? {?? ????if(?error?)??? ????{?? ?????????? ????????if?(!haveLog)?? ????????{?? ????????????haveLog?=?true;?? ?? ?????????????? ????????????std::cout?<<?"connect?to?"?<<?m_ip?<<?":"?<<?m_port?<<?"?failed!"?<<?"?->?ERR?:?";?? ????????????if(?error.value()?!=?boost::system::errc::operation_canceled?)??? ????????????{?? ????????????????std::cerr?<<?boost::system::system_error(?error?).what()?<<?std::endl;?? ????????????}?? ????????}?? ?? ?????????? ????????m_socket.close();?? ????????boost::this_thread::sleep(?boost::posix_time::seconds(?3?)?);?? ????????start();?? ????}?? ????else?? ????{?? ????????std::cout?<<?"connect?to?"?<<?m_ip?<<?":"?<<?m_port?<<?"?successed!?(time:"<<?time(NULL)<<")"?<<?std::endl;?? ????????m_isConnected?=?true;?? ?? ????????static?tcp::no_delay?option(?true?);?? ????????m_socket.set_option(?option?);?? ????????m_socket.set_option(?boost::asio::socket_base::keep_alive(?true?)?);?? ?? ?????????? ?? ?? ?????????? ????????m_socket.async_read_some(?boost::asio::buffer(?m_buffer?),?? ????????????boost::bind(?&CommonSession::onRead,?? ????????????shared_from_this(),?? ????????????boost::asio::placeholders::error,?? ????????????boost::asio::placeholders::bytes_transferred?)?);?? ????}?? }?? void?CommonSession::onWrite(?const?boost::system::error_code&?error,?size_t?bytes_transferred?)?? {?? ????if(?error?!=?0?)?? ????{?? ????????std::cout?<<?"!!!?send?error,?code?is?"?<<?error<<"?!!!"<<std::endl;?? ????????std::cerr?<<?boost::system::system_error(?error?).what()?<<?std::endl;?? ????}?? }?? ?? void?CommonSession::onRead(?const?boost::system::error_code&?error,?size_t?bytes_transferred?)?? 
{?? ?????? ?????? ????if(?error?!=?0?)?? ????{?? ????????std::cout?<<?"!!!?onRead?error,?code?is?"?<<?error<<"?!!!"<<std::endl;?? ????????std::cerr?<<?boost::system::system_error(?error?).what()?<<?std::endl;?? ?? ????????m_socket.close();?? ?? ?????????? ????????boost::this_thread::sleep(?boost::posix_time::seconds(?3?)?);?? ????????m_isConnected?=?false;?? ????????start();?? ????????return;?? ????}?? ????else?if(?bytes_transferred?==?0?)?? ????{?? ????????m_socket.close();?? ?? ?????????? ????????boost::this_thread::sleep(?boost::posix_time::seconds(?3?)?);?? ????????m_isConnected?=?false;?? ????????start();?? ????????return;?? ????}?? ?? ?????? ????size_t?m_bytesReceived?=?0;?? ????m_bytesReceived?+=?bytes_transferred;?? ????size_t?dealDataSize?=?0;?? ?? ????if?(m_bytesReceived>=sizeof(TCPHead))?? ????{?? ????????int?offset?=?0;?? ????????while?(true)?? ????????{?? ?????????????? ????????????offset?=?dealDataSize;?? ????????????if?(m_bytesReceived-offset?<?sizeof(SQ::TCPHead))?break;?? ?? ????????????SQ::TCPHead*?head?=?(SQ::TCPHead*)(m_buffer?+?offset);?? ?? ????????????unsigned?long?phoneNum?=?SQ::Utils::BCD2long(?head->terminalID,?PHONENUM_LENGTH?);?? ????????????offset?+=?PHONENUM_LENGTH;?? ?? ????????????unsigned?short?serialNum?=?ntohs(?head->serialNum?);?? ????????????offset?+=?2;?? ?? ????????????unsigned?char?cmd?=?head->cmd?;?? ????????????offset?+=?1;?? ?? ????????????int?bodyLen?=?ntohl(?head->bodyLen?);?? ????????????offset?+=?4;?? ?? ????????????if?(bodyLen?==?0)?? ????????????{?? ????????????????std::cout<<?"receive(time:"<<?time(NULL)<<")?command?from?gate,?cmd:"?<<?(int)cmd?<<?"?phoneNum:"<<phoneNum<<std::endl;?? ????????????????dealCmd(cmd,?phoneNum);?? ????????????????dealDataSize?+=?sizeof(SQ::TCPHead);?? ????????????}?? ????????????else?? ????????????{?? ?????????????????? ????????????????if?(m_bytesReceived-offset?<?bodyLen)?break;?? ?? ?????????????????? ????????????????char?*pRawData?=?m_buffer?+?offset;?? ?? ?????????????????? 
????????????????CodecMsgHead?csMsgHead;?? ????????????????AbstractCodec?tmpCode;?? ????????????????tmpCode.decodeHead((SQ::Byte*)pRawData,?&csMsgHead);?? ?? ?????????????????? ????????????????dealDataSize?+=?(sizeof(SQ::TCPHead)+bodyLen);?? ????????????}?? ????????}?? ????}?? ?? ?????? ????if?(dealDataSize?>?0)?? ????{?? ????????memmove(m_buffer,m_buffer+dealDataSize,m_bytesReceived-dealDataSize);?? ????????m_bytesReceived?-=?dealDataSize;?? ????}?? ?? ?????? ????m_socket.async_read_some(?boost::asio::buffer(m_buffer?+?m_bytesReceived,?PACK_MAX_SIZE-m_bytesReceived),?? ????????boost::bind(?&CommonSession::onRead,?? ????????shared_from_this(),?? ????????boost::asio::placeholders::error,?? ????????boost::asio::placeholders::bytes_transferred?)?);?? }?? ?? void?CommonSession::start()?? {?? ????tcp::endpoint?ep?=?tcp::endpoint(?address::from_string(?m_ip?),?m_port?);?? ????m_socket.async_connect(?ep,?? ????????boost::bind(?&CommonSession::onConnect,?? ????????????shared_from_this(),?? ????????????boost::asio::placeholders::error?)?);?? }?? ?? ?? void?CommonSession::sendDataBytes(SQ::Byte?*pByte,?int?len)?? {?? ?????? ????if?(!m_isConnected)?? ????{?? ????????std::cout<<?"connection(to?"<<m_ip<<":"<<?m_port<<")?is?not?established,?so?sendDataBytes?faild"?<<std::endl;?? ????????return;?? ????}?? ?? ?????? ????time_t?nowTime?=?time(NULL);?? ????std::cout?<<?"send:(time:"<<nowTime<<")"?<<?SQ::Utils::bytes2HexString(?pByte,?len<=40?len:40);?? ????if?(len<=40)?? ????{?? ????????std::cout<<?std::endl;?? ????}?? ????else?? ????{?? ????????cout?<<?"..."?<<endl;?? ????}?? ?? ?????? ????boost::asio::async_write(?? ????????m_socket,?? ????????boost::asio::buffer(?pByte,?len),?? ????????boost::bind(&CommonSession::onWrite,shared_from_this(),boost::asio::placeholders::error,boost::asio::placeholders::bytes_transferred)?? ????????);?? }??
總結
以上是生活随笔為你收集整理的【Boost】boost库asio详解9——TCP的简单例子2的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。