MultiplexedSocket: handleControlPacket on processing loop
Avoids potential processing ordering issues
Change-Id: I480872ffecb80439620a8442610b845f790b9db6
diff --git a/src/multiplexed_socket.cpp b/src/multiplexed_socket.cpp
index db4b0cb..e064186 100644
--- a/src/multiplexed_socket.cpp
+++ b/src/multiplexed_socket.cpp
@@ -445,42 +445,38 @@
void
MultiplexedSocket::Impl::handleControlPacket(std::vector<uint8_t>&& pkt)
{
- // Run this on dedicated thread because some callbacks can take time
- dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() {
- auto shared = w.lock();
- if (!shared)
- return;
- auto& pimpl = *shared->pimpl_;
- try {
- size_t off = 0;
- while (off != pkt.size()) {
- msgpack::unpacked result;
- msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off);
- auto object = result.get();
- if (pimpl.handleProtocolMsg(object))
- continue;
- auto req = object.as<ChannelRequest>();
- if (req.state == ChannelRequestState::REQUEST) {
- pimpl.onRequest(req.name, req.channel);
- }
- else if (req.state == ChannelRequestState::ACCEPT) {
- pimpl.onAccept(req.name, req.channel);
- } else {
- // DECLINE or unknown
- std::lock_guard<std::mutex> lkSockets(pimpl.socketsMutex);
- auto channel = pimpl.sockets.find(req.channel);
- if (channel != pimpl.sockets.end()) {
- channel->second->ready(false);
- channel->second->stop();
- pimpl.sockets.erase(channel);
- }
+ try {
+ size_t off = 0;
+ while (off != pkt.size()) {
+ msgpack::unpacked result;
+ msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off);
+ auto object = result.get();
+ if (handleProtocolMsg(object))
+ continue;
+ auto req = object.as<ChannelRequest>();
+ if (req.state == ChannelRequestState::REQUEST) {
+ dht::ThreadPool::io().run([w = parent_.weak(), req = std::move(req)]() {
+ if (auto shared = w.lock())
+ shared->pimpl_->onRequest(req.name, req.channel);
+ });
+ }
+ else if (req.state == ChannelRequestState::ACCEPT) {
+ onAccept(req.name, req.channel);
+ } else {
+ // DECLINE or unknown
+ std::lock_guard<std::mutex> lkSockets(socketsMutex);
+ auto channel = sockets.find(req.channel);
+ if (channel != sockets.end()) {
+ channel->second->ready(false);
+ channel->second->stop();
+ sockets.erase(channel);
}
}
- } catch (const std::exception& e) {
- if (pimpl.logger_)
- pimpl.logger_->error("Error on the control channel: {}", e.what());
}
- });
+ } catch (const std::exception& e) {
+ if (logger_)
+ logger_->error("Error on the control channel: {}", e.what());
+ }
}
void