C++ – 使用线程池进行模拟:Boost.Thread 和 Boost.Asio
我想使用 boost::asio 来创建一个线程池。
我的问题是:如何将特定数据附加到创建的每个线程,以及如何管理单个输出? 更具体地说,我写了一个类Simulation,它通过在输入中获取一些参数的方法来执行模拟. 我想做这样的事情: class ParallelSimulation { public: static const std::size_t N = 10; protected: std::vector< boost::shared_ptr<Simulation> > simuInst; // N copy of a reference instance. public: ... // Simulation with a large (>>N) number of inputs void eval( std::vector< SimulationInput > inputs ) { // Creation of the pool using N threads asio::io_service io_service; asio::io_service::work work(io_service); boost::thread_group threads; for (std::size_t i = 0; i < N; ++i) threads.create_thread(boost::bind(&asio::io_service::run,&io_service)); // Here ? Attaching the duplicates instances of class Simulation ? // Adding tasks for( std::size_t i = 0,i_end = inputs.size(); i<i_end; ++i) io_service.post(...); // add simulation with inputs[i] to the queue // How to deal with outputs ? // End of the tasks io_service.stop(); threads.join_all(); } }; 也许用于设置线程池的技术(使用boost :: asio)不适合我的问题.你有什么建议吗? 解决方法
以下是我的研究结果!
分布式仿真基于使用两个实现类的主类DistributedSimulation:impl :: m_io_service和impl :: dispatcher. boost :: asio线程池基于将io_service :: run()方法附加到不同的线程. namespace impl { // Create a derived class of io_service including thread specific data (a unique identifier of the thread) struct m_io_service : public boost::asio::io_service { static boost::thread_specific_ptr<boost::uuids::uuid> ptrSpec_; std::size_t run() { if(ptrSpec_.get() == 0) ptrSpec_.reset(new boost::uuids::uuid(boost::uuids::random_generator()()) ); return boost::asio::io_service::run(); } }; // Create a class that dispatches the input data over the N instances of the class Simulation template <class Simulation> class dispatcher { public: static const std::size_t N = 6; typedef Simulation::input_t input_t; typedef Simulation::output_t output_t; friend DistributedSimulation; protected: std::vector< boost::shared_ptr<Simulation> > simuInst; std::vector< boost::uuids::uuid > map; public: // Constructor,creating the N instances of class Simulation dispatcher( const Simulation& simuRef) { simuInst.resize(N); for(std::size_t i=0; i<N; ++i) simuInst[i].reset( simuRef.clone() ); } // Record the unique identifiers and do the calculation using the right instance of class Simulation void dispatch( const Simulation::input_t& in ) { if( map.size() == 0 ) { map.push_back(*m_io_service::ptrSpec_); simuInst[0]->eval(in,*m_io_service::ptrSpec_); } else { if( map.size() < N ) { map.push_back(*m_io_service::ptrSpec_); simuInst[map.size()-1]->eval(in,*m_io_service::ptrSpec_); } else { for(size_t i=0; i<N;++i) { if( map[i] == *m_io_service::ptrSpec_) { simuInst[i]->eval(in,*m_io_service::ptrSpec_); return; } } } } } }; boost::thread_specific_ptr<boost::uuids::uuid> m_io_service::ptrSpec_; } // Main class,create a distributed simulation based on a class Simulation template <class Simulation> class DistributedSimulation { public: static const std::size_t N = impl::dispatcher::N; protected: impl::dispatcher _disp; public: DistributedSimulation() : 
_disp( Simulation() ) {} DistributedSimulation(Simulation& simuRef) : _disp( simuRef ) { } // Simulation with a large (>>N) number of inputs void eval( const std::vector< Simulation::input_t >& inputs,std::vector< Simulation::output_t >& outputs ) { // Clear the results from a previous calculation (and stored in instances of class Simulation) ... // Creation of the pool using N threads impl::m_io_service io_service; boost::asio::io_service::work work(io_service); boost::thread_group threads; for (std::size_t i = 0; i < N; ++i) threads.create_thread(boost::bind(&impl::m_io_service::run,&io_service)); // Adding tasks for( std::size_t i = 0,i_end = inputs.size(); i<i_end; ++i) io_service.post( boost::bind(&impl::dispatcher::dispatch,&_disp,inputs[i]) ); // End of the tasks io_service.stop(); threads.join_all(); // Gather the results iterating through instances of class simulation ... } }; 编辑 下面的代码是我之前的解决方案的更新,考虑到Tres的评论.正如我之前所说,它更易读! template <class Simulation> class DistributedSimulation { public: typedef typename Simulation::input_t input_t; typedef typename Simulation::output_t output_t; typedef boost::shared_ptr<Simulation> SimulationSPtr_t; typedef boost::thread::id id_t; typedef std::map< id_t,std::size_t >::iterator IDMapIterator_t; protected: unsigned int _NThreads; // Number of threads std::vector< SimulationSPtr_t > _simuInst; // Instances of class Simulation std::map< id_t,std::size_t > _IDMap; // Map between thread id and instance index. private: boost::mutex _mutex; public: DistributedSimulation( ) {} DistributedSimulation( const Simulation& simuRef,const unsigned int NThreads = boost::thread::hardware_concurrency() ) { init(simuRef,NThreads); } DistributedSimulation(const DistributedSimulation& simuDistrib) { init(simuRef,NThreads); } virtual ~DistributedSimulation() {} void init(const Simulation& simuRef,const unsigned int NThreads = boost::thread::hardware_concurrency()) { _NThreads = (NThreads == 0) ? 
1 : NThreads; _simuInst.resize(_NThreads); for(std::size_t i=0; i<_NThreads; ++i) _simuInst[i].reset( simuRef.clone() ); _IDMap.clear(); } void dispatch( const input_t& input ) { // Get current thread id boost::thread::id id0 = boost::this_thread::get_id(); // Get the right instance Simulation* sim = NULL; { boost::mutex::scoped_lock scoped_lock(_mutex); IDMapIterator_t it = _IDMap.find(id0); if( it != _IDMap.end() ) sim = _simuInst[it->second].get(); } // Simulation if( NULL != sim ) sim->eval(input); } // Distributed evaluation. void eval( const std::vector< input_t >& inputs,std::vector< output_t >& outputs ) { //--Initialisation const std::size_t NInputs = inputs.size(); // Clear the ouptuts f(contained in instances of class Simulation) from a previous run ... // Create thread pool and save ids boost::asio::io_service io_service; boost::asio::io_service::work work(io_service); boost::thread_group threads; for (std::size_t i = 0; i < _NThreads; ++i) { boost::thread* thread_ptr = threads.create_thread(boost::bind(&boost::asio::io_service::run,&io_service)); _IDMap[ thread_ptr->get_id() ] = i; } // Add tasks for( std::size_t i = 0; i < NInputs; ++i) io_service.post( boost::bind(&DistributedSimulation::dispatch,this,inputs[i]) ); // Stop the service io_service.stop(); threads.join_all(); // Gather results (contained in each instances of class Simulation) ... } }; (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |