blob: 751b798a006728673532867e753d4b182bda141f [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Amna81221ad2023-09-14 17:33:26 -040061
62std::shared_ptr<ConnectionManager::Config>
63createConfig(std::shared_ptr<ConnectionManager::Config> config_)
64{
65 if (!config_->certStore){
66 config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
67 }
68 if (!config_->dht) {
69 dht::DhtRunner::Config dhtConfig;
70 dhtConfig.dht_config.id = config_->id;
71 dhtConfig.threaded = true;
72 dht::DhtRunner::Context dhtContext;
73 dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
74 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
75 if (auto cert = c->getCertificate(pk_id.toString()))
76 ret.emplace_back(std::move(cert));
77 return ret;
78 };
79 config_->dht = std::make_shared<dht::DhtRunner>();
80 config_->dht->run(dhtConfig, std::move(dhtContext));
81 config_->dht->bootstrap("bootstrap.jami.net");
82 }
83 if (!config_->factory){
84 config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
85 }
86 return config_;
87}
88
Adrien Béraud612b55b2023-05-29 10:42:04 -040089struct ConnectionInfo
90{
91 ~ConnectionInfo()
92 {
93 if (socket_)
94 socket_->join();
95 }
96
97 std::mutex mutex_ {};
98 bool responseReceived_ {false};
99 PeerConnectionRequest response_ {};
100 std::unique_ptr<IceTransport> ice_ {nullptr};
101 // Used to store currently non ready TLS Socket
102 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
103 std::shared_ptr<MultiplexedSocket> socket_ {};
104 std::set<CallbackId> cbIds_ {};
105
106 std::function<void(bool)> onConnected_;
107 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
108};
109
110/**
111 * returns whether or not UPnP is enabled and active_
112 * ie: if it is able to make port mappings
113 */
114bool
115ConnectionManager::Config::getUPnPActive() const
116{
117 if (upnpCtrl)
118 return upnpCtrl->isReady();
119 return false;
120}
121
122class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
123{
124public:
125 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
Amna81221ad2023-09-14 17:33:26 -0400126 : config_ {std::move(createConfig(config_))}
Sébastien Blincf569402023-07-27 09:46:40 -0400127 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400128 {
Amna81221ad2023-09-14 17:33:26 -0400129 if(!config_->ioContext) {
130 config_->ioContext = std::make_shared<asio::io_context>();
131 ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
132 try {
133 auto work = asio::make_work_guard(*context);
134 context->run();
135 } catch (const std::exception& ex) {
136 if (l) l->error("Exception: {}", ex.what());
137 }
138 });
139 }
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400140 }
Amna81221ad2023-09-14 17:33:26 -0400141 ~Impl() {
142 if (ioContextRunner_) {
143 if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
144 config_->ioContext->stop();
145 ioContextRunner_->join();
146 ioContextRunner_.reset();
147 }
148 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400149
150 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
151 const dht::crypto::Identity& identity() const { return config_->id; }
152
153 void removeUnusedConnections(const DeviceId& deviceId = {})
154 {
155 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
156
157 {
158 std::lock_guard<std::mutex> lk(infosMtx_);
159 for (auto it = infos_.begin(); it != infos_.end();) {
160 auto& [key, info] = *it;
161 if (info && (!deviceId || key.first == deviceId)) {
162 unused.emplace_back(std::move(info));
163 it = infos_.erase(it);
164 } else {
165 ++it;
166 }
167 }
168 }
169 for (auto& info: unused) {
170 if (info->tls_)
171 info->tls_->shutdown();
172 if (info->socket_)
173 info->socket_->shutdown();
174 if (info->waitForAnswer_)
175 info->waitForAnswer_->cancel();
176 }
177 if (!unused.empty())
Amna81221ad2023-09-14 17:33:26 -0400178 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
179 infos.clear();
180 });
Adrien Béraud612b55b2023-05-29 10:42:04 -0400181 }
182
183 void shutdown()
184 {
185 if (isDestroying_.exchange(true))
186 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400187 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400188 {
189 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400190 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400191 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400192 for (auto& [deviceId, pcbs] : po) {
193 for (auto& [id, pending] : pcbs.connecting)
194 pending.cb(nullptr, deviceId);
195 for (auto& [id, pending] : pcbs.waiting)
196 pending.cb(nullptr, deviceId);
197 }
198
Adrien Béraud612b55b2023-05-29 10:42:04 -0400199 removeUnusedConnections();
200 }
201
Adrien Béraud612b55b2023-05-29 10:42:04 -0400202 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
203 const dht::Value::Id& vid,
204 const std::string& connType,
205 std::function<void(bool)> onConnected);
206 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
207 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
208 const std::string& name,
209 const dht::Value::Id& vid,
210 const std::shared_ptr<dht::crypto::Certificate>& cert);
211 void connectDevice(const DeviceId& deviceId,
212 const std::string& uri,
213 ConnectCallback cb,
214 bool noNewSocket = false,
215 bool forceNewSocket = false,
216 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400217 void connectDevice(const dht::InfoHash& deviceId,
218 const std::string& uri,
219 ConnectCallbackLegacy cb,
220 bool noNewSocket = false,
221 bool forceNewSocket = false,
222 const std::string& connType = "");
223
Adrien Béraud612b55b2023-05-29 10:42:04 -0400224 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
225 const std::string& name,
226 ConnectCallback cb,
227 bool noNewSocket = false,
228 bool forceNewSocket = false,
229 const std::string& connType = "");
230 /**
231 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
232 * @param sock socket used to send the request
233 * @param name channel's name
234 * @param vid channel's id
235 * @param deviceId to identify the linked ConnectCallback
236 */
237 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
238 const std::string& name,
239 const DeviceId& deviceId,
240 const dht::Value::Id& vid);
241 /**
242 * Triggered when a PeerConnectionRequest comes from the DHT
243 */
244 void answerTo(IceTransport& ice,
245 const dht::Value::Id& id,
246 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
247 bool onRequestStartIce(const PeerConnectionRequest& req);
248 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
249 void onDhtPeerRequest(const PeerConnectionRequest& req,
250 const std::shared_ptr<dht::crypto::Certificate>& cert);
251
252 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
253 void onPeerResponse(const PeerConnectionRequest& req);
254 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
255
256 const std::shared_future<tls::DhParams> dhParams() const;
257 tls::CertificateStore& certStore() const { return *config_->certStore; }
258
259 mutable std::mutex messageMutex_ {};
260 std::set<std::string, std::less<>> treatedMessages_ {};
261
262 void loadTreatedMessages();
263 void saveTreatedMessages() const;
264
265 /// \return true if the given DHT message identifier has been treated
266 /// \note if message has not been treated yet this method st/ore this id and returns true at
267 /// further calls
268 bool isMessageTreated(std::string_view id);
269
270 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
271
272 /**
273 * Published IPv4/IPv6 addresses, used only if defined by the user in account
274 * configuration
275 *
276 */
277 IpAddr publishedIp_[2] {};
278
Adrien Béraud612b55b2023-05-29 10:42:04 -0400279 /**
280 * interface name on which this account is bound
281 */
282 std::string interface_ {"default"};
283
284 /**
285 * Get the local interface name on which this account is bound.
286 */
287 const std::string& getLocalInterface() const { return interface_; }
288
289 /**
290 * Get the published IP address, fallbacks to NAT if family is unspecified
291 * Prefers the usage of IPv4 if possible.
292 */
293 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
294
295 /**
296 * Set published IP address according to given family
297 */
298 void setPublishedAddress(const IpAddr& ip_addr);
299
300 /**
301 * Store the local/public addresses used to register
302 */
303 void storeActiveIpAddress(std::function<void()>&& cb = {});
304
305 /**
306 * Create and return ICE options.
307 */
308 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
309 IceTransportOptions getIceOptions() const noexcept;
310
311 /**
312 * Inform that a potential peer device have been found.
313 * Returns true only if the device certificate is a valid device certificate.
314 * In that case (true is returned) the account_id parameter is set to the peer account ID.
315 */
316 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
317 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
318
319 bool findCertificate(const dht::PkId& id,
320 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400321 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400322
323 /**
324 * returns whether or not UPnP is enabled and active
325 * ie: if it is able to make port mappings
326 */
327 bool getUPnPActive() const;
328
329 /**
330 * Triggered when a new TLS socket is ready to use
331 * @param ok If succeed
332 * @param deviceId Related device
333 * @param vid vid of the connection request
334 * @param name non empty if TLS was created by connectDevice()
335 */
336 void onTlsNegotiationDone(bool ok,
337 const DeviceId& deviceId,
338 const dht::Value::Id& vid,
339 const std::string& name = "");
340
341 std::shared_ptr<ConnectionManager::Config> config_;
Amna81221ad2023-09-14 17:33:26 -0400342 std::unique_ptr<std::thread> ioContextRunner_;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400343
Adrien Béraud612b55b2023-05-29 10:42:04 -0400344 mutable std::mt19937_64 rand;
345
346 iOSConnectedCallback iOSConnectedCb_ {};
347
348 std::mutex infosMtx_ {};
349 // Note: Someone can ask multiple sockets, so to avoid any race condition,
350 // each device can have multiple multiplexed sockets.
351 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
352
353 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
354 {
355 std::lock_guard<std::mutex> lk(infosMtx_);
356 auto it = infos_.find({deviceId, id});
357 if (it != infos_.end())
358 return it->second;
359 return {};
360 }
361
362 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
363 {
364 std::lock_guard<std::mutex> lk(infosMtx_);
365 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
366 auto& [key, value] = item;
367 return key.first == deviceId && value && value->socket_;
368 });
369 if (it != infos_.end())
370 return it->second;
371 return {};
372 }
373
374 ChannelRequestCallback channelReqCb_ {};
375 ConnectionReadyCallback connReadyCb_ {};
376 onICERequestCallback iceReqCb_ {};
377
378 /**
379 * Stores callback from connectDevice
380 * @note: each device needs a vector because several connectDevice can
381 * be done in parallel and we only want one socket
382 */
383 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400384
Amna81221ad2023-09-14 17:33:26 -0400385
Adrien Béraud665294f2023-06-13 18:09:11 -0400386 struct PendingCb
387 {
388 std::string name;
389 ConnectCallback cb;
390 };
391 struct PendingOperations {
392 std::map<dht::Value::Id, PendingCb> connecting;
393 std::map<dht::Value::Id, PendingCb> waiting;
394 };
395
396 std::map<DeviceId, PendingOperations> pendingOperations_ {};
397
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400398 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400399 {
400 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400401 std::unique_lock<std::mutex> lk(connectCbsMtx_);
402 auto it = pendingOperations_.find(deviceId);
403 if (it == pendingOperations_.end())
404 return;
405 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400406 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400407 // Extract all pending callbacks
408 for (auto& [vid, cb] : pendingOperations.connecting)
409 ret.emplace_back(std::move(cb));
410 pendingOperations.connecting.clear();
411 for (auto& [vid, cb] : pendingOperations.waiting)
412 ret.emplace_back(std::move(cb));
413 pendingOperations.waiting.clear();
414 } else if (auto n = pendingOperations.waiting.extract(vid)) {
415 // If it's a waiting operation, just move it
416 ret.emplace_back(std::move(n.mapped()));
417 } else if (auto n = pendingOperations.connecting.extract(vid)) {
418 ret.emplace_back(std::move(n.mapped()));
419 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400420 // If accepted is false, it means that underlying socket is ok, but channel is declined
421 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400422 for (auto& [vid, cb] : pendingOperations.waiting)
423 ret.emplace_back(std::move(cb));
424 pendingOperations.waiting.clear();
425 for (auto& [vid, cb] : pendingOperations.connecting)
426 ret.emplace_back(std::move(cb));
427 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400428 }
429 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400430 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
431 pendingOperations_.erase(it);
432 lk.unlock();
433 for (auto& cb : ret)
434 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400435 }
436
Adrien Béraud665294f2023-06-13 18:09:11 -0400437 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400438 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400439 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400440 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400441 auto it = pendingOperations_.find(deviceId);
442 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400443 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400444 auto& pendingOp = it->second;
445 for (const auto& [id, pc]: pendingOp.connecting) {
446 if (vid == 0 || id == vid)
447 ret[id] = pc.name;
448 }
449 for (const auto& [id, pc]: pendingOp.waiting) {
450 if (vid == 0 || id == vid)
451 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400452 }
453 return ret;
454 }
455
456 std::shared_ptr<ConnectionManager::Impl> shared()
457 {
458 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
459 }
460 std::shared_ptr<ConnectionManager::Impl const> shared() const
461 {
462 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
463 }
464 std::weak_ptr<ConnectionManager::Impl> weak()
465 {
466 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
467 }
468 std::weak_ptr<ConnectionManager::Impl const> weak() const
469 {
470 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
471 }
472
473 std::atomic_bool isDestroying_ {false};
474};
475
476void
477ConnectionManager::Impl::connectDeviceStartIce(
478 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
479 const dht::Value::Id& vid,
480 const std::string& connType,
481 std::function<void(bool)> onConnected)
482{
483 auto deviceId = devicePk->getLongId();
484 auto info = getInfo(deviceId, vid);
485 if (!info) {
486 onConnected(false);
487 return;
488 }
489
490 std::unique_lock<std::mutex> lk(info->mutex_);
491 auto& ice = info->ice_;
492
493 if (!ice) {
494 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400495 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400496 onConnected(false);
497 return;
498 }
499
500 auto iceAttributes = ice->getLocalAttributes();
501 std::ostringstream icemsg;
502 icemsg << iceAttributes.ufrag << "\n";
503 icemsg << iceAttributes.pwd << "\n";
504 for (const auto& addr : ice->getLocalCandidates(1)) {
505 icemsg << addr << "\n";
506 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400507 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400508 }
509
510 // Prepare connection request as a DHT message
511 PeerConnectionRequest val;
512
513 val.id = vid; /* Random id for the message unicity */
514 val.ice_msg = icemsg.str();
515 val.connType = connType;
516
517 auto value = std::make_shared<dht::Value>(std::move(val));
518 value->user_type = "peer_request";
519
520 // Send connection request through DHT
521 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400522 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400523 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
524 + devicePk->getId().toString()),
525 devicePk,
526 value,
527 [l=config_->logger,deviceId](bool ok) {
528 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400529 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400530 deviceId,
531 (ok ? "ok" : "failed"));
532 });
533 // Wait for call to onResponse() operated by DHT
534 if (isDestroying_) {
535 onConnected(true); // This avoid to wait new negotiation when destroying
536 return;
537 }
538
539 info->onConnected_ = std::move(onConnected);
540 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
541 std::chrono::steady_clock::now()
542 + DHT_MSG_TIMEOUT);
543 info->waitForAnswer_->async_wait(
544 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
545}
546
547void
548ConnectionManager::Impl::onResponse(const asio::error_code& ec,
549 const DeviceId& deviceId,
550 const dht::Value::Id& vid)
551{
552 if (ec == asio::error::operation_aborted)
553 return;
554 auto info = getInfo(deviceId, vid);
555 if (!info)
556 return;
557
558 std::unique_lock<std::mutex> lk(info->mutex_);
559 auto& ice = info->ice_;
560 if (isDestroying_) {
561 info->onConnected_(true); // The destructor can wake a pending wait here.
562 return;
563 }
564 if (!info->responseReceived_) {
565 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400566 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400567 info->onConnected_(false);
568 return;
569 }
570
571 if (!info->ice_) {
572 info->onConnected_(false);
573 return;
574 }
575
576 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
577
578 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
579 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400580 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400581 info->onConnected_(false);
582 return;
583 }
584 info->onConnected_(true);
585}
586
587bool
588ConnectionManager::Impl::connectDeviceOnNegoDone(
589 const DeviceId& deviceId,
590 const std::string& name,
591 const dht::Value::Id& vid,
592 const std::shared_ptr<dht::crypto::Certificate>& cert)
593{
594 auto info = getInfo(deviceId, vid);
595 if (!info)
596 return false;
597
598 std::unique_lock<std::mutex> lk {info->mutex_};
599 if (info->waitForAnswer_) {
600 // Negotiation is done and connected, go to handshake
601 // and avoid any cancellation at this point.
602 info->waitForAnswer_->cancel();
603 }
604 auto& ice = info->ice_;
605 if (!ice || !ice->isRunning()) {
606 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400607 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400608 return false;
609 }
610
611 // Build socket
612 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
613 std::move(ice)),
614 true);
615
616 // Negotiate a TLS session
617 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400618 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400619 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
620 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400621 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400622 identity(),
623 dhParams(),
624 *cert);
625
626 info->tls_->setOnReady(
627 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
628 bool ok) {
629 if (auto shared = w.lock())
630 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
631 });
632 return true;
633}
634
635void
636ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
637 const std::string& name,
638 ConnectCallback cb,
639 bool noNewSocket,
640 bool forceNewSocket,
641 const std::string& connType)
642{
643 if (!dht()) {
644 cb(nullptr, deviceId);
645 return;
646 }
647 if (deviceId.toString() == identity().second->getLongId().toString()) {
648 cb(nullptr, deviceId);
649 return;
650 }
651 findCertificate(deviceId,
652 [w = weak(),
653 deviceId,
654 name,
655 cb = std::move(cb),
656 noNewSocket,
657 forceNewSocket,
658 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
659 if (!cert) {
660 if (auto shared = w.lock())
661 if (shared->config_->logger)
662 shared->config_->logger->error(
663 "No valid certificate found for device {}",
664 deviceId);
665 cb(nullptr, deviceId);
666 return;
667 }
668 if (auto shared = w.lock()) {
669 shared->connectDevice(cert,
670 name,
671 std::move(cb),
672 noNewSocket,
673 forceNewSocket,
674 connType);
675 } else
676 cb(nullptr, deviceId);
677 });
678}
679
680void
Amna0cf544d2023-07-25 14:25:09 -0400681ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
682 const std::string& name,
683 ConnectCallbackLegacy cb,
684 bool noNewSocket,
685 bool forceNewSocket,
686 const std::string& connType)
687{
688 if (!dht()) {
689 cb(nullptr, deviceId);
690 return;
691 }
692 if (deviceId.toString() == identity().second->getLongId().toString()) {
693 cb(nullptr, deviceId);
694 return;
695 }
696 findCertificate(deviceId,
697 [w = weak(),
698 deviceId,
699 name,
700 cb = std::move(cb),
701 noNewSocket,
702 forceNewSocket,
703 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
704 if (!cert) {
705 if (auto shared = w.lock())
706 if (shared->config_->logger)
707 shared->config_->logger->error(
708 "No valid certificate found for device {}",
709 deviceId);
710 cb(nullptr, deviceId);
711 return;
712 }
713 if (auto shared = w.lock()) {
714 shared->connectDevice(cert,
715 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400716 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400717 cb(sock, deviceId);
718 },
719 noNewSocket,
720 forceNewSocket,
721 connType);
722 } else
723 cb(nullptr, deviceId);
724 });
725}
726
727void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400728ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
729 const std::string& name,
730 ConnectCallback cb,
731 bool noNewSocket,
732 bool forceNewSocket,
733 const std::string& connType)
734{
735 // Avoid dht operation in a DHT callback to avoid deadlocks
736 dht::ThreadPool::computation().run([w = weak(),
737 name = std::move(name),
738 cert = std::move(cert),
739 cb = std::move(cb),
740 noNewSocket,
741 forceNewSocket,
742 connType] {
743 auto devicePk = cert->getSharedPublicKey();
744 auto deviceId = devicePk->getLongId();
745 auto sthis = w.lock();
746 if (!sthis || sthis->isDestroying_) {
747 cb(nullptr, deviceId);
748 return;
749 }
Adrien Béraud26365c92023-09-23 23:42:43 -0400750 dht::Value::Id vid;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400751 auto isConnectingToDevice = false;
752 {
753 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud26365c92023-09-23 23:42:43 -0400754 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud665294f2023-06-13 18:09:11 -0400755 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
756 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400757 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400758 while (pendings.connecting.find(vid) != pendings.connecting.end()
Adrien Béraud26365c92023-09-23 23:42:43 -0400759 || pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400760 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400761 }
762 }
763 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400764 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400765 // Save current request for sendChannelRequest.
766 // Note: do not return here, cause we can be in a state where first
767 // socket is negotiated and first channel is pending
768 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400769 if (isConnectingToDevice && !forceNewSocket)
770 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400771 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400772 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400773 }
774
775 // Check if already negotiated
776 CallbackId cbId(deviceId, vid);
777 if (auto info = sthis->getConnectedInfo(deviceId)) {
778 std::lock_guard<std::mutex> lk(info->mutex_);
779 if (info->socket_) {
780 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400781 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400782 info->cbIds_.emplace(cbId);
783 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
784 return;
785 }
786 }
787
788 if (isConnectingToDevice && !forceNewSocket) {
789 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400790 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400791 return;
792 }
793 if (noNewSocket) {
794 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400795 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400796 return;
797 }
798
799 // Note: used when the ice negotiation fails to erase
800 // all stored structures.
801 auto eraseInfo = [w, cbId] {
802 if (auto shared = w.lock()) {
803 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400804 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400805 std::lock_guard<std::mutex> lk(shared->infosMtx_);
806 shared->infos_.erase(cbId);
807 }
808 };
809
810 // If no socket exists, we need to initiate an ICE connection.
811 sthis->getIceOptions([w,
812 deviceId = std::move(deviceId),
813 devicePk = std::move(devicePk),
814 name = std::move(name),
815 cert = std::move(cert),
816 vid,
817 connType,
818 eraseInfo](auto&& ice_config) {
819 auto sthis = w.lock();
820 if (!sthis) {
821 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
822 return;
823 }
824 ice_config.tcpEnable = true;
825 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400826 devicePk = std::move(devicePk),
827 name = std::move(name),
828 cert = std::move(cert),
829 vid,
830 connType,
831 eraseInfo](bool ok) {
832 dht::ThreadPool::io().run([w = std::move(w),
833 devicePk = std::move(devicePk),
834 vid = std::move(vid),
835 eraseInfo,
836 connType, ok] {
837 auto sthis = w.lock();
838 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400839 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400840 if (!sthis || !ok) {
841 eraseInfo();
842 return;
843 }
844 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
845 if (!ok) {
846 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
847 }
848 });
849 });
850 };
851 ice_config.onNegoDone = [w,
852 deviceId,
853 name,
854 cert = std::move(cert),
855 vid,
856 eraseInfo](bool ok) {
857 dht::ThreadPool::io().run([w = std::move(w),
858 deviceId = std::move(deviceId),
859 name = std::move(name),
860 cert = std::move(cert),
861 vid = std::move(vid),
862 eraseInfo = std::move(eraseInfo),
863 ok] {
864 auto sthis = w.lock();
865 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400866 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400867 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
868 eraseInfo();
869 });
870 };
871
872 auto info = std::make_shared<ConnectionInfo>();
873 {
874 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
875 sthis->infos_[{deviceId, vid}] = info;
876 }
877 std::unique_lock<std::mutex> lk {info->mutex_};
878 ice_config.master = false;
879 ice_config.streamsCount = 1;
880 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400881 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400882 if (!info->ice_) {
883 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400884 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400885 eraseInfo();
886 return;
887 }
888 // We need to detect any shutdown if the ice session is destroyed before going to the
889 // TLS session;
890 info->ice_->setOnShutdown([eraseInfo]() {
891 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
892 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400893 try {
894 info->ice_->initIceInstance(ice_config);
895 } catch (const std::exception& e) {
896 if (sthis->config_->logger)
897 sthis->config_->logger->error("{}", e.what());
898 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
899 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400900 });
901 });
902}
903
904void
905ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
906 const std::string& name,
907 const DeviceId& deviceId,
908 const dht::Value::Id& vid)
909{
910 auto channelSock = sock->addChannel(name);
911 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
912 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400913 if (auto shared = w.lock())
914 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400915 });
916 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400917 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400918 auto shared = w.lock();
919 auto channelSock = wSock.lock();
920 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400921 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400922 });
923
924 ChannelRequest val;
925 val.name = channelSock->name();
926 val.state = ChannelRequestState::REQUEST;
927 val.channel = channelSock->channel();
928 msgpack::sbuffer buffer(256);
929 msgpack::pack(buffer, val);
930
931 std::error_code ec;
932 int res = sock->write(CONTROL_CHANNEL,
933 reinterpret_cast<const uint8_t*>(buffer.data()),
934 buffer.size(),
935 ec);
936 if (res < 0) {
937 // TODO check if we should handle errors here
938 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400939 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400940 }
941}
942
943void
944ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
945{
946 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400947 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400948 if (config_->logger)
949 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400950 std::lock_guard<std::mutex> lk {info->mutex_};
951 info->responseReceived_ = true;
952 info->response_ = std::move(req);
953 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
954 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
955 this,
956 std::placeholders::_1,
957 device,
958 req.id));
959 } else {
960 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400961 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400962 }
963}
964
965void
966ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
967{
968 if (!dht())
969 return;
970 dht()->listen<PeerConnectionRequest>(
971 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
972 [w = weak()](PeerConnectionRequest&& req) {
973 auto shared = w.lock();
974 if (!shared)
975 return false;
976 if (shared->isMessageTreated(to_hex_string(req.id))) {
977 // Message already treated. Just ignore
978 return true;
979 }
980 if (req.isAnswer) {
981 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400982 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400983 } else {
984 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400985 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400986 }
987 if (req.isAnswer) {
988 shared->onPeerResponse(req);
989 } else {
990 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400991 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400992 req.from,
993 [w, req = std::move(req)](
994 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
995 auto shared = w.lock();
996 if (!shared)
997 return;
998 dht::InfoHash peer_h;
999 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
1000#if TARGET_OS_IOS
1001 if (shared->iOSConnectedCb_(req.connType, peer_h))
1002 return;
1003#endif
1004 shared->onDhtPeerRequest(req, cert);
1005 } else {
1006 if (shared->config_->logger)
1007 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -04001008 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001009 req.owner->getLongId());
1010 }
1011 });
1012 }
1013
1014 return true;
1015 },
1016 dht::Value::UserTypeFilter("peer_request"));
1017}
1018
1019void
1020ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
1021 const DeviceId& deviceId,
1022 const dht::Value::Id& vid,
1023 const std::string& name)
1024{
1025 if (isDestroying_)
1026 return;
1027 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
1028 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
1029 // asked yet)
1030 auto isDhtRequest = name.empty();
1031 if (!ok) {
1032 if (isDhtRequest) {
1033 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001034 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001035 deviceId,
1036 name,
1037 vid);
1038 if (connReadyCb_)
1039 connReadyCb_(deviceId, "", nullptr);
1040 } else {
1041 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001042 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001043 deviceId,
1044 name,
1045 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -04001046 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001047 }
1048 } else {
1049 // The socket is ready, store it
1050 if (isDhtRequest) {
1051 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001052 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001053 deviceId,
1054 vid);
1055 } else {
1056 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001057 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001058 deviceId,
1059 name,
1060 vid);
1061 }
1062
1063 auto info = getInfo(deviceId, vid);
1064 addNewMultiplexedSocket({deviceId, vid}, info);
1065 // Finally, open the channel and launch pending callbacks
1066 if (info->socket_) {
1067 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001068 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001069 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001070 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001071 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001072 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001073 }
1074 }
1075 }
1076}
1077
1078void
1079ConnectionManager::Impl::answerTo(IceTransport& ice,
1080 const dht::Value::Id& id,
1081 const std::shared_ptr<dht::crypto::PublicKey>& from)
1082{
1083 // NOTE: This is a shortest version of a real SDP message to save some bits
1084 auto iceAttributes = ice.getLocalAttributes();
1085 std::ostringstream icemsg;
1086 icemsg << iceAttributes.ufrag << "\n";
1087 icemsg << iceAttributes.pwd << "\n";
1088 for (const auto& addr : ice.getLocalCandidates(1)) {
1089 icemsg << addr << "\n";
1090 }
1091
1092 // Send PeerConnection response
1093 PeerConnectionRequest val;
1094 val.id = id;
1095 val.ice_msg = icemsg.str();
1096 val.isAnswer = true;
1097 auto value = std::make_shared<dht::Value>(std::move(val));
1098 value->user_type = "peer_request";
1099
1100 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001101 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001102 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1103 + from->getId().toString()),
1104 from,
1105 value,
1106 [from,l=config_->logger](bool ok) {
1107 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001108 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001109 from->getLongId(),
1110 (ok ? "ok" : "failed"));
1111 });
1112}
1113
1114bool
1115ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1116{
1117 auto deviceId = req.owner->getLongId();
1118 auto info = getInfo(deviceId, req.id);
1119 if (!info)
1120 return false;
1121
1122 std::unique_lock<std::mutex> lk {info->mutex_};
1123 auto& ice = info->ice_;
1124 if (!ice) {
1125 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001126 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001127 if (connReadyCb_)
1128 connReadyCb_(deviceId, "", nullptr);
1129 return false;
1130 }
1131
1132 auto sdp = ice->parseIceCandidates(req.ice_msg);
1133 answerTo(*ice, req.id, req.owner);
1134 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1135 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001136 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001137 ice = nullptr;
1138 if (connReadyCb_)
1139 connReadyCb_(deviceId, "", nullptr);
1140 return false;
1141 }
1142 return true;
1143}
1144
1145bool
1146ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1147{
1148 auto deviceId = req.owner->getLongId();
1149 auto info = getInfo(deviceId, req.id);
1150 if (!info)
1151 return false;
1152
1153 std::unique_lock<std::mutex> lk {info->mutex_};
1154 auto& ice = info->ice_;
1155 if (!ice) {
1156 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001157 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001158 return false;
1159 }
1160
1161 // Build socket
1162 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1163 std::move(ice)),
1164 false);
1165
1166 // init TLS session
1167 auto ph = req.from;
1168 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001169 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1170 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001171 req.id);
1172 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1173 std::move(endpoint),
1174 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001175 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001176 identity(),
1177 dhParams(),
Adrien Béraud9efbd442023-08-27 12:38:07 -04001178 [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001179 auto shared = w.lock();
1180 if (!shared)
1181 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001182 if (cert.getPublicKey().getId() != ph
1183 || deviceId != cert.getPublicKey().getLongId()) {
1184 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1185 deviceId,
1186 cert.getPublicKey().getLongId());
1187 return false;
1188 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001189 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1190 if (!crt)
1191 return false;
1192 return crt->getPacked() == cert.getPacked();
1193 });
1194
1195 info->tls_->setOnReady(
1196 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1197 if (auto shared = w.lock())
1198 shared->onTlsNegotiationDone(ok, deviceId, vid);
1199 });
1200 return true;
1201}
1202
1203void
1204ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1205 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1206{
1207 auto deviceId = req.owner->getLongId();
1208 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001209 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001210 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1211 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001212 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001213 return;
1214 }
1215
1216 // Because the connection is accepted, create an ICE socket.
1217 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1218 auto shared = w.lock();
1219 if (!shared)
1220 return;
1221 // Note: used when the ice negotiation fails to erase
1222 // all stored structures.
1223 auto eraseInfo = [w, id = req.id, deviceId] {
1224 if (auto shared = w.lock()) {
1225 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001226 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001227 if (shared->connReadyCb_)
1228 shared->connReadyCb_(deviceId, "", nullptr);
1229 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1230 shared->infos_.erase({deviceId, id});
1231 }
1232 };
1233
1234 ice_config.tcpEnable = true;
1235 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1236 auto shared = w.lock();
1237 if (!shared)
1238 return;
1239 if (!ok) {
1240 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001241 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001242 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1243 return;
1244 }
1245
1246 dht::ThreadPool::io().run(
1247 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1248 auto shared = w.lock();
1249 if (!shared)
1250 return;
1251 if (!shared->onRequestStartIce(req))
1252 eraseInfo();
1253 });
1254 };
1255
1256 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1257 auto shared = w.lock();
1258 if (!shared)
1259 return;
1260 if (!ok) {
1261 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001262 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001263 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1264 return;
1265 }
1266
1267 dht::ThreadPool::io().run(
1268 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1269 if (auto shared = w.lock())
1270 if (!shared->onRequestOnNegoDone(req))
1271 eraseInfo();
1272 });
1273 };
1274
1275 // Negotiate a new ICE socket
1276 auto info = std::make_shared<ConnectionInfo>();
1277 {
1278 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1279 shared->infos_[{deviceId, req.id}] = info;
1280 }
1281 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001282 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001283 std::unique_lock<std::mutex> lk {info->mutex_};
1284 ice_config.streamsCount = 1;
1285 ice_config.compCountPerStream = 1; // TCP
1286 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001287 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001288 if (not info->ice_) {
1289 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001290 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001291 eraseInfo();
1292 return;
1293 }
1294 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1295 info->ice_->setOnShutdown([eraseInfo]() {
1296 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1297 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001298 try {
1299 info->ice_->initIceInstance(ice_config);
1300 } catch (const std::exception& e) {
1301 if (shared->config_->logger)
1302 shared->config_->logger->error("{}", e.what());
1303 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1304 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001305 });
1306}
1307
1308void
1309ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1310{
Adrien Béraud5636f7c2023-09-14 14:34:57 -04001311 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_), config_->logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001312 info->socket_->setOnReady(
1313 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1314 if (auto sthis = w.lock())
1315 if (sthis->connReadyCb_)
1316 sthis->connReadyCb_(deviceId, socket->name(), socket);
1317 });
1318 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1319 const uint16_t&,
1320 const std::string& name) {
1321 if (auto sthis = w.lock())
1322 if (sthis->channelReqCb_)
1323 return sthis->channelReqCb_(peer, name);
1324 return false;
1325 });
1326 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1327 // Cancel current outgoing connections
1328 dht::ThreadPool::io().run([w, deviceId, vid] {
1329 auto sthis = w.lock();
1330 if (!sthis)
1331 return;
1332
1333 std::set<CallbackId> ids;
1334 if (auto info = sthis->getInfo(deviceId, vid)) {
1335 std::lock_guard<std::mutex> lk(info->mutex_);
1336 if (info->socket_) {
1337 ids = std::move(info->cbIds_);
1338 info->socket_->shutdown();
1339 }
1340 }
1341 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001342 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001343
1344 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1345 sthis->infos_.erase({deviceId, vid});
1346 });
1347 });
1348}
1349
1350const std::shared_future<tls::DhParams>
1351ConnectionManager::Impl::dhParams() const
1352{
1353 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001354 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001355}
1356
1357template<typename ID = dht::Value::Id>
1358std::set<ID, std::less<>>
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001359loadIdList(const std::filesystem::path& path)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001360{
1361 std::set<ID, std::less<>> ids;
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001362 std::ifstream file(path);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001363 if (!file.is_open()) {
1364 //JAMI_DBG("Could not load %s", path.c_str());
1365 return ids;
1366 }
1367 std::string line;
1368 while (std::getline(file, line)) {
1369 if constexpr (std::is_same<ID, std::string>::value) {
1370 ids.emplace(std::move(line));
1371 } else if constexpr (std::is_integral<ID>::value) {
1372 ID vid;
1373 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1374 ec == std::errc()) {
1375 ids.emplace(vid);
1376 }
1377 }
1378 }
1379 return ids;
1380}
1381
1382template<typename List = std::set<dht::Value::Id>>
1383void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001384saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001385{
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001386 std::ofstream file(path, std::ios::trunc | std::ios::binary);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001387 if (!file.is_open()) {
1388 //JAMI_ERR("Could not save to %s", path.c_str());
1389 return;
1390 }
1391 for (auto& c : ids)
1392 file << std::hex << c << "\n";
1393}
1394
1395void
1396ConnectionManager::Impl::loadTreatedMessages()
1397{
1398 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001399 auto path = config_->cachePath / "treatedMessages";
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001400 treatedMessages_ = loadIdList<std::string>(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001401 if (treatedMessages_.empty()) {
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001402 auto messages = loadIdList(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001403 for (const auto& m : messages)
1404 treatedMessages_.emplace(to_hex_string(m));
1405 }
1406}
1407
1408void
1409ConnectionManager::Impl::saveTreatedMessages() const
1410{
1411 dht::ThreadPool::io().run([w = weak()]() {
1412 if (auto sthis = w.lock()) {
1413 auto& this_ = *sthis;
1414 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1415 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001416 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001417 this_.treatedMessages_);
1418 }
1419 });
1420}
1421
1422bool
1423ConnectionManager::Impl::isMessageTreated(std::string_view id)
1424{
1425 std::lock_guard<std::mutex> lock(messageMutex_);
1426 auto res = treatedMessages_.emplace(id);
1427 if (res.second) {
1428 saveTreatedMessages();
1429 return false;
1430 }
1431 return true;
1432}
1433
1434/**
1435 * returns whether or not UPnP is enabled and active_
1436 * ie: if it is able to make port mappings
1437 */
1438bool
1439ConnectionManager::Impl::getUPnPActive() const
1440{
1441 return config_->getUPnPActive();
1442}
1443
1444IpAddr
1445ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1446{
1447 if (family == AF_INET)
1448 return publishedIp_[0];
1449 if (family == AF_INET6)
1450 return publishedIp_[1];
1451
1452 assert(family == AF_UNSPEC);
1453
1454 // If family is not set, prefere IPv4 if available. It's more
1455 // likely to succeed behind NAT.
1456 if (publishedIp_[0])
1457 return publishedIp_[0];
1458 if (publishedIp_[1])
1459 return publishedIp_[1];
1460 return {};
1461}
1462
1463void
1464ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1465{
1466 if (ip_addr.getFamily() == AF_INET) {
1467 publishedIp_[0] = ip_addr;
1468 } else {
1469 publishedIp_[1] = ip_addr;
1470 }
1471}
1472
1473void
1474ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1475{
1476 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1477 bool hasIpv4 {false}, hasIpv6 {false};
1478 for (auto& result : results) {
1479 auto family = result.getFamily();
1480 if (family == AF_INET) {
1481 if (not hasIpv4) {
1482 hasIpv4 = true;
1483 if (config_->logger)
1484 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1485 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1486 setPublishedAddress(*result.get());
1487 if (config_->upnpCtrl) {
1488 config_->upnpCtrl->setPublicAddress(*result.get());
1489 }
1490 }
1491 } else if (family == AF_INET6) {
1492 if (not hasIpv6) {
1493 hasIpv6 = true;
1494 if (config_->logger)
1495 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1496 setPublishedAddress(*result.get());
1497 }
1498 }
1499 if (hasIpv4 and hasIpv6)
1500 break;
1501 }
1502 if (cb)
1503 cb();
1504 });
1505}
1506
1507void
1508ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1509{
1510 storeActiveIpAddress([this, cb = std::move(cb)] {
1511 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1512 auto publishedAddr = getPublishedIpAddress();
1513
1514 if (publishedAddr) {
1515 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1516 publishedAddr.getFamily());
1517 if (interfaceAddr) {
1518 opts.accountLocalAddr = interfaceAddr;
1519 opts.accountPublicAddr = publishedAddr;
1520 }
1521 }
1522 if (cb)
1523 cb(std::move(opts));
1524 });
1525}
1526
1527IceTransportOptions
1528ConnectionManager::Impl::getIceOptions() const noexcept
1529{
1530 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001531 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001532 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001533 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001534
1535 if (config_->stunEnabled)
1536 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1537 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001538 if (config_->turnCache) {
1539 auto turnAddr = config_->turnCache->getResolvedTurn();
1540 if (turnAddr != std::nullopt) {
1541 opts.turnServers.emplace_back(TurnServerInfo()
1542 .setUri(turnAddr->toString())
1543 .setUsername(config_->turnServerUserName)
1544 .setPassword(config_->turnServerPwd)
1545 .setRealm(config_->turnServerRealm));
1546 }
1547 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001548 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001549 .setUri(config_->turnServer)
1550 .setUsername(config_->turnServerUserName)
1551 .setPassword(config_->turnServerPwd)
1552 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001553 }
1554 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1555 // co issues. So this needs some debug. for now just disable
1556 // if (cacheTurnV6 && *cacheTurnV6) {
1557 // opts.turnServers.emplace_back(TurnServerInfo()
1558 // .setUri(cacheTurnV6->toString(true))
1559 // .setUsername(turnServerUserName_)
1560 // .setPassword(turnServerPwd_)
1561 // .setRealm(turnServerRealm_));
1562 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001563 }
1564 return opts;
1565}
1566
1567bool
1568ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1569 dht::InfoHash& account_id,
1570 const std::shared_ptr<Logger>& logger)
1571{
1572 if (not crt)
1573 return false;
1574
1575 auto top_issuer = crt;
1576 while (top_issuer->issuer)
1577 top_issuer = top_issuer->issuer;
1578
1579 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001580 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001581 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001582 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001583 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001584 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001585
1586 // Check peer certificate chain
1587 // Trust store with top issuer as the only CA
1588 dht::crypto::TrustList peer_trust;
1589 peer_trust.add(*top_issuer);
1590 if (not peer_trust.verify(*crt)) {
1591 if (logger)
1592 logger->warn("Found invalid peer device: {}", crt->getLongId());
1593 return false;
1594 }
1595
1596 // Check cached OCSP response
1597 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1598 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001599 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001600 return false;
1601 }
1602
Adrien Béraudc631a832023-07-26 22:19:00 -04001603 account_id = crt->issuer->getId();
1604 if (logger)
1605 logger->warn("Found peer device: {} account:{} CA:{}",
1606 crt->getLongId(),
1607 account_id,
1608 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001609 return true;
1610}
1611
1612bool
1613ConnectionManager::Impl::findCertificate(
1614 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1615{
1616 if (auto cert = certStore().getCertificate(id.toString())) {
1617 if (cb)
1618 cb(cert);
1619 } else if (cb)
1620 cb(nullptr);
1621 return true;
1622}
1623
Sébastien Blin34086512023-07-25 09:52:14 -04001624bool
1625ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1626 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1627{
1628 if (auto cert = certStore().getCertificate(h.toString())) {
1629 if (cb)
1630 cb(cert);
1631 } else {
1632 dht()->findCertificate(h,
1633 [cb = std::move(cb), this](
1634 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1635 if (crt)
1636 certStore().pinCertificate(crt);
1637 if (cb)
1638 cb(crt);
1639 });
1640 }
1641 return true;
1642}
1643
Amna81221ad2023-09-14 17:33:26 -04001644std::shared_ptr<ConnectionManager::Config>
1645buildDefaultConfig(dht::crypto::Identity id){
1646 auto conf = std::make_shared<ConnectionManager::Config>();
1647 conf->id = std::move(id);
1648 return conf;
1649}
1650
Adrien Béraud612b55b2023-05-29 10:42:04 -04001651ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1652 : pimpl_ {std::make_shared<Impl>(config_)}
1653{}
1654
Amna81221ad2023-09-14 17:33:26 -04001655ConnectionManager::ConnectionManager(dht::crypto::Identity id)
1656 : ConnectionManager {buildDefaultConfig(id)}
1657{}
1658
Adrien Béraud612b55b2023-05-29 10:42:04 -04001659ConnectionManager::~ConnectionManager()
1660{
1661 if (pimpl_)
1662 pimpl_->shutdown();
1663}
1664
1665void
1666ConnectionManager::connectDevice(const DeviceId& deviceId,
1667 const std::string& name,
1668 ConnectCallback cb,
1669 bool noNewSocket,
1670 bool forceNewSocket,
1671 const std::string& connType)
1672{
1673 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1674}
1675
1676void
Amna0cf544d2023-07-25 14:25:09 -04001677ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1678 const std::string& name,
1679 ConnectCallbackLegacy cb,
1680 bool noNewSocket,
1681 bool forceNewSocket,
1682 const std::string& connType)
1683{
1684 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1685}
1686
1687
1688void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001689ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1690 const std::string& name,
1691 ConnectCallback cb,
1692 bool noNewSocket,
1693 bool forceNewSocket,
1694 const std::string& connType)
1695{
1696 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1697}
1698
1699bool
1700ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1701{
Adrien Béraud665294f2023-06-13 18:09:11 -04001702 auto pending = pimpl_->getPendingIds(deviceId);
1703 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001704 != pending.end();
1705}
1706
1707void
1708ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1709{
1710 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1711 std::set<DeviceId> peersDevices;
1712 {
1713 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1714 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1715 auto const& [key, value] = *iter;
Adrien Béraudafa8e282023-09-24 12:53:20 -04001716 std::unique_lock<std::mutex> lkv {value->mutex_};
Adrien Béraud612b55b2023-05-29 10:42:04 -04001717 auto deviceId = key.first;
Adrien Béraudafa8e282023-09-24 12:53:20 -04001718 auto tls = value->tls_ ? value->tls_.get() : (value->socket_ ? value->socket_->endpoint() : nullptr);
1719 auto cert = tls ? tls->peerCertificate() : nullptr;
1720 if (not cert)
1721 cert = pimpl_->certStore().getCertificate(deviceId.toString());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001722 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1723 connInfos.emplace_back(value);
1724 peersDevices.emplace(deviceId);
Adrien Béraudafa8e282023-09-24 12:53:20 -04001725 lkv.unlock();
Adrien Béraud612b55b2023-05-29 10:42:04 -04001726 iter = pimpl_->infos_.erase(iter);
1727 } else {
1728 iter++;
1729 }
1730 }
1731 }
1732 // Stop connections to all peers devices
1733 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001734 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001735 // This will close the TLS Session
1736 pimpl_->removeUnusedConnections(deviceId);
1737 }
1738 for (auto& info : connInfos) {
1739 if (info->socket_)
1740 info->socket_->shutdown();
1741 if (info->waitForAnswer_)
1742 info->waitForAnswer_->cancel();
1743 if (info->ice_) {
1744 std::unique_lock<std::mutex> lk {info->mutex_};
1745 dht::ThreadPool::io().run(
1746 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1747 }
1748 }
1749}
1750
1751void
1752ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1753{
1754 pimpl_->onDhtConnected(devicePk);
1755}
1756
1757void
1758ConnectionManager::onICERequest(onICERequestCallback&& cb)
1759{
1760 pimpl_->iceReqCb_ = std::move(cb);
1761}
1762
1763void
1764ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1765{
1766 pimpl_->channelReqCb_ = std::move(cb);
1767}
1768
1769void
1770ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1771{
1772 pimpl_->connReadyCb_ = std::move(cb);
1773}
1774
1775void
1776ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1777{
1778 pimpl_->iOSConnectedCb_ = std::move(cb);
1779}
1780
1781std::size_t
1782ConnectionManager::activeSockets() const
1783{
1784 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1785 return pimpl_->infos_.size();
1786}
1787
1788void
1789ConnectionManager::monitor() const
1790{
1791 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1792 auto logger = pimpl_->config_->logger;
1793 if (!logger)
1794 return;
1795 logger->debug("ConnectionManager current status:");
1796 for (const auto& [_, ci] : pimpl_->infos_) {
1797 if (ci->socket_)
1798 ci->socket_->monitor();
1799 }
1800 logger->debug("ConnectionManager end status.");
1801}
1802
1803void
1804ConnectionManager::connectivityChanged()
1805{
1806 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1807 for (const auto& [_, ci] : pimpl_->infos_) {
1808 if (ci->socket_)
1809 ci->socket_->sendBeacon();
1810 }
1811}
1812
1813void
1814ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1815{
1816 return pimpl_->getIceOptions(std::move(cb));
1817}
1818
1819IceTransportOptions
1820ConnectionManager::getIceOptions() const noexcept
1821{
1822 return pimpl_->getIceOptions();
1823}
1824
1825IpAddr
1826ConnectionManager::getPublishedIpAddress(uint16_t family) const
1827{
1828 return pimpl_->getPublishedIpAddress(family);
1829}
1830
1831void
1832ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1833{
1834 return pimpl_->setPublishedAddress(ip_addr);
1835}
1836
1837void
1838ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1839{
1840 return pimpl_->storeActiveIpAddress(std::move(cb));
1841}
1842
1843std::shared_ptr<ConnectionManager::Config>
1844ConnectionManager::getConfig()
1845{
1846 return pimpl_->config_;
1847}
1848
Amna31791e52023-08-03 12:40:57 -04001849std::vector<std::map<std::string, std::string>>
1850ConnectionManager::getConnectionList(const DeviceId& device) const
1851{
1852 std::vector<std::map<std::string, std::string>> connectionsList;
1853 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1854
1855 for (const auto& [key, ci] : pimpl_->infos_) {
1856 if (device && key.first != device)
1857 continue;
1858 std::map<std::string, std::string> connectionInfo;
1859 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001860 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001861 if (ci->tls_) {
1862 if (auto cert = ci->tls_->peerCertificate()) {
1863 connectionInfo["peer"] = cert->issuer->getId().toString();
1864 }
Amna31791e52023-08-03 12:40:57 -04001865 }
1866 if (ci->socket_) {
1867 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1868 } else if (ci->tls_) {
1869 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1870 } else if(ci->ice_)
1871 {
1872 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1873 }
1874 if (ci->tls_) {
1875 std::string remoteAddress = ci->tls_->getRemoteAddress();
1876 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1877 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1878 connectionInfo["remoteAdress"] = remoteAddressIp;
1879 connectionInfo["remotePort"] = remoteAddressPort;
1880 }
1881 connectionsList.emplace_back(std::move(connectionInfo));
1882 }
1883
1884 if (device) {
1885 auto it = pimpl_->pendingOperations_.find(device);
1886 if (it != pimpl_->pendingOperations_.end()) {
1887 const auto& po = it->second;
1888 for (const auto& [vid, ci] : po.connecting) {
1889 std::map<std::string, std::string> connectionInfo;
1890 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001891 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1892 connectionsList.emplace_back(std::move(connectionInfo));
1893 }
1894
1895 for (const auto& [vid, ci] : po.waiting) {
1896 std::map<std::string, std::string> connectionInfo;
1897 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001898 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1899 connectionsList.emplace_back(std::move(connectionInfo));
1900 }
1901 }
1902 }
1903 else {
1904 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1905 for (const auto& [vid, ci] : po.connecting) {
1906 std::map<std::string, std::string> connectionInfo;
1907 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001908 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1909 connectionsList.emplace_back(std::move(connectionInfo));
1910 }
1911
1912 for (const auto& [vid, ci] : po.waiting) {
1913 std::map<std::string, std::string> connectionInfo;
1914 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001915 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1916 connectionsList.emplace_back(std::move(connectionInfo));
1917 }
1918 }
1919 }
1920 return connectionsList;
1921}
1922
1923std::vector<std::map<std::string, std::string>>
1924ConnectionManager::getChannelList(const std::string& connectionId) const
1925{
1926 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1927 CallbackId cbid = parseCallbackId(connectionId);
1928 if (pimpl_->infos_.count(cbid) > 0) {
1929 return pimpl_->infos_[cbid]->socket_->getChannelList();
1930 } else {
1931 return {};
1932 }
1933}
1934
Sébastien Blin464bdff2023-07-19 08:02:53 -04001935} // namespace dhtnet