blob: 87fcae297ac2de30916fd08512e848d1ac84fbb3 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2019-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18#include "connectionmanager.h"
19#include "peer_connection.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040020#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040021#include "upnp/upnp_control.h"
22#include "certstore.h"
23#include "fileutils.h"
24#include "sip_utils.h"
25#include "string_utils.h"
26
27#include <opendht/crypto.h>
28#include <opendht/thread_pool.h>
29#include <opendht/value.h>
30#include <asio.hpp>
31
32#include <algorithm>
33#include <mutex>
34#include <map>
35#include <condition_variable>
36#include <set>
37#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040038#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040039
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040040namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040041static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
42static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
43
44using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040045using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040046
47struct ConnectionInfo
48{
49 ~ConnectionInfo()
50 {
51 if (socket_)
52 socket_->join();
53 }
54
55 std::mutex mutex_ {};
56 bool responseReceived_ {false};
57 PeerConnectionRequest response_ {};
58 std::unique_ptr<IceTransport> ice_ {nullptr};
59 // Used to store currently non ready TLS Socket
60 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
61 std::shared_ptr<MultiplexedSocket> socket_ {};
62 std::set<CallbackId> cbIds_ {};
63
64 std::function<void(bool)> onConnected_;
65 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
66};
67
68/**
69 * returns whether or not UPnP is enabled and active_
70 * ie: if it is able to make port mappings
71 */
72bool
73ConnectionManager::Config::getUPnPActive() const
74{
75 if (upnpCtrl)
76 return upnpCtrl->isReady();
77 return false;
78}
79
80class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
81{
82public:
83 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
84 : config_ {std::move(config_)}
85 {}
86 ~Impl() {}
87
88 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
89 const dht::crypto::Identity& identity() const { return config_->id; }
90
91 void removeUnusedConnections(const DeviceId& deviceId = {})
92 {
93 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
94
95 {
96 std::lock_guard<std::mutex> lk(infosMtx_);
97 for (auto it = infos_.begin(); it != infos_.end();) {
98 auto& [key, info] = *it;
99 if (info && (!deviceId || key.first == deviceId)) {
100 unused.emplace_back(std::move(info));
101 it = infos_.erase(it);
102 } else {
103 ++it;
104 }
105 }
106 }
107 for (auto& info: unused) {
108 if (info->tls_)
109 info->tls_->shutdown();
110 if (info->socket_)
111 info->socket_->shutdown();
112 if (info->waitForAnswer_)
113 info->waitForAnswer_->cancel();
114 }
115 if (!unused.empty())
116 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
117 }
118
119 void shutdown()
120 {
121 if (isDestroying_.exchange(true))
122 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400123 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400124 {
125 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400126 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400127 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400128 for (auto& [deviceId, pcbs] : po) {
129 for (auto& [id, pending] : pcbs.connecting)
130 pending.cb(nullptr, deviceId);
131 for (auto& [id, pending] : pcbs.waiting)
132 pending.cb(nullptr, deviceId);
133 }
134
Adrien Béraud612b55b2023-05-29 10:42:04 -0400135 removeUnusedConnections();
136 }
137
Adrien Béraud612b55b2023-05-29 10:42:04 -0400138 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
139 const dht::Value::Id& vid,
140 const std::string& connType,
141 std::function<void(bool)> onConnected);
142 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
143 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
144 const std::string& name,
145 const dht::Value::Id& vid,
146 const std::shared_ptr<dht::crypto::Certificate>& cert);
147 void connectDevice(const DeviceId& deviceId,
148 const std::string& uri,
149 ConnectCallback cb,
150 bool noNewSocket = false,
151 bool forceNewSocket = false,
152 const std::string& connType = "");
153 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
154 const std::string& name,
155 ConnectCallback cb,
156 bool noNewSocket = false,
157 bool forceNewSocket = false,
158 const std::string& connType = "");
159 /**
160 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
161 * @param sock socket used to send the request
162 * @param name channel's name
163 * @param vid channel's id
164 * @param deviceId to identify the linked ConnectCallback
165 */
166 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
167 const std::string& name,
168 const DeviceId& deviceId,
169 const dht::Value::Id& vid);
170 /**
171 * Triggered when a PeerConnectionRequest comes from the DHT
172 */
173 void answerTo(IceTransport& ice,
174 const dht::Value::Id& id,
175 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
176 bool onRequestStartIce(const PeerConnectionRequest& req);
177 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
178 void onDhtPeerRequest(const PeerConnectionRequest& req,
179 const std::shared_ptr<dht::crypto::Certificate>& cert);
180
181 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
182 void onPeerResponse(const PeerConnectionRequest& req);
183 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
184
185 const std::shared_future<tls::DhParams> dhParams() const;
186 tls::CertificateStore& certStore() const { return *config_->certStore; }
187
188 mutable std::mutex messageMutex_ {};
189 std::set<std::string, std::less<>> treatedMessages_ {};
190
191 void loadTreatedMessages();
192 void saveTreatedMessages() const;
193
194 /// \return true if the given DHT message identifier has been treated
195 /// \note if message has not been treated yet this method st/ore this id and returns true at
196 /// further calls
197 bool isMessageTreated(std::string_view id);
198
199 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
200
201 /**
202 * Published IPv4/IPv6 addresses, used only if defined by the user in account
203 * configuration
204 *
205 */
206 IpAddr publishedIp_[2] {};
207
208 // This will be stored in the configuration
209 std::string publishedIpAddress_ {};
210
211 /**
212 * Published port, used only if defined by the user
213 */
214 pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT};
215
216 /**
217 * interface name on which this account is bound
218 */
219 std::string interface_ {"default"};
220
221 /**
222 * Get the local interface name on which this account is bound.
223 */
224 const std::string& getLocalInterface() const { return interface_; }
225
226 /**
227 * Get the published IP address, fallbacks to NAT if family is unspecified
228 * Prefers the usage of IPv4 if possible.
229 */
230 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
231
232 /**
233 * Set published IP address according to given family
234 */
235 void setPublishedAddress(const IpAddr& ip_addr);
236
237 /**
238 * Store the local/public addresses used to register
239 */
240 void storeActiveIpAddress(std::function<void()>&& cb = {});
241
242 /**
243 * Create and return ICE options.
244 */
245 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
246 IceTransportOptions getIceOptions() const noexcept;
247
248 /**
249 * Inform that a potential peer device have been found.
250 * Returns true only if the device certificate is a valid device certificate.
251 * In that case (true is returned) the account_id parameter is set to the peer account ID.
252 */
253 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
254 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
255
256 bool findCertificate(const dht::PkId& id,
257 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
258
259 /**
260 * returns whether or not UPnP is enabled and active
261 * ie: if it is able to make port mappings
262 */
263 bool getUPnPActive() const;
264
265 /**
266 * Triggered when a new TLS socket is ready to use
267 * @param ok If succeed
268 * @param deviceId Related device
269 * @param vid vid of the connection request
270 * @param name non empty if TLS was created by connectDevice()
271 */
272 void onTlsNegotiationDone(bool ok,
273 const DeviceId& deviceId,
274 const dht::Value::Id& vid,
275 const std::string& name = "");
276
277 std::shared_ptr<ConnectionManager::Config> config_;
278
279 IceTransportFactory iceFactory_ {};
280
281 mutable std::mt19937_64 rand;
282
283 iOSConnectedCallback iOSConnectedCb_ {};
284
285 std::mutex infosMtx_ {};
286 // Note: Someone can ask multiple sockets, so to avoid any race condition,
287 // each device can have multiple multiplexed sockets.
288 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
289
290 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
291 {
292 std::lock_guard<std::mutex> lk(infosMtx_);
293 auto it = infos_.find({deviceId, id});
294 if (it != infos_.end())
295 return it->second;
296 return {};
297 }
298
299 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
300 {
301 std::lock_guard<std::mutex> lk(infosMtx_);
302 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
303 auto& [key, value] = item;
304 return key.first == deviceId && value && value->socket_;
305 });
306 if (it != infos_.end())
307 return it->second;
308 return {};
309 }
310
311 ChannelRequestCallback channelReqCb_ {};
312 ConnectionReadyCallback connReadyCb_ {};
313 onICERequestCallback iceReqCb_ {};
314
315 /**
316 * Stores callback from connectDevice
317 * @note: each device needs a vector because several connectDevice can
318 * be done in parallel and we only want one socket
319 */
320 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400321
Adrien Béraud665294f2023-06-13 18:09:11 -0400322 struct PendingCb
323 {
324 std::string name;
325 ConnectCallback cb;
326 };
327 struct PendingOperations {
328 std::map<dht::Value::Id, PendingCb> connecting;
329 std::map<dht::Value::Id, PendingCb> waiting;
330 };
331
332 std::map<DeviceId, PendingOperations> pendingOperations_ {};
333
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400334 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400335 {
336 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400337 std::unique_lock<std::mutex> lk(connectCbsMtx_);
338 auto it = pendingOperations_.find(deviceId);
339 if (it == pendingOperations_.end())
340 return;
341 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400342 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400343 // Extract all pending callbacks
344 for (auto& [vid, cb] : pendingOperations.connecting)
345 ret.emplace_back(std::move(cb));
346 pendingOperations.connecting.clear();
347 for (auto& [vid, cb] : pendingOperations.waiting)
348 ret.emplace_back(std::move(cb));
349 pendingOperations.waiting.clear();
350 } else if (auto n = pendingOperations.waiting.extract(vid)) {
351 // If it's a waiting operation, just move it
352 ret.emplace_back(std::move(n.mapped()));
353 } else if (auto n = pendingOperations.connecting.extract(vid)) {
354 ret.emplace_back(std::move(n.mapped()));
355 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400356 // If accepted is false, it means that underlying socket is ok, but channel is declined
357 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400358 for (auto& [vid, cb] : pendingOperations.waiting)
359 ret.emplace_back(std::move(cb));
360 pendingOperations.waiting.clear();
361 for (auto& [vid, cb] : pendingOperations.connecting)
362 ret.emplace_back(std::move(cb));
363 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400364 }
365 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400366 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
367 pendingOperations_.erase(it);
368 lk.unlock();
369 for (auto& cb : ret)
370 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400371 }
372
Adrien Béraud665294f2023-06-13 18:09:11 -0400373 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400374 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400375 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400376 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400377 auto it = pendingOperations_.find(deviceId);
378 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400379 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400380 auto& pendingOp = it->second;
381 for (const auto& [id, pc]: pendingOp.connecting) {
382 if (vid == 0 || id == vid)
383 ret[id] = pc.name;
384 }
385 for (const auto& [id, pc]: pendingOp.waiting) {
386 if (vid == 0 || id == vid)
387 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400388 }
389 return ret;
390 }
391
392 std::shared_ptr<ConnectionManager::Impl> shared()
393 {
394 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
395 }
396 std::shared_ptr<ConnectionManager::Impl const> shared() const
397 {
398 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
399 }
400 std::weak_ptr<ConnectionManager::Impl> weak()
401 {
402 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
403 }
404 std::weak_ptr<ConnectionManager::Impl const> weak() const
405 {
406 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
407 }
408
409 std::atomic_bool isDestroying_ {false};
410};
411
412void
413ConnectionManager::Impl::connectDeviceStartIce(
414 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
415 const dht::Value::Id& vid,
416 const std::string& connType,
417 std::function<void(bool)> onConnected)
418{
419 auto deviceId = devicePk->getLongId();
420 auto info = getInfo(deviceId, vid);
421 if (!info) {
422 onConnected(false);
423 return;
424 }
425
426 std::unique_lock<std::mutex> lk(info->mutex_);
427 auto& ice = info->ice_;
428
429 if (!ice) {
430 if (config_->logger)
431 config_->logger->error("No ICE detected");
432 onConnected(false);
433 return;
434 }
435
436 auto iceAttributes = ice->getLocalAttributes();
437 std::ostringstream icemsg;
438 icemsg << iceAttributes.ufrag << "\n";
439 icemsg << iceAttributes.pwd << "\n";
440 for (const auto& addr : ice->getLocalCandidates(1)) {
441 icemsg << addr << "\n";
442 if (config_->logger)
443 config_->logger->debug("Added local ICE candidate {}", addr);
444 }
445
446 // Prepare connection request as a DHT message
447 PeerConnectionRequest val;
448
449 val.id = vid; /* Random id for the message unicity */
450 val.ice_msg = icemsg.str();
451 val.connType = connType;
452
453 auto value = std::make_shared<dht::Value>(std::move(val));
454 value->user_type = "peer_request";
455
456 // Send connection request through DHT
457 if (config_->logger)
458 config_->logger->debug("Request connection to {}", deviceId);
459 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
460 + devicePk->getId().toString()),
461 devicePk,
462 value,
463 [l=config_->logger,deviceId](bool ok) {
464 if (l)
465 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
466 deviceId,
467 (ok ? "ok" : "failed"));
468 });
469 // Wait for call to onResponse() operated by DHT
470 if (isDestroying_) {
471 onConnected(true); // This avoid to wait new negotiation when destroying
472 return;
473 }
474
475 info->onConnected_ = std::move(onConnected);
476 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
477 std::chrono::steady_clock::now()
478 + DHT_MSG_TIMEOUT);
479 info->waitForAnswer_->async_wait(
480 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
481}
482
483void
484ConnectionManager::Impl::onResponse(const asio::error_code& ec,
485 const DeviceId& deviceId,
486 const dht::Value::Id& vid)
487{
488 if (ec == asio::error::operation_aborted)
489 return;
490 auto info = getInfo(deviceId, vid);
491 if (!info)
492 return;
493
494 std::unique_lock<std::mutex> lk(info->mutex_);
495 auto& ice = info->ice_;
496 if (isDestroying_) {
497 info->onConnected_(true); // The destructor can wake a pending wait here.
498 return;
499 }
500 if (!info->responseReceived_) {
501 if (config_->logger)
502 config_->logger->error("no response from DHT to E2E request.");
503 info->onConnected_(false);
504 return;
505 }
506
507 if (!info->ice_) {
508 info->onConnected_(false);
509 return;
510 }
511
512 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
513
514 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
515 if (config_->logger)
516 config_->logger->warn("start ICE failed");
517 info->onConnected_(false);
518 return;
519 }
520 info->onConnected_(true);
521}
522
523bool
524ConnectionManager::Impl::connectDeviceOnNegoDone(
525 const DeviceId& deviceId,
526 const std::string& name,
527 const dht::Value::Id& vid,
528 const std::shared_ptr<dht::crypto::Certificate>& cert)
529{
530 auto info = getInfo(deviceId, vid);
531 if (!info)
532 return false;
533
534 std::unique_lock<std::mutex> lk {info->mutex_};
535 if (info->waitForAnswer_) {
536 // Negotiation is done and connected, go to handshake
537 // and avoid any cancellation at this point.
538 info->waitForAnswer_->cancel();
539 }
540 auto& ice = info->ice_;
541 if (!ice || !ice->isRunning()) {
542 if (config_->logger)
543 config_->logger->error("No ICE detected or not running");
544 return false;
545 }
546
547 // Build socket
548 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
549 std::move(ice)),
550 true);
551
552 // Negotiate a TLS session
553 if (config_->logger)
554 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
555 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
556 certStore(),
557 identity(),
558 dhParams(),
559 *cert);
560
561 info->tls_->setOnReady(
562 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
563 bool ok) {
564 if (auto shared = w.lock())
565 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
566 });
567 return true;
568}
569
570void
571ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
572 const std::string& name,
573 ConnectCallback cb,
574 bool noNewSocket,
575 bool forceNewSocket,
576 const std::string& connType)
577{
578 if (!dht()) {
579 cb(nullptr, deviceId);
580 return;
581 }
582 if (deviceId.toString() == identity().second->getLongId().toString()) {
583 cb(nullptr, deviceId);
584 return;
585 }
586 findCertificate(deviceId,
587 [w = weak(),
588 deviceId,
589 name,
590 cb = std::move(cb),
591 noNewSocket,
592 forceNewSocket,
593 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
594 if (!cert) {
595 if (auto shared = w.lock())
596 if (shared->config_->logger)
597 shared->config_->logger->error(
598 "No valid certificate found for device {}",
599 deviceId);
600 cb(nullptr, deviceId);
601 return;
602 }
603 if (auto shared = w.lock()) {
604 shared->connectDevice(cert,
605 name,
606 std::move(cb),
607 noNewSocket,
608 forceNewSocket,
609 connType);
610 } else
611 cb(nullptr, deviceId);
612 });
613}
614
615void
616ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
617 const std::string& name,
618 ConnectCallback cb,
619 bool noNewSocket,
620 bool forceNewSocket,
621 const std::string& connType)
622{
623 // Avoid dht operation in a DHT callback to avoid deadlocks
624 dht::ThreadPool::computation().run([w = weak(),
625 name = std::move(name),
626 cert = std::move(cert),
627 cb = std::move(cb),
628 noNewSocket,
629 forceNewSocket,
630 connType] {
631 auto devicePk = cert->getSharedPublicKey();
632 auto deviceId = devicePk->getLongId();
633 auto sthis = w.lock();
634 if (!sthis || sthis->isDestroying_) {
635 cb(nullptr, deviceId);
636 return;
637 }
638 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
639 auto isConnectingToDevice = false;
640 {
641 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400642 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
643 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400644 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400645 while (pendings.connecting.find(vid) != pendings.connecting.end()
646 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400647 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400648 }
649 }
650 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400651 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400652 // Save current request for sendChannelRequest.
653 // Note: do not return here, cause we can be in a state where first
654 // socket is negotiated and first channel is pending
655 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400656 if (isConnectingToDevice && !forceNewSocket)
657 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400658 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400659 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400660 }
661
662 // Check if already negotiated
663 CallbackId cbId(deviceId, vid);
664 if (auto info = sthis->getConnectedInfo(deviceId)) {
665 std::lock_guard<std::mutex> lk(info->mutex_);
666 if (info->socket_) {
667 if (sthis->config_->logger)
668 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
669 info->cbIds_.emplace(cbId);
670 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
671 return;
672 }
673 }
674
675 if (isConnectingToDevice && !forceNewSocket) {
676 if (sthis->config_->logger)
677 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
678 return;
679 }
680 if (noNewSocket) {
681 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400682 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400683 return;
684 }
685
686 // Note: used when the ice negotiation fails to erase
687 // all stored structures.
688 auto eraseInfo = [w, cbId] {
689 if (auto shared = w.lock()) {
690 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400691 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400692 std::lock_guard<std::mutex> lk(shared->infosMtx_);
693 shared->infos_.erase(cbId);
694 }
695 };
696
697 // If no socket exists, we need to initiate an ICE connection.
698 sthis->getIceOptions([w,
699 deviceId = std::move(deviceId),
700 devicePk = std::move(devicePk),
701 name = std::move(name),
702 cert = std::move(cert),
703 vid,
704 connType,
705 eraseInfo](auto&& ice_config) {
706 auto sthis = w.lock();
707 if (!sthis) {
708 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
709 return;
710 }
711 ice_config.tcpEnable = true;
712 ice_config.onInitDone = [w,
713 deviceId = std::move(deviceId),
714 devicePk = std::move(devicePk),
715 name = std::move(name),
716 cert = std::move(cert),
717 vid,
718 connType,
719 eraseInfo](bool ok) {
720 dht::ThreadPool::io().run([w = std::move(w),
721 devicePk = std::move(devicePk),
722 vid = std::move(vid),
723 eraseInfo,
724 connType, ok] {
725 auto sthis = w.lock();
726 if (!ok && sthis && sthis->config_->logger)
727 sthis->config_->logger->error("Cannot initialize ICE session.");
728 if (!sthis || !ok) {
729 eraseInfo();
730 return;
731 }
732 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
733 if (!ok) {
734 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
735 }
736 });
737 });
738 };
739 ice_config.onNegoDone = [w,
740 deviceId,
741 name,
742 cert = std::move(cert),
743 vid,
744 eraseInfo](bool ok) {
745 dht::ThreadPool::io().run([w = std::move(w),
746 deviceId = std::move(deviceId),
747 name = std::move(name),
748 cert = std::move(cert),
749 vid = std::move(vid),
750 eraseInfo = std::move(eraseInfo),
751 ok] {
752 auto sthis = w.lock();
753 if (!ok && sthis && sthis->config_->logger)
754 sthis->config_->logger->error("ICE negotiation failed.");
755 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
756 eraseInfo();
757 });
758 };
759
760 auto info = std::make_shared<ConnectionInfo>();
761 {
762 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
763 sthis->infos_[{deviceId, vid}] = info;
764 }
765 std::unique_lock<std::mutex> lk {info->mutex_};
766 ice_config.master = false;
767 ice_config.streamsCount = 1;
768 ice_config.compCountPerStream = 1;
769 info->ice_ = sthis->iceFactory_.createUTransport("");
770 if (!info->ice_) {
771 if (sthis->config_->logger)
772 sthis->config_->logger->error("Cannot initialize ICE session.");
773 eraseInfo();
774 return;
775 }
776 // We need to detect any shutdown if the ice session is destroyed before going to the
777 // TLS session;
778 info->ice_->setOnShutdown([eraseInfo]() {
779 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
780 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400781 try {
782 info->ice_->initIceInstance(ice_config);
783 } catch (const std::exception& e) {
784 if (sthis->config_->logger)
785 sthis->config_->logger->error("{}", e.what());
786 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
787 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400788 });
789 });
790}
791
792void
793ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
794 const std::string& name,
795 const DeviceId& deviceId,
796 const dht::Value::Id& vid)
797{
798 auto channelSock = sock->addChannel(name);
799 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
800 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400801 if (auto shared = w.lock())
802 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400803 });
804 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400805 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400806 auto shared = w.lock();
807 auto channelSock = wSock.lock();
808 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400809 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400810 });
811
812 ChannelRequest val;
813 val.name = channelSock->name();
814 val.state = ChannelRequestState::REQUEST;
815 val.channel = channelSock->channel();
816 msgpack::sbuffer buffer(256);
817 msgpack::pack(buffer, val);
818
819 std::error_code ec;
820 int res = sock->write(CONTROL_CHANNEL,
821 reinterpret_cast<const uint8_t*>(buffer.data()),
822 buffer.size(),
823 ec);
824 if (res < 0) {
825 // TODO check if we should handle errors here
826 if (config_->logger)
827 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
828 }
829}
830
831void
832ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
833{
834 auto device = req.owner->getLongId();
835 if (config_->logger)
836 config_->logger->debug("New response received from {}", device);
837 if (auto info = getInfo(device, req.id)) {
838 std::lock_guard<std::mutex> lk {info->mutex_};
839 info->responseReceived_ = true;
840 info->response_ = std::move(req);
841 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
842 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
843 this,
844 std::placeholders::_1,
845 device,
846 req.id));
847 } else {
848 if (config_->logger)
849 config_->logger->warn("Respond received, but cannot find request");
850 }
851}
852
853void
854ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
855{
856 if (!dht())
857 return;
858 dht()->listen<PeerConnectionRequest>(
859 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
860 [w = weak()](PeerConnectionRequest&& req) {
861 auto shared = w.lock();
862 if (!shared)
863 return false;
864 if (shared->isMessageTreated(to_hex_string(req.id))) {
865 // Message already treated. Just ignore
866 return true;
867 }
868 if (req.isAnswer) {
869 if (shared->config_->logger)
870 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
871 } else {
872 if (shared->config_->logger)
873 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
874 }
875 if (req.isAnswer) {
876 shared->onPeerResponse(req);
877 } else {
878 // Async certificate checking
879 shared->dht()->findCertificate(
880 req.from,
881 [w, req = std::move(req)](
882 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
883 auto shared = w.lock();
884 if (!shared)
885 return;
886 dht::InfoHash peer_h;
887 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
888#if TARGET_OS_IOS
889 if (shared->iOSConnectedCb_(req.connType, peer_h))
890 return;
891#endif
892 shared->onDhtPeerRequest(req, cert);
893 } else {
894 if (shared->config_->logger)
895 shared->config_->logger->warn(
896 "Received request from untrusted peer {}",
897 req.owner->getLongId());
898 }
899 });
900 }
901
902 return true;
903 },
904 dht::Value::UserTypeFilter("peer_request"));
905}
906
907void
908ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
909 const DeviceId& deviceId,
910 const dht::Value::Id& vid,
911 const std::string& name)
912{
913 if (isDestroying_)
914 return;
915 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
916 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
917 // asked yet)
918 auto isDhtRequest = name.empty();
919 if (!ok) {
920 if (isDhtRequest) {
921 if (config_->logger)
922 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
923 deviceId,
924 name,
925 vid);
926 if (connReadyCb_)
927 connReadyCb_(deviceId, "", nullptr);
928 } else {
929 if (config_->logger)
930 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
931 deviceId,
932 name,
933 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400934 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400935 }
936 } else {
937 // The socket is ready, store it
938 if (isDhtRequest) {
939 if (config_->logger)
940 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
941 deviceId,
942 vid);
943 } else {
944 if (config_->logger)
945 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
946 deviceId,
947 name,
948 vid);
949 }
950
951 auto info = getInfo(deviceId, vid);
952 addNewMultiplexedSocket({deviceId, vid}, info);
953 // Finally, open the channel and launch pending callbacks
954 if (info->socket_) {
955 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400956 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400957 if (config_->logger)
958 config_->logger->debug("Send request on TLS socket for channel {} to {}",
Adrien Béraud665294f2023-06-13 18:09:11 -0400959 name,
960 deviceId.toString());
961 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400962 }
963 }
964 }
965}
966
967void
968ConnectionManager::Impl::answerTo(IceTransport& ice,
969 const dht::Value::Id& id,
970 const std::shared_ptr<dht::crypto::PublicKey>& from)
971{
972 // NOTE: This is a shortest version of a real SDP message to save some bits
973 auto iceAttributes = ice.getLocalAttributes();
974 std::ostringstream icemsg;
975 icemsg << iceAttributes.ufrag << "\n";
976 icemsg << iceAttributes.pwd << "\n";
977 for (const auto& addr : ice.getLocalCandidates(1)) {
978 icemsg << addr << "\n";
979 }
980
981 // Send PeerConnection response
982 PeerConnectionRequest val;
983 val.id = id;
984 val.ice_msg = icemsg.str();
985 val.isAnswer = true;
986 auto value = std::make_shared<dht::Value>(std::move(val));
987 value->user_type = "peer_request";
988
989 if (config_->logger)
990 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
991 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
992 + from->getId().toString()),
993 from,
994 value,
995 [from,l=config_->logger](bool ok) {
996 if (l)
997 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
998 from->getLongId(),
999 (ok ? "ok" : "failed"));
1000 });
1001}
1002
1003bool
1004ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1005{
1006 auto deviceId = req.owner->getLongId();
1007 auto info = getInfo(deviceId, req.id);
1008 if (!info)
1009 return false;
1010
1011 std::unique_lock<std::mutex> lk {info->mutex_};
1012 auto& ice = info->ice_;
1013 if (!ice) {
1014 if (config_->logger)
1015 config_->logger->error("No ICE detected");
1016 if (connReadyCb_)
1017 connReadyCb_(deviceId, "", nullptr);
1018 return false;
1019 }
1020
1021 auto sdp = ice->parseIceCandidates(req.ice_msg);
1022 answerTo(*ice, req.id, req.owner);
1023 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1024 if (config_->logger)
1025 config_->logger->error("Start ICE failed - fallback to TURN");
1026 ice = nullptr;
1027 if (connReadyCb_)
1028 connReadyCb_(deviceId, "", nullptr);
1029 return false;
1030 }
1031 return true;
1032}
1033
1034bool
1035ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1036{
1037 auto deviceId = req.owner->getLongId();
1038 auto info = getInfo(deviceId, req.id);
1039 if (!info)
1040 return false;
1041
1042 std::unique_lock<std::mutex> lk {info->mutex_};
1043 auto& ice = info->ice_;
1044 if (!ice) {
1045 if (config_->logger)
1046 config_->logger->error("No ICE detected");
1047 return false;
1048 }
1049
1050 // Build socket
1051 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1052 std::move(ice)),
1053 false);
1054
1055 // init TLS session
1056 auto ph = req.from;
1057 if (config_->logger)
1058 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1059 req.from,
1060 req.id);
1061 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1062 std::move(endpoint),
1063 certStore(),
1064 identity(),
1065 dhParams(),
1066 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1067 auto shared = w.lock();
1068 if (!shared)
1069 return false;
1070 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1071 if (!crt)
1072 return false;
1073 return crt->getPacked() == cert.getPacked();
1074 });
1075
1076 info->tls_->setOnReady(
1077 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1078 if (auto shared = w.lock())
1079 shared->onTlsNegotiationDone(ok, deviceId, vid);
1080 });
1081 return true;
1082}
1083
1084void
1085ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1086 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1087{
1088 auto deviceId = req.owner->getLongId();
1089 if (config_->logger)
1090 config_->logger->debug("New connection request from {}", deviceId);
1091 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1092 if (config_->logger)
1093 config_->logger->debug("Refuse connection from {}", deviceId);
1094 return;
1095 }
1096
1097 // Because the connection is accepted, create an ICE socket.
1098 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1099 auto shared = w.lock();
1100 if (!shared)
1101 return;
1102 // Note: used when the ice negotiation fails to erase
1103 // all stored structures.
1104 auto eraseInfo = [w, id = req.id, deviceId] {
1105 if (auto shared = w.lock()) {
1106 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001107 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001108 if (shared->connReadyCb_)
1109 shared->connReadyCb_(deviceId, "", nullptr);
1110 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1111 shared->infos_.erase({deviceId, id});
1112 }
1113 };
1114
1115 ice_config.tcpEnable = true;
1116 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1117 auto shared = w.lock();
1118 if (!shared)
1119 return;
1120 if (!ok) {
1121 if (shared->config_->logger)
1122 shared->config_->logger->error("Cannot initialize ICE session.");
1123 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1124 return;
1125 }
1126
1127 dht::ThreadPool::io().run(
1128 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1129 auto shared = w.lock();
1130 if (!shared)
1131 return;
1132 if (!shared->onRequestStartIce(req))
1133 eraseInfo();
1134 });
1135 };
1136
1137 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1138 auto shared = w.lock();
1139 if (!shared)
1140 return;
1141 if (!ok) {
1142 if (shared->config_->logger)
1143 shared->config_->logger->error("ICE negotiation failed.");
1144 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1145 return;
1146 }
1147
1148 dht::ThreadPool::io().run(
1149 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1150 if (auto shared = w.lock())
1151 if (!shared->onRequestOnNegoDone(req))
1152 eraseInfo();
1153 });
1154 };
1155
1156 // Negotiate a new ICE socket
1157 auto info = std::make_shared<ConnectionInfo>();
1158 {
1159 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1160 shared->infos_[{deviceId, req.id}] = info;
1161 }
1162 if (shared->config_->logger)
1163 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1164 std::unique_lock<std::mutex> lk {info->mutex_};
1165 ice_config.streamsCount = 1;
1166 ice_config.compCountPerStream = 1; // TCP
1167 ice_config.master = true;
1168 info->ice_ = shared->iceFactory_.createUTransport("");
1169 if (not info->ice_) {
1170 if (shared->config_->logger)
1171 shared->config_->logger->error("Cannot initialize ICE session");
1172 eraseInfo();
1173 return;
1174 }
1175 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1176 info->ice_->setOnShutdown([eraseInfo]() {
1177 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1178 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001179 try {
1180 info->ice_->initIceInstance(ice_config);
1181 } catch (const std::exception& e) {
1182 if (shared->config_->logger)
1183 shared->config_->logger->error("{}", e.what());
1184 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1185 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001186 });
1187}
1188
1189void
1190ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1191{
1192 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1193 info->socket_->setOnReady(
1194 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1195 if (auto sthis = w.lock())
1196 if (sthis->connReadyCb_)
1197 sthis->connReadyCb_(deviceId, socket->name(), socket);
1198 });
1199 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1200 const uint16_t&,
1201 const std::string& name) {
1202 if (auto sthis = w.lock())
1203 if (sthis->channelReqCb_)
1204 return sthis->channelReqCb_(peer, name);
1205 return false;
1206 });
1207 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1208 // Cancel current outgoing connections
1209 dht::ThreadPool::io().run([w, deviceId, vid] {
1210 auto sthis = w.lock();
1211 if (!sthis)
1212 return;
1213
1214 std::set<CallbackId> ids;
1215 if (auto info = sthis->getInfo(deviceId, vid)) {
1216 std::lock_guard<std::mutex> lk(info->mutex_);
1217 if (info->socket_) {
1218 ids = std::move(info->cbIds_);
1219 info->socket_->shutdown();
1220 }
1221 }
1222 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001223 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001224
1225 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1226 sthis->infos_.erase({deviceId, vid});
1227 });
1228 });
1229}
1230
1231const std::shared_future<tls::DhParams>
1232ConnectionManager::Impl::dhParams() const
1233{
1234 return dht::ThreadPool::computation().get<tls::DhParams>(
1235 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1236 ;
1237}
1238
1239template<typename ID = dht::Value::Id>
1240std::set<ID, std::less<>>
1241loadIdList(const std::string& path)
1242{
1243 std::set<ID, std::less<>> ids;
1244 std::ifstream file = fileutils::ifstream(path);
1245 if (!file.is_open()) {
1246 //JAMI_DBG("Could not load %s", path.c_str());
1247 return ids;
1248 }
1249 std::string line;
1250 while (std::getline(file, line)) {
1251 if constexpr (std::is_same<ID, std::string>::value) {
1252 ids.emplace(std::move(line));
1253 } else if constexpr (std::is_integral<ID>::value) {
1254 ID vid;
1255 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1256 ec == std::errc()) {
1257 ids.emplace(vid);
1258 }
1259 }
1260 }
1261 return ids;
1262}
1263
1264template<typename List = std::set<dht::Value::Id>>
1265void
1266saveIdList(const std::string& path, const List& ids)
1267{
1268 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1269 if (!file.is_open()) {
1270 //JAMI_ERR("Could not save to %s", path.c_str());
1271 return;
1272 }
1273 for (auto& c : ids)
1274 file << std::hex << c << "\n";
1275}
1276
1277void
1278ConnectionManager::Impl::loadTreatedMessages()
1279{
1280 std::lock_guard<std::mutex> lock(messageMutex_);
1281 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1282 treatedMessages_ = loadIdList<std::string>(path);
1283 if (treatedMessages_.empty()) {
1284 auto messages = loadIdList(path);
1285 for (const auto& m : messages)
1286 treatedMessages_.emplace(to_hex_string(m));
1287 }
1288}
1289
1290void
1291ConnectionManager::Impl::saveTreatedMessages() const
1292{
1293 dht::ThreadPool::io().run([w = weak()]() {
1294 if (auto sthis = w.lock()) {
1295 auto& this_ = *sthis;
1296 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1297 fileutils::check_dir(this_.config_->cachePath.c_str());
1298 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1299 + DIR_SEPARATOR_STR "treatedMessages",
1300 this_.treatedMessages_);
1301 }
1302 });
1303}
1304
1305bool
1306ConnectionManager::Impl::isMessageTreated(std::string_view id)
1307{
1308 std::lock_guard<std::mutex> lock(messageMutex_);
1309 auto res = treatedMessages_.emplace(id);
1310 if (res.second) {
1311 saveTreatedMessages();
1312 return false;
1313 }
1314 return true;
1315}
1316
1317/**
1318 * returns whether or not UPnP is enabled and active_
1319 * ie: if it is able to make port mappings
1320 */
1321bool
1322ConnectionManager::Impl::getUPnPActive() const
1323{
1324 return config_->getUPnPActive();
1325}
1326
1327IpAddr
1328ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1329{
1330 if (family == AF_INET)
1331 return publishedIp_[0];
1332 if (family == AF_INET6)
1333 return publishedIp_[1];
1334
1335 assert(family == AF_UNSPEC);
1336
1337 // If family is not set, prefere IPv4 if available. It's more
1338 // likely to succeed behind NAT.
1339 if (publishedIp_[0])
1340 return publishedIp_[0];
1341 if (publishedIp_[1])
1342 return publishedIp_[1];
1343 return {};
1344}
1345
1346void
1347ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1348{
1349 if (ip_addr.getFamily() == AF_INET) {
1350 publishedIp_[0] = ip_addr;
1351 } else {
1352 publishedIp_[1] = ip_addr;
1353 }
1354}
1355
1356void
1357ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1358{
1359 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1360 bool hasIpv4 {false}, hasIpv6 {false};
1361 for (auto& result : results) {
1362 auto family = result.getFamily();
1363 if (family == AF_INET) {
1364 if (not hasIpv4) {
1365 hasIpv4 = true;
1366 if (config_->logger)
1367 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1368 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1369 setPublishedAddress(*result.get());
1370 if (config_->upnpCtrl) {
1371 config_->upnpCtrl->setPublicAddress(*result.get());
1372 }
1373 }
1374 } else if (family == AF_INET6) {
1375 if (not hasIpv6) {
1376 hasIpv6 = true;
1377 if (config_->logger)
1378 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1379 setPublishedAddress(*result.get());
1380 }
1381 }
1382 if (hasIpv4 and hasIpv6)
1383 break;
1384 }
1385 if (cb)
1386 cb();
1387 });
1388}
1389
1390void
1391ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1392{
1393 storeActiveIpAddress([this, cb = std::move(cb)] {
1394 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1395 auto publishedAddr = getPublishedIpAddress();
1396
1397 if (publishedAddr) {
1398 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1399 publishedAddr.getFamily());
1400 if (interfaceAddr) {
1401 opts.accountLocalAddr = interfaceAddr;
1402 opts.accountPublicAddr = publishedAddr;
1403 }
1404 }
1405 if (cb)
1406 cb(std::move(opts));
1407 });
1408}
1409
1410IceTransportOptions
1411ConnectionManager::Impl::getIceOptions() const noexcept
1412{
1413 IceTransportOptions opts;
1414 opts.upnpEnable = getUPnPActive();
1415
1416 if (config_->stunEnabled)
1417 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1418 if (config_->turnEnabled) {
1419 auto cached = false;
1420 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1421 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1422 if (config_->cacheTurnV4) {
1423 opts.turnServers.emplace_back(TurnServerInfo()
1424 .setUri(config_->cacheTurnV4.toString())
1425 .setUsername(config_->turnServerUserName)
1426 .setPassword(config_->turnServerPwd)
1427 .setRealm(config_->turnServerRealm));
1428 }
1429 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1430 // co issues. So this needs some debug. for now just disable
1431 // if (cacheTurnV6 && *cacheTurnV6) {
1432 // opts.turnServers.emplace_back(TurnServerInfo()
1433 // .setUri(cacheTurnV6->toString(true))
1434 // .setUsername(turnServerUserName_)
1435 // .setPassword(turnServerPwd_)
1436 // .setRealm(turnServerRealm_));
1437 //}
1438 // Nothing cached, so do the resolution
1439 if (!cached) {
1440 opts.turnServers.emplace_back(TurnServerInfo()
1441 .setUri(config_->turnServer)
1442 .setUsername(config_->turnServerUserName)
1443 .setPassword(config_->turnServerPwd)
1444 .setRealm(config_->turnServerRealm));
1445 }
1446 }
1447 return opts;
1448}
1449
1450bool
1451ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1452 dht::InfoHash& account_id,
1453 const std::shared_ptr<Logger>& logger)
1454{
1455 if (not crt)
1456 return false;
1457
1458 auto top_issuer = crt;
1459 while (top_issuer->issuer)
1460 top_issuer = top_issuer->issuer;
1461
1462 // Device certificate can't be self-signed
1463 if (top_issuer == crt) {
1464 if (logger)
1465 logger->warn("Found invalid peer device: {}", crt->getLongId());
1466 return false;
1467 }
1468
1469 // Check peer certificate chain
1470 // Trust store with top issuer as the only CA
1471 dht::crypto::TrustList peer_trust;
1472 peer_trust.add(*top_issuer);
1473 if (not peer_trust.verify(*crt)) {
1474 if (logger)
1475 logger->warn("Found invalid peer device: {}", crt->getLongId());
1476 return false;
1477 }
1478
1479 // Check cached OCSP response
1480 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1481 if (logger)
1482 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1483 return false;
1484 }
1485
1486 account_id = crt->issuer->getId();
1487 if (logger)
1488 logger->warn("Found peer device: {} account:{} CA:{}",
1489 crt->getLongId(),
1490 account_id,
1491 top_issuer->getId());
1492 return true;
1493}
1494
1495bool
1496ConnectionManager::Impl::findCertificate(
1497 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1498{
1499 if (auto cert = certStore().getCertificate(id.toString())) {
1500 if (cb)
1501 cb(cert);
1502 } else if (cb)
1503 cb(nullptr);
1504 return true;
1505}
1506
1507ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1508 : pimpl_ {std::make_shared<Impl>(config_)}
1509{}
1510
1511ConnectionManager::~ConnectionManager()
1512{
1513 if (pimpl_)
1514 pimpl_->shutdown();
1515}
1516
1517void
1518ConnectionManager::connectDevice(const DeviceId& deviceId,
1519 const std::string& name,
1520 ConnectCallback cb,
1521 bool noNewSocket,
1522 bool forceNewSocket,
1523 const std::string& connType)
1524{
1525 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1526}
1527
1528void
1529ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1530 const std::string& name,
1531 ConnectCallback cb,
1532 bool noNewSocket,
1533 bool forceNewSocket,
1534 const std::string& connType)
1535{
1536 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1537}
1538
1539bool
1540ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1541{
Adrien Béraud665294f2023-06-13 18:09:11 -04001542 auto pending = pimpl_->getPendingIds(deviceId);
1543 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001544 != pending.end();
1545}
1546
1547void
1548ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1549{
1550 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1551 std::set<DeviceId> peersDevices;
1552 {
1553 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1554 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1555 auto const& [key, value] = *iter;
1556 auto deviceId = key.first;
1557 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1558 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1559 connInfos.emplace_back(value);
1560 peersDevices.emplace(deviceId);
1561 iter = pimpl_->infos_.erase(iter);
1562 } else {
1563 iter++;
1564 }
1565 }
1566 }
1567 // Stop connections to all peers devices
1568 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001569 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001570 // This will close the TLS Session
1571 pimpl_->removeUnusedConnections(deviceId);
1572 }
1573 for (auto& info : connInfos) {
1574 if (info->socket_)
1575 info->socket_->shutdown();
1576 if (info->waitForAnswer_)
1577 info->waitForAnswer_->cancel();
1578 if (info->ice_) {
1579 std::unique_lock<std::mutex> lk {info->mutex_};
1580 dht::ThreadPool::io().run(
1581 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1582 }
1583 }
1584}
1585
1586void
1587ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1588{
1589 pimpl_->onDhtConnected(devicePk);
1590}
1591
1592void
1593ConnectionManager::onICERequest(onICERequestCallback&& cb)
1594{
1595 pimpl_->iceReqCb_ = std::move(cb);
1596}
1597
1598void
1599ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1600{
1601 pimpl_->channelReqCb_ = std::move(cb);
1602}
1603
1604void
1605ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1606{
1607 pimpl_->connReadyCb_ = std::move(cb);
1608}
1609
1610void
1611ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1612{
1613 pimpl_->iOSConnectedCb_ = std::move(cb);
1614}
1615
1616std::size_t
1617ConnectionManager::activeSockets() const
1618{
1619 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1620 return pimpl_->infos_.size();
1621}
1622
1623void
1624ConnectionManager::monitor() const
1625{
1626 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1627 auto logger = pimpl_->config_->logger;
1628 if (!logger)
1629 return;
1630 logger->debug("ConnectionManager current status:");
1631 for (const auto& [_, ci] : pimpl_->infos_) {
1632 if (ci->socket_)
1633 ci->socket_->monitor();
1634 }
1635 logger->debug("ConnectionManager end status.");
1636}
1637
1638void
1639ConnectionManager::connectivityChanged()
1640{
1641 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1642 for (const auto& [_, ci] : pimpl_->infos_) {
1643 if (ci->socket_)
1644 ci->socket_->sendBeacon();
1645 }
1646}
1647
1648void
1649ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1650{
1651 return pimpl_->getIceOptions(std::move(cb));
1652}
1653
1654IceTransportOptions
1655ConnectionManager::getIceOptions() const noexcept
1656{
1657 return pimpl_->getIceOptions();
1658}
1659
1660IpAddr
1661ConnectionManager::getPublishedIpAddress(uint16_t family) const
1662{
1663 return pimpl_->getPublishedIpAddress(family);
1664}
1665
1666void
1667ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1668{
1669 return pimpl_->setPublishedAddress(ip_addr);
1670}
1671
1672void
1673ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1674{
1675 return pimpl_->storeActiveIpAddress(std::move(cb));
1676}
1677
1678std::shared_ptr<ConnectionManager::Config>
1679ConnectionManager::getConfig()
1680{
1681 return pimpl_->config_;
1682}
1683
} // namespace dhtnet