add upnp/natpmp support
Change-Id: I4945a7df3a30cb39d81a33fc7a32e9fea600bdff
diff --git a/src/upnp/protocol/mapping.cpp b/src/upnp/protocol/mapping.cpp
index 3aa24b3..4b9d5ba 100644
--- a/src/upnp/protocol/mapping.cpp
+++ b/src/upnp/protocol/mapping.cpp
@@ -94,6 +94,37 @@
state_ = state;
}
+/**
+ * Move the mapping to @p newState and optionally report the transition.
+ *
+ * Nothing happens when @p newState equals the current state. Otherwise the
+ * state is replaced while mutex_ is held and, if @p notify is true and a
+ * notification callback is installed, the callback is invoked with a shared
+ * pointer to this mapping once the mutex has been released.
+ *
+ * @param newState State to switch to.
+ * @param notify   Whether the notification callback should be invoked.
+ */
+void
+Mapping::updateState(const MappingState& newState, bool notify)
+{
+    // Filled in under the lock so that notifyCb_ itself is never read once
+    // the mutex has been released.
+    decltype(notifyCb_) callback;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (newState == state_)
+            return;
+        state_ = newState;
+        if (notify)
+            callback = notifyCb_;
+    }
+
+    // Run the callback outside the critical section, as before.
+    if (callback)
+        callback(shared_from_this());
+}
+
const char*
Mapping::getStateStr() const
{
diff --git a/src/upnp/protocol/natpmp/nat_pmp.cpp b/src/upnp/protocol/natpmp/nat_pmp.cpp
index 599205e..2f48713 100644
--- a/src/upnp/protocol/natpmp/nat_pmp.cpp
+++ b/src/upnp/protocol/natpmp/nat_pmp.cpp
@@ -21,11 +21,11 @@
namespace dhtnet {
namespace upnp {
-NatPmp::NatPmp()
+NatPmp::NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
{
// JAMI_DBG("NAT-PMP: Instance [%p] created", this);
- runOnNatPmpQueue([this] {
- threadId_ = getCurrentThread();
+ ioContext->dispatch([this] {
igd_ = std::make_shared<PMPIGD>();
});
}
@@ -38,15 +38,6 @@
void
NatPmp::initNatPmp()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->initNatPmp();
- }
- });
- return;
- }
-
initialized_ = false;
{
@@ -82,8 +73,7 @@
err = NATPMP_ERR_CANNOTGETGATEWAY;
} else {
// JAMI_WARN("NAT-PMP: Trying to initialize using detected gateway %s",
- localGw.toString().c_str());
-
+ // localGw.toString().c_str());
struct in_addr inaddr;
inet_pton(AF_INET, localGw.toString().c_str(), &inaddr);
err = initnatpmp(&natpmpHdl_, 1, inaddr.s_addr);
@@ -119,17 +109,6 @@
void
NatPmp::setObserver(UpnpMappingObserver* obs)
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), obs] {
- if (auto pmpThis = w.lock()) {
- pmpThis->setObserver(obs);
- }
- });
- return;
- }
-
- // JAMI_DBG("NAT-PMP: Setting observer to %p", obs);
-
observer_ = obs;
}
@@ -152,10 +131,8 @@
std::unique_lock<std::mutex> lk(natpmpMutex_);
std::condition_variable cv {};
- runOnNatPmpQueue([w = weak(), &cv = cv] {
- if (auto pmpThis = w.lock()) {
- pmpThis->terminate(cv);
- }
+    ioContext->dispatch([w = weak(), &cv = cv] { // weak guard: no-op once this object is gone
+        if (auto pmpThis = w.lock()) pmpThis->terminate(cv);
+    });
if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -175,15 +152,6 @@
void
NatPmp::clearIgds()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->clearIgds();
- }
- });
- return;
- }
-
bool do_close = false;
if (igd_) {
@@ -194,8 +162,7 @@
}
initialized_ = false;
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
+ searchForIgdTimer_.cancel();
igdSearchCounter_ = 0;
@@ -208,15 +175,6 @@
void
NatPmp::searchForIgd()
{
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak()] {
- if (auto pmpThis = w.lock()) {
- pmpThis->searchForIgd();
- }
- });
- return;
- }
-
if (not initialized_) {
initNatPmp();
}
@@ -227,12 +185,11 @@
// JAMI_DBG("NAT-PMP: Start search for IGDs. Attempt %i", igdSearchCounter_);
// Cancel the current timer (if any) and re-schedule.
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
-
- searchForIgdTimer_ = getNatpmpScheduler()->scheduleIn([this] { searchForIgd(); },
- NATPMP_SEARCH_RETRY_UNIT
- * igdSearchCounter_);
+        searchForIgdTimer_.expires_after(NATPMP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+        searchForIgdTimer_.async_wait([w = weak()](const asio::error_code& ec) {
+            if (auto pmpThis = w.lock(); pmpThis && !ec) // skip when cancelled or already destroyed
+                pmpThis->searchForIgd();
+        });
} else {
// JAMI_WARN("NAT-PMP: Setup failed after %u trials. NAT-PMP will be disabled!",
// MAX_RESTART_SEARCH_RETRIES);
@@ -291,14 +248,14 @@
NatPmp::requestMappingAdd(const Mapping& mapping)
{
// Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
+ /*if (not isValidThread()) {
+ ioContext->post([w = weak(), mapping] {
if (auto pmpThis = w.lock()) {
pmpThis->requestMappingAdd(mapping);
}
});
return;
- }
+ }*/
Mapping map(mapping);
assert(map.getIgd());
@@ -329,14 +286,14 @@
NatPmp::requestMappingRenew(const Mapping& mapping)
{
// Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
+ /*if (not isValidThread()) {
+ ioContext->post([w = weak(), mapping] {
if (auto pmpThis = w.lock()) {
pmpThis->requestMappingRenew(mapping);
}
});
return;
- }
+ }*/
Mapping map(mapping);
auto err = addPortMapping(map);
@@ -401,7 +358,7 @@
int
NatPmp::sendMappingRequest(const Mapping& mapping, uint32_t& lifetime)
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
int err = sendnewportmappingrequest(&natpmpHdl_,
mapping.getType() == PortType::UDP ? NATPMP_PROTOCOL_UDP
@@ -478,16 +435,12 @@
void
NatPmp::requestMappingRemove(const Mapping& mapping)
{
- // Process on nat-pmp thread.
- if (not isValidThread()) {
- runOnNatPmpQueue([w = weak(), mapping] {
- if (auto pmpThis = w.lock()) {
- Mapping map {mapping};
- pmpThis->removePortMapping(map);
- }
- });
- return;
- }
+ ioContext->dispatch([w = weak(), mapping] {
+ if (auto pmpThis = w.lock()) {
+ Mapping map {mapping};
+ pmpThis->removePortMapping(map);
+ }
+ });
}
void
@@ -522,7 +475,7 @@
void
NatPmp::getIgdPublicAddress()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// Set the public address for this IGD if it does not
// have one already.
@@ -585,7 +538,7 @@
void
NatPmp::removeAllMappings()
{
- CHECK_VALID_THREAD();
+ //CHECK_VALID_THREAD();
// JAMI_WARN("NAT-PMP: Send request to close all existing mappings to IGD %s",
// igd_->toString().c_str());
@@ -721,7 +674,7 @@
if (observer_ == nullptr)
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); });
+ ioContext->post([obs = observer_, igd = igd_, event] { obs->onIgdUpdated(igd, event); });
}
void
@@ -731,7 +684,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingAdded(igd, map); });
}
void
@@ -741,7 +694,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRequestFailed(map); });
}
void
@@ -751,7 +704,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRenewed(igd, map); });
}
void
@@ -761,7 +714,7 @@
return;
// Process the response on the context thread.
- runOnUpnpContextQueue([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); });
+ ioContext->post([obs = observer_, igd = igd_, map] { obs->onMappingRemoved(igd, map); });
}
} // namespace upnp
diff --git a/src/upnp/protocol/natpmp/nat_pmp.h b/src/upnp/protocol/natpmp/nat_pmp.h
index 30642a0..fbc2fc0 100644
--- a/src/upnp/protocol/natpmp/nat_pmp.h
+++ b/src/upnp/protocol/natpmp/nat_pmp.h
@@ -16,14 +16,10 @@
*/
#pragma once
-#include "connectivity/upnp/protocol/upnp_protocol.h"
-#include "connectivity/upnp/protocol/igd.h"
+#include "../upnp_protocol.h"
+#include "../igd.h"
#include "pmp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
// uncomment to enable native natpmp error messages
//#define ENABLE_STRNATPMPERR 1
@@ -53,7 +49,7 @@
class NatPmp : public UPnPProtocol
{
public:
- NatPmp();
+ NatPmp(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
~NatPmp();
// Set the observer.
@@ -94,21 +90,11 @@
void terminate() override;
private:
- NON_COPYABLE(NatPmp);
+ NatPmp& operator=(const NatPmp&) = delete;
+ NatPmp(const NatPmp&) = delete;
std::weak_ptr<NatPmp> weak() { return std::static_pointer_cast<NatPmp>(shared_from_this()); }
- // Helpers to run tasks on NAT-PMP internal execution queue.
- ScheduledExecutor* getNatpmpScheduler() { return &natpmpScheduler_; }
- template<typename Callback>
- void runOnNatPmpQueue(Callback&& cb)
- {
- natpmpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- }
-
- // Helpers to run tasks on UPNP context execution queue.
- ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
-
void terminate(std::condition_variable& cv);
void initNatPmp();
@@ -147,8 +133,8 @@
// Data members
std::shared_ptr<PMPIGD> igd_;
natpmp_t natpmpHdl_;
- ScheduledExecutor natpmpScheduler_ {"natpmp"};
- std::shared_ptr<Task> searchForIgdTimer_ {};
+ std::shared_ptr<asio::io_context> ioContext;
+ asio::steady_timer searchForIgdTimer_;
unsigned int igdSearchCounter_ {0};
UpnpMappingObserver* observer_ {nullptr};
IpAddr hostAddress_ {};
diff --git a/src/upnp/protocol/natpmp/pmp_igd.h b/src/upnp/protocol/natpmp/pmp_igd.h
index df1c44b..1ca7fe7 100644
--- a/src/upnp/protocol/natpmp/pmp_igd.h
+++ b/src/upnp/protocol/natpmp/pmp_igd.h
@@ -17,8 +17,7 @@
#pragma once
#include "../igd.h"
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "ip_utils.h"
#include <map>
#include <atomic>
diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp
index e6ad85d..df68696 100644
--- a/src/upnp/protocol/pupnp/pupnp.cpp
+++ b/src/upnp/protocol/pupnp/pupnp.cpp
@@ -15,6 +15,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "pupnp.h"
+#include "string_utils.h"
#include <opendht/thread_pool.h>
#include <opendht/http.h>
@@ -95,13 +96,10 @@
// UPNP class implementation
-PUPnP::PUPnP()
+PUPnP::PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
{
- // JAMI_DBG("PUPnP: Creating instance [%p] ...", this);
- runOnPUPnPQueue([this] {
- threadId_ = getCurrentThread();
- // JAMI_DBG("PUPnP: Instance [%p] created", this);
- });
+ // JAMI_LOG("PUPnP: Creating instance [{}] ...", fmt::ptr(this));
}
PUPnP::~PUPnP()
@@ -142,10 +140,10 @@
ip_address6 = UpnpGetServerIp6Address();
port6 = UpnpGetServerPort6();
#endif
- if (ip_address6 and port6)
- // JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
+ /*if (ip_address6 and port6)
+ JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
else
- // JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);
+ JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);*/
// Relax the parser to allow malformed XML text.
ixmlRelaxParser(1);
@@ -165,8 +163,6 @@
{
assert(not clientRegistered_);
- CHECK_VALID_THREAD();
-
// Register Upnp control point.
int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_);
if (upnp_err != UPNP_E_SUCCESS) {
@@ -180,17 +176,6 @@
void
PUPnP::setObserver(UpnpMappingObserver* obs)
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak(), obs] {
- if (auto upnpThis = w.lock()) {
- upnpThis->setObserver(obs);
- }
- });
- return;
- }
-
- // JAMI_DBG("PUPnP: Setting observer to %p", obs);
-
observer_ = obs;
}
@@ -236,10 +221,8 @@
std::unique_lock<std::mutex> lk(pupnpMutex_);
std::condition_variable cv {};
- runOnPUPnPQueue([w = weak(), &cv = cv] {
- if (auto upnpThis = w.lock()) {
- upnpThis->terminate(cv);
- }
+    ioContext->dispatch([w = weak(), &cv = cv] { // weak guard: no-op once this object is gone
+        if (auto upnpThis = w.lock()) upnpThis->terminate(cv);
+    });
if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -254,8 +237,6 @@
void
PUPnP::searchForDevices()
{
- CHECK_VALID_THREAD();
-
// JAMI_DBG("PUPnP: Send IGD search request");
// Send out search for multiple types of devices, as some routers may possibly
@@ -293,19 +274,9 @@
void
PUPnP::clearIgds()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->clearIgds();
- }
- });
- return;
- }
-
// JAMI_DBG("PUPnP: clearing IGDs and devices lists");
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
+ searchForIgdTimer_.cancel();
igdSearchCounter_ = 0;
@@ -324,15 +295,6 @@
void
PUPnP::searchForIgd()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->searchForIgd();
- }
- });
- return;
- }
-
// Update local address before searching.
updateHostAddress();
@@ -375,15 +337,13 @@
// The connectivity change may be received while the the local
// interface is not fully setup. The rescheduling typically
// usefull to mitigate this race.
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
-
- searchForIgdTimer_ = getUpnContextScheduler()->scheduleIn(
- [w = weak()] {
+ searchForIgdTimer_.expires_after(PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ searchForIgdTimer_.async_wait([w = weak()] (const asio::error_code& ec) {
+ if (not ec) {
if (auto upnpThis = w.lock())
upnpThis->searchForIgd();
- },
- PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ }
+ });
}
std::list<std::shared_ptr<IGD>>
@@ -453,8 +413,6 @@
bool
PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr)
{
- CHECK_VALID_THREAD();
-
assert(doc_container_ptr != nullptr);
XMLDocument document(doc_container_ptr, ixmlDocument_free);
@@ -574,7 +532,7 @@
}
// Report to the listener.
- runOnUpnpContextQueue([w = weak(), igd_candidate] {
+ ioContext->post([w = weak(), igd_candidate] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED);
@@ -587,7 +545,7 @@
void
PUPnP::requestMappingAdd(const Mapping& mapping)
{
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->post([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
if (not upnpThis->isRunning())
return;
@@ -609,7 +567,7 @@
PUPnP::requestMappingRemove(const Mapping& mapping)
{
// Send remove request using the matching IGD
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->dispatch([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
// Abort if we are shutting down.
if (not upnpThis->isRunning())
@@ -650,12 +608,10 @@
void
PUPnP::processAddMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map));
@@ -666,12 +622,10 @@
void
PUPnP::processRequestMappingFailure(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
// JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str());
if (upnpThis->observer_)
@@ -683,12 +637,10 @@
void
PUPnP::processRemoveMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([map, obs = observer_] {
+ ioContext->post([map, obs = observer_] {
// JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str());
obs->onMappingRemoved(map.getIgd(), std::move(map));
});
@@ -779,8 +731,6 @@
const std::string& igdLocationUrl,
const IpAddr& dstAddr)
{
- CHECK_VALID_THREAD();
-
// Update host address if needed.
if (not hasValidHostAddress())
updateHostAddress();
@@ -833,12 +783,12 @@
int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr);
if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) {
- // JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s",
- // locationUrl.c_str(),
- // UpnpGetErrorMessage(upnp_err));
+ if(logger_) logger_->warn("PUPnP: Error downloading device XML document from {} -> {}",
+ locationUrl,
+ UpnpGetErrorMessage(upnp_err));
} else {
- // JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str());
- runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] {
+ if(logger_) logger_->debug("PUPnP: Succeeded to download device XML document from {}", locationUrl);
+ ioContext->post([w = weak(), url = locationUrl, doc_container_ptr] {
if (auto upnpThis = w.lock()) {
upnpThis->validateIgd(url, doc_container_ptr);
}
@@ -849,8 +799,6 @@
void
PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId)
{
- CHECK_VALID_THREAD();
-
discoveredIgdList_.erase(cpDeviceId);
std::shared_ptr<IGD> igd;
@@ -882,8 +830,6 @@
void
PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl)
{
- CHECK_VALID_THREAD();
-
std::lock_guard<std::mutex> lk(pupnpMutex_);
for (auto& it : validIgdList_) {
if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) {
@@ -925,7 +871,7 @@
std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)};
std::string location {UpnpDiscovery_get_Location_cstr(d_event)};
IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event)));
- runOnPUPnPQueue([w = weak(),
+ ioContext->post([w = weak(),
deviceId = std::move(deviceId),
location = std::move(location),
dstAddr = std::move(dstAddr)] {
@@ -941,7 +887,7 @@
std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] {
+ ioContext->post([w = weak(), deviceId = std::move(deviceId)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoveryAdvertisementByebye(deviceId);
}
@@ -971,7 +917,7 @@
std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
+ ioContext->post([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl);
}
@@ -1332,7 +1278,7 @@
break;
} else {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- JAMI_ERROR("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
+ if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
errorCode,
errorDescription);
break;
@@ -1356,7 +1302,7 @@
std::string transport(getFirstDocItem(response.get(), "NewProtocol"));
if (port_internal.empty() || port_external.empty() || transport.empty()) {
- // JAMI_ERR("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
+ // if (logger_) logger_->e("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
// entry_idx);
continue;
}
@@ -1372,9 +1318,9 @@
mapList.emplace(map.getMapKey(), std::move(map));
}
- JAMI_DEBUG("PUPnP: Found {:d} allocated mappings on IGD {:s}",
- mapList.size(),
- upnpIgd->toString());
+ // if (logger_) logger_->debug("PUPnP: Found {:d} allocated mappings on IGD {:s}",
+ // mapList.size(),
+ // upnpIgd->toString());
return mapList;
}
@@ -1385,22 +1331,23 @@
if (not(clientRegistered_ and igd->getLocalIp()))
return;
- // JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s",
- // igd->toString().c_str(),
+ // if (logger_) logger_->debug("PUPnP: Remove all mappings (if any) on IGD {} matching descr prefix {}",
+ // igd->toString(),
// Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX);
- auto mapList = getMappingsListByDescr(igd, description);
-
- for (auto const& [_, map] : mapList) {
- requestMappingRemove(map);
- }
+ ioContext->post([w=weak(), igd, description]{
+ if (auto sthis = w.lock()) {
+ auto mapList = sthis->getMappingsListByDescr(igd, description);
+ for (auto const& [_, map] : mapList) {
+ sthis->requestMappingRemove(map);
+ }
+ }
+ });
}
bool
PUPnP::actionAddPortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1475,13 +1422,13 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s",
+ // if (logger_) logger_->warn("PUPnP: Failed to send action {} for mapping {}. {:d}: {}",
// ACTION_ADD_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
+ // if (logger_) logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // if (logger_) logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
success = false;
}
@@ -1496,7 +1443,7 @@
errorDescription = getFirstDocItem(response.get(), "errorDescription");
}
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s} {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s} {:s}",
// ACTION_ADD_PORT_MAPPING,
// errorCode,
// errorDescription);
@@ -1507,8 +1454,6 @@
bool
PUPnP::actionDeletePortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1558,19 +1503,20 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s",
+ // if (logger_) {
+ // logger_->warn("PUPnP: Failed to send action {} for mapping from {}. {:d}: {}",
// ACTION_DELETE_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
-
+ // logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
+ // }
success = false;
}
if (not response) {
- // JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING);
+ // if (logger_) logger_->warn("PUPnP: Failed to get response for {}", ACTION_DELETE_PORT_MAPPING);
success = false;
}
@@ -1578,7 +1524,7 @@
auto errorCode = getFirstDocItem(response.get(), "errorCode");
if (not errorCode.empty()) {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s}: {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s}: {:s}",
// ACTION_DELETE_PORT_MAPPING,
// errorCode,
// errorDescription);
diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h
index d45fdde..4c0ea78 100644
--- a/src/upnp/protocol/pupnp/pupnp.h
+++ b/src/upnp/protocol/pupnp/pupnp.h
@@ -24,11 +24,7 @@
#include "../upnp_protocol.h"
#include "../igd.h"
#include "upnp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
#include <upnp/upnp.h>
#include <upnp/upnptools.h>
@@ -68,7 +64,7 @@
GET_EXTERNAL_IP_ADDRESS
};
- PUPnP();
+ PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
~PUPnP();
// Set the observer
@@ -113,18 +109,8 @@
void terminate() override;
private:
- NON_COPYABLE(PUPnP);
-
- // Helpers to run tasks on PUPNP private execution queue.
- ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; }
- template<typename Callback>
- void runOnPUPnPQueue(Callback&& cb)
- {
- pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- }
-
- // Helper to run tasks on UPNP context execution queue.
- ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
+ PUPnP& operator=(const PUPnP&) = delete;
+ PUPnP(const PUPnP&) = delete;
void terminate(std::condition_variable& cv);
@@ -224,15 +210,13 @@
std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); }
- // Execution queue to run lib upnp actions
- ScheduledExecutor pupnpScheduler_ {"pupnp"};
-
// Initialization status.
std::atomic_bool initialized_ {false};
// Client registration status.
std::atomic_bool clientRegistered_ {false};
- std::shared_ptr<Task> searchForIgdTimer_ {};
+ std::shared_ptr<asio::io_context> ioContext;
+ asio::steady_timer searchForIgdTimer_;
unsigned int igdSearchCounter_ {0};
// List of discovered IGDs.
diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h
index 9f33505..5ea7fe5 100644
--- a/src/upnp/protocol/pupnp/upnp_igd.h
+++ b/src/upnp/protocol/pupnp/upnp_igd.h
@@ -16,10 +16,8 @@
*/
#pragma once
-#include "connectivity/upnp/protocol/igd.h"
-
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "../igd.h"
+#include "ip_utils.h"
#include <map>
#include <string>
diff --git a/src/upnp/protocol/upnp_protocol.h b/src/upnp/protocol/upnp_protocol.h
index ce891e1..3dde4ab 100644
--- a/src/upnp/protocol/upnp_protocol.h
+++ b/src/upnp/protocol/upnp_protocol.h
@@ -16,7 +16,8 @@
*/
#pragma once
-#include "igd.h"
+#include "./igd.h"
+#include "upnp/upnp_context.h"
#include "upnp/mapping.h"
#include "ip_utils.h"
@@ -48,7 +49,7 @@
public:
enum class UpnpError : int { INVALID_ERR = -1, ERROR_OK, CONFLICT_IN_MAPPING };
- UPnPProtocol() {};
+    explicit UPnPProtocol(const std::shared_ptr<dht::log::Logger>& logger) : logger_(logger) {}; // stores the logger in logger_
virtual ~UPnPProtocol() {};
// Get protocol type.
@@ -93,6 +94,8 @@
// Terminate
virtual void terminate() = 0;
+
+ std::shared_ptr<dht::log::Logger> logger_;
};
} // namespace upnp