c – 不正确使用boost :: asio和boost :: thread
我正在使用boost :: asio和boost :: thread来实现一个接受消息的消息服务,如果没有消息被处理则异步发送消息,或者如果正在处理消息则将消息排队.
消息率在我看来很高,大约每秒2.000条消息.有这么多消息,我面对的是破败的消息,但很少见.在2.000中,大约4-8的消息已损坏.我认为问题是由于不正确使用boost :: asio和/或boost :: thread库. 我实现的代码主要基于this boost tutorial.我找不到错误,因为主要消息发现,我发现很难缩小问题范围. 也许其他人知道这里出了什么问题? 基本上这个类以下列方式使用: (1)在我的程序开始时调用构造函数以启动线程,从而接受和传输消息的服务 (2)每当我想传输消息时,我都会调用MessageService :: transmitMessage(),它将使用async_write的任务委托给处理消息队列的线程. using namespace google::protobuf::io; using boost::asio::ip::tcp; MessageService::MessageService(std::string ip,std::string port) : work(io_service),resolver(io_service),socket(io_service) { messageQueue = new std::deque<AgentMessage>; tcp::resolver::query query(ip,port); endpoint_iterator = resolver.resolve(query); tcp::endpoint endpoint = *endpoint_iterator; socket.async_connect(endpoint,boost::bind(&MessageService::handle_connect,this,boost::asio::placeholders::error,++endpoint_iterator)); boost::thread t(boost::bind(&boost::asio::io_service::run,&io_service)); } void MessageService::await() { while (!messageQueue->empty()) { signal(SIGINT,exit); int messagesLeft = messageQueue->size(); sleep(3); std::cout << "Pending Profiler Agents Messages: " << messageQueue->size() << std::endl; if (messagesLeft == messageQueue->size()) { std::cout << "Connection Error" << std::endl; break; } } std::cout << i << std::endl; } void MessageService::write(AgentMessage agentMessage,long systemTime,int JVM_ID) { agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle()); agentMessage.set_jvm_id(JVM_ID); agentMessage.set_systemtime(systemTime); io_service.post(boost::bind(&MessageService::do_write,agentMessage)); } void MessageService::do_close() { socket.close(); } void MessageService::transmitMessage(AgentMessage agentMessage) { ++i; boost::asio::streambuf b; std::ostream os(&b); ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); CodedOutputStream *coded_output = new CodedOutputStream(raw_output); coded_output->WriteVarint32(agentMessage.ByteSize()); agentMessage.SerializeToCodedStream(coded_output); delete coded_output; delete raw_output; boost::system::error_code ignored_error; boost::asio::async_write(socket,b.data(),boost::bind( &MessageService::handle_write,boost::asio::placeholders::error)); } void MessageService::do_write(AgentMessage agentMessage) { bool write_in_progress = !messageQueue->empty(); messageQueue->push_back(agentMessage); if (!write_in_progress) { transmitMessage(agentMessage); } } void MessageService::handle_write(const boost::system::error_code &error) { if (!error) { messageQueue->pop_front(); if (!messageQueue->empty()) { transmitMessage(messageQueue->front()); } } else { std::cout << error << std::endl; do_close(); } } void MessageService::handle_connect(const boost::system::error_code &error,tcp::resolver::iterator endpoint_iterator) { // can be used to receive commands from the Java profiler interface } MessageService::~MessageService() { // TODO Auto-generated destructor stub } 头文件: using boost::asio::ip::tcp; class MessageService { public: MessageService(std::string ip,std::string port); virtual ~MessageService(); void write(AgentMessage agentMessage,int JVM_ID); void await(); private: boost::asio::io_service io_service; boost::asio::io_service::work work; tcp::resolver resolver; tcp::resolver::iterator endpoint_iterator; tcp::socket socket; std::deque<AgentMessage> *messageQueue; void do_write(AgentMessage agentMessage); void do_close(); void handle_write(const boost::system::error_code &error); void handle_connect(const boost::system::error_code &error,tcp::resolver::iterator endpoint_iterator); void transmitMessage(AgentMessage agentMessage); }; 解决方法
这种方法对我来说似乎很可疑
void MessageService::transmitMessage(AgentMessage agentMessage) { ++i; boost::asio::streambuf b; std::ostream os(&b); ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os); CodedOutputStream *coded_output = new CodedOutputStream(raw_output); coded_output->WriteVarint32(agentMessage.ByteSize()); agentMessage.SerializeToCodedStream(coded_output); delete coded_output; delete raw_output; boost::system::error_code ignored_error; boost::asio::async_write(socket,boost::asio::placeholders::error)); } 您似乎将AgentMessage(应该通过const引用btw传递)序列化为streambuf.但是,在调用async_write完成处理程序之前,不保证此序列化数据存在,这在async_write documentation中有明确说明.
要解决此问题,请确保缓冲区保持在作用域中,直到调用完成处理程序.一种方法是将缓冲区作为参数传递给有界完成处理程序: boost::asio::async_write(socket,coded_output // ^^^ buffer goes here )); 然后从完成处理程序中删除它.我建议你也看看使用shared_ptr而不是使用裸指针. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |