blob: 9d955a5275e132978c4b49d870ac34c58cb9fbf9 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
Adrien Béraud6de3f882023-07-06 12:56:29 -040019#include "ice_transport_factory.h"
Adrien Béraud612b55b2023-05-29 10:42:04 -040020#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040037#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040038
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040039namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040040static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
41static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
42
43using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040044using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040045
46struct ConnectionInfo
47{
48 ~ConnectionInfo()
49 {
50 if (socket_)
51 socket_->join();
52 }
53
54 std::mutex mutex_ {};
55 bool responseReceived_ {false};
56 PeerConnectionRequest response_ {};
57 std::unique_ptr<IceTransport> ice_ {nullptr};
58 // Used to store currently non ready TLS Socket
59 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
60 std::shared_ptr<MultiplexedSocket> socket_ {};
61 std::set<CallbackId> cbIds_ {};
62
63 std::function<void(bool)> onConnected_;
64 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
65};
66
67/**
68 * returns whether or not UPnP is enabled and active_
69 * ie: if it is able to make port mappings
70 */
71bool
72ConnectionManager::Config::getUPnPActive() const
73{
74 if (upnpCtrl)
75 return upnpCtrl->isReady();
76 return false;
77}
78
79class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
80{
81public:
82 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
83 : config_ {std::move(config_)}
84 {}
85 ~Impl() {}
86
87 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
88 const dht::crypto::Identity& identity() const { return config_->id; }
89
90 void removeUnusedConnections(const DeviceId& deviceId = {})
91 {
92 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
93
94 {
95 std::lock_guard<std::mutex> lk(infosMtx_);
96 for (auto it = infos_.begin(); it != infos_.end();) {
97 auto& [key, info] = *it;
98 if (info && (!deviceId || key.first == deviceId)) {
99 unused.emplace_back(std::move(info));
100 it = infos_.erase(it);
101 } else {
102 ++it;
103 }
104 }
105 }
106 for (auto& info: unused) {
107 if (info->tls_)
108 info->tls_->shutdown();
109 if (info->socket_)
110 info->socket_->shutdown();
111 if (info->waitForAnswer_)
112 info->waitForAnswer_->cancel();
113 }
114 if (!unused.empty())
115 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
116 }
117
118 void shutdown()
119 {
120 if (isDestroying_.exchange(true))
121 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400122 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400123 {
124 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400125 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400126 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400127 for (auto& [deviceId, pcbs] : po) {
128 for (auto& [id, pending] : pcbs.connecting)
129 pending.cb(nullptr, deviceId);
130 for (auto& [id, pending] : pcbs.waiting)
131 pending.cb(nullptr, deviceId);
132 }
133
Adrien Béraud612b55b2023-05-29 10:42:04 -0400134 removeUnusedConnections();
135 }
136
Adrien Béraud612b55b2023-05-29 10:42:04 -0400137 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
138 const dht::Value::Id& vid,
139 const std::string& connType,
140 std::function<void(bool)> onConnected);
141 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
142 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
143 const std::string& name,
144 const dht::Value::Id& vid,
145 const std::shared_ptr<dht::crypto::Certificate>& cert);
146 void connectDevice(const DeviceId& deviceId,
147 const std::string& uri,
148 ConnectCallback cb,
149 bool noNewSocket = false,
150 bool forceNewSocket = false,
151 const std::string& connType = "");
152 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
153 const std::string& name,
154 ConnectCallback cb,
155 bool noNewSocket = false,
156 bool forceNewSocket = false,
157 const std::string& connType = "");
158 /**
159 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
160 * @param sock socket used to send the request
161 * @param name channel's name
162 * @param vid channel's id
163 * @param deviceId to identify the linked ConnectCallback
164 */
165 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
166 const std::string& name,
167 const DeviceId& deviceId,
168 const dht::Value::Id& vid);
169 /**
170 * Triggered when a PeerConnectionRequest comes from the DHT
171 */
172 void answerTo(IceTransport& ice,
173 const dht::Value::Id& id,
174 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
175 bool onRequestStartIce(const PeerConnectionRequest& req);
176 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
177 void onDhtPeerRequest(const PeerConnectionRequest& req,
178 const std::shared_ptr<dht::crypto::Certificate>& cert);
179
180 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
181 void onPeerResponse(const PeerConnectionRequest& req);
182 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
183
184 const std::shared_future<tls::DhParams> dhParams() const;
185 tls::CertificateStore& certStore() const { return *config_->certStore; }
186
187 mutable std::mutex messageMutex_ {};
188 std::set<std::string, std::less<>> treatedMessages_ {};
189
190 void loadTreatedMessages();
191 void saveTreatedMessages() const;
192
193 /// \return true if the given DHT message identifier has been treated
194 /// \note if message has not been treated yet this method st/ore this id and returns true at
195 /// further calls
196 bool isMessageTreated(std::string_view id);
197
198 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
199
200 /**
201 * Published IPv4/IPv6 addresses, used only if defined by the user in account
202 * configuration
203 *
204 */
205 IpAddr publishedIp_[2] {};
206
207 // This will be stored in the configuration
208 std::string publishedIpAddress_ {};
209
210 /**
211 * Published port, used only if defined by the user
212 */
213 pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT};
214
215 /**
216 * interface name on which this account is bound
217 */
218 std::string interface_ {"default"};
219
220 /**
221 * Get the local interface name on which this account is bound.
222 */
223 const std::string& getLocalInterface() const { return interface_; }
224
225 /**
226 * Get the published IP address, fallbacks to NAT if family is unspecified
227 * Prefers the usage of IPv4 if possible.
228 */
229 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
230
231 /**
232 * Set published IP address according to given family
233 */
234 void setPublishedAddress(const IpAddr& ip_addr);
235
236 /**
237 * Store the local/public addresses used to register
238 */
239 void storeActiveIpAddress(std::function<void()>&& cb = {});
240
241 /**
242 * Create and return ICE options.
243 */
244 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
245 IceTransportOptions getIceOptions() const noexcept;
246
247 /**
248 * Inform that a potential peer device have been found.
249 * Returns true only if the device certificate is a valid device certificate.
250 * In that case (true is returned) the account_id parameter is set to the peer account ID.
251 */
252 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
253 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
254
255 bool findCertificate(const dht::PkId& id,
256 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
257
258 /**
259 * returns whether or not UPnP is enabled and active
260 * ie: if it is able to make port mappings
261 */
262 bool getUPnPActive() const;
263
264 /**
265 * Triggered when a new TLS socket is ready to use
266 * @param ok If succeed
267 * @param deviceId Related device
268 * @param vid vid of the connection request
269 * @param name non empty if TLS was created by connectDevice()
270 */
271 void onTlsNegotiationDone(bool ok,
272 const DeviceId& deviceId,
273 const dht::Value::Id& vid,
274 const std::string& name = "");
275
276 std::shared_ptr<ConnectionManager::Config> config_;
277
278 IceTransportFactory iceFactory_ {};
279
280 mutable std::mt19937_64 rand;
281
282 iOSConnectedCallback iOSConnectedCb_ {};
283
284 std::mutex infosMtx_ {};
285 // Note: Someone can ask multiple sockets, so to avoid any race condition,
286 // each device can have multiple multiplexed sockets.
287 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
288
289 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
290 {
291 std::lock_guard<std::mutex> lk(infosMtx_);
292 auto it = infos_.find({deviceId, id});
293 if (it != infos_.end())
294 return it->second;
295 return {};
296 }
297
298 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
299 {
300 std::lock_guard<std::mutex> lk(infosMtx_);
301 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
302 auto& [key, value] = item;
303 return key.first == deviceId && value && value->socket_;
304 });
305 if (it != infos_.end())
306 return it->second;
307 return {};
308 }
309
310 ChannelRequestCallback channelReqCb_ {};
311 ConnectionReadyCallback connReadyCb_ {};
312 onICERequestCallback iceReqCb_ {};
313
314 /**
315 * Stores callback from connectDevice
316 * @note: each device needs a vector because several connectDevice can
317 * be done in parallel and we only want one socket
318 */
319 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400320
Adrien Béraud665294f2023-06-13 18:09:11 -0400321 struct PendingCb
322 {
323 std::string name;
324 ConnectCallback cb;
325 };
326 struct PendingOperations {
327 std::map<dht::Value::Id, PendingCb> connecting;
328 std::map<dht::Value::Id, PendingCb> waiting;
329 };
330
331 std::map<DeviceId, PendingOperations> pendingOperations_ {};
332
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400333 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400334 {
335 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400336 std::unique_lock<std::mutex> lk(connectCbsMtx_);
337 auto it = pendingOperations_.find(deviceId);
338 if (it == pendingOperations_.end())
339 return;
340 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400341 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400342 // Extract all pending callbacks
343 for (auto& [vid, cb] : pendingOperations.connecting)
344 ret.emplace_back(std::move(cb));
345 pendingOperations.connecting.clear();
346 for (auto& [vid, cb] : pendingOperations.waiting)
347 ret.emplace_back(std::move(cb));
348 pendingOperations.waiting.clear();
349 } else if (auto n = pendingOperations.waiting.extract(vid)) {
350 // If it's a waiting operation, just move it
351 ret.emplace_back(std::move(n.mapped()));
352 } else if (auto n = pendingOperations.connecting.extract(vid)) {
353 ret.emplace_back(std::move(n.mapped()));
354 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400355 // If accepted is false, it means that underlying socket is ok, but channel is declined
356 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400357 for (auto& [vid, cb] : pendingOperations.waiting)
358 ret.emplace_back(std::move(cb));
359 pendingOperations.waiting.clear();
360 for (auto& [vid, cb] : pendingOperations.connecting)
361 ret.emplace_back(std::move(cb));
362 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400363 }
364 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400365 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
366 pendingOperations_.erase(it);
367 lk.unlock();
368 for (auto& cb : ret)
369 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400370 }
371
Adrien Béraud665294f2023-06-13 18:09:11 -0400372 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400373 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400374 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400375 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400376 auto it = pendingOperations_.find(deviceId);
377 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400378 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400379 auto& pendingOp = it->second;
380 for (const auto& [id, pc]: pendingOp.connecting) {
381 if (vid == 0 || id == vid)
382 ret[id] = pc.name;
383 }
384 for (const auto& [id, pc]: pendingOp.waiting) {
385 if (vid == 0 || id == vid)
386 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400387 }
388 return ret;
389 }
390
391 std::shared_ptr<ConnectionManager::Impl> shared()
392 {
393 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
394 }
395 std::shared_ptr<ConnectionManager::Impl const> shared() const
396 {
397 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
398 }
399 std::weak_ptr<ConnectionManager::Impl> weak()
400 {
401 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
402 }
403 std::weak_ptr<ConnectionManager::Impl const> weak() const
404 {
405 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
406 }
407
408 std::atomic_bool isDestroying_ {false};
409};
410
411void
412ConnectionManager::Impl::connectDeviceStartIce(
413 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
414 const dht::Value::Id& vid,
415 const std::string& connType,
416 std::function<void(bool)> onConnected)
417{
418 auto deviceId = devicePk->getLongId();
419 auto info = getInfo(deviceId, vid);
420 if (!info) {
421 onConnected(false);
422 return;
423 }
424
425 std::unique_lock<std::mutex> lk(info->mutex_);
426 auto& ice = info->ice_;
427
428 if (!ice) {
429 if (config_->logger)
430 config_->logger->error("No ICE detected");
431 onConnected(false);
432 return;
433 }
434
435 auto iceAttributes = ice->getLocalAttributes();
436 std::ostringstream icemsg;
437 icemsg << iceAttributes.ufrag << "\n";
438 icemsg << iceAttributes.pwd << "\n";
439 for (const auto& addr : ice->getLocalCandidates(1)) {
440 icemsg << addr << "\n";
441 if (config_->logger)
442 config_->logger->debug("Added local ICE candidate {}", addr);
443 }
444
445 // Prepare connection request as a DHT message
446 PeerConnectionRequest val;
447
448 val.id = vid; /* Random id for the message unicity */
449 val.ice_msg = icemsg.str();
450 val.connType = connType;
451
452 auto value = std::make_shared<dht::Value>(std::move(val));
453 value->user_type = "peer_request";
454
455 // Send connection request through DHT
456 if (config_->logger)
457 config_->logger->debug("Request connection to {}", deviceId);
458 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
459 + devicePk->getId().toString()),
460 devicePk,
461 value,
462 [l=config_->logger,deviceId](bool ok) {
463 if (l)
464 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
465 deviceId,
466 (ok ? "ok" : "failed"));
467 });
468 // Wait for call to onResponse() operated by DHT
469 if (isDestroying_) {
470 onConnected(true); // This avoid to wait new negotiation when destroying
471 return;
472 }
473
474 info->onConnected_ = std::move(onConnected);
475 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
476 std::chrono::steady_clock::now()
477 + DHT_MSG_TIMEOUT);
478 info->waitForAnswer_->async_wait(
479 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
480}
481
482void
483ConnectionManager::Impl::onResponse(const asio::error_code& ec,
484 const DeviceId& deviceId,
485 const dht::Value::Id& vid)
486{
487 if (ec == asio::error::operation_aborted)
488 return;
489 auto info = getInfo(deviceId, vid);
490 if (!info)
491 return;
492
493 std::unique_lock<std::mutex> lk(info->mutex_);
494 auto& ice = info->ice_;
495 if (isDestroying_) {
496 info->onConnected_(true); // The destructor can wake a pending wait here.
497 return;
498 }
499 if (!info->responseReceived_) {
500 if (config_->logger)
501 config_->logger->error("no response from DHT to E2E request.");
502 info->onConnected_(false);
503 return;
504 }
505
506 if (!info->ice_) {
507 info->onConnected_(false);
508 return;
509 }
510
511 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
512
513 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
514 if (config_->logger)
515 config_->logger->warn("start ICE failed");
516 info->onConnected_(false);
517 return;
518 }
519 info->onConnected_(true);
520}
521
522bool
523ConnectionManager::Impl::connectDeviceOnNegoDone(
524 const DeviceId& deviceId,
525 const std::string& name,
526 const dht::Value::Id& vid,
527 const std::shared_ptr<dht::crypto::Certificate>& cert)
528{
529 auto info = getInfo(deviceId, vid);
530 if (!info)
531 return false;
532
533 std::unique_lock<std::mutex> lk {info->mutex_};
534 if (info->waitForAnswer_) {
535 // Negotiation is done and connected, go to handshake
536 // and avoid any cancellation at this point.
537 info->waitForAnswer_->cancel();
538 }
539 auto& ice = info->ice_;
540 if (!ice || !ice->isRunning()) {
541 if (config_->logger)
542 config_->logger->error("No ICE detected or not running");
543 return false;
544 }
545
546 // Build socket
547 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
548 std::move(ice)),
549 true);
550
551 // Negotiate a TLS session
552 if (config_->logger)
553 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
554 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
555 certStore(),
556 identity(),
557 dhParams(),
558 *cert);
559
560 info->tls_->setOnReady(
561 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
562 bool ok) {
563 if (auto shared = w.lock())
564 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
565 });
566 return true;
567}
568
569void
570ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
571 const std::string& name,
572 ConnectCallback cb,
573 bool noNewSocket,
574 bool forceNewSocket,
575 const std::string& connType)
576{
577 if (!dht()) {
578 cb(nullptr, deviceId);
579 return;
580 }
581 if (deviceId.toString() == identity().second->getLongId().toString()) {
582 cb(nullptr, deviceId);
583 return;
584 }
585 findCertificate(deviceId,
586 [w = weak(),
587 deviceId,
588 name,
589 cb = std::move(cb),
590 noNewSocket,
591 forceNewSocket,
592 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
593 if (!cert) {
594 if (auto shared = w.lock())
595 if (shared->config_->logger)
596 shared->config_->logger->error(
597 "No valid certificate found for device {}",
598 deviceId);
599 cb(nullptr, deviceId);
600 return;
601 }
602 if (auto shared = w.lock()) {
603 shared->connectDevice(cert,
604 name,
605 std::move(cb),
606 noNewSocket,
607 forceNewSocket,
608 connType);
609 } else
610 cb(nullptr, deviceId);
611 });
612}
613
614void
615ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
616 const std::string& name,
617 ConnectCallback cb,
618 bool noNewSocket,
619 bool forceNewSocket,
620 const std::string& connType)
621{
622 // Avoid dht operation in a DHT callback to avoid deadlocks
623 dht::ThreadPool::computation().run([w = weak(),
624 name = std::move(name),
625 cert = std::move(cert),
626 cb = std::move(cb),
627 noNewSocket,
628 forceNewSocket,
629 connType] {
630 auto devicePk = cert->getSharedPublicKey();
631 auto deviceId = devicePk->getLongId();
632 auto sthis = w.lock();
633 if (!sthis || sthis->isDestroying_) {
634 cb(nullptr, deviceId);
635 return;
636 }
637 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
638 auto isConnectingToDevice = false;
639 {
640 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400641 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
642 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400643 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400644 while (pendings.connecting.find(vid) != pendings.connecting.end()
645 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400646 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400647 }
648 }
649 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400650 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400651 // Save current request for sendChannelRequest.
652 // Note: do not return here, cause we can be in a state where first
653 // socket is negotiated and first channel is pending
654 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400655 if (isConnectingToDevice && !forceNewSocket)
656 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400657 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400658 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400659 }
660
661 // Check if already negotiated
662 CallbackId cbId(deviceId, vid);
663 if (auto info = sthis->getConnectedInfo(deviceId)) {
664 std::lock_guard<std::mutex> lk(info->mutex_);
665 if (info->socket_) {
666 if (sthis->config_->logger)
667 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
668 info->cbIds_.emplace(cbId);
669 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
670 return;
671 }
672 }
673
674 if (isConnectingToDevice && !forceNewSocket) {
675 if (sthis->config_->logger)
676 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
677 return;
678 }
679 if (noNewSocket) {
680 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400681 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400682 return;
683 }
684
685 // Note: used when the ice negotiation fails to erase
686 // all stored structures.
687 auto eraseInfo = [w, cbId] {
688 if (auto shared = w.lock()) {
689 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400690 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400691 std::lock_guard<std::mutex> lk(shared->infosMtx_);
692 shared->infos_.erase(cbId);
693 }
694 };
695
696 // If no socket exists, we need to initiate an ICE connection.
697 sthis->getIceOptions([w,
698 deviceId = std::move(deviceId),
699 devicePk = std::move(devicePk),
700 name = std::move(name),
701 cert = std::move(cert),
702 vid,
703 connType,
704 eraseInfo](auto&& ice_config) {
705 auto sthis = w.lock();
706 if (!sthis) {
707 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
708 return;
709 }
710 ice_config.tcpEnable = true;
711 ice_config.onInitDone = [w,
712 deviceId = std::move(deviceId),
713 devicePk = std::move(devicePk),
714 name = std::move(name),
715 cert = std::move(cert),
716 vid,
717 connType,
718 eraseInfo](bool ok) {
719 dht::ThreadPool::io().run([w = std::move(w),
720 devicePk = std::move(devicePk),
721 vid = std::move(vid),
722 eraseInfo,
723 connType, ok] {
724 auto sthis = w.lock();
725 if (!ok && sthis && sthis->config_->logger)
726 sthis->config_->logger->error("Cannot initialize ICE session.");
727 if (!sthis || !ok) {
728 eraseInfo();
729 return;
730 }
731 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
732 if (!ok) {
733 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
734 }
735 });
736 });
737 };
738 ice_config.onNegoDone = [w,
739 deviceId,
740 name,
741 cert = std::move(cert),
742 vid,
743 eraseInfo](bool ok) {
744 dht::ThreadPool::io().run([w = std::move(w),
745 deviceId = std::move(deviceId),
746 name = std::move(name),
747 cert = std::move(cert),
748 vid = std::move(vid),
749 eraseInfo = std::move(eraseInfo),
750 ok] {
751 auto sthis = w.lock();
752 if (!ok && sthis && sthis->config_->logger)
753 sthis->config_->logger->error("ICE negotiation failed.");
754 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
755 eraseInfo();
756 });
757 };
758
759 auto info = std::make_shared<ConnectionInfo>();
760 {
761 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
762 sthis->infos_[{deviceId, vid}] = info;
763 }
764 std::unique_lock<std::mutex> lk {info->mutex_};
765 ice_config.master = false;
766 ice_config.streamsCount = 1;
767 ice_config.compCountPerStream = 1;
768 info->ice_ = sthis->iceFactory_.createUTransport("");
769 if (!info->ice_) {
770 if (sthis->config_->logger)
771 sthis->config_->logger->error("Cannot initialize ICE session.");
772 eraseInfo();
773 return;
774 }
775 // We need to detect any shutdown if the ice session is destroyed before going to the
776 // TLS session;
777 info->ice_->setOnShutdown([eraseInfo]() {
778 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
779 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400780 try {
781 info->ice_->initIceInstance(ice_config);
782 } catch (const std::exception& e) {
783 if (sthis->config_->logger)
784 sthis->config_->logger->error("{}", e.what());
785 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
786 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400787 });
788 });
789}
790
791void
792ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
793 const std::string& name,
794 const DeviceId& deviceId,
795 const dht::Value::Id& vid)
796{
797 auto channelSock = sock->addChannel(name);
798 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
799 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400800 if (auto shared = w.lock())
801 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400802 });
803 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400804 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400805 auto shared = w.lock();
806 auto channelSock = wSock.lock();
807 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400808 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400809 });
810
811 ChannelRequest val;
812 val.name = channelSock->name();
813 val.state = ChannelRequestState::REQUEST;
814 val.channel = channelSock->channel();
815 msgpack::sbuffer buffer(256);
816 msgpack::pack(buffer, val);
817
818 std::error_code ec;
819 int res = sock->write(CONTROL_CHANNEL,
820 reinterpret_cast<const uint8_t*>(buffer.data()),
821 buffer.size(),
822 ec);
823 if (res < 0) {
824 // TODO check if we should handle errors here
825 if (config_->logger)
826 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
827 }
828}
829
830void
831ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
832{
833 auto device = req.owner->getLongId();
834 if (config_->logger)
835 config_->logger->debug("New response received from {}", device);
836 if (auto info = getInfo(device, req.id)) {
837 std::lock_guard<std::mutex> lk {info->mutex_};
838 info->responseReceived_ = true;
839 info->response_ = std::move(req);
840 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
841 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
842 this,
843 std::placeholders::_1,
844 device,
845 req.id));
846 } else {
847 if (config_->logger)
848 config_->logger->warn("Respond received, but cannot find request");
849 }
850}
851
852void
853ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
854{
855 if (!dht())
856 return;
857 dht()->listen<PeerConnectionRequest>(
858 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
859 [w = weak()](PeerConnectionRequest&& req) {
860 auto shared = w.lock();
861 if (!shared)
862 return false;
863 if (shared->isMessageTreated(to_hex_string(req.id))) {
864 // Message already treated. Just ignore
865 return true;
866 }
867 if (req.isAnswer) {
868 if (shared->config_->logger)
869 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
870 } else {
871 if (shared->config_->logger)
872 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
873 }
874 if (req.isAnswer) {
875 shared->onPeerResponse(req);
876 } else {
877 // Async certificate checking
878 shared->dht()->findCertificate(
879 req.from,
880 [w, req = std::move(req)](
881 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
882 auto shared = w.lock();
883 if (!shared)
884 return;
885 dht::InfoHash peer_h;
886 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
887#if TARGET_OS_IOS
888 if (shared->iOSConnectedCb_(req.connType, peer_h))
889 return;
890#endif
891 shared->onDhtPeerRequest(req, cert);
892 } else {
893 if (shared->config_->logger)
894 shared->config_->logger->warn(
895 "Received request from untrusted peer {}",
896 req.owner->getLongId());
897 }
898 });
899 }
900
901 return true;
902 },
903 dht::Value::UserTypeFilter("peer_request"));
904}
905
906void
907ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
908 const DeviceId& deviceId,
909 const dht::Value::Id& vid,
910 const std::string& name)
911{
912 if (isDestroying_)
913 return;
914 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
915 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
916 // asked yet)
917 auto isDhtRequest = name.empty();
918 if (!ok) {
919 if (isDhtRequest) {
920 if (config_->logger)
921 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
922 deviceId,
923 name,
924 vid);
925 if (connReadyCb_)
926 connReadyCb_(deviceId, "", nullptr);
927 } else {
928 if (config_->logger)
929 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
930 deviceId,
931 name,
932 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400933 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400934 }
935 } else {
936 // The socket is ready, store it
937 if (isDhtRequest) {
938 if (config_->logger)
939 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
940 deviceId,
941 vid);
942 } else {
943 if (config_->logger)
944 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
945 deviceId,
946 name,
947 vid);
948 }
949
950 auto info = getInfo(deviceId, vid);
951 addNewMultiplexedSocket({deviceId, vid}, info);
952 // Finally, open the channel and launch pending callbacks
953 if (info->socket_) {
954 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400955 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400956 if (config_->logger)
957 config_->logger->debug("Send request on TLS socket for channel {} to {}",
Adrien Béraud665294f2023-06-13 18:09:11 -0400958 name,
959 deviceId.toString());
960 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400961 }
962 }
963 }
964}
965
966void
967ConnectionManager::Impl::answerTo(IceTransport& ice,
968 const dht::Value::Id& id,
969 const std::shared_ptr<dht::crypto::PublicKey>& from)
970{
971 // NOTE: This is a shortest version of a real SDP message to save some bits
972 auto iceAttributes = ice.getLocalAttributes();
973 std::ostringstream icemsg;
974 icemsg << iceAttributes.ufrag << "\n";
975 icemsg << iceAttributes.pwd << "\n";
976 for (const auto& addr : ice.getLocalCandidates(1)) {
977 icemsg << addr << "\n";
978 }
979
980 // Send PeerConnection response
981 PeerConnectionRequest val;
982 val.id = id;
983 val.ice_msg = icemsg.str();
984 val.isAnswer = true;
985 auto value = std::make_shared<dht::Value>(std::move(val));
986 value->user_type = "peer_request";
987
988 if (config_->logger)
989 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
990 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
991 + from->getId().toString()),
992 from,
993 value,
994 [from,l=config_->logger](bool ok) {
995 if (l)
996 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
997 from->getLongId(),
998 (ok ? "ok" : "failed"));
999 });
1000}
1001
1002bool
1003ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1004{
1005 auto deviceId = req.owner->getLongId();
1006 auto info = getInfo(deviceId, req.id);
1007 if (!info)
1008 return false;
1009
1010 std::unique_lock<std::mutex> lk {info->mutex_};
1011 auto& ice = info->ice_;
1012 if (!ice) {
1013 if (config_->logger)
1014 config_->logger->error("No ICE detected");
1015 if (connReadyCb_)
1016 connReadyCb_(deviceId, "", nullptr);
1017 return false;
1018 }
1019
1020 auto sdp = ice->parseIceCandidates(req.ice_msg);
1021 answerTo(*ice, req.id, req.owner);
1022 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1023 if (config_->logger)
1024 config_->logger->error("Start ICE failed - fallback to TURN");
1025 ice = nullptr;
1026 if (connReadyCb_)
1027 connReadyCb_(deviceId, "", nullptr);
1028 return false;
1029 }
1030 return true;
1031}
1032
1033bool
1034ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1035{
1036 auto deviceId = req.owner->getLongId();
1037 auto info = getInfo(deviceId, req.id);
1038 if (!info)
1039 return false;
1040
1041 std::unique_lock<std::mutex> lk {info->mutex_};
1042 auto& ice = info->ice_;
1043 if (!ice) {
1044 if (config_->logger)
1045 config_->logger->error("No ICE detected");
1046 return false;
1047 }
1048
1049 // Build socket
1050 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1051 std::move(ice)),
1052 false);
1053
1054 // init TLS session
1055 auto ph = req.from;
1056 if (config_->logger)
1057 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1058 req.from,
1059 req.id);
1060 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1061 std::move(endpoint),
1062 certStore(),
1063 identity(),
1064 dhParams(),
1065 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1066 auto shared = w.lock();
1067 if (!shared)
1068 return false;
1069 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1070 if (!crt)
1071 return false;
1072 return crt->getPacked() == cert.getPacked();
1073 });
1074
1075 info->tls_->setOnReady(
1076 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1077 if (auto shared = w.lock())
1078 shared->onTlsNegotiationDone(ok, deviceId, vid);
1079 });
1080 return true;
1081}
1082
1083void
1084ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1085 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1086{
1087 auto deviceId = req.owner->getLongId();
1088 if (config_->logger)
1089 config_->logger->debug("New connection request from {}", deviceId);
1090 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1091 if (config_->logger)
1092 config_->logger->debug("Refuse connection from {}", deviceId);
1093 return;
1094 }
1095
1096 // Because the connection is accepted, create an ICE socket.
1097 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1098 auto shared = w.lock();
1099 if (!shared)
1100 return;
1101 // Note: used when the ice negotiation fails to erase
1102 // all stored structures.
1103 auto eraseInfo = [w, id = req.id, deviceId] {
1104 if (auto shared = w.lock()) {
1105 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001106 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001107 if (shared->connReadyCb_)
1108 shared->connReadyCb_(deviceId, "", nullptr);
1109 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1110 shared->infos_.erase({deviceId, id});
1111 }
1112 };
1113
1114 ice_config.tcpEnable = true;
1115 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1116 auto shared = w.lock();
1117 if (!shared)
1118 return;
1119 if (!ok) {
1120 if (shared->config_->logger)
1121 shared->config_->logger->error("Cannot initialize ICE session.");
1122 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1123 return;
1124 }
1125
1126 dht::ThreadPool::io().run(
1127 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1128 auto shared = w.lock();
1129 if (!shared)
1130 return;
1131 if (!shared->onRequestStartIce(req))
1132 eraseInfo();
1133 });
1134 };
1135
1136 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1137 auto shared = w.lock();
1138 if (!shared)
1139 return;
1140 if (!ok) {
1141 if (shared->config_->logger)
1142 shared->config_->logger->error("ICE negotiation failed.");
1143 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1144 return;
1145 }
1146
1147 dht::ThreadPool::io().run(
1148 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1149 if (auto shared = w.lock())
1150 if (!shared->onRequestOnNegoDone(req))
1151 eraseInfo();
1152 });
1153 };
1154
1155 // Negotiate a new ICE socket
1156 auto info = std::make_shared<ConnectionInfo>();
1157 {
1158 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1159 shared->infos_[{deviceId, req.id}] = info;
1160 }
1161 if (shared->config_->logger)
1162 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1163 std::unique_lock<std::mutex> lk {info->mutex_};
1164 ice_config.streamsCount = 1;
1165 ice_config.compCountPerStream = 1; // TCP
1166 ice_config.master = true;
1167 info->ice_ = shared->iceFactory_.createUTransport("");
1168 if (not info->ice_) {
1169 if (shared->config_->logger)
1170 shared->config_->logger->error("Cannot initialize ICE session");
1171 eraseInfo();
1172 return;
1173 }
1174 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1175 info->ice_->setOnShutdown([eraseInfo]() {
1176 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1177 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001178 try {
1179 info->ice_->initIceInstance(ice_config);
1180 } catch (const std::exception& e) {
1181 if (shared->config_->logger)
1182 shared->config_->logger->error("{}", e.what());
1183 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1184 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001185 });
1186}
1187
1188void
1189ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1190{
1191 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1192 info->socket_->setOnReady(
1193 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1194 if (auto sthis = w.lock())
1195 if (sthis->connReadyCb_)
1196 sthis->connReadyCb_(deviceId, socket->name(), socket);
1197 });
1198 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1199 const uint16_t&,
1200 const std::string& name) {
1201 if (auto sthis = w.lock())
1202 if (sthis->channelReqCb_)
1203 return sthis->channelReqCb_(peer, name);
1204 return false;
1205 });
1206 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1207 // Cancel current outgoing connections
1208 dht::ThreadPool::io().run([w, deviceId, vid] {
1209 auto sthis = w.lock();
1210 if (!sthis)
1211 return;
1212
1213 std::set<CallbackId> ids;
1214 if (auto info = sthis->getInfo(deviceId, vid)) {
1215 std::lock_guard<std::mutex> lk(info->mutex_);
1216 if (info->socket_) {
1217 ids = std::move(info->cbIds_);
1218 info->socket_->shutdown();
1219 }
1220 }
1221 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001222 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001223
1224 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1225 sthis->infos_.erase({deviceId, vid});
1226 });
1227 });
1228}
1229
1230const std::shared_future<tls::DhParams>
1231ConnectionManager::Impl::dhParams() const
1232{
1233 return dht::ThreadPool::computation().get<tls::DhParams>(
1234 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1235 ;
1236}
1237
1238template<typename ID = dht::Value::Id>
1239std::set<ID, std::less<>>
1240loadIdList(const std::string& path)
1241{
1242 std::set<ID, std::less<>> ids;
1243 std::ifstream file = fileutils::ifstream(path);
1244 if (!file.is_open()) {
1245 //JAMI_DBG("Could not load %s", path.c_str());
1246 return ids;
1247 }
1248 std::string line;
1249 while (std::getline(file, line)) {
1250 if constexpr (std::is_same<ID, std::string>::value) {
1251 ids.emplace(std::move(line));
1252 } else if constexpr (std::is_integral<ID>::value) {
1253 ID vid;
1254 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1255 ec == std::errc()) {
1256 ids.emplace(vid);
1257 }
1258 }
1259 }
1260 return ids;
1261}
1262
1263template<typename List = std::set<dht::Value::Id>>
1264void
1265saveIdList(const std::string& path, const List& ids)
1266{
1267 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1268 if (!file.is_open()) {
1269 //JAMI_ERR("Could not save to %s", path.c_str());
1270 return;
1271 }
1272 for (auto& c : ids)
1273 file << std::hex << c << "\n";
1274}
1275
1276void
1277ConnectionManager::Impl::loadTreatedMessages()
1278{
1279 std::lock_guard<std::mutex> lock(messageMutex_);
1280 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1281 treatedMessages_ = loadIdList<std::string>(path);
1282 if (treatedMessages_.empty()) {
1283 auto messages = loadIdList(path);
1284 for (const auto& m : messages)
1285 treatedMessages_.emplace(to_hex_string(m));
1286 }
1287}
1288
1289void
1290ConnectionManager::Impl::saveTreatedMessages() const
1291{
1292 dht::ThreadPool::io().run([w = weak()]() {
1293 if (auto sthis = w.lock()) {
1294 auto& this_ = *sthis;
1295 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1296 fileutils::check_dir(this_.config_->cachePath.c_str());
1297 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1298 + DIR_SEPARATOR_STR "treatedMessages",
1299 this_.treatedMessages_);
1300 }
1301 });
1302}
1303
1304bool
1305ConnectionManager::Impl::isMessageTreated(std::string_view id)
1306{
1307 std::lock_guard<std::mutex> lock(messageMutex_);
1308 auto res = treatedMessages_.emplace(id);
1309 if (res.second) {
1310 saveTreatedMessages();
1311 return false;
1312 }
1313 return true;
1314}
1315
1316/**
1317 * returns whether or not UPnP is enabled and active_
1318 * ie: if it is able to make port mappings
1319 */
1320bool
1321ConnectionManager::Impl::getUPnPActive() const
1322{
1323 return config_->getUPnPActive();
1324}
1325
1326IpAddr
1327ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1328{
1329 if (family == AF_INET)
1330 return publishedIp_[0];
1331 if (family == AF_INET6)
1332 return publishedIp_[1];
1333
1334 assert(family == AF_UNSPEC);
1335
1336 // If family is not set, prefere IPv4 if available. It's more
1337 // likely to succeed behind NAT.
1338 if (publishedIp_[0])
1339 return publishedIp_[0];
1340 if (publishedIp_[1])
1341 return publishedIp_[1];
1342 return {};
1343}
1344
1345void
1346ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1347{
1348 if (ip_addr.getFamily() == AF_INET) {
1349 publishedIp_[0] = ip_addr;
1350 } else {
1351 publishedIp_[1] = ip_addr;
1352 }
1353}
1354
1355void
1356ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1357{
1358 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1359 bool hasIpv4 {false}, hasIpv6 {false};
1360 for (auto& result : results) {
1361 auto family = result.getFamily();
1362 if (family == AF_INET) {
1363 if (not hasIpv4) {
1364 hasIpv4 = true;
1365 if (config_->logger)
1366 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1367 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1368 setPublishedAddress(*result.get());
1369 if (config_->upnpCtrl) {
1370 config_->upnpCtrl->setPublicAddress(*result.get());
1371 }
1372 }
1373 } else if (family == AF_INET6) {
1374 if (not hasIpv6) {
1375 hasIpv6 = true;
1376 if (config_->logger)
1377 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1378 setPublishedAddress(*result.get());
1379 }
1380 }
1381 if (hasIpv4 and hasIpv6)
1382 break;
1383 }
1384 if (cb)
1385 cb();
1386 });
1387}
1388
1389void
1390ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1391{
1392 storeActiveIpAddress([this, cb = std::move(cb)] {
1393 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1394 auto publishedAddr = getPublishedIpAddress();
1395
1396 if (publishedAddr) {
1397 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1398 publishedAddr.getFamily());
1399 if (interfaceAddr) {
1400 opts.accountLocalAddr = interfaceAddr;
1401 opts.accountPublicAddr = publishedAddr;
1402 }
1403 }
1404 if (cb)
1405 cb(std::move(opts));
1406 });
1407}
1408
1409IceTransportOptions
1410ConnectionManager::Impl::getIceOptions() const noexcept
1411{
1412 IceTransportOptions opts;
1413 opts.upnpEnable = getUPnPActive();
1414
1415 if (config_->stunEnabled)
1416 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1417 if (config_->turnEnabled) {
1418 auto cached = false;
1419 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1420 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1421 if (config_->cacheTurnV4) {
1422 opts.turnServers.emplace_back(TurnServerInfo()
1423 .setUri(config_->cacheTurnV4.toString())
1424 .setUsername(config_->turnServerUserName)
1425 .setPassword(config_->turnServerPwd)
1426 .setRealm(config_->turnServerRealm));
1427 }
1428 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1429 // co issues. So this needs some debug. for now just disable
1430 // if (cacheTurnV6 && *cacheTurnV6) {
1431 // opts.turnServers.emplace_back(TurnServerInfo()
1432 // .setUri(cacheTurnV6->toString(true))
1433 // .setUsername(turnServerUserName_)
1434 // .setPassword(turnServerPwd_)
1435 // .setRealm(turnServerRealm_));
1436 //}
1437 // Nothing cached, so do the resolution
1438 if (!cached) {
1439 opts.turnServers.emplace_back(TurnServerInfo()
1440 .setUri(config_->turnServer)
1441 .setUsername(config_->turnServerUserName)
1442 .setPassword(config_->turnServerPwd)
1443 .setRealm(config_->turnServerRealm));
1444 }
1445 }
1446 return opts;
1447}
1448
1449bool
1450ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1451 dht::InfoHash& account_id,
1452 const std::shared_ptr<Logger>& logger)
1453{
1454 if (not crt)
1455 return false;
1456
1457 auto top_issuer = crt;
1458 while (top_issuer->issuer)
1459 top_issuer = top_issuer->issuer;
1460
1461 // Device certificate can't be self-signed
1462 if (top_issuer == crt) {
1463 if (logger)
1464 logger->warn("Found invalid peer device: {}", crt->getLongId());
1465 return false;
1466 }
1467
1468 // Check peer certificate chain
1469 // Trust store with top issuer as the only CA
1470 dht::crypto::TrustList peer_trust;
1471 peer_trust.add(*top_issuer);
1472 if (not peer_trust.verify(*crt)) {
1473 if (logger)
1474 logger->warn("Found invalid peer device: {}", crt->getLongId());
1475 return false;
1476 }
1477
1478 // Check cached OCSP response
1479 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1480 if (logger)
1481 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1482 return false;
1483 }
1484
1485 account_id = crt->issuer->getId();
1486 if (logger)
1487 logger->warn("Found peer device: {} account:{} CA:{}",
1488 crt->getLongId(),
1489 account_id,
1490 top_issuer->getId());
1491 return true;
1492}
1493
1494bool
1495ConnectionManager::Impl::findCertificate(
1496 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1497{
1498 if (auto cert = certStore().getCertificate(id.toString())) {
1499 if (cb)
1500 cb(cert);
1501 } else if (cb)
1502 cb(nullptr);
1503 return true;
1504}
1505
1506ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1507 : pimpl_ {std::make_shared<Impl>(config_)}
1508{}
1509
1510ConnectionManager::~ConnectionManager()
1511{
1512 if (pimpl_)
1513 pimpl_->shutdown();
1514}
1515
1516void
1517ConnectionManager::connectDevice(const DeviceId& deviceId,
1518 const std::string& name,
1519 ConnectCallback cb,
1520 bool noNewSocket,
1521 bool forceNewSocket,
1522 const std::string& connType)
1523{
1524 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1525}
1526
1527void
1528ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1529 const std::string& name,
1530 ConnectCallback cb,
1531 bool noNewSocket,
1532 bool forceNewSocket,
1533 const std::string& connType)
1534{
1535 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1536}
1537
1538bool
1539ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1540{
Adrien Béraud665294f2023-06-13 18:09:11 -04001541 auto pending = pimpl_->getPendingIds(deviceId);
1542 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001543 != pending.end();
1544}
1545
1546void
1547ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1548{
1549 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1550 std::set<DeviceId> peersDevices;
1551 {
1552 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1553 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1554 auto const& [key, value] = *iter;
1555 auto deviceId = key.first;
1556 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1557 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1558 connInfos.emplace_back(value);
1559 peersDevices.emplace(deviceId);
1560 iter = pimpl_->infos_.erase(iter);
1561 } else {
1562 iter++;
1563 }
1564 }
1565 }
1566 // Stop connections to all peers devices
1567 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001568 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001569 // This will close the TLS Session
1570 pimpl_->removeUnusedConnections(deviceId);
1571 }
1572 for (auto& info : connInfos) {
1573 if (info->socket_)
1574 info->socket_->shutdown();
1575 if (info->waitForAnswer_)
1576 info->waitForAnswer_->cancel();
1577 if (info->ice_) {
1578 std::unique_lock<std::mutex> lk {info->mutex_};
1579 dht::ThreadPool::io().run(
1580 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1581 }
1582 }
1583}
1584
1585void
1586ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1587{
1588 pimpl_->onDhtConnected(devicePk);
1589}
1590
1591void
1592ConnectionManager::onICERequest(onICERequestCallback&& cb)
1593{
1594 pimpl_->iceReqCb_ = std::move(cb);
1595}
1596
1597void
1598ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1599{
1600 pimpl_->channelReqCb_ = std::move(cb);
1601}
1602
1603void
1604ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1605{
1606 pimpl_->connReadyCb_ = std::move(cb);
1607}
1608
1609void
1610ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1611{
1612 pimpl_->iOSConnectedCb_ = std::move(cb);
1613}
1614
1615std::size_t
1616ConnectionManager::activeSockets() const
1617{
1618 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1619 return pimpl_->infos_.size();
1620}
1621
1622void
1623ConnectionManager::monitor() const
1624{
1625 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1626 auto logger = pimpl_->config_->logger;
1627 if (!logger)
1628 return;
1629 logger->debug("ConnectionManager current status:");
1630 for (const auto& [_, ci] : pimpl_->infos_) {
1631 if (ci->socket_)
1632 ci->socket_->monitor();
1633 }
1634 logger->debug("ConnectionManager end status.");
1635}
1636
1637void
1638ConnectionManager::connectivityChanged()
1639{
1640 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1641 for (const auto& [_, ci] : pimpl_->infos_) {
1642 if (ci->socket_)
1643 ci->socket_->sendBeacon();
1644 }
1645}
1646
1647void
1648ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1649{
1650 return pimpl_->getIceOptions(std::move(cb));
1651}
1652
1653IceTransportOptions
1654ConnectionManager::getIceOptions() const noexcept
1655{
1656 return pimpl_->getIceOptions();
1657}
1658
1659IpAddr
1660ConnectionManager::getPublishedIpAddress(uint16_t family) const
1661{
1662 return pimpl_->getPublishedIpAddress(family);
1663}
1664
1665void
1666ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1667{
1668 return pimpl_->setPublishedAddress(ip_addr);
1669}
1670
1671void
1672ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1673{
1674 return pimpl_->storeActiveIpAddress(std::move(cb));
1675}
1676
1677std::shared_ptr<ConnectionManager::Config>
1678ConnectionManager::getConfig()
1679{
1680 return pimpl_->config_;
1681}
1682
1683} // namespace jami