| /* |
| * Copyright (C) 2019-2023 Savoir-faire Linux Inc. |
| * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com> |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 3 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program. If not, see <https://www.gnu.org/licenses/>. |
| */ |
| #include "connectionmanager.h" |
| #include "peer_connection.h" |
| #include "upnp/upnp_control.h" |
| #include "certstore.h" |
| #include "fileutils.h" |
| #include "sip_utils.h" |
| #include "string_utils.h" |
| |
| #include <opendht/crypto.h> |
| #include <opendht/thread_pool.h> |
| #include <opendht/value.h> |
| #include <asio.hpp> |
| |
| #include <algorithm> |
| #include <mutex> |
| #include <map> |
| #include <condition_variable> |
| #include <set> |
| #include <charconv> |
| |
| namespace jami { |
// Delay given to the peer to answer a connection request sent through the DHT
static constexpr std::chrono::seconds DHT_MSG_TIMEOUT {30};
// Upper bound (inclusive) for randomly generated request ids: 2^53
static constexpr uint64_t ID_MAX_VAL {9'007'199'254'740'992};
| |
| using ValueIdDist = std::uniform_int_distribution<dht::Value::Id>; |
| using CallbackId = std::pair<jami::DeviceId, dht::Value::Id>; |
| |
| struct ConnectionInfo |
| { |
| ~ConnectionInfo() |
| { |
| if (socket_) |
| socket_->join(); |
| } |
| |
| std::mutex mutex_ {}; |
| bool responseReceived_ {false}; |
| PeerConnectionRequest response_ {}; |
| std::unique_ptr<IceTransport> ice_ {nullptr}; |
| // Used to store currently non ready TLS Socket |
| std::unique_ptr<TlsSocketEndpoint> tls_ {nullptr}; |
| std::shared_ptr<MultiplexedSocket> socket_ {}; |
| std::set<CallbackId> cbIds_ {}; |
| |
| std::function<void(bool)> onConnected_; |
| std::unique_ptr<asio::steady_timer> waitForAnswer_ {}; |
| }; |
| |
| /** |
| * returns whether or not UPnP is enabled and active_ |
| * ie: if it is able to make port mappings |
| */ |
| bool |
| ConnectionManager::Config::getUPnPActive() const |
| { |
| if (upnpCtrl) |
| return upnpCtrl->isReady(); |
| return false; |
| } |
| |
| class ConnectionManager::Impl : public std::enable_shared_from_this<ConnectionManager::Impl> |
| { |
| public: |
| explicit Impl(std::shared_ptr<ConnectionManager::Config> config_) |
| : config_ {std::move(config_)} |
| {} |
| ~Impl() {} |
| |
| std::shared_ptr<dht::DhtRunner> dht() { return config_->dht; } |
| const dht::crypto::Identity& identity() const { return config_->id; } |
| |
| void removeUnusedConnections(const DeviceId& deviceId = {}) |
| { |
| std::vector<std::shared_ptr<ConnectionInfo>> unused {}; |
| |
| { |
| std::lock_guard<std::mutex> lk(infosMtx_); |
| for (auto it = infos_.begin(); it != infos_.end();) { |
| auto& [key, info] = *it; |
| if (info && (!deviceId || key.first == deviceId)) { |
| unused.emplace_back(std::move(info)); |
| it = infos_.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| for (auto& info: unused) { |
| if (info->tls_) |
| info->tls_->shutdown(); |
| if (info->socket_) |
| info->socket_->shutdown(); |
| if (info->waitForAnswer_) |
| info->waitForAnswer_->cancel(); |
| } |
| if (!unused.empty()) |
| dht::ThreadPool::io().run([infos = std::move(unused)]() mutable { infos.clear(); }); |
| } |
| |
| void shutdown() |
| { |
| if (isDestroying_.exchange(true)) |
| return; |
| decltype(pendingOperations_) po; |
| { |
| std::lock_guard<std::mutex> lk(connectCbsMtx_); |
| po = std::move(pendingOperations_); |
| } |
| for (auto& [deviceId, pcbs] : po) { |
| for (auto& [id, pending] : pcbs.connecting) |
| pending.cb(nullptr, deviceId); |
| for (auto& [id, pending] : pcbs.waiting) |
| pending.cb(nullptr, deviceId); |
| } |
| |
| removeUnusedConnections(); |
| } |
| |
| void connectDeviceStartIce(const std::shared_ptr<dht::crypto::PublicKey>& devicePk, |
| const dht::Value::Id& vid, |
| const std::string& connType, |
| std::function<void(bool)> onConnected); |
| void onResponse(const asio::error_code& ec, const DeviceId& deviceId, const dht::Value::Id& vid); |
| bool connectDeviceOnNegoDone(const DeviceId& deviceId, |
| const std::string& name, |
| const dht::Value::Id& vid, |
| const std::shared_ptr<dht::crypto::Certificate>& cert); |
| void connectDevice(const DeviceId& deviceId, |
| const std::string& uri, |
| ConnectCallback cb, |
| bool noNewSocket = false, |
| bool forceNewSocket = false, |
| const std::string& connType = ""); |
| void connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket = false, |
| bool forceNewSocket = false, |
| const std::string& connType = ""); |
| /** |
| * Send a ChannelRequest on the TLS socket. Triggers cb when ready |
| * @param sock socket used to send the request |
| * @param name channel's name |
| * @param vid channel's id |
| * @param deviceId to identify the linked ConnectCallback |
| */ |
| void sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock, |
| const std::string& name, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid); |
| /** |
| * Triggered when a PeerConnectionRequest comes from the DHT |
| */ |
| void answerTo(IceTransport& ice, |
| const dht::Value::Id& id, |
| const std::shared_ptr<dht::crypto::PublicKey>& fromPk); |
| bool onRequestStartIce(const PeerConnectionRequest& req); |
| bool onRequestOnNegoDone(const PeerConnectionRequest& req); |
| void onDhtPeerRequest(const PeerConnectionRequest& req, |
| const std::shared_ptr<dht::crypto::Certificate>& cert); |
| |
| void addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info); |
| void onPeerResponse(const PeerConnectionRequest& req); |
| void onDhtConnected(const dht::crypto::PublicKey& devicePk); |
| |
| const std::shared_future<tls::DhParams> dhParams() const; |
| tls::CertificateStore& certStore() const { return *config_->certStore; } |
| |
| mutable std::mutex messageMutex_ {}; |
| std::set<std::string, std::less<>> treatedMessages_ {}; |
| |
| void loadTreatedMessages(); |
| void saveTreatedMessages() const; |
| |
| /// \return true if the given DHT message identifier has been treated |
| /// \note if message has not been treated yet this method st/ore this id and returns true at |
| /// further calls |
| bool isMessageTreated(std::string_view id); |
| |
| const std::shared_ptr<dht::log::Logger>& logger() const { return config_->logger; } |
| |
| /** |
| * Published IPv4/IPv6 addresses, used only if defined by the user in account |
| * configuration |
| * |
| */ |
| IpAddr publishedIp_[2] {}; |
| |
| // This will be stored in the configuration |
| std::string publishedIpAddress_ {}; |
| |
| /** |
| * Published port, used only if defined by the user |
| */ |
| pj_uint16_t publishedPort_ {sip_utils::DEFAULT_SIP_PORT}; |
| |
| /** |
| * interface name on which this account is bound |
| */ |
| std::string interface_ {"default"}; |
| |
| /** |
| * Get the local interface name on which this account is bound. |
| */ |
| const std::string& getLocalInterface() const { return interface_; } |
| |
| /** |
| * Get the published IP address, fallbacks to NAT if family is unspecified |
| * Prefers the usage of IPv4 if possible. |
| */ |
| IpAddr getPublishedIpAddress(uint16_t family = PF_UNSPEC) const; |
| |
| /** |
| * Set published IP address according to given family |
| */ |
| void setPublishedAddress(const IpAddr& ip_addr); |
| |
| /** |
| * Store the local/public addresses used to register |
| */ |
| void storeActiveIpAddress(std::function<void()>&& cb = {}); |
| |
| /** |
| * Create and return ICE options. |
| */ |
| void getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept; |
| IceTransportOptions getIceOptions() const noexcept; |
| |
| /** |
| * Inform that a potential peer device have been found. |
| * Returns true only if the device certificate is a valid device certificate. |
| * In that case (true is returned) the account_id parameter is set to the peer account ID. |
| */ |
| static bool foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt, |
| dht::InfoHash& account_id, const std::shared_ptr<Logger>& logger); |
| |
| bool findCertificate(const dht::PkId& id, |
| std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb); |
| |
| /** |
| * returns whether or not UPnP is enabled and active |
| * ie: if it is able to make port mappings |
| */ |
| bool getUPnPActive() const; |
| |
| /** |
| * Triggered when a new TLS socket is ready to use |
| * @param ok If succeed |
| * @param deviceId Related device |
| * @param vid vid of the connection request |
| * @param name non empty if TLS was created by connectDevice() |
| */ |
| void onTlsNegotiationDone(bool ok, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid, |
| const std::string& name = ""); |
| |
| std::shared_ptr<ConnectionManager::Config> config_; |
| |
| IceTransportFactory iceFactory_ {}; |
| |
| mutable std::mt19937_64 rand; |
| |
| iOSConnectedCallback iOSConnectedCb_ {}; |
| |
| std::mutex infosMtx_ {}; |
| // Note: Someone can ask multiple sockets, so to avoid any race condition, |
| // each device can have multiple multiplexed sockets. |
| std::map<CallbackId, std::shared_ptr<ConnectionInfo>> infos_ {}; |
| |
| std::shared_ptr<ConnectionInfo> getInfo(const DeviceId& deviceId, const dht::Value::Id& id) |
| { |
| std::lock_guard<std::mutex> lk(infosMtx_); |
| auto it = infos_.find({deviceId, id}); |
| if (it != infos_.end()) |
| return it->second; |
| return {}; |
| } |
| |
| std::shared_ptr<ConnectionInfo> getConnectedInfo(const DeviceId& deviceId) |
| { |
| std::lock_guard<std::mutex> lk(infosMtx_); |
| auto it = std::find_if(infos_.begin(), infos_.end(), [&](const auto& item) { |
| auto& [key, value] = item; |
| return key.first == deviceId && value && value->socket_; |
| }); |
| if (it != infos_.end()) |
| return it->second; |
| return {}; |
| } |
| |
| ChannelRequestCallback channelReqCb_ {}; |
| ConnectionReadyCallback connReadyCb_ {}; |
| onICERequestCallback iceReqCb_ {}; |
| |
| /** |
| * Stores callback from connectDevice |
| * @note: each device needs a vector because several connectDevice can |
| * be done in parallel and we only want one socket |
| */ |
| std::mutex connectCbsMtx_ {}; |
| |
| struct PendingCb |
| { |
| std::string name; |
| ConnectCallback cb; |
| }; |
| struct PendingOperations { |
| std::map<dht::Value::Id, PendingCb> connecting; |
| std::map<dht::Value::Id, PendingCb> waiting; |
| }; |
| |
| std::map<DeviceId, PendingOperations> pendingOperations_ {}; |
| |
| void executePendingOperations(const DeviceId& deviceId, const dht::Value::Id& vid, const std::shared_ptr<ChannelSocket>& sock, bool accepted = true) |
| { |
| std::vector<PendingCb> ret; |
| std::unique_lock<std::mutex> lk(connectCbsMtx_); |
| auto it = pendingOperations_.find(deviceId); |
| if (it == pendingOperations_.end()) |
| return; |
| auto& pendingOperations = it->second; |
| if (vid == 0) { |
| // Extract all pending callbacks |
| for (auto& [vid, cb] : pendingOperations.connecting) |
| ret.emplace_back(std::move(cb)); |
| pendingOperations.connecting.clear(); |
| for (auto& [vid, cb] : pendingOperations.waiting) |
| ret.emplace_back(std::move(cb)); |
| pendingOperations.waiting.clear(); |
| } else if (auto n = pendingOperations.waiting.extract(vid)) { |
| // If it's a waiting operation, just move it |
| ret.emplace_back(std::move(n.mapped())); |
| } else if (auto n = pendingOperations.connecting.extract(vid)) { |
| ret.emplace_back(std::move(n.mapped())); |
| // If sock is nullptr, execute if it's the last connecting operation |
| // If accepted is false, it means that underlying socket is ok, but channel is declined |
| if (!sock && pendingOperations.connecting.empty() && accepted) { |
| for (auto& [vid, cb] : pendingOperations.waiting) |
| ret.emplace_back(std::move(cb)); |
| pendingOperations.waiting.clear(); |
| for (auto& [vid, cb] : pendingOperations.connecting) |
| ret.emplace_back(std::move(cb)); |
| pendingOperations.connecting.clear(); |
| } |
| } |
| if (pendingOperations.waiting.empty() && pendingOperations.connecting.empty()) |
| pendingOperations_.erase(it); |
| lk.unlock(); |
| for (auto& cb : ret) |
| cb.cb(sock, deviceId); |
| } |
| |
| std::map<dht::Value::Id, std::string> getPendingIds(const DeviceId& deviceId, const dht::Value::Id vid = 0) |
| { |
| std::map<dht::Value::Id, std::string> ret; |
| std::lock_guard<std::mutex> lk(connectCbsMtx_); |
| auto it = pendingOperations_.find(deviceId); |
| if (it == pendingOperations_.end()) |
| return ret; |
| auto& pendingOp = it->second; |
| for (const auto& [id, pc]: pendingOp.connecting) { |
| if (vid == 0 || id == vid) |
| ret[id] = pc.name; |
| } |
| for (const auto& [id, pc]: pendingOp.waiting) { |
| if (vid == 0 || id == vid) |
| ret[id] = pc.name; |
| } |
| return ret; |
| } |
| |
| std::shared_ptr<ConnectionManager::Impl> shared() |
| { |
| return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this()); |
| } |
| std::shared_ptr<ConnectionManager::Impl const> shared() const |
| { |
| return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this()); |
| } |
| std::weak_ptr<ConnectionManager::Impl> weak() |
| { |
| return std::static_pointer_cast<ConnectionManager::Impl>(shared_from_this()); |
| } |
| std::weak_ptr<ConnectionManager::Impl const> weak() const |
| { |
| return std::static_pointer_cast<ConnectionManager::Impl const>(shared_from_this()); |
| } |
| |
| std::atomic_bool isDestroying_ {false}; |
| }; |
| |
| void |
| ConnectionManager::Impl::connectDeviceStartIce( |
| const std::shared_ptr<dht::crypto::PublicKey>& devicePk, |
| const dht::Value::Id& vid, |
| const std::string& connType, |
| std::function<void(bool)> onConnected) |
| { |
| auto deviceId = devicePk->getLongId(); |
| auto info = getInfo(deviceId, vid); |
| if (!info) { |
| onConnected(false); |
| return; |
| } |
| |
| std::unique_lock<std::mutex> lk(info->mutex_); |
| auto& ice = info->ice_; |
| |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("No ICE detected"); |
| onConnected(false); |
| return; |
| } |
| |
| auto iceAttributes = ice->getLocalAttributes(); |
| std::ostringstream icemsg; |
| icemsg << iceAttributes.ufrag << "\n"; |
| icemsg << iceAttributes.pwd << "\n"; |
| for (const auto& addr : ice->getLocalCandidates(1)) { |
| icemsg << addr << "\n"; |
| if (config_->logger) |
| config_->logger->debug("Added local ICE candidate {}", addr); |
| } |
| |
| // Prepare connection request as a DHT message |
| PeerConnectionRequest val; |
| |
| val.id = vid; /* Random id for the message unicity */ |
| val.ice_msg = icemsg.str(); |
| val.connType = connType; |
| |
| auto value = std::make_shared<dht::Value>(std::move(val)); |
| value->user_type = "peer_request"; |
| |
| // Send connection request through DHT |
| if (config_->logger) |
| config_->logger->debug("Request connection to {}", deviceId); |
| dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix |
| + devicePk->getId().toString()), |
| devicePk, |
| value, |
| [l=config_->logger,deviceId](bool ok) { |
| if (l) |
| l->debug("Sent connection request to {:s}. Put encrypted {:s}", |
| deviceId, |
| (ok ? "ok" : "failed")); |
| }); |
| // Wait for call to onResponse() operated by DHT |
| if (isDestroying_) { |
| onConnected(true); // This avoid to wait new negotiation when destroying |
| return; |
| } |
| |
| info->onConnected_ = std::move(onConnected); |
| info->waitForAnswer_ = std::make_unique<asio::steady_timer>(*config_->ioContext, |
| std::chrono::steady_clock::now() |
| + DHT_MSG_TIMEOUT); |
| info->waitForAnswer_->async_wait( |
| std::bind(&ConnectionManager::Impl::onResponse, this, std::placeholders::_1, deviceId, vid)); |
| } |
| |
| void |
| ConnectionManager::Impl::onResponse(const asio::error_code& ec, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid) |
| { |
| if (ec == asio::error::operation_aborted) |
| return; |
| auto info = getInfo(deviceId, vid); |
| if (!info) |
| return; |
| |
| std::unique_lock<std::mutex> lk(info->mutex_); |
| auto& ice = info->ice_; |
| if (isDestroying_) { |
| info->onConnected_(true); // The destructor can wake a pending wait here. |
| return; |
| } |
| if (!info->responseReceived_) { |
| if (config_->logger) |
| config_->logger->error("no response from DHT to E2E request."); |
| info->onConnected_(false); |
| return; |
| } |
| |
| if (!info->ice_) { |
| info->onConnected_(false); |
| return; |
| } |
| |
| auto sdp = ice->parseIceCandidates(info->response_.ice_msg); |
| |
| if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { |
| if (config_->logger) |
| config_->logger->warn("start ICE failed"); |
| info->onConnected_(false); |
| return; |
| } |
| info->onConnected_(true); |
| } |
| |
| bool |
| ConnectionManager::Impl::connectDeviceOnNegoDone( |
| const DeviceId& deviceId, |
| const std::string& name, |
| const dht::Value::Id& vid, |
| const std::shared_ptr<dht::crypto::Certificate>& cert) |
| { |
| auto info = getInfo(deviceId, vid); |
| if (!info) |
| return false; |
| |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| if (info->waitForAnswer_) { |
| // Negotiation is done and connected, go to handshake |
| // and avoid any cancellation at this point. |
| info->waitForAnswer_->cancel(); |
| } |
| auto& ice = info->ice_; |
| if (!ice || !ice->isRunning()) { |
| if (config_->logger) |
| config_->logger->error("No ICE detected or not running"); |
| return false; |
| } |
| |
| // Build socket |
| auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( |
| std::move(ice)), |
| true); |
| |
| // Negotiate a TLS session |
| if (config_->logger) |
| config_->logger->debug("Start TLS session - Initied by connectDevice(). Launched by channel: {} - device: {} - vid: {}", name, deviceId, vid); |
| info->tls_ = std::make_unique<TlsSocketEndpoint>(std::move(endpoint), |
| certStore(), |
| identity(), |
| dhParams(), |
| *cert); |
| |
| info->tls_->setOnReady( |
| [w = weak(), deviceId = std::move(deviceId), vid = std::move(vid), name = std::move(name)]( |
| bool ok) { |
| if (auto shared = w.lock()) |
| shared->onTlsNegotiationDone(ok, deviceId, vid, name); |
| }); |
| return true; |
| } |
| |
| void |
| ConnectionManager::Impl::connectDevice(const DeviceId& deviceId, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| if (!dht()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (deviceId.toString() == identity().second->getLongId().toString()) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| findCertificate(deviceId, |
| [w = weak(), |
| deviceId, |
| name, |
| cb = std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType](const std::shared_ptr<dht::crypto::Certificate>& cert) { |
| if (!cert) { |
| if (auto shared = w.lock()) |
| if (shared->config_->logger) |
| shared->config_->logger->error( |
| "No valid certificate found for device {}", |
| deviceId); |
| cb(nullptr, deviceId); |
| return; |
| } |
| if (auto shared = w.lock()) { |
| shared->connectDevice(cert, |
| name, |
| std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType); |
| } else |
| cb(nullptr, deviceId); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| // Avoid dht operation in a DHT callback to avoid deadlocks |
| dht::ThreadPool::computation().run([w = weak(), |
| name = std::move(name), |
| cert = std::move(cert), |
| cb = std::move(cb), |
| noNewSocket, |
| forceNewSocket, |
| connType] { |
| auto devicePk = cert->getSharedPublicKey(); |
| auto deviceId = devicePk->getLongId(); |
| auto sthis = w.lock(); |
| if (!sthis || sthis->isDestroying_) { |
| cb(nullptr, deviceId); |
| return; |
| } |
| dht::Value::Id vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand); |
| auto isConnectingToDevice = false; |
| { |
| std::lock_guard<std::mutex> lk(sthis->connectCbsMtx_); |
| auto pendingsIt = sthis->pendingOperations_.find(deviceId); |
| if (pendingsIt != sthis->pendingOperations_.end()) { |
| const auto& pendings = pendingsIt->second; |
| while (pendings.connecting.find(vid) != pendings.connecting.end() |
| && pendings.waiting.find(vid) != pendings.waiting.end()) { |
| vid = ValueIdDist(1, ID_MAX_VAL)(sthis->rand); |
| } |
| } |
| // Check if already connecting |
| isConnectingToDevice = pendingsIt != sthis->pendingOperations_.end(); |
| // Save current request for sendChannelRequest. |
| // Note: do not return here, cause we can be in a state where first |
| // socket is negotiated and first channel is pending |
| // so return only after we checked the info |
| if (isConnectingToDevice && !forceNewSocket) |
| pendingsIt->second.waiting[vid] = PendingCb {name, std::move(cb)}; |
| else |
| sthis->pendingOperations_[deviceId].connecting[vid] = PendingCb {name, std::move(cb)}; |
| } |
| |
| // Check if already negotiated |
| CallbackId cbId(deviceId, vid); |
| if (auto info = sthis->getConnectedInfo(deviceId)) { |
| std::lock_guard<std::mutex> lk(info->mutex_); |
| if (info->socket_) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->debug("Peer already connected to {}. Add a new channel", deviceId); |
| info->cbIds_.emplace(cbId); |
| sthis->sendChannelRequest(info->socket_, name, deviceId, vid); |
| return; |
| } |
| } |
| |
| if (isConnectingToDevice && !forceNewSocket) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->debug("Already connecting to {}, wait for the ICE negotiation", deviceId); |
| return; |
| } |
| if (noNewSocket) { |
| // If no new socket is specified, we don't try to generate a new socket |
| sthis->executePendingOperations(deviceId, vid, nullptr); |
| return; |
| } |
| |
| // Note: used when the ice negotiation fails to erase |
| // all stored structures. |
| auto eraseInfo = [w, cbId] { |
| if (auto shared = w.lock()) { |
| // If no new socket is specified, we don't try to generate a new socket |
| shared->executePendingOperations(cbId.first, cbId.second, nullptr); |
| std::lock_guard<std::mutex> lk(shared->infosMtx_); |
| shared->infos_.erase(cbId); |
| } |
| }; |
| |
| // If no socket exists, we need to initiate an ICE connection. |
| sthis->getIceOptions([w, |
| deviceId = std::move(deviceId), |
| devicePk = std::move(devicePk), |
| name = std::move(name), |
| cert = std::move(cert), |
| vid, |
| connType, |
| eraseInfo](auto&& ice_config) { |
| auto sthis = w.lock(); |
| if (!sthis) { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| ice_config.tcpEnable = true; |
| ice_config.onInitDone = [w, |
| deviceId = std::move(deviceId), |
| devicePk = std::move(devicePk), |
| name = std::move(name), |
| cert = std::move(cert), |
| vid, |
| connType, |
| eraseInfo](bool ok) { |
| dht::ThreadPool::io().run([w = std::move(w), |
| devicePk = std::move(devicePk), |
| vid = std::move(vid), |
| eraseInfo, |
| connType, ok] { |
| auto sthis = w.lock(); |
| if (!ok && sthis && sthis->config_->logger) |
| sthis->config_->logger->error("Cannot initialize ICE session."); |
| if (!sthis || !ok) { |
| eraseInfo(); |
| return; |
| } |
| sthis->connectDeviceStartIce(devicePk, vid, connType, [=](bool ok) { |
| if (!ok) { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| }); |
| }; |
| ice_config.onNegoDone = [w, |
| deviceId, |
| name, |
| cert = std::move(cert), |
| vid, |
| eraseInfo](bool ok) { |
| dht::ThreadPool::io().run([w = std::move(w), |
| deviceId = std::move(deviceId), |
| name = std::move(name), |
| cert = std::move(cert), |
| vid = std::move(vid), |
| eraseInfo = std::move(eraseInfo), |
| ok] { |
| auto sthis = w.lock(); |
| if (!ok && sthis && sthis->config_->logger) |
| sthis->config_->logger->error("ICE negotiation failed."); |
| if (!sthis || !ok || !sthis->connectDeviceOnNegoDone(deviceId, name, vid, cert)) |
| eraseInfo(); |
| }); |
| }; |
| |
| auto info = std::make_shared<ConnectionInfo>(); |
| { |
| std::lock_guard<std::mutex> lk(sthis->infosMtx_); |
| sthis->infos_[{deviceId, vid}] = info; |
| } |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| ice_config.master = false; |
| ice_config.streamsCount = 1; |
| ice_config.compCountPerStream = 1; |
| info->ice_ = sthis->iceFactory_.createUTransport(""); |
| if (!info->ice_) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->error("Cannot initialize ICE session."); |
| eraseInfo(); |
| return; |
| } |
| // We need to detect any shutdown if the ice session is destroyed before going to the |
| // TLS session; |
| info->ice_->setOnShutdown([eraseInfo]() { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| }); |
| try { |
| info->ice_->initIceInstance(ice_config); |
| } catch (const std::exception& e) { |
| if (sthis->config_->logger) |
| sthis->config_->logger->error("{}", e.what()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::sendChannelRequest(std::shared_ptr<MultiplexedSocket>& sock, |
| const std::string& name, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid) |
| { |
| auto channelSock = sock->addChannel(name); |
| channelSock->onShutdown([name, deviceId, vid, w = weak()] { |
| auto shared = w.lock(); |
| if (auto shared = w.lock()) |
| shared->executePendingOperations(deviceId, vid, nullptr); |
| }); |
| channelSock->onReady( |
| [wSock = std::weak_ptr<ChannelSocket>(channelSock), name, deviceId, vid, w = weak()](bool accepted) { |
| auto shared = w.lock(); |
| auto channelSock = wSock.lock(); |
| if (shared) |
| shared->executePendingOperations(deviceId, vid, accepted ? channelSock : nullptr, accepted); |
| }); |
| |
| ChannelRequest val; |
| val.name = channelSock->name(); |
| val.state = ChannelRequestState::REQUEST; |
| val.channel = channelSock->channel(); |
| msgpack::sbuffer buffer(256); |
| msgpack::pack(buffer, val); |
| |
| std::error_code ec; |
| int res = sock->write(CONTROL_CHANNEL, |
| reinterpret_cast<const uint8_t*>(buffer.data()), |
| buffer.size(), |
| ec); |
| if (res < 0) { |
| // TODO check if we should handle errors here |
| if (config_->logger) |
| config_->logger->error("sendChannelRequest failed - error: {}", ec.message()); |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::onPeerResponse(const PeerConnectionRequest& req) |
| { |
| auto device = req.owner->getLongId(); |
| if (config_->logger) |
| config_->logger->debug("New response received from {}", device); |
| if (auto info = getInfo(device, req.id)) { |
| std::lock_guard<std::mutex> lk {info->mutex_}; |
| info->responseReceived_ = true; |
| info->response_ = std::move(req); |
| info->waitForAnswer_->expires_at(std::chrono::steady_clock::now()); |
| info->waitForAnswer_->async_wait(std::bind(&ConnectionManager::Impl::onResponse, |
| this, |
| std::placeholders::_1, |
| device, |
| req.id)); |
| } else { |
| if (config_->logger) |
| config_->logger->warn("Respond received, but cannot find request"); |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::onDhtConnected(const dht::crypto::PublicKey& devicePk) |
| { |
| if (!dht()) |
| return; |
| dht()->listen<PeerConnectionRequest>( |
| dht::InfoHash::get(PeerConnectionRequest::key_prefix + devicePk.getId().toString()), |
| [w = weak()](PeerConnectionRequest&& req) { |
| auto shared = w.lock(); |
| if (!shared) |
| return false; |
| if (shared->isMessageTreated(to_hex_string(req.id))) { |
| // Message already treated. Just ignore |
| return true; |
| } |
| if (req.isAnswer) { |
| if (shared->config_->logger) |
| shared->config_->logger->debug("Received request answer from {}", req.owner->getLongId()); |
| } else { |
| if (shared->config_->logger) |
| shared->config_->logger->debug("Received request from {}", req.owner->getLongId()); |
| } |
| if (req.isAnswer) { |
| shared->onPeerResponse(req); |
| } else { |
| // Async certificate checking |
| shared->dht()->findCertificate( |
| req.from, |
| [w, req = std::move(req)]( |
| const std::shared_ptr<dht::crypto::Certificate>& cert) mutable { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| dht::InfoHash peer_h; |
| if (foundPeerDevice(cert, peer_h, shared->config_->logger)) { |
| #if TARGET_OS_IOS |
| if (shared->iOSConnectedCb_(req.connType, peer_h)) |
| return; |
| #endif |
| shared->onDhtPeerRequest(req, cert); |
| } else { |
| if (shared->config_->logger) |
| shared->config_->logger->warn( |
| "Received request from untrusted peer {}", |
| req.owner->getLongId()); |
| } |
| }); |
| } |
| |
| return true; |
| }, |
| dht::Value::UserTypeFilter("peer_request")); |
| } |
| |
| void |
| ConnectionManager::Impl::onTlsNegotiationDone(bool ok, |
| const DeviceId& deviceId, |
| const dht::Value::Id& vid, |
| const std::string& name) |
| { |
| if (isDestroying_) |
| return; |
| // Note: only handle pendingCallbacks here for TLS initied by connectDevice() |
| // Note: if not initied by connectDevice() the channel name will be empty (because no channel |
| // asked yet) |
| auto isDhtRequest = name.empty(); |
| if (!ok) { |
| if (isDhtRequest) { |
| if (config_->logger) |
| config_->logger->error("TLS connection failure for peer {} - Initied by DHT request. channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| } else { |
| if (config_->logger) |
| config_->logger->error("TLS connection failure for peer {} - Initied by connectDevice. channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| executePendingOperations(deviceId, vid, nullptr); |
| } |
| } else { |
| // The socket is ready, store it |
| if (isDhtRequest) { |
| if (config_->logger) |
| config_->logger->debug("Connection to {} is ready - Initied by DHT request. Vid: {}", |
| deviceId, |
| vid); |
| } else { |
| if (config_->logger) |
| config_->logger->debug("Connection to {} is ready - Initied by connectDevice(). channel: {} - vid: {}", |
| deviceId, |
| name, |
| vid); |
| } |
| |
| auto info = getInfo(deviceId, vid); |
| addNewMultiplexedSocket({deviceId, vid}, info); |
| // Finally, open the channel and launch pending callbacks |
| if (info->socket_) { |
| // Note: do not remove pending there it's done in sendChannelRequest |
| for (const auto& [id, name] : getPendingIds(deviceId)) { |
| if (config_->logger) |
| config_->logger->debug("Send request on TLS socket for channel {} to {}", |
| name, |
| deviceId.toString()); |
| sendChannelRequest(info->socket_, name, deviceId, id); |
| } |
| } |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::answerTo(IceTransport& ice, |
| const dht::Value::Id& id, |
| const std::shared_ptr<dht::crypto::PublicKey>& from) |
| { |
| // NOTE: This is a shortest version of a real SDP message to save some bits |
| auto iceAttributes = ice.getLocalAttributes(); |
| std::ostringstream icemsg; |
| icemsg << iceAttributes.ufrag << "\n"; |
| icemsg << iceAttributes.pwd << "\n"; |
| for (const auto& addr : ice.getLocalCandidates(1)) { |
| icemsg << addr << "\n"; |
| } |
| |
| // Send PeerConnection response |
| PeerConnectionRequest val; |
| val.id = id; |
| val.ice_msg = icemsg.str(); |
| val.isAnswer = true; |
| auto value = std::make_shared<dht::Value>(std::move(val)); |
| value->user_type = "peer_request"; |
| |
| if (config_->logger) |
| config_->logger->debug("Connection accepted, DHT reply to {}", from->getLongId()); |
| dht()->putEncrypted(dht::InfoHash::get(PeerConnectionRequest::key_prefix |
| + from->getId().toString()), |
| from, |
| value, |
| [from,l=config_->logger](bool ok) { |
| if (l) |
| l->debug("Answer to connection request from {:s}. Put encrypted {:s}", |
| from->getLongId(), |
| (ok ? "ok" : "failed")); |
| }); |
| } |
| |
| bool |
| ConnectionManager::Impl::onRequestStartIce(const PeerConnectionRequest& req) |
| { |
| auto deviceId = req.owner->getLongId(); |
| auto info = getInfo(deviceId, req.id); |
| if (!info) |
| return false; |
| |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| auto& ice = info->ice_; |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("No ICE detected"); |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| return false; |
| } |
| |
| auto sdp = ice->parseIceCandidates(req.ice_msg); |
| answerTo(*ice, req.id, req.owner); |
| if (not ice->startIce({sdp.rem_ufrag, sdp.rem_pwd}, std::move(sdp.rem_candidates))) { |
| if (config_->logger) |
| config_->logger->error("Start ICE failed - fallback to TURN"); |
| ice = nullptr; |
| if (connReadyCb_) |
| connReadyCb_(deviceId, "", nullptr); |
| return false; |
| } |
| return true; |
| } |
| |
| bool |
| ConnectionManager::Impl::onRequestOnNegoDone(const PeerConnectionRequest& req) |
| { |
| auto deviceId = req.owner->getLongId(); |
| auto info = getInfo(deviceId, req.id); |
| if (!info) |
| return false; |
| |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| auto& ice = info->ice_; |
| if (!ice) { |
| if (config_->logger) |
| config_->logger->error("No ICE detected"); |
| return false; |
| } |
| |
| // Build socket |
| auto endpoint = std::make_unique<IceSocketEndpoint>(std::shared_ptr<IceTransport>( |
| std::move(ice)), |
| false); |
| |
| // init TLS session |
| auto ph = req.from; |
| if (config_->logger) |
| config_->logger->debug("Start TLS session - Initied by DHT request. Device: {} - vid: {}", |
| req.from, |
| req.id); |
| info->tls_ = std::make_unique<TlsSocketEndpoint>( |
| std::move(endpoint), |
| certStore(), |
| identity(), |
| dhParams(), |
| [ph, w = weak()](const dht::crypto::Certificate& cert) { |
| auto shared = w.lock(); |
| if (!shared) |
| return false; |
| auto crt = shared->certStore().getCertificate(cert.getLongId().toString()); |
| if (!crt) |
| return false; |
| return crt->getPacked() == cert.getPacked(); |
| }); |
| |
| info->tls_->setOnReady( |
| [w = weak(), deviceId = std::move(deviceId), vid = std::move(req.id)](bool ok) { |
| if (auto shared = w.lock()) |
| shared->onTlsNegotiationDone(ok, deviceId, vid); |
| }); |
| return true; |
| } |
| |
| void |
| ConnectionManager::Impl::onDhtPeerRequest(const PeerConnectionRequest& req, |
| const std::shared_ptr<dht::crypto::Certificate>& /*cert*/) |
| { |
| auto deviceId = req.owner->getLongId(); |
| if (config_->logger) |
| config_->logger->debug("New connection request from {}", deviceId); |
| if (!iceReqCb_ || !iceReqCb_(deviceId)) { |
| if (config_->logger) |
| config_->logger->debug("Refuse connection from {}", deviceId); |
| return; |
| } |
| |
| // Because the connection is accepted, create an ICE socket. |
| getIceOptions([w = weak(), req, deviceId](auto&& ice_config) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| // Note: used when the ice negotiation fails to erase |
| // all stored structures. |
| auto eraseInfo = [w, id = req.id, deviceId] { |
| if (auto shared = w.lock()) { |
| // If no new socket is specified, we don't try to generate a new socket |
| shared->executePendingOperations(deviceId, id, nullptr); |
| if (shared->connReadyCb_) |
| shared->connReadyCb_(deviceId, "", nullptr); |
| std::lock_guard<std::mutex> lk(shared->infosMtx_); |
| shared->infos_.erase({deviceId, id}); |
| } |
| }; |
| |
| ice_config.tcpEnable = true; |
| ice_config.onInitDone = [w, req, eraseInfo](bool ok) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| if (!ok) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("Cannot initialize ICE session."); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| |
| dht::ThreadPool::io().run( |
| [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| if (!shared->onRequestStartIce(req)) |
| eraseInfo(); |
| }); |
| }; |
| |
| ice_config.onNegoDone = [w, req, eraseInfo](bool ok) { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| if (!ok) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("ICE negotiation failed."); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| return; |
| } |
| |
| dht::ThreadPool::io().run( |
| [w = std::move(w), req = std::move(req), eraseInfo = std::move(eraseInfo)] { |
| if (auto shared = w.lock()) |
| if (!shared->onRequestOnNegoDone(req)) |
| eraseInfo(); |
| }); |
| }; |
| |
| // Negotiate a new ICE socket |
| auto info = std::make_shared<ConnectionInfo>(); |
| { |
| std::lock_guard<std::mutex> lk(shared->infosMtx_); |
| shared->infos_[{deviceId, req.id}] = info; |
| } |
| if (shared->config_->logger) |
| shared->config_->logger->debug("Accepting connection from {}", deviceId); |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| ice_config.streamsCount = 1; |
| ice_config.compCountPerStream = 1; // TCP |
| ice_config.master = true; |
| info->ice_ = shared->iceFactory_.createUTransport(""); |
| if (not info->ice_) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("Cannot initialize ICE session"); |
| eraseInfo(); |
| return; |
| } |
| // We need to detect any shutdown if the ice session is destroyed before going to the TLS session; |
| info->ice_->setOnShutdown([eraseInfo]() { |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| }); |
| try { |
| info->ice_->initIceInstance(ice_config); |
| } catch (const std::exception& e) { |
| if (shared->config_->logger) |
| shared->config_->logger->error("{}", e.what()); |
| dht::ThreadPool::io().run([eraseInfo = std::move(eraseInfo)] { eraseInfo(); }); |
| } |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::addNewMultiplexedSocket(const CallbackId& id, const std::shared_ptr<ConnectionInfo>& info) |
| { |
| info->socket_ = std::make_shared<MultiplexedSocket>(config_->ioContext, id.first, std::move(info->tls_)); |
| info->socket_->setOnReady( |
| [w = weak()](const DeviceId& deviceId, const std::shared_ptr<ChannelSocket>& socket) { |
| if (auto sthis = w.lock()) |
| if (sthis->connReadyCb_) |
| sthis->connReadyCb_(deviceId, socket->name(), socket); |
| }); |
| info->socket_->setOnRequest([w = weak()](const std::shared_ptr<dht::crypto::Certificate>& peer, |
| const uint16_t&, |
| const std::string& name) { |
| if (auto sthis = w.lock()) |
| if (sthis->channelReqCb_) |
| return sthis->channelReqCb_(peer, name); |
| return false; |
| }); |
| info->socket_->onShutdown([w = weak(), deviceId=id.first, vid=id.second]() { |
| // Cancel current outgoing connections |
| dht::ThreadPool::io().run([w, deviceId, vid] { |
| auto sthis = w.lock(); |
| if (!sthis) |
| return; |
| |
| std::set<CallbackId> ids; |
| if (auto info = sthis->getInfo(deviceId, vid)) { |
| std::lock_guard<std::mutex> lk(info->mutex_); |
| if (info->socket_) { |
| ids = std::move(info->cbIds_); |
| info->socket_->shutdown(); |
| } |
| } |
| for (const auto& cbId : ids) |
| sthis->executePendingOperations(cbId.first, cbId.second, nullptr); |
| |
| std::lock_guard<std::mutex> lk(sthis->infosMtx_); |
| sthis->infos_.erase({deviceId, vid}); |
| }); |
| }); |
| } |
| |
| const std::shared_future<tls::DhParams> |
| ConnectionManager::Impl::dhParams() const |
| { |
| return dht::ThreadPool::computation().get<tls::DhParams>( |
| std::bind(tls::DhParams::loadDhParams, config_->cachePath + DIR_SEPARATOR_STR "dhParams")); |
| ; |
| } |
| |
| template<typename ID = dht::Value::Id> |
| std::set<ID, std::less<>> |
| loadIdList(const std::string& path) |
| { |
| std::set<ID, std::less<>> ids; |
| std::ifstream file = fileutils::ifstream(path); |
| if (!file.is_open()) { |
| //JAMI_DBG("Could not load %s", path.c_str()); |
| return ids; |
| } |
| std::string line; |
| while (std::getline(file, line)) { |
| if constexpr (std::is_same<ID, std::string>::value) { |
| ids.emplace(std::move(line)); |
| } else if constexpr (std::is_integral<ID>::value) { |
| ID vid; |
| if (auto [p, ec] = std::from_chars(line.data(), line.data() + line.size(), vid, 16); |
| ec == std::errc()) { |
| ids.emplace(vid); |
| } |
| } |
| } |
| return ids; |
| } |
| |
| template<typename List = std::set<dht::Value::Id>> |
| void |
| saveIdList(const std::string& path, const List& ids) |
| { |
| std::ofstream file = fileutils::ofstream(path, std::ios::trunc | std::ios::binary); |
| if (!file.is_open()) { |
| //JAMI_ERR("Could not save to %s", path.c_str()); |
| return; |
| } |
| for (auto& c : ids) |
| file << std::hex << c << "\n"; |
| } |
| |
| void |
| ConnectionManager::Impl::loadTreatedMessages() |
| { |
| std::lock_guard<std::mutex> lock(messageMutex_); |
| auto path = config_->cachePath + DIR_SEPARATOR_STR "treatedMessages"; |
| treatedMessages_ = loadIdList<std::string>(path); |
| if (treatedMessages_.empty()) { |
| auto messages = loadIdList(path); |
| for (const auto& m : messages) |
| treatedMessages_.emplace(to_hex_string(m)); |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::saveTreatedMessages() const |
| { |
| dht::ThreadPool::io().run([w = weak()]() { |
| if (auto sthis = w.lock()) { |
| auto& this_ = *sthis; |
| std::lock_guard<std::mutex> lock(this_.messageMutex_); |
| fileutils::check_dir(this_.config_->cachePath.c_str()); |
| saveIdList<decltype(this_.treatedMessages_)>(this_.config_->cachePath |
| + DIR_SEPARATOR_STR "treatedMessages", |
| this_.treatedMessages_); |
| } |
| }); |
| } |
| |
| bool |
| ConnectionManager::Impl::isMessageTreated(std::string_view id) |
| { |
| std::lock_guard<std::mutex> lock(messageMutex_); |
| auto res = treatedMessages_.emplace(id); |
| if (res.second) { |
| saveTreatedMessages(); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * returns whether or not UPnP is enabled and active_ |
| * ie: if it is able to make port mappings |
| */ |
| bool |
| ConnectionManager::Impl::getUPnPActive() const |
| { |
| return config_->getUPnPActive(); |
| } |
| |
| IpAddr |
| ConnectionManager::Impl::getPublishedIpAddress(uint16_t family) const |
| { |
| if (family == AF_INET) |
| return publishedIp_[0]; |
| if (family == AF_INET6) |
| return publishedIp_[1]; |
| |
| assert(family == AF_UNSPEC); |
| |
| // If family is not set, prefere IPv4 if available. It's more |
| // likely to succeed behind NAT. |
| if (publishedIp_[0]) |
| return publishedIp_[0]; |
| if (publishedIp_[1]) |
| return publishedIp_[1]; |
| return {}; |
| } |
| |
| void |
| ConnectionManager::Impl::setPublishedAddress(const IpAddr& ip_addr) |
| { |
| if (ip_addr.getFamily() == AF_INET) { |
| publishedIp_[0] = ip_addr; |
| } else { |
| publishedIp_[1] = ip_addr; |
| } |
| } |
| |
| void |
| ConnectionManager::Impl::storeActiveIpAddress(std::function<void()>&& cb) |
| { |
| dht()->getPublicAddress([this, cb = std::move(cb)](std::vector<dht::SockAddr>&& results) { |
| bool hasIpv4 {false}, hasIpv6 {false}; |
| for (auto& result : results) { |
| auto family = result.getFamily(); |
| if (family == AF_INET) { |
| if (not hasIpv4) { |
| hasIpv4 = true; |
| if (config_->logger) |
| config_->logger->debug("Store DHT public IPv4 address: {}", result); |
| //JAMI_DBG("Store DHT public IPv4 address : %s", result.toString().c_str()); |
| setPublishedAddress(*result.get()); |
| if (config_->upnpCtrl) { |
| config_->upnpCtrl->setPublicAddress(*result.get()); |
| } |
| } |
| } else if (family == AF_INET6) { |
| if (not hasIpv6) { |
| hasIpv6 = true; |
| if (config_->logger) |
| config_->logger->debug("Store DHT public IPv6 address: {}", result); |
| setPublishedAddress(*result.get()); |
| } |
| } |
| if (hasIpv4 and hasIpv6) |
| break; |
| } |
| if (cb) |
| cb(); |
| }); |
| } |
| |
| void |
| ConnectionManager::Impl::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept |
| { |
| storeActiveIpAddress([this, cb = std::move(cb)] { |
| IceTransportOptions opts = ConnectionManager::Impl::getIceOptions(); |
| auto publishedAddr = getPublishedIpAddress(); |
| |
| if (publishedAddr) { |
| auto interfaceAddr = ip_utils::getInterfaceAddr(getLocalInterface(), |
| publishedAddr.getFamily()); |
| if (interfaceAddr) { |
| opts.accountLocalAddr = interfaceAddr; |
| opts.accountPublicAddr = publishedAddr; |
| } |
| } |
| if (cb) |
| cb(std::move(opts)); |
| }); |
| } |
| |
| IceTransportOptions |
| ConnectionManager::Impl::getIceOptions() const noexcept |
| { |
| IceTransportOptions opts; |
| opts.upnpEnable = getUPnPActive(); |
| |
| if (config_->stunEnabled) |
| opts.stunServers.emplace_back(StunServerInfo().setUri(config_->stunServer)); |
| if (config_->turnEnabled) { |
| auto cached = false; |
| std::lock_guard<std::mutex> lk(config_->cachedTurnMutex); |
| cached = config_->cacheTurnV4 || config_->cacheTurnV6; |
| if (config_->cacheTurnV4) { |
| opts.turnServers.emplace_back(TurnServerInfo() |
| .setUri(config_->cacheTurnV4.toString()) |
| .setUsername(config_->turnServerUserName) |
| .setPassword(config_->turnServerPwd) |
| .setRealm(config_->turnServerRealm)); |
| } |
| // NOTE: first test with ipv6 turn was not concluant and resulted in multiple |
| // co issues. So this needs some debug. for now just disable |
| // if (cacheTurnV6 && *cacheTurnV6) { |
| // opts.turnServers.emplace_back(TurnServerInfo() |
| // .setUri(cacheTurnV6->toString(true)) |
| // .setUsername(turnServerUserName_) |
| // .setPassword(turnServerPwd_) |
| // .setRealm(turnServerRealm_)); |
| //} |
| // Nothing cached, so do the resolution |
| if (!cached) { |
| opts.turnServers.emplace_back(TurnServerInfo() |
| .setUri(config_->turnServer) |
| .setUsername(config_->turnServerUserName) |
| .setPassword(config_->turnServerPwd) |
| .setRealm(config_->turnServerRealm)); |
| } |
| } |
| return opts; |
| } |
| |
| bool |
| ConnectionManager::Impl::foundPeerDevice(const std::shared_ptr<dht::crypto::Certificate>& crt, |
| dht::InfoHash& account_id, |
| const std::shared_ptr<Logger>& logger) |
| { |
| if (not crt) |
| return false; |
| |
| auto top_issuer = crt; |
| while (top_issuer->issuer) |
| top_issuer = top_issuer->issuer; |
| |
| // Device certificate can't be self-signed |
| if (top_issuer == crt) { |
| if (logger) |
| logger->warn("Found invalid peer device: {}", crt->getLongId()); |
| return false; |
| } |
| |
| // Check peer certificate chain |
| // Trust store with top issuer as the only CA |
| dht::crypto::TrustList peer_trust; |
| peer_trust.add(*top_issuer); |
| if (not peer_trust.verify(*crt)) { |
| if (logger) |
| logger->warn("Found invalid peer device: {}", crt->getLongId()); |
| return false; |
| } |
| |
| // Check cached OCSP response |
| if (crt->ocspResponse and crt->ocspResponse->getCertificateStatus() != GNUTLS_OCSP_CERT_GOOD) { |
| if (logger) |
| logger->error("Certificate %s is disabled by cached OCSP response", crt->getLongId()); |
| return false; |
| } |
| |
| account_id = crt->issuer->getId(); |
| if (logger) |
| logger->warn("Found peer device: {} account:{} CA:{}", |
| crt->getLongId(), |
| account_id, |
| top_issuer->getId()); |
| return true; |
| } |
| |
| bool |
| ConnectionManager::Impl::findCertificate( |
| const dht::PkId& id, std::function<void(const std::shared_ptr<dht::crypto::Certificate>&)>&& cb) |
| { |
| if (auto cert = certStore().getCertificate(id.toString())) { |
| if (cb) |
| cb(cert); |
| } else if (cb) |
| cb(nullptr); |
| return true; |
| } |
| |
| ConnectionManager::ConnectionManager(std::shared_ptr<ConnectionManager::Config> config_) |
| : pimpl_ {std::make_shared<Impl>(config_)} |
| {} |
| |
| ConnectionManager::~ConnectionManager() |
| { |
| if (pimpl_) |
| pimpl_->shutdown(); |
| } |
| |
| void |
| ConnectionManager::connectDevice(const DeviceId& deviceId, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| pimpl_->connectDevice(deviceId, name, std::move(cb), noNewSocket, forceNewSocket, connType); |
| } |
| |
| void |
| ConnectionManager::connectDevice(const std::shared_ptr<dht::crypto::Certificate>& cert, |
| const std::string& name, |
| ConnectCallback cb, |
| bool noNewSocket, |
| bool forceNewSocket, |
| const std::string& connType) |
| { |
| pimpl_->connectDevice(cert, name, std::move(cb), noNewSocket, forceNewSocket, connType); |
| } |
| |
| bool |
| ConnectionManager::isConnecting(const DeviceId& deviceId, const std::string& name) const |
| { |
| auto pending = pimpl_->getPendingIds(deviceId); |
| return std::find_if(pending.begin(), pending.end(), [&](auto p) { return p.second == name; }) |
| != pending.end(); |
| } |
| |
| void |
| ConnectionManager::closeConnectionsWith(const std::string& peerUri) |
| { |
| std::vector<std::shared_ptr<ConnectionInfo>> connInfos; |
| std::set<DeviceId> peersDevices; |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->infosMtx_); |
| for (auto iter = pimpl_->infos_.begin(); iter != pimpl_->infos_.end();) { |
| auto const& [key, value] = *iter; |
| auto deviceId = key.first; |
| auto cert = pimpl_->certStore().getCertificate(deviceId.toString()); |
| if (cert && cert->issuer && peerUri == cert->issuer->getId().toString()) { |
| connInfos.emplace_back(value); |
| peersDevices.emplace(deviceId); |
| iter = pimpl_->infos_.erase(iter); |
| } else { |
| iter++; |
| } |
| } |
| } |
| // Stop connections to all peers devices |
| for (const auto& deviceId : peersDevices) { |
| pimpl_->executePendingOperations(deviceId, 0, nullptr); |
| // This will close the TLS Session |
| pimpl_->removeUnusedConnections(deviceId); |
| } |
| for (auto& info : connInfos) { |
| if (info->socket_) |
| info->socket_->shutdown(); |
| if (info->waitForAnswer_) |
| info->waitForAnswer_->cancel(); |
| if (info->ice_) { |
| std::unique_lock<std::mutex> lk {info->mutex_}; |
| dht::ThreadPool::io().run( |
| [ice = std::shared_ptr<IceTransport>(std::move(info->ice_))] {}); |
| } |
| } |
| } |
| |
| void |
| ConnectionManager::onDhtConnected(const dht::crypto::PublicKey& devicePk) |
| { |
| pimpl_->onDhtConnected(devicePk); |
| } |
| |
| void |
| ConnectionManager::onICERequest(onICERequestCallback&& cb) |
| { |
| pimpl_->iceReqCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::onChannelRequest(ChannelRequestCallback&& cb) |
| { |
| pimpl_->channelReqCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::onConnectionReady(ConnectionReadyCallback&& cb) |
| { |
| pimpl_->connReadyCb_ = std::move(cb); |
| } |
| |
| void |
| ConnectionManager::oniOSConnected(iOSConnectedCallback&& cb) |
| { |
| pimpl_->iOSConnectedCb_ = std::move(cb); |
| } |
| |
| std::size_t |
| ConnectionManager::activeSockets() const |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->infosMtx_); |
| return pimpl_->infos_.size(); |
| } |
| |
| void |
| ConnectionManager::monitor() const |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->infosMtx_); |
| auto logger = pimpl_->config_->logger; |
| if (!logger) |
| return; |
| logger->debug("ConnectionManager current status:"); |
| for (const auto& [_, ci] : pimpl_->infos_) { |
| if (ci->socket_) |
| ci->socket_->monitor(); |
| } |
| logger->debug("ConnectionManager end status."); |
| } |
| |
| void |
| ConnectionManager::connectivityChanged() |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->infosMtx_); |
| for (const auto& [_, ci] : pimpl_->infos_) { |
| if (ci->socket_) |
| ci->socket_->sendBeacon(); |
| } |
| } |
| |
| void |
| ConnectionManager::getIceOptions(std::function<void(IceTransportOptions&&)> cb) noexcept |
| { |
| return pimpl_->getIceOptions(std::move(cb)); |
| } |
| |
| IceTransportOptions |
| ConnectionManager::getIceOptions() const noexcept |
| { |
| return pimpl_->getIceOptions(); |
| } |
| |
| IpAddr |
| ConnectionManager::getPublishedIpAddress(uint16_t family) const |
| { |
| return pimpl_->getPublishedIpAddress(family); |
| } |
| |
| void |
| ConnectionManager::setPublishedAddress(const IpAddr& ip_addr) |
| { |
| return pimpl_->setPublishedAddress(ip_addr); |
| } |
| |
| void |
| ConnectionManager::storeActiveIpAddress(std::function<void()>&& cb) |
| { |
| return pimpl_->storeActiveIpAddress(std::move(cb)); |
| } |
| |
| std::shared_ptr<ConnectionManager::Config> |
| ConnectionManager::getConfig() |
| { |
| return pimpl_->config_; |
| } |
| |
| } // namespace jami |