blob: 0b97ac5afda8161cd326ff7cf5843b89711cbe44 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Adrien Béraud612b55b2023-05-29 10:42:04 -040061struct ConnectionInfo
62{
63 ~ConnectionInfo()
64 {
65 if (socket_)
66 socket_->join();
67 }
68
69 std::mutex mutex_ {};
70 bool responseReceived_ {false};
71 PeerConnectionRequest response_ {};
72 std::unique_ptr<IceTransport> ice_ {nullptr};
73 // Used to store currently non ready TLS Socket
74 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
75 std::shared_ptr<MultiplexedSocket> socket_ {};
76 std::set<CallbackId> cbIds_ {};
77
78 std::function<void(bool)> onConnected_;
79 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
80};
81
82/**
83 * returns whether or not UPnP is enabled and active_
84 * ie: if it is able to make port mappings
85 */
86bool
87ConnectionManager::Config::getUPnPActive() const
88{
89 if (upnpCtrl)
90 return upnpCtrl->isReady();
91 return false;
92}
93
94class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
95{
96public:
97 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
98 : config_ {std::move(config_)}
Sébastien Blincf569402023-07-27 09:46:40 -040099 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400100 {
101 loadTreatedMessages();
102 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400103 ~Impl() {}
104
105 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
106 const dht::crypto::Identity& identity() const { return config_->id; }
107
108 void removeUnusedConnections(const DeviceId& deviceId = {})
109 {
110 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
111
112 {
113 std::lock_guard<std::mutex> lk(infosMtx_);
114 for (auto it = infos_.begin(); it != infos_.end();) {
115 auto& [key, info] = *it;
116 if (info && (!deviceId || key.first == deviceId)) {
117 unused.emplace_back(std::move(info));
118 it = infos_.erase(it);
119 } else {
120 ++it;
121 }
122 }
123 }
124 for (auto& info: unused) {
125 if (info->tls_)
126 info->tls_->shutdown();
127 if (info->socket_)
128 info->socket_->shutdown();
129 if (info->waitForAnswer_)
130 info->waitForAnswer_->cancel();
131 }
132 if (!unused.empty())
133 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
134 }
135
136 void shutdown()
137 {
138 if (isDestroying_.exchange(true))
139 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400140 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400141 {
142 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400143 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400144 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400145 for (auto& [deviceId, pcbs] : po) {
146 for (auto& [id, pending] : pcbs.connecting)
147 pending.cb(nullptr, deviceId);
148 for (auto& [id, pending] : pcbs.waiting)
149 pending.cb(nullptr, deviceId);
150 }
151
Adrien Béraud612b55b2023-05-29 10:42:04 -0400152 removeUnusedConnections();
153 }
154
Adrien Béraud612b55b2023-05-29 10:42:04 -0400155 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
156 const dht::Value::Id& vid,
157 const std::string& connType,
158 std::function<void(bool)> onConnected);
159 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
160 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
161 const std::string& name,
162 const dht::Value::Id& vid,
163 const std::shared_ptr<dht::crypto::Certificate>& cert);
164 void connectDevice(const DeviceId& deviceId,
165 const std::string& uri,
166 ConnectCallback cb,
167 bool noNewSocket = false,
168 bool forceNewSocket = false,
169 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400170 void connectDevice(const dht::InfoHash& deviceId,
171 const std::string& uri,
172 ConnectCallbackLegacy cb,
173 bool noNewSocket = false,
174 bool forceNewSocket = false,
175 const std::string& connType = "");
176
Adrien Béraud612b55b2023-05-29 10:42:04 -0400177 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
178 const std::string& name,
179 ConnectCallback cb,
180 bool noNewSocket = false,
181 bool forceNewSocket = false,
182 const std::string& connType = "");
183 /**
184 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
185 * @param sock socket used to send the request
186 * @param name channel's name
187 * @param vid channel's id
188 * @param deviceId to identify the linked ConnectCallback
189 */
190 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
191 const std::string& name,
192 const DeviceId& deviceId,
193 const dht::Value::Id& vid);
194 /**
195 * Triggered when a PeerConnectionRequest comes from the DHT
196 */
197 void answerTo(IceTransport& ice,
198 const dht::Value::Id& id,
199 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
200 bool onRequestStartIce(const PeerConnectionRequest& req);
201 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
202 void onDhtPeerRequest(const PeerConnectionRequest& req,
203 const std::shared_ptr<dht::crypto::Certificate>& cert);
204
205 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
206 void onPeerResponse(const PeerConnectionRequest& req);
207 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
208
209 const std::shared_future<tls::DhParams> dhParams() const;
210 tls::CertificateStore& certStore() const { return *config_->certStore; }
211
212 mutable std::mutex messageMutex_ {};
213 std::set<std::string, std::less<>> treatedMessages_ {};
214
215 void loadTreatedMessages();
216 void saveTreatedMessages() const;
217
/// \return true if the given DHT message identifier has been treated
/// \note if the message has not been treated yet, this method stores its id and returns true
/// on further calls
221 bool isMessageTreated(std::string_view id);
222
223 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
224
225 /**
226 * Published IPv4/IPv6 addresses, used only if defined by the user in account
227 * configuration
228 *
229 */
230 IpAddr publishedIp_[2] {};
231
Adrien Béraud612b55b2023-05-29 10:42:04 -0400232 /**
233 * interface name on which this account is bound
234 */
235 std::string interface_ {"default"};
236
237 /**
238 * Get the local interface name on which this account is bound.
239 */
240 const std::string& getLocalInterface() const { return interface_; }
241
242 /**
243 * Get the published IP address, fallbacks to NAT if family is unspecified
244 * Prefers the usage of IPv4 if possible.
245 */
246 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
247
248 /**
249 * Set published IP address according to given family
250 */
251 void setPublishedAddress(const IpAddr& ip_addr);
252
253 /**
254 * Store the local/public addresses used to register
255 */
256 void storeActiveIpAddress(std::function<void()>&& cb = {});
257
258 /**
259 * Create and return ICE options.
260 */
261 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
262 IceTransportOptions getIceOptions() const noexcept;
263
264 /**
265 * Inform that a potential peer device have been found.
266 * Returns true only if the device certificate is a valid device certificate.
267 * In that case (true is returned) the account_id parameter is set to the peer account ID.
268 */
269 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
270 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
271
272 bool findCertificate(const dht::PkId& id,
273 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400274 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400275
276 /**
277 * returns whether or not UPnP is enabled and active
278 * ie: if it is able to make port mappings
279 */
280 bool getUPnPActive() const;
281
282 /**
283 * Triggered when a new TLS socket is ready to use
284 * @param ok If succeed
285 * @param deviceId Related device
286 * @param vid vid of the connection request
287 * @param name non empty if TLS was created by connectDevice()
288 */
289 void onTlsNegotiationDone(bool ok,
290 const DeviceId& deviceId,
291 const dht::Value::Id& vid,
292 const std::string& name = "");
293
294 std::shared_ptr<ConnectionManager::Config> config_;
295
Adrien Béraud612b55b2023-05-29 10:42:04 -0400296 mutable std::mt19937_64 rand;
297
298 iOSConnectedCallback iOSConnectedCb_ {};
299
300 std::mutex infosMtx_ {};
301 // Note: Someone can ask multiple sockets, so to avoid any race condition,
302 // each device can have multiple multiplexed sockets.
303 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
304
305 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
306 {
307 std::lock_guard<std::mutex> lk(infosMtx_);
308 auto it = infos_.find({deviceId, id});
309 if (it != infos_.end())
310 return it->second;
311 return {};
312 }
313
314 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
315 {
316 std::lock_guard<std::mutex> lk(infosMtx_);
317 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
318 auto& [key, value] = item;
319 return key.first == deviceId && value && value->socket_;
320 });
321 if (it != infos_.end())
322 return it->second;
323 return {};
324 }
325
326 ChannelRequestCallback channelReqCb_ {};
327 ConnectionReadyCallback connReadyCb_ {};
328 onICERequestCallback iceReqCb_ {};
329
330 /**
331 * Stores callback from connectDevice
332 * @note: each device needs a vector because several connectDevice can
333 * be done in parallel and we only want one socket
334 */
335 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400336
Adrien Béraud665294f2023-06-13 18:09:11 -0400337 struct PendingCb
338 {
339 std::string name;
340 ConnectCallback cb;
341 };
342 struct PendingOperations {
343 std::map<dht::Value::Id, PendingCb> connecting;
344 std::map<dht::Value::Id, PendingCb> waiting;
345 };
346
347 std::map<DeviceId, PendingOperations> pendingOperations_ {};
348
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400349 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400350 {
351 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400352 std::unique_lock<std::mutex> lk(connectCbsMtx_);
353 auto it = pendingOperations_.find(deviceId);
354 if (it == pendingOperations_.end())
355 return;
356 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400357 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400358 // Extract all pending callbacks
359 for (auto& [vid, cb] : pendingOperations.connecting)
360 ret.emplace_back(std::move(cb));
361 pendingOperations.connecting.clear();
362 for (auto& [vid, cb] : pendingOperations.waiting)
363 ret.emplace_back(std::move(cb));
364 pendingOperations.waiting.clear();
365 } else if (auto n = pendingOperations.waiting.extract(vid)) {
366 // If it's a waiting operation, just move it
367 ret.emplace_back(std::move(n.mapped()));
368 } else if (auto n = pendingOperations.connecting.extract(vid)) {
369 ret.emplace_back(std::move(n.mapped()));
370 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400371 // If accepted is false, it means that underlying socket is ok, but channel is declined
372 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400373 for (auto& [vid, cb] : pendingOperations.waiting)
374 ret.emplace_back(std::move(cb));
375 pendingOperations.waiting.clear();
376 for (auto& [vid, cb] : pendingOperations.connecting)
377 ret.emplace_back(std::move(cb));
378 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400379 }
380 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400381 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
382 pendingOperations_.erase(it);
383 lk.unlock();
384 for (auto& cb : ret)
385 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400386 }
387
Adrien Béraud665294f2023-06-13 18:09:11 -0400388 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400389 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400390 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400391 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400392 auto it = pendingOperations_.find(deviceId);
393 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400394 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400395 auto& pendingOp = it->second;
396 for (const auto& [id, pc]: pendingOp.connecting) {
397 if (vid == 0 || id == vid)
398 ret[id] = pc.name;
399 }
400 for (const auto& [id, pc]: pendingOp.waiting) {
401 if (vid == 0 || id == vid)
402 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400403 }
404 return ret;
405 }
406
407 std::shared_ptr<ConnectionManager::Impl> shared()
408 {
409 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
410 }
411 std::shared_ptr<ConnectionManager::Impl const> shared() const
412 {
413 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
414 }
415 std::weak_ptr<ConnectionManager::Impl> weak()
416 {
417 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
418 }
419 std::weak_ptr<ConnectionManager::Impl const> weak() const
420 {
421 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
422 }
423
424 std::atomic_bool isDestroying_ {false};
425};
426
427void
428ConnectionManager::Impl::connectDeviceStartIce(
429 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
430 const dht::Value::Id& vid,
431 const std::string& connType,
432 std::function<void(bool)> onConnected)
433{
434 auto deviceId = devicePk->getLongId();
435 auto info = getInfo(deviceId, vid);
436 if (!info) {
437 onConnected(false);
438 return;
439 }
440
441 std::unique_lock<std::mutex> lk(info->mutex_);
442 auto& ice = info->ice_;
443
444 if (!ice) {
445 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400446 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400447 onConnected(false);
448 return;
449 }
450
451 auto iceAttributes = ice->getLocalAttributes();
452 std::ostringstream icemsg;
453 icemsg << iceAttributes.ufrag << "\n";
454 icemsg << iceAttributes.pwd << "\n";
455 for (const auto& addr : ice->getLocalCandidates(1)) {
456 icemsg << addr << "\n";
457 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400458 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400459 }
460
461 // Prepare connection request as a DHT message
462 PeerConnectionRequest val;
463
464 val.id = vid; /* Random id for the message unicity */
465 val.ice_msg = icemsg.str();
466 val.connType = connType;
467
468 auto value = std::make_shared<dht::Value>(std::move(val));
469 value->user_type = "peer_request";
470
471 // Send connection request through DHT
472 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400473 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400474 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
475 + devicePk->getId().toString()),
476 devicePk,
477 value,
478 [l=config_->logger,deviceId](bool ok) {
479 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400480 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400481 deviceId,
482 (ok ? "ok" : "failed"));
483 });
484 // Wait for call to onResponse() operated by DHT
485 if (isDestroying_) {
486 onConnected(true); // This avoid to wait new negotiation when destroying
487 return;
488 }
489
490 info->onConnected_ = std::move(onConnected);
491 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
492 std::chrono::steady_clock::now()
493 + DHT_MSG_TIMEOUT);
494 info->waitForAnswer_->async_wait(
495 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
496}
497
498void
499ConnectionManager::Impl::onResponse(const asio::error_code& ec,
500 const DeviceId& deviceId,
501 const dht::Value::Id& vid)
502{
503 if (ec == asio::error::operation_aborted)
504 return;
505 auto info = getInfo(deviceId, vid);
506 if (!info)
507 return;
508
509 std::unique_lock<std::mutex> lk(info->mutex_);
510 auto& ice = info->ice_;
511 if (isDestroying_) {
512 info->onConnected_(true); // The destructor can wake a pending wait here.
513 return;
514 }
515 if (!info->responseReceived_) {
516 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400517 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400518 info->onConnected_(false);
519 return;
520 }
521
522 if (!info->ice_) {
523 info->onConnected_(false);
524 return;
525 }
526
527 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
528
529 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
530 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400531 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400532 info->onConnected_(false);
533 return;
534 }
535 info->onConnected_(true);
536}
537
538bool
539ConnectionManager::Impl::connectDeviceOnNegoDone(
540 const DeviceId& deviceId,
541 const std::string& name,
542 const dht::Value::Id& vid,
543 const std::shared_ptr<dht::crypto::Certificate>& cert)
544{
545 auto info = getInfo(deviceId, vid);
546 if (!info)
547 return false;
548
549 std::unique_lock<std::mutex> lk {info->mutex_};
550 if (info->waitForAnswer_) {
551 // Negotiation is done and connected, go to handshake
552 // and avoid any cancellation at this point.
553 info->waitForAnswer_->cancel();
554 }
555 auto& ice = info->ice_;
556 if (!ice || !ice->isRunning()) {
557 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400558 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400559 return false;
560 }
561
562 // Build socket
563 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
564 std::move(ice)),
565 true);
566
567 // Negotiate a TLS session
568 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400569 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400570 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
571 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400572 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400573 identity(),
574 dhParams(),
575 *cert);
576
577 info->tls_->setOnReady(
578 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
579 bool ok) {
580 if (auto shared = w.lock())
581 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
582 });
583 return true;
584}
585
586void
587ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
588 const std::string& name,
589 ConnectCallback cb,
590 bool noNewSocket,
591 bool forceNewSocket,
592 const std::string& connType)
593{
594 if (!dht()) {
595 cb(nullptr, deviceId);
596 return;
597 }
598 if (deviceId.toString() == identity().second->getLongId().toString()) {
599 cb(nullptr, deviceId);
600 return;
601 }
602 findCertificate(deviceId,
603 [w = weak(),
604 deviceId,
605 name,
606 cb = std::move(cb),
607 noNewSocket,
608 forceNewSocket,
609 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
610 if (!cert) {
611 if (auto shared = w.lock())
612 if (shared->config_->logger)
613 shared->config_->logger->error(
614 "No valid certificate found for device {}",
615 deviceId);
616 cb(nullptr, deviceId);
617 return;
618 }
619 if (auto shared = w.lock()) {
620 shared->connectDevice(cert,
621 name,
622 std::move(cb),
623 noNewSocket,
624 forceNewSocket,
625 connType);
626 } else
627 cb(nullptr, deviceId);
628 });
629}
630
631void
Amna0cf544d2023-07-25 14:25:09 -0400632ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
633 const std::string& name,
634 ConnectCallbackLegacy cb,
635 bool noNewSocket,
636 bool forceNewSocket,
637 const std::string& connType)
638{
639 if (!dht()) {
640 cb(nullptr, deviceId);
641 return;
642 }
643 if (deviceId.toString() == identity().second->getLongId().toString()) {
644 cb(nullptr, deviceId);
645 return;
646 }
647 findCertificate(deviceId,
648 [w = weak(),
649 deviceId,
650 name,
651 cb = std::move(cb),
652 noNewSocket,
653 forceNewSocket,
654 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
655 if (!cert) {
656 if (auto shared = w.lock())
657 if (shared->config_->logger)
658 shared->config_->logger->error(
659 "No valid certificate found for device {}",
660 deviceId);
661 cb(nullptr, deviceId);
662 return;
663 }
664 if (auto shared = w.lock()) {
665 shared->connectDevice(cert,
666 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400667 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400668 cb(sock, deviceId);
669 },
670 noNewSocket,
671 forceNewSocket,
672 connType);
673 } else
674 cb(nullptr, deviceId);
675 });
676}
677
678void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400679ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
680 const std::string& name,
681 ConnectCallback cb,
682 bool noNewSocket,
683 bool forceNewSocket,
684 const std::string& connType)
685{
686 // Avoid dht operation in a DHT callback to avoid deadlocks
687 dht::ThreadPool::computation().run([w = weak(),
688 name = std::move(name),
689 cert = std::move(cert),
690 cb = std::move(cb),
691 noNewSocket,
692 forceNewSocket,
693 connType] {
694 auto devicePk = cert->getSharedPublicKey();
695 auto deviceId = devicePk->getLongId();
696 auto sthis = w.lock();
697 if (!sthis || sthis->isDestroying_) {
698 cb(nullptr, deviceId);
699 return;
700 }
701 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
702 auto isConnectingToDevice = false;
703 {
704 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400705 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
706 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400707 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400708 while (pendings.connecting.find(vid) != pendings.connecting.end()
709 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400710 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400711 }
712 }
713 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400714 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400715 // Save current request for sendChannelRequest.
716 // Note: do not return here, cause we can be in a state where first
717 // socket is negotiated and first channel is pending
718 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400719 if (isConnectingToDevice && !forceNewSocket)
720 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400721 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400722 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400723 }
724
725 // Check if already negotiated
726 CallbackId cbId(deviceId, vid);
727 if (auto info = sthis->getConnectedInfo(deviceId)) {
728 std::lock_guard<std::mutex> lk(info->mutex_);
729 if (info->socket_) {
730 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400731 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400732 info->cbIds_.emplace(cbId);
733 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
734 return;
735 }
736 }
737
738 if (isConnectingToDevice && !forceNewSocket) {
739 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400740 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400741 return;
742 }
743 if (noNewSocket) {
744 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400745 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400746 return;
747 }
748
749 // Note: used when the ice negotiation fails to erase
750 // all stored structures.
751 auto eraseInfo = [w, cbId] {
752 if (auto shared = w.lock()) {
753 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400754 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400755 std::lock_guard<std::mutex> lk(shared->infosMtx_);
756 shared->infos_.erase(cbId);
757 }
758 };
759
760 // If no socket exists, we need to initiate an ICE connection.
761 sthis->getIceOptions([w,
762 deviceId = std::move(deviceId),
763 devicePk = std::move(devicePk),
764 name = std::move(name),
765 cert = std::move(cert),
766 vid,
767 connType,
768 eraseInfo](auto&& ice_config) {
769 auto sthis = w.lock();
770 if (!sthis) {
771 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
772 return;
773 }
774 ice_config.tcpEnable = true;
775 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400776 devicePk = std::move(devicePk),
777 name = std::move(name),
778 cert = std::move(cert),
779 vid,
780 connType,
781 eraseInfo](bool ok) {
782 dht::ThreadPool::io().run([w = std::move(w),
783 devicePk = std::move(devicePk),
784 vid = std::move(vid),
785 eraseInfo,
786 connType, ok] {
787 auto sthis = w.lock();
788 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400789 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400790 if (!sthis || !ok) {
791 eraseInfo();
792 return;
793 }
794 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
795 if (!ok) {
796 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
797 }
798 });
799 });
800 };
801 ice_config.onNegoDone = [w,
802 deviceId,
803 name,
804 cert = std::move(cert),
805 vid,
806 eraseInfo](bool ok) {
807 dht::ThreadPool::io().run([w = std::move(w),
808 deviceId = std::move(deviceId),
809 name = std::move(name),
810 cert = std::move(cert),
811 vid = std::move(vid),
812 eraseInfo = std::move(eraseInfo),
813 ok] {
814 auto sthis = w.lock();
815 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400816 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400817 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
818 eraseInfo();
819 });
820 };
821
822 auto info = std::make_shared<ConnectionInfo>();
823 {
824 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
825 sthis->infos_[{deviceId, vid}] = info;
826 }
827 std::unique_lock<std::mutex> lk {info->mutex_};
828 ice_config.master = false;
829 ice_config.streamsCount = 1;
830 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400831 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400832 if (!info->ice_) {
833 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400834 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400835 eraseInfo();
836 return;
837 }
838 // We need to detect any shutdown if the ice session is destroyed before going to the
839 // TLS session;
840 info->ice_->setOnShutdown([eraseInfo]() {
841 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
842 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400843 try {
844 info->ice_->initIceInstance(ice_config);
845 } catch (const std::exception& e) {
846 if (sthis->config_->logger)
847 sthis->config_->logger->error("{}", e.what());
848 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
849 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400850 });
851 });
852}
853
854void
855ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
856 const std::string& name,
857 const DeviceId& deviceId,
858 const dht::Value::Id& vid)
859{
860 auto channelSock = sock->addChannel(name);
861 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
862 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400863 if (auto shared = w.lock())
864 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400865 });
866 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400867 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400868 auto shared = w.lock();
869 auto channelSock = wSock.lock();
870 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400871 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400872 });
873
874 ChannelRequest val;
875 val.name = channelSock->name();
876 val.state = ChannelRequestState::REQUEST;
877 val.channel = channelSock->channel();
878 msgpack::sbuffer buffer(256);
879 msgpack::pack(buffer, val);
880
881 std::error_code ec;
882 int res = sock->write(CONTROL_CHANNEL,
883 reinterpret_cast<const uint8_t*>(buffer.data()),
884 buffer.size(),
885 ec);
886 if (res < 0) {
887 // TODO check if we should handle errors here
888 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400889 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400890 }
891}
892
893void
894ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
895{
896 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400897 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400898 if (config_->logger)
899 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400900 std::lock_guard<std::mutex> lk {info->mutex_};
901 info->responseReceived_ = true;
902 info->response_ = std::move(req);
903 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
904 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
905 this,
906 std::placeholders::_1,
907 device,
908 req.id));
909 } else {
910 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400911 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400912 }
913}
914
915void
916ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
917{
918 if (!dht())
919 return;
920 dht()->listen<PeerConnectionRequest>(
921 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
922 [w = weak()](PeerConnectionRequest&& req) {
923 auto shared = w.lock();
924 if (!shared)
925 return false;
926 if (shared->isMessageTreated(to_hex_string(req.id))) {
927 // Message already treated. Just ignore
928 return true;
929 }
930 if (req.isAnswer) {
931 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400932 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400933 } else {
934 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400935 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400936 }
937 if (req.isAnswer) {
938 shared->onPeerResponse(req);
939 } else {
940 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400941 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400942 req.from,
943 [w, req = std::move(req)](
944 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
945 auto shared = w.lock();
946 if (!shared)
947 return;
948 dht::InfoHash peer_h;
949 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
950#if TARGET_OS_IOS
951 if (shared->iOSConnectedCb_(req.connType, peer_h))
952 return;
953#endif
954 shared->onDhtPeerRequest(req, cert);
955 } else {
956 if (shared->config_->logger)
957 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400958 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400959 req.owner->getLongId());
960 }
961 });
962 }
963
964 return true;
965 },
966 dht::Value::UserTypeFilter("peer_request"));
967}
968
969void
970ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
971 const DeviceId& deviceId,
972 const dht::Value::Id& vid,
973 const std::string& name)
974{
975 if (isDestroying_)
976 return;
977 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
978 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
979 // asked yet)
980 auto isDhtRequest = name.empty();
981 if (!ok) {
982 if (isDhtRequest) {
983 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400984 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400985 deviceId,
986 name,
987 vid);
988 if (connReadyCb_)
989 connReadyCb_(deviceId, "", nullptr);
990 } else {
991 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400992 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400993 deviceId,
994 name,
995 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400996 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400997 }
998 } else {
999 // The socket is ready, store it
1000 if (isDhtRequest) {
1001 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001002 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001003 deviceId,
1004 vid);
1005 } else {
1006 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001007 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001008 deviceId,
1009 name,
1010 vid);
1011 }
1012
1013 auto info = getInfo(deviceId, vid);
1014 addNewMultiplexedSocket({deviceId, vid}, info);
1015 // Finally, open the channel and launch pending callbacks
1016 if (info->socket_) {
1017 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001018 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001019 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001020 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001021 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001022 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001023 }
1024 }
1025 }
1026}
1027
1028void
1029ConnectionManager::Impl::answerTo(IceTransport& ice,
1030 const dht::Value::Id& id,
1031 const std::shared_ptr<dht::crypto::PublicKey>& from)
1032{
1033 // NOTE: This is a shortest version of a real SDP message to save some bits
1034 auto iceAttributes = ice.getLocalAttributes();
1035 std::ostringstream icemsg;
1036 icemsg << iceAttributes.ufrag << "\n";
1037 icemsg << iceAttributes.pwd << "\n";
1038 for (const auto& addr : ice.getLocalCandidates(1)) {
1039 icemsg << addr << "\n";
1040 }
1041
1042 // Send PeerConnection response
1043 PeerConnectionRequest val;
1044 val.id = id;
1045 val.ice_msg = icemsg.str();
1046 val.isAnswer = true;
1047 auto value = std::make_shared<dht::Value>(std::move(val));
1048 value->user_type = "peer_request";
1049
1050 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001051 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001052 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1053 + from->getId().toString()),
1054 from,
1055 value,
1056 [from,l=config_->logger](bool ok) {
1057 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001058 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001059 from->getLongId(),
1060 (ok ? "ok" : "failed"));
1061 });
1062}
1063
1064bool
1065ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1066{
1067 auto deviceId = req.owner->getLongId();
1068 auto info = getInfo(deviceId, req.id);
1069 if (!info)
1070 return false;
1071
1072 std::unique_lock<std::mutex> lk {info->mutex_};
1073 auto& ice = info->ice_;
1074 if (!ice) {
1075 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001076 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001077 if (connReadyCb_)
1078 connReadyCb_(deviceId, "", nullptr);
1079 return false;
1080 }
1081
1082 auto sdp = ice->parseIceCandidates(req.ice_msg);
1083 answerTo(*ice, req.id, req.owner);
1084 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1085 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001086 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001087 ice = nullptr;
1088 if (connReadyCb_)
1089 connReadyCb_(deviceId, "", nullptr);
1090 return false;
1091 }
1092 return true;
1093}
1094
1095bool
1096ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1097{
1098 auto deviceId = req.owner->getLongId();
1099 auto info = getInfo(deviceId, req.id);
1100 if (!info)
1101 return false;
1102
1103 std::unique_lock<std::mutex> lk {info->mutex_};
1104 auto& ice = info->ice_;
1105 if (!ice) {
1106 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001107 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001108 return false;
1109 }
1110
1111 // Build socket
1112 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1113 std::move(ice)),
1114 false);
1115
1116 // init TLS session
1117 auto ph = req.from;
1118 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001119 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1120 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001121 req.id);
1122 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1123 std::move(endpoint),
1124 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001125 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001126 identity(),
1127 dhParams(),
Adrien Béraud9efbd442023-08-27 12:38:07 -04001128 [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001129 auto shared = w.lock();
1130 if (!shared)
1131 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001132 if (cert.getPublicKey().getId() != ph
1133 || deviceId != cert.getPublicKey().getLongId()) {
1134 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1135 deviceId,
1136 cert.getPublicKey().getLongId());
1137 return false;
1138 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001139 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1140 if (!crt)
1141 return false;
1142 return crt->getPacked() == cert.getPacked();
1143 });
1144
1145 info->tls_->setOnReady(
1146 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1147 if (auto shared = w.lock())
1148 shared->onTlsNegotiationDone(ok, deviceId, vid);
1149 });
1150 return true;
1151}
1152
1153void
1154ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1155 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1156{
1157 auto deviceId = req.owner->getLongId();
1158 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001159 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001160 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1161 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001162 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001163 return;
1164 }
1165
1166 // Because the connection is accepted, create an ICE socket.
1167 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1168 auto shared = w.lock();
1169 if (!shared)
1170 return;
1171 // Note: used when the ice negotiation fails to erase
1172 // all stored structures.
1173 auto eraseInfo = [w, id = req.id, deviceId] {
1174 if (auto shared = w.lock()) {
1175 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001176 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001177 if (shared->connReadyCb_)
1178 shared->connReadyCb_(deviceId, "", nullptr);
1179 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1180 shared->infos_.erase({deviceId, id});
1181 }
1182 };
1183
1184 ice_config.tcpEnable = true;
1185 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1186 auto shared = w.lock();
1187 if (!shared)
1188 return;
1189 if (!ok) {
1190 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001191 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001192 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1193 return;
1194 }
1195
1196 dht::ThreadPool::io().run(
1197 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1198 auto shared = w.lock();
1199 if (!shared)
1200 return;
1201 if (!shared->onRequestStartIce(req))
1202 eraseInfo();
1203 });
1204 };
1205
1206 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1207 auto shared = w.lock();
1208 if (!shared)
1209 return;
1210 if (!ok) {
1211 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001212 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001213 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1214 return;
1215 }
1216
1217 dht::ThreadPool::io().run(
1218 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1219 if (auto shared = w.lock())
1220 if (!shared->onRequestOnNegoDone(req))
1221 eraseInfo();
1222 });
1223 };
1224
1225 // Negotiate a new ICE socket
1226 auto info = std::make_shared<ConnectionInfo>();
1227 {
1228 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1229 shared->infos_[{deviceId, req.id}] = info;
1230 }
1231 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001232 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001233 std::unique_lock<std::mutex> lk {info->mutex_};
1234 ice_config.streamsCount = 1;
1235 ice_config.compCountPerStream = 1; // TCP
1236 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001237 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001238 if (not info->ice_) {
1239 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001240 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001241 eraseInfo();
1242 return;
1243 }
1244 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1245 info->ice_->setOnShutdown([eraseInfo]() {
1246 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1247 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001248 try {
1249 info->ice_->initIceInstance(ice_config);
1250 } catch (const std::exception& e) {
1251 if (shared->config_->logger)
1252 shared->config_->logger->error("{}", e.what());
1253 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1254 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001255 });
1256}
1257
1258void
1259ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1260{
Adrien Béraud5636f7c2023-09-14 14:34:57 -04001261 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_), config_->logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001262 info->socket_->setOnReady(
1263 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1264 if (auto sthis = w.lock())
1265 if (sthis->connReadyCb_)
1266 sthis->connReadyCb_(deviceId, socket->name(), socket);
1267 });
1268 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1269 const uint16_t&,
1270 const std::string& name) {
1271 if (auto sthis = w.lock())
1272 if (sthis->channelReqCb_)
1273 return sthis->channelReqCb_(peer, name);
1274 return false;
1275 });
1276 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1277 // Cancel current outgoing connections
1278 dht::ThreadPool::io().run([w, deviceId, vid] {
1279 auto sthis = w.lock();
1280 if (!sthis)
1281 return;
1282
1283 std::set<CallbackId> ids;
1284 if (auto info = sthis->getInfo(deviceId, vid)) {
1285 std::lock_guard<std::mutex> lk(info->mutex_);
1286 if (info->socket_) {
1287 ids = std::move(info->cbIds_);
1288 info->socket_->shutdown();
1289 }
1290 }
1291 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001292 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001293
1294 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1295 sthis->infos_.erase({deviceId, vid});
1296 });
1297 });
1298}
1299
1300const std::shared_future<tls::DhParams>
1301ConnectionManager::Impl::dhParams() const
1302{
1303 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001304 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001305}
1306
1307template<typename ID = dht::Value::Id>
1308std::set<ID, std::less<>>
1309loadIdList(const std::string& path)
1310{
1311 std::set<ID, std::less<>> ids;
1312 std::ifstream file = fileutils::ifstream(path);
1313 if (!file.is_open()) {
1314 //JAMI_DBG("Could not load %s", path.c_str());
1315 return ids;
1316 }
1317 std::string line;
1318 while (std::getline(file, line)) {
1319 if constexpr (std::is_same<ID, std::string>::value) {
1320 ids.emplace(std::move(line));
1321 } else if constexpr (std::is_integral<ID>::value) {
1322 ID vid;
1323 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1324 ec == std::errc()) {
1325 ids.emplace(vid);
1326 }
1327 }
1328 }
1329 return ids;
1330}
1331
1332template<typename List = std::set<dht::Value::Id>>
1333void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001334saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001335{
1336 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1337 if (!file.is_open()) {
1338 //JAMI_ERR("Could not save to %s", path.c_str());
1339 return;
1340 }
1341 for (auto& c : ids)
1342 file << std::hex << c << "\n";
1343}
1344
1345void
1346ConnectionManager::Impl::loadTreatedMessages()
1347{
1348 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001349 auto path = config_->cachePath / "treatedMessages";
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001350 treatedMessages_ = loadIdList<std::string>(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001351 if (treatedMessages_.empty()) {
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001352 auto messages = loadIdList(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001353 for (const auto& m : messages)
1354 treatedMessages_.emplace(to_hex_string(m));
1355 }
1356}
1357
1358void
1359ConnectionManager::Impl::saveTreatedMessages() const
1360{
1361 dht::ThreadPool::io().run([w = weak()]() {
1362 if (auto sthis = w.lock()) {
1363 auto& this_ = *sthis;
1364 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1365 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001366 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001367 this_.treatedMessages_);
1368 }
1369 });
1370}
1371
1372bool
1373ConnectionManager::Impl::isMessageTreated(std::string_view id)
1374{
1375 std::lock_guard<std::mutex> lock(messageMutex_);
1376 auto res = treatedMessages_.emplace(id);
1377 if (res.second) {
1378 saveTreatedMessages();
1379 return false;
1380 }
1381 return true;
1382}
1383
1384/**
1385 * returns whether or not UPnP is enabled and active_
1386 * ie: if it is able to make port mappings
1387 */
1388bool
1389ConnectionManager::Impl::getUPnPActive() const
1390{
1391 return config_->getUPnPActive();
1392}
1393
1394IpAddr
1395ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1396{
1397 if (family == AF_INET)
1398 return publishedIp_[0];
1399 if (family == AF_INET6)
1400 return publishedIp_[1];
1401
1402 assert(family == AF_UNSPEC);
1403
1404 // If family is not set, prefere IPv4 if available. It's more
1405 // likely to succeed behind NAT.
1406 if (publishedIp_[0])
1407 return publishedIp_[0];
1408 if (publishedIp_[1])
1409 return publishedIp_[1];
1410 return {};
1411}
1412
1413void
1414ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1415{
1416 if (ip_addr.getFamily() == AF_INET) {
1417 publishedIp_[0] = ip_addr;
1418 } else {
1419 publishedIp_[1] = ip_addr;
1420 }
1421}
1422
1423void
1424ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1425{
1426 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1427 bool hasIpv4 {false}, hasIpv6 {false};
1428 for (auto& result : results) {
1429 auto family = result.getFamily();
1430 if (family == AF_INET) {
1431 if (not hasIpv4) {
1432 hasIpv4 = true;
1433 if (config_->logger)
1434 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1435 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1436 setPublishedAddress(*result.get());
1437 if (config_->upnpCtrl) {
1438 config_->upnpCtrl->setPublicAddress(*result.get());
1439 }
1440 }
1441 } else if (family == AF_INET6) {
1442 if (not hasIpv6) {
1443 hasIpv6 = true;
1444 if (config_->logger)
1445 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1446 setPublishedAddress(*result.get());
1447 }
1448 }
1449 if (hasIpv4 and hasIpv6)
1450 break;
1451 }
1452 if (cb)
1453 cb();
1454 });
1455}
1456
1457void
1458ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1459{
1460 storeActiveIpAddress([this, cb = std::move(cb)] {
1461 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1462 auto publishedAddr = getPublishedIpAddress();
1463
1464 if (publishedAddr) {
1465 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1466 publishedAddr.getFamily());
1467 if (interfaceAddr) {
1468 opts.accountLocalAddr = interfaceAddr;
1469 opts.accountPublicAddr = publishedAddr;
1470 }
1471 }
1472 if (cb)
1473 cb(std::move(opts));
1474 });
1475}
1476
1477IceTransportOptions
1478ConnectionManager::Impl::getIceOptions() const noexcept
1479{
1480 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001481 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001482 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001483 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001484
1485 if (config_->stunEnabled)
1486 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1487 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001488 if (config_->turnCache) {
1489 auto turnAddr = config_->turnCache->getResolvedTurn();
1490 if (turnAddr != std::nullopt) {
1491 opts.turnServers.emplace_back(TurnServerInfo()
1492 .setUri(turnAddr->toString())
1493 .setUsername(config_->turnServerUserName)
1494 .setPassword(config_->turnServerPwd)
1495 .setRealm(config_->turnServerRealm));
1496 }
1497 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001498 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001499 .setUri(config_->turnServer)
1500 .setUsername(config_->turnServerUserName)
1501 .setPassword(config_->turnServerPwd)
1502 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001503 }
1504 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1505 // co issues. So this needs some debug. for now just disable
1506 // if (cacheTurnV6 && *cacheTurnV6) {
1507 // opts.turnServers.emplace_back(TurnServerInfo()
1508 // .setUri(cacheTurnV6->toString(true))
1509 // .setUsername(turnServerUserName_)
1510 // .setPassword(turnServerPwd_)
1511 // .setRealm(turnServerRealm_));
1512 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001513 }
1514 return opts;
1515}
1516
1517bool
1518ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1519 dht::InfoHash& account_id,
1520 const std::shared_ptr<Logger>& logger)
1521{
1522 if (not crt)
1523 return false;
1524
1525 auto top_issuer = crt;
1526 while (top_issuer->issuer)
1527 top_issuer = top_issuer->issuer;
1528
1529 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001530 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001531 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001532 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001533 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001534 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001535
1536 // Check peer certificate chain
1537 // Trust store with top issuer as the only CA
1538 dht::crypto::TrustList peer_trust;
1539 peer_trust.add(*top_issuer);
1540 if (not peer_trust.verify(*crt)) {
1541 if (logger)
1542 logger->warn("Found invalid peer device: {}", crt->getLongId());
1543 return false;
1544 }
1545
1546 // Check cached OCSP response
1547 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1548 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001549 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001550 return false;
1551 }
1552
Adrien Béraudc631a832023-07-26 22:19:00 -04001553 account_id = crt->issuer->getId();
1554 if (logger)
1555 logger->warn("Found peer device: {} account:{} CA:{}",
1556 crt->getLongId(),
1557 account_id,
1558 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001559 return true;
1560}
1561
1562bool
1563ConnectionManager::Impl::findCertificate(
1564 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1565{
1566 if (auto cert = certStore().getCertificate(id.toString())) {
1567 if (cb)
1568 cb(cert);
1569 } else if (cb)
1570 cb(nullptr);
1571 return true;
1572}
1573
Sébastien Blin34086512023-07-25 09:52:14 -04001574bool
1575ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1576 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1577{
1578 if (auto cert = certStore().getCertificate(h.toString())) {
1579 if (cb)
1580 cb(cert);
1581 } else {
1582 dht()->findCertificate(h,
1583 [cb = std::move(cb), this](
1584 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1585 if (crt)
1586 certStore().pinCertificate(crt);
1587 if (cb)
1588 cb(crt);
1589 });
1590 }
1591 return true;
1592}
1593
Adrien Béraud612b55b2023-05-29 10:42:04 -04001594ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1595 : pimpl_ {std::make_shared<Impl>(config_)}
1596{}
1597
1598ConnectionManager::~ConnectionManager()
1599{
1600 if (pimpl_)
1601 pimpl_->shutdown();
1602}
1603
1604void
1605ConnectionManager::connectDevice(const DeviceId& deviceId,
1606 const std::string& name,
1607 ConnectCallback cb,
1608 bool noNewSocket,
1609 bool forceNewSocket,
1610 const std::string& connType)
1611{
1612 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1613}
1614
1615void
Amna0cf544d2023-07-25 14:25:09 -04001616ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1617 const std::string& name,
1618 ConnectCallbackLegacy cb,
1619 bool noNewSocket,
1620 bool forceNewSocket,
1621 const std::string& connType)
1622{
1623 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1624}
1625
1626
1627void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001628ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1629 const std::string& name,
1630 ConnectCallback cb,
1631 bool noNewSocket,
1632 bool forceNewSocket,
1633 const std::string& connType)
1634{
1635 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1636}
1637
1638bool
1639ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1640{
Adrien Béraud665294f2023-06-13 18:09:11 -04001641 auto pending = pimpl_->getPendingIds(deviceId);
1642 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001643 != pending.end();
1644}
1645
1646void
1647ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1648{
1649 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1650 std::set<DeviceId> peersDevices;
1651 {
1652 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1653 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1654 auto const& [key, value] = *iter;
1655 auto deviceId = key.first;
1656 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1657 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1658 connInfos.emplace_back(value);
1659 peersDevices.emplace(deviceId);
1660 iter = pimpl_->infos_.erase(iter);
1661 } else {
1662 iter++;
1663 }
1664 }
1665 }
1666 // Stop connections to all peers devices
1667 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001668 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001669 // This will close the TLS Session
1670 pimpl_->removeUnusedConnections(deviceId);
1671 }
1672 for (auto& info : connInfos) {
1673 if (info->socket_)
1674 info->socket_->shutdown();
1675 if (info->waitForAnswer_)
1676 info->waitForAnswer_->cancel();
1677 if (info->ice_) {
1678 std::unique_lock<std::mutex> lk {info->mutex_};
1679 dht::ThreadPool::io().run(
1680 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1681 }
1682 }
1683}
1684
1685void
1686ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1687{
1688 pimpl_->onDhtConnected(devicePk);
1689}
1690
1691void
1692ConnectionManager::onICERequest(onICERequestCallback&& cb)
1693{
1694 pimpl_->iceReqCb_ = std::move(cb);
1695}
1696
1697void
1698ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1699{
1700 pimpl_->channelReqCb_ = std::move(cb);
1701}
1702
1703void
1704ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1705{
1706 pimpl_->connReadyCb_ = std::move(cb);
1707}
1708
1709void
1710ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1711{
1712 pimpl_->iOSConnectedCb_ = std::move(cb);
1713}
1714
1715std::size_t
1716ConnectionManager::activeSockets() const
1717{
1718 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1719 return pimpl_->infos_.size();
1720}
1721
1722void
1723ConnectionManager::monitor() const
1724{
1725 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1726 auto logger = pimpl_->config_->logger;
1727 if (!logger)
1728 return;
1729 logger->debug("ConnectionManager current status:");
1730 for (const auto& [_, ci] : pimpl_->infos_) {
1731 if (ci->socket_)
1732 ci->socket_->monitor();
1733 }
1734 logger->debug("ConnectionManager end status.");
1735}
1736
1737void
1738ConnectionManager::connectivityChanged()
1739{
1740 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1741 for (const auto& [_, ci] : pimpl_->infos_) {
1742 if (ci->socket_)
1743 ci->socket_->sendBeacon();
1744 }
1745}
1746
1747void
1748ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1749{
1750 return pimpl_->getIceOptions(std::move(cb));
1751}
1752
1753IceTransportOptions
1754ConnectionManager::getIceOptions() const noexcept
1755{
1756 return pimpl_->getIceOptions();
1757}
1758
1759IpAddr
1760ConnectionManager::getPublishedIpAddress(uint16_t family) const
1761{
1762 return pimpl_->getPublishedIpAddress(family);
1763}
1764
1765void
1766ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1767{
1768 return pimpl_->setPublishedAddress(ip_addr);
1769}
1770
1771void
1772ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1773{
1774 return pimpl_->storeActiveIpAddress(std::move(cb));
1775}
1776
1777std::shared_ptr<ConnectionManager::Config>
1778ConnectionManager::getConfig()
1779{
1780 return pimpl_->config_;
1781}
1782
Amna31791e52023-08-03 12:40:57 -04001783std::vector<std::map<std::string, std::string>>
1784ConnectionManager::getConnectionList(const DeviceId& device) const
1785{
1786 std::vector<std::map<std::string, std::string>> connectionsList;
1787 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1788
1789 for (const auto& [key, ci] : pimpl_->infos_) {
1790 if (device && key.first != device)
1791 continue;
1792 std::map<std::string, std::string> connectionInfo;
1793 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001794 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001795 if (ci->tls_) {
1796 if (auto cert = ci->tls_->peerCertificate()) {
1797 connectionInfo["peer"] = cert->issuer->getId().toString();
1798 }
Amna31791e52023-08-03 12:40:57 -04001799 }
1800 if (ci->socket_) {
1801 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1802 } else if (ci->tls_) {
1803 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1804 } else if(ci->ice_)
1805 {
1806 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1807 }
1808 if (ci->tls_) {
1809 std::string remoteAddress = ci->tls_->getRemoteAddress();
1810 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1811 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1812 connectionInfo["remoteAdress"] = remoteAddressIp;
1813 connectionInfo["remotePort"] = remoteAddressPort;
1814 }
1815 connectionsList.emplace_back(std::move(connectionInfo));
1816 }
1817
1818 if (device) {
1819 auto it = pimpl_->pendingOperations_.find(device);
1820 if (it != pimpl_->pendingOperations_.end()) {
1821 const auto& po = it->second;
1822 for (const auto& [vid, ci] : po.connecting) {
1823 std::map<std::string, std::string> connectionInfo;
1824 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001825 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1826 connectionsList.emplace_back(std::move(connectionInfo));
1827 }
1828
1829 for (const auto& [vid, ci] : po.waiting) {
1830 std::map<std::string, std::string> connectionInfo;
1831 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001832 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1833 connectionsList.emplace_back(std::move(connectionInfo));
1834 }
1835 }
1836 }
1837 else {
1838 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1839 for (const auto& [vid, ci] : po.connecting) {
1840 std::map<std::string, std::string> connectionInfo;
1841 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001842 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1843 connectionsList.emplace_back(std::move(connectionInfo));
1844 }
1845
1846 for (const auto& [vid, ci] : po.waiting) {
1847 std::map<std::string, std::string> connectionInfo;
1848 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001849 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1850 connectionsList.emplace_back(std::move(connectionInfo));
1851 }
1852 }
1853 }
1854 return connectionsList;
1855}
1856
1857std::vector<std::map<std::string, std::string>>
1858ConnectionManager::getChannelList(const std::string& connectionId) const
1859{
1860 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1861 CallbackId cbid = parseCallbackId(connectionId);
1862 if (pimpl_->infos_.count(cbid) > 0) {
1863 return pimpl_->infos_[cbid]->socket_->getChannelList();
1864 } else {
1865 return {};
1866 }
1867}
1868
Sébastien Blin464bdff2023-07-19 08:02:53 -04001869} // namespace dhtnet