MultiplexedSocket: handleControlPacket on processing loop

Avoids potential processing ordering issues

Change-Id: I480872ffecb80439620a8442610b845f790b9db6
diff --git a/src/multiplexed_socket.cpp b/src/multiplexed_socket.cpp
index db4b0cb..e064186 100644
--- a/src/multiplexed_socket.cpp
+++ b/src/multiplexed_socket.cpp
@@ -445,42 +445,38 @@
 void
 MultiplexedSocket::Impl::handleControlPacket(std::vector<uint8_t>&& pkt)
 {
-    // Run this on dedicated thread because some callbacks can take time
-    dht::ThreadPool::io().run([w = parent_.weak(), pkt = std::move(pkt)]() {
-        auto shared = w.lock();
-        if (!shared)
-            return;
-        auto& pimpl = *shared->pimpl_;
-        try {
-            size_t off = 0;
-            while (off != pkt.size()) {
-                msgpack::unpacked result;
-                msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off);
-                auto object = result.get();
-                if (pimpl.handleProtocolMsg(object))
-                    continue;
-                auto req = object.as<ChannelRequest>();
-                if (req.state == ChannelRequestState::REQUEST) {
-                    pimpl.onRequest(req.name, req.channel);
-                }
-                else if (req.state == ChannelRequestState::ACCEPT) {
-                    pimpl.onAccept(req.name, req.channel);
-                } else {
-                    // DECLINE or unknown
-                    std::lock_guard<std::mutex> lkSockets(pimpl.socketsMutex);
-                    auto channel = pimpl.sockets.find(req.channel);
-                    if (channel != pimpl.sockets.end()) {
-                        channel->second->ready(false);
-                        channel->second->stop();
-                        pimpl.sockets.erase(channel);
-                    }
+    try {
+        size_t off = 0;
+        while (off != pkt.size()) {
+            msgpack::unpacked result;
+            msgpack::unpack(result, (const char*) pkt.data(), pkt.size(), off);
+            auto object = result.get();
+            if (handleProtocolMsg(object))
+                continue;
+            auto req = object.as<ChannelRequest>();
+            if (req.state == ChannelRequestState::REQUEST) {
+                dht::ThreadPool::io().run([w = parent_.weak(), req = std::move(req)]() {
+                    if (auto shared = w.lock())
+                        shared->pimpl_->onRequest(req.name, req.channel);
+                });
+            }
+            else if (req.state == ChannelRequestState::ACCEPT) {
+                onAccept(req.name, req.channel);
+            } else {
+                // DECLINE or unknown
+                std::lock_guard<std::mutex> lkSockets(socketsMutex);
+                auto channel = sockets.find(req.channel);
+                if (channel != sockets.end()) {
+                    channel->second->ready(false);
+                    channel->second->stop();
+                    sockets.erase(channel);
                 }
             }
-        } catch (const std::exception& e) {
-            if (pimpl.logger_)
-                pimpl.logger_->error("Error on the control channel: {}", e.what());
         }
-    });
+    } catch (const std::exception& e) {
+        if (logger_)
+            logger_->error("Error on the control channel: {}", e.what());
+    }
 }
 
 void