blob: 62fb4ef4111a3f83525f5f5735cdc7967fd2f72d [file] [log] [blame]
Adrien Béraud612b55b2023-05-29 10:42:04 -04001/*
Adrien Béraudcb753622023-07-17 22:32:49 -04002 * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
Adrien Béraud612b55b2023-05-29 10:42:04 -04003 *
Adrien Béraudcb753622023-07-17 22:32:49 -04004 * This program is free software: you can redistribute it and/or modify
Adrien Béraud612b55b2023-05-29 10:42:04 -04005 * it under the terms of the GNU General Public License as published by
Adrien Béraudcb753622023-07-17 22:32:49 -04006 * the Free Software Foundation, either version 3 of the License, or
Adrien Béraud612b55b2023-05-29 10:42:04 -04007 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Adrien Béraudcb753622023-07-17 22:32:49 -040011 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Adrien Béraud612b55b2023-05-29 10:42:04 -040012 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "connectionmanager.h"
18#include "peer_connection.h"
19#include "upnp/upnp_control.h"
20#include "certstore.h"
21#include "fileutils.h"
22#include "sip_utils.h"
23#include "string_utils.h"
24
25#include <opendht/crypto.h>
26#include <opendht/thread_pool.h>
27#include <opendht/value.h>
28#include <asio.hpp>
29
30#include <algorithm>
31#include <mutex>
32#include <map>
33#include <condition_variable>
34#include <set>
35#include <charconv>
Morteza Namvar5f639522023-07-04 17:08:58 -040036#include <fstream>
Adrien Béraud612b55b2023-05-29 10:42:04 -040037
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040038namespace dhtnet {
Adrien Béraud612b55b2023-05-29 10:42:04 -040039static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
40static constexpr uint64_t ID_MAX_VAL = 9007199254740992;
41
42using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>;
Adrien Béraud1ae60aa2023-07-07 09:55:09 -040043using CallbackId = std::pair<dhtnet::DeviceId, dht::Value::Id>;
Amna31791e52023-08-03 12:40:57 -040044std::string
45callbackIdToString(const dhtnet::DeviceId& did, const dht::Value::Id& vid)
46{
47 return fmt::format("{} {}", did.to_view(), vid);
48}
Adrien Béraud612b55b2023-05-29 10:42:04 -040049
Amna31791e52023-08-03 12:40:57 -040050CallbackId parseCallbackId(std::string_view ci)
51{
52 auto sep = ci.find(' ');
53 std::string_view deviceIdString = ci.substr(0, sep);
54 std::string_view vidString = ci.substr(sep + 1);
55
56 dhtnet::DeviceId deviceId(deviceIdString);
57 dht::Value::Id vid = std::stoul(std::string(vidString), nullptr, 10);
58
59 return CallbackId(deviceId, vid);
60}
Amna81221ad2023-09-14 17:33:26 -040061
62std::shared_ptr<ConnectionManager::Config>
63createConfig(std::shared_ptr<ConnectionManager::Config> config_)
64{
65 if (!config_->certStore){
66 config_->certStore = std::make_shared<dhtnet::tls::CertificateStore>("client", config_->logger);
67 }
68 if (!config_->dht) {
69 dht::DhtRunner::Config dhtConfig;
70 dhtConfig.dht_config.id = config_->id;
71 dhtConfig.threaded = true;
72 dht::DhtRunner::Context dhtContext;
73 dhtContext.certificateStore = [c = config_->certStore](const dht::InfoHash& pk_id) {
74 std::vector<std::shared_ptr<dht::crypto::Certificate>> ret;
75 if (auto cert = c->getCertificate(pk_id.toString()))
76 ret.emplace_back(std::move(cert));
77 return ret;
78 };
79 config_->dht = std::make_shared<dht::DhtRunner>();
80 config_->dht->run(dhtConfig, std::move(dhtContext));
81 config_->dht->bootstrap("bootstrap.jami.net");
82 }
83 if (!config_->factory){
84 config_->factory = std::make_shared<IceTransportFactory>(config_->logger);
85 }
86 return config_;
87}
88
Adrien Béraud612b55b2023-05-29 10:42:04 -040089struct ConnectionInfo
90{
91 ~ConnectionInfo()
92 {
93 if (socket_)
94 socket_->join();
95 }
96
97 std::mutex mutex_ {};
98 bool responseReceived_ {false};
99 PeerConnectionRequest response_ {};
100 std::unique_ptr<IceTransport> ice_ {nullptr};
101 // Used to store currently non ready TLS Socket
102 std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr};
103 std::shared_ptr<MultiplexedSocket> socket_ {};
104 std::set<CallbackId> cbIds_ {};
105
106 std::function<void(bool)> onConnected_;
107 std::unique_ptr<asio::steady_timer> waitForAnswer_ {};
108};
109
110/**
111 * returns whether or not UPnP is enabled and active_
112 * ie: if it is able to make port mappings
113 */
114bool
115ConnectionManager::Config::getUPnPActive() const
116{
117 if (upnpCtrl)
118 return upnpCtrl->isReady();
119 return false;
120}
121
122class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl>
123{
124public:
125 explicit Impl(std::shared_ptr<ConnectionManager::Config> config_)
Amna81221ad2023-09-14 17:33:26 -0400126 : config_ {std::move(createConfig(config_))}
Sébastien Blincf569402023-07-27 09:46:40 -0400127 , rand {dht::crypto::getSeededRandomEngine<std::mt19937_64>()}
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400128 {
Kateryna Kostiukbb300a12023-10-02 13:19:50 -0400129 loadTreatedMessages();
Amna81221ad2023-09-14 17:33:26 -0400130 if(!config_->ioContext) {
131 config_->ioContext = std::make_shared<asio::io_context>();
132 ioContextRunner_ = std::make_unique<std::thread>([context = config_->ioContext, l=config_->logger]() {
133 try {
134 auto work = asio::make_work_guard(*context);
135 context->run();
136 } catch (const std::exception& ex) {
137 if (l) l->error("Exception: {}", ex.what());
138 }
139 });
140 }
Kateryna Kostiukc39f6672023-09-15 16:49:42 -0400141 }
Amna81221ad2023-09-14 17:33:26 -0400142 ~Impl() {
143 if (ioContextRunner_) {
144 if (config_->logger) config_->logger->debug("ConnectionManager: stopping io_context thread");
145 config_->ioContext->stop();
146 ioContextRunner_->join();
147 ioContextRunner_.reset();
148 }
149 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400150
151 std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; }
152 const dht::crypto::Identity& identity() const { return config_->id; }
153
154 void removeUnusedConnections(const DeviceId& deviceId = {})
155 {
156 std::vector<std::shared_ptr<ConnectionInfo>> unused {};
157
158 {
159 std::lock_guard<std::mutex> lk(infosMtx_);
160 for (auto it = infos_.begin(); it != infos_.end();) {
161 auto& [key, info] = *it;
162 if (info && (!deviceId || key.first == deviceId)) {
163 unused.emplace_back(std::move(info));
164 it = infos_.erase(it);
165 } else {
166 ++it;
167 }
168 }
169 }
170 for (auto& info: unused) {
171 if (info->tls_)
172 info->tls_->shutdown();
173 if (info->socket_)
174 info->socket_->shutdown();
175 if (info->waitForAnswer_)
176 info->waitForAnswer_->cancel();
177 }
178 if (!unused.empty())
Amna81221ad2023-09-14 17:33:26 -0400179 dht::ThreadPool::io().run([infos = std::move(unused)]() mutable {
180 infos.clear();
181 });
Adrien Béraud612b55b2023-05-29 10:42:04 -0400182 }
183
184 void shutdown()
185 {
186 if (isDestroying_.exchange(true))
187 return;
Adrien Béraud665294f2023-06-13 18:09:11 -0400188 decltype(pendingOperations_) po;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400189 {
190 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400191 po = std::move(pendingOperations_);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400192 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400193 for (auto& [deviceId, pcbs] : po) {
194 for (auto& [id, pending] : pcbs.connecting)
195 pending.cb(nullptr, deviceId);
196 for (auto& [id, pending] : pcbs.waiting)
197 pending.cb(nullptr, deviceId);
198 }
199
Adrien Béraud612b55b2023-05-29 10:42:04 -0400200 removeUnusedConnections();
201 }
202
Adrien Béraud612b55b2023-05-29 10:42:04 -0400203 void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
204 const dht::Value::Id& vid,
205 const std::string& connType,
206 std::function<void(bool)> onConnected);
207 void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid);
208 bool connectDeviceOnNegoDone(const DeviceId& deviceId,
209 const std::string& name,
210 const dht::Value::Id& vid,
211 const std::shared_ptr<dht::crypto::Certificate>& cert);
212 void connectDevice(const DeviceId& deviceId,
213 const std::string& uri,
214 ConnectCallback cb,
215 bool noNewSocket = false,
216 bool forceNewSocket = false,
217 const std::string& connType = "");
Amna0cf544d2023-07-25 14:25:09 -0400218 void connectDevice(const dht::InfoHash& deviceId,
219 const std::string& uri,
220 ConnectCallbackLegacy cb,
221 bool noNewSocket = false,
222 bool forceNewSocket = false,
223 const std::string& connType = "");
224
Adrien Béraud612b55b2023-05-29 10:42:04 -0400225 void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
226 const std::string& name,
227 ConnectCallback cb,
228 bool noNewSocket = false,
229 bool forceNewSocket = false,
230 const std::string& connType = "");
231 /**
232 * Send a ChannelRequest on the TLS socket. Triggers cb when ready
233 * @param sock socket used to send the request
234 * @param name channel's name
235 * @param vid channel's id
236 * @param deviceId to identify the linked ConnectCallback
237 */
238 void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
239 const std::string& name,
240 const DeviceId& deviceId,
241 const dht::Value::Id& vid);
242 /**
243 * Triggered when a PeerConnectionRequest comes from the DHT
244 */
245 void answerTo(IceTransport& ice,
246 const dht::Value::Id& id,
247 const std::shared_ptr<dht::crypto::PublicKey>& fromPk);
248 bool onRequestStartIce(const PeerConnectionRequest& req);
249 bool onRequestOnNegoDone(const PeerConnectionRequest& req);
250 void onDhtPeerRequest(const PeerConnectionRequest& req,
251 const std::shared_ptr<dht::crypto::Certificate>& cert);
252
253 void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info);
254 void onPeerResponse(const PeerConnectionRequest& req);
255 void onDhtConnected(const dht::crypto::PublicKey& devicePk);
256
257 const std::shared_future<tls::DhParams> dhParams() const;
258 tls::CertificateStore& certStore() const { return *config_->certStore; }
259
260 mutable std::mutex messageMutex_ {};
261 std::set<std::string, std::less<>> treatedMessages_ {};
262
263 void loadTreatedMessages();
264 void saveTreatedMessages() const;
265
266 /// \return true if the given DHT message identifier has been treated
267 /// \note if message has not been treated yet this method st/ore this id and returns true at
268 /// further calls
269 bool isMessageTreated(std::string_view id);
270
271 const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; }
272
273 /**
274 * Published IPv4/IPv6 addresses, used only if defined by the user in account
275 * configuration
276 *
277 */
278 IpAddr publishedIp_[2] {};
279
Adrien Béraud612b55b2023-05-29 10:42:04 -0400280 /**
281 * interface name on which this account is bound
282 */
283 std::string interface_ {"default"};
284
285 /**
286 * Get the local interface name on which this account is bound.
287 */
288 const std::string& getLocalInterface() const { return interface_; }
289
290 /**
291 * Get the published IP address, fallbacks to NAT if family is unspecified
292 * Prefers the usage of IPv4 if possible.
293 */
294 IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const;
295
296 /**
297 * Set published IP address according to given family
298 */
299 void setPublishedAddress(const IpAddr& ip_addr);
300
301 /**
302 * Store the local/public addresses used to register
303 */
304 void storeActiveIpAddress(std::function<void()>&& cb = {});
305
306 /**
307 * Create and return ICE options.
308 */
309 void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept;
310 IceTransportOptions getIceOptions() const noexcept;
311
312 /**
313 * Inform that a potential peer device have been found.
314 * Returns true only if the device certificate is a valid device certificate.
315 * In that case (true is returned) the account_id parameter is set to the peer account ID.
316 */
317 static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
318 dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger);
319
320 bool findCertificate(const dht::PkId& id,
321 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Sébastien Blin34086512023-07-25 09:52:14 -0400322 bool findCertificate(const dht::InfoHash& h, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400323
324 /**
325 * returns whether or not UPnP is enabled and active
326 * ie: if it is able to make port mappings
327 */
328 bool getUPnPActive() const;
329
330 /**
331 * Triggered when a new TLS socket is ready to use
332 * @param ok If succeed
333 * @param deviceId Related device
334 * @param vid vid of the connection request
335 * @param name non empty if TLS was created by connectDevice()
336 */
337 void onTlsNegotiationDone(bool ok,
338 const DeviceId& deviceId,
339 const dht::Value::Id& vid,
340 const std::string& name = "");
341
342 std::shared_ptr<ConnectionManager::Config> config_;
Amna81221ad2023-09-14 17:33:26 -0400343 std::unique_ptr<std::thread> ioContextRunner_;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400344
Adrien Béraud612b55b2023-05-29 10:42:04 -0400345 mutable std::mt19937_64 rand;
346
347 iOSConnectedCallback iOSConnectedCb_ {};
348
349 std::mutex infosMtx_ {};
350 // Note: Someone can ask multiple sockets, so to avoid any race condition,
351 // each device can have multiple multiplexed sockets.
352 std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {};
353
354 std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id)
355 {
356 std::lock_guard<std::mutex> lk(infosMtx_);
357 auto it = infos_.find({deviceId, id});
358 if (it != infos_.end())
359 return it->second;
360 return {};
361 }
362
363 std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId)
364 {
365 std::lock_guard<std::mutex> lk(infosMtx_);
366 auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) {
367 auto& [key, value] = item;
368 return key.first == deviceId && value && value->socket_;
369 });
370 if (it != infos_.end())
371 return it->second;
372 return {};
373 }
374
375 ChannelRequestCallback channelReqCb_ {};
376 ConnectionReadyCallback connReadyCb_ {};
377 onICERequestCallback iceReqCb_ {};
378
379 /**
380 * Stores callback from connectDevice
381 * @note: each device needs a vector because several connectDevice can
382 * be done in parallel and we only want one socket
383 */
384 std::mutex connectCbsMtx_ {};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400385
Amna81221ad2023-09-14 17:33:26 -0400386
Adrien Béraud665294f2023-06-13 18:09:11 -0400387 struct PendingCb
388 {
389 std::string name;
390 ConnectCallback cb;
391 };
392 struct PendingOperations {
393 std::map<dht::Value::Id, PendingCb> connecting;
394 std::map<dht::Value::Id, PendingCb> waiting;
395 };
396
397 std::map<DeviceId, PendingOperations> pendingOperations_ {};
398
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400399 void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400400 {
401 std::vector<PendingCb> ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400402 std::unique_lock<std::mutex> lk(connectCbsMtx_);
403 auto it = pendingOperations_.find(deviceId);
404 if (it == pendingOperations_.end())
405 return;
406 auto& pendingOperations = it->second;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400407 if (vid == 0) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400408 // Extract all pending callbacks
409 for (auto& [vid, cb] : pendingOperations.connecting)
410 ret.emplace_back(std::move(cb));
411 pendingOperations.connecting.clear();
412 for (auto& [vid, cb] : pendingOperations.waiting)
413 ret.emplace_back(std::move(cb));
414 pendingOperations.waiting.clear();
415 } else if (auto n = pendingOperations.waiting.extract(vid)) {
416 // If it's a waiting operation, just move it
417 ret.emplace_back(std::move(n.mapped()));
418 } else if (auto n = pendingOperations.connecting.extract(vid)) {
419 ret.emplace_back(std::move(n.mapped()));
420 // If sock is nullptr, execute if it's the last connecting operation
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400421 // If accepted is false, it means that underlying socket is ok, but channel is declined
422 if (!sock && pendingOperations.connecting.empty() && accepted) {
Adrien Béraud665294f2023-06-13 18:09:11 -0400423 for (auto& [vid, cb] : pendingOperations.waiting)
424 ret.emplace_back(std::move(cb));
425 pendingOperations.waiting.clear();
426 for (auto& [vid, cb] : pendingOperations.connecting)
427 ret.emplace_back(std::move(cb));
428 pendingOperations.connecting.clear();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400429 }
430 }
Adrien Béraud665294f2023-06-13 18:09:11 -0400431 if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty())
432 pendingOperations_.erase(it);
433 lk.unlock();
434 for (auto& cb : ret)
435 cb.cb(sock, deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400436 }
437
Adrien Béraud665294f2023-06-13 18:09:11 -0400438 std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0)
Adrien Béraud612b55b2023-05-29 10:42:04 -0400439 {
Adrien Béraud665294f2023-06-13 18:09:11 -0400440 std::map<dht::Value::Id, std::string> ret;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400441 std::lock_guard<std::mutex> lk(connectCbsMtx_);
Adrien Béraud665294f2023-06-13 18:09:11 -0400442 auto it = pendingOperations_.find(deviceId);
443 if (it == pendingOperations_.end())
Adrien Béraud612b55b2023-05-29 10:42:04 -0400444 return ret;
Adrien Béraud665294f2023-06-13 18:09:11 -0400445 auto& pendingOp = it->second;
446 for (const auto& [id, pc]: pendingOp.connecting) {
447 if (vid == 0 || id == vid)
448 ret[id] = pc.name;
449 }
450 for (const auto& [id, pc]: pendingOp.waiting) {
451 if (vid == 0 || id == vid)
452 ret[id] = pc.name;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400453 }
454 return ret;
455 }
456
457 std::shared_ptr<ConnectionManager::Impl> shared()
458 {
459 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
460 }
461 std::shared_ptr<ConnectionManager::Impl const> shared() const
462 {
463 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
464 }
465 std::weak_ptr<ConnectionManager::Impl> weak()
466 {
467 return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this());
468 }
469 std::weak_ptr<ConnectionManager::Impl const> weak() const
470 {
471 return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this());
472 }
473
474 std::atomic_bool isDestroying_ {false};
475};
476
477void
478ConnectionManager::Impl::connectDeviceStartIce(
479 const std::shared_ptr<dht::crypto::PublicKey>& devicePk,
480 const dht::Value::Id& vid,
481 const std::string& connType,
482 std::function<void(bool)> onConnected)
483{
484 auto deviceId = devicePk->getLongId();
485 auto info = getInfo(deviceId, vid);
486 if (!info) {
487 onConnected(false);
488 return;
489 }
490
491 std::unique_lock<std::mutex> lk(info->mutex_);
492 auto& ice = info->ice_;
493
494 if (!ice) {
495 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400496 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400497 onConnected(false);
498 return;
499 }
500
501 auto iceAttributes = ice->getLocalAttributes();
502 std::ostringstream icemsg;
503 icemsg << iceAttributes.ufrag << "\n";
504 icemsg << iceAttributes.pwd << "\n";
505 for (const auto& addr : ice->getLocalCandidates(1)) {
506 icemsg << addr << "\n";
507 if (config_->logger)
Sébastien Blinaec46fc2023-07-25 15:43:10 -0400508 config_->logger->debug("[device {}] Added local ICE candidate {}", deviceId, addr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400509 }
510
511 // Prepare connection request as a DHT message
512 PeerConnectionRequest val;
513
514 val.id = vid; /* Random id for the message unicity */
515 val.ice_msg = icemsg.str();
516 val.connType = connType;
517
518 auto value = std::make_shared<dht::Value>(std::move(val));
519 value->user_type = "peer_request";
520
521 // Send connection request through DHT
522 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400523 config_->logger->debug("[device {}] Sending connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400524 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
525 + devicePk->getId().toString()),
526 devicePk,
527 value,
528 [l=config_->logger,deviceId](bool ok) {
529 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -0400530 l->debug("[device {}] Sent connection request. Put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -0400531 deviceId,
532 (ok ? "ok" : "failed"));
533 });
534 // Wait for call to onResponse() operated by DHT
535 if (isDestroying_) {
536 onConnected(true); // This avoid to wait new negotiation when destroying
537 return;
538 }
539
540 info->onConnected_ = std::move(onConnected);
541 info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext,
542 std::chrono::steady_clock::now()
543 + DHT_MSG_TIMEOUT);
544 info->waitForAnswer_->async_wait(
545 std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid));
546}
547
548void
549ConnectionManager::Impl::onResponse(const asio::error_code& ec,
550 const DeviceId& deviceId,
551 const dht::Value::Id& vid)
552{
553 if (ec == asio::error::operation_aborted)
554 return;
555 auto info = getInfo(deviceId, vid);
556 if (!info)
557 return;
558
559 std::unique_lock<std::mutex> lk(info->mutex_);
560 auto& ice = info->ice_;
561 if (isDestroying_) {
562 info->onConnected_(true); // The destructor can wake a pending wait here.
563 return;
564 }
565 if (!info->responseReceived_) {
566 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400567 config_->logger->error("[device {}] no response from DHT to ICE request.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400568 info->onConnected_(false);
569 return;
570 }
571
572 if (!info->ice_) {
573 info->onConnected_(false);
574 return;
575 }
576
577 auto sdp = ice->parseIceCandidates(info->response_.ice_msg);
578
579 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
580 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400581 config_->logger->warn("[device {}] start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400582 info->onConnected_(false);
583 return;
584 }
585 info->onConnected_(true);
586}
587
588bool
589ConnectionManager::Impl::connectDeviceOnNegoDone(
590 const DeviceId& deviceId,
591 const std::string& name,
592 const dht::Value::Id& vid,
593 const std::shared_ptr<dht::crypto::Certificate>& cert)
594{
595 auto info = getInfo(deviceId, vid);
596 if (!info)
597 return false;
598
599 std::unique_lock<std::mutex> lk {info->mutex_};
600 if (info->waitForAnswer_) {
601 // Negotiation is done and connected, go to handshake
602 // and avoid any cancellation at this point.
603 info->waitForAnswer_->cancel();
604 }
605 auto& ice = info->ice_;
606 if (!ice || !ice->isRunning()) {
607 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400608 config_->logger->error("[device {}] No ICE detected or not running", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400609 return false;
610 }
611
612 // Build socket
613 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
614 std::move(ice)),
615 true);
616
617 // Negotiate a TLS session
618 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400619 config_->logger->debug("[device {}] Start TLS session - Initied by connectDevice(). Launched by channel: {} - vid: {}", deviceId, name, vid);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400620 info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint),
621 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -0400622 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400623 identity(),
624 dhParams(),
625 *cert);
626
627 info->tls_->setOnReady(
628 [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)](
629 bool ok) {
630 if (auto shared = w.lock())
631 shared->onTlsNegotiationDone(ok, deviceId, vid, name);
632 });
633 return true;
634}
635
636void
637ConnectionManager::Impl::connectDevice(const DeviceId& deviceId,
638 const std::string& name,
639 ConnectCallback cb,
640 bool noNewSocket,
641 bool forceNewSocket,
642 const std::string& connType)
643{
644 if (!dht()) {
645 cb(nullptr, deviceId);
646 return;
647 }
648 if (deviceId.toString() == identity().second->getLongId().toString()) {
649 cb(nullptr, deviceId);
650 return;
651 }
652 findCertificate(deviceId,
653 [w = weak(),
654 deviceId,
655 name,
656 cb = std::move(cb),
657 noNewSocket,
658 forceNewSocket,
659 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
660 if (!cert) {
661 if (auto shared = w.lock())
662 if (shared->config_->logger)
663 shared->config_->logger->error(
664 "No valid certificate found for device {}",
665 deviceId);
666 cb(nullptr, deviceId);
667 return;
668 }
669 if (auto shared = w.lock()) {
670 shared->connectDevice(cert,
671 name,
672 std::move(cb),
673 noNewSocket,
674 forceNewSocket,
675 connType);
676 } else
677 cb(nullptr, deviceId);
678 });
679}
680
681void
Amna0cf544d2023-07-25 14:25:09 -0400682ConnectionManager::Impl::connectDevice(const dht::InfoHash& deviceId,
683 const std::string& name,
684 ConnectCallbackLegacy cb,
685 bool noNewSocket,
686 bool forceNewSocket,
687 const std::string& connType)
688{
689 if (!dht()) {
690 cb(nullptr, deviceId);
691 return;
692 }
693 if (deviceId.toString() == identity().second->getLongId().toString()) {
694 cb(nullptr, deviceId);
695 return;
696 }
697 findCertificate(deviceId,
698 [w = weak(),
699 deviceId,
700 name,
701 cb = std::move(cb),
702 noNewSocket,
703 forceNewSocket,
704 connType](const std::shared_ptr<dht::crypto::Certificate>& cert) {
705 if (!cert) {
706 if (auto shared = w.lock())
707 if (shared->config_->logger)
708 shared->config_->logger->error(
709 "No valid certificate found for device {}",
710 deviceId);
711 cb(nullptr, deviceId);
712 return;
713 }
714 if (auto shared = w.lock()) {
715 shared->connectDevice(cert,
716 name,
Adrien Béraudd78d1ac2023-08-25 10:43:33 -0400717 [cb, deviceId](const std::shared_ptr<ChannelSocket>& sock, const DeviceId& /*did*/){
Amna0cf544d2023-07-25 14:25:09 -0400718 cb(sock, deviceId);
719 },
720 noNewSocket,
721 forceNewSocket,
722 connType);
723 } else
724 cb(nullptr, deviceId);
725 });
726}
727
728void
Adrien Béraud612b55b2023-05-29 10:42:04 -0400729ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
730 const std::string& name,
731 ConnectCallback cb,
732 bool noNewSocket,
733 bool forceNewSocket,
734 const std::string& connType)
735{
736 // Avoid dht operation in a DHT callback to avoid deadlocks
737 dht::ThreadPool::computation().run([w = weak(),
738 name = std::move(name),
739 cert = std::move(cert),
740 cb = std::move(cb),
741 noNewSocket,
742 forceNewSocket,
743 connType] {
744 auto devicePk = cert->getSharedPublicKey();
745 auto deviceId = devicePk->getLongId();
746 auto sthis = w.lock();
747 if (!sthis || sthis->isDestroying_) {
748 cb(nullptr, deviceId);
749 return;
750 }
Adrien Béraud26365c92023-09-23 23:42:43 -0400751 dht::Value::Id vid;
Adrien Béraud612b55b2023-05-29 10:42:04 -0400752 auto isConnectingToDevice = false;
753 {
754 std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_);
Adrien Béraud26365c92023-09-23 23:42:43 -0400755 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud665294f2023-06-13 18:09:11 -0400756 auto pendingsIt = sthis->pendingOperations_.find(deviceId);
757 if (pendingsIt != sthis->pendingOperations_.end()) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400758 const auto& pendings = pendingsIt->second;
Adrien Béraud665294f2023-06-13 18:09:11 -0400759 while (pendings.connecting.find(vid) != pendings.connecting.end()
Adrien Béraud26365c92023-09-23 23:42:43 -0400760 || pendings.waiting.find(vid) != pendings.waiting.end()) {
Adrien Béraudc5b971d2023-06-13 19:41:25 -0400761 vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400762 }
763 }
764 // Check if already connecting
Adrien Béraud665294f2023-06-13 18:09:11 -0400765 isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400766 // Save current request for sendChannelRequest.
767 // Note: do not return here, cause we can be in a state where first
768 // socket is negotiated and first channel is pending
769 // so return only after we checked the info
Adrien Béraud665294f2023-06-13 18:09:11 -0400770 if (isConnectingToDevice && !forceNewSocket)
771 pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400772 else
Adrien Béraud665294f2023-06-13 18:09:11 -0400773 sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)};
Adrien Béraud612b55b2023-05-29 10:42:04 -0400774 }
775
776 // Check if already negotiated
777 CallbackId cbId(deviceId, vid);
778 if (auto info = sthis->getConnectedInfo(deviceId)) {
779 std::lock_guard<std::mutex> lk(info->mutex_);
780 if (info->socket_) {
781 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400782 sthis->config_->logger->debug("[device {}] Peer already connected. Add a new channel", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400783 info->cbIds_.emplace(cbId);
784 sthis->sendChannelRequest(info->socket_, name, deviceId, vid);
785 return;
786 }
787 }
788
789 if (isConnectingToDevice && !forceNewSocket) {
790 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400791 sthis->config_->logger->debug("[device {}] Already connecting, wait for ICE negotiation", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400792 return;
793 }
794 if (noNewSocket) {
795 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400796 sthis->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400797 return;
798 }
799
800 // Note: used when the ice negotiation fails to erase
801 // all stored structures.
802 auto eraseInfo = [w, cbId] {
803 if (auto shared = w.lock()) {
804 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -0400805 shared->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400806 std::lock_guard<std::mutex> lk(shared->infosMtx_);
807 shared->infos_.erase(cbId);
808 }
809 };
810
811 // If no socket exists, we need to initiate an ICE connection.
812 sthis->getIceOptions([w,
813 deviceId = std::move(deviceId),
814 devicePk = std::move(devicePk),
815 name = std::move(name),
816 cert = std::move(cert),
817 vid,
818 connType,
819 eraseInfo](auto&& ice_config) {
820 auto sthis = w.lock();
821 if (!sthis) {
822 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
823 return;
824 }
825 ice_config.tcpEnable = true;
826 ice_config.onInitDone = [w,
Adrien Béraud612b55b2023-05-29 10:42:04 -0400827 devicePk = std::move(devicePk),
828 name = std::move(name),
829 cert = std::move(cert),
830 vid,
831 connType,
832 eraseInfo](bool ok) {
833 dht::ThreadPool::io().run([w = std::move(w),
834 devicePk = std::move(devicePk),
835 vid = std::move(vid),
836 eraseInfo,
837 connType, ok] {
838 auto sthis = w.lock();
839 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400840 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", devicePk->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400841 if (!sthis || !ok) {
842 eraseInfo();
843 return;
844 }
845 sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) {
846 if (!ok) {
847 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
848 }
849 });
850 });
851 };
852 ice_config.onNegoDone = [w,
853 deviceId,
854 name,
855 cert = std::move(cert),
856 vid,
857 eraseInfo](bool ok) {
858 dht::ThreadPool::io().run([w = std::move(w),
859 deviceId = std::move(deviceId),
860 name = std::move(name),
861 cert = std::move(cert),
862 vid = std::move(vid),
863 eraseInfo = std::move(eraseInfo),
864 ok] {
865 auto sthis = w.lock();
866 if (!ok && sthis && sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400867 sthis->config_->logger->error("[device {}] ICE negotiation failed.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400868 if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert))
869 eraseInfo();
870 });
871 };
872
873 auto info = std::make_shared<ConnectionInfo>();
874 {
875 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
876 sthis->infos_[{deviceId, vid}] = info;
877 }
878 std::unique_lock<std::mutex> lk {info->mutex_};
879 ice_config.master = false;
880 ice_config.streamsCount = 1;
881 ice_config.compCountPerStream = 1;
Sébastien Blin34086512023-07-25 09:52:14 -0400882 info->ice_ = sthis->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -0400883 if (!info->ice_) {
884 if (sthis->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400885 sthis->config_->logger->error("[device {}] Cannot initialize ICE session.", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400886 eraseInfo();
887 return;
888 }
889 // We need to detect any shutdown if the ice session is destroyed before going to the
890 // TLS session;
891 info->ice_->setOnShutdown([eraseInfo]() {
892 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
893 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -0400894 try {
895 info->ice_->initIceInstance(ice_config);
896 } catch (const std::exception& e) {
897 if (sthis->config_->logger)
898 sthis->config_->logger->error("{}", e.what());
899 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
900 }
Adrien Béraud612b55b2023-05-29 10:42:04 -0400901 });
902 });
903}
904
905void
906ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock,
907 const std::string& name,
908 const DeviceId& deviceId,
909 const dht::Value::Id& vid)
910{
911 auto channelSock = sock->addChannel(name);
912 channelSock->onShutdown([name, deviceId, vid, w = weak()] {
913 auto shared = w.lock();
Adrien Béraud665294f2023-06-13 18:09:11 -0400914 if (auto shared = w.lock())
915 shared->executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400916 });
917 channelSock->onReady(
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400918 [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) {
Adrien Béraud612b55b2023-05-29 10:42:04 -0400919 auto shared = w.lock();
920 auto channelSock = wSock.lock();
921 if (shared)
Adrien Béraud7e8b0562023-06-13 18:10:53 -0400922 shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400923 });
924
925 ChannelRequest val;
926 val.name = channelSock->name();
927 val.state = ChannelRequestState::REQUEST;
928 val.channel = channelSock->channel();
929 msgpack::sbuffer buffer(256);
930 msgpack::pack(buffer, val);
931
932 std::error_code ec;
933 int res = sock->write(CONTROL_CHANNEL,
934 reinterpret_cast<const uint8_t*>(buffer.data()),
935 buffer.size(),
936 ec);
937 if (res < 0) {
938 // TODO check if we should handle errors here
939 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400940 config_->logger->error("[device {}] sendChannelRequest failed - error: {}", deviceId, ec.message());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400941 }
942}
943
944void
945ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req)
946{
947 auto device = req.owner->getLongId();
Adrien Béraud612b55b2023-05-29 10:42:04 -0400948 if (auto info = getInfo(device, req.id)) {
Adrien Béraud23852462023-07-22 01:46:27 -0400949 if (config_->logger)
950 config_->logger->debug("[device {}] New response received", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400951 std::lock_guard<std::mutex> lk {info->mutex_};
952 info->responseReceived_ = true;
953 info->response_ = std::move(req);
954 info->waitForAnswer_->expires_at(std::chrono::steady_clock::now());
955 info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse,
956 this,
957 std::placeholders::_1,
958 device,
959 req.id));
960 } else {
961 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400962 config_->logger->warn("[device {}] Respond received, but cannot find request", device);
Adrien Béraud612b55b2023-05-29 10:42:04 -0400963 }
964}
965
966void
967ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk)
968{
969 if (!dht())
970 return;
971 dht()->listen<PeerConnectionRequest>(
972 dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()),
973 [w = weak()](PeerConnectionRequest&& req) {
974 auto shared = w.lock();
975 if (!shared)
976 return false;
977 if (shared->isMessageTreated(to_hex_string(req.id))) {
978 // Message already treated. Just ignore
979 return true;
980 }
981 if (req.isAnswer) {
982 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400983 shared->config_->logger->debug("[device {}] Received request answer", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400984 } else {
985 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -0400986 shared->config_->logger->debug("[device {}] Received request", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -0400987 }
988 if (req.isAnswer) {
989 shared->onPeerResponse(req);
990 } else {
991 // Async certificate checking
Sébastien Blin34086512023-07-25 09:52:14 -0400992 shared->findCertificate(
Adrien Béraud612b55b2023-05-29 10:42:04 -0400993 req.from,
994 [w, req = std::move(req)](
995 const std::shared_ptr<dht::crypto::Certificate>& cert) mutable {
996 auto shared = w.lock();
997 if (!shared)
998 return;
999 dht::InfoHash peer_h;
1000 if (foundPeerDevice(cert, peer_h, shared->config_->logger)) {
1001#if TARGET_OS_IOS
1002 if (shared->iOSConnectedCb_(req.connType, peer_h))
1003 return;
1004#endif
1005 shared->onDhtPeerRequest(req, cert);
1006 } else {
1007 if (shared->config_->logger)
1008 shared->config_->logger->warn(
Adrien Béraud23852462023-07-22 01:46:27 -04001009 "[device {}] Received request from untrusted peer",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001010 req.owner->getLongId());
1011 }
1012 });
1013 }
1014
1015 return true;
1016 },
1017 dht::Value::UserTypeFilter("peer_request"));
1018}
1019
1020void
1021ConnectionManager::Impl::onTlsNegotiationDone(bool ok,
1022 const DeviceId& deviceId,
1023 const dht::Value::Id& vid,
1024 const std::string& name)
1025{
1026 if (isDestroying_)
1027 return;
1028 // Note: only handle pendingCallbacks here for TLS initied by connectDevice()
1029 // Note: if not initied by connectDevice() the channel name will be empty (because no channel
1030 // asked yet)
1031 auto isDhtRequest = name.empty();
1032 if (!ok) {
1033 if (isDhtRequest) {
1034 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001035 config_->logger->error("[device {}] TLS connection failure - Initied by DHT request. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001036 deviceId,
1037 name,
1038 vid);
1039 if (connReadyCb_)
1040 connReadyCb_(deviceId, "", nullptr);
1041 } else {
1042 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001043 config_->logger->error("[device {}] TLS connection failure - Initied by connectDevice. channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001044 deviceId,
1045 name,
1046 vid);
Adrien Béraud665294f2023-06-13 18:09:11 -04001047 executePendingOperations(deviceId, vid, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001048 }
1049 } else {
1050 // The socket is ready, store it
1051 if (isDhtRequest) {
1052 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001053 config_->logger->debug("[device {}] Connection is ready - Initied by DHT request. Vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001054 deviceId,
1055 vid);
1056 } else {
1057 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001058 config_->logger->debug("[device {}] Connection is ready - Initied by connectDevice(). channel: {} - vid: {}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001059 deviceId,
1060 name,
1061 vid);
1062 }
1063
1064 auto info = getInfo(deviceId, vid);
1065 addNewMultiplexedSocket({deviceId, vid}, info);
1066 // Finally, open the channel and launch pending callbacks
1067 if (info->socket_) {
1068 // Note: do not remove pending there it's done in sendChannelRequest
Adrien Béraud665294f2023-06-13 18:09:11 -04001069 for (const auto& [id, name] : getPendingIds(deviceId)) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001070 if (config_->logger)
Adrien Béraude5f25062023-07-25 13:16:13 -04001071 config_->logger->debug("[device {}] Send request on TLS socket for channel {}",
Adrien Béraud23852462023-07-22 01:46:27 -04001072 deviceId, name);
Adrien Béraud665294f2023-06-13 18:09:11 -04001073 sendChannelRequest(info->socket_, name, deviceId, id);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001074 }
1075 }
1076 }
1077}
1078
1079void
1080ConnectionManager::Impl::answerTo(IceTransport& ice,
1081 const dht::Value::Id& id,
1082 const std::shared_ptr<dht::crypto::PublicKey>& from)
1083{
1084 // NOTE: This is a shortest version of a real SDP message to save some bits
1085 auto iceAttributes = ice.getLocalAttributes();
1086 std::ostringstream icemsg;
1087 icemsg << iceAttributes.ufrag << "\n";
1088 icemsg << iceAttributes.pwd << "\n";
1089 for (const auto& addr : ice.getLocalCandidates(1)) {
1090 icemsg << addr << "\n";
1091 }
1092
1093 // Send PeerConnection response
1094 PeerConnectionRequest val;
1095 val.id = id;
1096 val.ice_msg = icemsg.str();
1097 val.isAnswer = true;
1098 auto value = std::make_shared<dht::Value>(std::move(val));
1099 value->user_type = "peer_request";
1100
1101 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001102 config_->logger->debug("[device {}] Connection accepted, DHT reply", from->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001103 dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix
1104 + from->getId().toString()),
1105 from,
1106 value,
1107 [from,l=config_->logger](bool ok) {
1108 if (l)
Adrien Béraud23852462023-07-22 01:46:27 -04001109 l->debug("[device {}] Answer to connection request: put encrypted {:s}",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001110 from->getLongId(),
1111 (ok ? "ok" : "failed"));
1112 });
1113}
1114
1115bool
1116ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req)
1117{
1118 auto deviceId = req.owner->getLongId();
1119 auto info = getInfo(deviceId, req.id);
1120 if (!info)
1121 return false;
1122
1123 std::unique_lock<std::mutex> lk {info->mutex_};
1124 auto& ice = info->ice_;
1125 if (!ice) {
1126 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001127 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001128 if (connReadyCb_)
1129 connReadyCb_(deviceId, "", nullptr);
1130 return false;
1131 }
1132
1133 auto sdp = ice->parseIceCandidates(req.ice_msg);
1134 answerTo(*ice, req.id, req.owner);
1135 if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) {
1136 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001137 config_->logger->error("[device {}] Start ICE failed", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001138 ice = nullptr;
1139 if (connReadyCb_)
1140 connReadyCb_(deviceId, "", nullptr);
1141 return false;
1142 }
1143 return true;
1144}
1145
1146bool
1147ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req)
1148{
1149 auto deviceId = req.owner->getLongId();
1150 auto info = getInfo(deviceId, req.id);
1151 if (!info)
1152 return false;
1153
1154 std::unique_lock<std::mutex> lk {info->mutex_};
1155 auto& ice = info->ice_;
1156 if (!ice) {
1157 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001158 config_->logger->error("[device {}] No ICE detected", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001159 return false;
1160 }
1161
1162 // Build socket
1163 auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>(
1164 std::move(ice)),
1165 false);
1166
1167 // init TLS session
1168 auto ph = req.from;
1169 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001170 config_->logger->debug("[device {}] Start TLS session - Initied by DHT request. vid: {}",
1171 deviceId,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001172 req.id);
1173 info->tls_ = std::make_unique<TlsSocketEndpoint>(
1174 std::move(endpoint),
1175 certStore(),
Adrien Béraud3f93ddf2023-07-21 14:46:22 -04001176 config_->ioContext,
Adrien Béraud612b55b2023-05-29 10:42:04 -04001177 identity(),
1178 dhParams(),
Adrien Béraud9efbd442023-08-27 12:38:07 -04001179 [ph, deviceId, w=weak(), l=config_->logger](const dht::crypto::Certificate& cert) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001180 auto shared = w.lock();
1181 if (!shared)
1182 return false;
Adrien Béraud9efbd442023-08-27 12:38:07 -04001183 if (cert.getPublicKey().getId() != ph
1184 || deviceId != cert.getPublicKey().getLongId()) {
1185 if (l) l->warn("[device {}] TLS certificate with ID {} doesn't match the DHT request.",
1186 deviceId,
1187 cert.getPublicKey().getLongId());
1188 return false;
1189 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001190 auto crt = shared->certStore().getCertificate(cert.getLongId().toString());
1191 if (!crt)
1192 return false;
1193 return crt->getPacked() == cert.getPacked();
1194 });
1195
1196 info->tls_->setOnReady(
1197 [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) {
1198 if (auto shared = w.lock())
1199 shared->onTlsNegotiationDone(ok, deviceId, vid);
1200 });
1201 return true;
1202}
1203
1204void
1205ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req,
1206 const std::shared_ptr<dht::crypto::Certificate>& /*cert*/)
1207{
1208 auto deviceId = req.owner->getLongId();
1209 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001210 config_->logger->debug("[device {}] New connection request", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001211 if (!iceReqCb_ || !iceReqCb_(deviceId)) {
1212 if (config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001213 config_->logger->debug("[device {}] Refusing connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001214 return;
1215 }
1216
1217 // Because the connection is accepted, create an ICE socket.
1218 getIceOptions([w = weak(), req, deviceId](auto&& ice_config) {
1219 auto shared = w.lock();
1220 if (!shared)
1221 return;
1222 // Note: used when the ice negotiation fails to erase
1223 // all stored structures.
1224 auto eraseInfo = [w, id = req.id, deviceId] {
1225 if (auto shared = w.lock()) {
1226 // If no new socket is specified, we don't try to generate a new socket
Adrien Béraud665294f2023-06-13 18:09:11 -04001227 shared->executePendingOperations(deviceId, id, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001228 if (shared->connReadyCb_)
1229 shared->connReadyCb_(deviceId, "", nullptr);
1230 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1231 shared->infos_.erase({deviceId, id});
1232 }
1233 };
1234
1235 ice_config.tcpEnable = true;
1236 ice_config.onInitDone = [w, req, eraseInfo](bool ok) {
1237 auto shared = w.lock();
1238 if (!shared)
1239 return;
1240 if (!ok) {
1241 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001242 shared->config_->logger->error("[device {}] Cannot initialize ICE session.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001243 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1244 return;
1245 }
1246
1247 dht::ThreadPool::io().run(
1248 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1249 auto shared = w.lock();
1250 if (!shared)
1251 return;
1252 if (!shared->onRequestStartIce(req))
1253 eraseInfo();
1254 });
1255 };
1256
1257 ice_config.onNegoDone = [w, req, eraseInfo](bool ok) {
1258 auto shared = w.lock();
1259 if (!shared)
1260 return;
1261 if (!ok) {
1262 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001263 shared->config_->logger->error("[device {}] ICE negotiation failed.", req.owner->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001264 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1265 return;
1266 }
1267
1268 dht::ThreadPool::io().run(
1269 [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] {
1270 if (auto shared = w.lock())
1271 if (!shared->onRequestOnNegoDone(req))
1272 eraseInfo();
1273 });
1274 };
1275
1276 // Negotiate a new ICE socket
1277 auto info = std::make_shared<ConnectionInfo>();
1278 {
1279 std::lock_guard<std::mutex> lk(shared->infosMtx_);
1280 shared->infos_[{deviceId, req.id}] = info;
1281 }
1282 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001283 shared->config_->logger->debug("[device {}] Accepting connection", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001284 std::unique_lock<std::mutex> lk {info->mutex_};
1285 ice_config.streamsCount = 1;
1286 ice_config.compCountPerStream = 1; // TCP
1287 ice_config.master = true;
Sébastien Blin34086512023-07-25 09:52:14 -04001288 info->ice_ = shared->config_->factory->createUTransport("");
Adrien Béraud612b55b2023-05-29 10:42:04 -04001289 if (not info->ice_) {
1290 if (shared->config_->logger)
Adrien Béraud23852462023-07-22 01:46:27 -04001291 shared->config_->logger->error("[device {}] Cannot initialize ICE session", deviceId);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001292 eraseInfo();
1293 return;
1294 }
1295 // We need to detect any shutdown if the ice session is destroyed before going to the TLS session;
1296 info->ice_->setOnShutdown([eraseInfo]() {
1297 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1298 });
Adrien Béraud4cda2d72023-06-01 15:44:43 -04001299 try {
1300 info->ice_->initIceInstance(ice_config);
1301 } catch (const std::exception& e) {
1302 if (shared->config_->logger)
1303 shared->config_->logger->error("{}", e.what());
1304 dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); });
1305 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001306 });
1307}
1308
1309void
1310ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info)
1311{
Adrien Béraud5636f7c2023-09-14 14:34:57 -04001312 info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_), config_->logger);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001313 info->socket_->setOnReady(
1314 [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) {
1315 if (auto sthis = w.lock())
1316 if (sthis->connReadyCb_)
1317 sthis->connReadyCb_(deviceId, socket->name(), socket);
1318 });
1319 info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer,
1320 const uint16_t&,
1321 const std::string& name) {
1322 if (auto sthis = w.lock())
1323 if (sthis->channelReqCb_)
1324 return sthis->channelReqCb_(peer, name);
1325 return false;
1326 });
1327 info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() {
1328 // Cancel current outgoing connections
1329 dht::ThreadPool::io().run([w, deviceId, vid] {
1330 auto sthis = w.lock();
1331 if (!sthis)
1332 return;
1333
1334 std::set<CallbackId> ids;
1335 if (auto info = sthis->getInfo(deviceId, vid)) {
1336 std::lock_guard<std::mutex> lk(info->mutex_);
1337 if (info->socket_) {
1338 ids = std::move(info->cbIds_);
1339 info->socket_->shutdown();
1340 }
1341 }
1342 for (const auto& cbId : ids)
Adrien Béraud665294f2023-06-13 18:09:11 -04001343 sthis->executePendingOperations(cbId.first, cbId.second, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001344
1345 std::lock_guard<std::mutex> lk(sthis->infosMtx_);
1346 sthis->infos_.erase({deviceId, vid});
1347 });
1348 });
1349}
1350
1351const std::shared_future<tls::DhParams>
1352ConnectionManager::Impl::dhParams() const
1353{
1354 return dht::ThreadPool::computation().get<tls::DhParams>(
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001355 std::bind(tls::DhParams::loadDhParams, config_->cachePath / "dhParams"));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001356}
1357
1358template<typename ID = dht::Value::Id>
1359std::set<ID, std::less<>>
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001360loadIdList(const std::filesystem::path& path)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001361{
1362 std::set<ID, std::less<>> ids;
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001363 std::ifstream file(path);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001364 if (!file.is_open()) {
1365 //JAMI_DBG("Could not load %s", path.c_str());
1366 return ids;
1367 }
1368 std::string line;
1369 while (std::getline(file, line)) {
1370 if constexpr (std::is_same<ID, std::string>::value) {
1371 ids.emplace(std::move(line));
1372 } else if constexpr (std::is_integral<ID>::value) {
1373 ID vid;
1374 if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16);
1375 ec == std::errc()) {
1376 ids.emplace(vid);
1377 }
1378 }
1379 }
1380 return ids;
1381}
1382
1383template<typename List = std::set<dht::Value::Id>>
1384void
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001385saveIdList(const std::filesystem::path& path, const List& ids)
Adrien Béraud612b55b2023-05-29 10:42:04 -04001386{
Adrien Béraud1299a0d2023-09-19 15:03:28 -04001387 std::ofstream file(path, std::ios::trunc | std::ios::binary);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001388 if (!file.is_open()) {
1389 //JAMI_ERR("Could not save to %s", path.c_str());
1390 return;
1391 }
1392 for (auto& c : ids)
1393 file << std::hex << c << "\n";
1394}
1395
1396void
1397ConnectionManager::Impl::loadTreatedMessages()
1398{
1399 std::lock_guard<std::mutex> lock(messageMutex_);
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001400 auto path = config_->cachePath / "treatedMessages";
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001401 treatedMessages_ = loadIdList<std::string>(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001402 if (treatedMessages_.empty()) {
Aline Gondim Santos406c0f42023-09-13 12:10:23 -03001403 auto messages = loadIdList(path.string());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001404 for (const auto& m : messages)
1405 treatedMessages_.emplace(to_hex_string(m));
1406 }
1407}
1408
1409void
1410ConnectionManager::Impl::saveTreatedMessages() const
1411{
1412 dht::ThreadPool::io().run([w = weak()]() {
1413 if (auto sthis = w.lock()) {
1414 auto& this_ = *sthis;
1415 std::lock_guard<std::mutex> lock(this_.messageMutex_);
1416 fileutils::check_dir(this_.config_->cachePath.c_str());
Adrien Béraud2a4e73d2023-08-27 12:53:55 -04001417 saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath / "treatedMessages",
Adrien Béraud612b55b2023-05-29 10:42:04 -04001418 this_.treatedMessages_);
1419 }
1420 });
1421}
1422
1423bool
1424ConnectionManager::Impl::isMessageTreated(std::string_view id)
1425{
1426 std::lock_guard<std::mutex> lock(messageMutex_);
1427 auto res = treatedMessages_.emplace(id);
1428 if (res.second) {
1429 saveTreatedMessages();
1430 return false;
1431 }
1432 return true;
1433}
1434
1435/**
1436 * returns whether or not UPnP is enabled and active_
1437 * ie: if it is able to make port mappings
1438 */
1439bool
1440ConnectionManager::Impl::getUPnPActive() const
1441{
1442 return config_->getUPnPActive();
1443}
1444
1445IpAddr
1446ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const
1447{
1448 if (family == AF_INET)
1449 return publishedIp_[0];
1450 if (family == AF_INET6)
1451 return publishedIp_[1];
1452
1453 assert(family == AF_UNSPEC);
1454
1455 // If family is not set, prefere IPv4 if available. It's more
1456 // likely to succeed behind NAT.
1457 if (publishedIp_[0])
1458 return publishedIp_[0];
1459 if (publishedIp_[1])
1460 return publishedIp_[1];
1461 return {};
1462}
1463
1464void
1465ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr)
1466{
1467 if (ip_addr.getFamily() == AF_INET) {
1468 publishedIp_[0] = ip_addr;
1469 } else {
1470 publishedIp_[1] = ip_addr;
1471 }
1472}
1473
1474void
1475ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb)
1476{
1477 dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) {
1478 bool hasIpv4 {false}, hasIpv6 {false};
1479 for (auto& result : results) {
1480 auto family = result.getFamily();
1481 if (family == AF_INET) {
1482 if (not hasIpv4) {
1483 hasIpv4 = true;
1484 if (config_->logger)
1485 config_->logger->debug("Store DHT public IPv4 address: {}", result);
1486 //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str());
1487 setPublishedAddress(*result.get());
1488 if (config_->upnpCtrl) {
1489 config_->upnpCtrl->setPublicAddress(*result.get());
1490 }
1491 }
1492 } else if (family == AF_INET6) {
1493 if (not hasIpv6) {
1494 hasIpv6 = true;
1495 if (config_->logger)
1496 config_->logger->debug("Store DHT public IPv6 address: {}", result);
1497 setPublishedAddress(*result.get());
1498 }
1499 }
1500 if (hasIpv4 and hasIpv6)
1501 break;
1502 }
1503 if (cb)
1504 cb();
1505 });
1506}
1507
1508void
1509ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1510{
1511 storeActiveIpAddress([this, cb = std::move(cb)] {
1512 IceTransportOptions opts = ConnectionManager::Impl::getIceOptions();
1513 auto publishedAddr = getPublishedIpAddress();
1514
1515 if (publishedAddr) {
1516 auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(),
1517 publishedAddr.getFamily());
1518 if (interfaceAddr) {
1519 opts.accountLocalAddr = interfaceAddr;
1520 opts.accountPublicAddr = publishedAddr;
1521 }
1522 }
1523 if (cb)
1524 cb(std::move(opts));
1525 });
1526}
1527
1528IceTransportOptions
1529ConnectionManager::Impl::getIceOptions() const noexcept
1530{
1531 IceTransportOptions opts;
Sébastien Blin34086512023-07-25 09:52:14 -04001532 opts.factory = config_->factory;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001533 opts.upnpEnable = getUPnPActive();
Adrien Béraud7b869d92023-08-21 09:02:35 -04001534 opts.upnpContext = config_->upnpCtrl ? config_->upnpCtrl->upnpContext() : nullptr;
Adrien Béraud612b55b2023-05-29 10:42:04 -04001535
1536 if (config_->stunEnabled)
1537 opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer));
1538 if (config_->turnEnabled) {
Sébastien Blin84bf4182023-07-21 14:18:39 -04001539 if (config_->turnCache) {
1540 auto turnAddr = config_->turnCache->getResolvedTurn();
1541 if (turnAddr != std::nullopt) {
1542 opts.turnServers.emplace_back(TurnServerInfo()
1543 .setUri(turnAddr->toString())
1544 .setUsername(config_->turnServerUserName)
1545 .setPassword(config_->turnServerPwd)
1546 .setRealm(config_->turnServerRealm));
1547 }
1548 } else {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001549 opts.turnServers.emplace_back(TurnServerInfo()
Sébastien Blin84bf4182023-07-21 14:18:39 -04001550 .setUri(config_->turnServer)
1551 .setUsername(config_->turnServerUserName)
1552 .setPassword(config_->turnServerPwd)
1553 .setRealm(config_->turnServerRealm));
Adrien Béraud612b55b2023-05-29 10:42:04 -04001554 }
1555 // NOTE: first test with ipv6 turn was not concluant and resulted in multiple
1556 // co issues. So this needs some debug. for now just disable
1557 // if (cacheTurnV6 && *cacheTurnV6) {
1558 // opts.turnServers.emplace_back(TurnServerInfo()
1559 // .setUri(cacheTurnV6->toString(true))
1560 // .setUsername(turnServerUserName_)
1561 // .setPassword(turnServerPwd_)
1562 // .setRealm(turnServerRealm_));
1563 //}
Adrien Béraud612b55b2023-05-29 10:42:04 -04001564 }
1565 return opts;
1566}
1567
1568bool
1569ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt,
1570 dht::InfoHash& account_id,
1571 const std::shared_ptr<Logger>& logger)
1572{
1573 if (not crt)
1574 return false;
1575
1576 auto top_issuer = crt;
1577 while (top_issuer->issuer)
1578 top_issuer = top_issuer->issuer;
1579
1580 // Device certificate can't be self-signed
Adrien Béraudc631a832023-07-26 22:19:00 -04001581 if (top_issuer == crt) {
Adrien Béraud612b55b2023-05-29 10:42:04 -04001582 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001583 logger->warn("Found invalid (self-signed) peer device: {}", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001584 return false;
Adrien Béraudc631a832023-07-26 22:19:00 -04001585 }
Adrien Béraud612b55b2023-05-29 10:42:04 -04001586
1587 // Check peer certificate chain
1588 // Trust store with top issuer as the only CA
1589 dht::crypto::TrustList peer_trust;
1590 peer_trust.add(*top_issuer);
1591 if (not peer_trust.verify(*crt)) {
1592 if (logger)
1593 logger->warn("Found invalid peer device: {}", crt->getLongId());
1594 return false;
1595 }
1596
1597 // Check cached OCSP response
1598 if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) {
1599 if (logger)
Adrien Béraud8b831a82023-07-21 14:13:06 -04001600 logger->error("Certificate {} is disabled by cached OCSP response", crt->getLongId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001601 return false;
1602 }
1603
Adrien Béraudc631a832023-07-26 22:19:00 -04001604 account_id = crt->issuer->getId();
1605 if (logger)
1606 logger->warn("Found peer device: {} account:{} CA:{}",
1607 crt->getLongId(),
1608 account_id,
1609 top_issuer->getId());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001610 return true;
1611}
1612
1613bool
1614ConnectionManager::Impl::findCertificate(
1615 const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1616{
1617 if (auto cert = certStore().getCertificate(id.toString())) {
1618 if (cb)
1619 cb(cert);
1620 } else if (cb)
1621 cb(nullptr);
1622 return true;
1623}
1624
Sébastien Blin34086512023-07-25 09:52:14 -04001625bool
1626ConnectionManager::Impl::findCertificate(const dht::InfoHash& h,
1627 std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb)
1628{
1629 if (auto cert = certStore().getCertificate(h.toString())) {
1630 if (cb)
1631 cb(cert);
1632 } else {
1633 dht()->findCertificate(h,
1634 [cb = std::move(cb), this](
1635 const std::shared_ptr<dht::crypto::Certificate>& crt) {
1636 if (crt)
1637 certStore().pinCertificate(crt);
1638 if (cb)
1639 cb(crt);
1640 });
1641 }
1642 return true;
1643}
1644
Amna81221ad2023-09-14 17:33:26 -04001645std::shared_ptr<ConnectionManager::Config>
1646buildDefaultConfig(dht::crypto::Identity id){
1647 auto conf = std::make_shared<ConnectionManager::Config>();
1648 conf->id = std::move(id);
1649 return conf;
1650}
1651
Adrien Béraud612b55b2023-05-29 10:42:04 -04001652ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_)
1653 : pimpl_ {std::make_shared<Impl>(config_)}
1654{}
1655
Amna81221ad2023-09-14 17:33:26 -04001656ConnectionManager::ConnectionManager(dht::crypto::Identity id)
1657 : ConnectionManager {buildDefaultConfig(id)}
1658{}
1659
Adrien Béraud612b55b2023-05-29 10:42:04 -04001660ConnectionManager::~ConnectionManager()
1661{
1662 if (pimpl_)
1663 pimpl_->shutdown();
1664}
1665
1666void
1667ConnectionManager::connectDevice(const DeviceId& deviceId,
1668 const std::string& name,
1669 ConnectCallback cb,
1670 bool noNewSocket,
1671 bool forceNewSocket,
1672 const std::string& connType)
1673{
1674 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1675}
1676
1677void
Amna0cf544d2023-07-25 14:25:09 -04001678ConnectionManager::connectDevice(const dht::InfoHash& deviceId,
1679 const std::string& name,
1680 ConnectCallbackLegacy cb,
1681 bool noNewSocket,
1682 bool forceNewSocket,
1683 const std::string& connType)
1684{
1685 pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1686}
1687
1688
1689void
Adrien Béraud612b55b2023-05-29 10:42:04 -04001690ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert,
1691 const std::string& name,
1692 ConnectCallback cb,
1693 bool noNewSocket,
1694 bool forceNewSocket,
1695 const std::string& connType)
1696{
1697 pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType);
1698}
1699
1700bool
1701ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const
1702{
Adrien Béraud665294f2023-06-13 18:09:11 -04001703 auto pending = pimpl_->getPendingIds(deviceId);
1704 return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; })
Adrien Béraud612b55b2023-05-29 10:42:04 -04001705 != pending.end();
1706}
1707
1708void
1709ConnectionManager::closeConnectionsWith(const std::string& peerUri)
1710{
1711 std::vector<std::shared_ptr<ConnectionInfo>> connInfos;
1712 std::set<DeviceId> peersDevices;
1713 {
1714 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1715 for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) {
1716 auto const& [key, value] = *iter;
Adrien Béraudafa8e282023-09-24 12:53:20 -04001717 std::unique_lock<std::mutex> lkv {value->mutex_};
Adrien Béraud612b55b2023-05-29 10:42:04 -04001718 auto deviceId = key.first;
Adrien Béraudafa8e282023-09-24 12:53:20 -04001719 auto tls = value->tls_ ? value->tls_.get() : (value->socket_ ? value->socket_->endpoint() : nullptr);
1720 auto cert = tls ? tls->peerCertificate() : nullptr;
1721 if (not cert)
1722 cert = pimpl_->certStore().getCertificate(deviceId.toString());
Adrien Béraud612b55b2023-05-29 10:42:04 -04001723 if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) {
1724 connInfos.emplace_back(value);
1725 peersDevices.emplace(deviceId);
Adrien Béraudafa8e282023-09-24 12:53:20 -04001726 lkv.unlock();
Adrien Béraud612b55b2023-05-29 10:42:04 -04001727 iter = pimpl_->infos_.erase(iter);
1728 } else {
1729 iter++;
1730 }
1731 }
1732 }
1733 // Stop connections to all peers devices
1734 for (const auto& deviceId : peersDevices) {
Adrien Béraud665294f2023-06-13 18:09:11 -04001735 pimpl_->executePendingOperations(deviceId, 0, nullptr);
Adrien Béraud612b55b2023-05-29 10:42:04 -04001736 // This will close the TLS Session
1737 pimpl_->removeUnusedConnections(deviceId);
1738 }
1739 for (auto& info : connInfos) {
1740 if (info->socket_)
1741 info->socket_->shutdown();
1742 if (info->waitForAnswer_)
1743 info->waitForAnswer_->cancel();
1744 if (info->ice_) {
1745 std::unique_lock<std::mutex> lk {info->mutex_};
1746 dht::ThreadPool::io().run(
1747 [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {});
1748 }
1749 }
1750}
1751
1752void
1753ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk)
1754{
1755 pimpl_->onDhtConnected(devicePk);
1756}
1757
1758void
1759ConnectionManager::onICERequest(onICERequestCallback&& cb)
1760{
1761 pimpl_->iceReqCb_ = std::move(cb);
1762}
1763
1764void
1765ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb)
1766{
1767 pimpl_->channelReqCb_ = std::move(cb);
1768}
1769
1770void
1771ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb)
1772{
1773 pimpl_->connReadyCb_ = std::move(cb);
1774}
1775
1776void
1777ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb)
1778{
1779 pimpl_->iOSConnectedCb_ = std::move(cb);
1780}
1781
1782std::size_t
1783ConnectionManager::activeSockets() const
1784{
1785 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1786 return pimpl_->infos_.size();
1787}
1788
1789void
1790ConnectionManager::monitor() const
1791{
1792 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1793 auto logger = pimpl_->config_->logger;
1794 if (!logger)
1795 return;
1796 logger->debug("ConnectionManager current status:");
1797 for (const auto& [_, ci] : pimpl_->infos_) {
1798 if (ci->socket_)
1799 ci->socket_->monitor();
1800 }
1801 logger->debug("ConnectionManager end status.");
1802}
1803
1804void
1805ConnectionManager::connectivityChanged()
1806{
1807 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1808 for (const auto& [_, ci] : pimpl_->infos_) {
1809 if (ci->socket_)
1810 ci->socket_->sendBeacon();
1811 }
1812}
1813
1814void
1815ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept
1816{
1817 return pimpl_->getIceOptions(std::move(cb));
1818}
1819
1820IceTransportOptions
1821ConnectionManager::getIceOptions() const noexcept
1822{
1823 return pimpl_->getIceOptions();
1824}
1825
1826IpAddr
1827ConnectionManager::getPublishedIpAddress(uint16_t family) const
1828{
1829 return pimpl_->getPublishedIpAddress(family);
1830}
1831
1832void
1833ConnectionManager::setPublishedAddress(const IpAddr& ip_addr)
1834{
1835 return pimpl_->setPublishedAddress(ip_addr);
1836}
1837
1838void
1839ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb)
1840{
1841 return pimpl_->storeActiveIpAddress(std::move(cb));
1842}
1843
1844std::shared_ptr<ConnectionManager::Config>
1845ConnectionManager::getConfig()
1846{
1847 return pimpl_->config_;
1848}
1849
Amna31791e52023-08-03 12:40:57 -04001850std::vector<std::map<std::string, std::string>>
1851ConnectionManager::getConnectionList(const DeviceId& device) const
1852{
1853 std::vector<std::map<std::string, std::string>> connectionsList;
1854 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1855
1856 for (const auto& [key, ci] : pimpl_->infos_) {
1857 if (device && key.first != device)
1858 continue;
1859 std::map<std::string, std::string> connectionInfo;
1860 connectionInfo["id"] = callbackIdToString(key.first, key.second);
Amna82420202023-08-15 16:27:18 -04001861 connectionInfo["device"] = key.first.toString();
Amna6c999d82023-08-15 15:19:41 -04001862 if (ci->tls_) {
1863 if (auto cert = ci->tls_->peerCertificate()) {
1864 connectionInfo["peer"] = cert->issuer->getId().toString();
1865 }
Amna31791e52023-08-03 12:40:57 -04001866 }
1867 if (ci->socket_) {
1868 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connected));
1869 } else if (ci->tls_) {
1870 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::TLS));
1871 } else if(ci->ice_)
1872 {
1873 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::ICE));
1874 }
1875 if (ci->tls_) {
1876 std::string remoteAddress = ci->tls_->getRemoteAddress();
1877 std::string remoteAddressIp = remoteAddress.substr(0, remoteAddress.find(':'));
1878 std::string remoteAddressPort = remoteAddress.substr(remoteAddress.find(':') + 1);
1879 connectionInfo["remoteAdress"] = remoteAddressIp;
1880 connectionInfo["remotePort"] = remoteAddressPort;
1881 }
1882 connectionsList.emplace_back(std::move(connectionInfo));
1883 }
1884
1885 if (device) {
1886 auto it = pimpl_->pendingOperations_.find(device);
1887 if (it != pimpl_->pendingOperations_.end()) {
1888 const auto& po = it->second;
1889 for (const auto& [vid, ci] : po.connecting) {
1890 std::map<std::string, std::string> connectionInfo;
1891 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001892 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1893 connectionsList.emplace_back(std::move(connectionInfo));
1894 }
1895
1896 for (const auto& [vid, ci] : po.waiting) {
1897 std::map<std::string, std::string> connectionInfo;
1898 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001899 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1900 connectionsList.emplace_back(std::move(connectionInfo));
1901 }
1902 }
1903 }
1904 else {
1905 for (const auto& [key, po] : pimpl_->pendingOperations_) {
1906 for (const auto& [vid, ci] : po.connecting) {
1907 std::map<std::string, std::string> connectionInfo;
1908 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001909 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Connecting));
1910 connectionsList.emplace_back(std::move(connectionInfo));
1911 }
1912
1913 for (const auto& [vid, ci] : po.waiting) {
1914 std::map<std::string, std::string> connectionInfo;
1915 connectionInfo["id"] = callbackIdToString(device, vid);
Amna31791e52023-08-03 12:40:57 -04001916 connectionInfo["status"] = std::to_string(static_cast<int>(ConnectionStatus::Waiting));
1917 connectionsList.emplace_back(std::move(connectionInfo));
1918 }
1919 }
1920 }
1921 return connectionsList;
1922}
1923
1924std::vector<std::map<std::string, std::string>>
1925ConnectionManager::getChannelList(const std::string& connectionId) const
1926{
1927 std::lock_guard<std::mutex> lk(pimpl_->infosMtx_);
1928 CallbackId cbid = parseCallbackId(connectionId);
1929 if (pimpl_->infos_.count(cbid) > 0) {
1930 return pimpl_->infos_[cbid]->socket_->getChannelList();
1931 } else {
1932 return {};
1933 }
1934}
1935
Sébastien Blin464bdff2023-07-19 08:02:53 -04001936} // namespace dhtnet