Add UPnP/NAT-PMP support
Change-Id: I4945a7df3a30cb39d81a33fc7a32e9fea600bdff
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4a72860..b27b0f3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -30,6 +30,9 @@
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DMSGPACK_NO_BOOST -DMSGPACK_DISABLE_LEGACY_NIL -DMSGPACK_DISABLE_LEGACY_CONVERT")
+option(DHTNET_PUPNP "Enable UPnP support" ON)
+option(DHTNET_NATPMP "Enable NAT-PMP support" ON)
+
# Sources
list (APPEND dhtnet_SOURCES
src/connectionmanager.cpp
@@ -48,8 +51,6 @@
src/upnp/upnp_context.cpp
src/upnp/upnp_control.cpp
src/upnp/protocol/mapping.cpp
- src/upnp/upnp_context.cpp
- src/upnp/upnp_control.cpp
src/upnp/protocol/igd.cpp
)
@@ -70,6 +71,37 @@
include/upnp/upnp_control.h
)
+if (DHTNET_PUPNP)
+ pkg_search_module (upnp IMPORTED_TARGET upnp libupnp)
+ if (NOT upnp_FOUND)
+ message("libupnp not found: disabling")
+ set(DHTNET_PUPNP Off)
+ else()
+ list (APPEND dhtnet_SOURCES
+ src/upnp/protocol/pupnp/pupnp.cpp
+ src/upnp/protocol/pupnp/upnp_igd.cpp
+ )
+ endif()
+endif()
+if (DHTNET_NATPMP)
+ pkg_search_module (natpmp IMPORTED_TARGET natpmp)
+ if (NOT natpmp_FOUND)
+ find_library(natpmp_LIBRARIES natpmp)
+ if (NOT natpmp_LIBRARIES)
+ message("NAT-PMP not found: disabling")
+ set(DHTNET_NATPMP Off)
+ else()
+ message("NAT-PMP found: ${natpmp_LIBRARIES}")
+ endif()
+ endif()
+ if (DHTNET_NATPMP)
+ list (APPEND dhtnet_SOURCES
+ src/upnp/protocol/natpmp/nat_pmp.cpp
+ src/upnp/protocol/natpmp/pmp_igd.cpp
+ )
+ endif()
+endif()
+
add_library(dhtnet ${dhtnet_SOURCES})
target_link_libraries(dhtnet PUBLIC PkgConfig::opendht PkgConfig::pjproject fmt::fmt ${MSGPACK_LIB})
if (APPLE)
@@ -80,6 +112,14 @@
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
+if (DHTNET_PUPNP)
+ target_compile_definitions(dhtnet PRIVATE HAVE_LIBUPNP)
+ target_link_libraries(dhtnet PRIVATE PkgConfig::upnp)
+endif()
+if (DHTNET_NATPMP)
+ target_compile_definitions(dhtnet PRIVATE HAVE_LIBNATPMP)
+ target_link_libraries(dhtnet PRIVATE ${natpmp_LIBRARIES})
+endif()
if (BUILD_TESTING)
target_compile_definitions(dhtnet PUBLIC DHTNET_TESTABLE)
endif()
diff --git a/include/upnp/mapping.h b/include/upnp/mapping.h
index a9cff54..4b3f427 100644
--- a/include/upnp/mapping.h
+++ b/include/upnp/mapping.h
@@ -36,7 +36,7 @@
enum class NatProtocolType;
class IGD;
-class Mapping
+class Mapping : public std::enable_shared_from_this<Mapping>
{
friend class UPnPContext;
friend class NatPmp;
@@ -112,6 +112,7 @@
void setIgd(const std::shared_ptr<IGD>& igd);
void setAvailable(bool val);
void setState(const MappingState& state);
+ void updateState(const MappingState& state, bool notify = true);
void updateDescription();
#if HAVE_LIBNATPMP
void setRenewalTime(sys_clock::time_point time);
diff --git a/include/upnp/upnp_context.h b/include/upnp/upnp_context.h
index a486eb5..62ada12 100644
--- a/include/upnp/upnp_context.h
+++ b/include/upnp/upnp_context.h
@@ -27,7 +27,6 @@
#include "../ip_utils.h"
-#include "upnp_thread_util.h"
#include "mapping.h"
#include <opendht/rng.h>
@@ -82,7 +81,7 @@
virtual void onMappingRemoved(const std::shared_ptr<IGD>& igd, const Mapping& map) = 0;
};
-class UPnPContext : public UpnpMappingObserver, protected UpnpThreadUtil
+class UPnPContext : public UpnpMappingObserver
{
private:
struct MappingStatus
@@ -166,8 +165,6 @@
Mapping::sharedPtr_t registerMapping(Mapping& map);
// Removes the mapping from the list.
- std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator unregisterMapping(
- std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator it);
void unregisterMapping(const Mapping::sharedPtr_t& map);
// Perform the request on the provided IGD.
@@ -179,11 +176,6 @@
// Remove all mappings of the given type.
void deleteAllMappings(PortType type);
- // Update the state and notify the listener
- void updateMappingState(const Mapping::sharedPtr_t& map,
- MappingState newState,
- bool notify = true);
-
// Provision ports.
uint16_t getAvailablePortNumber(PortType type);
@@ -297,6 +289,8 @@
int maxOpenPortLimit_[2] {8, 12};
//std::shared_ptr<Task> mappingListUpdateTimer_ {};
+ std::shared_ptr<asio::io_context> ctx;
+ std::shared_ptr<dht::log::Logger> logger_;
asio::steady_timer mappingListUpdateTimer_;// {};
// Current preferred IGD. Can be null if there is no valid IGD.
diff --git a/include/upnp/upnp_thread_util.h b/include/upnp/upnp_thread_util.h
deleted file mode 100644
index 22b2502..0000000
--- a/include/upnp/upnp_thread_util.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2004-2023 Savoir-faire Linux Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- */
-#pragma once
-
-#include <thread>
-#include <memory>
-#include <asio/io_context.hpp>
-#include <fmt/format.h>
-
-// This macro is used to validate that a code is executed from the expected
-// thread. It's useful to detect unexpected race on data members.
-#define CHECK_VALID_THREAD() \
- if (not isValidThread()) \
- fmt::print("The calling thread {} is not the expected thread: {}\n", getCurrentThread(), threadId_);
- /*JAMI_ERR() << "The calling thread " << getCurrentThread() \
- << " is not the expected thread: " << threadId_;*/
-
-namespace dhtnet {
-namespace upnp {
-
-class UpnpThreadUtil
-{
-protected:
- std::thread::id getCurrentThread() const { return std::this_thread::get_id(); }
-
- bool isValidThread() const { return threadId_ == getCurrentThread(); }
-
- // Upnp context execution queue (same as manager's scheduler)
- // Helpers to run tasks on upnp context queue.
- //static ScheduledExecutor* getScheduler() { return &Manager::instance().scheduler(); }
-
- template<typename Callback>
- static void runOnUpnpContextQueue(Callback&& cb)
- {
- //getScheduler()->run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- //ioContext->post(std::move(cb));
- }
-
- std::shared_ptr<asio::io_context> ioContext;
- std::thread::id threadId_;
-};
-
-} // namespace upnp
-} // namespace dhtnet
diff --git a/src/ice_transport.cpp b/src/ice_transport.cpp
index 47a67e6..a31fdb3 100644
--- a/src/ice_transport.cpp
+++ b/src/ice_transport.cpp
@@ -448,8 +448,8 @@
config_.stun_tp_cnt = 0;
- if (logger_)
- logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
+ // if (logger_)
+ // logger_->debug("[ice:{}] Add host candidates", fmt::ptr(this));
addStunConfig(pj_AF_INET());
addStunConfig(pj_AF_INET6());
@@ -459,8 +459,8 @@
upnpSrflxCand = setupUpnpReflexiveCandidates();
if (not upnpSrflxCand.empty()) {
addServerReflexiveCandidates(upnpSrflxCand);
- if (logger_)
- logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
+ // if (logger_)
+ // logger_->debug("[ice:{}] Added UPNP srflx candidates:", fmt::ptr(this));
}
}
@@ -472,8 +472,8 @@
if (upnpSrflxCand.empty()
or (upnpSrflxCand[0].second.toString() != genericSrflxCand[0].second.toString())) {
addServerReflexiveCandidates(genericSrflxCand);
- if (logger_)
- logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
+ // if (logger_)
+ // logger_->debug("[ice:{}] Added generic srflx candidates:", fmt::ptr(this));
}
}
@@ -908,10 +908,10 @@
stun.af = af;
stun.conn_type = config_.stun.conn_type;
- if (logger_)
- logger_->debug("[ice:{}] added host stun config for {:s} transport",
- fmt::ptr(this),
- config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
+ // if (logger_)
+ // logger_->debug("[ice:{}] added host stun config for {:s} transport",
+ // fmt::ptr(this),
+ // config_.protocol == PJ_ICE_TP_TCP ? "TCP" : "UDP");
return true;
}
diff --git a/src/upnp/protocol/mapping.cpp b/src/upnp/protocol/mapping.cpp
index 3aa24b3..4b9d5ba 100644
--- a/src/upnp/protocol/mapping.cpp
+++ b/src/upnp/protocol/mapping.cpp
@@ -94,6 +94,20 @@
state_ = state;
}
+void
+Mapping::updateState(const MappingState& newState, bool notify)
+{
+ std::unique_lock<std::mutex> lock(mutex_);
+ if (newState == state_)
+ return;
+ state_ = newState;
+
+ if (notify && notifyCb_) {
+ lock.unlock();
+ notifyCb_(shared_from_this());
+ }
+}
+
const char*
Mapping::getStateStr() const
{
diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp
index 599205e..2f48713 100644
--- a/src/upnp/protocol/natpmp/nat_pmp.cpp
+++ b/src/upnp/protocol/natpmp/nat_pmp.cpp
@@ -21,11 +21,11 @@
namespace dhtnet {
namespace upnp {
-NatPmp::NatPmp()
+NatPmp::NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
{
// JAMI_DBG("NAT-PMP: Instance [%p] created", this);
- runOnNatPmpQueue([this] {
- threadId_ = getCurrentThread();
+ ioContext->dispatch([this] {
igd_ = std::make_shared<PMPIGD>();
});
}
@@ -38,15 +38,6 @@
void
NatPmp::initNatPmp()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->initNatPmp();
- }
- });
- return;
- }
-
initialized_ = false;
{
@@ -82,8 +73,7 @@
err = NATPMP_ERR_CANNOTGETGATEWAY;
} else {
// JAMI_WARN("NAT-PMP: Trying to initialize using detected gateway %s",
- localGw.toString().c_str());
-
+ // localGw.toString().c_str());
struct in_addr inaddr;
inet_pton(AF_INET, localGw.toString().c_str(), &inaddr);
err = initnatpmp(&natpmpHdl_, 1, inaddr.s_addr);
@@ -119,17 +109,6 @@
void
NatPmp::setObserver(UpnpMappingObserver* obs)
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), obs] {
- if (auto pmpThis = w.lock()) {
- pmpThis->setObserver(obs);
- }
- });
- return;
- }
-
- // JAMI_DBG("NAT-PMP: Setting observer to %p", obs);
-
observer_ = obs;
}
@@ -152,10 +131,8 @@
std::unique_lock<std::mutex> lk(natpmpMutex_);
std::condition_variable cv {};
- runOnNatPmpQueue([w = weak(), &cv = cv] {
- if (auto pmpThis = w.lock()) {
- pmpThis->terminate(cv);
- }
+ ioContext->dispatch([&] {
+ terminate(cv);
});
if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -175,15 +152,6 @@
void
NatPmp::clearIgds()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->clearIgds();
- }
- });
- return;
- }
-
bool do_close = false;
if (igd_) {
@@ -194,8 +162,7 @@
}
initialized_ = false;
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
+ searchForIgdTimer_.cancel();
igdSearchCounter_ = 0;
@@ -208,15 +175,6 @@
void
NatPmp::searchForIgd()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->searchForIgd();
- }
- });
- return;
- }
-
if (not initialized_) {
initNatPmp();
}
@@ -227,12 +185,11 @@
// JAMI_DBG("NAT-PMP: Start search for IGDs. Attempt %i", igdSearchCounter_);
// Cancel the current timer (if any) and re-schedule.
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
-
- searchForIgdTimer_ = getNatpmpScheduler()->scheduleIn([this] { searchForIgd(); },
- NATPMP_SEARCH_RETRY_UNIT
- * igdSearchCounter_);
+ searchForIgdTimer_.expires_after(NATPMP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ searchForIgdTimer_.async_wait([this](const asio::error_code& ec) {
+ if (!ec)
+ searchForIgd();
+ });
} else {
// JAMI_WARN("NAT-PMP: Setup failed after %u trials. NAT-PMP will be disabled!",
// MAX_RESTART_SEARCH_RETRIES);
@@ -291,14 +248,14 @@
NatPmp::requestMappingAdd(const Mapping& mapping)
{
// Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
+ /*if (not isValidThread()) {
+ ioContext->post([w = weak(), mapping] {
if (auto pmpThis = w.lock()) {
pmpThis->requestMappingAdd(mapping);
}
});
return;
- }
+ }*/
Mapping map(mapping);
assert(map.getIgd());
@@ -329,14 +286,14 @@
NatPmp::requestMappingRenew(const Mapping& mapping)
{
// Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
+ /*if (not isValidThread()) {
+ ioContext->post([w = weak(), mapping] {
if (auto pmpThis = w.lock()) {
pmpThis->requestMappingRenew(mapping);
}
});
return;
- }
+ }*/
Mapping map(mapping);
auto err = addPortMapping(map);
@@ -401,7 +358,7 @@
int
NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime)
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
int err = sendnewportmappingrequest(&natpmpHdl_,
mapping.getType() == PortType::UDP ? NATPMP_PROTOCOL_UDP
@@ -478,16 +435,12 @@
void
NatPmp::requestMappingRemove(const Mapping& mapping)
{
- // Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
- if (auto pmpThis = w.lock()) {
- Mapping map {mapping};
- pmpThis->removePortMapping(map);
- }
- });
- return;
- }
+ ioContext->dispatch([w = weak(), mapping] {
+ if (auto pmpThis = w.lock()) {
+ Mapping map {mapping};
+ pmpThis->removePortMapping(map);
+ }
+ });
}
void
@@ -522,7 +475,7 @@
void
NatPmp::getIgdPublicAddress()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Set the public address for this IGD if it does not
// have one already.
@@ -585,7 +538,7 @@
void
NatPmp::removeAllMappings()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// JAMI_WARN("NAT-PMP: Send request to close all existing mappings to IGD %s",
// igd_->toString().c_str());
@@ -721,7 +674,7 @@
if (observer_ == nullptr)
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); });
+ ioContext->post([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); });
}
void
@@ -731,7 +684,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); });
}
void
@@ -741,7 +694,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); });
}
void
@@ -751,7 +704,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); });
}
void
@@ -761,7 +714,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); });
}
} // namespace upnp
diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h
index 30642a0..fbc2fc0 100644
--- a/src/upnp/protocol/natpmp/nat_pmp.h
+++ b/src/upnp/protocol/natpmp/nat_pmp.h
@@ -16,14 +16,10 @@
*/
#pragma once
-#include "connectivity/upnp/protocol/upnp_protocol.h"
-#include "connectivity/upnp/protocol/igd.h"
+#include "../upnp_protocol.h"
+#include "../igd.h"
#include "pmp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
// uncomment to enable native natpmp error messages
//#define ENABLE_STRNATPMPERR 1
@@ -53,7 +49,7 @@
class NatPmp : public UPnPProtocol
{
public:
- NatPmp();
+ NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
~NatPmp();
// Set the observer.
@@ -94,21 +90,11 @@
void terminate() override;
private:
- NON_COPYABLE(NatPmp);
+ NatPmp& operator=(const NatPmp&) = delete;
+ NatPmp(const NatPmp&) = delete;
std::weak_ptr<NatPmp> weak() { return std::static_pointer_cast<NatPmp>(shared_from_this()); }
- // Helpers to run tasks on NAT-PMP internal execution queue.
- ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; }
- template<typename Callback>
- void runOnNatPmpQueue(Callback&& cb)
- {
- natpmpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- }
-
- // Helpers to run tasks on UPNP context execution queue.
- ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
-
void terminate(std::condition_variable& cv);
void initNatPmp();
@@ -147,8 +133,8 @@
// Data members
std::shared_ptr<PMPIGD> igd_;
natpmp_t natpmpHdl_;
- ScheduledExecutor natpmpScheduler_ {"natpmp"};
- std::shared_ptr<Task> searchForIgdTimer_ {};
+ std::shared_ptr<asio::io_context> ioContext;
+ asio::steady_timer searchForIgdTimer_;
unsigned int igdSearchCounter_ {0};
UpnpMappingObserver* observer_ {nullptr};
IpAddr hostAddress_ {};
diff --git a/src/upnp/protocol/natpmp/pmp_igd.h b/src/upnp/protocol/natpmp/pmp_igd.h
index df1c44b..1ca7fe7 100644
--- a/src/upnp/protocol/natpmp/pmp_igd.h
+++ b/src/upnp/protocol/natpmp/pmp_igd.h
@@ -17,8 +17,7 @@
#pragma once
#include "../igd.h"
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "ip_utils.h"
#include <map>
#include <atomic>
diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp
index e6ad85d..df68696 100644
--- a/src/upnp/protocol/pupnp/pupnp.cpp
+++ b/src/upnp/protocol/pupnp/pupnp.cpp
@@ -15,6 +15,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "pupnp.h"
+#include "string_utils.h"
#include <opendht/thread_pool.h>
#include <opendht/http.h>
@@ -95,13 +96,10 @@
// UPNP class implementation
-PUPnP::PUPnP()
+PUPnP::PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
{
- // JAMI_DBG("PUPnP: Creating instance [%p] ...", this);
- runOnPUPnPQueue([this] {
- threadId_ = getCurrentThread();
- // JAMI_DBG("PUPnP: Instance [%p] created", this);
- });
+ // JAMI_LOG("PUPnP: Creating instance [{}] ...", fmt::ptr(this));
}
PUPnP::~PUPnP()
@@ -142,10 +140,10 @@
ip_address6 = UpnpGetServerIp6Address();
port6 = UpnpGetServerPort6();
#endif
- if (ip_address6 and port6)
- // JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
+ /*if (ip_address6 and port6)
+ JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
else
- // JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);
+ JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);*/
// Relax the parser to allow malformed XML text.
ixmlRelaxParser(1);
@@ -165,8 +163,6 @@
{
assert(not clientRegistered_);
- CHECK_VALID_THREAD();
-
// Register Upnp control point.
int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_);
if (upnp_err != UPNP_E_SUCCESS) {
@@ -180,17 +176,6 @@
void
PUPnP::setObserver(UpnpMappingObserver* obs)
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak(), obs] {
- if (auto upnpThis = w.lock()) {
- upnpThis->setObserver(obs);
- }
- });
- return;
- }
-
- // JAMI_DBG("PUPnP: Setting observer to %p", obs);
-
observer_ = obs;
}
@@ -236,10 +221,8 @@
std::unique_lock<std::mutex> lk(pupnpMutex_);
std::condition_variable cv {};
- runOnPUPnPQueue([w = weak(), &cv = cv] {
- if (auto upnpThis = w.lock()) {
- upnpThis->terminate(cv);
- }
+ ioContext->dispatch([&] {
+ terminate(cv);
});
if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -254,8 +237,6 @@
void
PUPnP::searchForDevices()
{
- CHECK_VALID_THREAD();
-
// JAMI_DBG("PUPnP: Send IGD search request");
// Send out search for multiple types of devices, as some routers may possibly
@@ -293,19 +274,9 @@
void
PUPnP::clearIgds()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->clearIgds();
- }
- });
- return;
- }
-
// JAMI_DBG("PUPnP: clearing IGDs and devices lists");
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
+ searchForIgdTimer_.cancel();
igdSearchCounter_ = 0;
@@ -324,15 +295,6 @@
void
PUPnP::searchForIgd()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->searchForIgd();
- }
- });
- return;
- }
-
// Update local address before searching.
updateHostAddress();
@@ -375,15 +337,13 @@
// The connectivity change may be received while the the local
// interface is not fully setup. The rescheduling typically
// usefull to mitigate this race.
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
-
- searchForIgdTimer_ = getUpnContextScheduler()->scheduleIn(
- [w = weak()] {
+ searchForIgdTimer_.expires_after(PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ searchForIgdTimer_.async_wait([w = weak()] (const asio::error_code& ec) {
+ if (not ec) {
if (auto upnpThis = w.lock())
upnpThis->searchForIgd();
- },
- PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ }
+ });
}
std::list<std::shared_ptr<IGD>>
@@ -453,8 +413,6 @@
bool
PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr)
{
- CHECK_VALID_THREAD();
-
assert(doc_container_ptr != nullptr);
XMLDocument document(doc_container_ptr, ixmlDocument_free);
@@ -574,7 +532,7 @@
}
// Report to the listener.
- runOnUpnpContextQueue([w = weak(), igd_candidate] {
+ ioContext->post([w = weak(), igd_candidate] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED);
@@ -587,7 +545,7 @@
void
PUPnP::requestMappingAdd(const Mapping& mapping)
{
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->post([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
if (not upnpThis->isRunning())
return;
@@ -609,7 +567,7 @@
PUPnP::requestMappingRemove(const Mapping& mapping)
{
// Send remove request using the matching IGD
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->dispatch([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
// Abort if we are shutting down.
if (not upnpThis->isRunning())
@@ -650,12 +608,10 @@
void
PUPnP::processAddMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map));
@@ -666,12 +622,10 @@
void
PUPnP::processRequestMappingFailure(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
// JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str());
if (upnpThis->observer_)
@@ -683,12 +637,10 @@
void
PUPnP::processRemoveMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([map, obs = observer_] {
+ ioContext->post([map, obs = observer_] {
// JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str());
obs->onMappingRemoved(map.getIgd(), std::move(map));
});
@@ -779,8 +731,6 @@
const std::string& igdLocationUrl,
const IpAddr& dstAddr)
{
- CHECK_VALID_THREAD();
-
// Update host address if needed.
if (not hasValidHostAddress())
updateHostAddress();
@@ -833,12 +783,12 @@
int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr);
if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) {
- // JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s",
- // locationUrl.c_str(),
- // UpnpGetErrorMessage(upnp_err));
+ if(logger_) logger_->warn("PUPnP: Error downloading device XML document from {} -> {}",
+ locationUrl,
+ UpnpGetErrorMessage(upnp_err));
} else {
- // JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str());
- runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] {
+ if(logger_) logger_->debug("PUPnP: Succeeded to download device XML document from {}", locationUrl);
+ ioContext->post([w = weak(), url = locationUrl, doc_container_ptr] {
if (auto upnpThis = w.lock()) {
upnpThis->validateIgd(url, doc_container_ptr);
}
@@ -849,8 +799,6 @@
void
PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId)
{
- CHECK_VALID_THREAD();
-
discoveredIgdList_.erase(cpDeviceId);
std::shared_ptr<IGD> igd;
@@ -882,8 +830,6 @@
void
PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl)
{
- CHECK_VALID_THREAD();
-
std::lock_guard<std::mutex> lk(pupnpMutex_);
for (auto& it : validIgdList_) {
if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) {
@@ -925,7 +871,7 @@
std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)};
std::string location {UpnpDiscovery_get_Location_cstr(d_event)};
IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event)));
- runOnPUPnPQueue([w = weak(),
+ ioContext->post([w = weak(),
deviceId = std::move(deviceId),
location = std::move(location),
dstAddr = std::move(dstAddr)] {
@@ -941,7 +887,7 @@
std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] {
+ ioContext->post([w = weak(), deviceId = std::move(deviceId)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoveryAdvertisementByebye(deviceId);
}
@@ -971,7 +917,7 @@
std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
+ ioContext->post([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl);
}
@@ -1332,7 +1278,7 @@
break;
} else {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- JAMI_ERROR("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
+ if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
errorCode,
errorDescription);
break;
@@ -1356,7 +1302,7 @@
std::string transport(getFirstDocItem(response.get(), "NewProtocol"));
if (port_internal.empty() || port_external.empty() || transport.empty()) {
- // JAMI_ERR("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
+            // if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
// entry_idx);
continue;
}
@@ -1372,9 +1318,9 @@
mapList.emplace(map.getMapKey(), std::move(map));
}
- JAMI_DEBUG("PUPnP: Found {:d} allocated mappings on IGD {:s}",
- mapList.size(),
- upnpIgd->toString());
+ // if (logger_) logger_->debug("PUPnP: Found {:d} allocated mappings on IGD {:s}",
+ // mapList.size(),
+ // upnpIgd->toString());
return mapList;
}
@@ -1385,22 +1331,23 @@
if (not(clientRegistered_ and igd->getLocalIp()))
return;
- // JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s",
- // igd->toString().c_str(),
+ // if (logger_) logger_->debug("PUPnP: Remove all mappings (if any) on IGD {} matching descr prefix {}",
+ // igd->toString(),
// Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX);
- auto mapList = getMappingsListByDescr(igd, description);
-
- for (auto const& [_, map] : mapList) {
- requestMappingRemove(map);
- }
+ ioContext->post([w=weak(), igd, description]{
+ if (auto sthis = w.lock()) {
+ auto mapList = sthis->getMappingsListByDescr(igd, description);
+ for (auto const& [_, map] : mapList) {
+ sthis->requestMappingRemove(map);
+ }
+ }
+ });
}
bool
PUPnP::actionAddPortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1475,13 +1422,13 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s",
+ // if (logger_) logger_->warn("PUPnP: Failed to send action {} for mapping {}. {:d}: {}",
// ACTION_ADD_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
+ // if (logger_) logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // if (logger_) logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
success = false;
}
@@ -1496,7 +1443,7 @@
errorDescription = getFirstDocItem(response.get(), "errorDescription");
}
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s} {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s} {:s}",
// ACTION_ADD_PORT_MAPPING,
// errorCode,
// errorDescription);
@@ -1507,8 +1454,6 @@
bool
PUPnP::actionDeletePortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1558,19 +1503,20 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s",
+ // if (logger_) {
+ // logger_->warn("PUPnP: Failed to send action {} for mapping from {}. {:d}: {}",
// ACTION_DELETE_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
-
+ // logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
+ // }
success = false;
}
if (not response) {
- // JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING);
+ // if (logger_) logger_->warn("PUPnP: Failed to get response for {}", ACTION_DELETE_PORT_MAPPING);
success = false;
}
@@ -1578,7 +1524,7 @@
auto errorCode = getFirstDocItem(response.get(), "errorCode");
if (not errorCode.empty()) {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s}: {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s}: {:s}",
// ACTION_DELETE_PORT_MAPPING,
// errorCode,
// errorDescription);
diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h
index d45fdde..4c0ea78 100644
--- a/src/upnp/protocol/pupnp/pupnp.h
+++ b/src/upnp/protocol/pupnp/pupnp.h
@@ -24,11 +24,7 @@
#include "../upnp_protocol.h"
#include "../igd.h"
#include "upnp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
#include <upnp/upnp.h>
#include <upnp/upnptools.h>
@@ -68,7 +64,7 @@
GET_EXTERNAL_IP_ADDRESS
};
- PUPnP();
+ PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
~PUPnP();
// Set the observer
@@ -113,18 +109,8 @@
void terminate() override;
private:
- NON_COPYABLE(PUPnP);
-
- // Helpers to run tasks on PUPNP private execution queue.
- ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; }
- template<typename Callback>
- void runOnPUPnPQueue(Callback&& cb)
- {
- pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- }
-
- // Helper to run tasks on UPNP context execution queue.
- ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
+ PUPnP& operator=(const PUPnP&) = delete;
+ PUPnP(const PUPnP&) = delete;
void terminate(std::condition_variable& cv);
@@ -224,15 +210,13 @@
std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); }
- // Execution queue to run lib upnp actions
- ScheduledExecutor pupnpScheduler_ {"pupnp"};
-
// Initialization status.
std::atomic_bool initialized_ {false};
// Client registration status.
std::atomic_bool clientRegistered_ {false};
- std::shared_ptr<Task> searchForIgdTimer_ {};
+ std::shared_ptr<asio::io_context> ioContext;
+ asio::steady_timer searchForIgdTimer_;
unsigned int igdSearchCounter_ {0};
// List of discovered IGDs.
diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h
index 9f33505..5ea7fe5 100644
--- a/src/upnp/protocol/pupnp/upnp_igd.h
+++ b/src/upnp/protocol/pupnp/upnp_igd.h
@@ -16,10 +16,8 @@
*/
#pragma once
-#include "connectivity/upnp/protocol/igd.h"
-
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "../igd.h"
+#include "ip_utils.h"
#include <map>
#include <string>
diff --git a/src/upnp/protocol/upnp_protocol.h b/src/upnp/protocol/upnp_protocol.h
index ce891e1..3dde4ab 100644
--- a/src/upnp/protocol/upnp_protocol.h
+++ b/src/upnp/protocol/upnp_protocol.h
@@ -16,7 +16,8 @@
*/
#pragma once
-#include "igd.h"
+#include "./igd.h"
+#include "upnp/upnp_context.h"
#include "upnp/mapping.h"
#include "ip_utils.h"
@@ -48,7 +49,7 @@
public:
enum class UpnpError : int { INVALID_ERR = -1, ERROR_OK, CONFLICT_IN_MAPPING };
- UPnPProtocol() {};
+ UPnPProtocol(const std::shared_ptr<dht::log::Logger>& logger) : logger_(logger) {};
virtual ~UPnPProtocol() {};
// Get protocol type.
@@ -93,6 +94,8 @@
// Terminate
virtual void terminate() = 0;
+
+ std::shared_ptr<dht::log::Logger> logger_;
};
} // namespace upnp
diff --git a/src/upnp/upnp_context.cpp b/src/upnp/upnp_context.cpp
index 6b1ff90..5022eb3 100644
--- a/src/upnp/upnp_context.cpp
+++ b/src/upnp/upnp_context.cpp
@@ -17,6 +17,13 @@
#include "upnp/upnp_context.h"
#include "protocol/upnp_protocol.h"
+#if HAVE_LIBNATPMP
+#include "protocol/natpmp/nat_pmp.h"
+#endif
+#if HAVE_LIBUPNP
+#include "protocol/pupnp/pupnp.h"
+#endif
+
#include <asio/steady_timer.hpp>
#if __has_include(<fmt/std.h>)
#include <fmt/std.h>
@@ -37,7 +44,7 @@
constexpr static uint16_t UPNP_UDP_PORT_MAX {UPNP_UDP_PORT_MIN + 5000};
UPnPContext::UPnPContext(const std::shared_ptr<asio::io_context>& ioContext, const std::shared_ptr<dht::log::Logger>& logger)
- : mappingListUpdateTimer_(*ioContext)
+ : mappingListUpdateTimer_(*ioContext), ctx(ioContext), logger_(logger)
{
// JAMI_DBG("Creating UPnPContext instance [%p]", this);
@@ -45,7 +52,7 @@
portRange_.emplace(PortType::TCP, std::make_pair(UPNP_TCP_PORT_MIN, UPNP_TCP_PORT_MAX));
portRange_.emplace(PortType::UDP, std::make_pair(UPNP_UDP_PORT_MIN, UPNP_UDP_PORT_MAX));
- ioContext->post([this] { init(); });
+ ctx->post([this] { init(); });
}
/*std::shared_ptr<UPnPContext>
@@ -86,8 +93,8 @@
std::unique_lock<std::mutex> lk(mappingMutex_);
std::condition_variable cv;
- runOnUpnpContextQueue([&, this] { shutdown(cv); });
-
+ ctx->post([&, this] { shutdown(cv); });
+
// JAMI_DBG("Waiting for shutdown ...");
if (cv.wait_for(lk, std::chrono::seconds(30), [this] { return shutdownComplete_; })) {
@@ -105,17 +112,14 @@
void
UPnPContext::init()
{
- threadId_ = getCurrentThread();
- CHECK_VALID_THREAD();
-
#if HAVE_LIBNATPMP
- auto natPmp = std::make_shared<NatPmp>();
+ auto natPmp = std::make_shared<NatPmp>(ctx, logger_);
natPmp->setObserver(this);
protocolList_.emplace(NatProtocolType::NAT_PMP, std::move(natPmp));
#endif
#if HAVE_LIBUPNP
- auto pupnp = std::make_shared<PUPnP>();
+ auto pupnp = std::make_shared<PUPnP>(ctx, logger_);
pupnp->setObserver(this);
protocolList_.emplace(NatProtocolType::PUPNP, std::move(pupnp));
#endif
@@ -126,13 +130,11 @@
{
assert(not controllerList_.empty());
- CHECK_VALID_THREAD();
-
// JAMI_DBG("Starting UPNP context");
// Request a new IGD search.
for (auto const& [_, protocol] : protocolList_) {
- protocol->searchForIgd();
+ ctx->dispatch([p=protocol] { p->searchForIgd(); });
}
started_ = true;
@@ -141,10 +143,10 @@
void
UPnPContext::stopUpnp(bool forceRelease)
{
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, forceRelease] { stopUpnp(forceRelease); });
+ /*if (not isValidThread()) {
+ ctx->post([this, forceRelease] { stopUpnp(forceRelease); });
return;
- }
+ }*/
// JAMI_DBG("Stopping UPNP context");
@@ -170,13 +172,12 @@
for (auto const& map : toRemoveList) {
requestRemoveMapping(map);
- // Notify is not needed in updateMappingState when
+ // Notify is not needed in updateState when
// shutting down (hence set it to false). NotifyCallback
// would trigger a new SIP registration and create a
// false registered state upon program close.
// It's handled by upper layers.
-
- updateMappingState(map, MappingState::FAILED, false);
+ map->updateState(MappingState::FAILED, false);
// We dont remove mappings with auto-update enabled,
// unless forceRelease is true.
if (not map->getAutoUpdate() or forceRelease) {
@@ -187,7 +188,7 @@
// Clear all current IGDs.
for (auto const& [_, protocol] : protocolList_) {
- protocol->clearIgds();
+ ctx->dispatch([p=protocol]{ p->clearIgds(); });
}
started_ = false;
@@ -221,10 +222,10 @@
void
UPnPContext::connectivityChanged()
{
- if (not isValidThread()) {
- runOnUpnpContextQueue([this] { connectivityChanged(); });
+ /*if (not isValidThread()) {
+ ctx->post([this] { connectivityChanged(); });
return;
- }
+ }*/
auto hostAddr = ip_utils::getLocalAddr(AF_INET);
@@ -367,10 +368,10 @@
void
UPnPContext::releaseMapping(const Mapping& map)
{
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, map] { releaseMapping(map); });
+ /*if (not isValidThread()) {
+ ctx->post([this, map] { releaseMapping(map); });
return;
- }
+ }*/
auto mapPtr = getMappingWithKey(map.getMapKey());
@@ -401,10 +402,10 @@
}
}
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, controller] { registerController(controller); });
+ /*if (not isValidThread()) {
+ ctx->post([this, controller] { registerController(controller); });
return;
- }
+ }*/
auto ret = controllerList_.emplace(controller);
if (not ret.second) {
@@ -420,10 +421,10 @@
void
UPnPContext::unregisterController(void* controller)
{
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, controller] { unregisterController(controller); });
+ /*if (not isValidThread()) {
+ ctx->post([this, controller] { unregisterController(controller); });
return;
- }
+ }*/
if (controllerList_.erase(controller) == 1) {
// JAMI_DBG("Successfully unregistered controller %p", controller);
@@ -462,10 +463,10 @@
{
assert(map);
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, map] { requestMapping(map); });
+ /*if (not isValidThread()) {
+ ctx->post([this, map] { requestMapping(map); });
return;
- }
+ }*/
auto const& igd = getPreferredIgd();
// We must have at least a valid IGD pointer if we get here.
@@ -484,8 +485,7 @@
// igd->getProtocolName(),
// igd->toString().c_str());
- if (map->getState() != MappingState::IN_PROGRESS)
- updateMappingState(map, MappingState::IN_PROGRESS);
+ map->updateState(MappingState::IN_PROGRESS);
auto const& protocol = protocolList_.at(igd->getProtocol());
protocol->requestMappingAdd(*map);
@@ -522,7 +522,7 @@
assert(portCount > 0);
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
std::lock_guard<std::mutex> lock(mappingMutex_);
auto& mappingList = getMappingList(type);
@@ -539,14 +539,14 @@
if (map->getState() == MappingState::OPEN and portCount > 0) {
// Close portCount mappings in "OPEN" state.
requestRemoveMapping(map);
- it = unregisterMapping(it);
+ it = mappingList.erase(it);
portCount--;
} else if (map->getState() != MappingState::OPEN) {
// If this methods is called, it means there are more open
// mappings than required. So, all mappings in a state other
// than "OPEN" state (typically in in-progress state) will
// be deleted as well.
- it = unregisterMapping(it);
+ it = mappingList.erase(it);
} else {
it++;
}
@@ -558,7 +558,7 @@
void
UPnPContext::updatePreferredIgd()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
if (preferredIgd_ and preferredIgd_->isValid())
return;
@@ -594,7 +594,7 @@
std::shared_ptr<IGD>
UPnPContext::getPreferredIgd() const
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
return preferredIgd_;
}
@@ -604,11 +604,11 @@
{
// Run async if requested.
if (async) {
- runOnUpnpContextQueue([this] { updateMappingList(false); });
+ ctx->post([this] { updateMappingList(false); });
return;
}
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Update the preferred IGD.
updatePreferredIgd();
@@ -712,7 +712,7 @@
void
UPnPContext::pruneMappingList()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
MappingStatus status;
getMappingStatus(status);
@@ -776,7 +776,7 @@
}
for (auto const& map : toRemoveList) {
- updateMappingState(map, MappingState::FAILED);
+ map->updateState(MappingState::FAILED);
unregisterMapping(map);
}
}
@@ -817,7 +817,7 @@
void
UPnPContext::pruneMappingsWithInvalidIgds(const std::shared_ptr<IGD>& igd)
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Use temporary list to avoid holding the lock while
// processing the mapping list.
@@ -840,7 +840,7 @@
// map->toString().c_str(),
// igd->toString().c_str(),
// igd->getProtocolName());
- updateMappingState(map, MappingState::FAILED);
+ map->updateState(MappingState::FAILED);
unregisterMapping(map);
}
}
@@ -926,10 +926,10 @@
{
assert(igd);
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, igd, event] { onIgdUpdated(igd, event); });
+ /*if (not isValidThread()) {
+ ctx->post([this, igd, event] { onIgdUpdated(igd, event); });
return;
- }
+ }*/
// Reset to start search for a new best IGD.
preferredIgd_.reset();
@@ -1005,7 +1005,7 @@
void
UPnPContext::onMappingAdded(const std::shared_ptr<IGD>& igd, const Mapping& mapRes)
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Check if we have a pending request for this response.
auto map = getMappingWithKey(mapRes.getMapKey());
@@ -1024,7 +1024,7 @@
map->setExternalPort(mapRes.getExternalPort());
// Update the state and report to the owner.
- updateMappingState(map, MappingState::OPEN);
+ map->updateState(MappingState::OPEN);
// JAMI_DBG("Mapping %s (on IGD %s [%s]) successfully performed",
// map->toString().c_str(),
@@ -1066,18 +1066,10 @@
void
UPnPContext::requestRemoveMapping(const Mapping::sharedPtr_t& map)
{
- CHECK_VALID_THREAD();
-
- if (not map) {
- // JAMI_ERR("Mapping shared pointer is null!");
- return;
- }
-
- if (not map->isValid()) {
+ if (not map or not map->isValid()) {
// Silently ignore if the mapping is invalid
return;
}
-
auto protocol = protocolList_.at(map->getIgd()->getProtocol());
protocol->requestMappingRemove(*map);
}
@@ -1085,10 +1077,10 @@
void
UPnPContext::deleteAllMappings(PortType type)
{
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, type] { deleteAllMappings(type); });
+ /*if (not isValidThread()) {
+ ctx->post([this, type] { deleteAllMappings(type); });
return;
- }
+ }*/
std::lock_guard<std::mutex> lock(mappingMutex_);
auto& mappingList = getMappingList(type);
@@ -1104,10 +1096,10 @@
if (not mapRes.isValid())
return;
- if (not isValidThread()) {
- runOnUpnpContextQueue([this, igd, mapRes] { onMappingRemoved(igd, mapRes); });
+ /*if (not isValidThread()) {
+ ctx->post([this, igd, mapRes] { onMappingRemoved(igd, mapRes); });
return;
- }
+ }*/
auto map = getMappingWithKey(mapRes.getMapKey());
// Notify the listener.
@@ -1154,23 +1146,10 @@
return mapPtr;
}
-std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator
-UPnPContext::unregisterMapping(std::map<Mapping::key_t, Mapping::sharedPtr_t>::iterator it)
-{
- assert(it->second);
-
- CHECK_VALID_THREAD();
- auto descr = it->second->toString();
- auto& mappingList = getMappingList(it->second->getType());
- auto ret = mappingList.erase(it);
-
- return ret;
-}
-
void
UPnPContext::unregisterMapping(const Mapping::sharedPtr_t& map)
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
if (not map) {
// JAMI_ERR("Mapping pointer is null");
@@ -1256,8 +1235,6 @@
void
UPnPContext::onMappingRequestFailed(const Mapping& mapRes)
{
- CHECK_VALID_THREAD();
-
auto const& map = getMappingWithKey(mapRes.getMapKey());
if (not map) {
// We may receive a response for a removed request. Just ignore it.
@@ -1273,7 +1250,7 @@
return;
}
- updateMappingState(map, MappingState::FAILED);
+ map->updateState(MappingState::FAILED);
unregisterMapping(map);
// JAMI_WARN("Mapping request for %s failed on IGD %s [%s]",
@@ -1282,32 +1259,11 @@
// igd->getProtocolName());
}
-void
-UPnPContext::updateMappingState(const Mapping::sharedPtr_t& map, MappingState newState, bool notify)
-{
- CHECK_VALID_THREAD();
-
- assert(map);
-
- // Ignore if the state did not change.
- if (newState == map->getState()) {
- // JAMI_DBG("Mapping %s already in state %s", map->toString().c_str(), map->getStateStr());
- return;
- }
-
- // Update the state.
- map->setState(newState);
-
- // Notify the listener if set.
- if (notify and map->getNotifyCallback())
- map->getNotifyCallback()(map);
-}
-
#if HAVE_LIBNATPMP
void
UPnPContext::renewAllocations()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Check if the we have valid PMP IGD.
auto pmpProto = protocolList_.at(NatProtocolType::NAT_PMP);