blob: 7fa8aac6426f933c6dc65d3ee05d83d04938c6dc [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040019#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040020#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040037#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040038
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040039namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040040static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
41static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
42
43using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040044using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040045
46struct ConnectionInfo
47{
48 ~ConnectionInfo()
49 {
50 if (socket_)
51 socket_->join();
52 }
53
54 std::mutex mutex_ {};
55 bool responseReceived_ {false};
56 PeerConnectionRequest response_ {};
57 std::unique_ptr<IceTransport> ice_ {nullptr};
58 // Used to store currently non ready TLS Socket
59 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
60 std::shared_ptr<MultiplexedSocket> socket_ {};
61 std::set<CallbackId> cbIds_ {};
62
63 std::function<void(bool)> onConnected_;
64 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
65};
66
67/**
68 * returns whether or not UPnP is enabled and active_
69 * ie: if it is able to make port mappings
70 */
71bool
72ConnectionManager::Config::getUPnPActive() const
73{
74 if (upnpCtrl)
75 return upnpCtrl->isReady();
76 return false;
77}
78
79class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
80{
81public:
82 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
83 : config_ {std::move(config_)}
84 {}
85 ~Impl() {}
86
87 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
88 const dht::crypto::Identity& identity() const { return config_->id; }
89
90 void removeUnusedConnections(const DeviceId& deviceId = {})
91 {
92 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
93
94 {
95 std::lock_guard<std::mutex> lk(infosMtx_);
96 for (auto it = infos_.begin(); it != infos_.end();) {
97 auto& [key, info] = *it;
98 if (info && (!deviceId || key.first == deviceId)) {
99 unused.emplace_back(std::move(info));
100 it = infos_.erase(it);
101 } else {
102 ++it;
103 }
104 }
105 }
106 for (auto& info: unused) {
107 if (info->tls_)
108 info->tls_->shutdown();
109 if (info->socket_)
110 info->socket_->shutdown();
111 if (info->waitForAnswer_)
112 info->waitForAnswer_->cancel();
113 }
114 if (!unused.empty())
115 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
116 }
117
118 void shutdown()
119 {
120 if (isDestroying_.exchange(true))
121 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400122 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400123 {
124 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400125 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400126 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400127 for (auto& [deviceId, pcbs] : po) {
128 for (auto& [id, pending] : pcbs.connecting)
129 pending.cb(nullptr, deviceId);
130 for (auto& [id, pending] : pcbs.waiting)
131 pending.cb(nullptr, deviceId);
132 }
133
Adrien Béraud612b55b2023-05-29 10:42:04 -0400134 removeUnusedConnections();
135 }
136
Adrien Béraud612b55b2023-05-29 10:42:04 -0400137 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
138 const dht::Value::Id& vid,
139 const std::string& connType,
140 std::function<void(bool)> onConnected);
141 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
142 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
143 const std::string& name,
144 const dht::Value::Id& vid,
145 const std::shared_ptr<dht::crypto::Certificate>& cert);
146 void connectDevice(const DeviceId& deviceId,
147 const std::string& uri,
148 ConnectCallback cb,
149 bool noNewSocket = false,
150 bool forceNewSocket = false,
151 const std::string& connType = "");
152 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
153 const std::string& name,
154 ConnectCallback cb,
155 bool noNewSocket = false,
156 bool forceNewSocket = false,
157 const std::string& connType = "");
158 /**
159 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
160 * @param sock socket used to send the request
161 * @param name channel's name
162 * @param vid channel's id
163 * @param deviceId to identify the linked ConnectCallback
164 */
165 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
166 const std::string& name,
167 const DeviceId& deviceId,
168 const dht::Value::Id& vid);
169 /**
170 * Triggered when a PeerConnectionRequest comes from the DHT
171 */
172 void answerTo(IceTransport& ice,
173 const dht::Value::Id& id,
174 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
175 bool onRequestStartIce(const PeerConnectionRequest& req);
176 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
177 void onDhtPeerRequest(const PeerConnectionRequest& req,
178 const std::shared_ptr<dht::crypto::Certificate>& cert);
179
180 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
181 void onPeerResponse(const PeerConnectionRequest& req);
182 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
183
184 const std::shared_future<tls::DhParams> dhParams() const;
185 tls::CertificateStore& certStore() const { return *config_->certStore; }
186
187 mutable std::mutex messageMutex_ {};
188 std::set<std::string, std::less<>> treatedMessages_ {};
189
190 void loadTreatedMessages();
191 void saveTreatedMessages() const;
192
193 /// \return true if the given DHT message identifier has been treated
/// \note if the message has not been treated yet, this method stores its id and returns true
/// at further calls
196 bool isMessageTreated(std::string_view id);
197
198 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
199
200 /**
201 * Published IPv4/IPv6 addresses, used only if defined by the user in account
202 * configuration
203 *
204 */
205 IpAddr publishedIp_[2] {};
206
Adrien Béraud612b55b2023-05-29 10:42:04 -0400207 /**
208 * interface name on which this account is bound
209 */
210 std::string interface_ {"default"};
211
212 /**
213 * Get the local interface name on which this account is bound.
214 */
215 const std::string& getLocalInterface() const { return interface_; }
216
217 /**
218 * Get the published IP address, fallbacks to NAT if family is unspecified
219 * Prefers the usage of IPv4 if possible.
220 */
221 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
222
223 /**
224 * Set published IP address according to given family
225 */
226 void setPublishedAddress(const IpAddr& ip_addr);
227
228 /**
229 * Store the local/public addresses used to register
230 */
231 void storeActiveIpAddress(std::function<void()>&& cb = {});
232
233 /**
234 * Create and return ICE options.
235 */
236 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
237 IceTransportOptions getIceOptions() const noexcept;
238
239 /**
240 * Inform that a potential peer device have been found.
241 * Returns true only if the device certificate is a valid device certificate.
242 * In that case (true is returned) the account_id parameter is set to the peer account ID.
243 */
244 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
245 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
246
247 bool findCertificate(const dht::PkId& id,
248 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
249
250 /**
251 * returns whether or not UPnP is enabled and active
252 * ie: if it is able to make port mappings
253 */
254 bool getUPnPActive() const;
255
256 /**
257 * Triggered when a new TLS socket is ready to use
258 * @param ok If succeed
259 * @param deviceId Related device
260 * @param vid vid of the connection request
261 * @param name non empty if TLS was created by connectDevice()
262 */
263 void onTlsNegotiationDone(bool ok,
264 const DeviceId& deviceId,
265 const dht::Value::Id& vid,
266 const std::string& name = "");
267
268 std::shared_ptr<ConnectionManager::Config> config_;
269
270 IceTransportFactory iceFactory_ {};
271
272 mutable std::mt19937_64 rand;
273
274 iOSConnectedCallback iOSConnectedCb_ {};
275
276 std::mutex infosMtx_ {};
277 // Note: Someone can ask multiple sockets, so to avoid any race condition,
278 // each device can have multiple multiplexed sockets.
279 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
280
281 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
282 {
283 std::lock_guard<std::mutex> lk(infosMtx_);
284 auto it = infos_.find({deviceId, id});
285 if (it != infos_.end())
286 return it->second;
287 return {};
288 }
289
290 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
291 {
292 std::lock_guard<std::mutex> lk(infosMtx_);
293 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
294 auto& [key, value] = item;
295 return key.first == deviceId && value && value->socket_;
296 });
297 if (it != infos_.end())
298 return it->second;
299 return {};
300 }
301
302 ChannelRequestCallback channelReqCb_ {};
303 ConnectionReadyCallback connReadyCb_ {};
304 onICERequestCallback iceReqCb_ {};
305
306 /**
307 * Stores callback from connectDevice
308 * @note: each device needs a vector because several connectDevice can
309 * be done in parallel and we only want one socket
310 */
311 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400312
Adrien Béraud665294f2023-06-13 18:09:11 -0400313 struct PendingCb
314 {
315 std::string name;
316 ConnectCallback cb;
317 };
318 struct PendingOperations {
319 std::map<dht::Value::Id, PendingCb> connecting;
320 std::map<dht::Value::Id, PendingCb> waiting;
321 };
322
323 std::map<DeviceId, PendingOperations> pendingOperations_ {};
324
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400325 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400326 {
327 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400328 std::unique_lock<std::mutex> lk(connectCbsMtx_);
329 auto it = pendingOperations_.find(deviceId);
330 if (it == pendingOperations_.end())
331 return;
332 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400333 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400334 // Extract all pending callbacks
335 for (auto& [vid, cb] : pendingOperations.connecting)
336 ret.emplace_back(std::move(cb));
337 pendingOperations.connecting.clear();
338 for (auto& [vid, cb] : pendingOperations.waiting)
339 ret.emplace_back(std::move(cb));
340 pendingOperations.waiting.clear();
341 } else if (auto n = pendingOperations.waiting.extract(vid)) {
342 // If it's a waiting operation, just move it
343 ret.emplace_back(std::move(n.mapped()));
344 } else if (auto n = pendingOperations.connecting.extract(vid)) {
345 ret.emplace_back(std::move(n.mapped()));
346 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400347 // If accepted is false, it means that underlying socket is ok, but channel is declined
348 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400349 for (auto& [vid, cb] : pendingOperations.waiting)
350 ret.emplace_back(std::move(cb));
351 pendingOperations.waiting.clear();
352 for (auto& [vid, cb] : pendingOperations.connecting)
353 ret.emplace_back(std::move(cb));
354 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400355 }
356 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400357 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
358 pendingOperations_.erase(it);
359 lk.unlock();
360 for (auto& cb : ret)
361 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400362 }
363
Adrien Béraud665294f2023-06-13 18:09:11 -0400364 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400365 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400366 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400367 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400368 auto it = pendingOperations_.find(deviceId);
369 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400370 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 auto& pendingOp = it->second;
372 for (const auto& [id, pc]: pendingOp.connecting) {
373 if (vid == 0 || id == vid)
374 ret[id] = pc.name;
375 }
376 for (const auto& [id, pc]: pendingOp.waiting) {
377 if (vid == 0 || id == vid)
378 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400379 }
380 return ret;
381 }
382
383 std::shared_ptr<ConnectionManager::Impl> shared()
384 {
385 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
386 }
387 std::shared_ptr<ConnectionManager::Impl const> shared() const
388 {
389 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
390 }
391 std::weak_ptr<ConnectionManager::Impl> weak()
392 {
393 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
394 }
395 std::weak_ptr<ConnectionManager::Impl const> weak() const
396 {
397 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
398 }
399
400 std::atomic_bool isDestroying_ {false};
401};
402
403void
404ConnectionManager::Impl::connectDeviceStartIce(
405 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
406 const dht::Value::Id& vid,
407 const std::string& connType,
408 std::function<void(bool)> onConnected)
409{
410 auto deviceId = devicePk->getLongId();
411 auto info = getInfo(deviceId, vid);
412 if (!info) {
413 onConnected(false);
414 return;
415 }
416
417 std::unique_lock<std::mutex> lk(info->mutex_);
418 auto& ice = info->ice_;
419
420 if (!ice) {
421 if (config_->logger)
422 config_->logger->error("No ICE detected");
423 onConnected(false);
424 return;
425 }
426
427 auto iceAttributes = ice->getLocalAttributes();
428 std::ostringstream icemsg;
429 icemsg << iceAttributes.ufrag << "\n";
430 icemsg << iceAttributes.pwd << "\n";
431 for (const auto& addr : ice->getLocalCandidates(1)) {
432 icemsg << addr << "\n";
433 if (config_->logger)
434 config_->logger->debug("Added local ICE candidate {}", addr);
435 }
436
437 // Prepare connection request as a DHT message
438 PeerConnectionRequest val;
439
440 val.id = vid; /* Random id for the message unicity */
441 val.ice_msg = icemsg.str();
442 val.connType = connType;
443
444 auto value = std::make_shared<dht::Value>(std::move(val));
445 value->user_type = "peer_request";
446
447 // Send connection request through DHT
448 if (config_->logger)
449 config_->logger->debug("Request connection to {}", deviceId);
450 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
451 + devicePk->getId().toString()),
452 devicePk,
453 value,
454 [l=config_->logger,deviceId](bool ok) {
455 if (l)
456 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
457 deviceId,
458 (ok ? "ok" : "failed"));
459 });
460 // Wait for call to onResponse() operated by DHT
461 if (isDestroying_) {
462 onConnected(true); // This avoid to wait new negotiation when destroying
463 return;
464 }
465
466 info->onConnected_ = std::move(onConnected);
467 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
468 std::chrono::steady_clock::now()
469 + DHT_MSG_TIMEOUT);
470 info->waitForAnswer_->async_wait(
471 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
472}
473
474void
475ConnectionManager::Impl::onResponse(const asio::error_code& ec,
476 const DeviceId& deviceId,
477 const dht::Value::Id& vid)
478{
479 if (ec == asio::error::operation_aborted)
480 return;
481 auto info = getInfo(deviceId, vid);
482 if (!info)
483 return;
484
485 std::unique_lock<std::mutex> lk(info->mutex_);
486 auto& ice = info->ice_;
487 if (isDestroying_) {
488 info->onConnected_(true); // The destructor can wake a pending wait here.
489 return;
490 }
491 if (!info->responseReceived_) {
492 if (config_->logger)
493 config_->logger->error("no response from DHT to E2E request.");
494 info->onConnected_(false);
495 return;
496 }
497
498 if (!info->ice_) {
499 info->onConnected_(false);
500 return;
501 }
502
503 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
504
505 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
506 if (config_->logger)
507 config_->logger->warn("start ICE failed");
508 info->onConnected_(false);
509 return;
510 }
511 info->onConnected_(true);
512}
513
514bool
515ConnectionManager::Impl::connectDeviceOnNegoDone(
516 const DeviceId& deviceId,
517 const std::string& name,
518 const dht::Value::Id& vid,
519 const std::shared_ptr<dht::crypto::Certificate>& cert)
520{
521 auto info = getInfo(deviceId, vid);
522 if (!info)
523 return false;
524
525 std::unique_lock<std::mutex> lk {info->mutex_};
526 if (info->waitForAnswer_) {
527 // Negotiation is done and connected, go to handshake
528 // and avoid any cancellation at this point.
529 info->waitForAnswer_->cancel();
530 }
531 auto& ice = info->ice_;
532 if (!ice || !ice->isRunning()) {
533 if (config_->logger)
534 config_->logger->error("No ICE detected or not running");
535 return false;
536 }
537
538 // Build socket
539 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
540 std::move(ice)),
541 true);
542
543 // Negotiate a TLS session
544 if (config_->logger)
545 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
546 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
547 certStore(),
548 identity(),
549 dhParams(),
550 *cert);
551
552 info->tls_->setOnReady(
553 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
554 bool ok) {
555 if (auto shared = w.lock())
556 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
557 });
558 return true;
559}
560
561void
562ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
563 const std::string& name,
564 ConnectCallback cb,
565 bool noNewSocket,
566 bool forceNewSocket,
567 const std::string& connType)
568{
569 if (!dht()) {
570 cb(nullptr, deviceId);
571 return;
572 }
573 if (deviceId.toString() == identity().second->getLongId().toString()) {
574 cb(nullptr, deviceId);
575 return;
576 }
577 findCertificate(deviceId,
578 [w = weak(),
579 deviceId,
580 name,
581 cb = std::move(cb),
582 noNewSocket,
583 forceNewSocket,
584 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
585 if (!cert) {
586 if (auto shared = w.lock())
587 if (shared->config_->logger)
588 shared->config_->logger->error(
589 "No valid certificate found for device {}",
590 deviceId);
591 cb(nullptr, deviceId);
592 return;
593 }
594 if (auto shared = w.lock()) {
595 shared->connectDevice(cert,
596 name,
597 std::move(cb),
598 noNewSocket,
599 forceNewSocket,
600 connType);
601 } else
602 cb(nullptr, deviceId);
603 });
604}
605
606void
607ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
608 const std::string& name,
609 ConnectCallback cb,
610 bool noNewSocket,
611 bool forceNewSocket,
612 const std::string& connType)
613{
614 // Avoid dht operation in a DHT callback to avoid deadlocks
615 dht::ThreadPool::computation().run([w = weak(),
616 name = std::move(name),
617 cert = std::move(cert),
618 cb = std::move(cb),
619 noNewSocket,
620 forceNewSocket,
621 connType] {
622 auto devicePk = cert->getSharedPublicKey();
623 auto deviceId = devicePk->getLongId();
624 auto sthis = w.lock();
625 if (!sthis || sthis->isDestroying_) {
626 cb(nullptr, deviceId);
627 return;
628 }
629 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
630 auto isConnectingToDevice = false;
631 {
632 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400633 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
634 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400635 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400636 while (pendings.connecting.find(vid) != pendings.connecting.end()
637 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400638 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400639 }
640 }
641 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400642 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400643 // Save current request for sendChannelRequest.
644 // Note: do not return here, cause we can be in a state where first
645 // socket is negotiated and first channel is pending
646 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400647 if (isConnectingToDevice && !forceNewSocket)
648 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400649 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400650 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400651 }
652
653 // Check if already negotiated
654 CallbackId cbId(deviceId, vid);
655 if (auto info = sthis->getConnectedInfo(deviceId)) {
656 std::lock_guard<std::mutex> lk(info->mutex_);
657 if (info->socket_) {
658 if (sthis->config_->logger)
659 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
660 info->cbIds_.emplace(cbId);
661 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
662 return;
663 }
664 }
665
666 if (isConnectingToDevice && !forceNewSocket) {
667 if (sthis->config_->logger)
668 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
669 return;
670 }
671 if (noNewSocket) {
672 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400673 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400674 return;
675 }
676
677 // Note: used when the ice negotiation fails to erase
678 // all stored structures.
679 auto eraseInfo = [w, cbId] {
680 if (auto shared = w.lock()) {
681 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400682 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400683 std::lock_guard<std::mutex> lk(shared->infosMtx_);
684 shared->infos_.erase(cbId);
685 }
686 };
687
688 // If no socket exists, we need to initiate an ICE connection.
689 sthis->getIceOptions([w,
690 deviceId = std::move(deviceId),
691 devicePk = std::move(devicePk),
692 name = std::move(name),
693 cert = std::move(cert),
694 vid,
695 connType,
696 eraseInfo](auto&& ice_config) {
697 auto sthis = w.lock();
698 if (!sthis) {
699 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
700 return;
701 }
702 ice_config.tcpEnable = true;
703 ice_config.onInitDone = [w,
704 deviceId = std::move(deviceId),
705 devicePk = std::move(devicePk),
706 name = std::move(name),
707 cert = std::move(cert),
708 vid,
709 connType,
710 eraseInfo](bool ok) {
711 dht::ThreadPool::io().run([w = std::move(w),
712 devicePk = std::move(devicePk),
713 vid = std::move(vid),
714 eraseInfo,
715 connType, ok] {
716 auto sthis = w.lock();
717 if (!ok && sthis && sthis->config_->logger)
718 sthis->config_->logger->error("Cannot initialize ICE session.");
719 if (!sthis || !ok) {
720 eraseInfo();
721 return;
722 }
723 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
724 if (!ok) {
725 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
726 }
727 });
728 });
729 };
730 ice_config.onNegoDone = [w,
731 deviceId,
732 name,
733 cert = std::move(cert),
734 vid,
735 eraseInfo](bool ok) {
736 dht::ThreadPool::io().run([w = std::move(w),
737 deviceId = std::move(deviceId),
738 name = std::move(name),
739 cert = std::move(cert),
740 vid = std::move(vid),
741 eraseInfo = std::move(eraseInfo),
742 ok] {
743 auto sthis = w.lock();
744 if (!ok && sthis && sthis->config_->logger)
745 sthis->config_->logger->error("ICE negotiation failed.");
746 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
747 eraseInfo();
748 });
749 };
750
751 auto info = std::make_shared<ConnectionInfo>();
752 {
753 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
754 sthis->infos_[{deviceId, vid}] = info;
755 }
756 std::unique_lock<std::mutex> lk {info->mutex_};
757 ice_config.master = false;
758 ice_config.streamsCount = 1;
759 ice_config.compCountPerStream = 1;
760 info->ice_ = sthis->iceFactory_.createUTransport("");
761 if (!info->ice_) {
762 if (sthis->config_->logger)
763 sthis->config_->logger->error("Cannot initialize ICE session.");
764 eraseInfo();
765 return;
766 }
767 // We need to detect any shutdown if the ice session is destroyed before going to the
768 // TLS session;
769 info->ice_->setOnShutdown([eraseInfo]() {
770 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
771 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400772 try {
773 info->ice_->initIceInstance(ice_config);
774 } catch (const std::exception& e) {
775 if (sthis->config_->logger)
776 sthis->config_->logger->error("{}", e.what());
777 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
778 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400779 });
780 });
781}
782
783void
784ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
785 const std::string& name,
786 const DeviceId& deviceId,
787 const dht::Value::Id& vid)
788{
789 auto channelSock = sock->addChannel(name);
790 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
791 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400792 if (auto shared = w.lock())
793 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400794 });
795 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400796 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400797 auto shared = w.lock();
798 auto channelSock = wSock.lock();
799 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400800 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400801 });
802
803 ChannelRequest val;
804 val.name = channelSock->name();
805 val.state = ChannelRequestState::REQUEST;
806 val.channel = channelSock->channel();
807 msgpack::sbuffer buffer(256);
808 msgpack::pack(buffer, val);
809
810 std::error_code ec;
811 int res = sock->write(CONTROL_CHANNEL,
812 reinterpret_cast<const uint8_t*>(buffer.data()),
813 buffer.size(),
814 ec);
815 if (res < 0) {
816 // TODO check if we should handle errors here
817 if (config_->logger)
818 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
819 }
820}
821
822void
823ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
824{
825 auto device = req.owner->getLongId();
826 if (config_->logger)
827 config_->logger->debug("New response received from {}", device);
828 if (auto info = getInfo(device, req.id)) {
829 std::lock_guard<std::mutex> lk {info->mutex_};
830 info->responseReceived_ = true;
831 info->response_ = std::move(req);
832 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
833 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
834 this,
835 std::placeholders::_1,
836 device,
837 req.id));
838 } else {
839 if (config_->logger)
840 config_->logger->warn("Respond received, but cannot find request");
841 }
842}
843
844void
845ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
846{
847 if (!dht())
848 return;
849 dht()->listen<PeerConnectionRequest>(
850 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
851 [w = weak()](PeerConnectionRequest&& req) {
852 auto shared = w.lock();
853 if (!shared)
854 return false;
855 if (shared->isMessageTreated(to_hex_string(req.id))) {
856 // Message already treated. Just ignore
857 return true;
858 }
859 if (req.isAnswer) {
860 if (shared->config_->logger)
861 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
862 } else {
863 if (shared->config_->logger)
864 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
865 }
866 if (req.isAnswer) {
867 shared->onPeerResponse(req);
868 } else {
869 // Async certificate checking
870 shared->dht()->findCertificate(
871 req.from,
872 [w, req = std::move(req)](
873 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
874 auto shared = w.lock();
875 if (!shared)
876 return;
877 dht::InfoHash peer_h;
878 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
879#if TARGET_OS_IOS
880 if (shared->iOSConnectedCb_(req.connType, peer_h))
881 return;
882#endif
883 shared->onDhtPeerRequest(req, cert);
884 } else {
885 if (shared->config_->logger)
886 shared->config_->logger->warn(
887 "Received request from untrusted peer {}",
888 req.owner->getLongId());
889 }
890 });
891 }
892
893 return true;
894 },
895 dht::Value::UserTypeFilter("peer_request"));
896}
897
898void
899ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
900 const DeviceId& deviceId,
901 const dht::Value::Id& vid,
902 const std::string& name)
903{
904 if (isDestroying_)
905 return;
906 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
907 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
908 // asked yet)
909 auto isDhtRequest = name.empty();
910 if (!ok) {
911 if (isDhtRequest) {
912 if (config_->logger)
913 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
914 deviceId,
915 name,
916 vid);
917 if (connReadyCb_)
918 connReadyCb_(deviceId, "", nullptr);
919 } else {
920 if (config_->logger)
921 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
922 deviceId,
923 name,
924 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400925 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400926 }
927 } else {
928 // The socket is ready, store it
929 if (isDhtRequest) {
930 if (config_->logger)
931 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
932 deviceId,
933 vid);
934 } else {
935 if (config_->logger)
936 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
937 deviceId,
938 name,
939 vid);
940 }
941
942 auto info = getInfo(deviceId, vid);
943 addNewMultiplexedSocket({deviceId, vid}, info);
944 // Finally, open the channel and launch pending callbacks
945 if (info->socket_) {
946 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400947 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400948 if (config_->logger)
949 config_->logger->debug("Send request on TLS socket for channel {} to {}",
Adrien Béraud665294f2023-06-13 18:09:11 -0400950 name,
951 deviceId.toString());
952 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400953 }
954 }
955 }
956}
957
958void
959ConnectionManager::Impl::answerTo(IceTransport& ice,
960 const dht::Value::Id& id,
961 const std::shared_ptr<dht::crypto::PublicKey>& from)
962{
963 // NOTE: This is a shortest version of a real SDP message to save some bits
964 auto iceAttributes = ice.getLocalAttributes();
965 std::ostringstream icemsg;
966 icemsg << iceAttributes.ufrag << "\n";
967 icemsg << iceAttributes.pwd << "\n";
968 for (const auto& addr : ice.getLocalCandidates(1)) {
969 icemsg << addr << "\n";
970 }
971
972 // Send PeerConnection response
973 PeerConnectionRequest val;
974 val.id = id;
975 val.ice_msg = icemsg.str();
976 val.isAnswer = true;
977 auto value = std::make_shared<dht::Value>(std::move(val));
978 value->user_type = "peer_request";
979
980 if (config_->logger)
981 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
982 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
983 + from->getId().toString()),
984 from,
985 value,
986 [from,l=config_->logger](bool ok) {
987 if (l)
988 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
989 from->getLongId(),
990 (ok ? "ok" : "failed"));
991 });
992}
993
994bool
995ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
996{
997 auto deviceId = req.owner->getLongId();
998 auto info = getInfo(deviceId, req.id);
999 if (!info)
1000 return false;
1001
1002 std::unique_lock<std::mutex> lk {info->mutex_};
1003 auto& ice = info->ice_;
1004 if (!ice) {
1005 if (config_->logger)
1006 config_->logger->error("No ICE detected");
1007 if (connReadyCb_)
1008 connReadyCb_(deviceId, "", nullptr);
1009 return false;
1010 }
1011
1012 auto sdp = ice->parseIceCandidates(req.ice_msg);
1013 answerTo(*ice, req.id, req.owner);
1014 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1015 if (config_->logger)
1016 config_->logger->error("Start ICE failed - fallback to TURN");
1017 ice = nullptr;
1018 if (connReadyCb_)
1019 connReadyCb_(deviceId, "", nullptr);
1020 return false;
1021 }
1022 return true;
1023}
1024
1025bool
1026ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1027{
1028 auto deviceId = req.owner->getLongId();
1029 auto info = getInfo(deviceId, req.id);
1030 if (!info)
1031 return false;
1032
1033 std::unique_lock<std::mutex> lk {info->mutex_};
1034 auto& ice = info->ice_;
1035 if (!ice) {
1036 if (config_->logger)
1037 config_->logger->error("No ICE detected");
1038 return false;
1039 }
1040
1041 // Build socket
1042 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1043 std::move(ice)),
1044 false);
1045
1046 // init TLS session
1047 auto ph = req.from;
1048 if (config_->logger)
1049 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1050 req.from,
1051 req.id);
1052 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1053 std::move(endpoint),
1054 certStore(),
1055 identity(),
1056 dhParams(),
1057 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1058 auto shared = w.lock();
1059 if (!shared)
1060 return false;
1061 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1062 if (!crt)
1063 return false;
1064 return crt->getPacked() == cert.getPacked();
1065 });
1066
1067 info->tls_->setOnReady(
1068 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1069 if (auto shared = w.lock())
1070 shared->onTlsNegotiationDone(ok, deviceId, vid);
1071 });
1072 return true;
1073}
1074
/**
 * Handle a connection request received from a peer through the DHT.
 *
 * The request is refused when no ICE request callback is set or when that
 * callback rejects the device. Otherwise the ICE options are fetched
 * asynchronously, a ConnectionInfo is registered under {deviceId, req.id} and an
 * ICE transport is created and initialized for it.
 *
 * @param req Request received from the peer.
 * The second parameter (certificate of the peer) is unused.
 */
