blob: 17206d3c3e49bcd817fa1a974ef19568afcd4fbe [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2019-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18#include "connectionmanager.h"
19#include "peer_connection.h"
20#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
37
38namespace jami {
39static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
43using CallbackId = std::pair<jami::DeviceId, dht::Value::Id>;
44
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
83 {}
84 ~Impl() {}
85
86 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
87 const dht::crypto::Identity& identity() const { return config_->id; }
88
89 void removeUnusedConnections(const DeviceId& deviceId = {})
90 {
91 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
92
93 {
94 std::lock_guard<std::mutex> lk(infosMtx_);
95 for (auto it = infos_.begin(); it != infos_.end();) {
96 auto& [key, info] = *it;
97 if (info && (!deviceId || key.first == deviceId)) {
98 unused.emplace_back(std::move(info));
99 it = infos_.erase(it);
100 } else {
101 ++it;
102 }
103 }
104 }
105 for (auto& info: unused) {
106 if (info->tls_)
107 info->tls_->shutdown();
108 if (info->socket_)
109 info->socket_->shutdown();
110 if (info->waitForAnswer_)
111 info->waitForAnswer_->cancel();
112 }
113 if (!unused.empty())
114 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
115 }
116
117 void shutdown()
118 {
119 if (isDestroying_.exchange(true))
120 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400121 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400122 {
123 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400124 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400125 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400126 for (auto& [deviceId, pcbs] : po) {
127 for (auto& [id, pending] : pcbs.connecting)
128 pending.cb(nullptr, deviceId);
129 for (auto& [id, pending] : pcbs.waiting)
130 pending.cb(nullptr, deviceId);
131 }
132
Adrien Béraud612b55b2023-05-29 10:42:04 -0400133 removeUnusedConnections();
134 }
135
Adrien Béraud612b55b2023-05-29 10:42:04 -0400136 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
137 const dht::Value::Id& vid,
138 const std::string& connType,
139 std::function<void(bool)> onConnected);
140 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
141 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
142 const std::string& name,
143 const dht::Value::Id& vid,
144 const std::shared_ptr<dht::crypto::Certificate>& cert);
145 void connectDevice(const DeviceId& deviceId,
146 const std::string& uri,
147 ConnectCallback cb,
148 bool noNewSocket = false,
149 bool forceNewSocket = false,
150 const std::string& connType = "");
151 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
152 const std::string& name,
153 ConnectCallback cb,
154 bool noNewSocket = false,
155 bool forceNewSocket = false,
156 const std::string& connType = "");
157 /**
158 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
159 * @param sock socket used to send the request
160 * @param name channel's name
161 * @param vid channel's id
162 * @param deviceId to identify the linked ConnectCallback
163 */
164 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
165 const std::string& name,
166 const DeviceId& deviceId,
167 const dht::Value::Id& vid);
168 /**
169 * Triggered when a PeerConnectionRequest comes from the DHT
170 */
171 void answerTo(IceTransport& ice,
172 const dht::Value::Id& id,
173 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
174 bool onRequestStartIce(const PeerConnectionRequest& req);
175 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
176 void onDhtPeerRequest(const PeerConnectionRequest& req,
177 const std::shared_ptr<dht::crypto::Certificate>& cert);
178
179 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
180 void onPeerResponse(const PeerConnectionRequest& req);
181 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
182
183 const std::shared_future<tls::DhParams> dhParams() const;
184 tls::CertificateStore& certStore() const { return *config_->certStore; }
185
186 mutable std::mutex messageMutex_ {};
187 std::set<std::string, std::less<>> treatedMessages_ {};
188
189 void loadTreatedMessages();
190 void saveTreatedMessages() const;
191
192 /// \return true if the given DHT message identifier has been treated
193 /// \note if message has not been treated yet this method st/ore this id and returns true at
194 /// further calls
195 bool isMessageTreated(std::string_view id);
196
197 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
198
199 /**
200 * Published IPv4/IPv6 addresses, used only if defined by the user in account
201 * configuration
202 *
203 */
204 IpAddr publishedIp_[2] {};
205
206 // This will be stored in the configuration
207 std::string publishedIpAddress_ {};
208
209 /**
210 * Published port, used only if defined by the user
211 */
212 pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT};
213
214 /**
215 * interface name on which this account is bound
216 */
217 std::string interface_ {"default"};
218
219 /**
220 * Get the local interface name on which this account is bound.
221 */
222 const std::string& getLocalInterface() const { return interface_; }
223
224 /**
225 * Get the published IP address, fallbacks to NAT if family is unspecified
226 * Prefers the usage of IPv4 if possible.
227 */
228 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
229
230 /**
231 * Set published IP address according to given family
232 */
233 void setPublishedAddress(const IpAddr& ip_addr);
234
235 /**
236 * Store the local/public addresses used to register
237 */
238 void storeActiveIpAddress(std::function<void()>&& cb = {});
239
240 /**
241 * Create and return ICE options.
242 */
243 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
244 IceTransportOptions getIceOptions() const noexcept;
245
246 /**
247 * Inform that a potential peer device have been found.
248 * Returns true only if the device certificate is a valid device certificate.
249 * In that case (true is returned) the account_id parameter is set to the peer account ID.
250 */
251 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
252 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
253
254 bool findCertificate(const dht::PkId& id,
255 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
256
257 /**
258 * returns whether or not UPnP is enabled and active
259 * ie: if it is able to make port mappings
260 */
261 bool getUPnPActive() const;
262
263 /**
264 * Triggered when a new TLS socket is ready to use
265 * @param ok If succeed
266 * @param deviceId Related device
267 * @param vid vid of the connection request
268 * @param name non empty if TLS was created by connectDevice()
269 */
270 void onTlsNegotiationDone(bool ok,
271 const DeviceId& deviceId,
272 const dht::Value::Id& vid,
273 const std::string& name = "");
274
275 std::shared_ptr<ConnectionManager::Config> config_;
276
277 IceTransportFactory iceFactory_ {};
278
279 mutable std::mt19937_64 rand;
280
281 iOSConnectedCallback iOSConnectedCb_ {};
282
283 std::mutex infosMtx_ {};
284 // Note: Someone can ask multiple sockets, so to avoid any race condition,
285 // each device can have multiple multiplexed sockets.
286 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
287
288 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
289 {
290 std::lock_guard<std::mutex> lk(infosMtx_);
291 auto it = infos_.find({deviceId, id});
292 if (it != infos_.end())
293 return it->second;
294 return {};
295 }
296
297 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
298 {
299 std::lock_guard<std::mutex> lk(infosMtx_);
300 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
301 auto& [key, value] = item;
302 return key.first == deviceId && value && value->socket_;
303 });
304 if (it != infos_.end())
305 return it->second;
306 return {};
307 }
308
309 ChannelRequestCallback channelReqCb_ {};
310 ConnectionReadyCallback connReadyCb_ {};
311 onICERequestCallback iceReqCb_ {};
312
313 /**
314 * Stores callback from connectDevice
315 * @note: each device needs a vector because several connectDevice can
316 * be done in parallel and we only want one socket
317 */
318 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400319
Adrien Béraud665294f2023-06-13 18:09:11 -0400320 struct PendingCb
321 {
322 std::string name;
323 ConnectCallback cb;
324 };
325 struct PendingOperations {
326 std::map<dht::Value::Id, PendingCb> connecting;
327 std::map<dht::Value::Id, PendingCb> waiting;
328 };
329
330 std::map<DeviceId, PendingOperations> pendingOperations_ {};
331
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400332 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400333 {
334 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400335 std::unique_lock<std::mutex> lk(connectCbsMtx_);
336 auto it = pendingOperations_.find(deviceId);
337 if (it == pendingOperations_.end())
338 return;
339 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400340 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400341 // Extract all pending callbacks
342 for (auto& [vid, cb] : pendingOperations.connecting)
343 ret.emplace_back(std::move(cb));
344 pendingOperations.connecting.clear();
345 for (auto& [vid, cb] : pendingOperations.waiting)
346 ret.emplace_back(std::move(cb));
347 pendingOperations.waiting.clear();
348 } else if (auto n = pendingOperations.waiting.extract(vid)) {
349 // If it's a waiting operation, just move it
350 ret.emplace_back(std::move(n.mapped()));
351 } else if (auto n = pendingOperations.connecting.extract(vid)) {
352 ret.emplace_back(std::move(n.mapped()));
353 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400354 // If accepted is false, it means that underlying socket is ok, but channel is declined
355 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400356 for (auto& [vid, cb] : pendingOperations.waiting)
357 ret.emplace_back(std::move(cb));
358 pendingOperations.waiting.clear();
359 for (auto& [vid, cb] : pendingOperations.connecting)
360 ret.emplace_back(std::move(cb));
361 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400362 }
363 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400364 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
365 pendingOperations_.erase(it);
366 lk.unlock();
367 for (auto& cb : ret)
368 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400369 }
370
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400372 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400373 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400374 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400375 auto it = pendingOperations_.find(deviceId);
376 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400377 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400378 auto& pendingOp = it->second;
379 for (const auto& [id, pc]: pendingOp.connecting) {
380 if (vid == 0 || id == vid)
381 ret[id] = pc.name;
382 }
383 for (const auto& [id, pc]: pendingOp.waiting) {
384 if (vid == 0 || id == vid)
385 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400386 }
387 return ret;
388 }
389
390 std::shared_ptr<ConnectionManager::Impl> shared()
391 {
392 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
393 }
394 std::shared_ptr<ConnectionManager::Impl const> shared() const
395 {
396 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
397 }
398 std::weak_ptr<ConnectionManager::Impl> weak()
399 {
400 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
401 }
402 std::weak_ptr<ConnectionManager::Impl const> weak() const
403 {
404 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
405 }
406
407 std::atomic_bool isDestroying_ {false};
408};
409
410void
411ConnectionManager::Impl::connectDeviceStartIce(
412 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
413 const dht::Value::Id& vid,
414 const std::string& connType,
415 std::function<void(bool)> onConnected)
416{
417 auto deviceId = devicePk->getLongId();
418 auto info = getInfo(deviceId, vid);
419 if (!info) {
420 onConnected(false);
421 return;
422 }
423
424 std::unique_lock<std::mutex> lk(info->mutex_);
425 auto& ice = info->ice_;
426
427 if (!ice) {
428 if (config_->logger)
429 config_->logger->error("No ICE detected");
430 onConnected(false);
431 return;
432 }
433
434 auto iceAttributes = ice->getLocalAttributes();
435 std::ostringstream icemsg;
436 icemsg << iceAttributes.ufrag << "\n";
437 icemsg << iceAttributes.pwd << "\n";
438 for (const auto& addr : ice->getLocalCandidates(1)) {
439 icemsg << addr << "\n";
440 if (config_->logger)
441 config_->logger->debug("Added local ICE candidate {}", addr);
442 }
443
444 // Prepare connection request as a DHT message
445 PeerConnectionRequest val;
446
447 val.id = vid; /* Random id for the message unicity */
448 val.ice_msg = icemsg.str();
449 val.connType = connType;
450
451 auto value = std::make_shared<dht::Value>(std::move(val));
452 value->user_type = "peer_request";
453
454 // Send connection request through DHT
455 if (config_->logger)
456 config_->logger->debug("Request connection to {}", deviceId);
457 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
458 + devicePk->getId().toString()),
459 devicePk,
460 value,
461 [l=config_->logger,deviceId](bool ok) {
462 if (l)
463 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
464 deviceId,
465 (ok ? "ok" : "failed"));
466 });
467 // Wait for call to onResponse() operated by DHT
468 if (isDestroying_) {
469 onConnected(true); // This avoid to wait new negotiation when destroying
470 return;
471 }
472
473 info->onConnected_ = std::move(onConnected);
474 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
475 std::chrono::steady_clock::now()
476 + DHT_MSG_TIMEOUT);
477 info->waitForAnswer_->async_wait(
478 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
479}
480
481void
482ConnectionManager::Impl::onResponse(const asio::error_code& ec,
483 const DeviceId& deviceId,
484 const dht::Value::Id& vid)
485{
486 if (ec == asio::error::operation_aborted)
487 return;
488 auto info = getInfo(deviceId, vid);
489 if (!info)
490 return;
491
492 std::unique_lock<std::mutex> lk(info->mutex_);
493 auto& ice = info->ice_;
494 if (isDestroying_) {
495 info->onConnected_(true); // The destructor can wake a pending wait here.
496 return;
497 }
498 if (!info->responseReceived_) {
499 if (config_->logger)
500 config_->logger->error("no response from DHT to E2E request.");
501 info->onConnected_(false);
502 return;
503 }
504
505 if (!info->ice_) {
506 info->onConnected_(false);
507 return;
508 }
509
510 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
511
512 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
513 if (config_->logger)
514 config_->logger->warn("start ICE failed");
515 info->onConnected_(false);
516 return;
517 }
518 info->onConnected_(true);
519}
520
521bool
522ConnectionManager::Impl::connectDeviceOnNegoDone(
523 const DeviceId& deviceId,
524 const std::string& name,
525 const dht::Value::Id& vid,
526 const std::shared_ptr<dht::crypto::Certificate>& cert)
527{
528 auto info = getInfo(deviceId, vid);
529 if (!info)
530 return false;
531
532 std::unique_lock<std::mutex> lk {info->mutex_};
533 if (info->waitForAnswer_) {
534 // Negotiation is done and connected, go to handshake
535 // and avoid any cancellation at this point.
536 info->waitForAnswer_->cancel();
537 }
538 auto& ice = info->ice_;
539 if (!ice || !ice->isRunning()) {
540 if (config_->logger)
541 config_->logger->error("No ICE detected or not running");
542 return false;
543 }
544
545 // Build socket
546 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
547 std::move(ice)),
548 true);
549
550 // Negotiate a TLS session
551 if (config_->logger)
552 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
553 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
554 certStore(),
555 identity(),
556 dhParams(),
557 *cert);
558
559 info->tls_->setOnReady(
560 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
561 bool ok) {
562 if (auto shared = w.lock())
563 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
564 });
565 return true;
566}
567
568void
569ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
570 const std::string& name,
571 ConnectCallback cb,
572 bool noNewSocket,
573 bool forceNewSocket,
574 const std::string& connType)
575{
576 if (!dht()) {
577 cb(nullptr, deviceId);
578 return;
579 }
580 if (deviceId.toString() == identity().second->getLongId().toString()) {
581 cb(nullptr, deviceId);
582 return;
583 }
584 findCertificate(deviceId,
585 [w = weak(),
586 deviceId,
587 name,
588 cb = std::move(cb),
589 noNewSocket,
590 forceNewSocket,
591 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
592 if (!cert) {
593 if (auto shared = w.lock())
594 if (shared->config_->logger)
595 shared->config_->logger->error(
596 "No valid certificate found for device {}",
597 deviceId);
598 cb(nullptr, deviceId);
599 return;
600 }
601 if (auto shared = w.lock()) {
602 shared->connectDevice(cert,
603 name,
604 std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType);
608 } else
609 cb(nullptr, deviceId);
610 });
611}
612
613void
614ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
615 const std::string& name,
616 ConnectCallback cb,
617 bool noNewSocket,
618 bool forceNewSocket,
619 const std::string& connType)
620{
621 // Avoid dht operation in a DHT callback to avoid deadlocks
622 dht::ThreadPool::computation().run([w = weak(),
623 name = std::move(name),
624 cert = std::move(cert),
625 cb = std::move(cb),
626 noNewSocket,
627 forceNewSocket,
628 connType] {
629 auto devicePk = cert->getSharedPublicKey();
630 auto deviceId = devicePk->getLongId();
631 auto sthis = w.lock();
632 if (!sthis || sthis->isDestroying_) {
633 cb(nullptr, deviceId);
634 return;
635 }
636 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
637 auto isConnectingToDevice = false;
638 {
639 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400640 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
641 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400642 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400643 while (pendings.connecting.find(vid) != pendings.connecting.end()
644 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400645 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400646 }
647 }
648 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400649 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400650 // Save current request for sendChannelRequest.
651 // Note: do not return here, cause we can be in a state where first
652 // socket is negotiated and first channel is pending
653 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400654 if (isConnectingToDevice && !forceNewSocket)
655 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400656 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400657 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400658 }
659
660 // Check if already negotiated
661 CallbackId cbId(deviceId, vid);
662 if (auto info = sthis->getConnectedInfo(deviceId)) {
663 std::lock_guard<std::mutex> lk(info->mutex_);
664 if (info->socket_) {
665 if (sthis->config_->logger)
666 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
667 info->cbIds_.emplace(cbId);
668 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
669 return;
670 }
671 }
672
673 if (isConnectingToDevice && !forceNewSocket) {
674 if (sthis->config_->logger)
675 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
676 return;
677 }
678 if (noNewSocket) {
679 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400680 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400681 return;
682 }
683
684 // Note: used when the ice negotiation fails to erase
685 // all stored structures.
686 auto eraseInfo = [w, cbId] {
687 if (auto shared = w.lock()) {
688 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400689 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400690 std::lock_guard<std::mutex> lk(shared->infosMtx_);
691 shared->infos_.erase(cbId);
692 }
693 };
694
695 // If no socket exists, we need to initiate an ICE connection.
696 sthis->getIceOptions([w,
697 deviceId = std::move(deviceId),
698 devicePk = std::move(devicePk),
699 name = std::move(name),
700 cert = std::move(cert),
701 vid,
702 connType,
703 eraseInfo](auto&& ice_config) {
704 auto sthis = w.lock();
705 if (!sthis) {
706 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
707 return;
708 }
709 ice_config.tcpEnable = true;
710 ice_config.onInitDone = [w,
711 deviceId = std::move(deviceId),
712 devicePk = std::move(devicePk),
713 name = std::move(name),
714 cert = std::move(cert),
715 vid,
716 connType,
717 eraseInfo](bool ok) {
718 dht::ThreadPool::io().run([w = std::move(w),
719 devicePk = std::move(devicePk),
720 vid = std::move(vid),
721 eraseInfo,
722 connType, ok] {
723 auto sthis = w.lock();
724 if (!ok && sthis && sthis->config_->logger)
725 sthis->config_->logger->error("Cannot initialize ICE session.");
726 if (!sthis || !ok) {
727 eraseInfo();
728 return;
729 }
730 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
731 if (!ok) {
732 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
733 }
734 });
735 });
736 };
737 ice_config.onNegoDone = [w,
738 deviceId,
739 name,
740 cert = std::move(cert),
741 vid,
742 eraseInfo](bool ok) {
743 dht::ThreadPool::io().run([w = std::move(w),
744 deviceId = std::move(deviceId),
745 name = std::move(name),
746 cert = std::move(cert),
747 vid = std::move(vid),
748 eraseInfo = std::move(eraseInfo),
749 ok] {
750 auto sthis = w.lock();
751 if (!ok && sthis && sthis->config_->logger)
752 sthis->config_->logger->error("ICE negotiation failed.");
753 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
754 eraseInfo();
755 });
756 };
757
758 auto info = std::make_shared<ConnectionInfo>();
759 {
760 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
761 sthis->infos_[{deviceId, vid}] = info;
762 }
763 std::unique_lock<std::mutex> lk {info->mutex_};
764 ice_config.master = false;
765 ice_config.streamsCount = 1;
766 ice_config.compCountPerStream = 1;
767 info->ice_ = sthis->iceFactory_.createUTransport("");
768 if (!info->ice_) {
769 if (sthis->config_->logger)
770 sthis->config_->logger->error("Cannot initialize ICE session.");
771 eraseInfo();
772 return;
773 }
774 // We need to detect any shutdown if the ice session is destroyed before going to the
775 // TLS session;
776 info->ice_->setOnShutdown([eraseInfo]() {
777 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
778 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400779 try {
780 info->ice_->initIceInstance(ice_config);
781 } catch (const std::exception& e) {
782 if (sthis->config_->logger)
783 sthis->config_->logger->error("{}", e.what());
784 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
785 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400786 });
787 });
788}
789
790void
791ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
792 const std::string& name,
793 const DeviceId& deviceId,
794 const dht::Value::Id& vid)
795{
796 auto channelSock = sock->addChannel(name);
797 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
798 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400799 if (auto shared = w.lock())
800 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400801 });
802 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400803 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400804 auto shared = w.lock();
805 auto channelSock = wSock.lock();
806 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400807 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400808 });
809
810 ChannelRequest val;
811 val.name = channelSock->name();
812 val.state = ChannelRequestState::REQUEST;
813 val.channel = channelSock->channel();
814 msgpack::sbuffer buffer(256);
815 msgpack::pack(buffer, val);
816
817 std::error_code ec;
818 int res = sock->write(CONTROL_CHANNEL,
819 reinterpret_cast<const uint8_t*>(buffer.data()),
820 buffer.size(),
821 ec);
822 if (res < 0) {
823 // TODO check if we should handle errors here
824 if (config_->logger)
825 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
826 }
827}
828
829void
830ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
831{
832 auto device = req.owner->getLongId();
833 if (config_->logger)
834 config_->logger->debug("New response received from {}", device);
835 if (auto info = getInfo(device, req.id)) {
836 std::lock_guard<std::mutex> lk {info->mutex_};
837 info->responseReceived_ = true;
838 info->response_ = std::move(req);
839 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
840 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
841 this,
842 std::placeholders::_1,
843 device,
844 req.id));
845 } else {
846 if (config_->logger)
847 config_->logger->warn("Respond received, but cannot find request");
848 }
849}
850
851void
852ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
853{
854 if (!dht())
855 return;
856 dht()->listen<PeerConnectionRequest>(
857 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
858 [w = weak()](PeerConnectionRequest&& req) {
859 auto shared = w.lock();
860 if (!shared)
861 return false;
862 if (shared->isMessageTreated(to_hex_string(req.id))) {
863 // Message already treated. Just ignore
864 return true;
865 }
866 if (req.isAnswer) {
867 if (shared->config_->logger)
868 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
869 } else {
870 if (shared->config_->logger)
871 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
872 }
873 if (req.isAnswer) {
874 shared->onPeerResponse(req);
875 } else {
876 // Async certificate checking
877 shared->dht()->findCertificate(
878 req.from,
879 [w, req = std::move(req)](
880 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
881 auto shared = w.lock();
882 if (!shared)
883 return;
884 dht::InfoHash peer_h;
885 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
886#if TARGET_OS_IOS
887 if (shared->iOSConnectedCb_(req.connType, peer_h))
888 return;
889#endif
890 shared->onDhtPeerRequest(req, cert);
891 } else {
892 if (shared->config_->logger)
893 shared->config_->logger->warn(
894 "Received request from untrusted peer {}",
895 req.owner->getLongId());
896 }
897 });
898 }
899
900 return true;
901 },
902 dht::Value::UserTypeFilter("peer_request"));
903}
904
905void
906ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
907 const DeviceId& deviceId,
908 const dht::Value::Id& vid,
909 const std::string& name)
910{
911 if (isDestroying_)
912 return;
913 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
914 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
915 // asked yet)
916 auto isDhtRequest = name.empty();
917 if (!ok) {
918 if (isDhtRequest) {
919 if (config_->logger)
920 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
921 deviceId,
922 name,
923 vid);
924 if (connReadyCb_)
925 connReadyCb_(deviceId, "", nullptr);
926 } else {
927 if (config_->logger)
928 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
929 deviceId,
930 name,
931 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400932 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400933 }
934 } else {
935 // The socket is ready, store it
936 if (isDhtRequest) {
937 if (config_->logger)
938 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
939 deviceId,
940 vid);
941 } else {
942 if (config_->logger)
943 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
944 deviceId,
945 name,
946 vid);
947 }
948
949 auto info = getInfo(deviceId, vid);
950 addNewMultiplexedSocket({deviceId, vid}, info);
951 // Finally, open the channel and launch pending callbacks
952 if (info->socket_) {
953 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400954 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400955 if (config_->logger)
956 config_->logger->debug("Send request on TLS socket for channel {} to {}",
Adrien Béraud665294f2023-06-13 18:09:11 -0400957 name,
958 deviceId.toString());
959 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400960 }
961 }
962 }
963}
964
965void
966ConnectionManager::Impl::answerTo(IceTransport& ice,
967 const dht::Value::Id& id,
968 const std::shared_ptr<dht::crypto::PublicKey>& from)
969{
970 // NOTE: This is a shortest version of a real SDP message to save some bits
971 auto iceAttributes = ice.getLocalAttributes();
972 std::ostringstream icemsg;
973 icemsg << iceAttributes.ufrag << "\n";
974 icemsg << iceAttributes.pwd << "\n";
975 for (const auto& addr : ice.getLocalCandidates(1)) {
976 icemsg << addr << "\n";
977 }
978
979 // Send PeerConnection response
980 PeerConnectionRequest val;
981 val.id = id;
982 val.ice_msg = icemsg.str();
983 val.isAnswer = true;
984 auto value = std::make_shared<dht::Value>(std::move(val));
985 value->user_type = "peer_request";
986
987 if (config_->logger)
988 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
989 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
990 + from->getId().toString()),
991 from,
992 value,
993 [from,l=config_->logger](bool ok) {
994 if (l)
995 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
996 from->getLongId(),
997 (ok ? "ok" : "failed"));
998 });
999}
1000
1001bool
1002ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1003{
1004 auto deviceId = req.owner->getLongId();
1005 auto info = getInfo(deviceId, req.id);
1006 if (!info)
1007 return false;
1008
1009 std::unique_lock<std::mutex> lk {info->mutex_};
1010 auto& ice = info->ice_;
1011 if (!ice) {
1012 if (config_->logger)
1013 config_->logger->error("No ICE detected");
1014 if (connReadyCb_)
1015 connReadyCb_(deviceId, "", nullptr);
1016 return false;
1017 }
1018
1019 auto sdp = ice->parseIceCandidates(req.ice_msg);
1020 answerTo(*ice, req.id, req.owner);
1021 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1022 if (config_->logger)
1023 config_->logger->error("Start ICE failed - fallback to TURN");
1024 ice = nullptr;
1025 if (connReadyCb_)
1026 connReadyCb_(deviceId, "", nullptr);
1027 return false;
1028 }
1029 return true;
1030}
1031
1032bool
1033ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1034{
1035 auto deviceId = req.owner->getLongId();
1036 auto info = getInfo(deviceId, req.id);
1037 if (!info)
1038 return false;
1039
1040 std::unique_lock<std::mutex> lk {info->mutex_};
1041 auto& ice = info->ice_;
1042 if (!ice) {
1043 if (config_->logger)
1044 config_->logger->error("No ICE detected");
1045 return false;
1046 }
1047
1048 // Build socket
1049 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1050 std::move(ice)),
1051 false);
1052
1053 // init TLS session
1054 auto ph = req.from;
1055 if (config_->logger)
1056 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1057 req.from,
1058 req.id);
1059 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1060 std::move(endpoint),
1061 certStore(),
1062 identity(),
1063 dhParams(),
1064 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1065 auto shared = w.lock();
1066 if (!shared)
1067 return false;
1068 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1069 if (!crt)
1070 return false;
1071 return crt->getPacked() == cert.getPacked();
1072 });
1073
1074 info->tls_->setOnReady(
1075 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1076 if (auto shared = w.lock())
1077 shared->onTlsNegotiationDone(ok, deviceId, vid);
1078 });
1079 return true;
1080}
1081
1082void
1083ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1084 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1085{
1086 auto deviceId = req.owner->getLongId();
1087 if (config_->logger)
1088 config_->logger->debug("New connection request from {}", deviceId);
1089 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1090 if (config_->logger)
1091 config_->logger->debug("Refuse connection from {}", deviceId);
1092 return;
1093 }
1094
1095 // Because the connection is accepted, create an ICE socket.
1096 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1097 auto shared = w.lock();
1098 if (!shared)
1099 return;
1100 // Note: used when the ice negotiation fails to erase
1101 // all stored structures.
1102 auto eraseInfo = [w, id = req.id, deviceId] {
1103 if (auto shared = w.lock()) {
1104 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001105 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001106 if (shared->connReadyCb_)
1107 shared->connReadyCb_(deviceId, "", nullptr);
1108 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1109 shared->infos_.erase({deviceId, id});
1110 }
1111 };
1112
1113 ice_config.tcpEnable = true;
1114 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1115 auto shared = w.lock();
1116 if (!shared)
1117 return;
1118 if (!ok) {
1119 if (shared->config_->logger)
1120 shared->config_->logger->error("Cannot initialize ICE session.");
1121 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1122 return;
1123 }
1124
1125 dht::ThreadPool::io().run(
1126 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1127 auto shared = w.lock();
1128 if (!shared)
1129 return;
1130 if (!shared->onRequestStartIce(req))
1131 eraseInfo();
1132 });
1133 };
1134
1135 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1136 auto shared = w.lock();
1137 if (!shared)
1138 return;
1139 if (!ok) {
1140 if (shared->config_->logger)
1141 shared->config_->logger->error("ICE negotiation failed.");
1142 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1143 return;
1144 }
1145
1146 dht::ThreadPool::io().run(
1147 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1148 if (auto shared = w.lock())
1149 if (!shared->onRequestOnNegoDone(req))
1150 eraseInfo();
1151 });
1152 };
1153
1154 // Negotiate a new ICE socket
1155 auto info = std::make_shared<ConnectionInfo>();
1156 {
1157 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1158 shared->infos_[{deviceId, req.id}] = info;
1159 }
1160 if (shared->config_->logger)
1161 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1162 std::unique_lock<std::mutex> lk {info->mutex_};
1163 ice_config.streamsCount = 1;
1164 ice_config.compCountPerStream = 1; // TCP
1165 ice_config.master = true;
1166 info->ice_ = shared->iceFactory_.createUTransport("");
1167 if (not info->ice_) {
1168 if (shared->config_->logger)
1169 shared->config_->logger->error("Cannot initialize ICE session");
1170 eraseInfo();
1171 return;
1172 }
1173 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1174 info->ice_->setOnShutdown([eraseInfo]() {
1175 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1176 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001177 try {
1178 info->ice_->initIceInstance(ice_config);
1179 } catch (const std::exception& e) {
1180 if (shared->config_->logger)
1181 shared->config_->logger->error("{}", e.what());
1182 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1183 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001184 });
1185}
1186
1187void
1188ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1189{
1190 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1191 info->socket_->setOnReady(
1192 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1193 if (auto sthis = w.lock())
1194 if (sthis->connReadyCb_)
1195 sthis->connReadyCb_(deviceId, socket->name(), socket);
1196 });
1197 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1198 const uint16_t&,
1199 const std::string& name) {
1200 if (auto sthis = w.lock())
1201 if (sthis->channelReqCb_)
1202 return sthis->channelReqCb_(peer, name);
1203 return false;
1204 });
1205 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1206 // Cancel current outgoing connections
1207 dht::ThreadPool::io().run([w, deviceId, vid] {
1208 auto sthis = w.lock();
1209 if (!sthis)
1210 return;
1211
1212 std::set<CallbackId> ids;
1213 if (auto info = sthis->getInfo(deviceId, vid)) {
1214 std::lock_guard<std::mutex> lk(info->mutex_);
1215 if (info->socket_) {
1216 ids = std::move(info->cbIds_);
1217 info->socket_->shutdown();
1218 }
1219 }
1220 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001221 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001222
1223 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1224 sthis->infos_.erase({deviceId, vid});
1225 });
1226 });
1227}
1228
1229const std::shared_future<tls::DhParams>
1230ConnectionManager::Impl::dhParams() const
1231{
1232 return dht::ThreadPool::computation().get<tls::DhParams>(
1233 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1234 ;
1235}
1236
1237template<typename ID = dht::Value::Id>
1238std::set<ID, std::less<>>
1239loadIdList(const std::string& path)
1240{
1241 std::set<ID, std::less<>> ids;
1242 std::ifstream file = fileutils::ifstream(path);
1243 if (!file.is_open()) {
1244 //JAMI_DBG("Could not load %s", path.c_str());
1245 return ids;
1246 }
1247 std::string line;
1248 while (std::getline(file, line)) {
1249 if constexpr (std::is_same<ID, std::string>::value) {
1250 ids.emplace(std::move(line));
1251 } else if constexpr (std::is_integral<ID>::value) {
1252 ID vid;
1253 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1254 ec == std::errc()) {
1255 ids.emplace(vid);
1256 }
1257 }
1258 }
1259 return ids;
1260}
1261
1262template<typename List = std::set<dht::Value::Id>>
1263void
1264saveIdList(const std::string& path, const List& ids)
1265{
1266 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1267 if (!file.is_open()) {
1268 //JAMI_ERR("Could not save to %s", path.c_str());
1269 return;
1270 }
1271 for (auto& c : ids)
1272 file << std::hex << c << "\n";
1273}
1274
1275void
1276ConnectionManager::Impl::loadTreatedMessages()
1277{
1278 std::lock_guard<std::mutex> lock(messageMutex_);
1279 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1280 treatedMessages_ = loadIdList<std::string>(path);
1281 if (treatedMessages_.empty()) {
1282 auto messages = loadIdList(path);
1283 for (const auto& m : messages)
1284 treatedMessages_.emplace(to_hex_string(m));
1285 }
1286}
1287
1288void
1289ConnectionManager::Impl::saveTreatedMessages() const
1290{
1291 dht::ThreadPool::io().run([w = weak()]() {
1292 if (auto sthis = w.lock()) {
1293 auto& this_ = *sthis;
1294 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1295 fileutils::check_dir(this_.config_->cachePath.c_str());
1296 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1297 + DIR_SEPARATOR_STR "treatedMessages",
1298 this_.treatedMessages_);
1299 }
1300 });
1301}
1302
1303bool
1304ConnectionManager::Impl::isMessageTreated(std::string_view id)
1305{
1306 std::lock_guard<std::mutex> lock(messageMutex_);
1307 auto res = treatedMessages_.emplace(id);
1308 if (res.second) {
1309 saveTreatedMessages();
1310 return false;
1311 }
1312 return true;
1313}
1314
1315/**
1316 * returns whether or not UPnP is enabled and active_
1317 * ie: if it is able to make port mappings
1318 */
1319bool
1320ConnectionManager::Impl::getUPnPActive() const
1321{
1322 return config_->getUPnPActive();
1323}
1324
1325IpAddr
1326ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1327{
1328 if (family == AF_INET)
1329 return publishedIp_[0];
1330 if (family == AF_INET6)
1331 return publishedIp_[1];
1332
1333 assert(family == AF_UNSPEC);
1334
1335 // If family is not set, prefere IPv4 if available. It's more
1336 // likely to succeed behind NAT.
1337 if (publishedIp_[0])
1338 return publishedIp_[0];
1339 if (publishedIp_[1])
1340 return publishedIp_[1];
1341 return {};
1342}
1343
1344void
1345ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1346{
1347 if (ip_addr.getFamily() == AF_INET) {
1348 publishedIp_[0] = ip_addr;
1349 } else {
1350 publishedIp_[1] = ip_addr;
1351 }
1352}
1353
1354void
1355ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1356{
1357 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1358 bool hasIpv4 {false}, hasIpv6 {false};
1359 for (auto& result : results) {
1360 auto family = result.getFamily();
1361 if (family == AF_INET) {
1362 if (not hasIpv4) {
1363 hasIpv4 = true;
1364 if (config_->logger)
1365 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1366 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1367 setPublishedAddress(*result.get());
1368 if (config_->upnpCtrl) {
1369 config_->upnpCtrl->setPublicAddress(*result.get());
1370 }
1371 }
1372 } else if (family == AF_INET6) {
1373 if (not hasIpv6) {
1374 hasIpv6 = true;
1375 if (config_->logger)
1376 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1377 setPublishedAddress(*result.get());
1378 }
1379 }
1380 if (hasIpv4 and hasIpv6)
1381 break;
1382 }
1383 if (cb)
1384 cb();
1385 });
1386}
1387
1388void
1389ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1390{
1391 storeActiveIpAddress([this, cb = std::move(cb)] {
1392 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1393 auto publishedAddr = getPublishedIpAddress();
1394
1395 if (publishedAddr) {
1396 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1397 publishedAddr.getFamily());
1398 if (interfaceAddr) {
1399 opts.accountLocalAddr = interfaceAddr;
1400 opts.accountPublicAddr = publishedAddr;
1401 }
1402 }
1403 if (cb)
1404 cb(std::move(opts));
1405 });
1406}
1407
1408IceTransportOptions
1409ConnectionManager::Impl::getIceOptions() const noexcept
1410{
1411 IceTransportOptions opts;
1412 opts.upnpEnable = getUPnPActive();
1413
1414 if (config_->stunEnabled)
1415 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1416 if (config_->turnEnabled) {
1417 auto cached = false;
1418 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1419 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1420 if (config_->cacheTurnV4) {
1421 opts.turnServers.emplace_back(TurnServerInfo()
1422 .setUri(config_->cacheTurnV4.toString())
1423 .setUsername(config_->turnServerUserName)
1424 .setPassword(config_->turnServerPwd)
1425 .setRealm(config_->turnServerRealm));
1426 }
1427 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1428 // co issues. So this needs some debug. for now just disable
1429 // if (cacheTurnV6 && *cacheTurnV6) {
1430 // opts.turnServers.emplace_back(TurnServerInfo()
1431 // .setUri(cacheTurnV6->toString(true))
1432 // .setUsername(turnServerUserName_)
1433 // .setPassword(turnServerPwd_)
1434 // .setRealm(turnServerRealm_));
1435 //}
1436 // Nothing cached, so do the resolution
1437 if (!cached) {
1438 opts.turnServers.emplace_back(TurnServerInfo()
1439 .setUri(config_->turnServer)
1440 .setUsername(config_->turnServerUserName)
1441 .setPassword(config_->turnServerPwd)
1442 .setRealm(config_->turnServerRealm));
1443 }
1444 }
1445 return opts;
1446}
1447
1448bool
1449ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1450 dht::InfoHash& account_id,
1451 const std::shared_ptr<Logger>& logger)
1452{
1453 if (not crt)
1454 return false;
1455
1456 auto top_issuer = crt;
1457 while (top_issuer->issuer)
1458 top_issuer = top_issuer->issuer;
1459
1460 // Device certificate can't be self-signed
1461 if (top_issuer == crt) {
1462 if (logger)
1463 logger->warn("Found invalid peer device: {}", crt->getLongId());
1464 return false;
1465 }
1466
1467 // Check peer certificate chain
1468 // Trust store with top issuer as the only CA
1469 dht::crypto::TrustList peer_trust;
1470 peer_trust.add(*top_issuer);
1471 if (not peer_trust.verify(*crt)) {
1472 if (logger)
1473 logger->warn("Found invalid peer device: {}", crt->getLongId());
1474 return false;
1475 }
1476
1477 // Check cached OCSP response
1478 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1479 if (logger)
1480 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1481 return false;
1482 }
1483
1484 account_id = crt->issuer->getId();
1485 if (logger)
1486 logger->warn("Found peer device: {} account:{} CA:{}",
1487 crt->getLongId(),
1488 account_id,
1489 top_issuer->getId());
1490 return true;
1491}
1492
1493bool
1494ConnectionManager::Impl::findCertificate(
1495 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1496{
1497 if (auto cert = certStore().getCertificate(id.toString())) {
1498 if (cb)
1499 cb(cert);
1500 } else if (cb)
1501 cb(nullptr);
1502 return true;
1503}
1504
1505ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1506 : pimpl_ {std::make_shared<Impl>(config_)}
1507{}
1508
1509ConnectionManager::~ConnectionManager()
1510{
1511 if (pimpl_)
1512 pimpl_->shutdown();
1513}
1514
1515void
1516ConnectionManager::connectDevice(const DeviceId& deviceId,
1517 const std::string& name,
1518 ConnectCallback cb,
1519 bool noNewSocket,
1520 bool forceNewSocket,
1521 const std::string& connType)
1522{
1523 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1524}
1525
1526void
1527ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1528 const std::string& name,
1529 ConnectCallback cb,
1530 bool noNewSocket,
1531 bool forceNewSocket,
1532 const std::string& connType)
1533{
1534 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1535}
1536
1537bool
1538ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1539{
Adrien Béraud665294f2023-06-13 18:09:11 -04001540 auto pending = pimpl_->getPendingIds(deviceId);
1541 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001542 != pending.end();
1543}
1544
1545void
1546ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1547{
1548 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1549 std::set<DeviceId> peersDevices;
1550 {
1551 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1552 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1553 auto const& [key, value] = *iter;
1554 auto deviceId = key.first;
1555 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1556 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1557 connInfos.emplace_back(value);
1558 peersDevices.emplace(deviceId);
1559 iter = pimpl_->infos_.erase(iter);
1560 } else {
1561 iter++;
1562 }
1563 }
1564 }
1565 // Stop connections to all peers devices
1566 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001567 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001568 // This will close the TLS Session
1569 pimpl_->removeUnusedConnections(deviceId);
1570 }
1571 for (auto& info : connInfos) {
1572 if (info->socket_)
1573 info->socket_->shutdown();
1574 if (info->waitForAnswer_)
1575 info->waitForAnswer_->cancel();
1576 if (info->ice_) {
1577 std::unique_lock<std::mutex> lk {info->mutex_};
1578 dht::ThreadPool::io().run(
1579 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1580 }
1581 }
1582}
1583
1584void
1585ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1586{
1587 pimpl_->onDhtConnected(devicePk);
1588}
1589
1590void
1591ConnectionManager::onICERequest(onICERequestCallback&& cb)
1592{
1593 pimpl_->iceReqCb_ = std::move(cb);
1594}
1595
1596void
1597ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1598{
1599 pimpl_->channelReqCb_ = std::move(cb);
1600}
1601
1602void
1603ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1604{
1605 pimpl_->connReadyCb_ = std::move(cb);
1606}
1607
1608void
1609ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1610{
1611 pimpl_->iOSConnectedCb_ = std::move(cb);
1612}
1613
1614std::size_t
1615ConnectionManager::activeSockets() const
1616{
1617 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1618 return pimpl_->infos_.size();
1619}
1620
1621void
1622ConnectionManager::monitor() const
1623{
1624 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1625 auto logger = pimpl_->config_->logger;
1626 if (!logger)
1627 return;
1628 logger->debug("ConnectionManager current status:");
1629 for (const auto& [_, ci] : pimpl_->infos_) {
1630 if (ci->socket_)
1631 ci->socket_->monitor();
1632 }
1633 logger->debug("ConnectionManager end status.");
1634}
1635
1636void
1637ConnectionManager::connectivityChanged()
1638{
1639 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1640 for (const auto& [_, ci] : pimpl_->infos_) {
1641 if (ci->socket_)
1642 ci->socket_->sendBeacon();
1643 }
1644}
1645
1646void
1647ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1648{
1649 return pimpl_->getIceOptions(std::move(cb));
1650}
1651
1652IceTransportOptions
1653ConnectionManager::getIceOptions() const noexcept
1654{
1655 return pimpl_->getIceOptions();
1656}
1657
1658IpAddr
1659ConnectionManager::getPublishedIpAddress(uint16_t family) const
1660{
1661 return pimpl_->getPublishedIpAddress(family);
1662}
1663
1664void
1665ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1666{
1667 return pimpl_->setPublishedAddress(ip_addr);
1668}
1669
1670void
1671ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1672{
1673 return pimpl_->storeActiveIpAddress(std::move(cb));
1674}
1675
1676std::shared_ptr<ConnectionManager::Config>
1677ConnectionManager::getConfig()
1678{
1679 return pimpl_->config_;
1680}
1681
1682} // namespace jami