blob: 1ca85e7ea522f030a90864d316ac8d1a1b15b9f3 [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Adrien Béraud612b55b2023-05-29 10:42:04 -040061struct ConnectionInfo
62{
63 ~ConnectionInfo()
64 {
65 if (socket_)
66 socket_->join();
67 }
68
69 std::mutex mutex_ {};
70 bool responseReceived_ {false};
71 PeerConnectionRequest response_ {};
72 std::unique_ptr<IceTransport> ice_ {nullptr};
73 // Used to store currently non ready TLS Socket
74 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
75 std::shared_ptr<MultiplexedSocket> socket_ {};
76 std::set<CallbackId> cbIds_ {};
77
78 std::function<void(bool)> onConnected_;
79 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
80};
81
82/**
83 * returns whether or not UPnP is enabled and active_
84 * ie: if it is able to make port mappings
85 */
86bool
87ConnectionManager::Config::getUPnPActive() const
88{
89 if (upnpCtrl)
90 return upnpCtrl->isReady();
91 return false;
92}
93
94class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
95{
96public:
97 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
98 : config_ {std::move(config_)}
Sébastien Blincf569402023-07-27 09:46:40 -040099 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Adrien Béraud612b55b2023-05-29 10:42:04 -0400100 {}
101 ~Impl() {}
102
103 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
104 const dht::crypto::Identity& identity() const { return config_->id; }
105
106 void removeUnusedConnections(const DeviceId& deviceId = {})
107 {
108 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
109
110 {
111 std::lock_guard<std::mutex> lk(infosMtx_);
112 for (auto it = infos_.begin(); it != infos_.end();) {
113 auto& [key, info] = *it;
114 if (info && (!deviceId || key.first == deviceId)) {
115 unused.emplace_back(std::move(info));
116 it = infos_.erase(it);
117 } else {
118 ++it;
119 }
120 }
121 }
122 for (auto& info: unused) {
123 if (info->tls_)
124 info->tls_->shutdown();
125 if (info->socket_)
126 info->socket_->shutdown();
127 if (info->waitForAnswer_)
128 info->waitForAnswer_->cancel();
129 }
130 if (!unused.empty())
131 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); });
132 }
133
134 void shutdown()
135 {
136 if (isDestroying_.exchange(true))
137 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400138 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400139 {
140 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400141 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400142 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400143 for (auto& [deviceId, pcbs] : po) {
144 for (auto& [id, pending] : pcbs.connecting)
145 pending.cb(nullptr, deviceId);
146 for (auto& [id, pending] : pcbs.waiting)
147 pending.cb(nullptr, deviceId);
148 }
149
Adrien Béraud612b55b2023-05-29 10:42:04 -0400150 removeUnusedConnections();
151 }
152
Adrien Béraud612b55b2023-05-29 10:42:04 -0400153 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
154 const dht::Value::Id& vid,
155 const std::string& connType,
156 std::function<void(bool)> onConnected);
157 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
158 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
159 const std::string& name,
160 const dht::Value::Id& vid,
161 const std::shared_ptr<dht::crypto::Certificate>& cert);
162 void connectDevice(const DeviceId& deviceId,
163 const std::string& uri,
164 ConnectCallback cb,
165 bool noNewSocket = false,
166 bool forceNewSocket = false,
167 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400168 void connectDevice(const dht::InfoHash& deviceId,
169 const std::string& uri,
170 ConnectCallbackLegacy cb,
171 bool noNewSocket = false,
172 bool forceNewSocket = false,
173 const std::string& connType = "");
174
Adrien Béraud612b55b2023-05-29 10:42:04 -0400175 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
176 const std::string& name,
177 ConnectCallback cb,
178 bool noNewSocket = false,
179 bool forceNewSocket = false,
180 const std::string& connType = "");
181 /**
182 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
183 * @param sock socket used to send the request
184 * @param name channel's name
185 * @param vid channel's id
186 * @param deviceId to identify the linked ConnectCallback
187 */
188 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
189 const std::string& name,
190 const DeviceId& deviceId,
191 const dht::Value::Id& vid);
192 /**
193 * Triggered when a PeerConnectionRequest comes from the DHT
194 */
195 void answerTo(IceTransport& ice,
196 const dht::Value::Id& id,
197 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
198 bool onRequestStartIce(const PeerConnectionRequest& req);
199 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
200 void onDhtPeerRequest(const PeerConnectionRequest& req,
201 const std::shared_ptr<dht::crypto::Certificate>& cert);
202
203 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
204 void onPeerResponse(const PeerConnectionRequest& req);
205 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
206
207 const std::shared_future<tls::DhParams> dhParams() const;
208 tls::CertificateStore& certStore() const { return *config_->certStore; }
209
210 mutable std::mutex messageMutex_ {};
211 std::set<std::string, std::less<>> treatedMessages_ {};
212
213 void loadTreatedMessages();
214 void saveTreatedMessages() const;
215
216 /// \return true if the given DHT message identifier has been treated
217 /// \note if message has not been treated yet this method st/ore this id and returns true at
218 /// further calls
219 bool isMessageTreated(std::string_view id);
220
221 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
222
223 /**
224 * Published IPv4/IPv6 addresses, used only if defined by the user in account
225 * configuration
226 *
227 */
228 IpAddr publishedIp_[2] {};
229
Adrien Béraud612b55b2023-05-29 10:42:04 -0400230 /**
231 * interface name on which this account is bound
232 */
233 std::string interface_ {"default"};
234
235 /**
236 * Get the local interface name on which this account is bound.
237 */
238 const std::string& getLocalInterface() const { return interface_; }
239
240 /**
241 * Get the published IP address, fallbacks to NAT if family is unspecified
242 * Prefers the usage of IPv4 if possible.
243 */
244 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
245
246 /**
247 * Set published IP address according to given family
248 */
249 void setPublishedAddress(const IpAddr& ip_addr);
250
251 /**
252 * Store the local/public addresses used to register
253 */
254 void storeActiveIpAddress(std::function<void()>&& cb = {});
255
256 /**
257 * Create and return ICE options.
258 */
259 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
260 IceTransportOptions getIceOptions() const noexcept;
261
262 /**
263 * Inform that a potential peer device have been found.
264 * Returns true only if the device certificate is a valid device certificate.
265 * In that case (true is returned) the account_id parameter is set to the peer account ID.
266 */
267 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
268 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
269
270 bool findCertificate(const dht::PkId& id,
271 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400272 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400273
274 /**
275 * returns whether or not UPnP is enabled and active
276 * ie: if it is able to make port mappings
277 */
278 bool getUPnPActive() const;
279
280 /**
281 * Triggered when a new TLS socket is ready to use
282 * @param ok If succeed
283 * @param deviceId Related device
284 * @param vid vid of the connection request
285 * @param name non empty if TLS was created by connectDevice()
286 */
287 void onTlsNegotiationDone(bool ok,
288 const DeviceId& deviceId,
289 const dht::Value::Id& vid,
290 const std::string& name = "");
291
292 std::shared_ptr<ConnectionManager::Config> config_;
293
Adrien Béraud612b55b2023-05-29 10:42:04 -0400294 mutable std::mt19937_64 rand;
295
296 iOSConnectedCallback iOSConnectedCb_ {};
297
298 std::mutex infosMtx_ {};
299 // Note: Someone can ask multiple sockets, so to avoid any race condition,
300 // each device can have multiple multiplexed sockets.
301 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
302
303 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
304 {
305 std::lock_guard<std::mutex> lk(infosMtx_);
306 auto it = infos_.find({deviceId, id});
307 if (it != infos_.end())
308 return it->second;
309 return {};
310 }
311
312 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
313 {
314 std::lock_guard<std::mutex> lk(infosMtx_);
315 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
316 auto& [key, value] = item;
317 return key.first == deviceId && value && value->socket_;
318 });
319 if (it != infos_.end())
320 return it->second;
321 return {};
322 }
323
324 ChannelRequestCallback channelReqCb_ {};
325 ConnectionReadyCallback connReadyCb_ {};
326 onICERequestCallback iceReqCb_ {};
327
328 /**
329 * Stores callback from connectDevice
330 * @note: each device needs a vector because several connectDevice can
331 * be done in parallel and we only want one socket
332 */
333 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400334
Adrien Béraud665294f2023-06-13 18:09:11 -0400335 struct PendingCb
336 {
337 std::string name;
338 ConnectCallback cb;
339 };
340 struct PendingOperations {
341 std::map<dht::Value::Id, PendingCb> connecting;
342 std::map<dht::Value::Id, PendingCb> waiting;
343 };
344
345 std::map<DeviceId, PendingOperations> pendingOperations_ {};
346
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400347 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400348 {
349 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400350 std::unique_lock<std::mutex> lk(connectCbsMtx_);
351 auto it = pendingOperations_.find(deviceId);
352 if (it == pendingOperations_.end())
353 return;
354 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400355 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400356 // Extract all pending callbacks
357 for (auto& [vid, cb] : pendingOperations.connecting)
358 ret.emplace_back(std::move(cb));
359 pendingOperations.connecting.clear();
360 for (auto& [vid, cb] : pendingOperations.waiting)
361 ret.emplace_back(std::move(cb));
362 pendingOperations.waiting.clear();
363 } else if (auto n = pendingOperations.waiting.extract(vid)) {
364 // If it's a waiting operation, just move it
365 ret.emplace_back(std::move(n.mapped()));
366 } else if (auto n = pendingOperations.connecting.extract(vid)) {
367 ret.emplace_back(std::move(n.mapped()));
368 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400369 // If accepted is false, it means that underlying socket is ok, but channel is declined
370 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400371 for (auto& [vid, cb] : pendingOperations.waiting)
372 ret.emplace_back(std::move(cb));
373 pendingOperations.waiting.clear();
374 for (auto& [vid, cb] : pendingOperations.connecting)
375 ret.emplace_back(std::move(cb));
376 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400377 }
378 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400379 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
380 pendingOperations_.erase(it);
381 lk.unlock();
382 for (auto& cb : ret)
383 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400384 }
385
Adrien Béraud665294f2023-06-13 18:09:11 -0400386 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400387 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400388 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400389 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400390 auto it = pendingOperations_.find(deviceId);
391 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400392 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400393 auto& pendingOp = it->second;
394 for (const auto& [id, pc]: pendingOp.connecting) {
395 if (vid == 0 || id == vid)
396 ret[id] = pc.name;
397 }
398 for (const auto& [id, pc]: pendingOp.waiting) {
399 if (vid == 0 || id == vid)
400 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400401 }
402 return ret;
403 }
404
405 std::shared_ptr<ConnectionManager::Impl> shared()
406 {
407 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
408 }
409 std::shared_ptr<ConnectionManager::Impl const> shared() const
410 {
411 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
412 }
413 std::weak_ptr<ConnectionManager::Impl> weak()
414 {
415 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
416 }
417 std::weak_ptr<ConnectionManager::Impl const> weak() const
418 {
419 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
420 }
421
422 std::atomic_bool isDestroying_ {false};
423};
424
425void
426ConnectionManager::Impl::connectDeviceStartIce(
427 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
428 const dht::Value::Id& vid,
429 const std::string& connType,
430 std::function<void(bool)> onConnected)
431{
432 auto deviceId = devicePk->getLongId();
433 auto info = getInfo(deviceId, vid);
434 if (!info) {
435 onConnected(false);
436 return;
437 }
438
439 std::unique_lock<std::mutex> lk(info->mutex_);
440 auto& ice = info->ice_;
441
442 if (!ice) {
443 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400444 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400445 onConnected(false);
446 return;
447 }
448
449 auto iceAttributes = ice->getLocalAttributes();
450 std::ostringstream icemsg;
451 icemsg << iceAttributes.ufrag << "\n";
452 icemsg << iceAttributes.pwd << "\n";
453 for (const auto& addr : ice->getLocalCandidates(1)) {
454 icemsg << addr << "\n";
455 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400456 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400457 }
458
459 // Prepare connection request as a DHT message
460 PeerConnectionRequest val;
461
462 val.id = vid; /* Random id for the message unicity */
463 val.ice_msg = icemsg.str();
464 val.connType = connType;
465
466 auto value = std::make_shared<dht::Value>(std::move(val));
467 value->user_type = "peer_request";
468
469 // Send connection request through DHT
470 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400471 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400472 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
473 + devicePk->getId().toString()),
474 devicePk,
475 value,
476 [l=config_->logger,deviceId](bool ok) {
477 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400478 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400479 deviceId,
480 (ok ? "ok" : "failed"));
481 });
482 // Wait for call to onResponse() operated by DHT
483 if (isDestroying_) {
484 onConnected(true); // This avoid to wait new negotiation when destroying
485 return;
486 }
487
488 info->onConnected_ = std::move(onConnected);
489 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
490 std::chrono::steady_clock::now()
491 + DHT_MSG_TIMEOUT);
492 info->waitForAnswer_->async_wait(
493 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
494}
495
496void
497ConnectionManager::Impl::onResponse(const asio::error_code& ec,
498 const DeviceId& deviceId,
499 const dht::Value::Id& vid)
500{
501 if (ec == asio::error::operation_aborted)
502 return;
503 auto info = getInfo(deviceId, vid);
504 if (!info)
505 return;
506
507 std::unique_lock<std::mutex> lk(info->mutex_);
508 auto& ice = info->ice_;
509 if (isDestroying_) {
510 info->onConnected_(true); // The destructor can wake a pending wait here.
511 return;
512 }
513 if (!info->responseReceived_) {
514 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400515 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400516 info->onConnected_(false);
517 return;
518 }
519
520 if (!info->ice_) {
521 info->onConnected_(false);
522 return;
523 }
524
525 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
526
527 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
528 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400529 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400530 info->onConnected_(false);
531 return;
532 }
533 info->onConnected_(true);
534}
535
536bool
537ConnectionManager::Impl::connectDeviceOnNegoDone(
538 const DeviceId& deviceId,
539 const std::string& name,
540 const dht::Value::Id& vid,
541 const std::shared_ptr<dht::crypto::Certificate>& cert)
542{
543 auto info = getInfo(deviceId, vid);
544 if (!info)
545 return false;
546
547 std::unique_lock<std::mutex> lk {info->mutex_};
548 if (info->waitForAnswer_) {
549 // Negotiation is done and connected, go to handshake
550 // and avoid any cancellation at this point.
551 info->waitForAnswer_->cancel();
552 }
553 auto& ice = info->ice_;
554 if (!ice || !ice->isRunning()) {
555 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400556 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400557 return false;
558 }
559
560 // Build socket
561 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
562 std::move(ice)),
563 true);
564
565 // Negotiate a TLS session
566 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400567 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400568 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
569 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400570 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400571 identity(),
572 dhParams(),
573 *cert);
574
575 info->tls_->setOnReady(
576 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
577 bool ok) {
578 if (auto shared = w.lock())
579 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
580 });
581 return true;
582}
583
584void
585ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
586 const std::string& name,
587 ConnectCallback cb,
588 bool noNewSocket,
589 bool forceNewSocket,
590 const std::string& connType)
591{
592 if (!dht()) {
593 cb(nullptr, deviceId);
594 return;
595 }
596 if (deviceId.toString() == identity().second->getLongId().toString()) {
597 cb(nullptr, deviceId);
598 return;
599 }
600 findCertificate(deviceId,
601 [w = weak(),
602 deviceId,
603 name,
604 cb = std::move(cb),
605 noNewSocket,
606 forceNewSocket,
607 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
608 if (!cert) {
609 if (auto shared = w.lock())
610 if (shared->config_->logger)
611 shared->config_->logger->error(
612 "No valid certificate found for device {}",
613 deviceId);
614 cb(nullptr, deviceId);
615 return;
616 }
617 if (auto shared = w.lock()) {
618 shared->connectDevice(cert,
619 name,
620 std::move(cb),
621 noNewSocket,
622 forceNewSocket,
623 connType);
624 } else
625 cb(nullptr, deviceId);
626 });
627}
628
629void
Amna0cf544d2023-07-25 14:25:09 -0400630ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
631 const std::string& name,
632 ConnectCallbackLegacy cb,
633 bool noNewSocket,
634 bool forceNewSocket,
635 const std::string& connType)
636{
637 if (!dht()) {
638 cb(nullptr, deviceId);
639 return;
640 }
641 if (deviceId.toString() == identity().second->getLongId().toString()) {
642 cb(nullptr, deviceId);
643 return;
644 }
645 findCertificate(deviceId,
646 [w = weak(),
647 deviceId,
648 name,
649 cb = std::move(cb),
650 noNewSocket,
651 forceNewSocket,
652 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
653 if (!cert) {
654 if (auto shared = w.lock())
655 if (shared->config_->logger)
656 shared->config_->logger->error(
657 "No valid certificate found for device {}",
658 deviceId);
659 cb(nullptr, deviceId);
660 return;
661 }
662 if (auto shared = w.lock()) {
663 shared->connectDevice(cert,
664 name,
665 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& did){
666 cb(sock, deviceId);
667 },
668 noNewSocket,
669 forceNewSocket,
670 connType);
671 } else
672 cb(nullptr, deviceId);
673 });
674}
675
676void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400677ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
678 const std::string& name,
679 ConnectCallback cb,
680 bool noNewSocket,
681 bool forceNewSocket,
682 const std::string& connType)
683{
684 // Avoid dht operation in a DHT callback to avoid deadlocks
685 dht::ThreadPool::computation().run([w = weak(),
686 name = std::move(name),
687 cert = std::move(cert),
688 cb = std::move(cb),
689 noNewSocket,
690 forceNewSocket,
691 connType] {
692 auto devicePk = cert->getSharedPublicKey();
693 auto deviceId = devicePk->getLongId();
694 auto sthis = w.lock();
695 if (!sthis || sthis->isDestroying_) {
696 cb(nullptr, deviceId);
697 return;
698 }
699 dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
700 auto isConnectingToDevice = false;
701 {
702 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400703 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
704 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400705 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400706 while (pendings.connecting.find(vid) != pendings.connecting.end()
707 && pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400708 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400709 }
710 }
711 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400712 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400713 // Save current request for sendChannelRequest.
714 // Note: do not return here, cause we can be in a state where first
715 // socket is negotiated and first channel is pending
716 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400717 if (isConnectingToDevice && !forceNewSocket)
718 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400719 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400720 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400721 }
722
723 // Check if already negotiated
724 CallbackId cbId(deviceId, vid);
725 if (auto info = sthis->getConnectedInfo(deviceId)) {
726 std::lock_guard<std::mutex> lk(info->mutex_);
727 if (info->socket_) {
728 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400729 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400730 info->cbIds_.emplace(cbId);
731 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
732 return;
733 }
734 }
735
736 if (isConnectingToDevice && !forceNewSocket) {
737 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400738 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400739 return;
740 }
741 if (noNewSocket) {
742 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400743 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400744 return;
745 }
746
747 // Note: used when the ice negotiation fails to erase
748 // all stored structures.
749 auto eraseInfo = [w, cbId] {
750 if (auto shared = w.lock()) {
751 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400752 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400753 std::lock_guard<std::mutex> lk(shared->infosMtx_);
754 shared->infos_.erase(cbId);
755 }
756 };
757
758 // If no socket exists, we need to initiate an ICE connection.
759 sthis->getIceOptions([w,
760 deviceId = std::move(deviceId),
761 devicePk = std::move(devicePk),
762 name = std::move(name),
763 cert = std::move(cert),
764 vid,
765 connType,
766 eraseInfo](auto&& ice_config) {
767 auto sthis = w.lock();
768 if (!sthis) {
769 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
770 return;
771 }
772 ice_config.tcpEnable = true;
773 ice_config.onInitDone = [w,
774 deviceId = std::move(deviceId),
775 devicePk = std::move(devicePk),
776 name = std::move(name),
777 cert = std::move(cert),
778 vid,
779 connType,
780 eraseInfo](bool ok) {
781 dht::ThreadPool::io().run([w = std::move(w),
782 devicePk = std::move(devicePk),
783 vid = std::move(vid),
784 eraseInfo,
785 connType, ok] {
786 auto sthis = w.lock();
787 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400788 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400789 if (!sthis || !ok) {
790 eraseInfo();
791 return;
792 }
793 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
794 if (!ok) {
795 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
796 }
797 });
798 });
799 };
800 ice_config.onNegoDone = [w,
801 deviceId,
802 name,
803 cert = std::move(cert),
804 vid,
805 eraseInfo](bool ok) {
806 dht::ThreadPool::io().run([w = std::move(w),
807 deviceId = std::move(deviceId),
808 name = std::move(name),
809 cert = std::move(cert),
810 vid = std::move(vid),
811 eraseInfo = std::move(eraseInfo),
812 ok] {
813 auto sthis = w.lock();
814 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400815 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400816 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
817 eraseInfo();
818 });
819 };
820
821 auto info = std::make_shared<ConnectionInfo>();
822 {
823 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
824 sthis->infos_[{deviceId, vid}] = info;
825 }
826 std::unique_lock<std::mutex> lk {info->mutex_};
827 ice_config.master = false;
828 ice_config.streamsCount = 1;
829 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400830 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400831 if (!info->ice_) {
832 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400833 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400834 eraseInfo();
835 return;
836 }
837 // We need to detect any shutdown if the ice session is destroyed before going to the
838 // TLS session;
839 info->ice_->setOnShutdown([eraseInfo]() {
840 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
841 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400842 try {
843 info->ice_->initIceInstance(ice_config);
844 } catch (const std::exception& e) {
845 if (sthis->config_->logger)
846 sthis->config_->logger->error("{}", e.what());
847 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
848 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400849 });
850 });
851}
852
853void
854ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
855 const std::string& name,
856 const DeviceId& deviceId,
857 const dht::Value::Id& vid)
858{
859 auto channelSock = sock->addChannel(name);
860 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
861 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400862 if (auto shared = w.lock())
863 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400864 });
865 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400866 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400867 auto shared = w.lock();
868 auto channelSock = wSock.lock();
869 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400870 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400871 });
872
873 ChannelRequest val;
874 val.name = channelSock->name();
875 val.state = ChannelRequestState::REQUEST;
876 val.channel = channelSock->channel();
877 msgpack::sbuffer buffer(256);
878 msgpack::pack(buffer, val);
879
880 std::error_code ec;
881 int res = sock->write(CONTROL_CHANNEL,
882 reinterpret_cast<const uint8_t*>(buffer.data()),
883 buffer.size(),
884 ec);
885 if (res < 0) {
886 // TODO check if we should handle errors here
887 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400888 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400889 }
890}
891
892void
893ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
894{
895 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400896 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400897 if (config_->logger)
898 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400899 std::lock_guard<std::mutex> lk {info->mutex_};
900 info->responseReceived_ = true;
901 info->response_ = std::move(req);
902 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
903 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
904 this,
905 std::placeholders::_1,
906 device,
907 req.id));
908 } else {
909 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400910 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400911 }
912}
913
914void
915ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
916{
917 if (!dht())
918 return;
919 dht()->listen<PeerConnectionRequest>(
920 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
921 [w = weak()](PeerConnectionRequest&& req) {
922 auto shared = w.lock();
923 if (!shared)
924 return false;
925 if (shared->isMessageTreated(to_hex_string(req.id))) {
926 // Message already treated. Just ignore
927 return true;
928 }
929 if (req.isAnswer) {
930 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400931 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400932 } else {
933 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400934 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400935 }
936 if (req.isAnswer) {
937 shared->onPeerResponse(req);
938 } else {
939 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400940 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400941 req.from,
942 [w, req = std::move(req)](
943 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
944 auto shared = w.lock();
945 if (!shared)
946 return;
947 dht::InfoHash peer_h;
948 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
949#if TARGET_OS_IOS
950 if (shared->iOSConnectedCb_(req.connType, peer_h))
951 return;
952#endif
953 shared->onDhtPeerRequest(req, cert);
954 } else {
955 if (shared->config_->logger)
956 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -0400957 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400958 req.owner->getLongId());
959 }
960 });
961 }
962
963 return true;
964 },
965 dht::Value::UserTypeFilter("peer_request"));
966}
967
968void
969ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
970 const DeviceId& deviceId,
971 const dht::Value::Id& vid,
972 const std::string& name)
973{
974 if (isDestroying_)
975 return;
976 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
977 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
978 // asked yet)
979 auto isDhtRequest = name.empty();
980 if (!ok) {
981 if (isDhtRequest) {
982 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400983 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400984 deviceId,
985 name,
986 vid);
987 if (connReadyCb_)
988 connReadyCb_(deviceId, "", nullptr);
989 } else {
990 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400991 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400992 deviceId,
993 name,
994 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -0400995 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400996 }
997 } else {
998 // The socket is ready, store it
999 if (isDhtRequest) {
1000 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001001 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001002 deviceId,
1003 vid);
1004 } else {
1005 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001006 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001007 deviceId,
1008 name,
1009 vid);
1010 }
1011
1012 auto info = getInfo(deviceId, vid);
1013 addNewMultiplexedSocket({deviceId, vid}, info);
1014 // Finally, open the channel and launch pending callbacks
1015 if (info->socket_) {
1016 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001017 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001018 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001019 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001020 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001021 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001022 }
1023 }
1024 }
1025}
1026
1027void
1028ConnectionManager::Impl::answerTo(IceTransport& ice,
1029 const dht::Value::Id& id,
1030 const std::shared_ptr<dht::crypto::PublicKey>& from)
1031{
1032 // NOTE: This is a shortest version of a real SDP message to save some bits
1033 auto iceAttributes = ice.getLocalAttributes();
1034 std::ostringstream icemsg;
1035 icemsg << iceAttributes.ufrag << "\n";
1036 icemsg << iceAttributes.pwd << "\n";
1037 for (const auto& addr : ice.getLocalCandidates(1)) {
1038 icemsg << addr << "\n";
1039 }
1040
1041 // Send PeerConnection response
1042 PeerConnectionRequest val;
1043 val.id = id;
1044 val.ice_msg = icemsg.str();
1045 val.isAnswer = true;
1046 auto value = std::make_shared<dht::Value>(std::move(val));
1047 value->user_type = "peer_request";
1048
1049 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001050 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001051 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1052 + from->getId().toString()),
1053 from,
1054 value,
1055 [from,l=config_->logger](bool ok) {
1056 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001057 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001058 from->getLongId(),
1059 (ok ? "ok" : "failed"));
1060 });
1061}
1062
1063bool
1064ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1065{
1066 auto deviceId = req.owner->getLongId();
1067 auto info = getInfo(deviceId, req.id);
1068 if (!info)
1069 return false;
1070
1071 std::unique_lock<std::mutex> lk {info->mutex_};
1072 auto& ice = info->ice_;
1073 if (!ice) {
1074 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001075 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001076 if (connReadyCb_)
1077 connReadyCb_(deviceId, "", nullptr);
1078 return false;
1079 }
1080
1081 auto sdp = ice->parseIceCandidates(req.ice_msg);
1082 answerTo(*ice, req.id, req.owner);
1083 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1084 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001085 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001086 ice = nullptr;
1087 if (connReadyCb_)
1088 connReadyCb_(deviceId, "", nullptr);
1089 return false;
1090 }
1091 return true;
1092}
1093
1094bool
1095ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1096{
1097 auto deviceId = req.owner->getLongId();
1098 auto info = getInfo(deviceId, req.id);
1099 if (!info)
1100 return false;
1101
1102 std::unique_lock<std::mutex> lk {info->mutex_};
1103 auto& ice = info->ice_;
1104 if (!ice) {
1105 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001106 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001107 return false;
1108 }
1109
1110 // Build socket
1111 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1112 std::move(ice)),
1113 false);
1114
1115 // init TLS session
1116 auto ph = req.from;
1117 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001118 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1119 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001120 req.id);
1121 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1122 std::move(endpoint),
1123 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001124 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001125 identity(),
1126 dhParams(),
1127 [ph, w = weak()](const dht::crypto::Certificate& cert) {
1128 auto shared = w.lock();
1129 if (!shared)
1130 return false;
1131 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1132 if (!crt)
1133 return false;
1134 return crt->getPacked() == cert.getPacked();
1135 });
1136
1137 info->tls_->setOnReady(
1138 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1139 if (auto shared = w.lock())
1140 shared->onTlsNegotiationDone(ok, deviceId, vid);
1141 });
1142 return true;
1143}
1144
1145void
1146ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1147 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1148{
1149 auto deviceId = req.owner->getLongId();
1150 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001151 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001152 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1153 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001154 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001155 return;
1156 }
1157
1158 // Because the connection is accepted, create an ICE socket.
1159 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1160 auto shared = w.lock();
1161 if (!shared)
1162 return;
1163 // Note: used when the ice negotiation fails to erase
1164 // all stored structures.
1165 auto eraseInfo = [w, id = req.id, deviceId] {
1166 if (auto shared = w.lock()) {
1167 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001168 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001169 if (shared->connReadyCb_)
1170 shared->connReadyCb_(deviceId, "", nullptr);
1171 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1172 shared->infos_.erase({deviceId, id});
1173 }
1174 };
1175
1176 ice_config.tcpEnable = true;
1177 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1178 auto shared = w.lock();
1179 if (!shared)
1180 return;
1181 if (!ok) {
1182 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001183 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001184 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1185 return;
1186 }
1187
1188 dht::ThreadPool::io().run(
1189 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1190 auto shared = w.lock();
1191 if (!shared)
1192 return;
1193 if (!shared->onRequestStartIce(req))
1194 eraseInfo();
1195 });
1196 };
1197
1198 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1199 auto shared = w.lock();
1200 if (!shared)
1201 return;
1202 if (!ok) {
1203 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001204 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001205 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1206 return;
1207 }
1208
1209 dht::ThreadPool::io().run(
1210 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1211 if (auto shared = w.lock())
1212 if (!shared->onRequestOnNegoDone(req))
1213 eraseInfo();
1214 });
1215 };
1216
1217 // Negotiate a new ICE socket
1218 auto info = std::make_shared<ConnectionInfo>();
1219 {
1220 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1221 shared->infos_[{deviceId, req.id}] = info;
1222 }
1223 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001224 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001225 std::unique_lock<std::mutex> lk {info->mutex_};
1226 ice_config.streamsCount = 1;
1227 ice_config.compCountPerStream = 1; // TCP
1228 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001229 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001230 if (not info->ice_) {
1231 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001232 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001233 eraseInfo();
1234 return;
1235 }
1236 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1237 info->ice_->setOnShutdown([eraseInfo]() {
1238 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1239 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001240 try {
1241 info->ice_->initIceInstance(ice_config);
1242 } catch (const std::exception& e) {
1243 if (shared->config_->logger)
1244 shared->config_->logger->error("{}", e.what());
1245 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1246 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001247 });
1248}
1249
1250void
1251ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1252{
1253 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_));
1254 info->socket_->setOnReady(
1255 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1256 if (auto sthis = w.lock())
1257 if (sthis->connReadyCb_)
1258 sthis->connReadyCb_(deviceId, socket->name(), socket);
1259 });
1260 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1261 const uint16_t&,
1262 const std::string& name) {
1263 if (auto sthis = w.lock())
1264 if (sthis->channelReqCb_)
1265 return sthis->channelReqCb_(peer, name);
1266 return false;
1267 });
1268 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1269 // Cancel current outgoing connections
1270 dht::ThreadPool::io().run([w, deviceId, vid] {
1271 auto sthis = w.lock();
1272 if (!sthis)
1273 return;
1274
1275 std::set<CallbackId> ids;
1276 if (auto info = sthis->getInfo(deviceId, vid)) {
1277 std::lock_guard<std::mutex> lk(info->mutex_);
1278 if (info->socket_) {
1279 ids = std::move(info->cbIds_);
1280 info->socket_->shutdown();
1281 }
1282 }
1283 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001284 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001285
1286 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1287 sthis->infos_.erase({deviceId, vid});
1288 });
1289 });
1290}
1291
1292const std::shared_future<tls::DhParams>
1293ConnectionManager::Impl::dhParams() const
1294{
1295 return dht::ThreadPool::computation().get<tls::DhParams>(
1296 std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001297}
1298
1299template<typename ID = dht::Value::Id>
1300std::set<ID, std::less<>>
1301loadIdList(const std::string& path)
1302{
1303 std::set<ID, std::less<>> ids;
1304 std::ifstream file = fileutils::ifstream(path);
1305 if (!file.is_open()) {
1306 //JAMI_DBG("Could not load %s", path.c_str());
1307 return ids;
1308 }
1309 std::string line;
1310 while (std::getline(file, line)) {
1311 if constexpr (std::is_same<ID, std::string>::value) {
1312 ids.emplace(std::move(line));
1313 } else if constexpr (std::is_integral<ID>::value) {
1314 ID vid;
1315 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1316 ec == std::errc()) {
1317 ids.emplace(vid);
1318 }
1319 }
1320 }
1321 return ids;
1322}
1323
1324template<typename List = std::set<dht::Value::Id>>
1325void
1326saveIdList(const std::string& path, const List& ids)
1327{
1328 std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary);
1329 if (!file.is_open()) {
1330 //JAMI_ERR("Could not save to %s", path.c_str());
1331 return;
1332 }
1333 for (auto& c : ids)
1334 file << std::hex << c << "\n";
1335}
1336
1337void
1338ConnectionManager::Impl::loadTreatedMessages()
1339{
1340 std::lock_guard<std::mutex> lock(messageMutex_);
1341 auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages";
1342 treatedMessages_ = loadIdList<std::string>(path);
1343 if (treatedMessages_.empty()) {
1344 auto messages = loadIdList(path);
1345 for (const auto& m : messages)
1346 treatedMessages_.emplace(to_hex_string(m));
1347 }
1348}
1349
1350void
1351ConnectionManager::Impl::saveTreatedMessages() const
1352{
1353 dht::ThreadPool::io().run([w = weak()]() {
1354 if (auto sthis = w.lock()) {
1355 auto& this_ = *sthis;
1356 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1357 fileutils::check_dir(this_.config_->cachePath.c_str());
1358 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath
1359 + DIR_SEPARATOR_STR "treatedMessages",
1360 this_.treatedMessages_);
1361 }
1362 });
1363}
1364
1365bool
1366ConnectionManager::Impl::isMessageTreated(std::string_view id)
1367{
1368 std::lock_guard<std::mutex> lock(messageMutex_);
1369 auto res = treatedMessages_.emplace(id);
1370 if (res.second) {
1371 saveTreatedMessages();
1372 return false;
1373 }
1374 return true;
1375}
1376
1377/**
1378 * returns whether or not UPnP is enabled and active_
1379 * ie: if it is able to make port mappings
1380 */
1381bool
1382ConnectionManager::Impl::getUPnPActive() const
1383{
1384 return config_->getUPnPActive();
1385}
1386
1387IpAddr
1388ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1389{
1390 if (family == AF_INET)
1391 return publishedIp_[0];
1392 if (family == AF_INET6)
1393 return publishedIp_[1];
1394
1395 assert(family == AF_UNSPEC);
1396
1397 // If family is not set, prefere IPv4 if available. It's more
1398 // likely to succeed behind NAT.
1399 if (publishedIp_[0])
1400 return publishedIp_[0];
1401 if (publishedIp_[1])
1402 return publishedIp_[1];
1403 return {};
1404}
1405
1406void
1407ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1408{
1409 if (ip_addr.getFamily() == AF_INET) {
1410 publishedIp_[0] = ip_addr;
1411 } else {
1412 publishedIp_[1] = ip_addr;
1413 }
1414}
1415
1416void
1417ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1418{
1419 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1420 bool hasIpv4 {false}, hasIpv6 {false};
1421 for (auto& result : results) {
1422 auto family = result.getFamily();
1423 if (family == AF_INET) {
1424 if (not hasIpv4) {
1425 hasIpv4 = true;
1426 if (config_->logger)
1427 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1428 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1429 setPublishedAddress(*result.get());
1430 if (config_->upnpCtrl) {
1431 config_->upnpCtrl->setPublicAddress(*result.get());
1432 }
1433 }
1434 } else if (family == AF_INET6) {
1435 if (not hasIpv6) {
1436 hasIpv6 = true;
1437 if (config_->logger)
1438 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1439 setPublishedAddress(*result.get());
1440 }
1441 }
1442 if (hasIpv4 and hasIpv6)
1443 break;
1444 }
1445 if (cb)
1446 cb();
1447 });
1448}
1449
1450void
1451ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1452{
1453 storeActiveIpAddress([this, cb = std::move(cb)] {
1454 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1455 auto publishedAddr = getPublishedIpAddress();
1456
1457 if (publishedAddr) {
1458 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1459 publishedAddr.getFamily());
1460 if (interfaceAddr) {
1461 opts.accountLocalAddr = interfaceAddr;
1462 opts.accountPublicAddr = publishedAddr;
1463 }
1464 }
1465 if (cb)
1466 cb(std::move(opts));
1467 });
1468}
1469
1470IceTransportOptions
1471ConnectionManager::Impl::getIceOptions() const noexcept
1472{
1473 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001474 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001475 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001476 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001477
1478 if (config_->stunEnabled)
1479 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1480 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001481 if (config_->turnCache) {
1482 auto turnAddr = config_->turnCache->getResolvedTurn();
1483 if (turnAddr != std::nullopt) {
1484 opts.turnServers.emplace_back(TurnServerInfo()
1485 .setUri(turnAddr->toString())
1486 .setUsername(config_->turnServerUserName)
1487 .setPassword(config_->turnServerPwd)
1488 .setRealm(config_->turnServerRealm));
1489 }
1490 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001491 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001492 .setUri(config_->turnServer)
1493 .setUsername(config_->turnServerUserName)
1494 .setPassword(config_->turnServerPwd)
1495 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001496 }
1497 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1498 // co issues. So this needs some debug. for now just disable
1499 // if (cacheTurnV6 && *cacheTurnV6) {
1500 // opts.turnServers.emplace_back(TurnServerInfo()
1501 // .setUri(cacheTurnV6->toString(true))
1502 // .setUsername(turnServerUserName_)
1503 // .setPassword(turnServerPwd_)
1504 // .setRealm(turnServerRealm_));
1505 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001506 }
1507 return opts;
1508}
1509
1510bool
1511ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1512 dht::InfoHash& account_id,
1513 const std::shared_ptr<Logger>& logger)
1514{
1515 if (not crt)
1516 return false;
1517
1518 auto top_issuer = crt;
1519 while (top_issuer->issuer)
1520 top_issuer = top_issuer->issuer;
1521
1522 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001523 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001524 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001525 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001526 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001527 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001528
1529 // Check peer certificate chain
1530 // Trust store with top issuer as the only CA
1531 dht::crypto::TrustList peer_trust;
1532 peer_trust.add(*top_issuer);
1533 if (not peer_trust.verify(*crt)) {
1534 if (logger)
1535 logger->warn("Found invalid peer device: {}", crt->getLongId());
1536 return false;
1537 }
1538
1539 // Check cached OCSP response
1540 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1541 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001542 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001543 return false;
1544 }
1545
Adrien Béraudc631a832023-07-26 22:19:00 -04001546 account_id = crt->issuer->getId();
1547 if (logger)
1548 logger->warn("Found peer device: {} account:{} CA:{}",
1549 crt->getLongId(),
1550 account_id,
1551 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001552 return true;
1553}
1554
1555bool
1556ConnectionManager::Impl::findCertificate(
1557 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1558{
1559 if (auto cert = certStore().getCertificate(id.toString())) {
1560 if (cb)
1561 cb(cert);
1562 } else if (cb)
1563 cb(nullptr);
1564 return true;
1565}
1566
Sébastien Blin34086512023-07-25 09:52:14 -04001567bool
1568ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1569 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1570{
1571 if (auto cert = certStore().getCertificate(h.toString())) {
1572 if (cb)
1573 cb(cert);
1574 } else {
1575 dht()->findCertificate(h,
1576 [cb = std::move(cb), this](
1577 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1578 if (crt)
1579 certStore().pinCertificate(crt);
1580 if (cb)
1581 cb(crt);
1582 });
1583 }
1584 return true;
1585}
1586
Adrien Béraud612b55b2023-05-29 10:42:04 -04001587ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1588 : pimpl_ {std::make_shared<Impl>(config_)}
1589{}
1590
1591ConnectionManager::~ConnectionManager()
1592{
1593 if (pimpl_)
1594 pimpl_->shutdown();
1595}
1596
1597void
1598ConnectionManager::connectDevice(const DeviceId& deviceId,
1599 const std::string& name,
1600 ConnectCallback cb,
1601 bool noNewSocket,
1602 bool forceNewSocket,
1603 const std::string& connType)
1604{
1605 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1606}
1607
1608void
Amna0cf544d2023-07-25 14:25:09 -04001609ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1610 const std::string& name,
1611 ConnectCallbackLegacy cb,
1612 bool noNewSocket,
1613 bool forceNewSocket,
1614 const std::string& connType)
1615{
1616 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1617}
1618
1619
1620void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001621ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1622 const std::string& name,
1623 ConnectCallback cb,
1624 bool noNewSocket,
1625 bool forceNewSocket,
1626 const std::string& connType)
1627{
1628 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1629}
1630
1631bool
1632ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1633{
Adrien Béraud665294f2023-06-13 18:09:11 -04001634 auto pending = pimpl_->getPendingIds(deviceId);
1635 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001636 != pending.end();
1637}
1638
1639void
1640ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1641{
1642 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1643 std::set<DeviceId> peersDevices;
1644 {
1645 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1646 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1647 auto const& [key, value] = *iter;
1648 auto deviceId = key.first;
1649 auto cert = pimpl_->certStore().getCertificate(deviceId.toString());
1650 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1651 connInfos.emplace_back(value);
1652 peersDevices.emplace(deviceId);
1653 iter = pimpl_->infos_.erase(iter);
1654 } else {
1655 iter++;
1656 }
1657 }
1658 }
1659 // Stop connections to all peers devices
1660 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001661 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001662 // This will close the TLS Session
1663 pimpl_->removeUnusedConnections(deviceId);
1664 }
1665 for (auto& info : connInfos) {
1666 if (info->socket_)
1667 info->socket_->shutdown();
1668 if (info->waitForAnswer_)
1669 info->waitForAnswer_->cancel();
1670 if (info->ice_) {
1671 std::unique_lock<std::mutex> lk {info->mutex_};
1672 dht::ThreadPool::io().run(
1673 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1674 }
1675 }
1676}
1677
1678void
1679ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1680{
1681 pimpl_->onDhtConnected(devicePk);
1682}
1683
1684void
1685ConnectionManager::onICERequest(onICERequestCallback&& cb)
1686{
1687 pimpl_->iceReqCb_ = std::move(cb);
1688}
1689
1690void
1691ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1692{
1693 pimpl_->channelReqCb_ = std::move(cb);
1694}
1695
1696void
1697ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1698{
1699 pimpl_->connReadyCb_ = std::move(cb);
1700}
1701
1702void
1703ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1704{
1705 pimpl_->iOSConnectedCb_ = std::move(cb);
1706}
1707
1708std::size_t
1709ConnectionManager::activeSockets() const
1710{
1711 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1712 return pimpl_->infos_.size();
1713}
1714
1715void
1716ConnectionManager::monitor() const
1717{
1718 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1719 auto logger = pimpl_->config_->logger;
1720 if (!logger)
1721 return;
1722 logger->debug("ConnectionManager current status:");
1723 for (const auto& [_, ci] : pimpl_->infos_) {
1724 if (ci->socket_)
1725 ci->socket_->monitor();
1726 }
1727 logger->debug("ConnectionManager end status.");
1728}
1729
1730void
1731ConnectionManager::connectivityChanged()
1732{
1733 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1734 for (const auto& [_, ci] : pimpl_->infos_) {
1735 if (ci->socket_)
1736 ci->socket_->sendBeacon();
1737 }
1738}
1739
1740void
1741ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1742{
1743 return pimpl_->getIceOptions(std::move(cb));
1744}
1745
1746IceTransportOptions
1747ConnectionManager::getIceOptions() const noexcept
1748{
1749 return pimpl_->getIceOptions();
1750}
1751
1752IpAddr
1753ConnectionManager::getPublishedIpAddress(uint16_t family) const
1754{
1755 return pimpl_->getPublishedIpAddress(family);
1756}
1757
1758void
1759ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1760{
1761 return pimpl_->setPublishedAddress(ip_addr);
1762}
1763
1764void
1765ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1766{
1767 return pimpl_->storeActiveIpAddress(std::move(cb));
1768}
1769
1770std::shared_ptr<ConnectionManager::Config>
1771ConnectionManager::getConfig()
1772{
1773 return pimpl_->config_;
1774}
1775
Amna31791e52023-08-03 12:40:57 -04001776std::vector<std::map<std::string, std::string>>
1777ConnectionManager::getConnectionList(const DeviceId& device) const
1778{
1779 std::vector<std::map<std::string, std::string>> connectionsList;
1780 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1781
1782 for (const auto& [key, ci] : pimpl_->infos_) {
1783 if (device && key.first != device)
1784 continue;
1785 std::map<std::string, std::string> connectionInfo;
1786 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001787 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001788 if (ci->tls_) {
1789 if (auto cert = ci->tls_->peerCertificate()) {
1790 connectionInfo["peer"] = cert->issuer->getId().toString();
1791 }
Amna31791e52023-08-03 12:40:57 -04001792 }
1793 if (ci->socket_) {
1794 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1795 } else if (ci->tls_) {
1796 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1797 } else if(ci->ice_)
1798 {
1799 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1800 }
1801 if (ci->tls_) {
1802 std::string remoteAddress = ci->tls_->getRemoteAddress();
1803 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1804 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1805 connectionInfo["remoteAdress"] = remoteAddressIp;
1806 connectionInfo["remotePort"] = remoteAddressPort;
1807 }
1808 connectionsList.emplace_back(std::move(connectionInfo));
1809 }
1810
1811 if (device) {
1812 auto it = pimpl_->pendingOperations_.find(device);
1813 if (it != pimpl_->pendingOperations_.end()) {
1814 const auto& po = it->second;
1815 for (const auto& [vid, ci] : po.connecting) {
1816 std::map<std::string, std::string> connectionInfo;
1817 connectionInfo["id"] = callbackIdToString(device, vid);
1818 connectionInfo["deviceId"] = vid;
1819 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1820 connectionsList.emplace_back(std::move(connectionInfo));
1821 }
1822
1823 for (const auto& [vid, ci] : po.waiting) {
1824 std::map<std::string, std::string> connectionInfo;
1825 connectionInfo["id"] = callbackIdToString(device, vid);
1826 connectionInfo["deviceId"] = vid;
1827 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1828 connectionsList.emplace_back(std::move(connectionInfo));
1829 }
1830 }
1831 }
1832 else {
1833 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1834 for (const auto& [vid, ci] : po.connecting) {
1835 std::map<std::string, std::string> connectionInfo;
1836 connectionInfo["id"] = callbackIdToString(device, vid);
1837 connectionInfo["deviceId"] = vid;
1838 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1839 connectionsList.emplace_back(std::move(connectionInfo));
1840 }
1841
1842 for (const auto& [vid, ci] : po.waiting) {
1843 std::map<std::string, std::string> connectionInfo;
1844 connectionInfo["id"] = callbackIdToString(device, vid);
1845 connectionInfo["deviceId"] = vid;
1846 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1847 connectionsList.emplace_back(std::move(connectionInfo));
1848 }
1849 }
1850 }
1851 return connectionsList;
1852}
1853
1854std::vector<std::map<std::string, std::string>>
1855ConnectionManager::getChannelList(const std::string& connectionId) const
1856{
1857 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1858 CallbackId cbid = parseCallbackId(connectionId);
1859 if (pimpl_->infos_.count(cbid) > 0) {
1860 return pimpl_->infos_[cbid]->socket_->getChannelList();
1861 } else {
1862 return {};
1863 }
1864}
1865
Sébastien Blin464bdff2023-07-19 08:02:53 -04001866} // namespace dhtnet