blob: b3d8b7919ae2cb494cbe5e0096112b170d80f240 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040044
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
Sébastien Blincf569402023-07-27 09:46:40 -040083 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Adrien Béraud612b55b2023-05-29 10:42:04 -040084 {}
85 ~Impl() {}
86
87 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
88 const dht::crypto::Identity& identity() const { return config_->id; }
89
90 void removeUnusedConnections(const DeviceId& deviceId = {})
91 {
92 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
93
94 {
95 std::lock_guard<std::mutex> lk(infosMtx_);
96 for (auto it = infos_.begin(); it != infos_.end();) {
97 auto& [key, info] = *it;
98 if (info && (!deviceId || key.first == deviceId)) {
99 unused.emplace_back(std::move(info));
100 it = infos_.erase(it);
101 } else {
102 ++it;
103 }
104 }
105 }
106 for (auto& info: unused) {
107 if (info->tls_)
108 info->tls_->shutdown();
109 if (info->socket_)
110 info->socket_->shutdown();
111 if (info->waitForAnswer_)
112 info->waitForAnswer_->cancel();
113 }
114 if (!unused.empty())
115 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
116 }
117
118 void shutdown()
119 {
120 if (isDestroying_.exchange(true))
121 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400122 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400123 {
124 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400125 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400126 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400127 for (auto& [deviceId, pcbs] : po) {
128 for (auto& [id, pending] : pcbs.connecting)
129 pending.cb(nullptr, deviceId);
130 for (auto& [id, pending] : pcbs.waiting)
131 pending.cb(nullptr, deviceId);
132 }
133
Adrien Béraud612b55b2023-05-29 10:42:04 -0400134 removeUnusedConnections();
135 }
136
Adrien Béraud612b55b2023-05-29 10:42:04 -0400137 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
138 const dht::Value::Id& vid,
139 const std::string& connType,
140 std::function<void(bool)> onConnected);
141 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
142 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
143 const std::string& name,
144 const dht::Value::Id& vid,
145 const std::shared_ptr<dht::crypto::Certificate>& cert);
146 void connectDevice(const DeviceId& deviceId,
147 const std::string& uri,
148 ConnectCallback cb,
149 bool noNewSocket = false,
150 bool forceNewSocket = false,
151 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400152 void connectDevice(const dht::InfoHash& deviceId,
153 const std::string& uri,
154 ConnectCallbackLegacy cb,
155 bool noNewSocket = false,
156 bool forceNewSocket = false,
157 const std::string& connType = "");
158
Adrien Béraud612b55b2023-05-29 10:42:04 -0400159 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
160 const std::string& name,
161 ConnectCallback cb,
162 bool noNewSocket = false,
163 bool forceNewSocket = false,
164 const std::string& connType = "");
165 /**
166 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
167 * @param sock socket used to send the request
168 * @param name channel's name
169 * @param vid channel's id
170 * @param deviceId to identify the linked ConnectCallback
171 */
172 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
173 const std::string& name,
174 const DeviceId& deviceId,
175 const dht::Value::Id& vid);
176 /**
177 * Triggered when a PeerConnectionRequest comes from the DHT
178 */
179 void answerTo(IceTransport& ice,
180 const dht::Value::Id& id,
181 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
182 bool onRequestStartIce(const PeerConnectionRequest& req);
183 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
184 void onDhtPeerRequest(const PeerConnectionRequest& req,
185 const std::shared_ptr<dht::crypto::Certificate>& cert);
186
187 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
188 void onPeerResponse(const PeerConnectionRequest& req);
189 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
190
191 const std::shared_future<tls::DhParams> dhParams() const;
192 tls::CertificateStore& certStore() const { return *config_->certStore; }
193
194 mutable std::mutex messageMutex_ {};
195 std::set<std::string, std::less<>> treatedMessages_ {};
196
197 void loadTreatedMessages();
198 void saveTreatedMessages() const;
199
200 /// \return true if the given DHT message identifier has been treated
201 /// \note if message has not been treated yet this method st/ore this id and returns true at
202 /// further calls
203 bool isMessageTreated(std::string_view id);
204
205 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
206
207 /**
208 * Published IPv4/IPv6 addresses, used only if defined by the user in account
209 * configuration
210 *
211 */
212 IpAddr publishedIp_[2] {};
213
Adrien Béraud612b55b2023-05-29 10:42:04 -0400214 /**
215 * interface name on which this account is bound
216 */
217 std::string interface_ {"default"};
218
219 /**
220 * Get the local interface name on which this account is bound.
221 */
222 const std::string& getLocalInterface() const { return interface_; }
223
224 /**
225 * Get the published IP address, fallbacks to NAT if family is unspecified
226 * Prefers the usage of IPv4 if possible.
227 */
228 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
229
230 /**
231 * Set published IP address according to given family
232 */
233 void setPublishedAddress(const IpAddr& ip_addr);
234
235 /**
236 * Store the local/public addresses used to register
237 */
238 void storeActiveIpAddress(std::function<void()>&& cb = {});
239
240 /**
241 * Create and return ICE options.
242 */
243 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
244 IceTransportOptions getIceOptions() const noexcept;
245
246 /**
247 * Inform that a potential peer device have been found.
248 * Returns true only if the device certificate is a valid device certificate.
249 * In that case (true is returned) the account_id parameter is set to the peer account ID.
250 */
251 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
252 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
253
254 bool findCertificate(const dht::PkId& id,
255 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400256 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400257
258 /**
259 * returns whether or not UPnP is enabled and active
260 * ie: if it is able to make port mappings
261 */
262 bool getUPnPActive() const;
263
264 /**
265 * Triggered when a new TLS socket is ready to use
266 * @param ok If succeed
267 * @param deviceId Related device
268 * @param vid vid of the connection request
269 * @param name non empty if TLS was created by connectDevice()
270 */
271 void onTlsNegotiationDone(bool ok,
272 const DeviceId& deviceId,
273 const dht::Value::Id& vid,
274 const std::string& name = "");
275
276 std::shared_ptr<ConnectionManager::Config> config_;
277
Adrien Béraud612b55b2023-05-29 10:42:04 -0400278 mutable std::mt19937_64 rand;
279
280 iOSConnectedCallback iOSConnectedCb_ {};
281
282 std::mutex infosMtx_ {};
283 // Note: Someone can ask multiple sockets, so to avoid any race condition,
284 // each device can have multiple multiplexed sockets.
285 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
286
287 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
288 {
289 std::lock_guard<std::mutex> lk(infosMtx_);
290 auto it = infos_.find({deviceId, id});
291 if (it != infos_.end())
292 return it->second;
293 return {};
294 }
295
296 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
297 {
298 std::lock_guard<std::mutex> lk(infosMtx_);
299 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
300 auto& [key, value] = item;
301 return key.first == deviceId && value && value->socket_;
302 });
303 if (it != infos_.end())
304 return it->second;
305 return {};
306 }
307
308 ChannelRequestCallback channelReqCb_ {};
309 ConnectionReadyCallback connReadyCb_ {};
310 onICERequestCallback iceReqCb_ {};
311
312 /**
313 * Stores callback from connectDevice
314 * @note: each device needs a vector because several connectDevice can
315 * be done in parallel and we only want one socket
316 */
317 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400318
Adrien Béraud665294f2023-06-13 18:09:11 -0400319 struct PendingCb
320 {
321 std::string name;
322 ConnectCallback cb;
323 };
324 struct PendingOperations {
325 std::map<dht::Value::Id, PendingCb> connecting;
326 std::map<dht::Value::Id, PendingCb> waiting;
327 };
328
329 std::map<DeviceId, PendingOperations> pendingOperations_ {};
330
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400331 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400332 {
333 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400334 std::unique_lock<std::mutex> lk(connectCbsMtx_);
335 auto it = pendingOperations_.find(deviceId);
336 if (it == pendingOperations_.end())
337 return;
338 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400339 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400340 // Extract all pending callbacks
341 for (auto& [vid, cb] : pendingOperations.connecting)
342 ret.emplace_back(std::move(cb));
343 pendingOperations.connecting.clear();
344 for (auto& [vid, cb] : pendingOperations.waiting)
345 ret.emplace_back(std::move(cb));
346 pendingOperations.waiting.clear();
347 } else if (auto n = pendingOperations.waiting.extract(vid)) {
348 // If it's a waiting operation, just move it
349 ret.emplace_back(std::move(n.mapped()));
350 } else if (auto n = pendingOperations.connecting.extract(vid)) {
351 ret.emplace_back(std::move(n.mapped()));
352 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400353 // If accepted is false, it means that underlying socket is ok, but channel is declined
354 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400355 for (auto& [vid, cb] : pendingOperations.waiting)
356 ret.emplace_back(std::move(cb));
357 pendingOperations.waiting.clear();
358 for (auto& [vid, cb] : pendingOperations.connecting)
359 ret.emplace_back(std::move(cb));
360 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400361 }
362 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400363 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
364 pendingOperations_.erase(it);
365 lk.unlock();
366 for (auto& cb : ret)
367 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400368 }
369
Adrien Béraud665294f2023-06-13 18:09:11 -0400370 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400371 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400372 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400373 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400374 auto it = pendingOperations_.find(deviceId);
375 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400376 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400377 auto& pendingOp = it->second;
378 for (const auto& [id, pc]: pendingOp.connecting) {
379 if (vid == 0 || id == vid)
380 ret[id] = pc.name;
381 }
382 for (const auto& [id, pc]: pendingOp.waiting) {
383 if (vid == 0 || id == vid)
384 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400385 }
386 return ret;
387 }
388
389 std::shared_ptr<ConnectionManager::Impl> shared()
390 {
391 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
392 }
393 std::shared_ptr<ConnectionManager::Impl const> shared() const
394 {
395 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
396 }
397 std::weak_ptr<ConnectionManager::Impl> weak()
398 {
399 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
400 }
401 std::weak_ptr<ConnectionManager::Impl const> weak() const
402 {
403 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
404 }
405
406 std::atomic_bool isDestroying_ {false};
407};
408
409void
410ConnectionManager::Impl::connectDeviceStartIce(
411 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
412 const dht::Value::Id& vid,
413 const std::string& connType,
414 std::function<void(bool)> onConnected)
415{
416 auto deviceId = devicePk->getLongId();
417 auto info = getInfo(deviceId, vid);
418 if (!info) {
419 onConnected(false);
420 return;
421 }
422
423 std::unique_lock<std::mutex> lk(info->mutex_);
424 auto& ice = info->ice_;
425
426 if (!ice) {
427 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400428 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400429 onConnected(false);
430 return;
431 }
432
433 auto iceAttributes = ice->getLocalAttributes();
434 std::ostringstream icemsg;
435 icemsg << iceAttributes.ufrag << "\n";
436 icemsg << iceAttributes.pwd << "\n";
437 for (const auto& addr : ice->getLocalCandidates(1)) {
438 icemsg << addr << "\n";
439 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400440 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400441 }
442
443 // Prepare connection request as a DHT message
444 PeerConnectionRequest val;
445
446 val.id = vid; /* Random id for the message unicity */
447 val.ice_msg = icemsg.str();
448 val.connType = connType;
449
450 auto value = std::make_shared<dht::Value>(std::move(val));
451 value->user_type = "peer_request";
452
453 // Send connection request through DHT
454 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400455 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400456 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
457 + devicePk->getId().toString()),
458 devicePk,
459 value,
460 [l=config_->logger,deviceId](bool ok) {
461 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400462 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400463 deviceId,
464 (ok ? "ok" : "failed"));
465 });
466 // Wait for call to onResponse() operated by DHT
467 if (isDestroying_) {
468 onConnected(true); // This avoid to wait new negotiation when destroying
469 return;
470 }
471
472 info->onConnected_ = std::move(onConnected);
473 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
474 std::chrono::steady_clock::now()
475 + DHT_MSG_TIMEOUT);
476 info->waitForAnswer_->async_wait(
477 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
478}
479
480void
481ConnectionManager::Impl::onResponse(const asio::error_code& ec,
482 const DeviceId& deviceId,
483 const dht::Value::Id& vid)
484{
485 if (ec == asio::error::operation_aborted)
486 return;
487 auto info = getInfo(deviceId, vid);
488 if (!info)
489 return;
490
491 std::unique_lock<std::mutex> lk(info->mutex_);
492 auto& ice = info->ice_;
493 if (isDestroying_) {
494 info->onConnected_(true); // The destructor can wake a pending wait here.
495 return;
496 }
497 if (!info->responseReceived_) {
498 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400499 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400500 info->onConnected_(false);
501 return;
502 }
503
504 if (!info->ice_) {
505 info->onConnected_(false);
506 return;
507 }
508
509 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
510
511 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
512 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400513 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400514 info->onConnected_(false);
515 return;
516 }
517 info->onConnected_(true);
518}
519
520bool
521ConnectionManager::Impl::connectDeviceOnNegoDone(
522 const DeviceId& deviceId,
523 const std::string& name,
524 const dht::Value::Id& vid,
525 const std::shared_ptr<dht::crypto::Certificate>& cert)
526{
527 auto info = getInfo(deviceId, vid);
528 if (!info)
529 return false;
530
531 std::unique_lock<std::mutex> lk {info->mutex_};
532 if (info->waitForAnswer_) {
533 // Negotiation is done and connected, go to handshake
534 // and avoid any cancellation at this point.
535 info->waitForAnswer_->cancel();
536 }
537 auto& ice = info->ice_;
538 if (!ice || !ice->isRunning()) {
539 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400540 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400541 return false;
542 }
543
544 // Build socket
545 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
546 std::move(ice)),
547 true);
548
549 // Negotiate a TLS session
550 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400551 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400552 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
553 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400554 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400555 identity(),
556 dhParams(),
557 *cert);
558
559 info->tls_->setOnReady(
560 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
561 bool ok) {
562 if (auto shared = w.lock())
563 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
564 });
565 return true;
566}
567
568void
569ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
570 const std::string& name,
571 ConnectCallback cb,
572 bool noNewSocket,
573 bool forceNewSocket,
574 const std::string& connType)
575{
576 if (!dht()) {
577 cb(nullptr, deviceId);
578 return;
579 }
580 if (deviceId.toString() == identity().second->getLongId().toString()) {
581 cb(nullptr, deviceId);
582 return;
583 }
584 findCertificate(deviceId,
585 [w = weak(),
586 deviceId,
587 name,
588 cb = std::move(cb),
589 noNewSocket,
590 forceNewSocket,
591 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
592 if (!cert) {
593 if (auto shared = w.lock())
594 if (shared->config_->logger)
595 shared->config_->logger->error(
596 "No valid certificate found for device {}",
597 deviceId);
598 cb(nullptr, deviceId);
599 return;
600 }
601 if (auto shared = w.lock()) {
602 shared->connectDevice(cert,
603 name,
604 std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType);
608 } else
609 cb(nullptr, deviceId);
610 });
611}
612
613void
Amna0cf544d2023-07-25 14:25:09 -0400614ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
615 const std::string& name,
616 ConnectCallbackLegacy cb,
617 bool noNewSocket,
618 bool forceNewSocket,
619 const std::string& connType)
620{
621 if (!dht()) {
622 cb(nullptr, deviceId);
623 return;
624 }
625 if (deviceId.toString() == identity().second->getLongId().toString()) {
626 cb(nullptr, deviceId);
627 return;
628 }
629 findCertificate(deviceId,
630 [w = weak(),
631 deviceId,
632 name,
633 cb = std::move(cb),
634 noNewSocket,
635 forceNewSocket,
636 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
637 if (!cert) {
638 if (auto shared = w.lock())
639 if (shared->config_->logger)
640 shared->config_->logger->error(
641 "No valid certificate found for device {}",
642 deviceId);
643 cb(nullptr, deviceId);
644 return;
645 }
646 if (auto shared = w.lock()) {
647 shared->connectDevice(cert,
648 name,
649 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& did){
650 cb(sock, deviceId);
651 },
652 noNewSocket,
653 forceNewSocket,
654 connType);
655 } else
656 cb(nullptr, deviceId);
657 });
658}
659
660void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400661ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
662 const std::string& name,
663 ConnectCallback cb,
664 bool noNewSocket,
665 bool forceNewSocket,
666 const std::string& connType)
667{
668 // Avoid dht operation in a DHT callback to avoid deadlocks
669 dht::ThreadPool::computation().run([w = weak(),
670 name = std::move(name),
671 cert = std::move(cert),
672 cb = std::move(cb),
673 noNewSocket,
674 forceNewSocket,
675 connType] {
676 auto devicePk = cert->getSharedPublicKey();
677 auto deviceId = devicePk->getLongId();
678 auto sthis = w.lock();
679 if (!sthis || sthis->isDestroying_) {
680 cb(nullptr, deviceId);
681 return;
682 }
683 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
684 auto isConnectingToDevice = false;
685 {
686 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400687 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
688 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400689 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400690 while (pendings.connecting.find(vid) != pendings.connecting.end()
691 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400692 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400693 }
694 }
695 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400696 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400697 // Save current request for sendChannelRequest.
698 // Note: do not return here, cause we can be in a state where first
699 // socket is negotiated and first channel is pending
700 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400701 if (isConnectingToDevice && !forceNewSocket)
702 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400703 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400704 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400705 }
706
707 // Check if already negotiated
708 CallbackId cbId(deviceId, vid);
709 if (auto info = sthis->getConnectedInfo(deviceId)) {
710 std::lock_guard<std::mutex> lk(info->mutex_);
711 if (info->socket_) {
712 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400713 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400714 info->cbIds_.emplace(cbId);
715 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
716 return;
717 }
718 }
719
720 if (isConnectingToDevice && !forceNewSocket) {
721 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400722 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400723 return;
724 }
725 if (noNewSocket) {
726 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400727 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400728 return;
729 }
730
731 // Note: used when the ice negotiation fails to erase
732 // all stored structures.
733 auto eraseInfo = [w, cbId] {
734 if (auto shared = w.lock()) {
735 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400736 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400737 std::lock_guard<std::mutex> lk(shared->infosMtx_);
738 shared->infos_.erase(cbId);
739 }
740 };
741
742 // If no socket exists, we need to initiate an ICE connection.
743 sthis->getIceOptions([w,
744 deviceId = std::move(deviceId),
745 devicePk = std::move(devicePk),
746 name = std::move(name),
747 cert = std::move(cert),
748 vid,
749 connType,
750 eraseInfo](auto&& ice_config) {
751 auto sthis = w.lock();
752 if (!sthis) {
753 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
754 return;
755 }
756 ice_config.tcpEnable = true;
757 ice_config.onInitDone = [w,
758 deviceId = std::move(deviceId),
759 devicePk = std::move(devicePk),
760 name = std::move(name),
761 cert = std::move(cert),
762 vid,
763 connType,
764 eraseInfo](bool ok) {
765 dht::ThreadPool::io().run([w = std::move(w),
766 devicePk = std::move(devicePk),
767 vid = std::move(vid),
768 eraseInfo,
769 connType, ok] {
770 auto sthis = w.lock();
771 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400772 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400773 if (!sthis || !ok) {
774 eraseInfo();
775 return;
776 }
777 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
778 if (!ok) {
779 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
780 }
781 });
782 });
783 };
784 ice_config.onNegoDone = [w,
785 deviceId,
786 name,
787 cert = std::move(cert),
788 vid,
789 eraseInfo](bool ok) {
790 dht::ThreadPool::io().run([w = std::move(w),
791 deviceId = std::move(deviceId),
792 name = std::move(name),
793 cert = std::move(cert),
794 vid = std::move(vid),
795 eraseInfo = std::move(eraseInfo),
796 ok] {
797 auto sthis = w.lock();
798 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400799 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400800 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
801 eraseInfo();
802 });
803 };
804
805 auto info = std::make_shared<ConnectionInfo>();
806 {
807 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
808 sthis->infos_[{deviceId, vid}] = info;
809 }
810 std::unique_lock<std::mutex> lk {info->mutex_};
811 ice_config.master = false;
812 ice_config.streamsCount = 1;
813 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400814 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400815 if (!info->ice_) {
816 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400817 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400818 eraseInfo();
819 return;
820 }
821 // We need to detect any shutdown if the ice session is destroyed before going to the
822 // TLS session;
823 info->ice_->setOnShutdown([eraseInfo]() {
824 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
825 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400826 try {
827 info->ice_->initIceInstance(ice_config);
828 } catch (const std::exception& e) {
829 if (sthis->config_->logger)
830 sthis->config_->logger->error("{}", e.what());
831 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
832 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400833 });
834 });
835}
836
837void
838ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
839 const std::string& name,
840 const DeviceId& deviceId,
841 const dht::Value::Id& vid)
842{
843 auto channelSock = sock->addChannel(name);
844 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
845 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400846 if (auto shared = w.lock())
847 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400848 });
849 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400850 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400851 auto shared = w.lock();
852 auto channelSock = wSock.lock();
853 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400854 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400855 });
856
857 ChannelRequest val;
858 val.name = channelSock->name();
859 val.state = ChannelRequestState::REQUEST;
860 val.channel = channelSock->channel();
861 msgpack::sbuffer buffer(256);
862 msgpack::pack(buffer, val);
863
864 std::error_code ec;
865 int res = sock->write(CONTROL_CHANNEL,
866 reinterpret_cast<const uint8_t*>(buffer.data()),
867 buffer.size(),
868 ec);
869 if (res < 0) {
870 // TODO check if we should handle errors here
871 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400872 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400873 }
874}
875
876void
877ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
878{
879 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400880 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400881 if (config_->logger)
882 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400883 std::lock_guard<std::mutex> lk {info->mutex_};
884 info->responseReceived_ = true;
885 info->response_ = std::move(req);
886 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
887 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
888 this,
889 std::placeholders::_1,
890 device,
891 req.id));
892 } else {
893 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400894 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400895 }
896}
897
898void
899ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
900{
901 if (!dht())
902 return;
903 dht()->listen<PeerConnectionRequest>(
904 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
905 [w = weak()](PeerConnectionRequest&& req) {
906 auto shared = w.lock();
907 if (!shared)
908 return false;
909 if (shared->isMessageTreated(to_hex_string(req.id))) {
910 // Message already treated. Just ignore
911 return true;
912 }
913 if (req.isAnswer) {
914 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400915 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400916 } else {
917 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400918 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400919 }
920 if (req.isAnswer) {
921 shared->onPeerResponse(req);
922 } else {
923 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400924 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400925 req.from,
926 [w, req = std::move(req)](
927 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
928 auto shared = w.lock();
929 if (!shared)
930 return;
931 dht::InfoHash peer_h;
932 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
933#if TARGET_OS_IOS
934 if (shared->iOSConnectedCb_(req.connType, peer_h))
935 return;
936#endif
937 shared->onDhtPeerRequest(req, cert);
938 } else {
939 if (shared->config_->logger)
940 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400941 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400942 req.owner->getLongId());
943 }
944 });
945 }
946
947 return true;
948 },
949 dht::Value::UserTypeFilter("peer_request"));
950}
951
952void
953ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
954 const DeviceId& deviceId,
955 const dht::Value::Id& vid,
956 const std::string& name)
957{
958 if (isDestroying_)
959 return;
960 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
961 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
962 // asked yet)
963 auto isDhtRequest = name.empty();
964 if (!ok) {
965 if (isDhtRequest) {
966 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400967 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400968 deviceId,
969 name,
970 vid);
971 if (connReadyCb_)
972 connReadyCb_(deviceId, "", nullptr);
973 } else {
974 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400975 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400976 deviceId,
977 name,
978 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400979 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400980 }
981 } else {
982 // The socket is ready, store it
983 if (isDhtRequest) {
984 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400985 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400986 deviceId,
987 vid);
988 } else {
989 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400990 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400991 deviceId,
992 name,
993 vid);
994 }
995
996 auto info = getInfo(deviceId, vid);
997 addNewMultiplexedSocket({deviceId, vid}, info);
998 // Finally, open the channel and launch pending callbacks
999 if (info->socket_) {
1000 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001001 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001002 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001003 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001004 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001005 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001006 }
1007 }
1008 }
1009}
1010
1011void
1012ConnectionManager::Impl::answerTo(IceTransport& ice,
1013 const dht::Value::Id& id,
1014 const std::shared_ptr<dht::crypto::PublicKey>& from)
1015{
1016 // NOTE: This is a shortest version of a real SDP message to save some bits
1017 auto iceAttributes = ice.getLocalAttributes();
1018 std::ostringstream icemsg;
1019 icemsg << iceAttributes.ufrag << "\n";
1020 icemsg << iceAttributes.pwd << "\n";
1021 for (const auto& addr : ice.getLocalCandidates(1)) {
1022 icemsg << addr << "\n";
1023 }
1024
1025 // Send PeerConnection response
1026 PeerConnectionRequest val;
1027 val.id = id;
1028 val.ice_msg = icemsg.str();
1029 val.isAnswer = true;
1030 auto value = std::make_shared<dht::Value>(std::move(val));
1031 value->user_type = "peer_request";
1032
1033 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001034 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001035 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1036 + from->getId().toString()),
1037 from,
1038 value,
1039 [from,l=config_->logger](bool ok) {
1040 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001041 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001042 from->getLongId(),
1043 (ok ? "ok" : "failed"));
1044 });
1045}
1046
1047bool
1048ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1049{
1050 auto deviceId = req.owner->getLongId();
1051 auto info = getInfo(deviceId, req.id);
1052 if (!info)
1053 return false;
1054
1055 std::unique_lock<std::mutex> lk {info->mutex_};
1056 auto& ice = info->ice_;
1057 if (!ice) {
1058 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001059 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001060 if (connReadyCb_)
1061 connReadyCb_(deviceId, "", nullptr);
1062 return false;
1063 }
1064
1065 auto sdp = ice->parseIceCandidates(req.ice_msg);
1066 answerTo(*ice, req.id, req.owner);
1067 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1068 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001069 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001070 ice = nullptr;
1071 if (connReadyCb_)
1072 connReadyCb_(deviceId, "", nullptr);
1073 return false;
1074 }
1075 return true;
1076}
1077
1078bool
1079ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1080{
1081 auto deviceId = req.owner->getLongId();
1082 auto info = getInfo(deviceId, req.id);
1083 if (!info)
1084 return false;
1085
1086 std::unique_lock<std::mutex> lk {info->mutex_};
1087 auto& ice = info->ice_;
1088 if (!ice) {
1089 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001090 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001091 return false;
1092 }
1093
1094 // Build socket
1095 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1096 std::move(ice)),
1097 false);
1098
1099 // init TLS session
1100 auto ph = req.from;
1101 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001102 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1103 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001104 req.id);
1105 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1106 std::move(endpoint),
1107 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001108 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001109 identity(),
1110 dhParams(),
1111 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1112 auto shared = w.lock();
1113 if (!shared)
1114 return false;
1115 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1116 if (!crt)
1117 return false;
1118 return crt->getPacked() == cert.getPacked();
1119 });
1120
1121 info->tls_->setOnReady(
1122 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1123 if (auto shared = w.lock())
1124 shared->onTlsNegotiationDone(ok, deviceId, vid);
1125 });
1126 return true;
1127}
1128
1129void
1130ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1131 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1132{
1133 auto deviceId = req.owner->getLongId();
1134 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001135 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001136 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1137 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001138 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001139 return;
1140 }
1141
1142 // Because the connection is accepted, create an ICE socket.
1143 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1144 auto shared = w.lock();
1145 if (!shared)
1146 return;
1147 // Note: used when the ice negotiation fails to erase
1148 // all stored structures.
1149 auto eraseInfo = [w, id = req.id, deviceId] {
1150 if (auto shared = w.lock()) {
1151 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001152 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001153 if (shared->connReadyCb_)
1154 shared->connReadyCb_(deviceId, "", nullptr);
1155 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1156 shared->infos_.erase({deviceId, id});
1157 }
1158 };
1159
1160 ice_config.tcpEnable = true;
1161 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1162 auto shared = w.lock();
1163 if (!shared)
1164 return;
1165 if (!ok) {
1166 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001167 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001168 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1169 return;
1170 }
1171
1172 dht::ThreadPool::io().run(
1173 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1174 auto shared = w.lock();
1175 if (!shared)
1176 return;
1177 if (!shared->onRequestStartIce(req))
1178 eraseInfo();
1179 });
1180 };
1181
1182 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1183 auto shared = w.lock();
1184 if (!shared)
1185 return;
1186 if (!ok) {
1187 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001188 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001189 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1190 return;
1191 }
1192
1193 dht::ThreadPool::io().run(
1194 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1195 if (auto shared = w.lock())
1196 if (!shared->onRequestOnNegoDone(req))
1197 eraseInfo();
1198 });
1199 };
1200
1201 // Negotiate a new ICE socket
1202 auto info = std::make_shared<ConnectionInfo>();
1203 {
1204 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1205 shared->infos_[{deviceId, req.id}] = info;
1206 }
1207 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001208 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001209 std::unique_lock<std::mutex> lk {info->mutex_};
1210 ice_config.streamsCount = 1;
1211 ice_config.compCountPerStream = 1; // TCP
1212 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001213 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001214 if (not info->ice_) {
1215 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001216 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001217 eraseInfo();
1218 return;
1219 }
1220 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1221 info->ice_->setOnShutdown([eraseInfo]() {
1222 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1223 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001224 try {
1225 info->ice_->initIceInstance(ice_config);
1226 } catch (const std::exception& e) {
1227 if (shared->config_->logger)
1228 shared->config_->logger->error("{}", e.what());
1229 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1230 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001231 });
1232}
1233
1234void
1235ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1236{
1237 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1238 info->socket_->setOnReady(
1239 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1240 if (auto sthis = w.lock())
1241 if (sthis->connReadyCb_)
1242 sthis->connReadyCb_(deviceId, socket->name(), socket);
1243 });
1244 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1245 const uint16_t&,
1246 const std::string& name) {
1247 if (auto sthis = w.lock())
1248 if (sthis->channelReqCb_)
1249 return sthis->channelReqCb_(peer, name);
1250 return false;
1251 });
1252 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1253 // Cancel current outgoing connections
1254 dht::ThreadPool::io().run([w, deviceId, vid] {
1255 auto sthis = w.lock();
1256 if (!sthis)
1257 return;
1258
1259 std::set<CallbackId> ids;
1260 if (auto info = sthis->getInfo(deviceId, vid)) {
1261 std::lock_guard<std::mutex> lk(info->mutex_);
1262 if (info->socket_) {
1263 ids = std::move(info->cbIds_);
1264 info->socket_->shutdown();
1265 }
1266 }
1267 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001268 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001269
1270 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1271 sthis->infos_.erase({deviceId, vid});
1272 });
1273 });
1274}
1275
1276const std::shared_future<tls::DhParams>
1277ConnectionManager::Impl::dhParams() const
1278{
1279 return dht::ThreadPool::computation().get<tls::DhParams>(
1280 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001281}
1282
1283template<typename ID = dht::Value::Id>
1284std::set<ID, std::less<>>
1285loadIdList(const std::string& path)
1286{
1287 std::set<ID, std::less<>> ids;
1288 std::ifstream file = fileutils::ifstream(path);
1289 if (!file.is_open()) {
1290 //JAMI_DBG("Could not load %s", path.c_str());
1291 return ids;
1292 }
1293 std::string line;
1294 while (std::getline(file, line)) {
1295 if constexpr (std::is_same<ID, std::string>::value) {
1296 ids.emplace(std::move(line));
1297 } else if constexpr (std::is_integral<ID>::value) {
1298 ID vid;
1299 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1300 ec == std::errc()) {
1301 ids.emplace(vid);
1302 }
1303 }
1304 }
1305 return ids;
1306}
1307
1308template<typename List = std::set<dht::Value::Id>>
1309void
1310saveIdList(const std::string& path, const List& ids)
1311{
1312 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1313 if (!file.is_open()) {
1314 //JAMI_ERR("Could not save to %s", path.c_str());
1315 return;
1316 }
1317 for (auto& c : ids)
1318 file << std::hex << c << "\n";
1319}
1320
1321void
1322ConnectionManager::Impl::loadTreatedMessages()
1323{
1324 std::lock_guard<std::mutex> lock(messageMutex_);
1325 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1326 treatedMessages_ = loadIdList<std::string>(path);
1327 if (treatedMessages_.empty()) {
1328 auto messages = loadIdList(path);
1329 for (const auto& m : messages)
1330 treatedMessages_.emplace(to_hex_string(m));
1331 }
1332}
1333
1334void
1335ConnectionManager::Impl::saveTreatedMessages() const
1336{
1337 dht::ThreadPool::io().run([w = weak()]() {
1338 if (auto sthis = w.lock()) {
1339 auto& this_ = *sthis;
1340 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1341 fileutils::check_dir(this_.config_->cachePath.c_str());
1342 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1343 + DIR_SEPARATOR_STR "treatedMessages",
1344 this_.treatedMessages_);
1345 }
1346 });
1347}
1348
1349bool
1350ConnectionManager::Impl::isMessageTreated(std::string_view id)
1351{
1352 std::lock_guard<std::mutex> lock(messageMutex_);
1353 auto res = treatedMessages_.emplace(id);
1354 if (res.second) {
1355 saveTreatedMessages();
1356 return false;
1357 }
1358 return true;
1359}
1360
1361/**
1362 * returns whether or not UPnP is enabled and active_
1363 * ie: if it is able to make port mappings
1364 */
1365bool
1366ConnectionManager::Impl::getUPnPActive() const
1367{
1368 return config_->getUPnPActive();
1369}
1370
1371IpAddr
1372ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1373{
1374 if (family == AF_INET)
1375 return publishedIp_[0];
1376 if (family == AF_INET6)
1377 return publishedIp_[1];
1378
1379 assert(family == AF_UNSPEC);
1380
1381 // If family is not set, prefere IPv4 if available. It's more
1382 // likely to succeed behind NAT.
1383 if (publishedIp_[0])
1384 return publishedIp_[0];
1385 if (publishedIp_[1])
1386 return publishedIp_[1];
1387 return {};
1388}
1389
1390void
1391ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1392{
1393 if (ip_addr.getFamily() == AF_INET) {
1394 publishedIp_[0] = ip_addr;
1395 } else {
1396 publishedIp_[1] = ip_addr;
1397 }
1398}
1399
1400void
1401ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1402{
1403 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1404 bool hasIpv4 {false}, hasIpv6 {false};
1405 for (auto& result : results) {
1406 auto family = result.getFamily();
1407 if (family == AF_INET) {
1408 if (not hasIpv4) {
1409 hasIpv4 = true;
1410 if (config_->logger)
1411 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1412 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1413 setPublishedAddress(*result.get());
1414 if (config_->upnpCtrl) {
1415 config_->upnpCtrl->setPublicAddress(*result.get());
1416 }
1417 }
1418 } else if (family == AF_INET6) {
1419 if (not hasIpv6) {
1420 hasIpv6 = true;
1421 if (config_->logger)
1422 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1423 setPublishedAddress(*result.get());
1424 }
1425 }
1426 if (hasIpv4 and hasIpv6)
1427 break;
1428 }
1429 if (cb)
1430 cb();
1431 });
1432}
1433
1434void
1435ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1436{
1437 storeActiveIpAddress([this, cb = std::move(cb)] {
1438 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1439 auto publishedAddr = getPublishedIpAddress();
1440
1441 if (publishedAddr) {
1442 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1443 publishedAddr.getFamily());
1444 if (interfaceAddr) {
1445 opts.accountLocalAddr = interfaceAddr;
1446 opts.accountPublicAddr = publishedAddr;
1447 }
1448 }
1449 if (cb)
1450 cb(std::move(opts));
1451 });
1452}
1453
1454IceTransportOptions
1455ConnectionManager::Impl::getIceOptions() const noexcept
1456{
1457 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001458 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001459 opts.upnpEnable = getUPnPActive();
1460
1461 if (config_->stunEnabled)
1462 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1463 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001464 if (config_->turnCache) {
1465 auto turnAddr = config_->turnCache->getResolvedTurn();
1466 if (turnAddr != std::nullopt) {
1467 opts.turnServers.emplace_back(TurnServerInfo()
1468 .setUri(turnAddr->toString())
1469 .setUsername(config_->turnServerUserName)
1470 .setPassword(config_->turnServerPwd)
1471 .setRealm(config_->turnServerRealm));
1472 }
1473 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001474 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001475 .setUri(config_->turnServer)
1476 .setUsername(config_->turnServerUserName)
1477 .setPassword(config_->turnServerPwd)
1478 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001479 }
1480 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1481 // co issues. So this needs some debug. for now just disable
1482 // if (cacheTurnV6 && *cacheTurnV6) {
1483 // opts.turnServers.emplace_back(TurnServerInfo()
1484 // .setUri(cacheTurnV6->toString(true))
1485 // .setUsername(turnServerUserName_)
1486 // .setPassword(turnServerPwd_)
1487 // .setRealm(turnServerRealm_));
1488 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001489 }
1490 return opts;
1491}
1492
1493bool
1494ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1495 dht::InfoHash& account_id,
1496 const std::shared_ptr<Logger>& logger)
1497{
1498 if (not crt)
1499 return false;
1500
1501 auto top_issuer = crt;
1502 while (top_issuer->issuer)
1503 top_issuer = top_issuer->issuer;
1504
1505 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001506 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001507 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001508 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001509 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001510 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001511
1512 // Check peer certificate chain
1513 // Trust store with top issuer as the only CA
1514 dht::crypto::TrustList peer_trust;
1515 peer_trust.add(*top_issuer);
1516 if (not peer_trust.verify(*crt)) {
1517 if (logger)
1518 logger->warn("Found invalid peer device: {}", crt->getLongId());
1519 return false;
1520 }
1521
1522 // Check cached OCSP response
1523 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1524 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001525 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001526 return false;
1527 }
1528
Adrien Béraudc631a832023-07-26 22:19:00 -04001529 account_id = crt->issuer->getId();
1530 if (logger)
1531 logger->warn("Found peer device: {} account:{} CA:{}",
1532 crt->getLongId(),
1533 account_id,
1534 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001535 return true;
1536}
1537
1538bool
1539ConnectionManager::Impl::findCertificate(
1540 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1541{
1542 if (auto cert = certStore().getCertificate(id.toString())) {
1543 if (cb)
1544 cb(cert);
1545 } else if (cb)
1546 cb(nullptr);
1547 return true;
1548}
1549
Sébastien Blin34086512023-07-25 09:52:14 -04001550bool
1551ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1552 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1553{
1554 if (auto cert = certStore().getCertificate(h.toString())) {
1555 if (cb)
1556 cb(cert);
1557 } else {
1558 dht()->findCertificate(h,
1559 [cb = std::move(cb), this](
1560 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1561 if (crt)
1562 certStore().pinCertificate(crt);
1563 if (cb)
1564 cb(crt);
1565 });
1566 }
1567 return true;
1568}
1569
Adrien Béraud612b55b2023-05-29 10:42:04 -04001570ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1571 : pimpl_ {std::make_shared<Impl>(config_)}
1572{}
1573
1574ConnectionManager::~ConnectionManager()
1575{
1576 if (pimpl_)
1577 pimpl_->shutdown();
1578}
1579
1580void
1581ConnectionManager::connectDevice(const DeviceId& deviceId,
1582 const std::string& name,
1583 ConnectCallback cb,
1584 bool noNewSocket,
1585 bool forceNewSocket,
1586 const std::string& connType)
1587{
1588 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1589}
1590
1591void
Amna0cf544d2023-07-25 14:25:09 -04001592ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1593 const std::string& name,
1594 ConnectCallbackLegacy cb,
1595 bool noNewSocket,
1596 bool forceNewSocket,
1597 const std::string& connType)
1598{
1599 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1600}
1601
1602
1603void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001604ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1605 const std::string& name,
1606 ConnectCallback cb,
1607 bool noNewSocket,
1608 bool forceNewSocket,
1609 const std::string& connType)
1610{
1611 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1612}
1613
1614bool
1615ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1616{
Adrien Béraud665294f2023-06-13 18:09:11 -04001617 auto pending = pimpl_->getPendingIds(deviceId);
1618 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001619 != pending.end();
1620}
1621
1622void
1623ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1624{
1625 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1626 std::set<DeviceId> peersDevices;
1627 {
1628 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1629 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1630 auto const& [key, value] = *iter;
1631 auto deviceId = key.first;
1632 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1633 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1634 connInfos.emplace_back(value);
1635 peersDevices.emplace(deviceId);
1636 iter = pimpl_->infos_.erase(iter);
1637 } else {
1638 iter++;
1639 }
1640 }
1641 }
1642 // Stop connections to all peers devices
1643 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001644 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001645 // This will close the TLS Session
1646 pimpl_->removeUnusedConnections(deviceId);
1647 }
1648 for (auto& info : connInfos) {
1649 if (info->socket_)
1650 info->socket_->shutdown();
1651 if (info->waitForAnswer_)
1652 info->waitForAnswer_->cancel();
1653 if (info->ice_) {
1654 std::unique_lock<std::mutex> lk {info->mutex_};
1655 dht::ThreadPool::io().run(
1656 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1657 }
1658 }
1659}
1660
1661void
1662ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1663{
1664 pimpl_->onDhtConnected(devicePk);
1665}
1666
1667void
1668ConnectionManager::onICERequest(onICERequestCallback&& cb)
1669{
1670 pimpl_->iceReqCb_ = std::move(cb);
1671}
1672
1673void
1674ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1675{
1676 pimpl_->channelReqCb_ = std::move(cb);
1677}
1678
1679void
1680ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1681{
1682 pimpl_->connReadyCb_ = std::move(cb);
1683}
1684
1685void
1686ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1687{
1688 pimpl_->iOSConnectedCb_ = std::move(cb);
1689}
1690
1691std::size_t
1692ConnectionManager::activeSockets() const
1693{
1694 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1695 return pimpl_->infos_.size();
1696}
1697
1698void
1699ConnectionManager::monitor() const
1700{
1701 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1702 auto logger = pimpl_->config_->logger;
1703 if (!logger)
1704 return;
1705 logger->debug("ConnectionManager current status:");
1706 for (const auto& [_, ci] : pimpl_->infos_) {
1707 if (ci->socket_)
1708 ci->socket_->monitor();
1709 }
1710 logger->debug("ConnectionManager end status.");
1711}
1712
1713void
1714ConnectionManager::connectivityChanged()
1715{
1716 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1717 for (const auto& [_, ci] : pimpl_->infos_) {
1718 if (ci->socket_)
1719 ci->socket_->sendBeacon();
1720 }
1721}
1722
1723void
1724ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1725{
1726 return pimpl_->getIceOptions(std::move(cb));
1727}
1728
1729IceTransportOptions
1730ConnectionManager::getIceOptions() const noexcept
1731{
1732 return pimpl_->getIceOptions();
1733}
1734
1735IpAddr
1736ConnectionManager::getPublishedIpAddress(uint16_t family) const
1737{
1738 return pimpl_->getPublishedIpAddress(family);
1739}
1740
1741void
1742ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1743{
1744 return pimpl_->setPublishedAddress(ip_addr);
1745}
1746
1747void
1748ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1749{
1750 return pimpl_->storeActiveIpAddress(std::move(cb));
1751}
1752
1753std::shared_ptr<ConnectionManager::Config>
1754ConnectionManager::getConfig()
1755{
1756 return pimpl_->config_;
1757}
1758
Sébastien Blin464bdff2023-07-19 08:02:53 -04001759} // namespace dhtnet