blob: 6c6de311d58fa40159e46e6fd65a949b0d4b5736 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Adrien Béraud612b55b2023-05-29 10:42:04 -040044
45struct ConnectionInfo
46{
47 ~ConnectionInfo()
48 {
49 if (socket_)
50 socket_->join();
51 }
52
53 std::mutex mutex_ {};
54 bool responseReceived_ {false};
55 PeerConnectionRequest response_ {};
56 std::unique_ptr<IceTransport> ice_ {nullptr};
57 // Used to store currently non ready TLS Socket
58 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
59 std::shared_ptr<MultiplexedSocket> socket_ {};
60 std::set<CallbackId> cbIds_ {};
61
62 std::function<void(bool)> onConnected_;
63 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
64};
65
66/**
67 * returns whether or not UPnP is enabled and active_
68 * ie: if it is able to make port mappings
69 */
70bool
71ConnectionManager::Config::getUPnPActive() const
72{
73 if (upnpCtrl)
74 return upnpCtrl->isReady();
75 return false;
76}
77
78class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
79{
80public:
81 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
82 : config_ {std::move(config_)}
83 {}
84 ~Impl() {}
85
86 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
87 const dht::crypto::Identity& identity() const { return config_->id; }
88
89 void removeUnusedConnections(const DeviceId& deviceId = {})
90 {
91 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
92
93 {
94 std::lock_guard<std::mutex> lk(infosMtx_);
95 for (auto it = infos_.begin(); it != infos_.end();) {
96 auto& [key, info] = *it;
97 if (info && (!deviceId || key.first == deviceId)) {
98 unused.emplace_back(std::move(info));
99 it = infos_.erase(it);
100 } else {
101 ++it;
102 }
103 }
104 }
105 for (auto& info: unused) {
106 if (info->tls_)
107 info->tls_->shutdown();
108 if (info->socket_)
109 info->socket_->shutdown();
110 if (info->waitForAnswer_)
111 info->waitForAnswer_->cancel();
112 }
113 if (!unused.empty())
114 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
115 }
116
117 void shutdown()
118 {
119 if (isDestroying_.exchange(true))
120 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400121 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400122 {
123 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400124 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400125 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400126 for (auto& [deviceId, pcbs] : po) {
127 for (auto& [id, pending] : pcbs.connecting)
128 pending.cb(nullptr, deviceId);
129 for (auto& [id, pending] : pcbs.waiting)
130 pending.cb(nullptr, deviceId);
131 }
132
Adrien Béraud612b55b2023-05-29 10:42:04 -0400133 removeUnusedConnections();
134 }
135
Adrien Béraud612b55b2023-05-29 10:42:04 -0400136 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
137 const dht::Value::Id& vid,
138 const std::string& connType,
139 std::function<void(bool)> onConnected);
140 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
141 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
142 const std::string& name,
143 const dht::Value::Id& vid,
144 const std::shared_ptr<dht::crypto::Certificate>& cert);
145 void connectDevice(const DeviceId& deviceId,
146 const std::string& uri,
147 ConnectCallback cb,
148 bool noNewSocket = false,
149 bool forceNewSocket = false,
150 const std::string& connType = "");
151 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
152 const std::string& name,
153 ConnectCallback cb,
154 bool noNewSocket = false,
155 bool forceNewSocket = false,
156 const std::string& connType = "");
157 /**
158 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
159 * @param sock socket used to send the request
160 * @param name channel's name
161 * @param vid channel's id
162 * @param deviceId to identify the linked ConnectCallback
163 */
164 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
165 const std::string& name,
166 const DeviceId& deviceId,
167 const dht::Value::Id& vid);
168 /**
169 * Triggered when a PeerConnectionRequest comes from the DHT
170 */
171 void answerTo(IceTransport& ice,
172 const dht::Value::Id& id,
173 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
174 bool onRequestStartIce(const PeerConnectionRequest& req);
175 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
176 void onDhtPeerRequest(const PeerConnectionRequest& req,
177 const std::shared_ptr<dht::crypto::Certificate>& cert);
178
179 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
180 void onPeerResponse(const PeerConnectionRequest& req);
181 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
182
183 const std::shared_future<tls::DhParams> dhParams() const;
184 tls::CertificateStore& certStore() const { return *config_->certStore; }
185
186 mutable std::mutex messageMutex_ {};
187 std::set<std::string, std::less<>> treatedMessages_ {};
188
189 void loadTreatedMessages();
190 void saveTreatedMessages() const;
191
192 /// \return true if the given DHT message identifier has been treated
193 /// \note if message has not been treated yet this method st/ore this id and returns true at
194 /// further calls
195 bool isMessageTreated(std::string_view id);
196
197 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
198
199 /**
200 * Published IPv4/IPv6 addresses, used only if defined by the user in account
201 * configuration
202 *
203 */
204 IpAddr publishedIp_[2] {};
205
Adrien Béraud612b55b2023-05-29 10:42:04 -0400206 /**
207 * interface name on which this account is bound
208 */
209 std::string interface_ {"default"};
210
211 /**
212 * Get the local interface name on which this account is bound.
213 */
214 const std::string& getLocalInterface() const { return interface_; }
215
216 /**
217 * Get the published IP address, fallbacks to NAT if family is unspecified
218 * Prefers the usage of IPv4 if possible.
219 */
220 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
221
222 /**
223 * Set published IP address according to given family
224 */
225 void setPublishedAddress(const IpAddr& ip_addr);
226
227 /**
228 * Store the local/public addresses used to register
229 */
230 void storeActiveIpAddress(std::function<void()>&& cb = {});
231
232 /**
233 * Create and return ICE options.
234 */
235 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
236 IceTransportOptions getIceOptions() const noexcept;
237
238 /**
239 * Inform that a potential peer device have been found.
240 * Returns true only if the device certificate is a valid device certificate.
241 * In that case (true is returned) the account_id parameter is set to the peer account ID.
242 */
243 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
244 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
245
246 bool findCertificate(const dht::PkId& id,
247 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400248 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400249
250 /**
251 * returns whether or not UPnP is enabled and active
252 * ie: if it is able to make port mappings
253 */
254 bool getUPnPActive() const;
255
256 /**
257 * Triggered when a new TLS socket is ready to use
258 * @param ok If succeed
259 * @param deviceId Related device
260 * @param vid vid of the connection request
261 * @param name non empty if TLS was created by connectDevice()
262 */
263 void onTlsNegotiationDone(bool ok,
264 const DeviceId& deviceId,
265 const dht::Value::Id& vid,
266 const std::string& name = "");
267
268 std::shared_ptr<ConnectionManager::Config> config_;
269
Adrien Béraud612b55b2023-05-29 10:42:04 -0400270 mutable std::mt19937_64 rand;
271
272 iOSConnectedCallback iOSConnectedCb_ {};
273
274 std::mutex infosMtx_ {};
275 // Note: Someone can ask multiple sockets, so to avoid any race condition,
276 // each device can have multiple multiplexed sockets.
277 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
278
279 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
280 {
281 std::lock_guard<std::mutex> lk(infosMtx_);
282 auto it = infos_.find({deviceId, id});
283 if (it != infos_.end())
284 return it->second;
285 return {};
286 }
287
288 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
289 {
290 std::lock_guard<std::mutex> lk(infosMtx_);
291 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
292 auto& [key, value] = item;
293 return key.first == deviceId && value && value->socket_;
294 });
295 if (it != infos_.end())
296 return it->second;
297 return {};
298 }
299
300 ChannelRequestCallback channelReqCb_ {};
301 ConnectionReadyCallback connReadyCb_ {};
302 onICERequestCallback iceReqCb_ {};
303
304 /**
305 * Stores callback from connectDevice
306 * @note: each device needs a vector because several connectDevice can
307 * be done in parallel and we only want one socket
308 */
309 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400310
Adrien Béraud665294f2023-06-13 18:09:11 -0400311 struct PendingCb
312 {
313 std::string name;
314 ConnectCallback cb;
315 };
316 struct PendingOperations {
317 std::map<dht::Value::Id, PendingCb> connecting;
318 std::map<dht::Value::Id, PendingCb> waiting;
319 };
320
321 std::map<DeviceId, PendingOperations> pendingOperations_ {};
322
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400323 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400324 {
325 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400326 std::unique_lock<std::mutex> lk(connectCbsMtx_);
327 auto it = pendingOperations_.find(deviceId);
328 if (it == pendingOperations_.end())
329 return;
330 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400331 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400332 // Extract all pending callbacks
333 for (auto& [vid, cb] : pendingOperations.connecting)
334 ret.emplace_back(std::move(cb));
335 pendingOperations.connecting.clear();
336 for (auto& [vid, cb] : pendingOperations.waiting)
337 ret.emplace_back(std::move(cb));
338 pendingOperations.waiting.clear();
339 } else if (auto n = pendingOperations.waiting.extract(vid)) {
340 // If it's a waiting operation, just move it
341 ret.emplace_back(std::move(n.mapped()));
342 } else if (auto n = pendingOperations.connecting.extract(vid)) {
343 ret.emplace_back(std::move(n.mapped()));
344 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400345 // If accepted is false, it means that underlying socket is ok, but channel is declined
346 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400347 for (auto& [vid, cb] : pendingOperations.waiting)
348 ret.emplace_back(std::move(cb));
349 pendingOperations.waiting.clear();
350 for (auto& [vid, cb] : pendingOperations.connecting)
351 ret.emplace_back(std::move(cb));
352 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400353 }
354 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400355 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
356 pendingOperations_.erase(it);
357 lk.unlock();
358 for (auto& cb : ret)
359 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400360 }
361
Adrien Béraud665294f2023-06-13 18:09:11 -0400362 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400363 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400364 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400365 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400366 auto it = pendingOperations_.find(deviceId);
367 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400368 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400369 auto& pendingOp = it->second;
370 for (const auto& [id, pc]: pendingOp.connecting) {
371 if (vid == 0 || id == vid)
372 ret[id] = pc.name;
373 }
374 for (const auto& [id, pc]: pendingOp.waiting) {
375 if (vid == 0 || id == vid)
376 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400377 }
378 return ret;
379 }
380
381 std::shared_ptr<ConnectionManager::Impl> shared()
382 {
383 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
384 }
385 std::shared_ptr<ConnectionManager::Impl const> shared() const
386 {
387 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
388 }
389 std::weak_ptr<ConnectionManager::Impl> weak()
390 {
391 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
392 }
393 std::weak_ptr<ConnectionManager::Impl const> weak() const
394 {
395 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
396 }
397
398 std::atomic_bool isDestroying_ {false};
399};
400
401void
402ConnectionManager::Impl::connectDeviceStartIce(
403 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
404 const dht::Value::Id& vid,
405 const std::string& connType,
406 std::function<void(bool)> onConnected)
407{
408 auto deviceId = devicePk->getLongId();
409 auto info = getInfo(deviceId, vid);
410 if (!info) {
411 onConnected(false);
412 return;
413 }
414
415 std::unique_lock<std::mutex> lk(info->mutex_);
416 auto& ice = info->ice_;
417
418 if (!ice) {
419 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400420 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400421 onConnected(false);
422 return;
423 }
424
425 auto iceAttributes = ice->getLocalAttributes();
426 std::ostringstream icemsg;
427 icemsg << iceAttributes.ufrag << "\n";
428 icemsg << iceAttributes.pwd << "\n";
429 for (const auto& addr : ice->getLocalCandidates(1)) {
430 icemsg << addr << "\n";
431 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400432 config_->logger->debug("[device {}] Added local ICE candidate {}", addr, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400433 }
434
435 // Prepare connection request as a DHT message
436 PeerConnectionRequest val;
437
438 val.id = vid; /* Random id for the message unicity */
439 val.ice_msg = icemsg.str();
440 val.connType = connType;
441
442 auto value = std::make_shared<dht::Value>(std::move(val));
443 value->user_type = "peer_request";
444
445 // Send connection request through DHT
446 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400447 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400448 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
449 + devicePk->getId().toString()),
450 devicePk,
451 value,
452 [l=config_->logger,deviceId](bool ok) {
453 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400454 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400455 deviceId,
456 (ok ? "ok" : "failed"));
457 });
458 // Wait for call to onResponse() operated by DHT
459 if (isDestroying_) {
460 onConnected(true); // This avoid to wait new negotiation when destroying
461 return;
462 }
463
464 info->onConnected_ = std::move(onConnected);
465 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
466 std::chrono::steady_clock::now()
467 + DHT_MSG_TIMEOUT);
468 info->waitForAnswer_->async_wait(
469 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
470}
471
472void
473ConnectionManager::Impl::onResponse(const asio::error_code& ec,
474 const DeviceId& deviceId,
475 const dht::Value::Id& vid)
476{
477 if (ec == asio::error::operation_aborted)
478 return;
479 auto info = getInfo(deviceId, vid);
480 if (!info)
481 return;
482
483 std::unique_lock<std::mutex> lk(info->mutex_);
484 auto& ice = info->ice_;
485 if (isDestroying_) {
486 info->onConnected_(true); // The destructor can wake a pending wait here.
487 return;
488 }
489 if (!info->responseReceived_) {
490 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400491 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400492 info->onConnected_(false);
493 return;
494 }
495
496 if (!info->ice_) {
497 info->onConnected_(false);
498 return;
499 }
500
501 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
502
503 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
504 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400505 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400506 info->onConnected_(false);
507 return;
508 }
509 info->onConnected_(true);
510}
511
512bool
513ConnectionManager::Impl::connectDeviceOnNegoDone(
514 const DeviceId& deviceId,
515 const std::string& name,
516 const dht::Value::Id& vid,
517 const std::shared_ptr<dht::crypto::Certificate>& cert)
518{
519 auto info = getInfo(deviceId, vid);
520 if (!info)
521 return false;
522
523 std::unique_lock<std::mutex> lk {info->mutex_};
524 if (info->waitForAnswer_) {
525 // Negotiation is done and connected, go to handshake
526 // and avoid any cancellation at this point.
527 info->waitForAnswer_->cancel();
528 }
529 auto& ice = info->ice_;
530 if (!ice || !ice->isRunning()) {
531 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400532 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400533 return false;
534 }
535
536 // Build socket
537 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
538 std::move(ice)),
539 true);
540
541 // Negotiate a TLS session
542 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400543 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400544 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
545 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400546 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400547 identity(),
548 dhParams(),
549 *cert);
550
551 info->tls_->setOnReady(
552 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
553 bool ok) {
554 if (auto shared = w.lock())
555 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
556 });
557 return true;
558}
559
560void
561ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
562 const std::string& name,
563 ConnectCallback cb,
564 bool noNewSocket,
565 bool forceNewSocket,
566 const std::string& connType)
567{
568 if (!dht()) {
569 cb(nullptr, deviceId);
570 return;
571 }
572 if (deviceId.toString() == identity().second->getLongId().toString()) {
573 cb(nullptr, deviceId);
574 return;
575 }
576 findCertificate(deviceId,
577 [w = weak(),
578 deviceId,
579 name,
580 cb = std::move(cb),
581 noNewSocket,
582 forceNewSocket,
583 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
584 if (!cert) {
585 if (auto shared = w.lock())
586 if (shared->config_->logger)
587 shared->config_->logger->error(
588 "No valid certificate found for device {}",
589 deviceId);
590 cb(nullptr, deviceId);
591 return;
592 }
593 if (auto shared = w.lock()) {
594 shared->connectDevice(cert,
595 name,
596 std::move(cb),
597 noNewSocket,
598 forceNewSocket,
599 connType);
600 } else
601 cb(nullptr, deviceId);
602 });
603}
604
605void
606ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
607 const std::string& name,
608 ConnectCallback cb,
609 bool noNewSocket,
610 bool forceNewSocket,
611 const std::string& connType)
612{
613 // Avoid dht operation in a DHT callback to avoid deadlocks
614 dht::ThreadPool::computation().run([w = weak(),
615 name = std::move(name),
616 cert = std::move(cert),
617 cb = std::move(cb),
618 noNewSocket,
619 forceNewSocket,
620 connType] {
621 auto devicePk = cert->getSharedPublicKey();
622 auto deviceId = devicePk->getLongId();
623 auto sthis = w.lock();
624 if (!sthis || sthis->isDestroying_) {
625 cb(nullptr, deviceId);
626 return;
627 }
628 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
629 auto isConnectingToDevice = false;
630 {
631 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400632 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
633 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400634 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400635 while (pendings.connecting.find(vid) != pendings.connecting.end()
636 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400637 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400638 }
639 }
640 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400641 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400642 // Save current request for sendChannelRequest.
643 // Note: do not return here, cause we can be in a state where first
644 // socket is negotiated and first channel is pending
645 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400646 if (isConnectingToDevice && !forceNewSocket)
647 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400648 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400649 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400650 }
651
652 // Check if already negotiated
653 CallbackId cbId(deviceId, vid);
654 if (auto info = sthis->getConnectedInfo(deviceId)) {
655 std::lock_guard<std::mutex> lk(info->mutex_);
656 if (info->socket_) {
657 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400658 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400659 info->cbIds_.emplace(cbId);
660 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
661 return;
662 }
663 }
664
665 if (isConnectingToDevice && !forceNewSocket) {
666 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400667 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400668 return;
669 }
670 if (noNewSocket) {
671 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400672 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400673 return;
674 }
675
676 // Note: used when the ice negotiation fails to erase
677 // all stored structures.
678 auto eraseInfo = [w, cbId] {
679 if (auto shared = w.lock()) {
680 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400681 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400682 std::lock_guard<std::mutex> lk(shared->infosMtx_);
683 shared->infos_.erase(cbId);
684 }
685 };
686
687 // If no socket exists, we need to initiate an ICE connection.
688 sthis->getIceOptions([w,
689 deviceId = std::move(deviceId),
690 devicePk = std::move(devicePk),
691 name = std::move(name),
692 cert = std::move(cert),
693 vid,
694 connType,
695 eraseInfo](auto&& ice_config) {
696 auto sthis = w.lock();
697 if (!sthis) {
698 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
699 return;
700 }
701 ice_config.tcpEnable = true;
702 ice_config.onInitDone = [w,
703 deviceId = std::move(deviceId),
704 devicePk = std::move(devicePk),
705 name = std::move(name),
706 cert = std::move(cert),
707 vid,
708 connType,
709 eraseInfo](bool ok) {
710 dht::ThreadPool::io().run([w = std::move(w),
711 devicePk = std::move(devicePk),
712 vid = std::move(vid),
713 eraseInfo,
714 connType, ok] {
715 auto sthis = w.lock();
716 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400717 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400718 if (!sthis || !ok) {
719 eraseInfo();
720 return;
721 }
722 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
723 if (!ok) {
724 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
725 }
726 });
727 });
728 };
729 ice_config.onNegoDone = [w,
730 deviceId,
731 name,
732 cert = std::move(cert),
733 vid,
734 eraseInfo](bool ok) {
735 dht::ThreadPool::io().run([w = std::move(w),
736 deviceId = std::move(deviceId),
737 name = std::move(name),
738 cert = std::move(cert),
739 vid = std::move(vid),
740 eraseInfo = std::move(eraseInfo),
741 ok] {
742 auto sthis = w.lock();
743 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400744 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400745 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
746 eraseInfo();
747 });
748 };
749
750 auto info = std::make_shared<ConnectionInfo>();
751 {
752 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
753 sthis->infos_[{deviceId, vid}] = info;
754 }
755 std::unique_lock<std::mutex> lk {info->mutex_};
756 ice_config.master = false;
757 ice_config.streamsCount = 1;
758 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400759 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400760 if (!info->ice_) {
761 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400762 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400763 eraseInfo();
764 return;
765 }
766 // We need to detect any shutdown if the ice session is destroyed before going to the
767 // TLS session;
768 info->ice_->setOnShutdown([eraseInfo]() {
769 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
770 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400771 try {
772 info->ice_->initIceInstance(ice_config);
773 } catch (const std::exception& e) {
774 if (sthis->config_->logger)
775 sthis->config_->logger->error("{}", e.what());
776 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
777 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400778 });
779 });
780}
781
782void
783ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
784 const std::string& name,
785 const DeviceId& deviceId,
786 const dht::Value::Id& vid)
787{
788 auto channelSock = sock->addChannel(name);
789 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
790 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400791 if (auto shared = w.lock())
792 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400793 });
794 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400795 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400796 auto shared = w.lock();
797 auto channelSock = wSock.lock();
798 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400799 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400800 });
801
802 ChannelRequest val;
803 val.name = channelSock->name();
804 val.state = ChannelRequestState::REQUEST;
805 val.channel = channelSock->channel();
806 msgpack::sbuffer buffer(256);
807 msgpack::pack(buffer, val);
808
809 std::error_code ec;
810 int res = sock->write(CONTROL_CHANNEL,
811 reinterpret_cast<const uint8_t*>(buffer.data()),
812 buffer.size(),
813 ec);
814 if (res < 0) {
815 // TODO check if we should handle errors here
816 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400817 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400818 }
819}
820
821void
822ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
823{
824 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400825 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400826 if (config_->logger)
827 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400828 std::lock_guard<std::mutex> lk {info->mutex_};
829 info->responseReceived_ = true;
830 info->response_ = std::move(req);
831 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
832 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
833 this,
834 std::placeholders::_1,
835 device,
836 req.id));
837 } else {
838 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400839 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400840 }
841}
842
843void
844ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
845{
846 if (!dht())
847 return;
848 dht()->listen<PeerConnectionRequest>(
849 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
850 [w = weak()](PeerConnectionRequest&& req) {
851 auto shared = w.lock();
852 if (!shared)
853 return false;
854 if (shared->isMessageTreated(to_hex_string(req.id))) {
855 // Message already treated. Just ignore
856 return true;
857 }
858 if (req.isAnswer) {
859 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400860 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400861 } else {
862 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400863 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400864 }
865 if (req.isAnswer) {
866 shared->onPeerResponse(req);
867 } else {
868 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400869 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400870 req.from,
871 [w, req = std::move(req)](
872 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
873 auto shared = w.lock();
874 if (!shared)
875 return;
876 dht::InfoHash peer_h;
877 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
878#if TARGET_OS_IOS
879 if (shared->iOSConnectedCb_(req.connType, peer_h))
880 return;
881#endif
882 shared->onDhtPeerRequest(req, cert);
883 } else {
884 if (shared->config_->logger)
885 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400886 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400887 req.owner->getLongId());
888 }
889 });
890 }
891
892 return true;
893 },
894 dht::Value::UserTypeFilter("peer_request"));
895}
896
897void
898ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
899 const DeviceId& deviceId,
900 const dht::Value::Id& vid,
901 const std::string& name)
902{
903 if (isDestroying_)
904 return;
905 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
906 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
907 // asked yet)
908 auto isDhtRequest = name.empty();
909 if (!ok) {
910 if (isDhtRequest) {
911 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400912 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400913 deviceId,
914 name,
915 vid);
916 if (connReadyCb_)
917 connReadyCb_(deviceId, "", nullptr);
918 } else {
919 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400920 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400921 deviceId,
922 name,
923 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400924 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400925 }
926 } else {
927 // The socket is ready, store it
928 if (isDhtRequest) {
929 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400930 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400931 deviceId,
932 vid);
933 } else {
934 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400935 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400936 deviceId,
937 name,
938 vid);
939 }
940
941 auto info = getInfo(deviceId, vid);
942 addNewMultiplexedSocket({deviceId, vid}, info);
943 // Finally, open the channel and launch pending callbacks
944 if (info->socket_) {
945 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -0400946 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400947 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -0400948 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -0400949 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -0400950 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400951 }
952 }
953 }
954}
955
956void
957ConnectionManager::Impl::answerTo(IceTransport& ice,
958 const dht::Value::Id& id,
959 const std::shared_ptr<dht::crypto::PublicKey>& from)
960{
961 // NOTE: This is a shortest version of a real SDP message to save some bits
962 auto iceAttributes = ice.getLocalAttributes();
963 std::ostringstream icemsg;
964 icemsg << iceAttributes.ufrag << "\n";
965 icemsg << iceAttributes.pwd << "\n";
966 for (const auto& addr : ice.getLocalCandidates(1)) {
967 icemsg << addr << "\n";
968 }
969
970 // Send PeerConnection response
971 PeerConnectionRequest val;
972 val.id = id;
973 val.ice_msg = icemsg.str();
974 val.isAnswer = true;
975 auto value = std::make_shared<dht::Value>(std::move(val));
976 value->user_type = "peer_request";
977
978 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400979 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400980 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
981 + from->getId().toString()),
982 from,
983 value,
984 [from,l=config_->logger](bool ok) {
985 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400986 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400987 from->getLongId(),
988 (ok ? "ok" : "failed"));
989 });
990}
991
992bool
993ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
994{
995 auto deviceId = req.owner->getLongId();
996 auto info = getInfo(deviceId, req.id);
997 if (!info)
998 return false;
999
1000 std::unique_lock<std::mutex> lk {info->mutex_};
1001 auto& ice = info->ice_;
1002 if (!ice) {
1003 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001004 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001005 if (connReadyCb_)
1006 connReadyCb_(deviceId, "", nullptr);
1007 return false;
1008 }
1009
1010 auto sdp = ice->parseIceCandidates(req.ice_msg);
1011 answerTo(*ice, req.id, req.owner);
1012 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1013 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001014 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001015 ice = nullptr;
1016 if (connReadyCb_)
1017 connReadyCb_(deviceId, "", nullptr);
1018 return false;
1019 }
1020 return true;
1021}
1022
1023bool
1024ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1025{
1026 auto deviceId = req.owner->getLongId();
1027 auto info = getInfo(deviceId, req.id);
1028 if (!info)
1029 return false;
1030
1031 std::unique_lock<std::mutex> lk {info->mutex_};
1032 auto& ice = info->ice_;
1033 if (!ice) {
1034 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001035 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001036 return false;
1037 }
1038
1039 // Build socket
1040 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1041 std::move(ice)),
1042 false);
1043
1044 // init TLS session
1045 auto ph = req.from;
1046 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001047 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1048 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001049 req.id);
1050 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1051 std::move(endpoint),
1052 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001053 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001054 identity(),
1055 dhParams(),
1056 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1057 auto shared = w.lock();
1058 if (!shared)
1059 return false;
1060 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1061 if (!crt)
1062 return false;
1063 return crt->getPacked() == cert.getPacked();
1064 });
1065
1066 info->tls_->setOnReady(
1067 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1068 if (auto shared = w.lock())
1069 shared->onTlsNegotiationDone(ok, deviceId, vid);
1070 });
1071 return true;
1072}
1073
1074void
1075ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1076 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1077{
1078 auto deviceId = req.owner->getLongId();
1079 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001080 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001081 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1082 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001083 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001084 return;
1085 }
1086
1087 // Because the connection is accepted, create an ICE socket.
1088 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1089 auto shared = w.lock();
1090 if (!shared)
1091 return;
1092 // Note: used when the ice negotiation fails to erase
1093 // all stored structures.
1094 auto eraseInfo = [w, id = req.id, deviceId] {
1095 if (auto shared = w.lock()) {
1096 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001097 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001098 if (shared->connReadyCb_)
1099 shared->connReadyCb_(deviceId, "", nullptr);
1100 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1101 shared->infos_.erase({deviceId, id});
1102 }
1103 };
1104
1105 ice_config.tcpEnable = true;
1106 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1107 auto shared = w.lock();
1108 if (!shared)
1109 return;
1110 if (!ok) {
1111 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001112 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001113 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1114 return;
1115 }
1116
1117 dht::ThreadPool::io().run(
1118 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1119 auto shared = w.lock();
1120 if (!shared)
1121 return;
1122 if (!shared->onRequestStartIce(req))
1123 eraseInfo();
1124 });
1125 };
1126
1127 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1128 auto shared = w.lock();
1129 if (!shared)
1130 return;
1131 if (!ok) {
1132 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001133 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001134 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1135 return;
1136 }
1137
1138 dht::ThreadPool::io().run(
1139 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1140 if (auto shared = w.lock())
1141 if (!shared->onRequestOnNegoDone(req))
1142 eraseInfo();
1143 });
1144 };
1145
1146 // Negotiate a new ICE socket
1147 auto info = std::make_shared<ConnectionInfo>();
1148 {
1149 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1150 shared->infos_[{deviceId, req.id}] = info;
1151 }
1152 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001153 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001154 std::unique_lock<std::mutex> lk {info->mutex_};
1155 ice_config.streamsCount = 1;
1156 ice_config.compCountPerStream = 1; // TCP
1157 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001158 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001159 if (not info->ice_) {
1160 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001161 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001162 eraseInfo();
1163 return;
1164 }
1165 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1166 info->ice_->setOnShutdown([eraseInfo]() {
1167 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1168 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001169 try {
1170 info->ice_->initIceInstance(ice_config);
1171 } catch (const std::exception& e) {
1172 if (shared->config_->logger)
1173 shared->config_->logger->error("{}", e.what());
1174 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1175 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001176 });
1177}
1178
1179void
1180ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1181{
1182 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1183 info->socket_->setOnReady(
1184 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1185 if (auto sthis = w.lock())
1186 if (sthis->connReadyCb_)
1187 sthis->connReadyCb_(deviceId, socket->name(), socket);
1188 });
1189 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1190 const uint16_t&,
1191 const std::string& name) {
1192 if (auto sthis = w.lock())
1193 if (sthis->channelReqCb_)
1194 return sthis->channelReqCb_(peer, name);
1195 return false;
1196 });
1197 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1198 // Cancel current outgoing connections
1199 dht::ThreadPool::io().run([w, deviceId, vid] {
1200 auto sthis = w.lock();
1201 if (!sthis)
1202 return;
1203
1204 std::set<CallbackId> ids;
1205 if (auto info = sthis->getInfo(deviceId, vid)) {
1206 std::lock_guard<std::mutex> lk(info->mutex_);
1207 if (info->socket_) {
1208 ids = std::move(info->cbIds_);
1209 info->socket_->shutdown();
1210 }
1211 }
1212 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001213 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001214
1215 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1216 sthis->infos_.erase({deviceId, vid});
1217 });
1218 });
1219}
1220
1221const std::shared_future<tls::DhParams>
1222ConnectionManager::Impl::dhParams() const
1223{
1224 return dht::ThreadPool::computation().get<tls::DhParams>(
1225 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
1226 ;
1227}
1228
1229template<typename ID = dht::Value::Id>
1230std::set<ID, std::less<>>
1231loadIdList(const std::string& path)
1232{
1233 std::set<ID, std::less<>> ids;
1234 std::ifstream file = fileutils::ifstream(path);
1235 if (!file.is_open()) {
1236 //JAMI_DBG("Could not load %s", path.c_str());
1237 return ids;
1238 }
1239 std::string line;
1240 while (std::getline(file, line)) {
1241 if constexpr (std::is_same<ID, std::string>::value) {
1242 ids.emplace(std::move(line));
1243 } else if constexpr (std::is_integral<ID>::value) {
1244 ID vid;
1245 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1246 ec == std::errc()) {
1247 ids.emplace(vid);
1248 }
1249 }
1250 }
1251 return ids;
1252}
1253
1254template<typename List = std::set<dht::Value::Id>>
1255void
1256saveIdList(const std::string& path, const List& ids)
1257{
1258 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1259 if (!file.is_open()) {
1260 //JAMI_ERR("Could not save to %s", path.c_str());
1261 return;
1262 }
1263 for (auto& c : ids)
1264 file << std::hex << c << "\n";
1265}
1266
1267void
1268ConnectionManager::Impl::loadTreatedMessages()
1269{
1270 std::lock_guard<std::mutex> lock(messageMutex_);
1271 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1272 treatedMessages_ = loadIdList<std::string>(path);
1273 if (treatedMessages_.empty()) {
1274 auto messages = loadIdList(path);
1275 for (const auto& m : messages)
1276 treatedMessages_.emplace(to_hex_string(m));
1277 }
1278}
1279
1280void
1281ConnectionManager::Impl::saveTreatedMessages() const
1282{
1283 dht::ThreadPool::io().run([w = weak()]() {
1284 if (auto sthis = w.lock()) {
1285 auto& this_ = *sthis;
1286 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1287 fileutils::check_dir(this_.config_->cachePath.c_str());
1288 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1289 + DIR_SEPARATOR_STR "treatedMessages",
1290 this_.treatedMessages_);
1291 }
1292 });
1293}
1294
1295bool
1296ConnectionManager::Impl::isMessageTreated(std::string_view id)
1297{
1298 std::lock_guard<std::mutex> lock(messageMutex_);
1299 auto res = treatedMessages_.emplace(id);
1300 if (res.second) {
1301 saveTreatedMessages();
1302 return false;
1303 }
1304 return true;
1305}
1306
1307/**
1308 * returns whether or not UPnP is enabled and active_
1309 * ie: if it is able to make port mappings
1310 */
1311bool
1312ConnectionManager::Impl::getUPnPActive() const
1313{
1314 return config_->getUPnPActive();
1315}
1316
1317IpAddr
1318ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1319{
1320 if (family == AF_INET)
1321 return publishedIp_[0];
1322 if (family == AF_INET6)
1323 return publishedIp_[1];
1324
1325 assert(family == AF_UNSPEC);
1326
1327 // If family is not set, prefere IPv4 if available. It's more
1328 // likely to succeed behind NAT.
1329 if (publishedIp_[0])
1330 return publishedIp_[0];
1331 if (publishedIp_[1])
1332 return publishedIp_[1];
1333 return {};
1334}
1335
1336void
1337ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1338{
1339 if (ip_addr.getFamily() == AF_INET) {
1340 publishedIp_[0] = ip_addr;
1341 } else {
1342 publishedIp_[1] = ip_addr;
1343 }
1344}
1345
1346void
1347ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1348{
1349 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1350 bool hasIpv4 {false}, hasIpv6 {false};
1351 for (auto& result : results) {
1352 auto family = result.getFamily();
1353 if (family == AF_INET) {
1354 if (not hasIpv4) {
1355 hasIpv4 = true;
1356 if (config_->logger)
1357 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1358 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1359 setPublishedAddress(*result.get());
1360 if (config_->upnpCtrl) {
1361 config_->upnpCtrl->setPublicAddress(*result.get());
1362 }
1363 }
1364 } else if (family == AF_INET6) {
1365 if (not hasIpv6) {
1366 hasIpv6 = true;
1367 if (config_->logger)
1368 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1369 setPublishedAddress(*result.get());
1370 }
1371 }
1372 if (hasIpv4 and hasIpv6)
1373 break;
1374 }
1375 if (cb)
1376 cb();
1377 });
1378}
1379
1380void
1381ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1382{
1383 storeActiveIpAddress([this, cb = std::move(cb)] {
1384 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1385 auto publishedAddr = getPublishedIpAddress();
1386
1387 if (publishedAddr) {
1388 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1389 publishedAddr.getFamily());
1390 if (interfaceAddr) {
1391 opts.accountLocalAddr = interfaceAddr;
1392 opts.accountPublicAddr = publishedAddr;
1393 }
1394 }
1395 if (cb)
1396 cb(std::move(opts));
1397 });
1398}
1399
1400IceTransportOptions
1401ConnectionManager::Impl::getIceOptions() const noexcept
1402{
1403 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001404 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001405 opts.upnpEnable = getUPnPActive();
1406
1407 if (config_->stunEnabled)
1408 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1409 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001410 if (config_->turnCache) {
1411 auto turnAddr = config_->turnCache->getResolvedTurn();
1412 if (turnAddr != std::nullopt) {
1413 opts.turnServers.emplace_back(TurnServerInfo()
1414 .setUri(turnAddr->toString())
1415 .setUsername(config_->turnServerUserName)
1416 .setPassword(config_->turnServerPwd)
1417 .setRealm(config_->turnServerRealm));
1418 }
1419 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001420 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001421 .setUri(config_->turnServer)
1422 .setUsername(config_->turnServerUserName)
1423 .setPassword(config_->turnServerPwd)
1424 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001425 }
1426 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1427 // co issues. So this needs some debug. for now just disable
1428 // if (cacheTurnV6 && *cacheTurnV6) {
1429 // opts.turnServers.emplace_back(TurnServerInfo()
1430 // .setUri(cacheTurnV6->toString(true))
1431 // .setUsername(turnServerUserName_)
1432 // .setPassword(turnServerPwd_)
1433 // .setRealm(turnServerRealm_));
1434 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001435 }
1436 return opts;
1437}
1438
1439bool
1440ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1441 dht::InfoHash& account_id,
1442 const std::shared_ptr<Logger>& logger)
1443{
1444 if (not crt)
1445 return false;
1446
1447 auto top_issuer = crt;
1448 while (top_issuer->issuer)
1449 top_issuer = top_issuer->issuer;
1450
1451 // Device certificate can't be self-signed
1452 if (top_issuer == crt) {
1453 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001454 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001455 return false;
1456 }
1457
1458 // Check peer certificate chain
1459 // Trust store with top issuer as the only CA
1460 dht::crypto::TrustList peer_trust;
1461 peer_trust.add(*top_issuer);
1462 if (not peer_trust.verify(*crt)) {
1463 if (logger)
1464 logger->warn("Found invalid peer device: {}", crt->getLongId());
1465 return false;
1466 }
1467
1468 // Check cached OCSP response
1469 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1470 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001471 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001472 return false;
1473 }
1474
1475 account_id = crt->issuer->getId();
1476 if (logger)
1477 logger->warn("Found peer device: {} account:{} CA:{}",
1478 crt->getLongId(),
1479 account_id,
1480 top_issuer->getId());
1481 return true;
1482}
1483
1484bool
1485ConnectionManager::Impl::findCertificate(
1486 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1487{
1488 if (auto cert = certStore().getCertificate(id.toString())) {
1489 if (cb)
1490 cb(cert);
1491 } else if (cb)
1492 cb(nullptr);
1493 return true;
1494}
1495
Sébastien Blin34086512023-07-25 09:52:14 -04001496bool
1497ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1498 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1499{
1500 if (auto cert = certStore().getCertificate(h.toString())) {
1501 if (cb)
1502 cb(cert);
1503 } else {
1504 dht()->findCertificate(h,
1505 [cb = std::move(cb), this](
1506 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1507 if (crt)
1508 certStore().pinCertificate(crt);
1509 if (cb)
1510 cb(crt);
1511 });
1512 }
1513 return true;
1514}
1515
Adrien Béraud612b55b2023-05-29 10:42:04 -04001516ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1517 : pimpl_ {std::make_shared<Impl>(config_)}
1518{}
1519
1520ConnectionManager::~ConnectionManager()
1521{
1522 if (pimpl_)
1523 pimpl_->shutdown();
1524}
1525
1526void
1527ConnectionManager::connectDevice(const DeviceId& deviceId,
1528 const std::string& name,
1529 ConnectCallback cb,
1530 bool noNewSocket,
1531 bool forceNewSocket,
1532 const std::string& connType)
1533{
1534 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1535}
1536
1537void
1538ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1539 const std::string& name,
1540 ConnectCallback cb,
1541 bool noNewSocket,
1542 bool forceNewSocket,
1543 const std::string& connType)
1544{
1545 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1546}
1547
1548bool
1549ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1550{
Adrien Béraud665294f2023-06-13 18:09:11 -04001551 auto pending = pimpl_->getPendingIds(deviceId);
1552 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001553 != pending.end();
1554}
1555
1556void
1557ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1558{
1559 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1560 std::set<DeviceId> peersDevices;
1561 {
1562 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1563 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1564 auto const& [key, value] = *iter;
1565 auto deviceId = key.first;
1566 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1567 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1568 connInfos.emplace_back(value);
1569 peersDevices.emplace(deviceId);
1570 iter = pimpl_->infos_.erase(iter);
1571 } else {
1572 iter++;
1573 }
1574 }
1575 }
1576 // Stop connections to all peers devices
1577 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001578 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001579 // This will close the TLS Session
1580 pimpl_->removeUnusedConnections(deviceId);
1581 }
1582 for (auto& info : connInfos) {
1583 if (info->socket_)
1584 info->socket_->shutdown();
1585 if (info->waitForAnswer_)
1586 info->waitForAnswer_->cancel();
1587 if (info->ice_) {
1588 std::unique_lock<std::mutex> lk {info->mutex_};
1589 dht::ThreadPool::io().run(
1590 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1591 }
1592 }
1593}
1594
1595void
1596ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1597{
1598 pimpl_->onDhtConnected(devicePk);
1599}
1600
1601void
1602ConnectionManager::onICERequest(onICERequestCallback&& cb)
1603{
1604 pimpl_->iceReqCb_ = std::move(cb);
1605}
1606
1607void
1608ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1609{
1610 pimpl_->channelReqCb_ = std::move(cb);
1611}
1612
1613void
1614ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1615{
1616 pimpl_->connReadyCb_ = std::move(cb);
1617}
1618
1619void
1620ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1621{
1622 pimpl_->iOSConnectedCb_ = std::move(cb);
1623}
1624
1625std::size_t
1626ConnectionManager::activeSockets() const
1627{
1628 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1629 return pimpl_->infos_.size();
1630}
1631
1632void
1633ConnectionManager::monitor() const
1634{
1635 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1636 auto logger = pimpl_->config_->logger;
1637 if (!logger)
1638 return;
1639 logger->debug("ConnectionManager current status:");
1640 for (const auto& [_, ci] : pimpl_->infos_) {
1641 if (ci->socket_)
1642 ci->socket_->monitor();
1643 }
1644 logger->debug("ConnectionManager end status.");
1645}
1646
1647void
1648ConnectionManager::connectivityChanged()
1649{
1650 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1651 for (const auto& [_, ci] : pimpl_->infos_) {
1652 if (ci->socket_)
1653 ci->socket_->sendBeacon();
1654 }
1655}
1656
1657void
1658ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1659{
1660 return pimpl_->getIceOptions(std::move(cb));
1661}
1662
1663IceTransportOptions
1664ConnectionManager::getIceOptions() const noexcept
1665{
1666 return pimpl_->getIceOptions();
1667}
1668
1669IpAddr
1670ConnectionManager::getPublishedIpAddress(uint16_t family) const
1671{
1672 return pimpl_->getPublishedIpAddress(family);
1673}
1674
1675void
1676ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1677{
1678 return pimpl_->setPublishedAddress(ip_addr);
1679}
1680
1681void
1682ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1683{
1684 return pimpl_->storeActiveIpAddress(std::move(cb));
1685}
1686
1687std::shared_ptr<ConnectionManager::Config>
1688ConnectionManager::getConfig()
1689{
1690 return pimpl_->config_;
1691}
1692
Sébastien Blin464bdff2023-07-19 08:02:53 -04001693} // namespace dhtnet