c – 提升异步tcp客户端
我刚刚开始使用boost.
我正在使用异步套接字编写TCP客户端服务器. 任务如下: >客户端向服务器发送一个号码 现在工作如下 >从客户端向服务器发送号码 如何允许客户端从键盘输入数字并同时等待服务器的回答? 为什么我的客户不等待服务器的答案? 客户端代码: using boost::asio::ip::tcp; class TCPClient { public: TCPClient(boost::asio::io_service& IO_Service,tcp::resolver::iterator EndPointIter); void Close(); private: boost::asio::io_service& m_IOService; tcp::socket m_Socket; string m_SendBuffer; static const size_t m_BufLen = 100; char m_RecieveBuffer[m_BufLen*2]; void OnConnect(const boost::system::error_code& ErrorCode,tcp::resolver::iterator EndPointIter); void OnReceive(const boost::system::error_code& ErrorCode); void OnSend(const boost::system::error_code& ErrorCode); void DoClose(); }; TCPClient::TCPClient(boost::asio::io_service& IO_Service,tcp::resolver::iterator EndPointIter) : m_IOService(IO_Service),m_Socket(IO_Service),m_SendBuffer("") { tcp::endpoint EndPoint = *EndPointIter; m_Socket.async_connect(EndPoint,boost::bind(&TCPClient::OnConnect,this,boost::asio::placeholders::error,++EndPointIter)); } void TCPClient::Close() { m_IOService.post( boost::bind(&TCPClient::DoClose,this)); } void TCPClient::OnConnect(const boost::system::error_code& ErrorCode,tcp::resolver::iterator EndPointIter) { cout << "OnConnect..." << endl; if (ErrorCode == 0) { cin >> m_SendBuffer; cout << "Entered: " << m_SendBuffer << endl; m_SendBuffer += " "; m_Socket.async_send(boost::asio::buffer(m_SendBuffer.c_str(),m_SendBuffer.length()+1),boost::bind(&TCPClient::OnSend,boost::asio::placeholders::error)); } else if (EndPointIter != tcp::resolver::iterator()) { m_Socket.close(); tcp::endpoint EndPoint = *EndPointIter; m_Socket.async_connect(EndPoint,++EndPointIter)); } } void TCPClient::OnReceive(const boost::system::error_code& ErrorCode) { cout << "receiving..." << endl; if (ErrorCode == 0) { cout << m_RecieveBuffer << endl; m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer,m_BufLen),boost::bind(&TCPClient::OnReceive,boost::asio::placeholders::error)); } else { cout << "ERROR! OnReceive..." << endl; DoClose(); } } void TCPClient::OnSend(const boost::system::error_code& ErrorCode) { cout << "sending..." << endl; if (!ErrorCode) { cout << """<< m_SendBuffer <<"" has been sent" << endl; m_SendBuffer = ""; m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer,boost::asio::placeholders::error)); } else { cout << "OnSend closing" << endl; DoClose(); } } void TCPClient::DoClose() { m_Socket.close(); } int main() { try { cout << "Client is starting..." << endl; boost::asio::io_service IO_Service; tcp::resolver Resolver(IO_Service); string port = "13"; tcp::resolver::query Query("127.0.0.1",port); tcp::resolver::iterator EndPointIterator = Resolver.resolve(Query); TCPClient Client(IO_Service,EndPointIterator); cout << "Client is started!" << endl; cout << "Enter a query string " << endl; boost::thread ClientThread(boost::bind(&boost::asio::io_service::run,&IO_Service)); Client.Close(); ClientThread.join(); } catch (exception& e) { cerr << e.what() << endl; } cout << "nClosing"; getch(); } 这是控制台的输出 Client is starting... Client is started! OnConnect... 12 Entered: 12 sending... "12" has been sent receiving... ERROR! OnReceive... Closing 服务器部分 class Session { public: Session(boost::asio::io_service& io_service) : socket_(io_service) { dataRx[0] = ' '; dataTx[0] = ' '; } tcp::socket& socket() { return socket_; } void start() { socket_.async_read_some(boost::asio::buffer(dataRx,max_length),boost::bind(&Session::handle_read,boost::asio::placeholders::bytes_transferred)); } void handle_read(const boost::system::error_code& error,size_t bytes_transferred) { cout << "reading..." << endl; cout << "Data: " << dataRx << endl; if (!error) { if (!isValidData()) { cout << "Bad data!" << endl; sprintf(dataTx,"Bad data! "); dataRx[0] = ' '; } else { sprintf(dataTx,getFactorization().c_str()); dataRx[0] = ' '; } boost::asio::async_write(socket_,boost::asio::buffer(dataTx,max_length*2),boost::bind(&Session::handle_write,boost::asio::placeholders::error)); } else { delete this; } } void handle_write(const boost::system::error_code& error) { cout << "writing..." << endl; if (!error) { cout << "dataTx sent: " << dataTx << endl; dataTx[0] = ' '; socket_.async_read_some(boost::asio::buffer(dataRx,boost::asio::placeholders::bytes_transferred)); } else { delete this; } } string getFactorization() const { //Do something } bool isValidData() { locale loc; for (int i = 0; i < strlen(dataRx); i++) if (!isdigit(dataRx[i],loc)) return false; return true; } private: tcp::socket socket_; static const size_t max_length = 100; char dataRx[max_length]; char dataTx[max_length*2]; }; class Server { public: Server(boost::asio::io_service& io_service,short port) : io_service_(io_service),acceptor_(io_service,tcp::endpoint(tcp::v4(),port)) { Session* new_session = new Session(io_service_); acceptor_.async_accept(new_session->socket(),boost::bind(&Server::handle_accept,new_session,boost::asio::placeholders::error)); } void handle_accept(Session* new_session,const boost::system::error_code& error) { if (!error) { new_session->start(); new_session = new Session(io_service_); acceptor_.async_accept(new_session->socket(),boost::asio::placeholders::error)); } else { delete new_session; } } private: boost::asio::io_service& io_service_; tcp::acceptor acceptor_; }; int main(int argc,char* argv[]) { cout << "Server is runing..." << endl; try { boost::asio::io_service io_service; int port = 13; Server s(io_service,port); cout << "Server is run!" << endl; io_service.run(); } catch (boost::system::error_code& e) { std::cerr << e << "n"; } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "n"; } return 0; } 服务器的输出 Server is runing... Server is run! reading... Data: 12 writing... dataTx sent: 13 //just send back received ++number reading... Data: 非常感谢您的帮助 ======== 添加 好的,我明白了.但检查ErrorCode == boost :: asio :: error :: eof不起作用……我做错了什么? else if (ErrorCode == boost::asio::error::eof) { cout << "boost::asio::error::eof in OnReceive!" << endl; } else { cout << "ERROR! OnReceive..." << ErrorCode << endl; DoClose(); } 打印输出错误! OnReceive …系统:10009似乎是我的比较是不正确的 ======== 添加 我找到了根本原因.我已声明使用async_receive(而不是async_read_some)并将main中的行转换为 ClientThread.join(); Client.Close(); 现在它工作正常! 现在我正在尝试同时从/向套接字读取和写入数据(因为客户端应该能够在收到服务器的回复之前发送其他请求. 在OnConnect函数中,我创建了boost线程: boost::thread addMsgThread(boost::bind(&TCPClient::addMsgLoop,this)); boost::thread receivingThread(boost::bind(&TCPClient::startReceiving,this)); boost::thread sendingThread(boost::bind(&TCPClient::startSending,this)); 与实施 void TCPClient::startReceiving() { cout << "receiving..." << endl; m_RecieveBuffer[0] = ' '; m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer,boost::bind(&TCPClient::receivingLoop,boost::asio::placeholders::error)); //runtime error here cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl; } void TCPClient::receivingLoop(const boost::system::error_code& ErrorCode) { cout << "receiving..." << endl; if (ErrorCode == 0) { cout << "m_RecieveBuffer = " << m_RecieveBuffer << endl; m_Socket.async_receive(boost::asio::buffer(m_RecieveBuffer,boost::asio::placeholders::error)); } else { cout << "ERROR! receivingLoop..." << ErrorCode << endl; DoClose(); } } void TCPClient::addMsgLoop() { while (true) { string tmp; cin >> tmp; cout << "Entered: " << tmp << endl; tmp += " "; try { msgQueue.push(tmp); } catch(exception &e) { cerr << "Canno add msg to send queue... " << e.what() << endl; } } } 接收和发送线程的问题是相同的:运行时错误(在boost库中的某处写入访问冲突). void TCPClient::startReceiving() { ... m_Socket.async_receive(); //runtime error here } 在后续版本中一切正常(但我不知道如何在回答之前实现多个发送). 解决方法
顾名思义,boost::asio::ip::tcp::socket::async_read_some不保证能够读取完整的数据.当客户端完成写入时,它将error对象设置为boost :: asio :: error :: eof.
你得到的错误是因为: 服务器部分 if (!error) { ... } else { delete this; } 在else块中,您假设这是一个错误情况并关闭连接.这并非总是如此.在其他之前你需要检查错误== boost :: asio :: error :: eof. 除了读取处理程序中的这个,你应该继续收集缓冲区中读取的内容,直到你遇到error == boost :: asio :: error :: eof.只有这样,您才应验证读取数据并写回客户端. 请参阅examples部分中的HTTP服务器1,2,3实现. 更新:回答更新的问题 您有更新的代码的线程同步问题. > msgQueue可以从两个或多个线程同时访问,无需任何锁定. 如果我理解你的问题,你想: >获取用户输入并将其发送到服务器. 您可以使用两个boost::asio::io_service::strands执行这两项任务.使用Asio时,strands是同步任务的方式. Asio确保在一个链中发布的任务是同步执行的. >在strand1中发布一个类似于以下内容的发送任务:read_user_input – > send_to_server – > handle_send – > read_user_input 这将确保不会从两个线程同时访问msgQueue.使用两个套接字对服务器进行读写操作,以确保不会在同一个套接字上调用同时读写. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |