blob: 4ddc28e3f82c76195418fe9d6455b57169cefb06 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040019#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040020#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040037#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040038
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040039namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040040static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
41static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
42
43using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040044using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040045
46struct ConnectionInfo
47{
48 ~ConnectionInfo()
49 {
50 if (socket_)
51 socket_->join();
52 }
53
54 std::mutex mutex_ {};
55 bool responseReceived_ {false};
56 PeerConnectionRequest response_ {};
57 std::unique_ptr<IceTransport> ice_ {nullptr};
58 // Used to store currently non ready TLS Socket
59 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
60 std::shared_ptr<MultiplexedSocket> socket_ {};
61 std::set<CallbackId> cbIds_ {};
62
63 std::function<void(bool)> onConnected_;
64 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
65};
66
67/**
68 * returns whether or not UPnP is enabled and active_
69 * ie: if it is able to make port mappings
70 */
71bool
72ConnectionManager::Config::getUPnPActive() const
73{
74 if (upnpCtrl)
75 return upnpCtrl->isReady();
76 return false;
77}
78
79class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
80{
81public:
82 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
83 : config_ {std::move(config_)}
84 {}
85 ~Impl() {}
86
87 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
88 const dht::crypto::Identity& identity() const { return config_->id; }
89
90 void removeUnusedConnections(const DeviceId& deviceId = {})
91 {
92 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
93
94 {
95 std::lock_guard<std::mutex> lk(infosMtx_);
96 for (auto it = infos_.begin(); it != infos_.end();) {
97 auto& [key, info] = *it;
98 if (info && (!deviceId || key.first == deviceId)) {
99 unused.emplace_back(std::move(info));
100 it = infos_.erase(it);
101 } else {
102 ++it;
103 }
104 }
105 }
106 for (auto& info: unused) {
107 if (info->tls_)
108 info->tls_->shutdown();
109 if (info->socket_)
110 info->socket_->shutdown();
111 if (info->waitForAnswer_)
112 info->waitForAnswer_->cancel();
113 }
114 if (!unused.empty())
115 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
116 }
117
118 void shutdown()
119 {
120 if (isDestroying_.exchange(true))
121 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400122 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400123 {
124 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400125 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400126 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400127 for (auto& [deviceId, pcbs] : po) {
128 for (auto& [id, pending] : pcbs.connecting)
129 pending.cb(nullptr, deviceId);
130 for (auto& [id, pending] : pcbs.waiting)
131 pending.cb(nullptr, deviceId);
132 }
133
Adrien Béraud612b55b2023-05-29 10:42:04 -0400134 removeUnusedConnections();
135 }
136
Adrien Béraud612b55b2023-05-29 10:42:04 -0400137 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
138 const dht::Value::Id& vid,
139 const std::string& connType,
140 std::function<void(bool)> onConnected);
141 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
142 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
143 const std::string& name,
144 const dht::Value::Id& vid,
145 const std::shared_ptr<dht::crypto::Certificate>& cert);
146 void connectDevice(const DeviceId& deviceId,
147 const std::string& uri,
148 ConnectCallback cb,
149 bool noNewSocket = false,
150 bool forceNewSocket = false,
151 const std::string& connType = "");
152 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
153 const std::string& name,
154 ConnectCallback cb,
155 bool noNewSocket = false,
156 bool forceNewSocket = false,
157 const std::string& connType = "");
158 /**
159 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
160 * @param sock socket used to send the request
161 * @param name channel's name
162 * @param vid channel's id
163 * @param deviceId to identify the linked ConnectCallback
164 */
165 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
166 const std::string& name,
167 const DeviceId& deviceId,
168 const dht::Value::Id& vid);
169 /**
170 * Triggered when a PeerConnectionRequest comes from the DHT
171 */
172 void answerTo(IceTransport& ice,
173 const dht::Value::Id& id,
174 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
175 bool onRequestStartIce(const PeerConnectionRequest& req);
176 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
177 void onDhtPeerRequest(const PeerConnectionRequest& req,
178 const std::shared_ptr<dht::crypto::Certificate>& cert);
179
180 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
181 void onPeerResponse(const PeerConnectionRequest& req);
182 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
183
184 const std::shared_future<tls::DhParams> dhParams() const;
185 tls::CertificateStore& certStore() const { return *config_->certStore; }
186
187 mutable std::mutex messageMutex_ {};
188 std::set<std::string, std::less<>> treatedMessages_ {};
189
190 void loadTreatedMessages();
191 void saveTreatedMessages() const;
192
193 /// \return true if the given DHT message identifier has been treated
194 /// \note if message has not been treated yet this method st/ore this id and returns true at
195 /// further calls
196 bool isMessageTreated(std::string_view id);
197
198 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
199
200 /**
201 * Published IPv4/IPv6 addresses, used only if defined by the user in account
202 * configuration
203 *
204 */
205 IpAddr publishedIp_[2] {};
206
Adrien Béraud612b55b2023-05-29 10:42:04 -0400207 /**
208 * interface name on which this account is bound
209 */
210 std::string interface_ {"default"};
211
212 /**
213 * Get the local interface name on which this account is bound.
214 */
215 const std::string& getLocalInterface() const { return interface_; }
216
217 /**
218 * Get the published IP address, fallbacks to NAT if family is unspecified
219 * Prefers the usage of IPv4 if possible.
220 */
221 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
222
223 /**
224 * Set published IP address according to given family
225 */
226 void setPublishedAddress(const IpAddr& ip_addr);
227
228 /**
229 * Store the local/public addresses used to register
230 */
231 void storeActiveIpAddress(std::function<void()>&& cb = {});
232
233 /**
234 * Create and return ICE options.
235 */
236 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
237 IceTransportOptions getIceOptions() const noexcept;
238
239 /**
240 * Inform that a potential peer device have been found.
241 * Returns true only if the device certificate is a valid device certificate.
242 * In that case (true is returned) the account_id parameter is set to the peer account ID.
243 */
244 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
245 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
246
247 bool findCertificate(const dht::PkId& id,
248 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
249
250 /**
251 * returns whether or not UPnP is enabled and active
252 * ie: if it is able to make port mappings
253 */
254 bool getUPnPActive() const;
255
256 /**
257 * Triggered when a new TLS socket is ready to use
258 * @param ok If succeed
259 * @param deviceId Related device
260 * @param vid vid of the connection request
261 * @param name non empty if TLS was created by connectDevice()
262 */
263 void onTlsNegotiationDone(bool ok,
264 const DeviceId& deviceId,
265 const dht::Value::Id& vid,
266 const std::string& name = "");
267
268 std::shared_ptr<ConnectionManager::Config> config_;
269
270 IceTransportFactory iceFactory_ {};
271
272 mutable std::mt19937_64 rand;
273
274 iOSConnectedCallback iOSConnectedCb_ {};
275
276 std::mutex infosMtx_ {};
277 // Note: Someone can ask multiple sockets, so to avoid any race condition,
278 // each device can have multiple multiplexed sockets.
279 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
280
281 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
282 {
283 std::lock_guard<std::mutex> lk(infosMtx_);
284 auto it = infos_.find({deviceId, id});
285 if (it != infos_.end())
286 return it->second;
287 return {};
288 }
289
290 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
291 {
292 std::lock_guard<std::mutex> lk(infosMtx_);
293 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
294 auto& [key, value] = item;
295 return key.first == deviceId && value && value->socket_;
296 });
297 if (it != infos_.end())
298 return it->second;
299 return {};
300 }
301
302 ChannelRequestCallback channelReqCb_ {};
303 ConnectionReadyCallback connReadyCb_ {};
304 onICERequestCallback iceReqCb_ {};
305
306 /**
307 * Stores callback from connectDevice
308 * @note: each device needs a vector because several connectDevice can
309 * be done in parallel and we only want one socket
310 */
311 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400312
Adrien Béraud665294f2023-06-13 18:09:11 -0400313 struct PendingCb
314 {
315 std::string name;
316 ConnectCallback cb;
317 };
318 struct PendingOperations {
319 std::map<dht::Value::Id, PendingCb> connecting;
320 std::map<dht::Value::Id, PendingCb> waiting;
321 };
322
323 std::map<DeviceId, PendingOperations> pendingOperations_ {};
324
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400325 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400326 {
327 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400328 std::unique_lock<std::mutex> lk(connectCbsMtx_);
329 auto it = pendingOperations_.find(deviceId);
330 if (it == pendingOperations_.end())
331 return;
332 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400333 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400334 // Extract all pending callbacks
335 for (auto& [vid, cb] : pendingOperations.connecting)
336 ret.emplace_back(std::move(cb));
337 pendingOperations.connecting.clear();
338 for (auto& [vid, cb] : pendingOperations.waiting)
339 ret.emplace_back(std::move(cb));
340 pendingOperations.waiting.clear();
341 } else if (auto n = pendingOperations.waiting.extract(vid)) {
342 // If it's a waiting operation, just move it
343 ret.emplace_back(std::move(n.mapped()));
344 } else if (auto n = pendingOperations.connecting.extract(vid)) {
345 ret.emplace_back(std::move(n.mapped()));
346 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400347 // If accepted is false, it means that underlying socket is ok, but channel is declined
348 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400349 for (auto& [vid, cb] : pendingOperations.waiting)
350 ret.emplace_back(std::move(cb));
351 pendingOperations.waiting.clear();
352 for (auto& [vid, cb] : pendingOperations.connecting)
353 ret.emplace_back(std::move(cb));
354 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400355 }
356 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400357 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
358 pendingOperations_.erase(it);
359 lk.unlock();
360 for (auto& cb : ret)
361 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400362 }
363
Adrien Béraud665294f2023-06-13 18:09:11 -0400364 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400365 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400366 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400367 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400368 auto it = pendingOperations_.find(deviceId);
369 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400370 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 auto& pendingOp = it->second;
372 for (const auto& [id, pc]: pendingOp.connecting) {
373 if (vid == 0 || id == vid)
374 ret[id] = pc.name;
375 }
376 for (const auto& [id, pc]: pendingOp.waiting) {
377 if (vid == 0 || id == vid)
378 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400379 }
380 return ret;
381 }
382
383 std::shared_ptr<ConnectionManager::Impl> shared()
384 {
385 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
386 }
387 std::shared_ptr<ConnectionManager::Impl const> shared() const
388 {
389 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
390 }
391 std::weak_ptr<ConnectionManager::Impl> weak()
392 {
393 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
394 }
395 std::weak_ptr<ConnectionManager::Impl const> weak() const
396 {
397 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
398 }
399
400 std::atomic_bool isDestroying_ {false};
401};
402
403void
404ConnectionManager::Impl::connectDeviceStartIce(
405 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
406 const dht::Value::Id& vid,
407 const std::string& connType,
408 std::function<void(bool)> onConnected)
409{
410 auto deviceId = devicePk->getLongId();
411 auto info = getInfo(deviceId, vid);
412 if (!info) {
413 onConnected(false);
414 return;
415 }
416
417 std::unique_lock<std::mutex> lk(info->mutex_);
418 auto& ice = info->ice_;
419
420 if (!ice) {
421 if (config_->logger)
422 config_->logger->error("No ICE detected");
423 onConnected(false);
424 return;
425 }
426
427 auto iceAttributes = ice->getLocalAttributes();
428 std::ostringstream icemsg;
429 icemsg << iceAttributes.ufrag << "\n";
430 icemsg << iceAttributes.pwd << "\n";
431 for (const auto& addr : ice->getLocalCandidates(1)) {
432 icemsg << addr << "\n";
433 if (config_->logger)
434 config_->logger->debug("Added local ICE candidate {}", addr);
435 }
436
437 // Prepare connection request as a DHT message
438 PeerConnectionRequest val;
439
440 val.id = vid; /* Random id for the message unicity */
441 val.ice_msg = icemsg.str();
442 val.connType = connType;
443
444 auto value = std::make_shared<dht::Value>(std::move(val));
445 value->user_type = "peer_request";
446
447 // Send connection request through DHT
448 if (config_->logger)
449 config_->logger->debug("Request connection to {}", deviceId);
450 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
451 + devicePk->getId().toString()),
452 devicePk,
453 value,
454 [l=config_->logger,deviceId](bool ok) {
455 if (l)
456 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
457 deviceId,
458 (ok ? "ok" : "failed"));
459 });
460 // Wait for call to onResponse() operated by DHT
461 if (isDestroying_) {
462 onConnected(true); // This avoid to wait new negotiation when destroying
463 return;
464 }
465
466 info->onConnected_ = std::move(onConnected);
467 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
468 std::chrono::steady_clock::now()
469 + DHT_MSG_TIMEOUT);
470 info->waitForAnswer_->async_wait(
471 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
472}
473
474void
475ConnectionManager::Impl::onResponse(const asio::error_code& ec,
476 const DeviceId& deviceId,
477 const dht::Value::Id& vid)
478{
479 if (ec == asio::error::operation_aborted)
480 return;
481 auto info = getInfo(deviceId, vid);
482 if (!info)
483 return;
484
485 std::unique_lock<std::mutex> lk(info->mutex_);
486 auto& ice = info->ice_;
487 if (isDestroying_) {
488 info->onConnected_(true); // The destructor can wake a pending wait here.
489 return;
490 }
491 if (!info->responseReceived_) {
492 if (config_->logger)
493 config_->logger->error("no response from DHT to E2E request.");
494 info->onConnected_(false);
495 return;
496 }
497
498 if (!info->ice_) {
499 info->onConnected_(false);
500 return;
501 }
502
503 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
504
505 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
506 if (config_->logger)
507 config_->logger->warn("start ICE failed");
508 info->onConnected_(false);
509 return;
510 }
511 info->onConnected_(true);
512}
513
514bool
515ConnectionManager::Impl::connectDeviceOnNegoDone(
516 const DeviceId& deviceId,
517 const std::string& name,
518 const dht::Value::Id& vid,
519 const std::shared_ptr<dht::crypto::Certificate>& cert)
520{
521 auto info = getInfo(deviceId, vid);
522 if (!info)
523 return false;
524
525 std::unique_lock<std::mutex> lk {info->mutex_};
526 if (info->waitForAnswer_) {
527 // Negotiation is done and connected, go to handshake
528 // and avoid any cancellation at this point.
529 info->waitForAnswer_->cancel();
530 }
531 auto& ice = info->ice_;
532 if (!ice || !ice->isRunning()) {
533 if (config_->logger)
534 config_->logger->error("No ICE detected or not running");
535 return false;
536 }
537
538 // Build socket
539 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
540 std::move(ice)),
541 true);
542
543 // Negotiate a TLS session
544 if (config_->logger)
545 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
546 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
547 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400548 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400549 identity(),
550 dhParams(),
551 *cert);
552
553 info->tls_->setOnReady(
554 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
555 bool ok) {
556 if (auto shared = w.lock())
557 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
558 });
559 return true;
560}
561
562void
563ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
564 const std::string& name,
565 ConnectCallback cb,
566 bool noNewSocket,
567 bool forceNewSocket,
568 const std::string& connType)
569{
570 if (!dht()) {
571 cb(nullptr, deviceId);
572 return;
573 }
574 if (deviceId.toString() == identity().second->getLongId().toString()) {
575 cb(nullptr, deviceId);
576 return;
577 }
578 findCertificate(deviceId,
579 [w = weak(),
580 deviceId,
581 name,
582 cb = std::move(cb),
583 noNewSocket,
584 forceNewSocket,
585 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
586 if (!cert) {
587 if (auto shared = w.lock())
588 if (shared->config_->logger)
589 shared->config_->logger->error(
590 "No valid certificate found for device {}",
591 deviceId);
592 cb(nullptr, deviceId);
593 return;
594 }
595 if (auto shared = w.lock()) {
596 shared->connectDevice(cert,
597 name,
598 std::move(cb),
599 noNewSocket,
600 forceNewSocket,
601 connType);
602 } else
603 cb(nullptr, deviceId);
604 });
605}
606
607void
608ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
609 const std::string& name,
610 ConnectCallback cb,
611 bool noNewSocket,
612 bool forceNewSocket,
613 const std::string& connType)
614{
615 // Avoid dht operation in a DHT callback to avoid deadlocks
616 dht::ThreadPool::computation().run([w = weak(),
617 name = std::move(name),
618 cert = std::move(cert),
619 cb = std::move(cb),
620 noNewSocket,
621 forceNewSocket,
622 connType] {
623 auto devicePk = cert->getSharedPublicKey();
624 auto deviceId = devicePk->getLongId();
625 auto sthis = w.lock();
626 if (!sthis || sthis->isDestroying_) {
627 cb(nullptr, deviceId);
628 return;
629 }
630 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
631 auto isConnectingToDevice = false;
632 {
633 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400634 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
635 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400636 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400637 while (pendings.connecting.find(vid) != pendings.connecting.end()
638 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400639 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400640 }
641 }
642 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400643 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400644 // Save current request for sendChannelRequest.
645 // Note: do not return here, cause we can be in a state where first
646 // socket is negotiated and first channel is pending
647 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400648 if (isConnectingToDevice && !forceNewSocket)
649 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400650 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400651 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400652 }
653
654 // Check if already negotiated
655 CallbackId cbId(deviceId, vid);
656 if (auto info = sthis->getConnectedInfo(deviceId)) {
657 std::lock_guard<std::mutex> lk(info->mutex_);
658 if (info->socket_) {
659 if (sthis->config_->logger)
660 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
661 info->cbIds_.emplace(cbId);
662 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
663 return;
664 }
665 }
666
667 if (isConnectingToDevice && !forceNewSocket) {
668 if (sthis->config_->logger)
669 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
670 return;
671 }
672 if (noNewSocket) {
673 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400674 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400675 return;
676 }
677
678 // Note: used when the ice negotiation fails to erase
679 // all stored structures.
680 auto eraseInfo = [w, cbId] {
681 if (auto shared = w.lock()) {
682 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400683 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400684 std::lock_guard<std::mutex> lk(shared->infosMtx_);
685 shared->infos_.erase(cbId);
686 }
687 };
688
689 // If no socket exists, we need to initiate an ICE connection.
690 sthis->getIceOptions([w,
691 deviceId = std::move(deviceId),
692 devicePk = std::move(devicePk),
693 name = std::move(name),
694 cert = std::move(cert),
695 vid,
696 connType,
697 eraseInfo](auto&& ice_config) {
698 auto sthis = w.lock();
699 if (!sthis) {
700 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
701 return;
702 }
703 ice_config.tcpEnable = true;
704 ice_config.onInitDone = [w,
705 deviceId = std::move(deviceId),
706 devicePk = std::move(devicePk),
707 name = std::move(name),
708 cert = std::move(cert),
709 vid,
710 connType,
711 eraseInfo](bool ok) {
712 dht::ThreadPool::io().run([w = std::move(w),
713 devicePk = std::move(devicePk),
714 vid = std::move(vid),
715 eraseInfo,
716 connType, ok] {
717 auto sthis = w.lock();
718 if (!ok && sthis && sthis->config_->logger)
719 sthis->config_->logger->error("Cannot initialize ICE session.");
720 if (!sthis || !ok) {
721 eraseInfo();
722 return;
723 }
724 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
725 if (!ok) {
726 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
727 }
728 });
729 });
730 };
731 ice_config.onNegoDone = [w,
732 deviceId,
733 name,
734 cert = std::move(cert),
735 vid,
736 eraseInfo](bool ok) {
737 dht::ThreadPool::io().run([w = std::move(w),
738 deviceId = std::move(deviceId),
739 name = std::move(name),
740 cert = std::move(cert),
741 vid = std::move(vid),
742 eraseInfo = std::move(eraseInfo),
743 ok] {
744 auto sthis = w.lock();
745 if (!ok && sthis && sthis->config_->logger)
746 sthis->config_->logger->error("ICE negotiation failed.");
747 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
748 eraseInfo();
749 });
750 };
751
752 auto info = std::make_shared<ConnectionInfo>();
753 {
754 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
755 sthis->infos_[{deviceId, vid}] = info;
756 }
757 std::unique_lock<std::mutex> lk {info->mutex_};
758 ice_config.master = false;
759 ice_config.streamsCount = 1;
760 ice_config.compCountPerStream = 1;
761 info->ice_ = sthis->iceFactory_.createUTransport("");
762 if (!info->ice_) {
763 if (sthis->config_->logger)
764 sthis->config_->logger->error("Cannot initialize ICE session.");
765 eraseInfo();
766 return;
767 }
768 // We need to detect any shutdown if the ice session is destroyed before going to the
769 // TLS session;
770 info->ice_->setOnShutdown([eraseInfo]() {
771 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
772 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400773 try {
774 info->ice_->initIceInstance(ice_config);
775 } catch (const std::exception& e) {
776 if (sthis->config_->logger)
777 sthis->config_->logger->error("{}", e.what());
778 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
779 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400780 });
781 });
782}
783
784void
785ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
786 const std::string& name,
787 const DeviceId& deviceId,
788 const dht::Value::Id& vid)
789{
790 auto channelSock = sock->addChannel(name);
791 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
792 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400793 if (auto shared = w.lock())
794 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400795 });
796 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400797 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400798 auto shared = w.lock();
799 auto channelSock = wSock.lock();
800 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400801 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400802 });
803
804 ChannelRequest val;
805 val.name = channelSock->name();
806 val.state = ChannelRequestState::REQUEST;
807 val.channel = channelSock->channel();
808 msgpack::sbuffer buffer(256);
809 msgpack::pack(buffer, val);
810
811 std::error_code ec;
812 int res = sock->write(CONTROL_CHANNEL,
813 reinterpret_cast<const uint8_t*>(buffer.data()),
814 buffer.size(),
815 ec);
816 if (res < 0) {
817 // TODO check if we should handle errors here
818 if (config_->logger)
819 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
820 }
821}
822
823void
824ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
825{
826 auto device = req.owner->getLongId();
827 if (config_->logger)
828 config_->logger->debug("New response received from {}", device);
829 if (auto info = getInfo(device, req.id)) {
830 std::lock_guard<std::mutex> lk {info->mutex_};
831 info->responseReceived_ = true;
832 info->response_ = std::move(req);
833 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
834 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
835 this,
836 std::placeholders::_1,
837 device,
838 req.id));
839 } else {
840 if (config_->logger)
841 config_->logger->warn("Respond received, but cannot find request");
842 }
843}
844
845void
846ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
847{
848 if (!dht())
849 return;
850 dht()->listen<PeerConnectionRequest>(
851 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
852 [w = weak()](PeerConnectionRequest&& req) {
853 auto shared = w.lock();
854 if (!shared)
855 return false;
856 if (shared->isMessageTreated(to_hex_string(req.id))) {
857 // Message already treated. Just ignore
858 return true;
859 }
860 if (req.isAnswer) {
861 if (shared->config_->logger)
862 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
863 } else {
864 if (shared->config_->logger)
865 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
866 }
867 if (req.isAnswer) {
868 shared->onPeerResponse(req);
869 } else {
870 // Async certificate checking
871 shared->dht()->findCertificate(
872 req.from,
873 [w, req = std::move(req)](
874 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
875 auto shared = w.lock();
876 if (!shared)
877 return;
878 dht::InfoHash peer_h;
879 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
880#if TARGET_OS_IOS
881 if (shared->iOSConnectedCb_(req.connType, peer_h))
882 return;
883#endif
884 shared->onDhtPeerRequest(req, cert);
885 } else {
886 if (shared->config_->logger)
887 shared->config_->logger->warn(
888 "Received request from untrusted peer {}",
889 req.owner->getLongId());
890 }
891 });
892 }
893
894 return true;
895 },
896 dht::Value::UserTypeFilter("peer_request"));
897}
898
899void
900ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
901 const DeviceId& deviceId,
902 const dht::Value::Id& vid,
903 const std::string& name)
904{
905 if (isDestroying_)
906 return;
907 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
908 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
909 // asked yet)
910 auto isDhtRequest = name.empty();
911 if (!ok) {
912 if (isDhtRequest) {
913 if (config_->logger)
914 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
915 deviceId,
916 name,
917 vid);
918 if (connReadyCb_)
919 connReadyCb_(deviceId, "", nullptr);
920 } else {
921 if (config_->logger)
922 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
923 deviceId,
924 name,
925 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400926 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400927 }
928 } else {
929 // The socket is ready, store it
930 if (isDhtRequest) {
931 if (config_->logger)
932 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
933 deviceId,
934 vid);
935 } else {
936 if (config_->logger)
937 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
938 deviceId,
939 name,
940 vid);
941 }
942
943 auto info = getInfo(deviceId, vid);
944 addNewMultiplexedSocket({deviceId, vid}, info);
945 // Finally, open the channel and launch pending callbacks
946 if (info->socket_) {
947 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400948 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400949 if (config_->logger)
950 config_->logger->debug("Send request on TLS socket for channel {} to {}",
Adrien Béraud665294f2023-06-13 18:09:11 -0400951 name,
952 deviceId.toString());
953 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400954 }
955 }
956 }
957}
958
959void
960ConnectionManager::Impl::answerTo(IceTransport& ice,
961 const dht::Value::Id& id,
962 const std::shared_ptr<dht::crypto::PublicKey>& from)
963{
964 // NOTE: This is a shortest version of a real SDP message to save some bits
965 auto iceAttributes = ice.getLocalAttributes();
966 std::ostringstream icemsg;
967 icemsg << iceAttributes.ufrag << "\n";
968 icemsg << iceAttributes.pwd << "\n";
969 for (const auto& addr : ice.getLocalCandidates(1)) {
970 icemsg << addr << "\n";
971 }
972
973 // Send PeerConnection response
974 PeerConnectionRequest val;
975 val.id = id;
976 val.ice_msg = icemsg.str();
977 val.isAnswer = true;
978 auto value = std::make_shared<dht::Value>(std::move(val));
979 value->user_type = "peer_request";
980
981 if (config_->logger)
982 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
983 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
984 + from->getId().toString()),
985 from,
986 value,
987 [from,l=config_->logger](bool ok) {
988 if (l)
989 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
990 from->getLongId(),
991 (ok ? "ok" : "failed"));
992 });
993}
994
995bool
996ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
997{
998 auto deviceId = req.owner->getLongId();
999 auto info = getInfo(deviceId, req.id);
1000 if (!info)
1001 return false;
1002
1003 std::unique_lock<std::mutex> lk {info->mutex_};
1004 auto& ice = info->ice_;
1005 if (!ice) {
1006 if (config_->logger)
1007 config_->logger->error("No ICE detected");
1008 if (connReadyCb_)
1009 connReadyCb_(deviceId, "", nullptr);
1010 return false;
1011 }
1012
1013 auto sdp = ice->parseIceCandidates(req.ice_msg);
1014 answerTo(*ice, req.id, req.owner);
1015 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1016 if (config_->logger)
1017 config_->logger->error("Start ICE failed - fallback to TURN");
1018 ice = nullptr;
1019 if (connReadyCb_)
1020 connReadyCb_(deviceId, "", nullptr);
1021 return false;
1022 }
1023 return true;
1024}
1025
1026bool
1027ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1028{
1029 auto deviceId = req.owner->getLongId();
1030 auto info = getInfo(deviceId, req.id);
1031 if (!info)
1032 return false;
1033
1034 std::unique_lock<std::mutex> lk {info->mutex_};
1035 auto& ice = info->ice_;
1036 if (!ice) {
1037 if (config_->logger)
1038 config_->logger->error("No ICE detected");
1039 return false;
1040 }
1041
1042 // Build socket
1043 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1044 std::move(ice)),
1045 false);
1046
1047 // init TLS session
1048 auto ph = req.from;
1049 if (config_->logger)
1050 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1051 req.from,
1052 req.id);
1053 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1054 std::move(endpoint),
1055 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001056 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001057 identity(),
1058 dhParams(),
1059 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1060 auto shared = w.lock();
1061 if (!shared)
1062 return false;
1063 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1064 if (!crt)
1065 return false;
1066 return crt->getPacked() == cert.getPacked();
1067 });
1068
1069 info->tls_->setOnReady(
1070 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1071 if (auto shared = w.lock())
1072 shared->onTlsNegotiationDone(ok, deviceId, vid);
1073 });
1074 return true;
1075}
1076
1077void
1078ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1079 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1080{
1081 auto deviceId = req.owner->getLongId();
1082 if (config_->logger)
1083 config_->logger->debug("New connection request from {}", deviceId);
1084 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1085 if (config_->logger)
1086 config_->logger->debug("Refuse connection from {}", deviceId);
1087 return;
1088 }
1089
1090 // Because the connection is accepted, create an ICE socket.
1091 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1092 auto shared = w.lock();
1093 if (!shared)
1094 return;
1095 // Note: used when the ice negotiation fails to erase
1096 // all stored structures.
1097 auto eraseInfo = [w, id = req.id, deviceId] {
1098 if (auto shared = w.lock()) {
1099 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001100 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001101 if (shared->connReadyCb_)
1102 shared->connReadyCb_(deviceId, "", nullptr);
1103 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1104 shared->infos_.erase({deviceId, id});
1105 }
1106 };
1107
1108 ice_config.tcpEnable = true;
1109 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1110 auto shared = w.lock();
1111 if (!shared)
1112 return;
1113 if (!ok) {
1114 if (shared->config_->logger)
1115 shared->config_->logger->error("Cannot initialize ICE session.");
1116 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1117 return;
1118 }
1119
1120 dht::ThreadPool::io().run(
1121 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1122 auto shared = w.lock();
1123 if (!shared)
1124 return;
1125 if (!shared->onRequestStartIce(req))
1126 eraseInfo();
1127 });
1128 };
1129
1130 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1131 auto shared = w.lock();
1132 if (!shared)
1133 return;
1134 if (!ok) {
1135 if (shared->config_->logger)
1136 shared->config_->logger->error("ICE negotiation failed.");
1137 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1138 return;
1139 }
1140
1141 dht::ThreadPool::io().run(
1142 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1143 if (auto shared = w.lock())
1144 if (!shared->onRequestOnNegoDone(req))
1145 eraseInfo();
1146 });
1147 };
1148
1149 // Negotiate a new ICE socket
1150 auto info = std::make_shared<ConnectionInfo>();
1151 {
1152 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1153 shared->infos_[{deviceId, req.id}] = info;
1154 }
1155 if (shared->config_->logger)
1156 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1157 std::unique_lock<std::mutex> lk {info->mutex_};
1158 ice_config.streamsCount = 1;
1159 ice_config.compCountPerStream = 1; // TCP
1160 ice_config.master = true;
1161 info->ice_ = shared->iceFactory_.createUTransport("");
1162 if (not info->ice_) {
1163 if (shared->config_->logger)
1164 shared->config_->logger->error("Cannot initialize ICE session");
1165 eraseInfo();
1166 return;
1167 }
1168 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1169 info->ice_->setOnShutdown([eraseInfo]() {
1170 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1171 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001172 try {
1173 info->ice_->initIceInstance(ice_config);
1174 } catch (const std::exception& e) {
1175 if (shared->config_->logger)
1176 shared->config_->logger->error("{}", e.what());
1177 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1178 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001179 });
1180}
1181
1182void
1183ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1184{
1185 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1186 info->socket_->setOnReady(
1187 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1188 if (auto sthis = w.lock())
1189 if (sthis->connReadyCb_)
1190 sthis->connReadyCb_(deviceId, socket->name(), socket);
1191 });
1192 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1193 const uint16_t&,
1194 const std::string& name) {
1195 if (auto sthis = w.lock())
1196 if (sthis->channelReqCb_)
1197 return sthis->channelReqCb_(peer, name);
1198 return false;
1199 });
1200 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1201 // Cancel current outgoing connections
1202 dht::ThreadPool::io().run([w, deviceId, vid] {
1203 auto sthis = w.lock();
1204 if (!sthis)
1205 return;
1206
1207 std::set<CallbackId> ids;
1208 if (auto info = sthis->getInfo(deviceId, vid)) {
1209 std::lock_guard<std::mutex> lk(info->mutex_);
1210 if (info->socket_) {
1211 ids = std::move(info->cbIds_);
1212 info->socket_->shutdown();
1213 }
1214 }
1215 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001216 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001217
1218 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1219 sthis->infos_.erase({deviceId, vid});
1220 });
1221 });
1222}
1223
1224const std::shared_future<tls::DhParams>
1225ConnectionManager::Impl::dhParams() const
1226{
1227 return dht::ThreadPool::computation().get<tls::DhParams>(
1228 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1229 ;
1230}
1231
1232template<typename ID = dht::Value::Id>
1233std::set<ID, std::less<>>
1234loadIdList(const std::string& path)
1235{
1236 std::set<ID, std::less<>> ids;
1237 std::ifstream file = fileutils::ifstream(path);
1238 if (!file.is_open()) {
1239 //JAMI_DBG("Could not load %s", path.c_str());
1240 return ids;
1241 }
1242 std::string line;
1243 while (std::getline(file, line)) {
1244 if constexpr (std::is_same<ID, std::string>::value) {
1245 ids.emplace(std::move(line));
1246 } else if constexpr (std::is_integral<ID>::value) {
1247 ID vid;
1248 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1249 ec == std::errc()) {
1250 ids.emplace(vid);
1251 }
1252 }
1253 }
1254 return ids;
1255}
1256
1257template<typename List = std::set<dht::Value::Id>>
1258void
1259saveIdList(const std::string& path, const List& ids)
1260{
1261 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1262 if (!file.is_open()) {
1263 //JAMI_ERR("Could not save to %s", path.c_str());
1264 return;
1265 }
1266 for (auto& c : ids)
1267 file << std::hex << c << "\n";
1268}
1269
1270void
1271ConnectionManager::Impl::loadTreatedMessages()
1272{
1273 std::lock_guard<std::mutex> lock(messageMutex_);
1274 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1275 treatedMessages_ = loadIdList<std::string>(path);
1276 if (treatedMessages_.empty()) {
1277 auto messages = loadIdList(path);
1278 for (const auto& m : messages)
1279 treatedMessages_.emplace(to_hex_string(m));
1280 }
1281}
1282
1283void
1284ConnectionManager::Impl::saveTreatedMessages() const
1285{
1286 dht::ThreadPool::io().run([w = weak()]() {
1287 if (auto sthis = w.lock()) {
1288 auto& this_ = *sthis;
1289 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1290 fileutils::check_dir(this_.config_->cachePath.c_str());
1291 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1292 + DIR_SEPARATOR_STR "treatedMessages",
1293 this_.treatedMessages_);
1294 }
1295 });
1296}
1297
1298bool
1299ConnectionManager::Impl::isMessageTreated(std::string_view id)
1300{
1301 std::lock_guard<std::mutex> lock(messageMutex_);
1302 auto res = treatedMessages_.emplace(id);
1303 if (res.second) {
1304 saveTreatedMessages();
1305 return false;
1306 }
1307 return true;
1308}
1309
1310/**
1311 * returns whether or not UPnP is enabled and active_
1312 * ie: if it is able to make port mappings
1313 */
1314bool
1315ConnectionManager::Impl::getUPnPActive() const
1316{
1317 return config_->getUPnPActive();
1318}
1319
1320IpAddr
1321ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1322{
1323 if (family == AF_INET)
1324 return publishedIp_[0];
1325 if (family == AF_INET6)
1326 return publishedIp_[1];
1327
1328 assert(family == AF_UNSPEC);
1329
1330 // If family is not set, prefere IPv4 if available. It's more
1331 // likely to succeed behind NAT.
1332 if (publishedIp_[0])
1333 return publishedIp_[0];
1334 if (publishedIp_[1])
1335 return publishedIp_[1];
1336 return {};
1337}
1338
1339void
1340ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1341{
1342 if (ip_addr.getFamily() == AF_INET) {
1343 publishedIp_[0] = ip_addr;
1344 } else {
1345 publishedIp_[1] = ip_addr;
1346 }
1347}
1348
1349void
1350ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1351{
1352 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1353 bool hasIpv4 {false}, hasIpv6 {false};
1354 for (auto& result : results) {
1355 auto family = result.getFamily();
1356 if (family == AF_INET) {
1357 if (not hasIpv4) {
1358 hasIpv4 = true;
1359 if (config_->logger)
1360 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1361 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1362 setPublishedAddress(*result.get());
1363 if (config_->upnpCtrl) {
1364 config_->upnpCtrl->setPublicAddress(*result.get());
1365 }
1366 }
1367 } else if (family == AF_INET6) {
1368 if (not hasIpv6) {
1369 hasIpv6 = true;
1370 if (config_->logger)
1371 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1372 setPublishedAddress(*result.get());
1373 }
1374 }
1375 if (hasIpv4 and hasIpv6)
1376 break;
1377 }
1378 if (cb)
1379 cb();
1380 });
1381}
1382
1383void
1384ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1385{
1386 storeActiveIpAddress([this, cb = std::move(cb)] {
1387 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1388 auto publishedAddr = getPublishedIpAddress();
1389
1390 if (publishedAddr) {
1391 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1392 publishedAddr.getFamily());
1393 if (interfaceAddr) {
1394 opts.accountLocalAddr = interfaceAddr;
1395 opts.accountPublicAddr = publishedAddr;
1396 }
1397 }
1398 if (cb)
1399 cb(std::move(opts));
1400 });
1401}
1402
1403IceTransportOptions
1404ConnectionManager::Impl::getIceOptions() const noexcept
1405{
1406 IceTransportOptions opts;
Adrien Béraudf7081d32023-07-19 23:02:11 -04001407 opts.factory = (IceTransportFactory*)&iceFactory_;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001408 opts.upnpEnable = getUPnPActive();
1409
1410 if (config_->stunEnabled)
1411 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1412 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001413 if (config_->turnCache) {
1414 auto turnAddr = config_->turnCache->getResolvedTurn();
1415 if (turnAddr != std::nullopt) {
1416 opts.turnServers.emplace_back(TurnServerInfo()
1417 .setUri(turnAddr->toString())
1418 .setUsername(config_->turnServerUserName)
1419 .setPassword(config_->turnServerPwd)
1420 .setRealm(config_->turnServerRealm));
1421 }
1422 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001423 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001424 .setUri(config_->turnServer)
1425 .setUsername(config_->turnServerUserName)
1426 .setPassword(config_->turnServerPwd)
1427 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001428 }
1429 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1430 // co issues. So this needs some debug. for now just disable
1431 // if (cacheTurnV6 && *cacheTurnV6) {
1432 // opts.turnServers.emplace_back(TurnServerInfo()
1433 // .setUri(cacheTurnV6->toString(true))
1434 // .setUsername(turnServerUserName_)
1435 // .setPassword(turnServerPwd_)
1436 // .setRealm(turnServerRealm_));
1437 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001438 }
1439 return opts;
1440}
1441
1442bool
1443ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1444 dht::InfoHash& account_id,
1445 const std::shared_ptr<Logger>& logger)
1446{
1447 if (not crt)
1448 return false;
1449
1450 auto top_issuer = crt;
1451 while (top_issuer->issuer)
1452 top_issuer = top_issuer->issuer;
1453
1454 // Device certificate can't be self-signed
1455 if (top_issuer == crt) {
1456 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001457 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001458 return false;
1459 }
1460
1461 // Check peer certificate chain
1462 // Trust store with top issuer as the only CA
1463 dht::crypto::TrustList peer_trust;
1464 peer_trust.add(*top_issuer);
1465 if (not peer_trust.verify(*crt)) {
1466 if (logger)
1467 logger->warn("Found invalid peer device: {}", crt->getLongId());
1468 return false;
1469 }
1470
1471 // Check cached OCSP response
1472 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1473 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001474 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001475 return false;
1476 }
1477
1478 account_id = crt->issuer->getId();
1479 if (logger)
1480 logger->warn("Found peer device: {} account:{} CA:{}",
1481 crt->getLongId(),
1482 account_id,
1483 top_issuer->getId());
1484 return true;
1485}
1486
1487bool
1488ConnectionManager::Impl::findCertificate(
1489 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1490{
1491 if (auto cert = certStore().getCertificate(id.toString())) {
1492 if (cb)
1493 cb(cert);
1494 } else if (cb)
1495 cb(nullptr);
1496 return true;
1497}
1498
1499ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1500 : pimpl_ {std::make_shared<Impl>(config_)}
1501{}
1502
1503ConnectionManager::~ConnectionManager()
1504{
1505 if (pimpl_)
1506 pimpl_->shutdown();
1507}
1508
1509void
1510ConnectionManager::connectDevice(const DeviceId& deviceId,
1511 const std::string& name,
1512 ConnectCallback cb,
1513 bool noNewSocket,
1514 bool forceNewSocket,
1515 const std::string& connType)
1516{
1517 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1518}
1519
1520void
1521ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1522 const std::string& name,
1523 ConnectCallback cb,
1524 bool noNewSocket,
1525 bool forceNewSocket,
1526 const std::string& connType)
1527{
1528 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1529}
1530
1531bool
1532ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1533{
Adrien Béraud665294f2023-06-13 18:09:11 -04001534 auto pending = pimpl_->getPendingIds(deviceId);
1535 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001536 != pending.end();
1537}
1538
1539void
1540ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1541{
1542 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1543 std::set<DeviceId> peersDevices;
1544 {
1545 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1546 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1547 auto const& [key, value] = *iter;
1548 auto deviceId = key.first;
1549 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1550 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1551 connInfos.emplace_back(value);
1552 peersDevices.emplace(deviceId);
1553 iter = pimpl_->infos_.erase(iter);
1554 } else {
1555 iter++;
1556 }
1557 }
1558 }
1559 // Stop connections to all peers devices
1560 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001561 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001562 // This will close the TLS Session
1563 pimpl_->removeUnusedConnections(deviceId);
1564 }
1565 for (auto& info : connInfos) {
1566 if (info->socket_)
1567 info->socket_->shutdown();
1568 if (info->waitForAnswer_)
1569 info->waitForAnswer_->cancel();
1570 if (info->ice_) {
1571 std::unique_lock<std::mutex> lk {info->mutex_};
1572 dht::ThreadPool::io().run(
1573 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1574 }
1575 }
1576}
1577
1578void
1579ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1580{
1581 pimpl_->onDhtConnected(devicePk);
1582}
1583
1584void
1585ConnectionManager::onICERequest(onICERequestCallback&& cb)
1586{
1587 pimpl_->iceReqCb_ = std::move(cb);
1588}
1589
1590void
1591ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1592{
1593 pimpl_->channelReqCb_ = std::move(cb);
1594}
1595
1596void
1597ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1598{
1599 pimpl_->connReadyCb_ = std::move(cb);
1600}
1601
1602void
1603ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1604{
1605 pimpl_->iOSConnectedCb_ = std::move(cb);
1606}
1607
1608std::size_t
1609ConnectionManager::activeSockets() const
1610{
1611 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1612 return pimpl_->infos_.size();
1613}
1614
1615void
1616ConnectionManager::monitor() const
1617{
1618 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1619 auto logger = pimpl_->config_->logger;
1620 if (!logger)
1621 return;
1622 logger->debug("ConnectionManager current status:");
1623 for (const auto& [_, ci] : pimpl_->infos_) {
1624 if (ci->socket_)
1625 ci->socket_->monitor();
1626 }
1627 logger->debug("ConnectionManager end status.");
1628}
1629
1630void
1631ConnectionManager::connectivityChanged()
1632{
1633 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1634 for (const auto& [_, ci] : pimpl_->infos_) {
1635 if (ci->socket_)
1636 ci->socket_->sendBeacon();
1637 }
1638}
1639
1640void
1641ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1642{
1643 return pimpl_->getIceOptions(std::move(cb));
1644}
1645
1646IceTransportOptions
1647ConnectionManager::getIceOptions() const noexcept
1648{
1649 return pimpl_->getIceOptions();
1650}
1651
1652IpAddr
1653ConnectionManager::getPublishedIpAddress(uint16_t family) const
1654{
1655 return pimpl_->getPublishedIpAddress(family);
1656}
1657
1658void
1659ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1660{
1661 return pimpl_->setPublishedAddress(ip_addr);
1662}
1663
1664void
1665ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1666{
1667 return pimpl_->storeActiveIpAddress(std::move(cb));
1668}
1669
1670std::shared_ptr<ConnectionManager::Config>
1671ConnectionManager::getConfig()
1672{
1673 return pimpl_->config_;
1674}
1675
Sébastien Blin464bdff2023-07-19 08:02:53 -04001676} // namespace dhtnet