c++ - boost::asio async_receive_from UDP endpoint shared between threads?
Boost Asio allows multiple threads to call the run() method on an io_service. This seems like a great way to create a multithreaded UDP server. However, I've hit a snag that I'm struggling to find an answer to.
Looking at a typical async_receive_from call:
    m_socket->async_receive_from(
        boost::asio::buffer(m_recv_buffer),
        m_remote_endpoint,
        boost::bind(
            &udp_server::handle_receive,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

The remote endpoint and message buffer are not passed into the handler, but sit at a higher scope level (member variables in my example). The code to handle the UDP message when it arrives will look something like:
    void dns_server::handle_receive(const boost::system::error_code& error, std::size_t size)
    {
        // process the message
        blah(m_recv_buffer, size);

        // send the response
        respond(m_remote_endpoint);
    }

If there are multiple threads running, how does the synchronisation work? Having a single endpoint and receive buffer shared between the threads implies that Asio waits for a handler to finish within a single thread before calling the handler in another thread in case a message arrived in the meantime. That seems to negate the point of allowing multiple threads to call run in the first place.
If I want concurrent serving of requests, it looks like I need to hand off the work packets, along with a copy of the endpoint, to a separate thread, allowing the handler method to return so that Asio can carry on and pass the next message, in parallel, to one of the other threads that called run().
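Roughly along these lines (a sketch only; m_work_queue, start_receive and the buffer type are made-up names standing in for whatever the real implementation would use):

    void udp_server::handle_receive(const boost::system::error_code& error, std::size_t size)
    {
        if (!error) {
            // copy the shared state before re-arming the receive, since the next
            // datagram would otherwise overwrite m_recv_buffer / m_remote_endpoint
            std::vector<char> data(m_recv_buffer.begin(), m_recv_buffer.begin() + size);
            boost::asio::ip::udp::endpoint client = m_remote_endpoint;

            m_work_queue.push(std::make_pair(client, data)); // processed by some other thread
        }

        start_receive(); // issue the next async_receive_from so Asio can carry on
    }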
That seems rather nasty. Am I missing something here?
Having a single endpoint and receive buffer shared between the threads implies that Asio waits for a handler to finish within a single thread
If you mean "when running the service with a single thread", then you are correct.
Otherwise, that isn't the case. Instead, Asio says the behaviour is "undefined" when you call operations on a single service object (i.e. the socket, not the io_service) concurrently.
That seems to negate the point of allowing multiple threads to call run in the first place.
Not unless the processing takes a considerable amount of time.
The first paragraphs of the introduction of the Timer.5 sample seem like a good exposition on this topic.
Session

To separate the request-specific data (buffer and endpoint) you want some notion of a session. A popular mechanism in Asio is either bound shared_ptrs or a shared-from-this session class (boost::bind supports binding to boost::shared_ptr instances directly).
To avoid concurrent, unsynchronized access to members of m_socket you can either add locks or use the strand approach as documented in the Timer.5 sample linked above.
Here, for your enjoyment, is the Daytime.6 asynchronous UDP daytime server, modified to work with many service IO threads.
Note that, logically, there's still a single IO thread (the strand), so we don't violate the socket class's documented thread-safety.
However, unlike the official sample, responses may be queued out of order, depending on the time taken by the actual processing in udp_session::handle_request.
Note the following:

- a udp_session class to hold the buffers and remote endpoint per request
- a pool of threads, which are able to scale the load of the actual processing (not the IO) over multiple cores

    #include <ctime>
    #include <iostream>
    #include <string>
    #include <boost/array.hpp>
    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/asio.hpp>
    #include <boost/thread.hpp>

    using namespace boost;
    using asio::ip::udp;
    using system::error_code;

    std::string make_daytime_string()
    {
        using namespace std; // for time_t, time and ctime
        time_t now = time(0);
        return ctime(&now);
    }

    class udp_server; // forward declaration

    struct udp_session : enable_shared_from_this<udp_session> {

        udp_session(udp_server* server) : server_(server) {}

        void handle_request(const error_code& error);

        void handle_sent(const error_code& ec, std::size_t) {
            // here the response has been sent
            if (ec) {
                std::cout << "Error sending response to " << remote_endpoint_
                          << ": " << ec.message() << "\n";
            }
        }

        udp::endpoint remote_endpoint_;
        array<char, 100> recv_buffer_;
        std::string message;
        udp_server* server_;
    };

    class udp_server
    {
        typedef shared_ptr<udp_session> shared_session;
      public:
        udp_server(asio::io_service& io_service)
            : socket_(io_service, udp::endpoint(udp::v4(), 1313)),
              strand_(io_service)
        {
            receive_session();
        }

      private:
        void receive_session()
        {
            // our session to hold the buffer + endpoint
            auto session = make_shared<udp_session>(this);

            socket_.async_receive_from(
                    asio::buffer(session->recv_buffer_),
                    session->remote_endpoint_,
                    strand_.wrap(
                        bind(&udp_server::handle_receive, this,
                            session, // keep-alive of buffer/endpoint
                            asio::placeholders::error,
                            asio::placeholders::bytes_transferred)));
        }

        void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
            // now, handle the current session on any available pool thread
            socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));

            // immediately accept new datagrams
            receive_session();
        }

        void enqueue_response(shared_session const& session) {
            socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
                    strand_.wrap(bind(&udp_session::handle_sent,
                            session, // keep-alive of buffer/endpoint
                            asio::placeholders::error,
                            asio::placeholders::bytes_transferred)));
        }

        udp::socket  socket_;
        asio::strand strand_;

        friend struct udp_session;
    };

    void udp_session::handle_request(const error_code& error)
    {
        if (!error || error == asio::error::message_size)
        {
            message = make_daytime_string(); // let's assume this might be slow

            // let the server coordinate the actual IO
            server_->enqueue_response(shared_from_this());
        }
    }

    int main()
    {
        try {
            asio::io_service io_service;
            udp_server server(io_service);

            thread_group group;
            for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
                group.create_thread(bind(&asio::io_service::run, ref(io_service)));

            group.join_all();
        }
        catch (std::exception& e) {
            std::cerr << e.what() << std::endl;
        }
    }

Closing thoughts

Interestingly, in many cases you'll see the single-threaded version performing just as well, and there's no reason to complicate the design.
Alternatively, you can use a single-threaded io_service dedicated to the IO and use an old-fashioned worker pool to do the background processing of the requests, if this is indeed the CPU-intensive part. Firstly, this simplifies the design, and secondly this might improve the throughput on the IO tasks because there is no more need to coordinate the tasks posted on the strand.
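A minimal sketch of that alternative, assuming Boost.Thread is available; the job/job_queue/worker_loop names are illustrative and not part of the sample above. The key point is that only the single io_service thread ever touches the socket, while the CPU-heavy work runs on plain worker threads that hand results back via io_service::post():

    #include <queue>
    #include <string>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>

    struct job { boost::asio::ip::udp::endpoint from; std::string payload; };

    class job_queue {                       // classic mutex + condition-variable queue
    public:
        void push(const job& j) {
            boost::lock_guard<boost::mutex> lk(m_);
            q_.push(j);
            cv_.notify_one();
        }
        job pop() {                         // blocks until a job is available
            boost::unique_lock<boost::mutex> lk(m_);
            while (q_.empty()) cv_.wait(lk);
            job j = q_.front(); q_.pop();
            return j;
        }
    private:
        std::queue<job> q_;
        boost::mutex m_;
        boost::condition_variable cv_;
    };

    // Worker loop: the expensive processing happens here, off the IO thread.
    // When done, the response is posted back to the io_service so that the
    // single IO thread remains the only one touching the socket.
    void worker_loop(job_queue& queue, boost::asio::io_service& io,
                     boost::function<void(const job&)> send_response)
    {
        for (;;) {                          // a real server would add shutdown handling
            job j = queue.pop();
            // ... expensive processing of j.payload here ...
            io.post(boost::bind(send_response, j)); // socket touched only on the IO thread
        }
    }

Wiring it up would look much like the main() in the sample above, except that only one thread calls io_service::run() while a boost::thread_group of workers runs worker_loop(), and the receive handler pushes copies of the buffer and endpoint onto the job_queue instead of posting to the io_service.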
c++ multithreading boost udp boost-asio