blob: 96bd6ba1b7e855a850fce42de3fbd4d946e2b279 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
2 * Copyright (C) 2019-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 */
18#include "connectionmanager.h"
19#include "peer_connection.h"
20#include "upnp/upnp_control.h"
21#include "certstore.h"
22#include "fileutils.h"
23#include "sip_utils.h"
24#include "string_utils.h"
25
26#include <opendht/crypto.h>
27#include <opendht/thread_pool.h>
28#include <opendht/value.h>
29#include <asio.hpp>
30
31#include <algorithm>
32#include <mutex>
33#include <map>
34#include <condition_variable>
35#include <set>
36#include <charconv>
37
38namespace jami {
/// Delay after which a connection request stops waiting for the peer's DHT answer.
static constexpr auto DHT_MSG_TIMEOUT = std::chrono::seconds {30};
/// Inclusive upper bound of the randomly drawn request ids (2^53).
static constexpr uint64_t ID_MAX_VAL = uint64_t {1} << 53;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
43using CallbackId = std::pair<jami::DeviceId, dht::Value::Id>;
44
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
83 {}
84 ~Impl() {}
85
86 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
87 const dht::crypto::Identity& identity() const { return config_->id; }
88
89 void removeUnusedConnections(const DeviceId& deviceId = {})
90 {
91 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
92
93 {
94 std::lock_guard<std::mutex> lk(infosMtx_);
95 for (auto it = infos_.begin(); it != infos_.end();) {
96 auto& [key, info] = *it;
97 if (info && (!deviceId || key.first == deviceId)) {
98 unused.emplace_back(std::move(info));
99 it = infos_.erase(it);
100 } else {
101 ++it;
102 }
103 }
104 }
105 for (auto& info: unused) {
106 if (info->tls_)
107 info->tls_->shutdown();
108 if (info->socket_)
109 info->socket_->shutdown();
110 if (info->waitForAnswer_)
111 info->waitForAnswer_->cancel();
112 }
113 if (!unused.empty())
114 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
115 }
116
117 void shutdown()
118 {
119 if (isDestroying_.exchange(true))
120 return;
121 {
122 std::lock_guard<std::mutex> lk(connectCbsMtx_);
123 // Call all pending callbacks that channel is not ready
124 for (auto& [deviceId, pcbs] : pendingCbs_)
125 for (auto& pending : pcbs)
126 pending.cb(nullptr, deviceId);
127 pendingCbs_.clear();
128 }
129 removeUnusedConnections();
130 }
131
132 struct PendingCb
133 {
134 std::string name;
135 ConnectCallback cb;
136 dht::Value::Id vid;
137 };
138
139 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
140 const dht::Value::Id& vid,
141 const std::string& connType,
142 std::function<void(bool)> onConnected);
143 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
144 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
145 const std::string& name,
146 const dht::Value::Id& vid,
147 const std::shared_ptr<dht::crypto::Certificate>& cert);
148 void connectDevice(const DeviceId& deviceId,
149 const std::string& uri,
150 ConnectCallback cb,
151 bool noNewSocket = false,
152 bool forceNewSocket = false,
153 const std::string& connType = "");
154 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
155 const std::string& name,
156 ConnectCallback cb,
157 bool noNewSocket = false,
158 bool forceNewSocket = false,
159 const std::string& connType = "");
160 /**
161 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
162 * @param sock socket used to send the request
163 * @param name channel's name
164 * @param vid channel's id
165 * @param deviceId to identify the linked ConnectCallback
166 */
167 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
168 const std::string& name,
169 const DeviceId& deviceId,
170 const dht::Value::Id& vid);
171 /**
172 * Triggered when a PeerConnectionRequest comes from the DHT
173 */
174 void answerTo(IceTransport& ice,
175 const dht::Value::Id& id,
176 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
177 bool onRequestStartIce(const PeerConnectionRequest& req);
178 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
179 void onDhtPeerRequest(const PeerConnectionRequest& req,
180 const std::shared_ptr<dht::crypto::Certificate>& cert);
181
182 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
183 void onPeerResponse(const PeerConnectionRequest& req);
184 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
185
186 const std::shared_future<tls::DhParams> dhParams() const;
187 tls::CertificateStore& certStore() const { return *config_->certStore; }
188
189 mutable std::mutex messageMutex_ {};
190 std::set<std::string, std::less<>> treatedMessages_ {};
191
192 void loadTreatedMessages();
193 void saveTreatedMessages() const;
194
195 /// \return true if the given DHT message identifier has been treated
196 /// \note if message has not been treated yet this method st/ore this id and returns true at
197 /// further calls
198 bool isMessageTreated(std::string_view id);
199
200 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
201
202 /**
203 * Published IPv4/IPv6 addresses, used only if defined by the user in account
204 * configuration
205 *
206 */
207 IpAddr publishedIp_[2] {};
208
209 // This will be stored in the configuration
210 std::string publishedIpAddress_ {};
211
212 /**
213 * Published port, used only if defined by the user
214 */
215 pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT};
216
217 /**
218 * interface name on which this account is bound
219 */
220 std::string interface_ {"default"};
221
222 /**
223 * Get the local interface name on which this account is bound.
224 */
225 const std::string& getLocalInterface() const { return interface_; }
226
227 /**
228 * Get the published IP address, fallbacks to NAT if family is unspecified
229 * Prefers the usage of IPv4 if possible.
230 */
231 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
232
233 /**
234 * Set published IP address according to given family
235 */
236 void setPublishedAddress(const IpAddr& ip_addr);
237
238 /**
239 * Store the local/public addresses used to register
240 */
241 void storeActiveIpAddress(std::function<void()>&& cb = {});
242
243 /**
244 * Create and return ICE options.
245 */
246 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
247 IceTransportOptions getIceOptions() const noexcept;
248
249 /**
250 * Inform that a potential peer device have been found.
251 * Returns true only if the device certificate is a valid device certificate.
252 * In that case (true is returned) the account_id parameter is set to the peer account ID.
253 */
254 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
255 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
256
257 bool findCertificate(const dht::PkId& id,
258 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
259
260 /**
261 * returns whether or not UPnP is enabled and active
262 * ie: if it is able to make port mappings
263 */
264 bool getUPnPActive() const;
265
266 /**
267 * Triggered when a new TLS socket is ready to use
268 * @param ok If succeed
269 * @param deviceId Related device
270 * @param vid vid of the connection request
271 * @param name non empty if TLS was created by connectDevice()
272 */
273 void onTlsNegotiationDone(bool ok,
274 const DeviceId& deviceId,
275 const dht::Value::Id& vid,
276 const std::string& name = "");
277
278 std::shared_ptr<ConnectionManager::Config> config_;
279
280 IceTransportFactory iceFactory_ {};
281
282 mutable std::mt19937_64 rand;
283
284 iOSConnectedCallback iOSConnectedCb_ {};
285
286 std::mutex infosMtx_ {};
287 // Note: Someone can ask multiple sockets, so to avoid any race condition,
288 // each device can have multiple multiplexed sockets.
289 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
290
291 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
292 {
293 std::lock_guard<std::mutex> lk(infosMtx_);
294 auto it = infos_.find({deviceId, id});
295 if (it != infos_.end())
296 return it->second;
297 return {};
298 }
299
300 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
301 {
302 std::lock_guard<std::mutex> lk(infosMtx_);
303 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
304 auto& [key, value] = item;
305 return key.first == deviceId && value && value->socket_;
306 });
307 if (it != infos_.end())
308 return it->second;
309 return {};
310 }
311
312 ChannelRequestCallback channelReqCb_ {};
313 ConnectionReadyCallback connReadyCb_ {};
314 onICERequestCallback iceReqCb_ {};
315
316 /**
317 * Stores callback from connectDevice
318 * @note: each device needs a vector because several connectDevice can
319 * be done in parallel and we only want one socket
320 */
321 std::mutex connectCbsMtx_ {};
322 std::map<DeviceId, std::vector<PendingCb>> pendingCbs_ {};
323
324 std::vector<PendingCb> extractPendingCallbacks(const DeviceId& deviceId,
325 const dht::Value::Id vid = 0)
326 {
327 std::vector<PendingCb> ret;
328 std::lock_guard<std::mutex> lk(connectCbsMtx_);
329 auto pendingIt = pendingCbs_.find(deviceId);
330 if (pendingIt == pendingCbs_.end())
331 return ret;
332 auto& pendings = pendingIt->second;
333 if (vid == 0) {
334 ret = std::move(pendings);
335 } else {
336 for (auto it = pendings.begin(); it != pendings.end(); ++it) {
337 if (it->vid == vid) {
338 ret.emplace_back(std::move(*it));
339 pendings.erase(it);
340 break;
341 }
342 }
343 }
344 if (pendings.empty())
345 pendingCbs_.erase(pendingIt);
346 return ret;
347 }
348
349 std::vector<PendingCb> getPendingCallbacks(const DeviceId& deviceId,
350 const dht::Value::Id vid = 0)
351 {
352 std::vector<PendingCb> ret;
353 std::lock_guard<std::mutex> lk(connectCbsMtx_);
354 auto pendingIt = pendingCbs_.find(deviceId);
355 if (pendingIt == pendingCbs_.end())
356 return ret;
357 auto& pendings = pendingIt->second;
358 if (vid == 0) {
359 ret = pendings;
360 } else {
361 std::copy_if(pendings.begin(),
362 pendings.end(),
363 std::back_inserter(ret),
364 [&](auto pending) { return pending.vid == vid; });
365 }
366 return ret;
367 }
368
369 std::shared_ptr<ConnectionManager::Impl> shared()
370 {
371 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
372 }
373 std::shared_ptr<ConnectionManager::Impl const> shared() const
374 {
375 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
376 }
377 std::weak_ptr<ConnectionManager::Impl> weak()
378 {
379 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
380 }
381 std::weak_ptr<ConnectionManager::Impl const> weak() const
382 {
383 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
384 }
385
386 std::atomic_bool isDestroying_ {false};
387};
388
389void
390ConnectionManager::Impl::connectDeviceStartIce(
391 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
392 const dht::Value::Id& vid,
393 const std::string& connType,
394 std::function<void(bool)> onConnected)
395{
396 auto deviceId = devicePk->getLongId();
397 auto info = getInfo(deviceId, vid);
398 if (!info) {
399 onConnected(false);
400 return;
401 }
402
403 std::unique_lock<std::mutex> lk(info->mutex_);
404 auto& ice = info->ice_;
405
406 if (!ice) {
407 if (config_->logger)
408 config_->logger->error("No ICE detected");
409 onConnected(false);
410 return;
411 }
412
413 auto iceAttributes = ice->getLocalAttributes();
414 std::ostringstream icemsg;
415 icemsg << iceAttributes.ufrag << "\n";
416 icemsg << iceAttributes.pwd << "\n";
417 for (const auto& addr : ice->getLocalCandidates(1)) {
418 icemsg << addr << "\n";
419 if (config_->logger)
420 config_->logger->debug("Added local ICE candidate {}", addr);
421 }
422
423 // Prepare connection request as a DHT message
424 PeerConnectionRequest val;
425
426 val.id = vid; /* Random id for the message unicity */
427 val.ice_msg = icemsg.str();
428 val.connType = connType;
429
430 auto value = std::make_shared<dht::Value>(std::move(val));
431 value->user_type = "peer_request";
432
433 // Send connection request through DHT
434 if (config_->logger)
435 config_->logger->debug("Request connection to {}", deviceId);
436 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
437 + devicePk->getId().toString()),
438 devicePk,
439 value,
440 [l=config_->logger,deviceId](bool ok) {
441 if (l)
442 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
443 deviceId,
444 (ok ? "ok" : "failed"));
445 });
446 // Wait for call to onResponse() operated by DHT
447 if (isDestroying_) {
448 onConnected(true); // This avoid to wait new negotiation when destroying
449 return;
450 }
451
452 info->onConnected_ = std::move(onConnected);
453 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
454 std::chrono::steady_clock::now()
455 + DHT_MSG_TIMEOUT);
456 info->waitForAnswer_->async_wait(
457 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
458}
459
460void
461ConnectionManager::Impl::onResponse(const asio::error_code& ec,
462 const DeviceId& deviceId,
463 const dht::Value::Id& vid)
464{
465 if (ec == asio::error::operation_aborted)
466 return;
467 auto info = getInfo(deviceId, vid);
468 if (!info)
469 return;
470
471 std::unique_lock<std::mutex> lk(info->mutex_);
472 auto& ice = info->ice_;
473 if (isDestroying_) {
474 info->onConnected_(true); // The destructor can wake a pending wait here.
475 return;
476 }
477 if (!info->responseReceived_) {
478 if (config_->logger)
479 config_->logger->error("no response from DHT to E2E request.");
480 info->onConnected_(false);
481 return;
482 }
483
484 if (!info->ice_) {
485 info->onConnected_(false);
486 return;
487 }
488
489 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
490
491 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
492 if (config_->logger)
493 config_->logger->warn("start ICE failed");
494 info->onConnected_(false);
495 return;
496 }
497 info->onConnected_(true);
498}
499
500bool
501ConnectionManager::Impl::connectDeviceOnNegoDone(
502 const DeviceId& deviceId,
503 const std::string& name,
504 const dht::Value::Id& vid,
505 const std::shared_ptr<dht::crypto::Certificate>& cert)
506{
507 auto info = getInfo(deviceId, vid);
508 if (!info)
509 return false;
510
511 std::unique_lock<std::mutex> lk {info->mutex_};
512 if (info->waitForAnswer_) {
513 // Negotiation is done and connected, go to handshake
514 // and avoid any cancellation at this point.
515 info->waitForAnswer_->cancel();
516 }
517 auto& ice = info->ice_;
518 if (!ice || !ice->isRunning()) {
519 if (config_->logger)
520 config_->logger->error("No ICE detected or not running");
521 return false;
522 }
523
524 // Build socket
525 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
526 std::move(ice)),
527 true);
528
529 // Negotiate a TLS session
530 if (config_->logger)
531 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
532 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
533 certStore(),
534 identity(),
535 dhParams(),
536 *cert);
537
538 info->tls_->setOnReady(
539 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
540 bool ok) {
541 if (auto shared = w.lock())
542 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
543 });
544 return true;
545}
546
547void
548ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
549 const std::string& name,
550 ConnectCallback cb,
551 bool noNewSocket,
552 bool forceNewSocket,
553 const std::string& connType)
554{
555 if (!dht()) {
556 cb(nullptr, deviceId);
557 return;
558 }
559 if (deviceId.toString() == identity().second->getLongId().toString()) {
560 cb(nullptr, deviceId);
561 return;
562 }
563 findCertificate(deviceId,
564 [w = weak(),
565 deviceId,
566 name,
567 cb = std::move(cb),
568 noNewSocket,
569 forceNewSocket,
570 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
571 if (!cert) {
572 if (auto shared = w.lock())
573 if (shared->config_->logger)
574 shared->config_->logger->error(
575 "No valid certificate found for device {}",
576 deviceId);
577 cb(nullptr, deviceId);
578 return;
579 }
580 if (auto shared = w.lock()) {
581 shared->connectDevice(cert,
582 name,
583 std::move(cb),
584 noNewSocket,
585 forceNewSocket,
586 connType);
587 } else
588 cb(nullptr, deviceId);
589 });
590}
591
592void
593ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
594 const std::string& name,
595 ConnectCallback cb,
596 bool noNewSocket,
597 bool forceNewSocket,
598 const std::string& connType)
599{
600 // Avoid dht operation in a DHT callback to avoid deadlocks
601 dht::ThreadPool::computation().run([w = weak(),
602 name = std::move(name),
603 cert = std::move(cert),
604 cb = std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType] {
608 auto devicePk = cert->getSharedPublicKey();
609 auto deviceId = devicePk->getLongId();
610 auto sthis = w.lock();
611 if (!sthis || sthis->isDestroying_) {
612 cb(nullptr, deviceId);
613 return;
614 }
615 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
616 auto isConnectingToDevice = false;
617 {
618 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
619 auto pendingsIt = sthis->pendingCbs_.find(deviceId);
620 if (pendingsIt != sthis->pendingCbs_.end()) {
621 const auto& pendings = pendingsIt->second;
622 while (std::find_if(pendings.begin(), pendings.end(), [&](const auto& it){ return it.vid == vid; }) != pendings.end()) {
623 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
624 }
625 }
626 // Check if already connecting
627 isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end();
628 // Save current request for sendChannelRequest.
629 // Note: do not return here, cause we can be in a state where first
630 // socket is negotiated and first channel is pending
631 // so return only after we checked the info
632 if (isConnectingToDevice)
633 pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid});
634 else
635 sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}};
636 }
637
638 // Check if already negotiated
639 CallbackId cbId(deviceId, vid);
640 if (auto info = sthis->getConnectedInfo(deviceId)) {
641 std::lock_guard<std::mutex> lk(info->mutex_);
642 if (info->socket_) {
643 if (sthis->config_->logger)
644 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
645 info->cbIds_.emplace(cbId);
646 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
647 return;
648 }
649 }
650
651 if (isConnectingToDevice && !forceNewSocket) {
652 if (sthis->config_->logger)
653 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
654 return;
655 }
656 if (noNewSocket) {
657 // If no new socket is specified, we don't try to generate a new socket
658 for (const auto& pending : sthis->extractPendingCallbacks(deviceId, vid))
659 pending.cb(nullptr, deviceId);
660 return;
661 }
662
663 // Note: used when the ice negotiation fails to erase
664 // all stored structures.
665 auto eraseInfo = [w, cbId] {
666 if (auto shared = w.lock()) {
667 // If no new socket is specified, we don't try to generate a new socket
668 for (const auto& pending : shared->extractPendingCallbacks(cbId.first, cbId.second))
669 pending.cb(nullptr, cbId.first);
670 std::lock_guard<std::mutex> lk(shared->infosMtx_);
671 shared->infos_.erase(cbId);
672 }
673 };
674
675 // If no socket exists, we need to initiate an ICE connection.
676 sthis->getIceOptions([w,
677 deviceId = std::move(deviceId),
678 devicePk = std::move(devicePk),
679 name = std::move(name),
680 cert = std::move(cert),
681 vid,
682 connType,
683 eraseInfo](auto&& ice_config) {
684 auto sthis = w.lock();
685 if (!sthis) {
686 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
687 return;
688 }
689 ice_config.tcpEnable = true;
690 ice_config.onInitDone = [w,
691 deviceId = std::move(deviceId),
692 devicePk = std::move(devicePk),
693 name = std::move(name),
694 cert = std::move(cert),
695 vid,
696 connType,
697 eraseInfo](bool ok) {
698 dht::ThreadPool::io().run([w = std::move(w),
699 devicePk = std::move(devicePk),
700 vid = std::move(vid),
701 eraseInfo,
702 connType, ok] {
703 auto sthis = w.lock();
704 if (!ok && sthis && sthis->config_->logger)
705 sthis->config_->logger->error("Cannot initialize ICE session.");
706 if (!sthis || !ok) {
707 eraseInfo();
708 return;
709 }
710 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
711 if (!ok) {
712 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
713 }
714 });
715 });
716 };
717 ice_config.onNegoDone = [w,
718 deviceId,
719 name,
720 cert = std::move(cert),
721 vid,
722 eraseInfo](bool ok) {
723 dht::ThreadPool::io().run([w = std::move(w),
724 deviceId = std::move(deviceId),
725 name = std::move(name),
726 cert = std::move(cert),
727 vid = std::move(vid),
728 eraseInfo = std::move(eraseInfo),
729 ok] {
730 auto sthis = w.lock();
731 if (!ok && sthis && sthis->config_->logger)
732 sthis->config_->logger->error("ICE negotiation failed.");
733 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
734 eraseInfo();
735 });
736 };
737
738 auto info = std::make_shared<ConnectionInfo>();
739 {
740 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
741 sthis->infos_[{deviceId, vid}] = info;
742 }
743 std::unique_lock<std::mutex> lk {info->mutex_};
744 ice_config.master = false;
745 ice_config.streamsCount = 1;
746 ice_config.compCountPerStream = 1;
747 info->ice_ = sthis->iceFactory_.createUTransport("");
748 if (!info->ice_) {
749 if (sthis->config_->logger)
750 sthis->config_->logger->error("Cannot initialize ICE session.");
751 eraseInfo();
752 return;
753 }
754 // We need to detect any shutdown if the ice session is destroyed before going to the
755 // TLS session;
756 info->ice_->setOnShutdown([eraseInfo]() {
757 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
758 });
759 info->ice_->initIceInstance(ice_config);
760 });
761 });
762}
763
764void
765ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
766 const std::string& name,
767 const DeviceId& deviceId,
768 const dht::Value::Id& vid)
769{
770 auto channelSock = sock->addChannel(name);
771 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
772 auto shared = w.lock();
773 if (shared)
774 for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
775 pending.cb(nullptr, deviceId);
776 });
777 channelSock->onReady(
778 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() {
779 auto shared = w.lock();
780 auto channelSock = wSock.lock();
781 if (shared)
782 for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
783 pending.cb(channelSock, deviceId);
784 });
785
786 ChannelRequest val;
787 val.name = channelSock->name();
788 val.state = ChannelRequestState::REQUEST;
789 val.channel = channelSock->channel();
790 msgpack::sbuffer buffer(256);
791 msgpack::pack(buffer, val);
792
793 std::error_code ec;
794 int res = sock->write(CONTROL_CHANNEL,
795 reinterpret_cast<const uint8_t*>(buffer.data()),
796 buffer.size(),
797 ec);
798 if (res < 0) {
799 // TODO check if we should handle errors here
800 if (config_->logger)
801 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
802 }
803}
804
805void
806ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
807{
808 auto device = req.owner->getLongId();
809 if (config_->logger)
810 config_->logger->debug("New response received from {}", device);
811 if (auto info = getInfo(device, req.id)) {
812 std::lock_guard<std::mutex> lk {info->mutex_};
813 info->responseReceived_ = true;
814 info->response_ = std::move(req);
815 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
816 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
817 this,
818 std::placeholders::_1,
819 device,
820 req.id));
821 } else {
822 if (config_->logger)
823 config_->logger->warn("Respond received, but cannot find request");
824 }
825}
826
827void
828ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
829{
830 if (!dht())
831 return;
832 dht()->listen<PeerConnectionRequest>(
833 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
834 [w = weak()](PeerConnectionRequest&& req) {
835 auto shared = w.lock();
836 if (!shared)
837 return false;
838 if (shared->isMessageTreated(to_hex_string(req.id))) {
839 // Message already treated. Just ignore
840 return true;
841 }
842 if (req.isAnswer) {
843 if (shared->config_->logger)
844 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
845 } else {
846 if (shared->config_->logger)
847 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
848 }
849 if (req.isAnswer) {
850 shared->onPeerResponse(req);
851 } else {
852 // Async certificate checking
853 shared->dht()->findCertificate(
854 req.from,
855 [w, req = std::move(req)](
856 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
857 auto shared = w.lock();
858 if (!shared)
859 return;
860 dht::InfoHash peer_h;
861 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
862#if TARGET_OS_IOS
863 if (shared->iOSConnectedCb_(req.connType, peer_h))
864 return;
865#endif
866 shared->onDhtPeerRequest(req, cert);
867 } else {
868 if (shared->config_->logger)
869 shared->config_->logger->warn(
870 "Received request from untrusted peer {}",
871 req.owner->getLongId());
872 }
873 });
874 }
875
876 return true;
877 },
878 dht::Value::UserTypeFilter("peer_request"));
879}
880
881void
882ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
883 const DeviceId& deviceId,
884 const dht::Value::Id& vid,
885 const std::string& name)
886{
887 if (isDestroying_)
888 return;
889 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
890 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
891 // asked yet)
892 auto isDhtRequest = name.empty();
893 if (!ok) {
894 if (isDhtRequest) {
895 if (config_->logger)
896 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
897 deviceId,
898 name,
899 vid);
900 if (connReadyCb_)
901 connReadyCb_(deviceId, "", nullptr);
902 } else {
903 if (config_->logger)
904 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
905 deviceId,
906 name,
907 vid);
908 for (const auto& pending : extractPendingCallbacks(deviceId))
909 pending.cb(nullptr, deviceId);
910 }
911 } else {
912 // The socket is ready, store it
913 if (isDhtRequest) {
914 if (config_->logger)
915 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
916 deviceId,
917 vid);
918 } else {
919 if (config_->logger)
920 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
921 deviceId,
922 name,
923 vid);
924 }
925
926 auto info = getInfo(deviceId, vid);
927 addNewMultiplexedSocket({deviceId, vid}, info);
928 // Finally, open the channel and launch pending callbacks
929 if (info->socket_) {
930 // Note: do not remove pending there it's done in sendChannelRequest
931 for (const auto& pending : getPendingCallbacks(deviceId)) {
932 if (config_->logger)
933 config_->logger->debug("Send request on TLS socket for channel {} to {}",
934 pending.name,
935 deviceId);
936 sendChannelRequest(info->socket_, pending.name, deviceId, pending.vid);
937 }
938 }
939 }
940}
941
942void
943ConnectionManager::Impl::answerTo(IceTransport& ice,
944 const dht::Value::Id& id,
945 const std::shared_ptr<dht::crypto::PublicKey>& from)
946{
947 // NOTE: This is a shortest version of a real SDP message to save some bits
948 auto iceAttributes = ice.getLocalAttributes();
949 std::ostringstream icemsg;
950 icemsg << iceAttributes.ufrag << "\n";
951 icemsg << iceAttributes.pwd << "\n";
952 for (const auto& addr : ice.getLocalCandidates(1)) {
953 icemsg << addr << "\n";
954 }
955
956 // Send PeerConnection response
957 PeerConnectionRequest val;
958 val.id = id;
959 val.ice_msg = icemsg.str();
960 val.isAnswer = true;
961 auto value = std::make_shared<dht::Value>(std::move(val));
962 value->user_type = "peer_request";
963
964 if (config_->logger)
965 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
966 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
967 + from->getId().toString()),
968 from,
969 value,
970 [from,l=config_->logger](bool ok) {
971 if (l)
972 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
973 from->getLongId(),
974 (ok ? "ok" : "failed"));
975 });
976}
977
978bool
979ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
980{
981 auto deviceId = req.owner->getLongId();
982 auto info = getInfo(deviceId, req.id);
983 if (!info)
984 return false;
985
986 std::unique_lock<std::mutex> lk {info->mutex_};
987 auto& ice = info->ice_;
988 if (!ice) {
989 if (config_->logger)
990 config_->logger->error("No ICE detected");
991 if (connReadyCb_)
992 connReadyCb_(deviceId, "", nullptr);
993 return false;
994 }
995
996 auto sdp = ice->parseIceCandidates(req.ice_msg);
997 answerTo(*ice, req.id, req.owner);
998 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
999 if (config_->logger)
1000 config_->logger->error("Start ICE failed - fallback to TURN");
1001 ice = nullptr;
1002 if (connReadyCb_)
1003 connReadyCb_(deviceId, "", nullptr);
1004 return false;
1005 }
1006 return true;
1007}
1008
1009bool
1010ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1011{
1012 auto deviceId = req.owner->getLongId();
1013 auto info = getInfo(deviceId, req.id);
1014 if (!info)
1015 return false;
1016
1017 std::unique_lock<std::mutex> lk {info->mutex_};
1018 auto& ice = info->ice_;
1019 if (!ice) {
1020 if (config_->logger)
1021 config_->logger->error("No ICE detected");
1022 return false;
1023 }
1024
1025 // Build socket
1026 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1027 std::move(ice)),
1028 false);
1029
1030 // init TLS session
1031 auto ph = req.from;
1032 if (config_->logger)
1033 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1034 req.from,
1035 req.id);
1036 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1037 std::move(endpoint),
1038 certStore(),
1039 identity(),
1040 dhParams(),
1041 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1042 auto shared = w.lock();
1043 if (!shared)
1044 return false;
1045 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1046 if (!crt)
1047 return false;
1048 return crt->getPacked() == cert.getPacked();
1049 });
1050
1051 info->tls_->setOnReady(
1052 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1053 if (auto shared = w.lock())
1054 shared->onTlsNegotiationDone(ok, deviceId, vid);
1055 });
1056 return true;
1057}
1058
1059void
1060ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1061 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1062{
1063 auto deviceId = req.owner->getLongId();
1064 if (config_->logger)
1065 config_->logger->debug("New connection request from {}", deviceId);
1066 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1067 if (config_->logger)
1068 config_->logger->debug("Refuse connection from {}", deviceId);
1069 return;
1070 }
1071
1072 // Because the connection is accepted, create an ICE socket.
1073 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1074 auto shared = w.lock();
1075 if (!shared)
1076 return;
1077 // Note: used when the ice negotiation fails to erase
1078 // all stored structures.
1079 auto eraseInfo = [w, id = req.id, deviceId] {
1080 if (auto shared = w.lock()) {
1081 // If no new socket is specified, we don't try to generate a new socket
1082 for (const auto& pending : shared->extractPendingCallbacks(deviceId, id))
1083 pending.cb(nullptr, deviceId);
1084 if (shared->connReadyCb_)
1085 shared->connReadyCb_(deviceId, "", nullptr);
1086 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1087 shared->infos_.erase({deviceId, id});
1088 }
1089 };
1090
1091 ice_config.tcpEnable = true;
1092 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1093 auto shared = w.lock();
1094 if (!shared)
1095 return;
1096 if (!ok) {
1097 if (shared->config_->logger)
1098 shared->config_->logger->error("Cannot initialize ICE session.");
1099 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1100 return;
1101 }
1102
1103 dht::ThreadPool::io().run(
1104 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1105 auto shared = w.lock();
1106 if (!shared)
1107 return;
1108 if (!shared->onRequestStartIce(req))
1109 eraseInfo();
1110 });
1111 };
1112
1113 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1114 auto shared = w.lock();
1115 if (!shared)
1116 return;
1117 if (!ok) {
1118 if (shared->config_->logger)
1119 shared->config_->logger->error("ICE negotiation failed.");
1120 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1121 return;
1122 }
1123
1124 dht::ThreadPool::io().run(
1125 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1126 if (auto shared = w.lock())
1127 if (!shared->onRequestOnNegoDone(req))
1128 eraseInfo();
1129 });
1130 };
1131
1132 // Negotiate a new ICE socket
1133 auto info = std::make_shared<ConnectionInfo>();
1134 {
1135 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1136 shared->infos_[{deviceId, req.id}] = info;
1137 }
1138 if (shared->config_->logger)
1139 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1140 std::unique_lock<std::mutex> lk {info->mutex_};
1141 ice_config.streamsCount = 1;
1142 ice_config.compCountPerStream = 1; // TCP
1143 ice_config.master = true;
1144 info->ice_ = shared->iceFactory_.createUTransport("");
1145 if (not info->ice_) {
1146 if (shared->config_->logger)
1147 shared->config_->logger->error("Cannot initialize ICE session");
1148 eraseInfo();
1149 return;
1150 }
1151 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1152 info->ice_->setOnShutdown([eraseInfo]() {
1153 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1154 });
1155 info->ice_->initIceInstance(ice_config);
1156 });
1157}
1158
1159void
1160ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1161{
1162 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1163 info->socket_->setOnReady(
1164 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1165 if (auto sthis = w.lock())
1166 if (sthis->connReadyCb_)
1167 sthis->connReadyCb_(deviceId, socket->name(), socket);
1168 });
1169 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1170 const uint16_t&,
1171 const std::string& name) {
1172 if (auto sthis = w.lock())
1173 if (sthis->channelReqCb_)
1174 return sthis->channelReqCb_(peer, name);
1175 return false;
1176 });
1177 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1178 // Cancel current outgoing connections
1179 dht::ThreadPool::io().run([w, deviceId, vid] {
1180 auto sthis = w.lock();
1181 if (!sthis)
1182 return;
1183
1184 std::set<CallbackId> ids;
1185 if (auto info = sthis->getInfo(deviceId, vid)) {
1186 std::lock_guard<std::mutex> lk(info->mutex_);
1187 if (info->socket_) {
1188 ids = std::move(info->cbIds_);
1189 info->socket_->shutdown();
1190 }
1191 }
1192 for (const auto& cbId : ids)
1193 for (const auto& pending : sthis->extractPendingCallbacks(cbId.first, cbId.second))
1194 pending.cb(nullptr, deviceId);
1195
1196 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1197 sthis->infos_.erase({deviceId, vid});
1198 });
1199 });
1200}
1201
1202const std::shared_future<tls::DhParams>
1203ConnectionManager::Impl::dhParams() const
1204{
1205 return dht::ThreadPool::computation().get<tls::DhParams>(
1206 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1207 ;
1208}
1209
1210template<typename ID = dht::Value::Id>
1211std::set<ID, std::less<>>
1212loadIdList(const std::string& path)
1213{
1214 std::set<ID, std::less<>> ids;
1215 std::ifstream file = fileutils::ifstream(path);
1216 if (!file.is_open()) {
1217 //JAMI_DBG("Could not load %s", path.c_str());
1218 return ids;
1219 }
1220 std::string line;
1221 while (std::getline(file, line)) {
1222 if constexpr (std::is_same<ID, std::string>::value) {
1223 ids.emplace(std::move(line));
1224 } else if constexpr (std::is_integral<ID>::value) {
1225 ID vid;
1226 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1227 ec == std::errc()) {
1228 ids.emplace(vid);
1229 }
1230 }
1231 }
1232 return ids;
1233}
1234
1235template<typename List = std::set<dht::Value::Id>>
1236void
1237saveIdList(const std::string& path, const List& ids)
1238{
1239 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1240 if (!file.is_open()) {
1241 //JAMI_ERR("Could not save to %s", path.c_str());
1242 return;
1243 }
1244 for (auto& c : ids)
1245 file << std::hex << c << "\n";
1246}
1247
1248void
1249ConnectionManager::Impl::loadTreatedMessages()
1250{
1251 std::lock_guard<std::mutex> lock(messageMutex_);
1252 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1253 treatedMessages_ = loadIdList<std::string>(path);
1254 if (treatedMessages_.empty()) {
1255 auto messages = loadIdList(path);
1256 for (const auto& m : messages)
1257 treatedMessages_.emplace(to_hex_string(m));
1258 }
1259}
1260
1261void
1262ConnectionManager::Impl::saveTreatedMessages() const
1263{
1264 dht::ThreadPool::io().run([w = weak()]() {
1265 if (auto sthis = w.lock()) {
1266 auto& this_ = *sthis;
1267 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1268 fileutils::check_dir(this_.config_->cachePath.c_str());
1269 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1270 + DIR_SEPARATOR_STR "treatedMessages",
1271 this_.treatedMessages_);
1272 }
1273 });
1274}
1275
1276bool
1277ConnectionManager::Impl::isMessageTreated(std::string_view id)
1278{
1279 std::lock_guard<std::mutex> lock(messageMutex_);
1280 auto res = treatedMessages_.emplace(id);
1281 if (res.second) {
1282 saveTreatedMessages();
1283 return false;
1284 }
1285 return true;
1286}
1287
1288/**
1289 * returns whether or not UPnP is enabled and active_
1290 * ie: if it is able to make port mappings
1291 */
1292bool
1293ConnectionManager::Impl::getUPnPActive() const
1294{
1295 return config_->getUPnPActive();
1296}
1297
1298IpAddr
1299ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1300{
1301 if (family == AF_INET)
1302 return publishedIp_[0];
1303 if (family == AF_INET6)
1304 return publishedIp_[1];
1305
1306 assert(family == AF_UNSPEC);
1307
1308 // If family is not set, prefere IPv4 if available. It's more
1309 // likely to succeed behind NAT.
1310 if (publishedIp_[0])
1311 return publishedIp_[0];
1312 if (publishedIp_[1])
1313 return publishedIp_[1];
1314 return {};
1315}
1316
1317void
1318ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1319{
1320 if (ip_addr.getFamily() == AF_INET) {
1321 publishedIp_[0] = ip_addr;
1322 } else {
1323 publishedIp_[1] = ip_addr;
1324 }
1325}
1326
1327void
1328ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1329{
1330 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1331 bool hasIpv4 {false}, hasIpv6 {false};
1332 for (auto& result : results) {
1333 auto family = result.getFamily();
1334 if (family == AF_INET) {
1335 if (not hasIpv4) {
1336 hasIpv4 = true;
1337 if (config_->logger)
1338 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1339 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1340 setPublishedAddress(*result.get());
1341 if (config_->upnpCtrl) {
1342 config_->upnpCtrl->setPublicAddress(*result.get());
1343 }
1344 }
1345 } else if (family == AF_INET6) {
1346 if (not hasIpv6) {
1347 hasIpv6 = true;
1348 if (config_->logger)
1349 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1350 setPublishedAddress(*result.get());
1351 }
1352 }
1353 if (hasIpv4 and hasIpv6)
1354 break;
1355 }
1356 if (cb)
1357 cb();
1358 });
1359}
1360
1361void
1362ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1363{
1364 storeActiveIpAddress([this, cb = std::move(cb)] {
1365 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1366 auto publishedAddr = getPublishedIpAddress();
1367
1368 if (publishedAddr) {
1369 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1370 publishedAddr.getFamily());
1371 if (interfaceAddr) {
1372 opts.accountLocalAddr = interfaceAddr;
1373 opts.accountPublicAddr = publishedAddr;
1374 }
1375 }
1376 if (cb)
1377 cb(std::move(opts));
1378 });
1379}
1380
1381IceTransportOptions
1382ConnectionManager::Impl::getIceOptions() const noexcept
1383{
1384 IceTransportOptions opts;
1385 opts.upnpEnable = getUPnPActive();
1386
1387 if (config_->stunEnabled)
1388 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1389 if (config_->turnEnabled) {
1390 auto cached = false;
1391 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1392 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1393 if (config_->cacheTurnV4) {
1394 opts.turnServers.emplace_back(TurnServerInfo()
1395 .setUri(config_->cacheTurnV4.toString())
1396 .setUsername(config_->turnServerUserName)
1397 .setPassword(config_->turnServerPwd)
1398 .setRealm(config_->turnServerRealm));
1399 }
1400 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1401 // co issues. So this needs some debug. for now just disable
1402 // if (cacheTurnV6 && *cacheTurnV6) {
1403 // opts.turnServers.emplace_back(TurnServerInfo()
1404 // .setUri(cacheTurnV6->toString(true))
1405 // .setUsername(turnServerUserName_)
1406 // .setPassword(turnServerPwd_)
1407 // .setRealm(turnServerRealm_));
1408 //}
1409 // Nothing cached, so do the resolution
1410 if (!cached) {
1411 opts.turnServers.emplace_back(TurnServerInfo()
1412 .setUri(config_->turnServer)
1413 .setUsername(config_->turnServerUserName)
1414 .setPassword(config_->turnServerPwd)
1415 .setRealm(config_->turnServerRealm));
1416 }
1417 }
1418 return opts;
1419}
1420
1421bool
1422ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1423 dht::InfoHash& account_id,
1424 const std::shared_ptr<Logger>& logger)
1425{
1426 if (not crt)
1427 return false;
1428
1429 auto top_issuer = crt;
1430 while (top_issuer->issuer)
1431 top_issuer = top_issuer->issuer;
1432
1433 // Device certificate can't be self-signed
1434 if (top_issuer == crt) {
1435 if (logger)
1436 logger->warn("Found invalid peer device: {}", crt->getLongId());
1437 return false;
1438 }
1439
1440 // Check peer certificate chain
1441 // Trust store with top issuer as the only CA
1442 dht::crypto::TrustList peer_trust;
1443 peer_trust.add(*top_issuer);
1444 if (not peer_trust.verify(*crt)) {
1445 if (logger)
1446 logger->warn("Found invalid peer device: {}", crt->getLongId());
1447 return false;
1448 }
1449
1450 // Check cached OCSP response
1451 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1452 if (logger)
1453 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1454 return false;
1455 }
1456
1457 account_id = crt->issuer->getId();
1458 if (logger)
1459 logger->warn("Found peer device: {} account:{} CA:{}",
1460 crt->getLongId(),
1461 account_id,
1462 top_issuer->getId());
1463 return true;
1464}
1465
1466bool
1467ConnectionManager::Impl::findCertificate(
1468 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1469{
1470 if (auto cert = certStore().getCertificate(id.toString())) {
1471 if (cb)
1472 cb(cert);
1473 } else if (cb)
1474 cb(nullptr);
1475 return true;
1476}
1477
1478ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1479 : pimpl_ {std::make_shared<Impl>(config_)}
1480{}
1481
1482ConnectionManager::~ConnectionManager()
1483{
1484 if (pimpl_)
1485 pimpl_->shutdown();
1486}
1487
1488void
1489ConnectionManager::connectDevice(const DeviceId& deviceId,
1490 const std::string& name,
1491 ConnectCallback cb,
1492 bool noNewSocket,
1493 bool forceNewSocket,
1494 const std::string& connType)
1495{
1496 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1497}
1498
1499void
1500ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1501 const std::string& name,
1502 ConnectCallback cb,
1503 bool noNewSocket,
1504 bool forceNewSocket,
1505 const std::string& connType)
1506{
1507 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1508}
1509
1510bool
1511ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1512{
1513 auto pending = pimpl_->getPendingCallbacks(deviceId);
1514 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; })
1515 != pending.end();
1516}
1517
1518void
1519ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1520{
1521 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1522 std::set<DeviceId> peersDevices;
1523 {
1524 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1525 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1526 auto const& [key, value] = *iter;
1527 auto deviceId = key.first;
1528 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1529 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1530 connInfos.emplace_back(value);
1531 peersDevices.emplace(deviceId);
1532 iter = pimpl_->infos_.erase(iter);
1533 } else {
1534 iter++;
1535 }
1536 }
1537 }
1538 // Stop connections to all peers devices
1539 for (const auto& deviceId : peersDevices) {
1540 for (const auto& pending : pimpl_->extractPendingCallbacks(deviceId))
1541 pending.cb(nullptr, deviceId);
1542 // This will close the TLS Session
1543 pimpl_->removeUnusedConnections(deviceId);
1544 }
1545 for (auto& info : connInfos) {
1546 if (info->socket_)
1547 info->socket_->shutdown();
1548 if (info->waitForAnswer_)
1549 info->waitForAnswer_->cancel();
1550 if (info->ice_) {
1551 std::unique_lock<std::mutex> lk {info->mutex_};
1552 dht::ThreadPool::io().run(
1553 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1554 }
1555 }
1556}
1557
1558void
1559ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1560{
1561 pimpl_->onDhtConnected(devicePk);
1562}
1563
1564void
1565ConnectionManager::onICERequest(onICERequestCallback&& cb)
1566{
1567 pimpl_->iceReqCb_ = std::move(cb);
1568}
1569
1570void
1571ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1572{
1573 pimpl_->channelReqCb_ = std::move(cb);
1574}
1575
1576void
1577ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1578{
1579 pimpl_->connReadyCb_ = std::move(cb);
1580}
1581
1582void
1583ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1584{
1585 pimpl_->iOSConnectedCb_ = std::move(cb);
1586}
1587
1588std::size_t
1589ConnectionManager::activeSockets() const
1590{
1591 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1592 return pimpl_->infos_.size();
1593}
1594
1595void
1596ConnectionManager::monitor() const
1597{
1598 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1599 auto logger = pimpl_->config_->logger;
1600 if (!logger)
1601 return;
1602 logger->debug("ConnectionManager current status:");
1603 for (const auto& [_, ci] : pimpl_->infos_) {
1604 if (ci->socket_)
1605 ci->socket_->monitor();
1606 }
1607 logger->debug("ConnectionManager end status.");
1608}
1609
1610void
1611ConnectionManager::connectivityChanged()
1612{
1613 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1614 for (const auto& [_, ci] : pimpl_->infos_) {
1615 if (ci->socket_)
1616 ci->socket_->sendBeacon();
1617 }
1618}
1619
1620void
1621ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1622{
1623 return pimpl_->getIceOptions(std::move(cb));
1624}
1625
1626IceTransportOptions
1627ConnectionManager::getIceOptions() const noexcept
1628{
1629 return pimpl_->getIceOptions();
1630}
1631
1632IpAddr
1633ConnectionManager::getPublishedIpAddress(uint16_t family) const
1634{
1635 return pimpl_->getPublishedIpAddress(family);
1636}
1637
1638void
1639ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1640{
1641 return pimpl_->setPublishedAddress(ip_addr);
1642}
1643
1644void
1645ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1646{
1647 return pimpl_->storeActiveIpAddress(std::move(cb));
1648}
1649
1650std::shared_ptr<ConnectionManager::Config>
1651ConnectionManager::getConfig()
1652{
1653 return pimpl_->config_;
1654}
1655
1656} // namespace jami