void
ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
                                          const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
{
    auto deviceId = req.owner->getLongId();
    if (config_->logger)
        config_->logger->debug("New connection request from {}", deviceId);
    // No callback set, or callback refusing the device: drop the request
    if (!iceReqCb_ || !iceReqCb_(deviceId)) {
        if (config_->logger)
            config_->logger->debug("Refuse connection from {}", deviceId);
        return;
    }

    // Because the connection is accepted, create an ICE socket.
    // req and deviceId are copied: the callback runs after this function returned.
    getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
        auto shared = w.lock();
        if (!shared)
            return;
        // Note: used when the ICE negotiation fails, to erase
        // all stored structures.
        auto eraseInfo = [w, id = req.id, deviceId] {
            if (auto shared = w.lock()) {
                // If no new socket is specified, we don't try to generate a new socket
                shared->executePendingOperations(deviceId, id, nullptr);
                if (shared->connReadyCb_)
                    shared->connReadyCb_(deviceId, "", nullptr);
                std::lock_guard<std::mutex> lk(shared->infosMtx_);
                shared->infos_.erase({deviceId, id});
            }
        };

        ice_config.tcpEnable = true;
        // The lambdas below are not `mutable`: their by-copy captures are const, so
        // the std::move calls on them bind to the copy constructors.
        ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
            auto shared = w.lock();
            if (!shared)
                return;
            if (!ok) {
                if (shared->config_->logger)
                    shared->config_->logger->error("Cannot initialize ICE session.");
                // Cleanup is posted to the io thread pool rather than run in this callback
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }

            // Initialization succeeded: answer the peer and start ICE on the io pool
            dht::ThreadPool::io().run(
                [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                    auto shared = w.lock();
                    if (!shared)
                        return;
                    if (!shared->onRequestStartIce(req))
                        eraseInfo();
                });
        };

        ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
            auto shared = w.lock();
            if (!shared)
                return;
            if (!ok) {
                if (shared->config_->logger)
                    shared->config_->logger->error("ICE negotiation failed.");
                dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
                return;
            }

            // Negotiation succeeded: start the TLS session on the io pool
            dht::ThreadPool::io().run(
                [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
                    if (auto shared = w.lock())
                        if (!shared->onRequestOnNegoDone(req))
                            eraseInfo();
                });
        };

        // Negotiate a new ICE socket
        auto info = std::make_shared<ConnectionInfo>();
        {
            // Register the info before the ICE callbacks can look it up
            std::lock_guard<std::mutex> lk(shared->infosMtx_);
            shared->infos_[{deviceId, req.id}] = info;
        }
        if (shared->config_->logger)
            shared->config_->logger->debug("Accepting connection from {}", deviceId);
        std::unique_lock<std::mutex> lk {info->mutex_};
        ice_config.streamsCount = 1;
        ice_config.compCountPerStream = 1; // TCP
        ice_config.master = true;
        info->ice_ = shared->iceFactory_.createUTransport("");
        if (not info->ice_) {
            if (shared->config_->logger)
                shared->config_->logger->error("Cannot initialize ICE session");
            eraseInfo();
            return;
        }
        // We need to detect any shutdown happening if the ICE session is destroyed
        // before the TLS session is reached.
        info->ice_->setOnShutdown([eraseInfo]() {
            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
        });
        try {
            info->ice_->initIceInstance(ice_config);
        } catch (const std::exception& e) {
            // Initialization threw: log the reason and clean up on the io pool
            if (shared->config_->logger)
                shared->config_->logger->error("{}", e.what());
            dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
        }
    });
}
1179
1180void
1181ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1182{
1183 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1184 info->socket_->setOnReady(
1185 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1186 if (auto sthis = w.lock())
1187 if (sthis->connReadyCb_)
1188 sthis->connReadyCb_(deviceId, socket->name(), socket);
1189 });
1190 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1191 const uint16_t&,
1192 const std::string& name) {
1193 if (auto sthis = w.lock())
1194 if (sthis->channelReqCb_)
1195 return sthis->channelReqCb_(peer, name);
1196 return false;
1197 });
1198 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1199 // Cancel current outgoing connections
1200 dht::ThreadPool::io().run([w, deviceId, vid] {
1201 auto sthis = w.lock();
1202 if (!sthis)
1203 return;
1204
1205 std::set<CallbackId> ids;
1206 if (auto info = sthis->getInfo(deviceId, vid)) {
1207 std::lock_guard<std::mutex> lk(info->mutex_);
1208 if (info->socket_) {
1209 ids = std::move(info->cbIds_);
1210 info->socket_->shutdown();
1211 }
1212 }
1213 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001214 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001215
1216 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1217 sthis->infos_.erase({deviceId, vid});
1218 });
1219 });
1220}
1221
1222const std::shared_future<tls::DhParams>
1223ConnectionManager::Impl::dhParams() const
1224{
1225 return dht::ThreadPool::computation().get<tls::DhParams>(
1226 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1227 ;
1228}
1229
1230template<typename ID = dht::Value::Id>
1231std::set<ID, std::less<>>
1232loadIdList(const std::string& path)
1233{
1234 std::set<ID, std::less<>> ids;
1235 std::ifstream file = fileutils::ifstream(path);
1236 if (!file.is_open()) {
1237 //JAMI_DBG("Could not load %s", path.c_str());
1238 return ids;
1239 }
1240 std::string line;
1241 while (std::getline(file, line)) {
1242 if constexpr (std::is_same<ID, std::string>::value) {
1243 ids.emplace(std::move(line));
1244 } else if constexpr (std::is_integral<ID>::value) {
1245 ID vid;
1246 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1247 ec == std::errc()) {
1248 ids.emplace(vid);
1249 }
1250 }
1251 }
1252 return ids;
1253}
1254
1255template<typename List = std::set<dht::Value::Id>>
1256void
1257saveIdList(const std::string& path, const List& ids)
1258{
1259 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1260 if (!file.is_open()) {
1261 //JAMI_ERR("Could not save to %s", path.c_str());
1262 return;
1263 }
1264 for (auto& c : ids)
1265 file << std::hex << c << "\n";
1266}
1267
1268void
1269ConnectionManager::Impl::loadTreatedMessages()
1270{
1271 std::lock_guard<std::mutex> lock(messageMutex_);
1272 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1273 treatedMessages_ = loadIdList<std::string>(path);
1274 if (treatedMessages_.empty()) {
1275 auto messages = loadIdList(path);
1276 for (const auto& m : messages)
1277 treatedMessages_.emplace(to_hex_string(m));
1278 }
1279}
1280
1281void
1282ConnectionManager::Impl::saveTreatedMessages() const
1283{
1284 dht::ThreadPool::io().run([w = weak()]() {
1285 if (auto sthis = w.lock()) {
1286 auto& this_ = *sthis;
1287 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1288 fileutils::check_dir(this_.config_->cachePath.c_str());
1289 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1290 + DIR_SEPARATOR_STR "treatedMessages",
1291 this_.treatedMessages_);
1292 }
1293 });
1294}
1295
1296bool
1297ConnectionManager::Impl::isMessageTreated(std::string_view id)
1298{
1299 std::lock_guard<std::mutex> lock(messageMutex_);
1300 auto res = treatedMessages_.emplace(id);
1301 if (res.second) {
1302 saveTreatedMessages();
1303 return false;
1304 }
1305 return true;
1306}
1307
1308/**
1309 * returns whether or not UPnP is enabled and active_
1310 * ie: if it is able to make port mappings
1311 */
1312bool
1313ConnectionManager::Impl::getUPnPActive() const
1314{
1315 return config_->getUPnPActive();
1316}
1317
1318IpAddr
1319ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1320{
1321 if (family == AF_INET)
1322 return publishedIp_[0];
1323 if (family == AF_INET6)
1324 return publishedIp_[1];
1325
1326 assert(family == AF_UNSPEC);
1327
1328 // If family is not set, prefere IPv4 if available. It's more
1329 // likely to succeed behind NAT.
1330 if (publishedIp_[0])
1331 return publishedIp_[0];
1332 if (publishedIp_[1])
1333 return publishedIp_[1];
1334 return {};
1335}
1336
1337void
1338ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1339{
1340 if (ip_addr.getFamily() == AF_INET) {
1341 publishedIp_[0] = ip_addr;
1342 } else {
1343 publishedIp_[1] = ip_addr;
1344 }
1345}
1346
1347void
1348ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1349{
1350 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1351 bool hasIpv4 {false}, hasIpv6 {false};
1352 for (auto& result : results) {
1353 auto family = result.getFamily();
1354 if (family == AF_INET) {
1355 if (not hasIpv4) {
1356 hasIpv4 = true;
1357 if (config_->logger)
1358 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1359 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1360 setPublishedAddress(*result.get());
1361 if (config_->upnpCtrl) {
1362 config_->upnpCtrl->setPublicAddress(*result.get());
1363 }
1364 }
1365 } else if (family == AF_INET6) {
1366 if (not hasIpv6) {
1367 hasIpv6 = true;
1368 if (config_->logger)
1369 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1370 setPublishedAddress(*result.get());
1371 }
1372 }
1373 if (hasIpv4 and hasIpv6)
1374 break;
1375 }
1376 if (cb)
1377 cb();
1378 });
1379}
1380
1381void
1382ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1383{
1384 storeActiveIpAddress([this, cb = std::move(cb)] {
1385 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1386 auto publishedAddr = getPublishedIpAddress();
1387
1388 if (publishedAddr) {
1389 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1390 publishedAddr.getFamily());
1391 if (interfaceAddr) {
1392 opts.accountLocalAddr = interfaceAddr;
1393 opts.accountPublicAddr = publishedAddr;
1394 }
1395 }
1396 if (cb)
1397 cb(std::move(opts));
1398 });
1399}
1400
1401IceTransportOptions
1402ConnectionManager::Impl::getIceOptions() const noexcept
1403{
1404 IceTransportOptions opts;
Adrien Béraudf7081d32023-07-19 23:02:11 -04001405 opts.factory = (IceTransportFactory*)&iceFactory_;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001406 opts.upnpEnable = getUPnPActive();
1407
1408 if (config_->stunEnabled)
1409 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1410 if (config_->turnEnabled) {
1411 auto cached = false;
1412 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1413 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1414 if (config_->cacheTurnV4) {
1415 opts.turnServers.emplace_back(TurnServerInfo()
1416 .setUri(config_->cacheTurnV4.toString())
1417 .setUsername(config_->turnServerUserName)
1418 .setPassword(config_->turnServerPwd)
1419 .setRealm(config_->turnServerRealm));
1420 }
1421 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1422 // co issues. So this needs some debug. for now just disable
1423 // if (cacheTurnV6 && *cacheTurnV6) {
1424 // opts.turnServers.emplace_back(TurnServerInfo()
1425 // .setUri(cacheTurnV6->toString(true))
1426 // .setUsername(turnServerUserName_)
1427 // .setPassword(turnServerPwd_)
1428 // .setRealm(turnServerRealm_));
1429 //}
1430 // Nothing cached, so do the resolution
1431 if (!cached) {
1432 opts.turnServers.emplace_back(TurnServerInfo()
1433 .setUri(config_->turnServer)
1434 .setUsername(config_->turnServerUserName)
1435 .setPassword(config_->turnServerPwd)
1436 .setRealm(config_->turnServerRealm));
1437 }
1438 }
1439 return opts;
1440}
1441
1442bool
1443ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1444 dht::InfoHash& account_id,
1445 const std::shared_ptr<Logger>& logger)
1446{
1447 if (not crt)
1448 return false;
1449
1450 auto top_issuer = crt;
1451 while (top_issuer->issuer)
1452 top_issuer = top_issuer->issuer;
1453
1454 // Device certificate can't be self-signed
1455 if (top_issuer == crt) {
1456 if (logger)
1457 logger->warn("Found invalid peer device: {}", crt->getLongId());
1458 return false;
1459 }
1460
1461 // Check peer certificate chain
1462 // Trust store with top issuer as the only CA
1463 dht::crypto::TrustList peer_trust;
1464 peer_trust.add(*top_issuer);
1465 if (not peer_trust.verify(*crt)) {
1466 if (logger)
1467 logger->warn("Found invalid peer device: {}", crt->getLongId());
1468 return false;
1469 }
1470
1471 // Check cached OCSP response
1472 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1473 if (logger)
1474 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1475 return false;
1476 }
1477
1478 account_id = crt->issuer->getId();
1479 if (logger)
1480 logger->warn("Found peer device: {} account:{} CA:{}",
1481 crt->getLongId(),
1482 account_id,
1483 top_issuer->getId());
1484 return true;
1485}
1486
1487bool
1488ConnectionManager::Impl::findCertificate(
1489 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1490{
1491 if (auto cert = certStore().getCertificate(id.toString())) {
1492 if (cb)
1493 cb(cert);
1494 } else if (cb)
1495 cb(nullptr);
1496 return true;
1497}
1498
1499ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1500 : pimpl_ {std::make_shared<Impl>(config_)}
1501{}
1502
1503ConnectionManager::~ConnectionManager()
1504{
1505 if (pimpl_)
1506 pimpl_->shutdown();
1507}
1508
1509void
1510ConnectionManager::connectDevice(const DeviceId& deviceId,
1511 const std::string& name,
1512 ConnectCallback cb,
1513 bool noNewSocket,
1514 bool forceNewSocket,
1515 const std::string& connType)
1516{
1517 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1518}
1519
1520void
1521ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1522 const std::string& name,
1523 ConnectCallback cb,
1524 bool noNewSocket,
1525 bool forceNewSocket,
1526 const std::string& connType)
1527{
1528 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1529}
1530
1531bool
1532ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1533{
Adrien Béraud665294f2023-06-13 18:09:11 -04001534 auto pending = pimpl_->getPendingIds(deviceId);
1535 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001536 != pending.end();
1537}
1538
1539void
1540ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1541{
1542 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1543 std::set<DeviceId> peersDevices;
1544 {
1545 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1546 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1547 auto const& [key, value] = *iter;
1548 auto deviceId = key.first;
1549 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1550 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1551 connInfos.emplace_back(value);
1552 peersDevices.emplace(deviceId);
1553 iter = pimpl_->infos_.erase(iter);
1554 } else {
1555 iter++;
1556 }
1557 }
1558 }
1559 // Stop connections to all peers devices
1560 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001561 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001562 // This will close the TLS Session
1563 pimpl_->removeUnusedConnections(deviceId);
1564 }
1565 for (auto& info : connInfos) {
1566 if (info->socket_)
1567 info->socket_->shutdown();
1568 if (info->waitForAnswer_)
1569 info->waitForAnswer_->cancel();
1570 if (info->ice_) {
1571 std::unique_lock<std::mutex> lk {info->mutex_};
1572 dht::ThreadPool::io().run(
1573 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1574 }
1575 }
1576}
1577
1578void
1579ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1580{
1581 pimpl_->onDhtConnected(devicePk);
1582}
1583
1584void
1585ConnectionManager::onICERequest(onICERequestCallback&& cb)
1586{
1587 pimpl_->iceReqCb_ = std::move(cb);
1588}
1589
1590void
1591ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1592{
1593 pimpl_->channelReqCb_ = std::move(cb);
1594}
1595
1596void
1597ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1598{
1599 pimpl_->connReadyCb_ = std::move(cb);
1600}
1601
1602void
1603ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1604{
1605 pimpl_->iOSConnectedCb_ = std::move(cb);
1606}
1607
1608std::size_t
1609ConnectionManager::activeSockets() const
1610{
1611 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1612 return pimpl_->infos_.size();
1613}
1614
1615void
1616ConnectionManager::monitor() const
1617{
1618 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1619 auto logger = pimpl_->config_->logger;
1620 if (!logger)
1621 return;
1622 logger->debug("ConnectionManager current status:");
1623 for (const auto& [_, ci] : pimpl_->infos_) {
1624 if (ci->socket_)
1625 ci->socket_->monitor();
1626 }
1627 logger->debug("ConnectionManager end status.");
1628}
1629
1630void
1631ConnectionManager::connectivityChanged()
1632{
1633 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1634 for (const auto& [_, ci] : pimpl_->infos_) {
1635 if (ci->socket_)
1636 ci->socket_->sendBeacon();
1637 }
1638}
1639
1640void
1641ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1642{
1643 return pimpl_->getIceOptions(std::move(cb));
1644}
1645
1646IceTransportOptions
1647ConnectionManager::getIceOptions() const noexcept
1648{
1649 return pimpl_->getIceOptions();
1650}
1651
1652IpAddr
1653ConnectionManager::getPublishedIpAddress(uint16_t family) const
1654{
1655 return pimpl_->getPublishedIpAddress(family);
1656}
1657
1658void
1659ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1660{
1661 return pimpl_->setPublishedAddress(ip_addr);
1662}
1663
1664void
1665ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1666{
1667 return pimpl_->storeActiveIpAddress(std::move(cb));
1668}
1669
1670std::shared_ptr<ConnectionManager::Config>
1671ConnectionManager::getConfig()
1672{
1673 return pimpl_->config_;
1674}
1675
Sébastien Blin464bdff2023-07-19 08:02:53 -04001676} // namespace dhtnet