connectionmanager: close all pending callback if all connections fails
backport of af3c2229c051975aa9d518f77ba597604546a6c6 in jami-daemon
Change-Id: I7ace35c63c97bb957359ea8c7dafae6f6f93f5b8
diff --git a/src/connectionmanager.cpp b/src/connectionmanager.cpp
index e4453f0..fc104ed 100644
--- a/src/connectionmanager.cpp
+++ b/src/connectionmanager.cpp
@@ -118,24 +118,21 @@
{
if (isDestroying_.exchange(true))
return;
+ decltype(pendingOperations_) po;
{
std::lock_guard<std::mutex> lk(connectCbsMtx_);
- // Call all pending callbacks that channel is not ready
- for (auto& [deviceId, pcbs] : pendingCbs_)
- for (auto& pending : pcbs)
- pending.cb(nullptr, deviceId);
- pendingCbs_.clear();
+ po = std::move(pendingOperations_);
}
+ for (auto& [deviceId, pcbs] : po) {
+ for (auto& [id, pending] : pcbs.connecting)
+ pending.cb(nullptr, deviceId);
+ for (auto& [id, pending] : pcbs.waiting)
+ pending.cb(nullptr, deviceId);
+ }
+
removeUnusedConnections();
}
- struct PendingCb
- {
- std::string name;
- ConnectCallback cb;
- dht::Value::Id vid;
- };
-
void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
const dht::Value::Id& vid,
const std::string& connType,
@@ -319,49 +316,72 @@
* be done in parallel and we only want one socket
*/
std::mutex connectCbsMtx_ {};
- std::map<DeviceId, std::vector<PendingCb>> pendingCbs_ {};
- std::vector<PendingCb> extractPendingCallbacks(const DeviceId& deviceId,
- const dht::Value::Id vid = 0)
+ struct PendingCb
+ {
+ std::string name;
+ ConnectCallback cb;
+ };
+ struct PendingOperations {
+ std::map<dht::Value::Id, PendingCb> connecting;
+ std::map<dht::Value::Id, PendingCb> waiting;
+ };
+
+ std::map<DeviceId, PendingOperations> pendingOperations_ {};
+
+ void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock)
{
std::vector<PendingCb> ret;
- std::lock_guard<std::mutex> lk(connectCbsMtx_);
- auto pendingIt = pendingCbs_.find(deviceId);
- if (pendingIt == pendingCbs_.end())
- return ret;
- auto& pendings = pendingIt->second;
+ std::unique_lock<std::mutex> lk(connectCbsMtx_);
+ auto it = pendingOperations_.find(deviceId);
+ if (it == pendingOperations_.end())
+ return;
+ auto& pendingOperations = it->second;
if (vid == 0) {
- ret = std::move(pendings);
- } else {
- for (auto it = pendings.begin(); it != pendings.end(); ++it) {
- if (it->vid == vid) {
- ret.emplace_back(std::move(*it));
- pendings.erase(it);
- break;
- }
+ // Extract all pending callbacks
+ for (auto& [vid, cb] : pendingOperations.connecting)
+ ret.emplace_back(std::move(cb));
+ pendingOperations.connecting.clear();
+ for (auto& [vid, cb] : pendingOperations.waiting)
+ ret.emplace_back(std::move(cb));
+ pendingOperations.waiting.clear();
+ } else if (auto n = pendingOperations.waiting.extract(vid)) {
+ // If it's a waiting operation, just move it
+ ret.emplace_back(std::move(n.mapped()));
+ } else if (auto n = pendingOperations.connecting.extract(vid)) {
+ ret.emplace_back(std::move(n.mapped()));
+ // If sock is nullptr, execute if it's the last connecting operation
+ if (!sock && pendingOperations.connecting.empty()) {
+ for (auto& [vid, cb] : pendingOperations.waiting)
+ ret.emplace_back(std::move(cb));
+ pendingOperations.waiting.clear();
+ for (auto& [vid, cb] : pendingOperations.connecting)
+ ret.emplace_back(std::move(cb));
+ pendingOperations.connecting.clear();
}
}
- if (pendings.empty())
- pendingCbs_.erase(pendingIt);
- return ret;
+ if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
+ pendingOperations_.erase(it);
+ lk.unlock();
+ for (auto& cb : ret)
+ cb.cb(sock, deviceId);
}
- std::vector<PendingCb> getPendingCallbacks(const DeviceId& deviceId,
- const dht::Value::Id vid = 0)
+ std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
{
- std::vector<PendingCb> ret;
+ std::map<dht::Value::Id, std::string> ret;
std::lock_guard<std::mutex> lk(connectCbsMtx_);
- auto pendingIt = pendingCbs_.find(deviceId);
- if (pendingIt == pendingCbs_.end())
+ auto it = pendingOperations_.find(deviceId);
+ if (it == pendingOperations_.end())
return ret;
- auto& pendings = pendingIt->second;
- if (vid == 0) {
- ret = pendings;
- } else {
- std::copy_if(pendings.begin(),
- pendings.end(),
- std::back_inserter(ret),
- [&](auto pending) { return pending.vid == vid; });
+ auto& pendingOp = it->second;
+ for (const auto& [id, pc]: pendingOp.connecting) {
+ if (vid == 0 || id == vid)
+ ret[id] = pc.name;
+ }
+ for (const auto& [id, pc]: pendingOp.waiting) {
+ if (vid == 0 || id == vid)
+ ret[id] = pc.name;
}
return ret;
}
@@ -616,23 +636,24 @@
auto isConnectingToDevice = false;
{
std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
- auto pendingsIt = sthis->pendingCbs_.find(deviceId);
- if (pendingsIt != sthis->pendingCbs_.end()) {
+ auto pendingsIt = sthis->pendingOperations_.find(deviceId);
+ if (pendingsIt != sthis->pendingOperations_.end()) {
const auto& pendings = pendingsIt->second;
- while (std::find_if(pendings.begin(), pendings.end(), [&](const auto& it){ return it.vid == vid; }) != pendings.end()) {
- vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
+ while (pendings.connecting.find(vid) != pendings.connecting.end()
+ && pendings.waiting.find(vid) != pendings.waiting.end()) {
+ vid = ValueIdDist(1, JAMI_ID_MAX_VAL)(sthis->account.rand);
}
}
// Check if already connecting
- isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end();
+ isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
// Save current request for sendChannelRequest.
// Note: do not return here, cause we can be in a state where first
// socket is negotiated and first channel is pending
// so return only after we checked the info
- if (isConnectingToDevice)
- pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid});
+ if (isConnectingToDevice && !forceNewSocket)
+ pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
else
- sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}};
+ sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
}
// Check if already negotiated
@@ -655,8 +676,7 @@
}
if (noNewSocket) {
// If no new socket is specified, we don't try to generate a new socket
- for (const auto& pending : sthis->extractPendingCallbacks(deviceId, vid))
- pending.cb(nullptr, deviceId);
+ sthis->executePendingOperations(deviceId, vid, nullptr);
return;
}
@@ -665,8 +685,7 @@
auto eraseInfo = [w, cbId] {
if (auto shared = w.lock()) {
// If no new socket is specified, we don't try to generate a new socket
- for (const auto& pending : shared->extractPendingCallbacks(cbId.first, cbId.second))
- pending.cb(nullptr, cbId.first);
+ shared->executePendingOperations(cbId.first, cbId.second, nullptr);
std::lock_guard<std::mutex> lk(shared->infosMtx_);
shared->infos_.erase(cbId);
}
@@ -776,17 +795,15 @@
auto channelSock = sock->addChannel(name);
channelSock->onShutdown([name, deviceId, vid, w = weak()] {
auto shared = w.lock();
- if (shared)
- for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
- pending.cb(nullptr, deviceId);
+ if (auto shared = w.lock())
+ shared->executePendingOperations(deviceId, vid, nullptr);
});
channelSock->onReady(
[wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() {
auto shared = w.lock();
auto channelSock = wSock.lock();
if (shared)
- for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
- pending.cb(channelSock, deviceId);
+ shared->executePendingOperations(deviceId, vid, channelSock);
});
ChannelRequest val;
@@ -911,8 +928,7 @@
deviceId,
name,
vid);
- for (const auto& pending : extractPendingCallbacks(deviceId))
- pending.cb(nullptr, deviceId);
+ executePendingOperations(deviceId, vid, nullptr);
}
} else {
// The socket is ready, store it
@@ -934,12 +950,12 @@
// Finally, open the channel and launch pending callbacks
if (info->socket_) {
// Note: do not remove pending there it's done in sendChannelRequest
- for (const auto& pending : getPendingCallbacks(deviceId)) {
+ for (const auto& [id, name] : getPendingIds(deviceId)) {
if (config_->logger)
config_->logger->debug("Send request on TLS socket for channel {} to {}",
- pending.name,
- deviceId);
- sendChannelRequest(info->socket_, pending.name, deviceId, pending.vid);
+ name,
+ deviceId.toString());
+ sendChannelRequest(info->socket_, name, deviceId, id);
}
}
}
@@ -1085,8 +1101,7 @@
auto eraseInfo = [w, id = req.id, deviceId] {
if (auto shared = w.lock()) {
// If no new socket is specified, we don't try to generate a new socket
- for (const auto& pending : shared->extractPendingCallbacks(deviceId, id))
- pending.cb(nullptr, deviceId);
+ shared->executePendingOperations(deviceId, id, nullptr);
if (shared->connReadyCb_)
shared->connReadyCb_(deviceId, "", nullptr);
std::lock_guard<std::mutex> lk(shared->infosMtx_);
@@ -1202,8 +1217,7 @@
}
}
for (const auto& cbId : ids)
- for (const auto& pending : sthis->extractPendingCallbacks(cbId.first, cbId.second))
- pending.cb(nullptr, deviceId);
+ sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
std::lock_guard<std::mutex> lk(sthis->infosMtx_);
sthis->infos_.erase({deviceId, vid});
@@ -1522,8 +1536,8 @@
bool
ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
{
- auto pending = pimpl_->getPendingCallbacks(deviceId);
- return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; })
+ auto pending = pimpl_->getPendingIds(deviceId);
+ return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
!= pending.end();
}
@@ -1549,8 +1563,7 @@
}
// Stop connections to all peers devices
for (const auto& deviceId : peersDevices) {
- for (const auto& pending : pimpl_->extractPendingCallbacks(deviceId))
- pending.cb(nullptr, deviceId);
+ pimpl_->executePendingOperations(deviceId, 0, nullptr);
// This will close the TLS Session
pimpl_->removeUnusedConnections(deviceId);
}