blob: ecfc5051bebbcc66720937754e9f3fb5ab7b7981 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Adrien Béraud612b55b2023-05-29 10:42:04 -040061struct ConnectionInfo
62{
63 ~ConnectionInfo()
64 {
65 if (socket_)
66 socket_->join();
67 }
68
69 std::mutex mutex_ {};
70 bool responseReceived_ {false};
71 PeerConnectionRequest response_ {};
72 std::unique_ptr<IceTransport> ice_ {nullptr};
73 // Used to store currently non ready TLS Socket
74 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
75 std::shared_ptr<MultiplexedSocket> socket_ {};
76 std::set<CallbackId> cbIds_ {};
77
78 std::function<void(bool)> onConnected_;
79 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
80};
81
82/**
83 * returns whether or not UPnP is enabled and active_
84 * ie: if it is able to make port mappings
85 */
86bool
87ConnectionManager::Config::getUPnPActive() const
88{
89 if (upnpCtrl)
90 return upnpCtrl->isReady();
91 return false;
92}
93
94class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
95{
96public:
97 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
98 : config_ {std::move(config_)}
Sébastien Blincf569402023-07-27 09:46:40 -040099 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Adrien Béraud612b55b2023-05-29 10:42:04 -0400100 {}
101 ~Impl() {}
102
103 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
104 const dht::crypto::Identity& identity() const { return config_->id; }
105
106 void removeUnusedConnections(const DeviceId& deviceId = {})
107 {
108 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
109
110 {
111 std::lock_guard<std::mutex> lk(infosMtx_);
112 for (auto it = infos_.begin(); it != infos_.end();) {
113 auto& [key, info] = *it;
114 if (info && (!deviceId || key.first == deviceId)) {
115 unused.emplace_back(std::move(info));
116 it = infos_.erase(it);
117 } else {
118 ++it;
119 }
120 }
121 }
122 for (auto& info: unused) {
123 if (info->tls_)
124 info->tls_->shutdown();
125 if (info->socket_)
126 info->socket_->shutdown();
127 if (info->waitForAnswer_)
128 info->waitForAnswer_->cancel();
129 }
130 if (!unused.empty())
131 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
132 }
133
134 void shutdown()
135 {
136 if (isDestroying_.exchange(true))
137 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400138 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400139 {
140 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400141 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400142 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400143 for (auto& [deviceId, pcbs] : po) {
144 for (auto& [id, pending] : pcbs.connecting)
145 pending.cb(nullptr, deviceId);
146 for (auto& [id, pending] : pcbs.waiting)
147 pending.cb(nullptr, deviceId);
148 }
149
Adrien Béraud612b55b2023-05-29 10:42:04 -0400150 removeUnusedConnections();
151 }
152
Adrien Béraud612b55b2023-05-29 10:42:04 -0400153 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
154 const dht::Value::Id& vid,
155 const std::string& connType,
156 std::function<void(bool)> onConnected);
157 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
158 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
159 const std::string& name,
160 const dht::Value::Id& vid,
161 const std::shared_ptr<dht::crypto::Certificate>& cert);
162 void connectDevice(const DeviceId& deviceId,
163 const std::string& uri,
164 ConnectCallback cb,
165 bool noNewSocket = false,
166 bool forceNewSocket = false,
167 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400168 void connectDevice(const dht::InfoHash& deviceId,
169 const std::string& uri,
170 ConnectCallbackLegacy cb,
171 bool noNewSocket = false,
172 bool forceNewSocket = false,
173 const std::string& connType = "");
174
Adrien Béraud612b55b2023-05-29 10:42:04 -0400175 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
176 const std::string& name,
177 ConnectCallback cb,
178 bool noNewSocket = false,
179 bool forceNewSocket = false,
180 const std::string& connType = "");
181 /**
182 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
183 * @param sock socket used to send the request
184 * @param name channel's name
185 * @param vid channel's id
186 * @param deviceId to identify the linked ConnectCallback
187 */
188 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
189 const std::string& name,
190 const DeviceId& deviceId,
191 const dht::Value::Id& vid);
192 /**
193 * Triggered when a PeerConnectionRequest comes from the DHT
194 */
195 void answerTo(IceTransport& ice,
196 const dht::Value::Id& id,
197 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
198 bool onRequestStartIce(const PeerConnectionRequest& req);
199 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
200 void onDhtPeerRequest(const PeerConnectionRequest& req,
201 const std::shared_ptr<dht::crypto::Certificate>& cert);
202
203 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
204 void onPeerResponse(const PeerConnectionRequest& req);
205 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
206
207 const std::shared_future<tls::DhParams> dhParams() const;
208 tls::CertificateStore& certStore() const { return *config_->certStore; }
209
210 mutable std::mutex messageMutex_ {};
211 std::set<std::string, std::less<>> treatedMessages_ {};
212
213 void loadTreatedMessages();
214 void saveTreatedMessages() const;
215
216 /// \return true if the given DHT message identifier has been treated
217 /// \note if message has not been treated yet this method st/ore this id and returns true at
218 /// further calls
219 bool isMessageTreated(std::string_view id);
220
221 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
222
223 /**
224 * Published IPv4/IPv6 addresses, used only if defined by the user in account
225 * configuration
226 *
227 */
228 IpAddr publishedIp_[2] {};
229
Adrien Béraud612b55b2023-05-29 10:42:04 -0400230 /**
231 * interface name on which this account is bound
232 */
233 std::string interface_ {"default"};
234
235 /**
236 * Get the local interface name on which this account is bound.
237 */
238 const std::string& getLocalInterface() const { return interface_; }
239
240 /**
241 * Get the published IP address, fallbacks to NAT if family is unspecified
242 * Prefers the usage of IPv4 if possible.
243 */
244 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
245
246 /**
247 * Set published IP address according to given family
248 */
249 void setPublishedAddress(const IpAddr& ip_addr);
250
251 /**
252 * Store the local/public addresses used to register
253 */
254 void storeActiveIpAddress(std::function<void()>&& cb = {});
255
256 /**
257 * Create and return ICE options.
258 */
259 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
260 IceTransportOptions getIceOptions() const noexcept;
261
262 /**
263 * Inform that a potential peer device have been found.
264 * Returns true only if the device certificate is a valid device certificate.
265 * In that case (true is returned) the account_id parameter is set to the peer account ID.
266 */
267 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
268 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
269
270 bool findCertificate(const dht::PkId& id,
271 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400272 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400273
274 /**
275 * returns whether or not UPnP is enabled and active
276 * ie: if it is able to make port mappings
277 */
278 bool getUPnPActive() const;
279
280 /**
281 * Triggered when a new TLS socket is ready to use
282 * @param ok If succeed
283 * @param deviceId Related device
284 * @param vid vid of the connection request
285 * @param name non empty if TLS was created by connectDevice()
286 */
287 void onTlsNegotiationDone(bool ok,
288 const DeviceId& deviceId,
289 const dht::Value::Id& vid,
290 const std::string& name = "");
291
292 std::shared_ptr<ConnectionManager::Config> config_;
293
Adrien Béraud612b55b2023-05-29 10:42:04 -0400294 mutable std::mt19937_64 rand;
295
296 iOSConnectedCallback iOSConnectedCb_ {};
297
298 std::mutex infosMtx_ {};
299 // Note: Someone can ask multiple sockets, so to avoid any race condition,
300 // each device can have multiple multiplexed sockets.
301 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
302
303 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
304 {
305 std::lock_guard<std::mutex> lk(infosMtx_);
306 auto it = infos_.find({deviceId, id});
307 if (it != infos_.end())
308 return it->second;
309 return {};
310 }
311
312 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
313 {
314 std::lock_guard<std::mutex> lk(infosMtx_);
315 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
316 auto& [key, value] = item;
317 return key.first == deviceId && value && value->socket_;
318 });
319 if (it != infos_.end())
320 return it->second;
321 return {};
322 }
323
324 ChannelRequestCallback channelReqCb_ {};
325 ConnectionReadyCallback connReadyCb_ {};
326 onICERequestCallback iceReqCb_ {};
327
328 /**
329 * Stores callback from connectDevice
330 * @note: each device needs a vector because several connectDevice can
331 * be done in parallel and we only want one socket
332 */
333 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400334
Adrien Béraud665294f2023-06-13 18:09:11 -0400335 struct PendingCb
336 {
337 std::string name;
338 ConnectCallback cb;
339 };
340 struct PendingOperations {
341 std::map<dht::Value::Id, PendingCb> connecting;
342 std::map<dht::Value::Id, PendingCb> waiting;
343 };
344
345 std::map<DeviceId, PendingOperations> pendingOperations_ {};
346
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400347 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400348 {
349 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400350 std::unique_lock<std::mutex> lk(connectCbsMtx_);
351 auto it = pendingOperations_.find(deviceId);
352 if (it == pendingOperations_.end())
353 return;
354 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400355 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400356 // Extract all pending callbacks
357 for (auto& [vid, cb] : pendingOperations.connecting)
358 ret.emplace_back(std::move(cb));
359 pendingOperations.connecting.clear();
360 for (auto& [vid, cb] : pendingOperations.waiting)
361 ret.emplace_back(std::move(cb));
362 pendingOperations.waiting.clear();
363 } else if (auto n = pendingOperations.waiting.extract(vid)) {
364 // If it's a waiting operation, just move it
365 ret.emplace_back(std::move(n.mapped()));
366 } else if (auto n = pendingOperations.connecting.extract(vid)) {
367 ret.emplace_back(std::move(n.mapped()));
368 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400369 // If accepted is false, it means that underlying socket is ok, but channel is declined
370 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 for (auto& [vid, cb] : pendingOperations.waiting)
372 ret.emplace_back(std::move(cb));
373 pendingOperations.waiting.clear();
374 for (auto& [vid, cb] : pendingOperations.connecting)
375 ret.emplace_back(std::move(cb));
376 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400377 }
378 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400379 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
380 pendingOperations_.erase(it);
381 lk.unlock();
382 for (auto& cb : ret)
383 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400384 }
385
Adrien Béraud665294f2023-06-13 18:09:11 -0400386 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400387 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400388 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400389 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400390 auto it = pendingOperations_.find(deviceId);
391 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400392 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400393 auto& pendingOp = it->second;
394 for (const auto& [id, pc]: pendingOp.connecting) {
395 if (vid == 0 || id == vid)
396 ret[id] = pc.name;
397 }
398 for (const auto& [id, pc]: pendingOp.waiting) {
399 if (vid == 0 || id == vid)
400 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400401 }
402 return ret;
403 }
404
405 std::shared_ptr<ConnectionManager::Impl> shared()
406 {
407 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
408 }
409 std::shared_ptr<ConnectionManager::Impl const> shared() const
410 {
411 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
412 }
413 std::weak_ptr<ConnectionManager::Impl> weak()
414 {
415 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
416 }
417 std::weak_ptr<ConnectionManager::Impl const> weak() const
418 {
419 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
420 }
421
422 std::atomic_bool isDestroying_ {false};
423};
424
425void
426ConnectionManager::Impl::connectDeviceStartIce(
427 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
428 const dht::Value::Id& vid,
429 const std::string& connType,
430 std::function<void(bool)> onConnected)
431{
432 auto deviceId = devicePk->getLongId();
433 auto info = getInfo(deviceId, vid);
434 if (!info) {
435 onConnected(false);
436 return;
437 }
438
439 std::unique_lock<std::mutex> lk(info->mutex_);
440 auto& ice = info->ice_;
441
442 if (!ice) {
443 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400444 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400445 onConnected(false);
446 return;
447 }
448
449 auto iceAttributes = ice->getLocalAttributes();
450 std::ostringstream icemsg;
451 icemsg << iceAttributes.ufrag << "\n";
452 icemsg << iceAttributes.pwd << "\n";
453 for (const auto& addr : ice->getLocalCandidates(1)) {
454 icemsg << addr << "\n";
455 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400456 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400457 }
458
459 // Prepare connection request as a DHT message
460 PeerConnectionRequest val;
461
462 val.id = vid; /* Random id for the message unicity */
463 val.ice_msg = icemsg.str();
464 val.connType = connType;
465
466 auto value = std::make_shared<dht::Value>(std::move(val));
467 value->user_type = "peer_request";
468
469 // Send connection request through DHT
470 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400471 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400472 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
473 + devicePk->getId().toString()),
474 devicePk,
475 value,
476 [l=config_->logger,deviceId](bool ok) {
477 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400478 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400479 deviceId,
480 (ok ? "ok" : "failed"));
481 });
482 // Wait for call to onResponse() operated by DHT
483 if (isDestroying_) {
484 onConnected(true); // This avoid to wait new negotiation when destroying
485 return;
486 }
487
488 info->onConnected_ = std::move(onConnected);
489 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
490 std::chrono::steady_clock::now()
491 + DHT_MSG_TIMEOUT);
492 info->waitForAnswer_->async_wait(
493 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
494}
495
496void
497ConnectionManager::Impl::onResponse(const asio::error_code& ec,
498 const DeviceId& deviceId,
499 const dht::Value::Id& vid)
500{
501 if (ec == asio::error::operation_aborted)
502 return;
503 auto info = getInfo(deviceId, vid);
504 if (!info)
505 return;
506
507 std::unique_lock<std::mutex> lk(info->mutex_);
508 auto& ice = info->ice_;
509 if (isDestroying_) {
510 info->onConnected_(true); // The destructor can wake a pending wait here.
511 return;
512 }
513 if (!info->responseReceived_) {
514 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400515 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400516 info->onConnected_(false);
517 return;
518 }
519
520 if (!info->ice_) {
521 info->onConnected_(false);
522 return;
523 }
524
525 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
526
527 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
528 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400529 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400530 info->onConnected_(false);
531 return;
532 }
533 info->onConnected_(true);
534}
535
536bool
537ConnectionManager::Impl::connectDeviceOnNegoDone(
538 const DeviceId& deviceId,
539 const std::string& name,
540 const dht::Value::Id& vid,
541 const std::shared_ptr<dht::crypto::Certificate>& cert)
542{
543 auto info = getInfo(deviceId, vid);
544 if (!info)
545 return false;
546
547 std::unique_lock<std::mutex> lk {info->mutex_};
548 if (info->waitForAnswer_) {
549 // Negotiation is done and connected, go to handshake
550 // and avoid any cancellation at this point.
551 info->waitForAnswer_->cancel();
552 }
553 auto& ice = info->ice_;
554 if (!ice || !ice->isRunning()) {
555 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400556 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400557 return false;
558 }
559
560 // Build socket
561 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
562 std::move(ice)),
563 true);
564
565 // Negotiate a TLS session
566 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400567 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400568 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
569 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400570 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400571 identity(),
572 dhParams(),
573 *cert);
574
575 info->tls_->setOnReady(
576 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
577 bool ok) {
578 if (auto shared = w.lock())
579 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
580 });
581 return true;
582}
583
584void
585ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
586 const std::string& name,
587 ConnectCallback cb,
588 bool noNewSocket,
589 bool forceNewSocket,
590 const std::string& connType)
591{
592 if (!dht()) {
593 cb(nullptr, deviceId);
594 return;
595 }
596 if (deviceId.toString() == identity().second->getLongId().toString()) {
597 cb(nullptr, deviceId);
598 return;
599 }
600 findCertificate(deviceId,
601 [w = weak(),
602 deviceId,
603 name,
604 cb = std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
608 if (!cert) {
609 if (auto shared = w.lock())
610 if (shared->config_->logger)
611 shared->config_->logger->error(
612 "No valid certificate found for device {}",
613 deviceId);
614 cb(nullptr, deviceId);
615 return;
616 }
617 if (auto shared = w.lock()) {
618 shared->connectDevice(cert,
619 name,
620 std::move(cb),
621 noNewSocket,
622 forceNewSocket,
623 connType);
624 } else
625 cb(nullptr, deviceId);
626 });
627}
628
629void
Amna0cf544d2023-07-25 14:25:09 -0400630ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
631 const std::string& name,
632 ConnectCallbackLegacy cb,
633 bool noNewSocket,
634 bool forceNewSocket,
635 const std::string& connType)
636{
637 if (!dht()) {
638 cb(nullptr, deviceId);
639 return;
640 }
641 if (deviceId.toString() == identity().second->getLongId().toString()) {
642 cb(nullptr, deviceId);
643 return;
644 }
645 findCertificate(deviceId,
646 [w = weak(),
647 deviceId,
648 name,
649 cb = std::move(cb),
650 noNewSocket,
651 forceNewSocket,
652 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
653 if (!cert) {
654 if (auto shared = w.lock())
655 if (shared->config_->logger)
656 shared->config_->logger->error(
657 "No valid certificate found for device {}",
658 deviceId);
659 cb(nullptr, deviceId);
660 return;
661 }
662 if (auto shared = w.lock()) {
663 shared->connectDevice(cert,
664 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400665 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400666 cb(sock, deviceId);
667 },
668 noNewSocket,
669 forceNewSocket,
670 connType);
671 } else
672 cb(nullptr, deviceId);
673 });
674}
675
676void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400677ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
678 const std::string& name,
679 ConnectCallback cb,
680 bool noNewSocket,
681 bool forceNewSocket,
682 const std::string& connType)
683{
684 // Avoid dht operation in a DHT callback to avoid deadlocks
685 dht::ThreadPool::computation().run([w = weak(),
686 name = std::move(name),
687 cert = std::move(cert),
688 cb = std::move(cb),
689 noNewSocket,
690 forceNewSocket,
691 connType] {
692 auto devicePk = cert->getSharedPublicKey();
693 auto deviceId = devicePk->getLongId();
694 auto sthis = w.lock();
695 if (!sthis || sthis->isDestroying_) {
696 cb(nullptr, deviceId);
697 return;
698 }
699 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
700 auto isConnectingToDevice = false;
701 {
702 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400703 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
704 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400705 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400706 while (pendings.connecting.find(vid) != pendings.connecting.end()
707 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400708 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400709 }
710 }
711 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400712 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400713 // Save current request for sendChannelRequest.
714 // Note: do not return here, cause we can be in a state where first
715 // socket is negotiated and first channel is pending
716 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400717 if (isConnectingToDevice && !forceNewSocket)
718 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400719 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400720 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400721 }
722
723 // Check if already negotiated
724 CallbackId cbId(deviceId, vid);
725 if (auto info = sthis->getConnectedInfo(deviceId)) {
726 std::lock_guard<std::mutex> lk(info->mutex_);
727 if (info->socket_) {
728 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400729 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400730 info->cbIds_.emplace(cbId);
731 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
732 return;
733 }
734 }
735
736 if (isConnectingToDevice && !forceNewSocket) {
737 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400738 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400739 return;
740 }
741 if (noNewSocket) {
742 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400743 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400744 return;
745 }
746
747 // Note: used when the ice negotiation fails to erase
748 // all stored structures.
749 auto eraseInfo = [w, cbId] {
750 if (auto shared = w.lock()) {
751 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400752 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400753 std::lock_guard<std::mutex> lk(shared->infosMtx_);
754 shared->infos_.erase(cbId);
755 }
756 };
757
758 // If no socket exists, we need to initiate an ICE connection.
759 sthis->getIceOptions([w,
760 deviceId = std::move(deviceId),
761 devicePk = std::move(devicePk),
762 name = std::move(name),
763 cert = std::move(cert),
764 vid,
765 connType,
766 eraseInfo](auto&& ice_config) {
767 auto sthis = w.lock();
768 if (!sthis) {
769 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
770 return;
771 }
772 ice_config.tcpEnable = true;
773 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400774 devicePk = std::move(devicePk),
775 name = std::move(name),
776 cert = std::move(cert),
777 vid,
778 connType,
779 eraseInfo](bool ok) {
780 dht::ThreadPool::io().run([w = std::move(w),
781 devicePk = std::move(devicePk),
782 vid = std::move(vid),
783 eraseInfo,
784 connType, ok] {
785 auto sthis = w.lock();
786 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400787 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400788 if (!sthis || !ok) {
789 eraseInfo();
790 return;
791 }
792 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
793 if (!ok) {
794 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
795 }
796 });
797 });
798 };
799 ice_config.onNegoDone = [w,
800 deviceId,
801 name,
802 cert = std::move(cert),
803 vid,
804 eraseInfo](bool ok) {
805 dht::ThreadPool::io().run([w = std::move(w),
806 deviceId = std::move(deviceId),
807 name = std::move(name),
808 cert = std::move(cert),
809 vid = std::move(vid),
810 eraseInfo = std::move(eraseInfo),
811 ok] {
812 auto sthis = w.lock();
813 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400814 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400815 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
816 eraseInfo();
817 });
818 };
819
820 auto info = std::make_shared<ConnectionInfo>();
821 {
822 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
823 sthis->infos_[{deviceId, vid}] = info;
824 }
825 std::unique_lock<std::mutex> lk {info->mutex_};
826 ice_config.master = false;
827 ice_config.streamsCount = 1;
828 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400829 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400830 if (!info->ice_) {
831 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400832 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400833 eraseInfo();
834 return;
835 }
836 // We need to detect any shutdown if the ice session is destroyed before going to the
837 // TLS session;
838 info->ice_->setOnShutdown([eraseInfo]() {
839 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
840 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400841 try {
842 info->ice_->initIceInstance(ice_config);
843 } catch (const std::exception& e) {
844 if (sthis->config_->logger)
845 sthis->config_->logger->error("{}", e.what());
846 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
847 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400848 });
849 });
850}
851
852void
853ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
854 const std::string& name,
855 const DeviceId& deviceId,
856 const dht::Value::Id& vid)
857{
858 auto channelSock = sock->addChannel(name);
859 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
860 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400861 if (auto shared = w.lock())
862 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400863 });
864 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400865 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400866 auto shared = w.lock();
867 auto channelSock = wSock.lock();
868 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400869 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400870 });
871
872 ChannelRequest val;
873 val.name = channelSock->name();
874 val.state = ChannelRequestState::REQUEST;
875 val.channel = channelSock->channel();
876 msgpack::sbuffer buffer(256);
877 msgpack::pack(buffer, val);
878
879 std::error_code ec;
880 int res = sock->write(CONTROL_CHANNEL,
881 reinterpret_cast<const uint8_t*>(buffer.data()),
882 buffer.size(),
883 ec);
884 if (res < 0) {
885 // TODO check if we should handle errors here
886 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400887 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400888 }
889}
890
891void
892ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
893{
894 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400895 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400896 if (config_->logger)
897 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400898 std::lock_guard<std::mutex> lk {info->mutex_};
899 info->responseReceived_ = true;
900 info->response_ = std::move(req);
901 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
902 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
903 this,
904 std::placeholders::_1,
905 device,
906 req.id));
907 } else {
908 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400909 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400910 }
911}
912
913void
914ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
915{
916 if (!dht())
917 return;
918 dht()->listen<PeerConnectionRequest>(
919 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
920 [w = weak()](PeerConnectionRequest&& req) {
921 auto shared = w.lock();
922 if (!shared)
923 return false;
924 if (shared->isMessageTreated(to_hex_string(req.id))) {
925 // Message already treated. Just ignore
926 return true;
927 }
928 if (req.isAnswer) {
929 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400930 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400931 } else {
932 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400933 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400934 }
935 if (req.isAnswer) {
936 shared->onPeerResponse(req);
937 } else {
938 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400939 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400940 req.from,
941 [w, req = std::move(req)](
942 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
943 auto shared = w.lock();
944 if (!shared)
945 return;
946 dht::InfoHash peer_h;
947 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
948#if TARGET_OS_IOS
949 if (shared->iOSConnectedCb_(req.connType, peer_h))
950 return;
951#endif
952 shared->onDhtPeerRequest(req, cert);
953 } else {
954 if (shared->config_->logger)
955 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400956 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400957 req.owner->getLongId());
958 }
959 });
960 }
961
962 return true;
963 },
964 dht::Value::UserTypeFilter("peer_request"));
965}
966
967void
968ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
969 const DeviceId& deviceId,
970 const dht::Value::Id& vid,
971 const std::string& name)
972{
973 if (isDestroying_)
974 return;
975 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
976 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
977 // asked yet)
978 auto isDhtRequest = name.empty();
979 if (!ok) {
980 if (isDhtRequest) {
981 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400982 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400983 deviceId,
984 name,
985 vid);
986 if (connReadyCb_)
987 connReadyCb_(deviceId, "", nullptr);
988 } else {
989 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400990 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400991 deviceId,
992 name,
993 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400994 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400995 }
996 } else {
997 // The socket is ready, store it
998 if (isDhtRequest) {
999 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001000 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001001 deviceId,
1002 vid);
1003 } else {
1004 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001005 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001006 deviceId,
1007 name,
1008 vid);
1009 }
1010
1011 auto info = getInfo(deviceId, vid);
1012 addNewMultiplexedSocket({deviceId, vid}, info);
1013 // Finally, open the channel and launch pending callbacks
1014 if (info->socket_) {
1015 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001016 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001017 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001018 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001019 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001020 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001021 }
1022 }
1023 }
1024}
1025
1026void
1027ConnectionManager::Impl::answerTo(IceTransport& ice,
1028 const dht::Value::Id& id,
1029 const std::shared_ptr<dht::crypto::PublicKey>& from)
1030{
1031 // NOTE: This is a shortest version of a real SDP message to save some bits
1032 auto iceAttributes = ice.getLocalAttributes();
1033 std::ostringstream icemsg;
1034 icemsg << iceAttributes.ufrag << "\n";
1035 icemsg << iceAttributes.pwd << "\n";
1036 for (const auto& addr : ice.getLocalCandidates(1)) {
1037 icemsg << addr << "\n";
1038 }
1039
1040 // Send PeerConnection response
1041 PeerConnectionRequest val;
1042 val.id = id;
1043 val.ice_msg = icemsg.str();
1044 val.isAnswer = true;
1045 auto value = std::make_shared<dht::Value>(std::move(val));
1046 value->user_type = "peer_request";
1047
1048 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001049 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001050 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1051 + from->getId().toString()),
1052 from,
1053 value,
1054 [from,l=config_->logger](bool ok) {
1055 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001056 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001057 from->getLongId(),
1058 (ok ? "ok" : "failed"));
1059 });
1060}
1061
1062bool
1063ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1064{
1065 auto deviceId = req.owner->getLongId();
1066 auto info = getInfo(deviceId, req.id);
1067 if (!info)
1068 return false;
1069
1070 std::unique_lock<std::mutex> lk {info->mutex_};
1071 auto& ice = info->ice_;
1072 if (!ice) {
1073 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001074 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001075 if (connReadyCb_)
1076 connReadyCb_(deviceId, "", nullptr);
1077 return false;
1078 }
1079
1080 auto sdp = ice->parseIceCandidates(req.ice_msg);
1081 answerTo(*ice, req.id, req.owner);
1082 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1083 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001084 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001085 ice = nullptr;
1086 if (connReadyCb_)
1087 connReadyCb_(deviceId, "", nullptr);
1088 return false;
1089 }
1090 return true;
1091}
1092
1093bool
1094ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1095{
1096 auto deviceId = req.owner->getLongId();
1097 auto info = getInfo(deviceId, req.id);
1098 if (!info)
1099 return false;
1100
1101 std::unique_lock<std::mutex> lk {info->mutex_};
1102 auto& ice = info->ice_;
1103 if (!ice) {
1104 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001105 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001106 return false;
1107 }
1108
1109 // Build socket
1110 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1111 std::move(ice)),
1112 false);
1113
1114 // init TLS session
1115 auto ph = req.from;
1116 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001117 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1118 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001119 req.id);
1120 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1121 std::move(endpoint),
1122 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001123 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001124 identity(),
1125 dhParams(),
Adrien Béraud9efbd442023-08-27 12:38:07 -04001126 [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001127 auto shared = w.lock();
1128 if (!shared)
1129 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001130 if (cert.getPublicKey().getId() != ph
1131 || deviceId != cert.getPublicKey().getLongId()) {
1132 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1133 deviceId,
1134 cert.getPublicKey().getLongId());
1135 return false;
1136 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001137 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1138 if (!crt)
1139 return false;
1140 return crt->getPacked() == cert.getPacked();
1141 });
1142
1143 info->tls_->setOnReady(
1144 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1145 if (auto shared = w.lock())
1146 shared->onTlsNegotiationDone(ok, deviceId, vid);
1147 });
1148 return true;
1149}
1150
1151void
1152ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1153 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1154{
1155 auto deviceId = req.owner->getLongId();
1156 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001157 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001158 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1159 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001160 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001161 return;
1162 }
1163
1164 // Because the connection is accepted, create an ICE socket.
1165 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1166 auto shared = w.lock();
1167 if (!shared)
1168 return;
1169 // Note: used when the ice negotiation fails to erase
1170 // all stored structures.
1171 auto eraseInfo = [w, id = req.id, deviceId] {
1172 if (auto shared = w.lock()) {
1173 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001174 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001175 if (shared->connReadyCb_)
1176 shared->connReadyCb_(deviceId, "", nullptr);
1177 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1178 shared->infos_.erase({deviceId, id});
1179 }
1180 };
1181
1182 ice_config.tcpEnable = true;
1183 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1184 auto shared = w.lock();
1185 if (!shared)
1186 return;
1187 if (!ok) {
1188 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001189 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001190 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1191 return;
1192 }
1193
1194 dht::ThreadPool::io().run(
1195 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1196 auto shared = w.lock();
1197 if (!shared)
1198 return;
1199 if (!shared->onRequestStartIce(req))
1200 eraseInfo();
1201 });
1202 };
1203
1204 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1205 auto shared = w.lock();
1206 if (!shared)
1207 return;
1208 if (!ok) {
1209 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001210 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001211 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1212 return;
1213 }
1214
1215 dht::ThreadPool::io().run(
1216 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1217 if (auto shared = w.lock())
1218 if (!shared->onRequestOnNegoDone(req))
1219 eraseInfo();
1220 });
1221 };
1222
1223 // Negotiate a new ICE socket
1224 auto info = std::make_shared<ConnectionInfo>();
1225 {
1226 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1227 shared->infos_[{deviceId, req.id}] = info;
1228 }
1229 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001230 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001231 std::unique_lock<std::mutex> lk {info->mutex_};
1232 ice_config.streamsCount = 1;
1233 ice_config.compCountPerStream = 1; // TCP
1234 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001235 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001236 if (not info->ice_) {
1237 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001238 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001239 eraseInfo();
1240 return;
1241 }
1242 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1243 info->ice_->setOnShutdown([eraseInfo]() {
1244 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1245 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001246 try {
1247 info->ice_->initIceInstance(ice_config);
1248 } catch (const std::exception& e) {
1249 if (shared->config_->logger)
1250 shared->config_->logger->error("{}", e.what());
1251 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1252 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001253 });
1254}
1255
1256void
1257ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1258{
1259 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1260 info->socket_->setOnReady(
1261 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1262 if (auto sthis = w.lock())
1263 if (sthis->connReadyCb_)
1264 sthis->connReadyCb_(deviceId, socket->name(), socket);
1265 });
1266 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1267 const uint16_t&,
1268 const std::string& name) {
1269 if (auto sthis = w.lock())
1270 if (sthis->channelReqCb_)
1271 return sthis->channelReqCb_(peer, name);
1272 return false;
1273 });
1274 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1275 // Cancel current outgoing connections
1276 dht::ThreadPool::io().run([w, deviceId, vid] {
1277 auto sthis = w.lock();
1278 if (!sthis)
1279 return;
1280
1281 std::set<CallbackId> ids;
1282 if (auto info = sthis->getInfo(deviceId, vid)) {
1283 std::lock_guard<std::mutex> lk(info->mutex_);
1284 if (info->socket_) {
1285 ids = std::move(info->cbIds_);
1286 info->socket_->shutdown();
1287 }
1288 }
1289 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001290 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001291
1292 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1293 sthis->infos_.erase({deviceId, vid});
1294 });
1295 });
1296}
1297
1298const std::shared_future<tls::DhParams>
1299ConnectionManager::Impl::dhParams() const
1300{
1301 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001302 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001303}
1304
1305template<typename ID = dht::Value::Id>
1306std::set<ID, std::less<>>
1307loadIdList(const std::string& path)
1308{
1309 std::set<ID, std::less<>> ids;
1310 std::ifstream file = fileutils::ifstream(path);
1311 if (!file.is_open()) {
1312 //JAMI_DBG("Could not load %s", path.c_str());
1313 return ids;
1314 }
1315 std::string line;
1316 while (std::getline(file, line)) {
1317 if constexpr (std::is_same<ID, std::string>::value) {
1318 ids.emplace(std::move(line));
1319 } else if constexpr (std::is_integral<ID>::value) {
1320 ID vid;
1321 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1322 ec == std::errc()) {
1323 ids.emplace(vid);
1324 }
1325 }
1326 }
1327 return ids;
1328}
1329
1330template<typename List = std::set<dht::Value::Id>>
1331void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001332saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001333{
1334 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1335 if (!file.is_open()) {
1336 //JAMI_ERR("Could not save to %s", path.c_str());
1337 return;
1338 }
1339 for (auto& c : ids)
1340 file << std::hex << c << "\n";
1341}
1342
1343void
1344ConnectionManager::Impl::loadTreatedMessages()
1345{
1346 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001347 auto path = config_->cachePath / "treatedMessages";
Adrien Béraud612b55b2023-05-29 10:42:04 -04001348 treatedMessages_ = loadIdList<std::string>(path);
1349 if (treatedMessages_.empty()) {
1350 auto messages = loadIdList(path);
1351 for (const auto& m : messages)
1352 treatedMessages_.emplace(to_hex_string(m));
1353 }
1354}
1355
1356void
1357ConnectionManager::Impl::saveTreatedMessages() const
1358{
1359 dht::ThreadPool::io().run([w = weak()]() {
1360 if (auto sthis = w.lock()) {
1361 auto& this_ = *sthis;
1362 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1363 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001364 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001365 this_.treatedMessages_);
1366 }
1367 });
1368}
1369
1370bool
1371ConnectionManager::Impl::isMessageTreated(std::string_view id)
1372{
1373 std::lock_guard<std::mutex> lock(messageMutex_);
1374 auto res = treatedMessages_.emplace(id);
1375 if (res.second) {
1376 saveTreatedMessages();
1377 return false;
1378 }
1379 return true;
1380}
1381
1382/**
1383 * returns whether or not UPnP is enabled and active_
1384 * ie: if it is able to make port mappings
1385 */
1386bool
1387ConnectionManager::Impl::getUPnPActive() const
1388{
1389 return config_->getUPnPActive();
1390}
1391
1392IpAddr
1393ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1394{
1395 if (family == AF_INET)
1396 return publishedIp_[0];
1397 if (family == AF_INET6)
1398 return publishedIp_[1];
1399
1400 assert(family == AF_UNSPEC);
1401
1402 // If family is not set, prefere IPv4 if available. It's more
1403 // likely to succeed behind NAT.
1404 if (publishedIp_[0])
1405 return publishedIp_[0];
1406 if (publishedIp_[1])
1407 return publishedIp_[1];
1408 return {};
1409}
1410
1411void
1412ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1413{
1414 if (ip_addr.getFamily() == AF_INET) {
1415 publishedIp_[0] = ip_addr;
1416 } else {
1417 publishedIp_[1] = ip_addr;
1418 }
1419}
1420
1421void
1422ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1423{
1424 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1425 bool hasIpv4 {false}, hasIpv6 {false};
1426 for (auto& result : results) {
1427 auto family = result.getFamily();
1428 if (family == AF_INET) {
1429 if (not hasIpv4) {
1430 hasIpv4 = true;
1431 if (config_->logger)
1432 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1433 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1434 setPublishedAddress(*result.get());
1435 if (config_->upnpCtrl) {
1436 config_->upnpCtrl->setPublicAddress(*result.get());
1437 }
1438 }
1439 } else if (family == AF_INET6) {
1440 if (not hasIpv6) {
1441 hasIpv6 = true;
1442 if (config_->logger)
1443 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1444 setPublishedAddress(*result.get());
1445 }
1446 }
1447 if (hasIpv4 and hasIpv6)
1448 break;
1449 }
1450 if (cb)
1451 cb();
1452 });
1453}
1454
1455void
1456ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1457{
1458 storeActiveIpAddress([this, cb = std::move(cb)] {
1459 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1460 auto publishedAddr = getPublishedIpAddress();
1461
1462 if (publishedAddr) {
1463 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1464 publishedAddr.getFamily());
1465 if (interfaceAddr) {
1466 opts.accountLocalAddr = interfaceAddr;
1467 opts.accountPublicAddr = publishedAddr;
1468 }
1469 }
1470 if (cb)
1471 cb(std::move(opts));
1472 });
1473}
1474
1475IceTransportOptions
1476ConnectionManager::Impl::getIceOptions() const noexcept
1477{
1478 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001479 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001480 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001481 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001482
1483 if (config_->stunEnabled)
1484 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1485 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001486 if (config_->turnCache) {
1487 auto turnAddr = config_->turnCache->getResolvedTurn();
1488 if (turnAddr != std::nullopt) {
1489 opts.turnServers.emplace_back(TurnServerInfo()
1490 .setUri(turnAddr->toString())
1491 .setUsername(config_->turnServerUserName)
1492 .setPassword(config_->turnServerPwd)
1493 .setRealm(config_->turnServerRealm));
1494 }
1495 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001496 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001497 .setUri(config_->turnServer)
1498 .setUsername(config_->turnServerUserName)
1499 .setPassword(config_->turnServerPwd)
1500 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001501 }
1502 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1503 // co issues. So this needs some debug. for now just disable
1504 // if (cacheTurnV6 && *cacheTurnV6) {
1505 // opts.turnServers.emplace_back(TurnServerInfo()
1506 // .setUri(cacheTurnV6->toString(true))
1507 // .setUsername(turnServerUserName_)
1508 // .setPassword(turnServerPwd_)
1509 // .setRealm(turnServerRealm_));
1510 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001511 }
1512 return opts;
1513}
1514
1515bool
1516ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1517 dht::InfoHash& account_id,
1518 const std::shared_ptr<Logger>& logger)
1519{
1520 if (not crt)
1521 return false;
1522
1523 auto top_issuer = crt;
1524 while (top_issuer->issuer)
1525 top_issuer = top_issuer->issuer;
1526
1527 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001528 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001529 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001530 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001531 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001532 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001533
1534 // Check peer certificate chain
1535 // Trust store with top issuer as the only CA
1536 dht::crypto::TrustList peer_trust;
1537 peer_trust.add(*top_issuer);
1538 if (not peer_trust.verify(*crt)) {
1539 if (logger)
1540 logger->warn("Found invalid peer device: {}", crt->getLongId());
1541 return false;
1542 }
1543
1544 // Check cached OCSP response
1545 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1546 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001547 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001548 return false;
1549 }
1550
Adrien Béraudc631a832023-07-26 22:19:00 -04001551 account_id = crt->issuer->getId();
1552 if (logger)
1553 logger->warn("Found peer device: {} account:{} CA:{}",
1554 crt->getLongId(),
1555 account_id,
1556 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001557 return true;
1558}
1559
1560bool
1561ConnectionManager::Impl::findCertificate(
1562 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1563{
1564 if (auto cert = certStore().getCertificate(id.toString())) {
1565 if (cb)
1566 cb(cert);
1567 } else if (cb)
1568 cb(nullptr);
1569 return true;
1570}
1571
Sébastien Blin34086512023-07-25 09:52:14 -04001572bool
1573ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1574 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1575{
1576 if (auto cert = certStore().getCertificate(h.toString())) {
1577 if (cb)
1578 cb(cert);
1579 } else {
1580 dht()->findCertificate(h,
1581 [cb = std::move(cb), this](
1582 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1583 if (crt)
1584 certStore().pinCertificate(crt);
1585 if (cb)
1586 cb(crt);
1587 });
1588 }
1589 return true;
1590}
1591
Adrien Béraud612b55b2023-05-29 10:42:04 -04001592ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1593 : pimpl_ {std::make_shared<Impl>(config_)}
1594{}
1595
1596ConnectionManager::~ConnectionManager()
1597{
1598 if (pimpl_)
1599 pimpl_->shutdown();
1600}
1601
1602void
1603ConnectionManager::connectDevice(const DeviceId& deviceId,
1604 const std::string& name,
1605 ConnectCallback cb,
1606 bool noNewSocket,
1607 bool forceNewSocket,
1608 const std::string& connType)
1609{
1610 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1611}
1612
1613void
Amna0cf544d2023-07-25 14:25:09 -04001614ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1615 const std::string& name,
1616 ConnectCallbackLegacy cb,
1617 bool noNewSocket,
1618 bool forceNewSocket,
1619 const std::string& connType)
1620{
1621 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1622}
1623
1624
1625void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001626ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1627 const std::string& name,
1628 ConnectCallback cb,
1629 bool noNewSocket,
1630 bool forceNewSocket,
1631 const std::string& connType)
1632{
1633 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1634}
1635
1636bool
1637ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1638{
Adrien Béraud665294f2023-06-13 18:09:11 -04001639 auto pending = pimpl_->getPendingIds(deviceId);
1640 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001641 != pending.end();
1642}
1643
1644void
1645ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1646{
1647 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1648 std::set<DeviceId> peersDevices;
1649 {
1650 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1651 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1652 auto const& [key, value] = *iter;
1653 auto deviceId = key.first;
1654 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1655 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1656 connInfos.emplace_back(value);
1657 peersDevices.emplace(deviceId);
1658 iter = pimpl_->infos_.erase(iter);
1659 } else {
1660 iter++;
1661 }
1662 }
1663 }
1664 // Stop connections to all peers devices
1665 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001666 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001667 // This will close the TLS Session
1668 pimpl_->removeUnusedConnections(deviceId);
1669 }
1670 for (auto& info : connInfos) {
1671 if (info->socket_)
1672 info->socket_->shutdown();
1673 if (info->waitForAnswer_)
1674 info->waitForAnswer_->cancel();
1675 if (info->ice_) {
1676 std::unique_lock<std::mutex> lk {info->mutex_};
1677 dht::ThreadPool::io().run(
1678 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1679 }
1680 }
1681}
1682
1683void
1684ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1685{
1686 pimpl_->onDhtConnected(devicePk);
1687}
1688
1689void
1690ConnectionManager::onICERequest(onICERequestCallback&& cb)
1691{
1692 pimpl_->iceReqCb_ = std::move(cb);
1693}
1694
1695void
1696ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1697{
1698 pimpl_->channelReqCb_ = std::move(cb);
1699}
1700
1701void
1702ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1703{
1704 pimpl_->connReadyCb_ = std::move(cb);
1705}
1706
1707void
1708ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1709{
1710 pimpl_->iOSConnectedCb_ = std::move(cb);
1711}
1712
1713std::size_t
1714ConnectionManager::activeSockets() const
1715{
1716 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1717 return pimpl_->infos_.size();
1718}
1719
1720void
1721ConnectionManager::monitor() const
1722{
1723 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1724 auto logger = pimpl_->config_->logger;
1725 if (!logger)
1726 return;
1727 logger->debug("ConnectionManager current status:");
1728 for (const auto& [_, ci] : pimpl_->infos_) {
1729 if (ci->socket_)
1730 ci->socket_->monitor();
1731 }
1732 logger->debug("ConnectionManager end status.");
1733}
1734
1735void
1736ConnectionManager::connectivityChanged()
1737{
1738 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1739 for (const auto& [_, ci] : pimpl_->infos_) {
1740 if (ci->socket_)
1741 ci->socket_->sendBeacon();
1742 }
1743}
1744
1745void
1746ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1747{
1748 return pimpl_->getIceOptions(std::move(cb));
1749}
1750
1751IceTransportOptions
1752ConnectionManager::getIceOptions() const noexcept
1753{
1754 return pimpl_->getIceOptions();
1755}
1756
1757IpAddr
1758ConnectionManager::getPublishedIpAddress(uint16_t family) const
1759{
1760 return pimpl_->getPublishedIpAddress(family);
1761}
1762
1763void
1764ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1765{
1766 return pimpl_->setPublishedAddress(ip_addr);
1767}
1768
1769void
1770ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1771{
1772 return pimpl_->storeActiveIpAddress(std::move(cb));
1773}
1774
1775std::shared_ptr<ConnectionManager::Config>
1776ConnectionManager::getConfig()
1777{
1778 return pimpl_->config_;
1779}
1780
Amna31791e52023-08-03 12:40:57 -04001781std::vector<std::map<std::string, std::string>>
1782ConnectionManager::getConnectionList(const DeviceId& device) const
1783{
1784 std::vector<std::map<std::string, std::string>> connectionsList;
1785 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1786
1787 for (const auto& [key, ci] : pimpl_->infos_) {
1788 if (device && key.first != device)
1789 continue;
1790 std::map<std::string, std::string> connectionInfo;
1791 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001792 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001793 if (ci->tls_) {
1794 if (auto cert = ci->tls_->peerCertificate()) {
1795 connectionInfo["peer"] = cert->issuer->getId().toString();
1796 }
Amna31791e52023-08-03 12:40:57 -04001797 }
1798 if (ci->socket_) {
1799 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1800 } else if (ci->tls_) {
1801 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1802 } else if(ci->ice_)
1803 {
1804 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1805 }
1806 if (ci->tls_) {
1807 std::string remoteAddress = ci->tls_->getRemoteAddress();
1808 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1809 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1810 connectionInfo["remoteAdress"] = remoteAddressIp;
1811 connectionInfo["remotePort"] = remoteAddressPort;
1812 }
1813 connectionsList.emplace_back(std::move(connectionInfo));
1814 }
1815
1816 if (device) {
1817 auto it = pimpl_->pendingOperations_.find(device);
1818 if (it != pimpl_->pendingOperations_.end()) {
1819 const auto& po = it->second;
1820 for (const auto& [vid, ci] : po.connecting) {
1821 std::map<std::string, std::string> connectionInfo;
1822 connectionInfo["id"] = callbackIdToString(device, vid);
1823 connectionInfo["deviceId"] = vid;
1824 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1825 connectionsList.emplace_back(std::move(connectionInfo));
1826 }
1827
1828 for (const auto& [vid, ci] : po.waiting) {
1829 std::map<std::string, std::string> connectionInfo;
1830 connectionInfo["id"] = callbackIdToString(device, vid);
1831 connectionInfo["deviceId"] = vid;
1832 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1833 connectionsList.emplace_back(std::move(connectionInfo));
1834 }
1835 }
1836 }
1837 else {
1838 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1839 for (const auto& [vid, ci] : po.connecting) {
1840 std::map<std::string, std::string> connectionInfo;
1841 connectionInfo["id"] = callbackIdToString(device, vid);
1842 connectionInfo["deviceId"] = vid;
1843 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1844 connectionsList.emplace_back(std::move(connectionInfo));
1845 }
1846
1847 for (const auto& [vid, ci] : po.waiting) {
1848 std::map<std::string, std::string> connectionInfo;
1849 connectionInfo["id"] = callbackIdToString(device, vid);
1850 connectionInfo["deviceId"] = vid;
1851 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1852 connectionsList.emplace_back(std::move(connectionInfo));
1853 }
1854 }
1855 }
1856 return connectionsList;
1857}
1858
1859std::vector<std::map<std::string, std::string>>
1860ConnectionManager::getChannelList(const std::string& connectionId) const
1861{
1862 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1863 CallbackId cbid = parseCallbackId(connectionId);
1864 if (pimpl_->infos_.count(cbid) > 0) {
1865 return pimpl_->infos_[cbid]->socket_->getChannelList();
1866 } else {
1867 return {};
1868 }
1869}
1870
Sébastien Blin464bdff2023-07-19 08:02:53 -04001871} // namespace dhtnet