blob: 339c8d56ec5b6b76f753ce8320fa3ed3bb945c4f [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
// Delay used for the timer armed after a connection request has been sent
// through the DHT (see connectDeviceStartIce / onResponse).
static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
// Upper bound (2^53) of the range in which random value ids are drawn.
static constexpr uint64_t ID_MAX_VAL = 9007199254740992;

// Distribution used to draw random dht::Value::Id values.
using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
// Key identifying a connection: the peer device id and the id of the request.
using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Amna81221ad2023-09-14 17:33:26 -040061
62std::shared_ptr<ConnectionManager::Config>
63createConfig(std::shared_ptr<ConnectionManager::Config> config_)
64{
65 if (!config_->certStore){
66 config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
67 }
68 if (!config_->dht) {
69 dht::DhtRunner::Config dhtConfig;
70 dhtConfig.dht_config.id = config_->id;
71 dhtConfig.threaded = true;
72 dht::DhtRunner::Context dhtContext;
73 dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
74 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
75 if (auto cert = c->getCertificate(pk_id.toString()))
76 ret.emplace_back(std::move(cert));
77 return ret;
78 };
79 config_->dht = std::make_shared<dht::DhtRunner>();
80 config_->dht->run(dhtConfig, std::move(dhtContext));
81 config_->dht->bootstrap("bootstrap.jami.net");
82 }
83 if (!config_->factory){
84 config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
85 }
86 return config_;
87}
88
Adrien Béraud612b55b2023-05-29 10:42:04 -040089struct ConnectionInfo
90{
91 ~ConnectionInfo()
92 {
93 if (socket_)
94 socket_->join();
95 }
96
97 std::mutex mutex_ {};
98 bool responseReceived_ {false};
99 PeerConnectionRequest response_ {};
100 std::unique_ptr<IceTransport> ice_ {nullptr};
101 // Used to store currently non ready TLS Socket
102 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
103 std::shared_ptr<MultiplexedSocket> socket_ {};
104 std::set<CallbackId> cbIds_ {};
105
106 std::function<void(bool)> onConnected_;
107 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
108};
109
110/**
111 * returns whether or not UPnP is enabled and active_
112 * ie: if it is able to make port mappings
113 */
114bool
115ConnectionManager::Config::getUPnPActive() const
116{
117 if (upnpCtrl)
118 return upnpCtrl->isReady();
119 return false;
120}
121
122class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
123{
124public:
125 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
Amna81221ad2023-09-14 17:33:26 -0400126 : config_ {std::move(createConfig(config_))}
Sébastien Blincf569402023-07-27 09:46:40 -0400127 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400128 {
Amna81221ad2023-09-14 17:33:26 -0400129 if(!config_->ioContext) {
130 config_->ioContext = std::make_shared<asio::io_context>();
131 ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
132 try {
133 auto work = asio::make_work_guard(*context);
134 context->run();
135 } catch (const std::exception& ex) {
136 if (l) l->error("Exception: {}", ex.what());
137 }
138 });
139 }
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400140 }
Amna81221ad2023-09-14 17:33:26 -0400141 ~Impl() {
142 if (ioContextRunner_) {
143 if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
144 config_->ioContext->stop();
145 ioContextRunner_->join();
146 ioContextRunner_.reset();
147 }
148 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400149
150 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
151 const dht::crypto::Identity& identity() const { return config_->id; }
152
153 void removeUnusedConnections(const DeviceId& deviceId = {})
154 {
155 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
156
157 {
158 std::lock_guard<std::mutex> lk(infosMtx_);
159 for (auto it = infos_.begin(); it != infos_.end();) {
160 auto& [key, info] = *it;
161 if (info && (!deviceId || key.first == deviceId)) {
162 unused.emplace_back(std::move(info));
163 it = infos_.erase(it);
164 } else {
165 ++it;
166 }
167 }
168 }
169 for (auto& info: unused) {
170 if (info->tls_)
171 info->tls_->shutdown();
172 if (info->socket_)
173 info->socket_->shutdown();
174 if (info->waitForAnswer_)
175 info->waitForAnswer_->cancel();
176 }
177 if (!unused.empty())
Amna81221ad2023-09-14 17:33:26 -0400178 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
179 infos.clear();
180 });
Adrien Béraud612b55b2023-05-29 10:42:04 -0400181 }
182
183 void shutdown()
184 {
185 if (isDestroying_.exchange(true))
186 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400187 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400188 {
189 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400190 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400191 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400192 for (auto& [deviceId, pcbs] : po) {
193 for (auto& [id, pending] : pcbs.connecting)
194 pending.cb(nullptr, deviceId);
195 for (auto& [id, pending] : pcbs.waiting)
196 pending.cb(nullptr, deviceId);
197 }
198
Adrien Béraud612b55b2023-05-29 10:42:04 -0400199 removeUnusedConnections();
200 }
201
Adrien Béraud612b55b2023-05-29 10:42:04 -0400202 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
203 const dht::Value::Id& vid,
204 const std::string& connType,
205 std::function<void(bool)> onConnected);
206 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
207 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
208 const std::string& name,
209 const dht::Value::Id& vid,
210 const std::shared_ptr<dht::crypto::Certificate>& cert);
211 void connectDevice(const DeviceId& deviceId,
212 const std::string& uri,
213 ConnectCallback cb,
214 bool noNewSocket = false,
215 bool forceNewSocket = false,
216 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400217 void connectDevice(const dht::InfoHash& deviceId,
218 const std::string& uri,
219 ConnectCallbackLegacy cb,
220 bool noNewSocket = false,
221 bool forceNewSocket = false,
222 const std::string& connType = "");
223
Adrien Béraud612b55b2023-05-29 10:42:04 -0400224 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
225 const std::string& name,
226 ConnectCallback cb,
227 bool noNewSocket = false,
228 bool forceNewSocket = false,
229 const std::string& connType = "");
230 /**
231 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
232 * @param sock socket used to send the request
233 * @param name channel's name
234 * @param vid channel's id
235 * @param deviceId to identify the linked ConnectCallback
236 */
237 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
238 const std::string& name,
239 const DeviceId& deviceId,
240 const dht::Value::Id& vid);
241 /**
242 * Triggered when a PeerConnectionRequest comes from the DHT
243 */
244 void answerTo(IceTransport& ice,
245 const dht::Value::Id& id,
246 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
247 bool onRequestStartIce(const PeerConnectionRequest& req);
248 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
249 void onDhtPeerRequest(const PeerConnectionRequest& req,
250 const std::shared_ptr<dht::crypto::Certificate>& cert);
251
252 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
253 void onPeerResponse(const PeerConnectionRequest& req);
254 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
255
256 const std::shared_future<tls::DhParams> dhParams() const;
257 tls::CertificateStore& certStore() const { return *config_->certStore; }
258
259 mutable std::mutex messageMutex_ {};
260 std::set<std::string, std::less<>> treatedMessages_ {};
261
262 void loadTreatedMessages();
263 void saveTreatedMessages() const;
264
265 /// \return true if the given DHT message identifier has been treated
266 /// \note if message has not been treated yet this method st/ore this id and returns true at
267 /// further calls
268 bool isMessageTreated(std::string_view id);
269
270 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
271
272 /**
273 * Published IPv4/IPv6 addresses, used only if defined by the user in account
274 * configuration
275 *
276 */
277 IpAddr publishedIp_[2] {};
278
Adrien Béraud612b55b2023-05-29 10:42:04 -0400279 /**
280 * interface name on which this account is bound
281 */
282 std::string interface_ {"default"};
283
284 /**
285 * Get the local interface name on which this account is bound.
286 */
287 const std::string& getLocalInterface() const { return interface_; }
288
289 /**
290 * Get the published IP address, fallbacks to NAT if family is unspecified
291 * Prefers the usage of IPv4 if possible.
292 */
293 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
294
295 /**
296 * Set published IP address according to given family
297 */
298 void setPublishedAddress(const IpAddr& ip_addr);
299
300 /**
301 * Store the local/public addresses used to register
302 */
303 void storeActiveIpAddress(std::function<void()>&& cb = {});
304
305 /**
306 * Create and return ICE options.
307 */
308 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
309 IceTransportOptions getIceOptions() const noexcept;
310
311 /**
312 * Inform that a potential peer device have been found.
313 * Returns true only if the device certificate is a valid device certificate.
314 * In that case (true is returned) the account_id parameter is set to the peer account ID.
315 */
316 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
317 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
318
319 bool findCertificate(const dht::PkId& id,
320 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400321 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400322
323 /**
324 * returns whether or not UPnP is enabled and active
325 * ie: if it is able to make port mappings
326 */
327 bool getUPnPActive() const;
328
329 /**
330 * Triggered when a new TLS socket is ready to use
331 * @param ok If succeed
332 * @param deviceId Related device
333 * @param vid vid of the connection request
334 * @param name non empty if TLS was created by connectDevice()
335 */
336 void onTlsNegotiationDone(bool ok,
337 const DeviceId& deviceId,
338 const dht::Value::Id& vid,
339 const std::string& name = "");
340
341 std::shared_ptr<ConnectionManager::Config> config_;
Amna81221ad2023-09-14 17:33:26 -0400342 std::unique_ptr<std::thread> ioContextRunner_;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400343
Adrien Béraud612b55b2023-05-29 10:42:04 -0400344 mutable std::mt19937_64 rand;
345
346 iOSConnectedCallback iOSConnectedCb_ {};
347
348 std::mutex infosMtx_ {};
349 // Note: Someone can ask multiple sockets, so to avoid any race condition,
350 // each device can have multiple multiplexed sockets.
351 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
352
353 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
354 {
355 std::lock_guard<std::mutex> lk(infosMtx_);
356 auto it = infos_.find({deviceId, id});
357 if (it != infos_.end())
358 return it->second;
359 return {};
360 }
361
362 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
363 {
364 std::lock_guard<std::mutex> lk(infosMtx_);
365 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
366 auto& [key, value] = item;
367 return key.first == deviceId && value && value->socket_;
368 });
369 if (it != infos_.end())
370 return it->second;
371 return {};
372 }
373
374 ChannelRequestCallback channelReqCb_ {};
375 ConnectionReadyCallback connReadyCb_ {};
376 onICERequestCallback iceReqCb_ {};
377
378 /**
379 * Stores callback from connectDevice
380 * @note: each device needs a vector because several connectDevice can
381 * be done in parallel and we only want one socket
382 */
383 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400384
Amna81221ad2023-09-14 17:33:26 -0400385
Adrien Béraud665294f2023-06-13 18:09:11 -0400386 struct PendingCb
387 {
388 std::string name;
389 ConnectCallback cb;
390 };
391 struct PendingOperations {
392 std::map<dht::Value::Id, PendingCb> connecting;
393 std::map<dht::Value::Id, PendingCb> waiting;
394 };
395
396 std::map<DeviceId, PendingOperations> pendingOperations_ {};
397
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400398 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400399 {
400 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400401 std::unique_lock<std::mutex> lk(connectCbsMtx_);
402 auto it = pendingOperations_.find(deviceId);
403 if (it == pendingOperations_.end())
404 return;
405 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400406 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400407 // Extract all pending callbacks
408 for (auto& [vid, cb] : pendingOperations.connecting)
409 ret.emplace_back(std::move(cb));
410 pendingOperations.connecting.clear();
411 for (auto& [vid, cb] : pendingOperations.waiting)
412 ret.emplace_back(std::move(cb));
413 pendingOperations.waiting.clear();
414 } else if (auto n = pendingOperations.waiting.extract(vid)) {
415 // If it's a waiting operation, just move it
416 ret.emplace_back(std::move(n.mapped()));
417 } else if (auto n = pendingOperations.connecting.extract(vid)) {
418 ret.emplace_back(std::move(n.mapped()));
419 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400420 // If accepted is false, it means that underlying socket is ok, but channel is declined
421 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400422 for (auto& [vid, cb] : pendingOperations.waiting)
423 ret.emplace_back(std::move(cb));
424 pendingOperations.waiting.clear();
425 for (auto& [vid, cb] : pendingOperations.connecting)
426 ret.emplace_back(std::move(cb));
427 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400428 }
429 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400430 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
431 pendingOperations_.erase(it);
432 lk.unlock();
433 for (auto& cb : ret)
434 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400435 }
436
Adrien Béraud665294f2023-06-13 18:09:11 -0400437 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400438 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400439 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400440 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400441 auto it = pendingOperations_.find(deviceId);
442 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400443 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400444 auto& pendingOp = it->second;
445 for (const auto& [id, pc]: pendingOp.connecting) {
446 if (vid == 0 || id == vid)
447 ret[id] = pc.name;
448 }
449 for (const auto& [id, pc]: pendingOp.waiting) {
450 if (vid == 0 || id == vid)
451 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400452 }
453 return ret;
454 }
455
456 std::shared_ptr<ConnectionManager::Impl> shared()
457 {
458 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
459 }
460 std::shared_ptr<ConnectionManager::Impl const> shared() const
461 {
462 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
463 }
464 std::weak_ptr<ConnectionManager::Impl> weak()
465 {
466 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
467 }
468 std::weak_ptr<ConnectionManager::Impl const> weak() const
469 {
470 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
471 }
472
473 std::atomic_bool isDestroying_ {false};
474};
475
476void
477ConnectionManager::Impl::connectDeviceStartIce(
478 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
479 const dht::Value::Id& vid,
480 const std::string& connType,
481 std::function<void(bool)> onConnected)
482{
483 auto deviceId = devicePk->getLongId();
484 auto info = getInfo(deviceId, vid);
485 if (!info) {
486 onConnected(false);
487 return;
488 }
489
490 std::unique_lock<std::mutex> lk(info->mutex_);
491 auto& ice = info->ice_;
492
493 if (!ice) {
494 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400495 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400496 onConnected(false);
497 return;
498 }
499
500 auto iceAttributes = ice->getLocalAttributes();
501 std::ostringstream icemsg;
502 icemsg << iceAttributes.ufrag << "\n";
503 icemsg << iceAttributes.pwd << "\n";
504 for (const auto& addr : ice->getLocalCandidates(1)) {
505 icemsg << addr << "\n";
506 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400507 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400508 }
509
510 // Prepare connection request as a DHT message
511 PeerConnectionRequest val;
512
513 val.id = vid; /* Random id for the message unicity */
514 val.ice_msg = icemsg.str();
515 val.connType = connType;
516
517 auto value = std::make_shared<dht::Value>(std::move(val));
518 value->user_type = "peer_request";
519
520 // Send connection request through DHT
521 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400522 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400523 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
524 + devicePk->getId().toString()),
525 devicePk,
526 value,
527 [l=config_->logger,deviceId](bool ok) {
528 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400529 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400530 deviceId,
531 (ok ? "ok" : "failed"));
532 });
533 // Wait for call to onResponse() operated by DHT
534 if (isDestroying_) {
535 onConnected(true); // This avoid to wait new negotiation when destroying
536 return;
537 }
538
539 info->onConnected_ = std::move(onConnected);
540 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
541 std::chrono::steady_clock::now()
542 + DHT_MSG_TIMEOUT);
543 info->waitForAnswer_->async_wait(
544 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
545}
546
547void
548ConnectionManager::Impl::onResponse(const asio::error_code& ec,
549 const DeviceId& deviceId,
550 const dht::Value::Id& vid)
551{
552 if (ec == asio::error::operation_aborted)
553 return;
554 auto info = getInfo(deviceId, vid);
555 if (!info)
556 return;
557
558 std::unique_lock<std::mutex> lk(info->mutex_);
559 auto& ice = info->ice_;
560 if (isDestroying_) {
561 info->onConnected_(true); // The destructor can wake a pending wait here.
562 return;
563 }
564 if (!info->responseReceived_) {
565 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400566 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400567 info->onConnected_(false);
568 return;
569 }
570
571 if (!info->ice_) {
572 info->onConnected_(false);
573 return;
574 }
575
576 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
577
578 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
579 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400580 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400581 info->onConnected_(false);
582 return;
583 }
584 info->onConnected_(true);
585}
586
587bool
588ConnectionManager::Impl::connectDeviceOnNegoDone(
589 const DeviceId& deviceId,
590 const std::string& name,
591 const dht::Value::Id& vid,
592 const std::shared_ptr<dht::crypto::Certificate>& cert)
593{
594 auto info = getInfo(deviceId, vid);
595 if (!info)
596 return false;
597
598 std::unique_lock<std::mutex> lk {info->mutex_};
599 if (info->waitForAnswer_) {
600 // Negotiation is done and connected, go to handshake
601 // and avoid any cancellation at this point.
602 info->waitForAnswer_->cancel();
603 }
604 auto& ice = info->ice_;
605 if (!ice || !ice->isRunning()) {
606 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400607 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400608 return false;
609 }
610
611 // Build socket
612 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
613 std::move(ice)),
614 true);
615
616 // Negotiate a TLS session
617 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400618 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400619 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
620 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400621 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400622 identity(),
623 dhParams(),
624 *cert);
625
626 info->tls_->setOnReady(
627 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
628 bool ok) {
629 if (auto shared = w.lock())
630 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
631 });
632 return true;
633}
634
635void
636ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
637 const std::string& name,
638 ConnectCallback cb,
639 bool noNewSocket,
640 bool forceNewSocket,
641 const std::string& connType)
642{
643 if (!dht()) {
644 cb(nullptr, deviceId);
645 return;
646 }
647 if (deviceId.toString() == identity().second->getLongId().toString()) {
648 cb(nullptr, deviceId);
649 return;
650 }
651 findCertificate(deviceId,
652 [w = weak(),
653 deviceId,
654 name,
655 cb = std::move(cb),
656 noNewSocket,
657 forceNewSocket,
658 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
659 if (!cert) {
660 if (auto shared = w.lock())
661 if (shared->config_->logger)
662 shared->config_->logger->error(
663 "No valid certificate found for device {}",
664 deviceId);
665 cb(nullptr, deviceId);
666 return;
667 }
668 if (auto shared = w.lock()) {
669 shared->connectDevice(cert,
670 name,
671 std::move(cb),
672 noNewSocket,
673 forceNewSocket,
674 connType);
675 } else
676 cb(nullptr, deviceId);
677 });
678}
679
680void
Amna0cf544d2023-07-25 14:25:09 -0400681ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
682 const std::string& name,
683 ConnectCallbackLegacy cb,
684 bool noNewSocket,
685 bool forceNewSocket,
686 const std::string& connType)
687{
688 if (!dht()) {
689 cb(nullptr, deviceId);
690 return;
691 }
692 if (deviceId.toString() == identity().second->getLongId().toString()) {
693 cb(nullptr, deviceId);
694 return;
695 }
696 findCertificate(deviceId,
697 [w = weak(),
698 deviceId,
699 name,
700 cb = std::move(cb),
701 noNewSocket,
702 forceNewSocket,
703 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
704 if (!cert) {
705 if (auto shared = w.lock())
706 if (shared->config_->logger)
707 shared->config_->logger->error(
708 "No valid certificate found for device {}",
709 deviceId);
710 cb(nullptr, deviceId);
711 return;
712 }
713 if (auto shared = w.lock()) {
714 shared->connectDevice(cert,
715 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400716 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400717 cb(sock, deviceId);
718 },
719 noNewSocket,
720 forceNewSocket,
721 connType);
722 } else
723 cb(nullptr, deviceId);
724 });
725}
726
727void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400728ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
729 const std::string& name,
730 ConnectCallback cb,
731 bool noNewSocket,
732 bool forceNewSocket,
733 const std::string& connType)
734{
735 // Avoid dht operation in a DHT callback to avoid deadlocks
736 dht::ThreadPool::computation().run([w = weak(),
737 name = std::move(name),
738 cert = std::move(cert),
739 cb = std::move(cb),
740 noNewSocket,
741 forceNewSocket,
742 connType] {
743 auto devicePk = cert->getSharedPublicKey();
744 auto deviceId = devicePk->getLongId();
745 auto sthis = w.lock();
746 if (!sthis || sthis->isDestroying_) {
747 cb(nullptr, deviceId);
748 return;
749 }
750 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
751 auto isConnectingToDevice = false;
752 {
753 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400754 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
755 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400756 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400757 while (pendings.connecting.find(vid) != pendings.connecting.end()
758 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400759 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400760 }
761 }
762 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400763 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400764 // Save current request for sendChannelRequest.
765 // Note: do not return here, cause we can be in a state where first
766 // socket is negotiated and first channel is pending
767 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400768 if (isConnectingToDevice && !forceNewSocket)
769 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400770 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400771 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400772 }
773
774 // Check if already negotiated
775 CallbackId cbId(deviceId, vid);
776 if (auto info = sthis->getConnectedInfo(deviceId)) {
777 std::lock_guard<std::mutex> lk(info->mutex_);
778 if (info->socket_) {
779 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400780 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400781 info->cbIds_.emplace(cbId);
782 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
783 return;
784 }
785 }
786
787 if (isConnectingToDevice && !forceNewSocket) {
788 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400789 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400790 return;
791 }
792 if (noNewSocket) {
793 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400794 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400795 return;
796 }
797
798 // Note: used when the ice negotiation fails to erase
799 // all stored structures.
800 auto eraseInfo = [w, cbId] {
801 if (auto shared = w.lock()) {
802 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400803 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400804 std::lock_guard<std::mutex> lk(shared->infosMtx_);
805 shared->infos_.erase(cbId);
806 }
807 };
808
809 // If no socket exists, we need to initiate an ICE connection.
810 sthis->getIceOptions([w,
811 deviceId = std::move(deviceId),
812 devicePk = std::move(devicePk),
813 name = std::move(name),
814 cert = std::move(cert),
815 vid,
816 connType,
817 eraseInfo](auto&& ice_config) {
818 auto sthis = w.lock();
819 if (!sthis) {
820 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
821 return;
822 }
823 ice_config.tcpEnable = true;
824 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400825 devicePk = std::move(devicePk),
826 name = std::move(name),
827 cert = std::move(cert),
828 vid,
829 connType,
830 eraseInfo](bool ok) {
831 dht::ThreadPool::io().run([w = std::move(w),
832 devicePk = std::move(devicePk),
833 vid = std::move(vid),
834 eraseInfo,
835 connType, ok] {
836 auto sthis = w.lock();
837 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400838 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400839 if (!sthis || !ok) {
840 eraseInfo();
841 return;
842 }
843 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
844 if (!ok) {
845 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
846 }
847 });
848 });
849 };
850 ice_config.onNegoDone = [w,
851 deviceId,
852 name,
853 cert = std::move(cert),
854 vid,
855 eraseInfo](bool ok) {
856 dht::ThreadPool::io().run([w = std::move(w),
857 deviceId = std::move(deviceId),
858 name = std::move(name),
859 cert = std::move(cert),
860 vid = std::move(vid),
861 eraseInfo = std::move(eraseInfo),
862 ok] {
863 auto sthis = w.lock();
864 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400865 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400866 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
867 eraseInfo();
868 });
869 };
870
871 auto info = std::make_shared<ConnectionInfo>();
872 {
873 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
874 sthis->infos_[{deviceId, vid}] = info;
875 }
876 std::unique_lock<std::mutex> lk {info->mutex_};
877 ice_config.master = false;
878 ice_config.streamsCount = 1;
879 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400880 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400881 if (!info->ice_) {
882 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400883 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400884 eraseInfo();
885 return;
886 }
887 // We need to detect any shutdown if the ice session is destroyed before going to the
888 // TLS session;
889 info->ice_->setOnShutdown([eraseInfo]() {
890 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
891 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400892 try {
893 info->ice_->initIceInstance(ice_config);
894 } catch (const std::exception& e) {
895 if (sthis->config_->logger)
896 sthis->config_->logger->error("{}", e.what());
897 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
898 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400899 });
900 });
901}
902
903void
904ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
905 const std::string& name,
906 const DeviceId& deviceId,
907 const dht::Value::Id& vid)
908{
909 auto channelSock = sock->addChannel(name);
910 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
911 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400912 if (auto shared = w.lock())
913 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400914 });
915 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400916 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400917 auto shared = w.lock();
918 auto channelSock = wSock.lock();
919 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400920 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400921 });
922
923 ChannelRequest val;
924 val.name = channelSock->name();
925 val.state = ChannelRequestState::REQUEST;
926 val.channel = channelSock->channel();
927 msgpack::sbuffer buffer(256);
928 msgpack::pack(buffer, val);
929
930 std::error_code ec;
931 int res = sock->write(CONTROL_CHANNEL,
932 reinterpret_cast<const uint8_t*>(buffer.data()),
933 buffer.size(),
934 ec);
935 if (res < 0) {
936 // TODO check if we should handle errors here
937 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400938 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400939 }
940}
941
942void
943ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
944{
945 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400946 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400947 if (config_->logger)
948 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400949 std::lock_guard<std::mutex> lk {info->mutex_};
950 info->responseReceived_ = true;
951 info->response_ = std::move(req);
952 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
953 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
954 this,
955 std::placeholders::_1,
956 device,
957 req.id));
958 } else {
959 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400960 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400961 }
962}
963
964void
965ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
966{
967 if (!dht())
968 return;
969 dht()->listen<PeerConnectionRequest>(
970 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
971 [w = weak()](PeerConnectionRequest&& req) {
972 auto shared = w.lock();
973 if (!shared)
974 return false;
975 if (shared->isMessageTreated(to_hex_string(req.id))) {
976 // Message already treated. Just ignore
977 return true;
978 }
979 if (req.isAnswer) {
980 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400981 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400982 } else {
983 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400984 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400985 }
986 if (req.isAnswer) {
987 shared->onPeerResponse(req);
988 } else {
989 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400990 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400991 req.from,
992 [w, req = std::move(req)](
993 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
994 auto shared = w.lock();
995 if (!shared)
996 return;
997 dht::InfoHash peer_h;
998 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
999#if TARGET_OS_IOS
1000 if (shared->iOSConnectedCb_(req.connType, peer_h))
1001 return;
1002#endif
1003 shared->onDhtPeerRequest(req, cert);
1004 } else {
1005 if (shared->config_->logger)
1006 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -04001007 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001008 req.owner->getLongId());
1009 }
1010 });
1011 }
1012
1013 return true;
1014 },
1015 dht::Value::UserTypeFilter("peer_request"));
1016}
1017
1018void
1019ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
1020 const DeviceId& deviceId,
1021 const dht::Value::Id& vid,
1022 const std::string& name)
1023{
1024 if (isDestroying_)
1025 return;
1026 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
1027 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
1028 // asked yet)
1029 auto isDhtRequest = name.empty();
1030 if (!ok) {
1031 if (isDhtRequest) {
1032 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001033 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001034 deviceId,
1035 name,
1036 vid);
1037 if (connReadyCb_)
1038 connReadyCb_(deviceId, "", nullptr);
1039 } else {
1040 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001041 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001042 deviceId,
1043 name,
1044 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -04001045 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001046 }
1047 } else {
1048 // The socket is ready, store it
1049 if (isDhtRequest) {
1050 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001051 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001052 deviceId,
1053 vid);
1054 } else {
1055 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001056 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001057 deviceId,
1058 name,
1059 vid);
1060 }
1061
1062 auto info = getInfo(deviceId, vid);
1063 addNewMultiplexedSocket({deviceId, vid}, info);
1064 // Finally, open the channel and launch pending callbacks
1065 if (info->socket_) {
1066 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001067 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001068 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001069 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001070 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001071 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001072 }
1073 }
1074 }
1075}
1076
1077void
1078ConnectionManager::Impl::answerTo(IceTransport& ice,
1079 const dht::Value::Id& id,
1080 const std::shared_ptr<dht::crypto::PublicKey>& from)
1081{
1082 // NOTE: This is a shortest version of a real SDP message to save some bits
1083 auto iceAttributes = ice.getLocalAttributes();
1084 std::ostringstream icemsg;
1085 icemsg << iceAttributes.ufrag << "\n";
1086 icemsg << iceAttributes.pwd << "\n";
1087 for (const auto& addr : ice.getLocalCandidates(1)) {
1088 icemsg << addr << "\n";
1089 }
1090
1091 // Send PeerConnection response
1092 PeerConnectionRequest val;
1093 val.id = id;
1094 val.ice_msg = icemsg.str();
1095 val.isAnswer = true;
1096 auto value = std::make_shared<dht::Value>(std::move(val));
1097 value->user_type = "peer_request";
1098
1099 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001100 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001101 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1102 + from->getId().toString()),
1103 from,
1104 value,
1105 [from,l=config_->logger](bool ok) {
1106 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001107 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001108 from->getLongId(),
1109 (ok ? "ok" : "failed"));
1110 });
1111}
1112
1113bool
1114ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1115{
1116 auto deviceId = req.owner->getLongId();
1117 auto info = getInfo(deviceId, req.id);
1118 if (!info)
1119 return false;
1120
1121 std::unique_lock<std::mutex> lk {info->mutex_};
1122 auto& ice = info->ice_;
1123 if (!ice) {
1124 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001125 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001126 if (connReadyCb_)
1127 connReadyCb_(deviceId, "", nullptr);
1128 return false;
1129 }
1130
1131 auto sdp = ice->parseIceCandidates(req.ice_msg);
1132 answerTo(*ice, req.id, req.owner);
1133 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1134 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001135 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001136 ice = nullptr;
1137 if (connReadyCb_)
1138 connReadyCb_(deviceId, "", nullptr);
1139 return false;
1140 }
1141 return true;
1142}
1143
1144bool
1145ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1146{
1147 auto deviceId = req.owner->getLongId();
1148 auto info = getInfo(deviceId, req.id);
1149 if (!info)
1150 return false;
1151
1152 std::unique_lock<std::mutex> lk {info->mutex_};
1153 auto& ice = info->ice_;
1154 if (!ice) {
1155 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001156 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001157 return false;
1158 }
1159
1160 // Build socket
1161 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1162 std::move(ice)),
1163 false);
1164
1165 // init TLS session
1166 auto ph = req.from;
1167 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001168 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1169 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001170 req.id);
1171 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1172 std::move(endpoint),
1173 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001174 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001175 identity(),
1176 dhParams(),
Adrien Béraud9efbd442023-08-27 12:38:07 -04001177 [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001178 auto shared = w.lock();
1179 if (!shared)
1180 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001181 if (cert.getPublicKey().getId() != ph
1182 || deviceId != cert.getPublicKey().getLongId()) {
1183 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1184 deviceId,
1185 cert.getPublicKey().getLongId());
1186 return false;
1187 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001188 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1189 if (!crt)
1190 return false;
1191 return crt->getPacked() == cert.getPacked();
1192 });
1193
1194 info->tls_->setOnReady(
1195 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1196 if (auto shared = w.lock())
1197 shared->onTlsNegotiationDone(ok, deviceId, vid);
1198 });
1199 return true;
1200}
1201
1202void
1203ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1204 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1205{
1206 auto deviceId = req.owner->getLongId();
1207 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001208 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001209 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1210 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001211 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001212 return;
1213 }
1214
1215 // Because the connection is accepted, create an ICE socket.
1216 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1217 auto shared = w.lock();
1218 if (!shared)
1219 return;
1220 // Note: used when the ice negotiation fails to erase
1221 // all stored structures.
1222 auto eraseInfo = [w, id = req.id, deviceId] {
1223 if (auto shared = w.lock()) {
1224 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001225 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001226 if (shared->connReadyCb_)
1227 shared->connReadyCb_(deviceId, "", nullptr);
1228 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1229 shared->infos_.erase({deviceId, id});
1230 }
1231 };
1232
1233 ice_config.tcpEnable = true;
1234 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1235 auto shared = w.lock();
1236 if (!shared)
1237 return;
1238 if (!ok) {
1239 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001240 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001241 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1242 return;
1243 }
1244
1245 dht::ThreadPool::io().run(
1246 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1247 auto shared = w.lock();
1248 if (!shared)
1249 return;
1250 if (!shared->onRequestStartIce(req))
1251 eraseInfo();
1252 });
1253 };
1254
1255 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1256 auto shared = w.lock();
1257 if (!shared)
1258 return;
1259 if (!ok) {
1260 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001261 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001262 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1263 return;
1264 }
1265
1266 dht::ThreadPool::io().run(
1267 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1268 if (auto shared = w.lock())
1269 if (!shared->onRequestOnNegoDone(req))
1270 eraseInfo();
1271 });
1272 };
1273
1274 // Negotiate a new ICE socket
1275 auto info = std::make_shared<ConnectionInfo>();
1276 {
1277 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1278 shared->infos_[{deviceId, req.id}] = info;
1279 }
1280 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001281 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001282 std::unique_lock<std::mutex> lk {info->mutex_};
1283 ice_config.streamsCount = 1;
1284 ice_config.compCountPerStream = 1; // TCP
1285 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001286 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001287 if (not info->ice_) {
1288 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001289 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001290 eraseInfo();
1291 return;
1292 }
1293 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1294 info->ice_->setOnShutdown([eraseInfo]() {
1295 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1296 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001297 try {
1298 info->ice_->initIceInstance(ice_config);
1299 } catch (const std::exception& e) {
1300 if (shared->config_->logger)
1301 shared->config_->logger->error("{}", e.what());
1302 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1303 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001304 });
1305}
1306
1307void
1308ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1309{
Adrien Béraud5636f7c2023-09-14 14:34:57 -04001310 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_), config_->logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001311 info->socket_->setOnReady(
1312 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1313 if (auto sthis = w.lock())
1314 if (sthis->connReadyCb_)
1315 sthis->connReadyCb_(deviceId, socket->name(), socket);
1316 });
1317 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1318 const uint16_t&,
1319 const std::string& name) {
1320 if (auto sthis = w.lock())
1321 if (sthis->channelReqCb_)
1322 return sthis->channelReqCb_(peer, name);
1323 return false;
1324 });
1325 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1326 // Cancel current outgoing connections
1327 dht::ThreadPool::io().run([w, deviceId, vid] {
1328 auto sthis = w.lock();
1329 if (!sthis)
1330 return;
1331
1332 std::set<CallbackId> ids;
1333 if (auto info = sthis->getInfo(deviceId, vid)) {
1334 std::lock_guard<std::mutex> lk(info->mutex_);
1335 if (info->socket_) {
1336 ids = std::move(info->cbIds_);
1337 info->socket_->shutdown();
1338 }
1339 }
1340 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001341 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001342
1343 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1344 sthis->infos_.erase({deviceId, vid});
1345 });
1346 });
1347}
1348
1349const std::shared_future<tls::DhParams>
1350ConnectionManager::Impl::dhParams() const
1351{
1352 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001353 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001354}
1355
1356template<typename ID = dht::Value::Id>
1357std::set<ID, std::less<>>
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001358loadIdList(const std::filesystem::path& path)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001359{
1360 std::set<ID, std::less<>> ids;
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001361 std::ifstream file(path);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001362 if (!file.is_open()) {
1363 //JAMI_DBG("Could not load %s", path.c_str());
1364 return ids;
1365 }
1366 std::string line;
1367 while (std::getline(file, line)) {
1368 if constexpr (std::is_same<ID, std::string>::value) {
1369 ids.emplace(std::move(line));
1370 } else if constexpr (std::is_integral<ID>::value) {
1371 ID vid;
1372 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1373 ec == std::errc()) {
1374 ids.emplace(vid);
1375 }
1376 }
1377 }
1378 return ids;
1379}
1380
1381template<typename List = std::set<dht::Value::Id>>
1382void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001383saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001384{
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001385 std::ofstream file(path, std::ios::trunc | std::ios::binary);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001386 if (!file.is_open()) {
1387 //JAMI_ERR("Could not save to %s", path.c_str());
1388 return;
1389 }
1390 for (auto& c : ids)
1391 file << std::hex << c << "\n";
1392}
1393
1394void
1395ConnectionManager::Impl::loadTreatedMessages()
1396{
1397 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001398 auto path = config_->cachePath / "treatedMessages";
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001399 treatedMessages_ = loadIdList<std::string>(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001400 if (treatedMessages_.empty()) {
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001401 auto messages = loadIdList(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001402 for (const auto& m : messages)
1403 treatedMessages_.emplace(to_hex_string(m));
1404 }
1405}
1406
1407void
1408ConnectionManager::Impl::saveTreatedMessages() const
1409{
1410 dht::ThreadPool::io().run([w = weak()]() {
1411 if (auto sthis = w.lock()) {
1412 auto& this_ = *sthis;
1413 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1414 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001415 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001416 this_.treatedMessages_);
1417 }
1418 });
1419}
1420
1421bool
1422ConnectionManager::Impl::isMessageTreated(std::string_view id)
1423{
1424 std::lock_guard<std::mutex> lock(messageMutex_);
1425 auto res = treatedMessages_.emplace(id);
1426 if (res.second) {
1427 saveTreatedMessages();
1428 return false;
1429 }
1430 return true;
1431}
1432
1433/**
1434 * returns whether or not UPnP is enabled and active_
1435 * ie: if it is able to make port mappings
1436 */
1437bool
1438ConnectionManager::Impl::getUPnPActive() const
1439{
1440 return config_->getUPnPActive();
1441}
1442
1443IpAddr
1444ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1445{
1446 if (family == AF_INET)
1447 return publishedIp_[0];
1448 if (family == AF_INET6)
1449 return publishedIp_[1];
1450
1451 assert(family == AF_UNSPEC);
1452
1453 // If family is not set, prefere IPv4 if available. It's more
1454 // likely to succeed behind NAT.
1455 if (publishedIp_[0])
1456 return publishedIp_[0];
1457 if (publishedIp_[1])
1458 return publishedIp_[1];
1459 return {};
1460}
1461
1462void
1463ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1464{
1465 if (ip_addr.getFamily() == AF_INET) {
1466 publishedIp_[0] = ip_addr;
1467 } else {
1468 publishedIp_[1] = ip_addr;
1469 }
1470}
1471
1472void
1473ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1474{
1475 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1476 bool hasIpv4 {false}, hasIpv6 {false};
1477 for (auto& result : results) {
1478 auto family = result.getFamily();
1479 if (family == AF_INET) {
1480 if (not hasIpv4) {
1481 hasIpv4 = true;
1482 if (config_->logger)
1483 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1484 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1485 setPublishedAddress(*result.get());
1486 if (config_->upnpCtrl) {
1487 config_->upnpCtrl->setPublicAddress(*result.get());
1488 }
1489 }
1490 } else if (family == AF_INET6) {
1491 if (not hasIpv6) {
1492 hasIpv6 = true;
1493 if (config_->logger)
1494 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1495 setPublishedAddress(*result.get());
1496 }
1497 }
1498 if (hasIpv4 and hasIpv6)
1499 break;
1500 }
1501 if (cb)
1502 cb();
1503 });
1504}
1505
1506void
1507ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1508{
1509 storeActiveIpAddress([this, cb = std::move(cb)] {
1510 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1511 auto publishedAddr = getPublishedIpAddress();
1512
1513 if (publishedAddr) {
1514 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1515 publishedAddr.getFamily());
1516 if (interfaceAddr) {
1517 opts.accountLocalAddr = interfaceAddr;
1518 opts.accountPublicAddr = publishedAddr;
1519 }
1520 }
1521 if (cb)
1522 cb(std::move(opts));
1523 });
1524}
1525
1526IceTransportOptions
1527ConnectionManager::Impl::getIceOptions() const noexcept
1528{
1529 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001530 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001531 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001532 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001533
1534 if (config_->stunEnabled)
1535 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1536 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001537 if (config_->turnCache) {
1538 auto turnAddr = config_->turnCache->getResolvedTurn();
1539 if (turnAddr != std::nullopt) {
1540 opts.turnServers.emplace_back(TurnServerInfo()
1541 .setUri(turnAddr->toString())
1542 .setUsername(config_->turnServerUserName)
1543 .setPassword(config_->turnServerPwd)
1544 .setRealm(config_->turnServerRealm));
1545 }
1546 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001547 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001548 .setUri(config_->turnServer)
1549 .setUsername(config_->turnServerUserName)
1550 .setPassword(config_->turnServerPwd)
1551 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001552 }
1553 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1554 // co issues. So this needs some debug. for now just disable
1555 // if (cacheTurnV6 && *cacheTurnV6) {
1556 // opts.turnServers.emplace_back(TurnServerInfo()
1557 // .setUri(cacheTurnV6->toString(true))
1558 // .setUsername(turnServerUserName_)
1559 // .setPassword(turnServerPwd_)
1560 // .setRealm(turnServerRealm_));
1561 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001562 }
1563 return opts;
1564}
1565
1566bool
1567ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1568 dht::InfoHash& account_id,
1569 const std::shared_ptr<Logger>& logger)
1570{
1571 if (not crt)
1572 return false;
1573
1574 auto top_issuer = crt;
1575 while (top_issuer->issuer)
1576 top_issuer = top_issuer->issuer;
1577
1578 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001579 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001580 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001581 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001582 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001583 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001584
1585 // Check peer certificate chain
1586 // Trust store with top issuer as the only CA
1587 dht::crypto::TrustList peer_trust;
1588 peer_trust.add(*top_issuer);
1589 if (not peer_trust.verify(*crt)) {
1590 if (logger)
1591 logger->warn("Found invalid peer device: {}", crt->getLongId());
1592 return false;
1593 }
1594
1595 // Check cached OCSP response
1596 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1597 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001598 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001599 return false;
1600 }
1601
Adrien Béraudc631a832023-07-26 22:19:00 -04001602 account_id = crt->issuer->getId();
1603 if (logger)
1604 logger->warn("Found peer device: {} account:{} CA:{}",
1605 crt->getLongId(),
1606 account_id,
1607 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001608 return true;
1609}
1610
1611bool
1612ConnectionManager::Impl::findCertificate(
1613 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1614{
1615 if (auto cert = certStore().getCertificate(id.toString())) {
1616 if (cb)
1617 cb(cert);
1618 } else if (cb)
1619 cb(nullptr);
1620 return true;
1621}
1622
Sébastien Blin34086512023-07-25 09:52:14 -04001623bool
1624ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1625 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1626{
1627 if (auto cert = certStore().getCertificate(h.toString())) {
1628 if (cb)
1629 cb(cert);
1630 } else {
1631 dht()->findCertificate(h,
1632 [cb = std::move(cb), this](
1633 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1634 if (crt)
1635 certStore().pinCertificate(crt);
1636 if (cb)
1637 cb(crt);
1638 });
1639 }
1640 return true;
1641}
1642
Amna81221ad2023-09-14 17:33:26 -04001643std::shared_ptr<ConnectionManager::Config>
1644buildDefaultConfig(dht::crypto::Identity id){
1645 auto conf = std::make_shared<ConnectionManager::Config>();
1646 conf->id = std::move(id);
1647 return conf;
1648}
1649
Adrien Béraud612b55b2023-05-29 10:42:04 -04001650ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1651 : pimpl_ {std::make_shared<Impl>(config_)}
1652{}
1653
Amna81221ad2023-09-14 17:33:26 -04001654ConnectionManager::ConnectionManager(dht::crypto::Identity id)
1655 : ConnectionManager {buildDefaultConfig(id)}
1656{}
1657
Adrien Béraud612b55b2023-05-29 10:42:04 -04001658ConnectionManager::~ConnectionManager()
1659{
1660 if (pimpl_)
1661 pimpl_->shutdown();
1662}
1663
1664void
1665ConnectionManager::connectDevice(const DeviceId& deviceId,
1666 const std::string& name,
1667 ConnectCallback cb,
1668 bool noNewSocket,
1669 bool forceNewSocket,
1670 const std::string& connType)
1671{
1672 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1673}
1674
1675void
Amna0cf544d2023-07-25 14:25:09 -04001676ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1677 const std::string& name,
1678 ConnectCallbackLegacy cb,
1679 bool noNewSocket,
1680 bool forceNewSocket,
1681 const std::string& connType)
1682{
1683 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1684}
1685
1686
1687void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001688ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1689 const std::string& name,
1690 ConnectCallback cb,
1691 bool noNewSocket,
1692 bool forceNewSocket,
1693 const std::string& connType)
1694{
1695 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1696}
1697
1698bool
1699ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1700{
Adrien Béraud665294f2023-06-13 18:09:11 -04001701 auto pending = pimpl_->getPendingIds(deviceId);
1702 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001703 != pending.end();
1704}
1705
1706void
1707ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1708{
1709 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1710 std::set<DeviceId> peersDevices;
1711 {
1712 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1713 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1714 auto const& [key, value] = *iter;
1715 auto deviceId = key.first;
1716 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1717 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1718 connInfos.emplace_back(value);
1719 peersDevices.emplace(deviceId);
1720 iter = pimpl_->infos_.erase(iter);
1721 } else {
1722 iter++;
1723 }
1724 }
1725 }
1726 // Stop connections to all peers devices
1727 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001728 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001729 // This will close the TLS Session
1730 pimpl_->removeUnusedConnections(deviceId);
1731 }
1732 for (auto& info : connInfos) {
1733 if (info->socket_)
1734 info->socket_->shutdown();
1735 if (info->waitForAnswer_)
1736 info->waitForAnswer_->cancel();
1737 if (info->ice_) {
1738 std::unique_lock<std::mutex> lk {info->mutex_};
1739 dht::ThreadPool::io().run(
1740 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1741 }
1742 }
1743}
1744
1745void
1746ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1747{
1748 pimpl_->onDhtConnected(devicePk);
1749}
1750
1751void
1752ConnectionManager::onICERequest(onICERequestCallback&& cb)
1753{
1754 pimpl_->iceReqCb_ = std::move(cb);
1755}
1756
1757void
1758ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1759{
1760 pimpl_->channelReqCb_ = std::move(cb);
1761}
1762
1763void
1764ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1765{
1766 pimpl_->connReadyCb_ = std::move(cb);
1767}
1768
1769void
1770ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1771{
1772 pimpl_->iOSConnectedCb_ = std::move(cb);
1773}
1774
1775std::size_t
1776ConnectionManager::activeSockets() const
1777{
1778 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1779 return pimpl_->infos_.size();
1780}
1781
1782void
1783ConnectionManager::monitor() const
1784{
1785 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1786 auto logger = pimpl_->config_->logger;
1787 if (!logger)
1788 return;
1789 logger->debug("ConnectionManager current status:");
1790 for (const auto& [_, ci] : pimpl_->infos_) {
1791 if (ci->socket_)
1792 ci->socket_->monitor();
1793 }
1794 logger->debug("ConnectionManager end status.");
1795}
1796
1797void
1798ConnectionManager::connectivityChanged()
1799{
1800 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1801 for (const auto& [_, ci] : pimpl_->infos_) {
1802 if (ci->socket_)
1803 ci->socket_->sendBeacon();
1804 }
1805}
1806
1807void
1808ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1809{
1810 return pimpl_->getIceOptions(std::move(cb));
1811}
1812
1813IceTransportOptions
1814ConnectionManager::getIceOptions() const noexcept
1815{
1816 return pimpl_->getIceOptions();
1817}
1818
1819IpAddr
1820ConnectionManager::getPublishedIpAddress(uint16_t family) const
1821{
1822 return pimpl_->getPublishedIpAddress(family);
1823}
1824
1825void
1826ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1827{
1828 return pimpl_->setPublishedAddress(ip_addr);
1829}
1830
1831void
1832ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1833{
1834 return pimpl_->storeActiveIpAddress(std::move(cb));
1835}
1836
1837std::shared_ptr<ConnectionManager::Config>
1838ConnectionManager::getConfig()
1839{
1840 return pimpl_->config_;
1841}
1842
Amna31791e52023-08-03 12:40:57 -04001843std::vector<std::map<std::string, std::string>>
1844ConnectionManager::getConnectionList(const DeviceId& device) const
1845{
1846 std::vector<std::map<std::string, std::string>> connectionsList;
1847 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1848
1849 for (const auto& [key, ci] : pimpl_->infos_) {
1850 if (device && key.first != device)
1851 continue;
1852 std::map<std::string, std::string> connectionInfo;
1853 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001854 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001855 if (ci->tls_) {
1856 if (auto cert = ci->tls_->peerCertificate()) {
1857 connectionInfo["peer"] = cert->issuer->getId().toString();
1858 }
Amna31791e52023-08-03 12:40:57 -04001859 }
1860 if (ci->socket_) {
1861 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1862 } else if (ci->tls_) {
1863 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1864 } else if(ci->ice_)
1865 {
1866 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1867 }
1868 if (ci->tls_) {
1869 std::string remoteAddress = ci->tls_->getRemoteAddress();
1870 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1871 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1872 connectionInfo["remoteAdress"] = remoteAddressIp;
1873 connectionInfo["remotePort"] = remoteAddressPort;
1874 }
1875 connectionsList.emplace_back(std::move(connectionInfo));
1876 }
1877
1878 if (device) {
1879 auto it = pimpl_->pendingOperations_.find(device);
1880 if (it != pimpl_->pendingOperations_.end()) {
1881 const auto& po = it->second;
1882 for (const auto& [vid, ci] : po.connecting) {
1883 std::map<std::string, std::string> connectionInfo;
1884 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001885 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1886 connectionsList.emplace_back(std::move(connectionInfo));
1887 }
1888
1889 for (const auto& [vid, ci] : po.waiting) {
1890 std::map<std::string, std::string> connectionInfo;
1891 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001892 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1893 connectionsList.emplace_back(std::move(connectionInfo));
1894 }
1895 }
1896 }
1897 else {
1898 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1899 for (const auto& [vid, ci] : po.connecting) {
1900 std::map<std::string, std::string> connectionInfo;
1901 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001902 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1903 connectionsList.emplace_back(std::move(connectionInfo));
1904 }
1905
1906 for (const auto& [vid, ci] : po.waiting) {
1907 std::map<std::string, std::string> connectionInfo;
1908 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001909 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1910 connectionsList.emplace_back(std::move(connectionInfo));
1911 }
1912 }
1913 }
1914 return connectionsList;
1915}
1916
1917std::vector<std::map<std::string, std::string>>
1918ConnectionManager::getChannelList(const std::string& connectionId) const
1919{
1920 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1921 CallbackId cbid = parseCallbackId(connectionId);
1922 if (pimpl_->infos_.count(cbid) > 0) {
1923 return pimpl_->infos_[cbid]->socket_->getChannelList();
1924 } else {
1925 return {};
1926 }
1927}
1928
Sébastien Blin464bdff2023-07-19 08:02:53 -04001929} // namespace dhtnet