blob: 70d48e448ef1bb268c6792d0e4ccef4e38d086a6 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
// Delay used when arming the timer that waits for the answer to a connection request.
static constexpr auto DHT_MSG_TIMEOUT = std::chrono::seconds(30);
// Upper bound (2^53) of the randomly drawn connection request ids.
static constexpr uint64_t ID_MAX_VAL = uint64_t {1} << 53;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040044
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
83 {}
84 ~Impl() {}
85
86 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
87 const dht::crypto::Identity& identity() const { return config_->id; }
88
89 void removeUnusedConnections(const DeviceId& deviceId = {})
90 {
91 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
92
93 {
94 std::lock_guard<std::mutex> lk(infosMtx_);
95 for (auto it = infos_.begin(); it != infos_.end();) {
96 auto& [key, info] = *it;
97 if (info && (!deviceId || key.first == deviceId)) {
98 unused.emplace_back(std::move(info));
99 it = infos_.erase(it);
100 } else {
101 ++it;
102 }
103 }
104 }
105 for (auto& info: unused) {
106 if (info->tls_)
107 info->tls_->shutdown();
108 if (info->socket_)
109 info->socket_->shutdown();
110 if (info->waitForAnswer_)
111 info->waitForAnswer_->cancel();
112 }
113 if (!unused.empty())
114 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
115 }
116
117 void shutdown()
118 {
119 if (isDestroying_.exchange(true))
120 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400121 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400122 {
123 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400124 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400125 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400126 for (auto& [deviceId, pcbs] : po) {
127 for (auto& [id, pending] : pcbs.connecting)
128 pending.cb(nullptr, deviceId);
129 for (auto& [id, pending] : pcbs.waiting)
130 pending.cb(nullptr, deviceId);
131 }
132
Adrien Béraud612b55b2023-05-29 10:42:04 -0400133 removeUnusedConnections();
134 }
135
Adrien Béraud612b55b2023-05-29 10:42:04 -0400136 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
137 const dht::Value::Id& vid,
138 const std::string& connType,
139 std::function<void(bool)> onConnected);
140 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
141 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
142 const std::string& name,
143 const dht::Value::Id& vid,
144 const std::shared_ptr<dht::crypto::Certificate>& cert);
145 void connectDevice(const DeviceId& deviceId,
146 const std::string& uri,
147 ConnectCallback cb,
148 bool noNewSocket = false,
149 bool forceNewSocket = false,
150 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400151 void connectDevice(const dht::InfoHash& deviceId,
152 const std::string& uri,
153 ConnectCallbackLegacy cb,
154 bool noNewSocket = false,
155 bool forceNewSocket = false,
156 const std::string& connType = "");
157
Adrien Béraud612b55b2023-05-29 10:42:04 -0400158 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
159 const std::string& name,
160 ConnectCallback cb,
161 bool noNewSocket = false,
162 bool forceNewSocket = false,
163 const std::string& connType = "");
164 /**
165 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
166 * @param sock socket used to send the request
167 * @param name channel's name
168 * @param vid channel's id
169 * @param deviceId to identify the linked ConnectCallback
170 */
171 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
172 const std::string& name,
173 const DeviceId& deviceId,
174 const dht::Value::Id& vid);
175 /**
176 * Triggered when a PeerConnectionRequest comes from the DHT
177 */
178 void answerTo(IceTransport& ice,
179 const dht::Value::Id& id,
180 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
181 bool onRequestStartIce(const PeerConnectionRequest& req);
182 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
183 void onDhtPeerRequest(const PeerConnectionRequest& req,
184 const std::shared_ptr<dht::crypto::Certificate>& cert);
185
186 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
187 void onPeerResponse(const PeerConnectionRequest& req);
188 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
189
190 const std::shared_future<tls::DhParams> dhParams() const;
191 tls::CertificateStore& certStore() const { return *config_->certStore; }
192
193 mutable std::mutex messageMutex_ {};
194 std::set<std::string, std::less<>> treatedMessages_ {};
195
196 void loadTreatedMessages();
197 void saveTreatedMessages() const;
198
199 /// \return true if the given DHT message identifier has been treated
200 /// \note if message has not been treated yet this method st/ore this id and returns true at
201 /// further calls
202 bool isMessageTreated(std::string_view id);
203
204 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
205
206 /**
207 * Published IPv4/IPv6 addresses, used only if defined by the user in account
208 * configuration
209 *
210 */
211 IpAddr publishedIp_[2] {};
212
Adrien Béraud612b55b2023-05-29 10:42:04 -0400213 /**
214 * interface name on which this account is bound
215 */
216 std::string interface_ {"default"};
217
218 /**
219 * Get the local interface name on which this account is bound.
220 */
221 const std::string& getLocalInterface() const { return interface_; }
222
223 /**
224 * Get the published IP address, fallbacks to NAT if family is unspecified
225 * Prefers the usage of IPv4 if possible.
226 */
227 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
228
229 /**
230 * Set published IP address according to given family
231 */
232 void setPublishedAddress(const IpAddr& ip_addr);
233
234 /**
235 * Store the local/public addresses used to register
236 */
237 void storeActiveIpAddress(std::function<void()>&& cb = {});
238
239 /**
240 * Create and return ICE options.
241 */
242 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
243 IceTransportOptions getIceOptions() const noexcept;
244
245 /**
246 * Inform that a potential peer device have been found.
247 * Returns true only if the device certificate is a valid device certificate.
248 * In that case (true is returned) the account_id parameter is set to the peer account ID.
249 */
250 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
251 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
252
253 bool findCertificate(const dht::PkId& id,
254 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400255 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400256
257 /**
258 * returns whether or not UPnP is enabled and active
259 * ie: if it is able to make port mappings
260 */
261 bool getUPnPActive() const;
262
263 /**
264 * Triggered when a new TLS socket is ready to use
265 * @param ok If succeed
266 * @param deviceId Related device
267 * @param vid vid of the connection request
268 * @param name non empty if TLS was created by connectDevice()
269 */
270 void onTlsNegotiationDone(bool ok,
271 const DeviceId& deviceId,
272 const dht::Value::Id& vid,
273 const std::string& name = "");
274
275 std::shared_ptr<ConnectionManager::Config> config_;
276
Adrien Béraud612b55b2023-05-29 10:42:04 -0400277 mutable std::mt19937_64 rand;
278
279 iOSConnectedCallback iOSConnectedCb_ {};
280
281 std::mutex infosMtx_ {};
282 // Note: Someone can ask multiple sockets, so to avoid any race condition,
283 // each device can have multiple multiplexed sockets.
284 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
285
286 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
287 {
288 std::lock_guard<std::mutex> lk(infosMtx_);
289 auto it = infos_.find({deviceId, id});
290 if (it != infos_.end())
291 return it->second;
292 return {};
293 }
294
295 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
296 {
297 std::lock_guard<std::mutex> lk(infosMtx_);
298 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
299 auto& [key, value] = item;
300 return key.first == deviceId && value && value->socket_;
301 });
302 if (it != infos_.end())
303 return it->second;
304 return {};
305 }
306
307 ChannelRequestCallback channelReqCb_ {};
308 ConnectionReadyCallback connReadyCb_ {};
309 onICERequestCallback iceReqCb_ {};
310
311 /**
312 * Stores callback from connectDevice
313 * @note: each device needs a vector because several connectDevice can
314 * be done in parallel and we only want one socket
315 */
316 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400317
Adrien Béraud665294f2023-06-13 18:09:11 -0400318 struct PendingCb
319 {
320 std::string name;
321 ConnectCallback cb;
322 };
323 struct PendingOperations {
324 std::map<dht::Value::Id, PendingCb> connecting;
325 std::map<dht::Value::Id, PendingCb> waiting;
326 };
327
328 std::map<DeviceId, PendingOperations> pendingOperations_ {};
329
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400330 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400331 {
332 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400333 std::unique_lock<std::mutex> lk(connectCbsMtx_);
334 auto it = pendingOperations_.find(deviceId);
335 if (it == pendingOperations_.end())
336 return;
337 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400338 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400339 // Extract all pending callbacks
340 for (auto& [vid, cb] : pendingOperations.connecting)
341 ret.emplace_back(std::move(cb));
342 pendingOperations.connecting.clear();
343 for (auto& [vid, cb] : pendingOperations.waiting)
344 ret.emplace_back(std::move(cb));
345 pendingOperations.waiting.clear();
346 } else if (auto n = pendingOperations.waiting.extract(vid)) {
347 // If it's a waiting operation, just move it
348 ret.emplace_back(std::move(n.mapped()));
349 } else if (auto n = pendingOperations.connecting.extract(vid)) {
350 ret.emplace_back(std::move(n.mapped()));
351 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400352 // If accepted is false, it means that underlying socket is ok, but channel is declined
353 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400354 for (auto& [vid, cb] : pendingOperations.waiting)
355 ret.emplace_back(std::move(cb));
356 pendingOperations.waiting.clear();
357 for (auto& [vid, cb] : pendingOperations.connecting)
358 ret.emplace_back(std::move(cb));
359 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400360 }
361 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400362 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
363 pendingOperations_.erase(it);
364 lk.unlock();
365 for (auto& cb : ret)
366 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400367 }
368
Adrien Béraud665294f2023-06-13 18:09:11 -0400369 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400370 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400372 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400373 auto it = pendingOperations_.find(deviceId);
374 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400375 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400376 auto& pendingOp = it->second;
377 for (const auto& [id, pc]: pendingOp.connecting) {
378 if (vid == 0 || id == vid)
379 ret[id] = pc.name;
380 }
381 for (const auto& [id, pc]: pendingOp.waiting) {
382 if (vid == 0 || id == vid)
383 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400384 }
385 return ret;
386 }
387
388 std::shared_ptr<ConnectionManager::Impl> shared()
389 {
390 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
391 }
392 std::shared_ptr<ConnectionManager::Impl const> shared() const
393 {
394 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
395 }
396 std::weak_ptr<ConnectionManager::Impl> weak()
397 {
398 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
399 }
400 std::weak_ptr<ConnectionManager::Impl const> weak() const
401 {
402 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
403 }
404
405 std::atomic_bool isDestroying_ {false};
406};
407
408void
409ConnectionManager::Impl::connectDeviceStartIce(
410 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
411 const dht::Value::Id& vid,
412 const std::string& connType,
413 std::function<void(bool)> onConnected)
414{
415 auto deviceId = devicePk->getLongId();
416 auto info = getInfo(deviceId, vid);
417 if (!info) {
418 onConnected(false);
419 return;
420 }
421
422 std::unique_lock<std::mutex> lk(info->mutex_);
423 auto& ice = info->ice_;
424
425 if (!ice) {
426 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400427 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400428 onConnected(false);
429 return;
430 }
431
432 auto iceAttributes = ice->getLocalAttributes();
433 std::ostringstream icemsg;
434 icemsg << iceAttributes.ufrag << "\n";
435 icemsg << iceAttributes.pwd << "\n";
436 for (const auto& addr : ice->getLocalCandidates(1)) {
437 icemsg << addr << "\n";
438 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400439 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400440 }
441
442 // Prepare connection request as a DHT message
443 PeerConnectionRequest val;
444
445 val.id = vid; /* Random id for the message unicity */
446 val.ice_msg = icemsg.str();
447 val.connType = connType;
448
449 auto value = std::make_shared<dht::Value>(std::move(val));
450 value->user_type = "peer_request";
451
452 // Send connection request through DHT
453 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400454 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400455 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
456 + devicePk->getId().toString()),
457 devicePk,
458 value,
459 [l=config_->logger,deviceId](bool ok) {
460 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400461 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400462 deviceId,
463 (ok ? "ok" : "failed"));
464 });
465 // Wait for call to onResponse() operated by DHT
466 if (isDestroying_) {
467 onConnected(true); // This avoid to wait new negotiation when destroying
468 return;
469 }
470
471 info->onConnected_ = std::move(onConnected);
472 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
473 std::chrono::steady_clock::now()
474 + DHT_MSG_TIMEOUT);
475 info->waitForAnswer_->async_wait(
476 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
477}
478
479void
480ConnectionManager::Impl::onResponse(const asio::error_code& ec,
481 const DeviceId& deviceId,
482 const dht::Value::Id& vid)
483{
484 if (ec == asio::error::operation_aborted)
485 return;
486 auto info = getInfo(deviceId, vid);
487 if (!info)
488 return;
489
490 std::unique_lock<std::mutex> lk(info->mutex_);
491 auto& ice = info->ice_;
492 if (isDestroying_) {
493 info->onConnected_(true); // The destructor can wake a pending wait here.
494 return;
495 }
496 if (!info->responseReceived_) {
497 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400498 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400499 info->onConnected_(false);
500 return;
501 }
502
503 if (!info->ice_) {
504 info->onConnected_(false);
505 return;
506 }
507
508 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
509
510 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
511 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400512 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400513 info->onConnected_(false);
514 return;
515 }
516 info->onConnected_(true);
517}
518
519bool
520ConnectionManager::Impl::connectDeviceOnNegoDone(
521 const DeviceId& deviceId,
522 const std::string& name,
523 const dht::Value::Id& vid,
524 const std::shared_ptr<dht::crypto::Certificate>& cert)
525{
526 auto info = getInfo(deviceId, vid);
527 if (!info)
528 return false;
529
530 std::unique_lock<std::mutex> lk {info->mutex_};
531 if (info->waitForAnswer_) {
532 // Negotiation is done and connected, go to handshake
533 // and avoid any cancellation at this point.
534 info->waitForAnswer_->cancel();
535 }
536 auto& ice = info->ice_;
537 if (!ice || !ice->isRunning()) {
538 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400539 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400540 return false;
541 }
542
543 // Build socket
544 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
545 std::move(ice)),
546 true);
547
548 // Negotiate a TLS session
549 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400550 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400551 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
552 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400553 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400554 identity(),
555 dhParams(),
556 *cert);
557
558 info->tls_->setOnReady(
559 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
560 bool ok) {
561 if (auto shared = w.lock())
562 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
563 });
564 return true;
565}
566
567void
568ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
569 const std::string& name,
570 ConnectCallback cb,
571 bool noNewSocket,
572 bool forceNewSocket,
573 const std::string& connType)
574{
575 if (!dht()) {
576 cb(nullptr, deviceId);
577 return;
578 }
579 if (deviceId.toString() == identity().second->getLongId().toString()) {
580 cb(nullptr, deviceId);
581 return;
582 }
583 findCertificate(deviceId,
584 [w = weak(),
585 deviceId,
586 name,
587 cb = std::move(cb),
588 noNewSocket,
589 forceNewSocket,
590 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
591 if (!cert) {
592 if (auto shared = w.lock())
593 if (shared->config_->logger)
594 shared->config_->logger->error(
595 "No valid certificate found for device {}",
596 deviceId);
597 cb(nullptr, deviceId);
598 return;
599 }
600 if (auto shared = w.lock()) {
601 shared->connectDevice(cert,
602 name,
603 std::move(cb),
604 noNewSocket,
605 forceNewSocket,
606 connType);
607 } else
608 cb(nullptr, deviceId);
609 });
610}
611
612void
Amna0cf544d2023-07-25 14:25:09 -0400613ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
614 const std::string& name,
615 ConnectCallbackLegacy cb,
616 bool noNewSocket,
617 bool forceNewSocket,
618 const std::string& connType)
619{
620 if (!dht()) {
621 cb(nullptr, deviceId);
622 return;
623 }
624 if (deviceId.toString() == identity().second->getLongId().toString()) {
625 cb(nullptr, deviceId);
626 return;
627 }
628 findCertificate(deviceId,
629 [w = weak(),
630 deviceId,
631 name,
632 cb = std::move(cb),
633 noNewSocket,
634 forceNewSocket,
635 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
636 if (!cert) {
637 if (auto shared = w.lock())
638 if (shared->config_->logger)
639 shared->config_->logger->error(
640 "No valid certificate found for device {}",
641 deviceId);
642 cb(nullptr, deviceId);
643 return;
644 }
645 if (auto shared = w.lock()) {
646 shared->connectDevice(cert,
647 name,
648 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& did){
649 cb(sock, deviceId);
650 },
651 noNewSocket,
652 forceNewSocket,
653 connType);
654 } else
655 cb(nullptr, deviceId);
656 });
657}
658
659void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400660ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
661 const std::string& name,
662 ConnectCallback cb,
663 bool noNewSocket,
664 bool forceNewSocket,
665 const std::string& connType)
666{
667 // Avoid dht operation in a DHT callback to avoid deadlocks
668 dht::ThreadPool::computation().run([w = weak(),
669 name = std::move(name),
670 cert = std::move(cert),
671 cb = std::move(cb),
672 noNewSocket,
673 forceNewSocket,
674 connType] {
675 auto devicePk = cert->getSharedPublicKey();
676 auto deviceId = devicePk->getLongId();
677 auto sthis = w.lock();
678 if (!sthis || sthis->isDestroying_) {
679 cb(nullptr, deviceId);
680 return;
681 }
682 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
683 auto isConnectingToDevice = false;
684 {
685 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400686 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
687 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400688 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400689 while (pendings.connecting.find(vid) != pendings.connecting.end()
690 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400691 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400692 }
693 }
694 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400695 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400696 // Save current request for sendChannelRequest.
697 // Note: do not return here, cause we can be in a state where first
698 // socket is negotiated and first channel is pending
699 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400700 if (isConnectingToDevice && !forceNewSocket)
701 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400702 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400703 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400704 }
705
706 // Check if already negotiated
707 CallbackId cbId(deviceId, vid);
708 if (auto info = sthis->getConnectedInfo(deviceId)) {
709 std::lock_guard<std::mutex> lk(info->mutex_);
710 if (info->socket_) {
711 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400712 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400713 info->cbIds_.emplace(cbId);
714 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
715 return;
716 }
717 }
718
719 if (isConnectingToDevice && !forceNewSocket) {
720 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400721 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400722 return;
723 }
724 if (noNewSocket) {
725 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400726 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400727 return;
728 }
729
730 // Note: used when the ice negotiation fails to erase
731 // all stored structures.
732 auto eraseInfo = [w, cbId] {
733 if (auto shared = w.lock()) {
734 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400735 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400736 std::lock_guard<std::mutex> lk(shared->infosMtx_);
737 shared->infos_.erase(cbId);
738 }
739 };
740
741 // If no socket exists, we need to initiate an ICE connection.
742 sthis->getIceOptions([w,
743 deviceId = std::move(deviceId),
744 devicePk = std::move(devicePk),
745 name = std::move(name),
746 cert = std::move(cert),
747 vid,
748 connType,
749 eraseInfo](auto&& ice_config) {
750 auto sthis = w.lock();
751 if (!sthis) {
752 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
753 return;
754 }
755 ice_config.tcpEnable = true;
756 ice_config.onInitDone = [w,
757 deviceId = std::move(deviceId),
758 devicePk = std::move(devicePk),
759 name = std::move(name),
760 cert = std::move(cert),
761 vid,
762 connType,
763 eraseInfo](bool ok) {
764 dht::ThreadPool::io().run([w = std::move(w),
765 devicePk = std::move(devicePk),
766 vid = std::move(vid),
767 eraseInfo,
768 connType, ok] {
769 auto sthis = w.lock();
770 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400771 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400772 if (!sthis || !ok) {
773 eraseInfo();
774 return;
775 }
776 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
777 if (!ok) {
778 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
779 }
780 });
781 });
782 };
783 ice_config.onNegoDone = [w,
784 deviceId,
785 name,
786 cert = std::move(cert),
787 vid,
788 eraseInfo](bool ok) {
789 dht::ThreadPool::io().run([w = std::move(w),
790 deviceId = std::move(deviceId),
791 name = std::move(name),
792 cert = std::move(cert),
793 vid = std::move(vid),
794 eraseInfo = std::move(eraseInfo),
795 ok] {
796 auto sthis = w.lock();
797 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400798 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400799 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
800 eraseInfo();
801 });
802 };
803
804 auto info = std::make_shared<ConnectionInfo>();
805 {
806 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
807 sthis->infos_[{deviceId, vid}] = info;
808 }
809 std::unique_lock<std::mutex> lk {info->mutex_};
810 ice_config.master = false;
811 ice_config.streamsCount = 1;
812 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400813 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400814 if (!info->ice_) {
815 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400816 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400817 eraseInfo();
818 return;
819 }
820 // We need to detect any shutdown if the ice session is destroyed before going to the
821 // TLS session;
822 info->ice_->setOnShutdown([eraseInfo]() {
823 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
824 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400825 try {
826 info->ice_->initIceInstance(ice_config);
827 } catch (const std::exception& e) {
828 if (sthis->config_->logger)
829 sthis->config_->logger->error("{}", e.what());
830 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
831 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400832 });
833 });
834}
835
836void
837ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
838 const std::string& name,
839 const DeviceId& deviceId,
840 const dht::Value::Id& vid)
841{
842 auto channelSock = sock->addChannel(name);
843 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
844 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400845 if (auto shared = w.lock())
846 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400847 });
848 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400849 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400850 auto shared = w.lock();
851 auto channelSock = wSock.lock();
852 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400853 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400854 });
855
856 ChannelRequest val;
857 val.name = channelSock->name();
858 val.state = ChannelRequestState::REQUEST;
859 val.channel = channelSock->channel();
860 msgpack::sbuffer buffer(256);
861 msgpack::pack(buffer, val);
862
863 std::error_code ec;
864 int res = sock->write(CONTROL_CHANNEL,
865 reinterpret_cast<const uint8_t*>(buffer.data()),
866 buffer.size(),
867 ec);
868 if (res < 0) {
869 // TODO check if we should handle errors here
870 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400871 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400872 }
873}
874
875void
876ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
877{
878 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400879 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400880 if (config_->logger)
881 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400882 std::lock_guard<std::mutex> lk {info->mutex_};
883 info->responseReceived_ = true;
884 info->response_ = std::move(req);
885 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
886 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
887 this,
888 std::placeholders::_1,
889 device,
890 req.id));
891 } else {
892 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400893 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400894 }
895}
896
897void
898ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
899{
900 if (!dht())
901 return;
902 dht()->listen<PeerConnectionRequest>(
903 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
904 [w = weak()](PeerConnectionRequest&& req) {
905 auto shared = w.lock();
906 if (!shared)
907 return false;
908 if (shared->isMessageTreated(to_hex_string(req.id))) {
909 // Message already treated. Just ignore
910 return true;
911 }
912 if (req.isAnswer) {
913 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400914 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400915 } else {
916 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400917 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400918 }
919 if (req.isAnswer) {
920 shared->onPeerResponse(req);
921 } else {
922 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400923 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400924 req.from,
925 [w, req = std::move(req)](
926 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
927 auto shared = w.lock();
928 if (!shared)
929 return;
930 dht::InfoHash peer_h;
931 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
932#if TARGET_OS_IOS
933 if (shared->iOSConnectedCb_(req.connType, peer_h))
934 return;
935#endif
936 shared->onDhtPeerRequest(req, cert);
937 } else {
938 if (shared->config_->logger)
939 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400940 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400941 req.owner->getLongId());
942 }
943 });
944 }
945
946 return true;
947 },
948 dht::Value::UserTypeFilter("peer_request"));
949}
950
951void
952ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
953 const DeviceId& deviceId,
954 const dht::Value::Id& vid,
955 const std::string& name)
956{
957 if (isDestroying_)
958 return;
959 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
960 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
961 // asked yet)
962 auto isDhtRequest = name.empty();
963 if (!ok) {
964 if (isDhtRequest) {
965 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400966 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400967 deviceId,
968 name,
969 vid);
970 if (connReadyCb_)
971 connReadyCb_(deviceId, "", nullptr);
972 } else {
973 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400974 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400975 deviceId,
976 name,
977 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400978 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400979 }
980 } else {
981 // The socket is ready, store it
982 if (isDhtRequest) {
983 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400984 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400985 deviceId,
986 vid);
987 } else {
988 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400989 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400990 deviceId,
991 name,
992 vid);
993 }
994
995 auto info = getInfo(deviceId, vid);
996 addNewMultiplexedSocket({deviceId, vid}, info);
997 // Finally, open the channel and launch pending callbacks
998 if (info->socket_) {
999 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001000 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001001 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001002 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001003 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001004 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001005 }
1006 }
1007 }
1008}
1009
1010void
1011ConnectionManager::Impl::answerTo(IceTransport& ice,
1012 const dht::Value::Id& id,
1013 const std::shared_ptr<dht::crypto::PublicKey>& from)
1014{
1015 // NOTE: This is a shortest version of a real SDP message to save some bits
1016 auto iceAttributes = ice.getLocalAttributes();
1017 std::ostringstream icemsg;
1018 icemsg << iceAttributes.ufrag << "\n";
1019 icemsg << iceAttributes.pwd << "\n";
1020 for (const auto& addr : ice.getLocalCandidates(1)) {
1021 icemsg << addr << "\n";
1022 }
1023
1024 // Send PeerConnection response
1025 PeerConnectionRequest val;
1026 val.id = id;
1027 val.ice_msg = icemsg.str();
1028 val.isAnswer = true;
1029 auto value = std::make_shared<dht::Value>(std::move(val));
1030 value->user_type = "peer_request";
1031
1032 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001033 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001034 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1035 + from->getId().toString()),
1036 from,
1037 value,
1038 [from,l=config_->logger](bool ok) {
1039 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001040 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001041 from->getLongId(),
1042 (ok ? "ok" : "failed"));
1043 });
1044}
1045
1046bool
1047ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1048{
1049 auto deviceId = req.owner->getLongId();
1050 auto info = getInfo(deviceId, req.id);
1051 if (!info)
1052 return false;
1053
1054 std::unique_lock<std::mutex> lk {info->mutex_};
1055 auto& ice = info->ice_;
1056 if (!ice) {
1057 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001058 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001059 if (connReadyCb_)
1060 connReadyCb_(deviceId, "", nullptr);
1061 return false;
1062 }
1063
1064 auto sdp = ice->parseIceCandidates(req.ice_msg);
1065 answerTo(*ice, req.id, req.owner);
1066 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1067 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001068 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001069 ice = nullptr;
1070 if (connReadyCb_)
1071 connReadyCb_(deviceId, "", nullptr);
1072 return false;
1073 }
1074 return true;
1075}
1076
1077bool
1078ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1079{
1080 auto deviceId = req.owner->getLongId();
1081 auto info = getInfo(deviceId, req.id);
1082 if (!info)
1083 return false;
1084
1085 std::unique_lock<std::mutex> lk {info->mutex_};
1086 auto& ice = info->ice_;
1087 if (!ice) {
1088 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001089 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001090 return false;
1091 }
1092
1093 // Build socket
1094 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1095 std::move(ice)),
1096 false);
1097
1098 // init TLS session
1099 auto ph = req.from;
1100 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001101 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1102 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001103 req.id);
1104 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1105 std::move(endpoint),
1106 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001107 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001108 identity(),
1109 dhParams(),
1110 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1111 auto shared = w.lock();
1112 if (!shared)
1113 return false;
1114 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1115 if (!crt)
1116 return false;
1117 return crt->getPacked() == cert.getPacked();
1118 });
1119
1120 info->tls_->setOnReady(
1121 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1122 if (auto shared = w.lock())
1123 shared->onTlsNegotiationDone(ok, deviceId, vid);
1124 });
1125 return true;
1126}
1127
1128void
1129ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1130 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1131{
1132 auto deviceId = req.owner->getLongId();
1133 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001134 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001135 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1136 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001137 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001138 return;
1139 }
1140
1141 // Because the connection is accepted, create an ICE socket.
1142 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1143 auto shared = w.lock();
1144 if (!shared)
1145 return;
1146 // Note: used when the ice negotiation fails to erase
1147 // all stored structures.
1148 auto eraseInfo = [w, id = req.id, deviceId] {
1149 if (auto shared = w.lock()) {
1150 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001151 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001152 if (shared->connReadyCb_)
1153 shared->connReadyCb_(deviceId, "", nullptr);
1154 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1155 shared->infos_.erase({deviceId, id});
1156 }
1157 };
1158
1159 ice_config.tcpEnable = true;
1160 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1161 auto shared = w.lock();
1162 if (!shared)
1163 return;
1164 if (!ok) {
1165 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001166 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001167 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1168 return;
1169 }
1170
1171 dht::ThreadPool::io().run(
1172 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1173 auto shared = w.lock();
1174 if (!shared)
1175 return;
1176 if (!shared->onRequestStartIce(req))
1177 eraseInfo();
1178 });
1179 };
1180
1181 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1182 auto shared = w.lock();
1183 if (!shared)
1184 return;
1185 if (!ok) {
1186 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001187 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001188 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1189 return;
1190 }
1191
1192 dht::ThreadPool::io().run(
1193 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1194 if (auto shared = w.lock())
1195 if (!shared->onRequestOnNegoDone(req))
1196 eraseInfo();
1197 });
1198 };
1199
1200 // Negotiate a new ICE socket
1201 auto info = std::make_shared<ConnectionInfo>();
1202 {
1203 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1204 shared->infos_[{deviceId, req.id}] = info;
1205 }
1206 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001207 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001208 std::unique_lock<std::mutex> lk {info->mutex_};
1209 ice_config.streamsCount = 1;
1210 ice_config.compCountPerStream = 1; // TCP
1211 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001212 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001213 if (not info->ice_) {
1214 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001215 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001216 eraseInfo();
1217 return;
1218 }
1219 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1220 info->ice_->setOnShutdown([eraseInfo]() {
1221 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1222 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001223 try {
1224 info->ice_->initIceInstance(ice_config);
1225 } catch (const std::exception& e) {
1226 if (shared->config_->logger)
1227 shared->config_->logger->error("{}", e.what());
1228 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1229 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001230 });
1231}
1232
1233void
1234ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1235{
1236 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1237 info->socket_->setOnReady(
1238 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1239 if (auto sthis = w.lock())
1240 if (sthis->connReadyCb_)
1241 sthis->connReadyCb_(deviceId, socket->name(), socket);
1242 });
1243 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1244 const uint16_t&,
1245 const std::string& name) {
1246 if (auto sthis = w.lock())
1247 if (sthis->channelReqCb_)
1248 return sthis->channelReqCb_(peer, name);
1249 return false;
1250 });
1251 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1252 // Cancel current outgoing connections
1253 dht::ThreadPool::io().run([w, deviceId, vid] {
1254 auto sthis = w.lock();
1255 if (!sthis)
1256 return;
1257
1258 std::set<CallbackId> ids;
1259 if (auto info = sthis->getInfo(deviceId, vid)) {
1260 std::lock_guard<std::mutex> lk(info->mutex_);
1261 if (info->socket_) {
1262 ids = std::move(info->cbIds_);
1263 info->socket_->shutdown();
1264 }
1265 }
1266 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001267 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001268
1269 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1270 sthis->infos_.erase({deviceId, vid});
1271 });
1272 });
1273}
1274
1275const std::shared_future<tls::DhParams>
1276ConnectionManager::Impl::dhParams() const
1277{
1278 return dht::ThreadPool::computation().get<tls::DhParams>(
1279 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1280 ;
1281}
1282
1283template<typename ID = dht::Value::Id>
1284std::set<ID, std::less<>>
1285loadIdList(const std::string& path)
1286{
1287 std::set<ID, std::less<>> ids;
1288 std::ifstream file = fileutils::ifstream(path);
1289 if (!file.is_open()) {
1290 //JAMI_DBG("Could not load %s", path.c_str());
1291 return ids;
1292 }
1293 std::string line;
1294 while (std::getline(file, line)) {
1295 if constexpr (std::is_same<ID, std::string>::value) {
1296 ids.emplace(std::move(line));
1297 } else if constexpr (std::is_integral<ID>::value) {
1298 ID vid;
1299 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1300 ec == std::errc()) {
1301 ids.emplace(vid);
1302 }
1303 }
1304 }
1305 return ids;
1306}
1307
1308template<typename List = std::set<dht::Value::Id>>
1309void
1310saveIdList(const std::string& path, const List& ids)
1311{
1312 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1313 if (!file.is_open()) {
1314 //JAMI_ERR("Could not save to %s", path.c_str());
1315 return;
1316 }
1317 for (auto& c : ids)
1318 file << std::hex << c << "\n";
1319}
1320
1321void
1322ConnectionManager::Impl::loadTreatedMessages()
1323{
1324 std::lock_guard<std::mutex> lock(messageMutex_);
1325 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1326 treatedMessages_ = loadIdList<std::string>(path);
1327 if (treatedMessages_.empty()) {
1328 auto messages = loadIdList(path);
1329 for (const auto& m : messages)
1330 treatedMessages_.emplace(to_hex_string(m));
1331 }
1332}
1333
1334void
1335ConnectionManager::Impl::saveTreatedMessages() const
1336{
1337 dht::ThreadPool::io().run([w = weak()]() {
1338 if (auto sthis = w.lock()) {
1339 auto& this_ = *sthis;
1340 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1341 fileutils::check_dir(this_.config_->cachePath.c_str());
1342 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1343 + DIR_SEPARATOR_STR "treatedMessages",
1344 this_.treatedMessages_);
1345 }
1346 });
1347}
1348
1349bool
1350ConnectionManager::Impl::isMessageTreated(std::string_view id)
1351{
1352 std::lock_guard<std::mutex> lock(messageMutex_);
1353 auto res = treatedMessages_.emplace(id);
1354 if (res.second) {
1355 saveTreatedMessages();
1356 return false;
1357 }
1358 return true;
1359}
1360
1361/**
1362 * returns whether or not UPnP is enabled and active_
1363 * ie: if it is able to make port mappings
1364 */
1365bool
1366ConnectionManager::Impl::getUPnPActive() const
1367{
1368 return config_->getUPnPActive();
1369}
1370
1371IpAddr
1372ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1373{
1374 if (family == AF_INET)
1375 return publishedIp_[0];
1376 if (family == AF_INET6)
1377 return publishedIp_[1];
1378
1379 assert(family == AF_UNSPEC);
1380
1381 // If family is not set, prefere IPv4 if available. It's more
1382 // likely to succeed behind NAT.
1383 if (publishedIp_[0])
1384 return publishedIp_[0];
1385 if (publishedIp_[1])
1386 return publishedIp_[1];
1387 return {};
1388}
1389
1390void
1391ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1392{
1393 if (ip_addr.getFamily() == AF_INET) {
1394 publishedIp_[0] = ip_addr;
1395 } else {
1396 publishedIp_[1] = ip_addr;
1397 }
1398}
1399
1400void
1401ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1402{
1403 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1404 bool hasIpv4 {false}, hasIpv6 {false};
1405 for (auto& result : results) {
1406 auto family = result.getFamily();
1407 if (family == AF_INET) {
1408 if (not hasIpv4) {
1409 hasIpv4 = true;
1410 if (config_->logger)
1411 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1412 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1413 setPublishedAddress(*result.get());
1414 if (config_->upnpCtrl) {
1415 config_->upnpCtrl->setPublicAddress(*result.get());
1416 }
1417 }
1418 } else if (family == AF_INET6) {
1419 if (not hasIpv6) {
1420 hasIpv6 = true;
1421 if (config_->logger)
1422 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1423 setPublishedAddress(*result.get());
1424 }
1425 }
1426 if (hasIpv4 and hasIpv6)
1427 break;
1428 }
1429 if (cb)
1430 cb();
1431 });
1432}
1433
1434void
1435ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1436{
1437 storeActiveIpAddress([this, cb = std::move(cb)] {
1438 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1439 auto publishedAddr = getPublishedIpAddress();
1440
1441 if (publishedAddr) {
1442 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1443 publishedAddr.getFamily());
1444 if (interfaceAddr) {
1445 opts.accountLocalAddr = interfaceAddr;
1446 opts.accountPublicAddr = publishedAddr;
1447 }
1448 }
1449 if (cb)
1450 cb(std::move(opts));
1451 });
1452}
1453
1454IceTransportOptions
1455ConnectionManager::Impl::getIceOptions() const noexcept
1456{
1457 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001458 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001459 opts.upnpEnable = getUPnPActive();
1460
1461 if (config_->stunEnabled)
1462 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1463 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001464 if (config_->turnCache) {
1465 auto turnAddr = config_->turnCache->getResolvedTurn();
1466 if (turnAddr != std::nullopt) {
1467 opts.turnServers.emplace_back(TurnServerInfo()
1468 .setUri(turnAddr->toString())
1469 .setUsername(config_->turnServerUserName)
1470 .setPassword(config_->turnServerPwd)
1471 .setRealm(config_->turnServerRealm));
1472 }
1473 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001474 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001475 .setUri(config_->turnServer)
1476 .setUsername(config_->turnServerUserName)
1477 .setPassword(config_->turnServerPwd)
1478 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001479 }
1480 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1481 // co issues. So this needs some debug. for now just disable
1482 // if (cacheTurnV6 && *cacheTurnV6) {
1483 // opts.turnServers.emplace_back(TurnServerInfo()
1484 // .setUri(cacheTurnV6->toString(true))
1485 // .setUsername(turnServerUserName_)
1486 // .setPassword(turnServerPwd_)
1487 // .setRealm(turnServerRealm_));
1488 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001489 }
1490 return opts;
1491}
1492
1493bool
1494ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1495 dht::InfoHash& account_id,
1496 const std::shared_ptr<Logger>& logger)
1497{
1498 if (not crt)
1499 return false;
1500
1501 auto top_issuer = crt;
1502 while (top_issuer->issuer)
1503 top_issuer = top_issuer->issuer;
1504
1505 // Device certificate can't be self-signed
Adrien Béraud62b657d2023-07-26 14:18:56 -04001506 /* if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001507 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001508 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001509 return false;
Adrien Béraud62b657d2023-07-26 14:18:56 -04001510 } */
Adrien Béraud612b55b2023-05-29 10:42:04 -04001511
1512 // Check peer certificate chain
1513 // Trust store with top issuer as the only CA
1514 dht::crypto::TrustList peer_trust;
1515 peer_trust.add(*top_issuer);
1516 if (not peer_trust.verify(*crt)) {
1517 if (logger)
1518 logger->warn("Found invalid peer device: {}", crt->getLongId());
1519 return false;
1520 }
1521
1522 // Check cached OCSP response
1523 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1524 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001525 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001526 return false;
1527 }
1528
Adrien Béraud62b657d2023-07-26 14:18:56 -04001529 if (auto issuer = crt->issuer) {
1530 account_id = issuer->getId();
1531 if (logger)
1532 logger->warn("Found peer device: {} account:{} CA:{}",
1533 crt->getLongId(),
1534 account_id,
1535 top_issuer->getId());
1536 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001537 return true;
1538}
1539
1540bool
1541ConnectionManager::Impl::findCertificate(
1542 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1543{
1544 if (auto cert = certStore().getCertificate(id.toString())) {
1545 if (cb)
1546 cb(cert);
1547 } else if (cb)
1548 cb(nullptr);
1549 return true;
1550}
1551
Sébastien Blin34086512023-07-25 09:52:14 -04001552bool
1553ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1554 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1555{
1556 if (auto cert = certStore().getCertificate(h.toString())) {
1557 if (cb)
1558 cb(cert);
1559 } else {
1560 dht()->findCertificate(h,
1561 [cb = std::move(cb), this](
1562 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1563 if (crt)
1564 certStore().pinCertificate(crt);
1565 if (cb)
1566 cb(crt);
1567 });
1568 }
1569 return true;
1570}
1571
Adrien Béraud612b55b2023-05-29 10:42:04 -04001572ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1573 : pimpl_ {std::make_shared<Impl>(config_)}
1574{}
1575
1576ConnectionManager::~ConnectionManager()
1577{
1578 if (pimpl_)
1579 pimpl_->shutdown();
1580}
1581
1582void
1583ConnectionManager::connectDevice(const DeviceId& deviceId,
1584 const std::string& name,
1585 ConnectCallback cb,
1586 bool noNewSocket,
1587 bool forceNewSocket,
1588 const std::string& connType)
1589{
1590 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1591}
1592
1593void
Amna0cf544d2023-07-25 14:25:09 -04001594ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1595 const std::string& name,
1596 ConnectCallbackLegacy cb,
1597 bool noNewSocket,
1598 bool forceNewSocket,
1599 const std::string& connType)
1600{
1601 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1602}
1603
1604
1605void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001606ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1607 const std::string& name,
1608 ConnectCallback cb,
1609 bool noNewSocket,
1610 bool forceNewSocket,
1611 const std::string& connType)
1612{
1613 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1614}
1615
1616bool
1617ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1618{
Adrien Béraud665294f2023-06-13 18:09:11 -04001619 auto pending = pimpl_->getPendingIds(deviceId);
1620 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001621 != pending.end();
1622}
1623
1624void
1625ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1626{
1627 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1628 std::set<DeviceId> peersDevices;
1629 {
1630 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1631 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1632 auto const& [key, value] = *iter;
1633 auto deviceId = key.first;
1634 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1635 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1636 connInfos.emplace_back(value);
1637 peersDevices.emplace(deviceId);
1638 iter = pimpl_->infos_.erase(iter);
1639 } else {
1640 iter++;
1641 }
1642 }
1643 }
1644 // Stop connections to all peers devices
1645 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001646 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001647 // This will close the TLS Session
1648 pimpl_->removeUnusedConnections(deviceId);
1649 }
1650 for (auto& info : connInfos) {
1651 if (info->socket_)
1652 info->socket_->shutdown();
1653 if (info->waitForAnswer_)
1654 info->waitForAnswer_->cancel();
1655 if (info->ice_) {
1656 std::unique_lock<std::mutex> lk {info->mutex_};
1657 dht::ThreadPool::io().run(
1658 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1659 }
1660 }
1661}
1662
1663void
1664ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1665{
1666 pimpl_->onDhtConnected(devicePk);
1667}
1668
1669void
1670ConnectionManager::onICERequest(onICERequestCallback&& cb)
1671{
1672 pimpl_->iceReqCb_ = std::move(cb);
1673}
1674
1675void
1676ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1677{
1678 pimpl_->channelReqCb_ = std::move(cb);
1679}
1680
1681void
1682ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1683{
1684 pimpl_->connReadyCb_ = std::move(cb);
1685}
1686
1687void
1688ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1689{
1690 pimpl_->iOSConnectedCb_ = std::move(cb);
1691}
1692
1693std::size_t
1694ConnectionManager::activeSockets() const
1695{
1696 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1697 return pimpl_->infos_.size();
1698}
1699
1700void
1701ConnectionManager::monitor() const
1702{
1703 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1704 auto logger = pimpl_->config_->logger;
1705 if (!logger)
1706 return;
1707 logger->debug("ConnectionManager current status:");
1708 for (const auto& [_, ci] : pimpl_->infos_) {
1709 if (ci->socket_)
1710 ci->socket_->monitor();
1711 }
1712 logger->debug("ConnectionManager end status.");
1713}
1714
1715void
1716ConnectionManager::connectivityChanged()
1717{
1718 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1719 for (const auto& [_, ci] : pimpl_->infos_) {
1720 if (ci->socket_)
1721 ci->socket_->sendBeacon();
1722 }
1723}
1724
1725void
1726ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1727{
1728 return pimpl_->getIceOptions(std::move(cb));
1729}
1730
1731IceTransportOptions
1732ConnectionManager::getIceOptions() const noexcept
1733{
1734 return pimpl_->getIceOptions();
1735}
1736
1737IpAddr
1738ConnectionManager::getPublishedIpAddress(uint16_t family) const
1739{
1740 return pimpl_->getPublishedIpAddress(family);
1741}
1742
1743void
1744ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1745{
1746 return pimpl_->setPublishedAddress(ip_addr);
1747}
1748
1749void
1750ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1751{
1752 return pimpl_->storeActiveIpAddress(std::move(cb));
1753}
1754
1755std::shared_ptr<ConnectionManager::Config>
1756ConnectionManager::getConfig()
1757{
1758 return pimpl_->config_;
1759}
1760
Sébastien Blin464bdff2023-07-19 08:02:53 -04001761} // namespace dhtnet