blob: e4453f0db26aec2b18ed90faac973a52c3335a14 [file] [log] [blame]
/*
 *  Copyright (C) 2019-2023 Savoir-faire Linux Inc.
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */
#include "connectionmanager.h"
#include "peer_connection.h"
#include "upnp/upnp_control.h"
#include "certstore.h"
#include "fileutils.h"
#include "sip_utils.h"
#include "string_utils.h"

#include <opendht/crypto.h>
#include <opendht/thread_pool.h>
#include <opendht/value.h>
#include <asio.hpp>

#include <algorithm>
#include <charconv>
#include <condition_variable>
#include <map>
#include <mutex>
#include <random>
#include <set>
37
38namespace jami {
39static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
43using CallbackId = std::pair<jami::DeviceId, dht::Value::Id>;
44
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
83 {}
84 ~Impl() {}
85
86 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
87 const dht::crypto::Identity& identity() const { return config_->id; }
88
89 void removeUnusedConnections(const DeviceId& deviceId = {})
90 {
91 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
92
93 {
94 std::lock_guard<std::mutex> lk(infosMtx_);
95 for (auto it = infos_.begin(); it != infos_.end();) {
96 auto& [key, info] = *it;
97 if (info && (!deviceId || key.first == deviceId)) {
98 unused.emplace_back(std::move(info));
99 it = infos_.erase(it);
100 } else {
101 ++it;
102 }
103 }
104 }
105 for (auto& info: unused) {
106 if (info->tls_)
107 info->tls_->shutdown();
108 if (info->socket_)
109 info->socket_->shutdown();
110 if (info->waitForAnswer_)
111 info->waitForAnswer_->cancel();
112 }
113 if (!unused.empty())
114 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
115 }
116
117 void shutdown()
118 {
119 if (isDestroying_.exchange(true))
120 return;
121 {
122 std::lock_guard<std::mutex> lk(connectCbsMtx_);
123 // Call all pending callbacks that channel is not ready
124 for (auto& [deviceId, pcbs] : pendingCbs_)
125 for (auto& pending : pcbs)
126 pending.cb(nullptr, deviceId);
127 pendingCbs_.clear();
128 }
129 removeUnusedConnections();
130 }
131
132 struct PendingCb
133 {
134 std::string name;
135 ConnectCallback cb;
136 dht::Value::Id vid;
137 };
138
139 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
140 const dht::Value::Id& vid,
141 const std::string& connType,
142 std::function<void(bool)> onConnected);
143 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
144 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
145 const std::string& name,
146 const dht::Value::Id& vid,
147 const std::shared_ptr<dht::crypto::Certificate>& cert);
148 void connectDevice(const DeviceId& deviceId,
149 const std::string& uri,
150 ConnectCallback cb,
151 bool noNewSocket = false,
152 bool forceNewSocket = false,
153 const std::string& connType = "");
154 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
155 const std::string& name,
156 ConnectCallback cb,
157 bool noNewSocket = false,
158 bool forceNewSocket = false,
159 const std::string& connType = "");
160 /**
161 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
162 * @param sock socket used to send the request
163 * @param name channel's name
164 * @param vid channel's id
165 * @param deviceId to identify the linked ConnectCallback
166 */
167 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
168 const std::string& name,
169 const DeviceId& deviceId,
170 const dht::Value::Id& vid);
171 /**
172 * Triggered when a PeerConnectionRequest comes from the DHT
173 */
174 void answerTo(IceTransport& ice,
175 const dht::Value::Id& id,
176 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
177 bool onRequestStartIce(const PeerConnectionRequest& req);
178 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
179 void onDhtPeerRequest(const PeerConnectionRequest& req,
180 const std::shared_ptr<dht::crypto::Certificate>& cert);
181
182 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
183 void onPeerResponse(const PeerConnectionRequest& req);
184 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
185
186 const std::shared_future<tls::DhParams> dhParams() const;
187 tls::CertificateStore& certStore() const { return *config_->certStore; }
188
189 mutable std::mutex messageMutex_ {};
190 std::set<std::string, std::less<>> treatedMessages_ {};
191
192 void loadTreatedMessages();
193 void saveTreatedMessages() const;
194
195 /// \return true if the given DHT message identifier has been treated
196 /// \note if message has not been treated yet this method st/ore this id and returns true at
197 /// further calls
198 bool isMessageTreated(std::string_view id);
199
200 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
201
202 /**
203 * Published IPv4/IPv6 addresses, used only if defined by the user in account
204 * configuration
205 *
206 */
207 IpAddr publishedIp_[2] {};
208
209 // This will be stored in the configuration
210 std::string publishedIpAddress_ {};
211
212 /**
213 * Published port, used only if defined by the user
214 */
215 pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT};
216
217 /**
218 * interface name on which this account is bound
219 */
220 std::string interface_ {"default"};
221
222 /**
223 * Get the local interface name on which this account is bound.
224 */
225 const std::string& getLocalInterface() const { return interface_; }
226
227 /**
228 * Get the published IP address, fallbacks to NAT if family is unspecified
229 * Prefers the usage of IPv4 if possible.
230 */
231 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
232
233 /**
234 * Set published IP address according to given family
235 */
236 void setPublishedAddress(const IpAddr& ip_addr);
237
238 /**
239 * Store the local/public addresses used to register
240 */
241 void storeActiveIpAddress(std::function<void()>&& cb = {});
242
243 /**
244 * Create and return ICE options.
245 */
246 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
247 IceTransportOptions getIceOptions() const noexcept;
248
249 /**
250 * Inform that a potential peer device have been found.
251 * Returns true only if the device certificate is a valid device certificate.
252 * In that case (true is returned) the account_id parameter is set to the peer account ID.
253 */
254 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
255 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
256
257 bool findCertificate(const dht::PkId& id,
258 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
259
260 /**
261 * returns whether or not UPnP is enabled and active
262 * ie: if it is able to make port mappings
263 */
264 bool getUPnPActive() const;
265
266 /**
267 * Triggered when a new TLS socket is ready to use
268 * @param ok If succeed
269 * @param deviceId Related device
270 * @param vid vid of the connection request
271 * @param name non empty if TLS was created by connectDevice()
272 */
273 void onTlsNegotiationDone(bool ok,
274 const DeviceId& deviceId,
275 const dht::Value::Id& vid,
276 const std::string& name = "");
277
278 std::shared_ptr<ConnectionManager::Config> config_;
279
280 IceTransportFactory iceFactory_ {};
281
282 mutable std::mt19937_64 rand;
283
284 iOSConnectedCallback iOSConnectedCb_ {};
285
286 std::mutex infosMtx_ {};
287 // Note: Someone can ask multiple sockets, so to avoid any race condition,
288 // each device can have multiple multiplexed sockets.
289 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
290
291 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
292 {
293 std::lock_guard<std::mutex> lk(infosMtx_);
294 auto it = infos_.find({deviceId, id});
295 if (it != infos_.end())
296 return it->second;
297 return {};
298 }
299
300 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
301 {
302 std::lock_guard<std::mutex> lk(infosMtx_);
303 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
304 auto& [key, value] = item;
305 return key.first == deviceId && value && value->socket_;
306 });
307 if (it != infos_.end())
308 return it->second;
309 return {};
310 }
311
312 ChannelRequestCallback channelReqCb_ {};
313 ConnectionReadyCallback connReadyCb_ {};
314 onICERequestCallback iceReqCb_ {};
315
316 /**
317 * Stores callback from connectDevice
318 * @note: each device needs a vector because several connectDevice can
319 * be done in parallel and we only want one socket
320 */
321 std::mutex connectCbsMtx_ {};
322 std::map<DeviceId, std::vector<PendingCb>> pendingCbs_ {};
323
324 std::vector<PendingCb> extractPendingCallbacks(const DeviceId& deviceId,
325 const dht::Value::Id vid = 0)
326 {
327 std::vector<PendingCb> ret;
328 std::lock_guard<std::mutex> lk(connectCbsMtx_);
329 auto pendingIt = pendingCbs_.find(deviceId);
330 if (pendingIt == pendingCbs_.end())
331 return ret;
332 auto& pendings = pendingIt->second;
333 if (vid == 0) {
334 ret = std::move(pendings);
335 } else {
336 for (auto it = pendings.begin(); it != pendings.end(); ++it) {
337 if (it->vid == vid) {
338 ret.emplace_back(std::move(*it));
339 pendings.erase(it);
340 break;
341 }
342 }
343 }
344 if (pendings.empty())
345 pendingCbs_.erase(pendingIt);
346 return ret;
347 }
348
349 std::vector<PendingCb> getPendingCallbacks(const DeviceId& deviceId,
350 const dht::Value::Id vid = 0)
351 {
352 std::vector<PendingCb> ret;
353 std::lock_guard<std::mutex> lk(connectCbsMtx_);
354 auto pendingIt = pendingCbs_.find(deviceId);
355 if (pendingIt == pendingCbs_.end())
356 return ret;
357 auto& pendings = pendingIt->second;
358 if (vid == 0) {
359 ret = pendings;
360 } else {
361 std::copy_if(pendings.begin(),
362 pendings.end(),
363 std::back_inserter(ret),
364 [&](auto pending) { return pending.vid == vid; });
365 }
366 return ret;
367 }
368
369 std::shared_ptr<ConnectionManager::Impl> shared()
370 {
371 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
372 }
373 std::shared_ptr<ConnectionManager::Impl const> shared() const
374 {
375 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
376 }
377 std::weak_ptr<ConnectionManager::Impl> weak()
378 {
379 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
380 }
381 std::weak_ptr<ConnectionManager::Impl const> weak() const
382 {
383 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
384 }
385
386 std::atomic_bool isDestroying_ {false};
387};
388
389void
390ConnectionManager::Impl::connectDeviceStartIce(
391 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
392 const dht::Value::Id& vid,
393 const std::string& connType,
394 std::function<void(bool)> onConnected)
395{
396 auto deviceId = devicePk->getLongId();
397 auto info = getInfo(deviceId, vid);
398 if (!info) {
399 onConnected(false);
400 return;
401 }
402
403 std::unique_lock<std::mutex> lk(info->mutex_);
404 auto& ice = info->ice_;
405
406 if (!ice) {
407 if (config_->logger)
408 config_->logger->error("No ICE detected");
409 onConnected(false);
410 return;
411 }
412
413 auto iceAttributes = ice->getLocalAttributes();
414 std::ostringstream icemsg;
415 icemsg << iceAttributes.ufrag << "\n";
416 icemsg << iceAttributes.pwd << "\n";
417 for (const auto& addr : ice->getLocalCandidates(1)) {
418 icemsg << addr << "\n";
419 if (config_->logger)
420 config_->logger->debug("Added local ICE candidate {}", addr);
421 }
422
423 // Prepare connection request as a DHT message
424 PeerConnectionRequest val;
425
426 val.id = vid; /* Random id for the message unicity */
427 val.ice_msg = icemsg.str();
428 val.connType = connType;
429
430 auto value = std::make_shared<dht::Value>(std::move(val));
431 value->user_type = "peer_request";
432
433 // Send connection request through DHT
434 if (config_->logger)
435 config_->logger->debug("Request connection to {}", deviceId);
436 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
437 + devicePk->getId().toString()),
438 devicePk,
439 value,
440 [l=config_->logger,deviceId](bool ok) {
441 if (l)
442 l->debug("Sent connection request to {:s}. Put encrypted {:s}",
443 deviceId,
444 (ok ? "ok" : "failed"));
445 });
446 // Wait for call to onResponse() operated by DHT
447 if (isDestroying_) {
448 onConnected(true); // This avoid to wait new negotiation when destroying
449 return;
450 }
451
452 info->onConnected_ = std::move(onConnected);
453 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
454 std::chrono::steady_clock::now()
455 + DHT_MSG_TIMEOUT);
456 info->waitForAnswer_->async_wait(
457 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
458}
459
460void
461ConnectionManager::Impl::onResponse(const asio::error_code& ec,
462 const DeviceId& deviceId,
463 const dht::Value::Id& vid)
464{
465 if (ec == asio::error::operation_aborted)
466 return;
467 auto info = getInfo(deviceId, vid);
468 if (!info)
469 return;
470
471 std::unique_lock<std::mutex> lk(info->mutex_);
472 auto& ice = info->ice_;
473 if (isDestroying_) {
474 info->onConnected_(true); // The destructor can wake a pending wait here.
475 return;
476 }
477 if (!info->responseReceived_) {
478 if (config_->logger)
479 config_->logger->error("no response from DHT to E2E request.");
480 info->onConnected_(false);
481 return;
482 }
483
484 if (!info->ice_) {
485 info->onConnected_(false);
486 return;
487 }
488
489 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
490
491 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
492 if (config_->logger)
493 config_->logger->warn("start ICE failed");
494 info->onConnected_(false);
495 return;
496 }
497 info->onConnected_(true);
498}
499
500bool
501ConnectionManager::Impl::connectDeviceOnNegoDone(
502 const DeviceId& deviceId,
503 const std::string& name,
504 const dht::Value::Id& vid,
505 const std::shared_ptr<dht::crypto::Certificate>& cert)
506{
507 auto info = getInfo(deviceId, vid);
508 if (!info)
509 return false;
510
511 std::unique_lock<std::mutex> lk {info->mutex_};
512 if (info->waitForAnswer_) {
513 // Negotiation is done and connected, go to handshake
514 // and avoid any cancellation at this point.
515 info->waitForAnswer_->cancel();
516 }
517 auto& ice = info->ice_;
518 if (!ice || !ice->isRunning()) {
519 if (config_->logger)
520 config_->logger->error("No ICE detected or not running");
521 return false;
522 }
523
524 // Build socket
525 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
526 std::move(ice)),
527 true);
528
529 // Negotiate a TLS session
530 if (config_->logger)
531 config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid);
532 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
533 certStore(),
534 identity(),
535 dhParams(),
536 *cert);
537
538 info->tls_->setOnReady(
539 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
540 bool ok) {
541 if (auto shared = w.lock())
542 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
543 });
544 return true;
545}
546
547void
548ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
549 const std::string& name,
550 ConnectCallback cb,
551 bool noNewSocket,
552 bool forceNewSocket,
553 const std::string& connType)
554{
555 if (!dht()) {
556 cb(nullptr, deviceId);
557 return;
558 }
559 if (deviceId.toString() == identity().second->getLongId().toString()) {
560 cb(nullptr, deviceId);
561 return;
562 }
563 findCertificate(deviceId,
564 [w = weak(),
565 deviceId,
566 name,
567 cb = std::move(cb),
568 noNewSocket,
569 forceNewSocket,
570 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
571 if (!cert) {
572 if (auto shared = w.lock())
573 if (shared->config_->logger)
574 shared->config_->logger->error(
575 "No valid certificate found for device {}",
576 deviceId);
577 cb(nullptr, deviceId);
578 return;
579 }
580 if (auto shared = w.lock()) {
581 shared->connectDevice(cert,
582 name,
583 std::move(cb),
584 noNewSocket,
585 forceNewSocket,
586 connType);
587 } else
588 cb(nullptr, deviceId);
589 });
590}
591
592void
593ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
594 const std::string& name,
595 ConnectCallback cb,
596 bool noNewSocket,
597 bool forceNewSocket,
598 const std::string& connType)
599{
600 // Avoid dht operation in a DHT callback to avoid deadlocks
601 dht::ThreadPool::computation().run([w = weak(),
602 name = std::move(name),
603 cert = std::move(cert),
604 cb = std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType] {
608 auto devicePk = cert->getSharedPublicKey();
609 auto deviceId = devicePk->getLongId();
610 auto sthis = w.lock();
611 if (!sthis || sthis->isDestroying_) {
612 cb(nullptr, deviceId);
613 return;
614 }
615 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
616 auto isConnectingToDevice = false;
617 {
618 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
619 auto pendingsIt = sthis->pendingCbs_.find(deviceId);
620 if (pendingsIt != sthis->pendingCbs_.end()) {
621 const auto& pendings = pendingsIt->second;
622 while (std::find_if(pendings.begin(), pendings.end(), [&](const auto& it){ return it.vid == vid; }) != pendings.end()) {
623 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
624 }
625 }
626 // Check if already connecting
627 isConnectingToDevice = pendingsIt != sthis->pendingCbs_.end();
628 // Save current request for sendChannelRequest.
629 // Note: do not return here, cause we can be in a state where first
630 // socket is negotiated and first channel is pending
631 // so return only after we checked the info
632 if (isConnectingToDevice)
633 pendingsIt->second.emplace_back(PendingCb {name, std::move(cb), vid});
634 else
635 sthis->pendingCbs_[deviceId] = {{name, std::move(cb), vid}};
636 }
637
638 // Check if already negotiated
639 CallbackId cbId(deviceId, vid);
640 if (auto info = sthis->getConnectedInfo(deviceId)) {
641 std::lock_guard<std::mutex> lk(info->mutex_);
642 if (info->socket_) {
643 if (sthis->config_->logger)
644 sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId);
645 info->cbIds_.emplace(cbId);
646 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
647 return;
648 }
649 }
650
651 if (isConnectingToDevice && !forceNewSocket) {
652 if (sthis->config_->logger)
653 sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId);
654 return;
655 }
656 if (noNewSocket) {
657 // If no new socket is specified, we don't try to generate a new socket
658 for (const auto& pending : sthis->extractPendingCallbacks(deviceId, vid))
659 pending.cb(nullptr, deviceId);
660 return;
661 }
662
663 // Note: used when the ice negotiation fails to erase
664 // all stored structures.
665 auto eraseInfo = [w, cbId] {
666 if (auto shared = w.lock()) {
667 // If no new socket is specified, we don't try to generate a new socket
668 for (const auto& pending : shared->extractPendingCallbacks(cbId.first, cbId.second))
669 pending.cb(nullptr, cbId.first);
670 std::lock_guard<std::mutex> lk(shared->infosMtx_);
671 shared->infos_.erase(cbId);
672 }
673 };
674
675 // If no socket exists, we need to initiate an ICE connection.
676 sthis->getIceOptions([w,
677 deviceId = std::move(deviceId),
678 devicePk = std::move(devicePk),
679 name = std::move(name),
680 cert = std::move(cert),
681 vid,
682 connType,
683 eraseInfo](auto&& ice_config) {
684 auto sthis = w.lock();
685 if (!sthis) {
686 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
687 return;
688 }
689 ice_config.tcpEnable = true;
690 ice_config.onInitDone = [w,
691 deviceId = std::move(deviceId),
692 devicePk = std::move(devicePk),
693 name = std::move(name),
694 cert = std::move(cert),
695 vid,
696 connType,
697 eraseInfo](bool ok) {
698 dht::ThreadPool::io().run([w = std::move(w),
699 devicePk = std::move(devicePk),
700 vid = std::move(vid),
701 eraseInfo,
702 connType, ok] {
703 auto sthis = w.lock();
704 if (!ok && sthis && sthis->config_->logger)
705 sthis->config_->logger->error("Cannot initialize ICE session.");
706 if (!sthis || !ok) {
707 eraseInfo();
708 return;
709 }
710 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
711 if (!ok) {
712 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
713 }
714 });
715 });
716 };
717 ice_config.onNegoDone = [w,
718 deviceId,
719 name,
720 cert = std::move(cert),
721 vid,
722 eraseInfo](bool ok) {
723 dht::ThreadPool::io().run([w = std::move(w),
724 deviceId = std::move(deviceId),
725 name = std::move(name),
726 cert = std::move(cert),
727 vid = std::move(vid),
728 eraseInfo = std::move(eraseInfo),
729 ok] {
730 auto sthis = w.lock();
731 if (!ok && sthis && sthis->config_->logger)
732 sthis->config_->logger->error("ICE negotiation failed.");
733 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
734 eraseInfo();
735 });
736 };
737
738 auto info = std::make_shared<ConnectionInfo>();
739 {
740 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
741 sthis->infos_[{deviceId, vid}] = info;
742 }
743 std::unique_lock<std::mutex> lk {info->mutex_};
744 ice_config.master = false;
745 ice_config.streamsCount = 1;
746 ice_config.compCountPerStream = 1;
747 info->ice_ = sthis->iceFactory_.createUTransport("");
748 if (!info->ice_) {
749 if (sthis->config_->logger)
750 sthis->config_->logger->error("Cannot initialize ICE session.");
751 eraseInfo();
752 return;
753 }
754 // We need to detect any shutdown if the ice session is destroyed before going to the
755 // TLS session;
756 info->ice_->setOnShutdown([eraseInfo]() {
757 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
758 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400759 try {
760 info->ice_->initIceInstance(ice_config);
761 } catch (const std::exception& e) {
762 if (sthis->config_->logger)
763 sthis->config_->logger->error("{}", e.what());
764 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
765 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400766 });
767 });
768}
769
770void
771ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
772 const std::string& name,
773 const DeviceId& deviceId,
774 const dht::Value::Id& vid)
775{
776 auto channelSock = sock->addChannel(name);
777 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
778 auto shared = w.lock();
779 if (shared)
780 for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
781 pending.cb(nullptr, deviceId);
782 });
783 channelSock->onReady(
784 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()]() {
785 auto shared = w.lock();
786 auto channelSock = wSock.lock();
787 if (shared)
788 for (const auto& pending : shared->extractPendingCallbacks(deviceId, vid))
789 pending.cb(channelSock, deviceId);
790 });
791
792 ChannelRequest val;
793 val.name = channelSock->name();
794 val.state = ChannelRequestState::REQUEST;
795 val.channel = channelSock->channel();
796 msgpack::sbuffer buffer(256);
797 msgpack::pack(buffer, val);
798
799 std::error_code ec;
800 int res = sock->write(CONTROL_CHANNEL,
801 reinterpret_cast<const uint8_t*>(buffer.data()),
802 buffer.size(),
803 ec);
804 if (res < 0) {
805 // TODO check if we should handle errors here
806 if (config_->logger)
807 config_->logger->error("sendChannelRequest failed - error: {}", ec.message());
808 }
809}
810
811void
812ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
813{
814 auto device = req.owner->getLongId();
815 if (config_->logger)
816 config_->logger->debug("New response received from {}", device);
817 if (auto info = getInfo(device, req.id)) {
818 std::lock_guard<std::mutex> lk {info->mutex_};
819 info->responseReceived_ = true;
820 info->response_ = std::move(req);
821 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
822 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
823 this,
824 std::placeholders::_1,
825 device,
826 req.id));
827 } else {
828 if (config_->logger)
829 config_->logger->warn("Respond received, but cannot find request");
830 }
831}
832
833void
834ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
835{
836 if (!dht())
837 return;
838 dht()->listen<PeerConnectionRequest>(
839 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
840 [w = weak()](PeerConnectionRequest&& req) {
841 auto shared = w.lock();
842 if (!shared)
843 return false;
844 if (shared->isMessageTreated(to_hex_string(req.id))) {
845 // Message already treated. Just ignore
846 return true;
847 }
848 if (req.isAnswer) {
849 if (shared->config_->logger)
850 shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId());
851 } else {
852 if (shared->config_->logger)
853 shared->config_->logger->debug("Received request from {}", req.owner->getLongId());
854 }
855 if (req.isAnswer) {
856 shared->onPeerResponse(req);
857 } else {
858 // Async certificate checking
859 shared->dht()->findCertificate(
860 req.from,
861 [w, req = std::move(req)](
862 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
863 auto shared = w.lock();
864 if (!shared)
865 return;
866 dht::InfoHash peer_h;
867 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
868#if TARGET_OS_IOS
869 if (shared->iOSConnectedCb_(req.connType, peer_h))
870 return;
871#endif
872 shared->onDhtPeerRequest(req, cert);
873 } else {
874 if (shared->config_->logger)
875 shared->config_->logger->warn(
876 "Received request from untrusted peer {}",
877 req.owner->getLongId());
878 }
879 });
880 }
881
882 return true;
883 },
884 dht::Value::UserTypeFilter("peer_request"));
885}
886
887void
888ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
889 const DeviceId& deviceId,
890 const dht::Value::Id& vid,
891 const std::string& name)
892{
893 if (isDestroying_)
894 return;
895 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
896 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
897 // asked yet)
898 auto isDhtRequest = name.empty();
899 if (!ok) {
900 if (isDhtRequest) {
901 if (config_->logger)
902 config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}",
903 deviceId,
904 name,
905 vid);
906 if (connReadyCb_)
907 connReadyCb_(deviceId, "", nullptr);
908 } else {
909 if (config_->logger)
910 config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}",
911 deviceId,
912 name,
913 vid);
914 for (const auto& pending : extractPendingCallbacks(deviceId))
915 pending.cb(nullptr, deviceId);
916 }
917 } else {
918 // The socket is ready, store it
919 if (isDhtRequest) {
920 if (config_->logger)
921 config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}",
922 deviceId,
923 vid);
924 } else {
925 if (config_->logger)
926 config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}",
927 deviceId,
928 name,
929 vid);
930 }
931
932 auto info = getInfo(deviceId, vid);
933 addNewMultiplexedSocket({deviceId, vid}, info);
934 // Finally, open the channel and launch pending callbacks
935 if (info->socket_) {
936 // Note: do not remove pending there it's done in sendChannelRequest
937 for (const auto& pending : getPendingCallbacks(deviceId)) {
938 if (config_->logger)
939 config_->logger->debug("Send request on TLS socket for channel {} to {}",
940 pending.name,
941 deviceId);
942 sendChannelRequest(info->socket_, pending.name, deviceId, pending.vid);
943 }
944 }
945 }
946}
947
948void
949ConnectionManager::Impl::answerTo(IceTransport& ice,
950 const dht::Value::Id& id,
951 const std::shared_ptr<dht::crypto::PublicKey>& from)
952{
953 // NOTE: This is a shortest version of a real SDP message to save some bits
954 auto iceAttributes = ice.getLocalAttributes();
955 std::ostringstream icemsg;
956 icemsg << iceAttributes.ufrag << "\n";
957 icemsg << iceAttributes.pwd << "\n";
958 for (const auto& addr : ice.getLocalCandidates(1)) {
959 icemsg << addr << "\n";
960 }
961
962 // Send PeerConnection response
963 PeerConnectionRequest val;
964 val.id = id;
965 val.ice_msg = icemsg.str();
966 val.isAnswer = true;
967 auto value = std::make_shared<dht::Value>(std::move(val));
968 value->user_type = "peer_request";
969
970 if (config_->logger)
971 config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId());
972 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
973 + from->getId().toString()),
974 from,
975 value,
976 [from,l=config_->logger](bool ok) {
977 if (l)
978 l->debug("Answer to connection request from {:s}. Put encrypted {:s}",
979 from->getLongId(),
980 (ok ? "ok" : "failed"));
981 });
982}
983
984bool
985ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
986{
987 auto deviceId = req.owner->getLongId();
988 auto info = getInfo(deviceId, req.id);
989 if (!info)
990 return false;
991
992 std::unique_lock<std::mutex> lk {info->mutex_};
993 auto& ice = info->ice_;
994 if (!ice) {
995 if (config_->logger)
996 config_->logger->error("No ICE detected");
997 if (connReadyCb_)
998 connReadyCb_(deviceId, "", nullptr);
999 return false;
1000 }
1001
1002 auto sdp = ice->parseIceCandidates(req.ice_msg);
1003 answerTo(*ice, req.id, req.owner);
1004 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1005 if (config_->logger)
1006 config_->logger->error("Start ICE failed - fallback to TURN");
1007 ice = nullptr;
1008 if (connReadyCb_)
1009 connReadyCb_(deviceId, "", nullptr);
1010 return false;
1011 }
1012 return true;
1013}
1014
1015bool
1016ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1017{
1018 auto deviceId = req.owner->getLongId();
1019 auto info = getInfo(deviceId, req.id);
1020 if (!info)
1021 return false;
1022
1023 std::unique_lock<std::mutex> lk {info->mutex_};
1024 auto& ice = info->ice_;
1025 if (!ice) {
1026 if (config_->logger)
1027 config_->logger->error("No ICE detected");
1028 return false;
1029 }
1030
1031 // Build socket
1032 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1033 std::move(ice)),
1034 false);
1035
1036 // init TLS session
1037 auto ph = req.from;
1038 if (config_->logger)
1039 config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}",
1040 req.from,
1041 req.id);
1042 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1043 std::move(endpoint),
1044 certStore(),
1045 identity(),
1046 dhParams(),
1047 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1048 auto shared = w.lock();
1049 if (!shared)
1050 return false;
1051 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1052 if (!crt)
1053 return false;
1054 return crt->getPacked() == cert.getPacked();
1055 });
1056
1057 info->tls_->setOnReady(
1058 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1059 if (auto shared = w.lock())
1060 shared->onTlsNegotiationDone(ok, deviceId, vid);
1061 });
1062 return true;
1063}
1064
1065void
1066ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1067 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1068{
1069 auto deviceId = req.owner->getLongId();
1070 if (config_->logger)
1071 config_->logger->debug("New connection request from {}", deviceId);
1072 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1073 if (config_->logger)
1074 config_->logger->debug("Refuse connection from {}", deviceId);
1075 return;
1076 }
1077
1078 // Because the connection is accepted, create an ICE socket.
1079 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1080 auto shared = w.lock();
1081 if (!shared)
1082 return;
1083 // Note: used when the ice negotiation fails to erase
1084 // all stored structures.
1085 auto eraseInfo = [w, id = req.id, deviceId] {
1086 if (auto shared = w.lock()) {
1087 // If no new socket is specified, we don't try to generate a new socket
1088 for (const auto& pending : shared->extractPendingCallbacks(deviceId, id))
1089 pending.cb(nullptr, deviceId);
1090 if (shared->connReadyCb_)
1091 shared->connReadyCb_(deviceId, "", nullptr);
1092 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1093 shared->infos_.erase({deviceId, id});
1094 }
1095 };
1096
1097 ice_config.tcpEnable = true;
1098 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1099 auto shared = w.lock();
1100 if (!shared)
1101 return;
1102 if (!ok) {
1103 if (shared->config_->logger)
1104 shared->config_->logger->error("Cannot initialize ICE session.");
1105 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1106 return;
1107 }
1108
1109 dht::ThreadPool::io().run(
1110 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1111 auto shared = w.lock();
1112 if (!shared)
1113 return;
1114 if (!shared->onRequestStartIce(req))
1115 eraseInfo();
1116 });
1117 };
1118
1119 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1120 auto shared = w.lock();
1121 if (!shared)
1122 return;
1123 if (!ok) {
1124 if (shared->config_->logger)
1125 shared->config_->logger->error("ICE negotiation failed.");
1126 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1127 return;
1128 }
1129
1130 dht::ThreadPool::io().run(
1131 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1132 if (auto shared = w.lock())
1133 if (!shared->onRequestOnNegoDone(req))
1134 eraseInfo();
1135 });
1136 };
1137
1138 // Negotiate a new ICE socket
1139 auto info = std::make_shared<ConnectionInfo>();
1140 {
1141 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1142 shared->infos_[{deviceId, req.id}] = info;
1143 }
1144 if (shared->config_->logger)
1145 shared->config_->logger->debug("Accepting connection from {}", deviceId);
1146 std::unique_lock<std::mutex> lk {info->mutex_};
1147 ice_config.streamsCount = 1;
1148 ice_config.compCountPerStream = 1; // TCP
1149 ice_config.master = true;
1150 info->ice_ = shared->iceFactory_.createUTransport("");
1151 if (not info->ice_) {
1152 if (shared->config_->logger)
1153 shared->config_->logger->error("Cannot initialize ICE session");
1154 eraseInfo();
1155 return;
1156 }
1157 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1158 info->ice_->setOnShutdown([eraseInfo]() {
1159 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1160 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001161 try {
1162 info->ice_->initIceInstance(ice_config);
1163 } catch (const std::exception& e) {
1164 if (shared->config_->logger)
1165 shared->config_->logger->error("{}", e.what());
1166 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1167 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001168 });
1169}
1170
1171void
1172ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1173{
1174 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1175 info->socket_->setOnReady(
1176 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1177 if (auto sthis = w.lock())
1178 if (sthis->connReadyCb_)
1179 sthis->connReadyCb_(deviceId, socket->name(), socket);
1180 });
1181 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1182 const uint16_t&,
1183 const std::string& name) {
1184 if (auto sthis = w.lock())
1185 if (sthis->channelReqCb_)
1186 return sthis->channelReqCb_(peer, name);
1187 return false;
1188 });
1189 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1190 // Cancel current outgoing connections
1191 dht::ThreadPool::io().run([w, deviceId, vid] {
1192 auto sthis = w.lock();
1193 if (!sthis)
1194 return;
1195
1196 std::set<CallbackId> ids;
1197 if (auto info = sthis->getInfo(deviceId, vid)) {
1198 std::lock_guard<std::mutex> lk(info->mutex_);
1199 if (info->socket_) {
1200 ids = std::move(info->cbIds_);
1201 info->socket_->shutdown();
1202 }
1203 }
1204 for (const auto& cbId : ids)
1205 for (const auto& pending : sthis->extractPendingCallbacks(cbId.first, cbId.second))
1206 pending.cb(nullptr, deviceId);
1207
1208 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1209 sthis->infos_.erase({deviceId, vid});
1210 });
1211 });
1212}
1213
1214const std::shared_future<tls::DhParams>
1215ConnectionManager::Impl::dhParams() const
1216{
1217 return dht::ThreadPool::computation().get<tls::DhParams>(
1218 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1219 ;
1220}
1221
1222template<typename ID = dht::Value::Id>
1223std::set<ID, std::less<>>
1224loadIdList(const std::string& path)
1225{
1226 std::set<ID, std::less<>> ids;
1227 std::ifstream file = fileutils::ifstream(path);
1228 if (!file.is_open()) {
1229 //JAMI_DBG("Could not load %s", path.c_str());
1230 return ids;
1231 }
1232 std::string line;
1233 while (std::getline(file, line)) {
1234 if constexpr (std::is_same<ID, std::string>::value) {
1235 ids.emplace(std::move(line));
1236 } else if constexpr (std::is_integral<ID>::value) {
1237 ID vid;
1238 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1239 ec == std::errc()) {
1240 ids.emplace(vid);
1241 }
1242 }
1243 }
1244 return ids;
1245}
1246
1247template<typename List = std::set<dht::Value::Id>>
1248void
1249saveIdList(const std::string& path, const List& ids)
1250{
1251 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1252 if (!file.is_open()) {
1253 //JAMI_ERR("Could not save to %s", path.c_str());
1254 return;
1255 }
1256 for (auto& c : ids)
1257 file << std::hex << c << "\n";
1258}
1259
1260void
1261ConnectionManager::Impl::loadTreatedMessages()
1262{
1263 std::lock_guard<std::mutex> lock(messageMutex_);
1264 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1265 treatedMessages_ = loadIdList<std::string>(path);
1266 if (treatedMessages_.empty()) {
1267 auto messages = loadIdList(path);
1268 for (const auto& m : messages)
1269 treatedMessages_.emplace(to_hex_string(m));
1270 }
1271}
1272
1273void
1274ConnectionManager::Impl::saveTreatedMessages() const
1275{
1276 dht::ThreadPool::io().run([w = weak()]() {
1277 if (auto sthis = w.lock()) {
1278 auto& this_ = *sthis;
1279 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1280 fileutils::check_dir(this_.config_->cachePath.c_str());
1281 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1282 + DIR_SEPARATOR_STR "treatedMessages",
1283 this_.treatedMessages_);
1284 }
1285 });
1286}
1287
1288bool
1289ConnectionManager::Impl::isMessageTreated(std::string_view id)
1290{
1291 std::lock_guard<std::mutex> lock(messageMutex_);
1292 auto res = treatedMessages_.emplace(id);
1293 if (res.second) {
1294 saveTreatedMessages();
1295 return false;
1296 }
1297 return true;
1298}
1299
1300/**
1301 * returns whether or not UPnP is enabled and active_
1302 * ie: if it is able to make port mappings
1303 */
1304bool
1305ConnectionManager::Impl::getUPnPActive() const
1306{
1307 return config_->getUPnPActive();
1308}
1309
1310IpAddr
1311ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1312{
1313 if (family == AF_INET)
1314 return publishedIp_[0];
1315 if (family == AF_INET6)
1316 return publishedIp_[1];
1317
1318 assert(family == AF_UNSPEC);
1319
1320 // If family is not set, prefere IPv4 if available. It's more
1321 // likely to succeed behind NAT.
1322 if (publishedIp_[0])
1323 return publishedIp_[0];
1324 if (publishedIp_[1])
1325 return publishedIp_[1];
1326 return {};
1327}
1328
1329void
1330ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1331{
1332 if (ip_addr.getFamily() == AF_INET) {
1333 publishedIp_[0] = ip_addr;
1334 } else {
1335 publishedIp_[1] = ip_addr;
1336 }
1337}
1338
1339void
1340ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1341{
1342 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1343 bool hasIpv4 {false}, hasIpv6 {false};
1344 for (auto& result : results) {
1345 auto family = result.getFamily();
1346 if (family == AF_INET) {
1347 if (not hasIpv4) {
1348 hasIpv4 = true;
1349 if (config_->logger)
1350 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1351 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1352 setPublishedAddress(*result.get());
1353 if (config_->upnpCtrl) {
1354 config_->upnpCtrl->setPublicAddress(*result.get());
1355 }
1356 }
1357 } else if (family == AF_INET6) {
1358 if (not hasIpv6) {
1359 hasIpv6 = true;
1360 if (config_->logger)
1361 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1362 setPublishedAddress(*result.get());
1363 }
1364 }
1365 if (hasIpv4 and hasIpv6)
1366 break;
1367 }
1368 if (cb)
1369 cb();
1370 });
1371}
1372
1373void
1374ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1375{
1376 storeActiveIpAddress([this, cb = std::move(cb)] {
1377 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1378 auto publishedAddr = getPublishedIpAddress();
1379
1380 if (publishedAddr) {
1381 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1382 publishedAddr.getFamily());
1383 if (interfaceAddr) {
1384 opts.accountLocalAddr = interfaceAddr;
1385 opts.accountPublicAddr = publishedAddr;
1386 }
1387 }
1388 if (cb)
1389 cb(std::move(opts));
1390 });
1391}
1392
1393IceTransportOptions
1394ConnectionManager::Impl::getIceOptions() const noexcept
1395{
1396 IceTransportOptions opts;
1397 opts.upnpEnable = getUPnPActive();
1398
1399 if (config_->stunEnabled)
1400 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1401 if (config_->turnEnabled) {
1402 auto cached = false;
1403 std::lock_guard<std::mutex> lk(config_->cachedTurnMutex);
1404 cached = config_->cacheTurnV4 || config_->cacheTurnV6;
1405 if (config_->cacheTurnV4) {
1406 opts.turnServers.emplace_back(TurnServerInfo()
1407 .setUri(config_->cacheTurnV4.toString())
1408 .setUsername(config_->turnServerUserName)
1409 .setPassword(config_->turnServerPwd)
1410 .setRealm(config_->turnServerRealm));
1411 }
1412 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1413 // co issues. So this needs some debug. for now just disable
1414 // if (cacheTurnV6 && *cacheTurnV6) {
1415 // opts.turnServers.emplace_back(TurnServerInfo()
1416 // .setUri(cacheTurnV6->toString(true))
1417 // .setUsername(turnServerUserName_)
1418 // .setPassword(turnServerPwd_)
1419 // .setRealm(turnServerRealm_));
1420 //}
1421 // Nothing cached, so do the resolution
1422 if (!cached) {
1423 opts.turnServers.emplace_back(TurnServerInfo()
1424 .setUri(config_->turnServer)
1425 .setUsername(config_->turnServerUserName)
1426 .setPassword(config_->turnServerPwd)
1427 .setRealm(config_->turnServerRealm));
1428 }
1429 }
1430 return opts;
1431}
1432
1433bool
1434ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1435 dht::InfoHash& account_id,
1436 const std::shared_ptr<Logger>& logger)
1437{
1438 if (not crt)
1439 return false;
1440
1441 auto top_issuer = crt;
1442 while (top_issuer->issuer)
1443 top_issuer = top_issuer->issuer;
1444
1445 // Device certificate can't be self-signed
1446 if (top_issuer == crt) {
1447 if (logger)
1448 logger->warn("Found invalid peer device: {}", crt->getLongId());
1449 return false;
1450 }
1451
1452 // Check peer certificate chain
1453 // Trust store with top issuer as the only CA
1454 dht::crypto::TrustList peer_trust;
1455 peer_trust.add(*top_issuer);
1456 if (not peer_trust.verify(*crt)) {
1457 if (logger)
1458 logger->warn("Found invalid peer device: {}", crt->getLongId());
1459 return false;
1460 }
1461
1462 // Check cached OCSP response
1463 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1464 if (logger)
1465 logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId());
1466 return false;
1467 }
1468
1469 account_id = crt->issuer->getId();
1470 if (logger)
1471 logger->warn("Found peer device: {} account:{} CA:{}",
1472 crt->getLongId(),
1473 account_id,
1474 top_issuer->getId());
1475 return true;
1476}
1477
1478bool
1479ConnectionManager::Impl::findCertificate(
1480 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1481{
1482 if (auto cert = certStore().getCertificate(id.toString())) {
1483 if (cb)
1484 cb(cert);
1485 } else if (cb)
1486 cb(nullptr);
1487 return true;
1488}
1489
1490ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1491 : pimpl_ {std::make_shared<Impl>(config_)}
1492{}
1493
1494ConnectionManager::~ConnectionManager()
1495{
1496 if (pimpl_)
1497 pimpl_->shutdown();
1498}
1499
1500void
1501ConnectionManager::connectDevice(const DeviceId& deviceId,
1502 const std::string& name,
1503 ConnectCallback cb,
1504 bool noNewSocket,
1505 bool forceNewSocket,
1506 const std::string& connType)
1507{
1508 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1509}
1510
1511void
1512ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1513 const std::string& name,
1514 ConnectCallback cb,
1515 bool noNewSocket,
1516 bool forceNewSocket,
1517 const std::string& connType)
1518{
1519 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1520}
1521
1522bool
1523ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1524{
1525 auto pending = pimpl_->getPendingCallbacks(deviceId);
1526 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.name == name; })
1527 != pending.end();
1528}
1529
1530void
1531ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1532{
1533 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1534 std::set<DeviceId> peersDevices;
1535 {
1536 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1537 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1538 auto const& [key, value] = *iter;
1539 auto deviceId = key.first;
1540 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1541 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1542 connInfos.emplace_back(value);
1543 peersDevices.emplace(deviceId);
1544 iter = pimpl_->infos_.erase(iter);
1545 } else {
1546 iter++;
1547 }
1548 }
1549 }
1550 // Stop connections to all peers devices
1551 for (const auto& deviceId : peersDevices) {
1552 for (const auto& pending : pimpl_->extractPendingCallbacks(deviceId))
1553 pending.cb(nullptr, deviceId);
1554 // This will close the TLS Session
1555 pimpl_->removeUnusedConnections(deviceId);
1556 }
1557 for (auto& info : connInfos) {
1558 if (info->socket_)
1559 info->socket_->shutdown();
1560 if (info->waitForAnswer_)
1561 info->waitForAnswer_->cancel();
1562 if (info->ice_) {
1563 std::unique_lock<std::mutex> lk {info->mutex_};
1564 dht::ThreadPool::io().run(
1565 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1566 }
1567 }
1568}
1569
1570void
1571ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1572{
1573 pimpl_->onDhtConnected(devicePk);
1574}
1575
1576void
1577ConnectionManager::onICERequest(onICERequestCallback&& cb)
1578{
1579 pimpl_->iceReqCb_ = std::move(cb);
1580}
1581
1582void
1583ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1584{
1585 pimpl_->channelReqCb_ = std::move(cb);
1586}
1587
1588void
1589ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1590{
1591 pimpl_->connReadyCb_ = std::move(cb);
1592}
1593
1594void
1595ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1596{
1597 pimpl_->iOSConnectedCb_ = std::move(cb);
1598}
1599
1600std::size_t
1601ConnectionManager::activeSockets() const
1602{
1603 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1604 return pimpl_->infos_.size();
1605}
1606
1607void
1608ConnectionManager::monitor() const
1609{
1610 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1611 auto logger = pimpl_->config_->logger;
1612 if (!logger)
1613 return;
1614 logger->debug("ConnectionManager current status:");
1615 for (const auto& [_, ci] : pimpl_->infos_) {
1616 if (ci->socket_)
1617 ci->socket_->monitor();
1618 }
1619 logger->debug("ConnectionManager end status.");
1620}
1621
1622void
1623ConnectionManager::connectivityChanged()
1624{
1625 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1626 for (const auto& [_, ci] : pimpl_->infos_) {
1627 if (ci->socket_)
1628 ci->socket_->sendBeacon();
1629 }
1630}
1631
1632void
1633ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1634{
1635 return pimpl_->getIceOptions(std::move(cb));
1636}
1637
1638IceTransportOptions
1639ConnectionManager::getIceOptions() const noexcept
1640{
1641 return pimpl_->getIceOptions();
1642}
1643
1644IpAddr
1645ConnectionManager::getPublishedIpAddress(uint16_t family) const
1646{
1647 return pimpl_->getPublishedIpAddress(family);
1648}
1649
1650void
1651ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1652{
1653 return pimpl_->setPublishedAddress(ip_addr);
1654}
1655
1656void
1657ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1658{
1659 return pimpl_->storeActiveIpAddress(std::move(cb));
1660}
1661
1662std::shared_ptr<ConnectionManager::Config>
1663ConnectionManager::getConfig()
1664{
1665 return pimpl_->config_;
1666}
1667
1668} // namespace jami