Add UPnP/NAT-PMP support
Change-Id: I4945a7df3a30cb39d81a33fc7a32e9fea600bdff
diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp
index e6ad85d..df68696 100644
--- a/src/upnp/protocol/pupnp/pupnp.cpp
+++ b/src/upnp/protocol/pupnp/pupnp.cpp
@@ -15,6 +15,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include "pupnp.h"
+#include "string_utils.h"
#include <opendht/thread_pool.h>
#include <opendht/http.h>
@@ -95,13 +96,10 @@
// UPNP class implementation
-PUPnP::PUPnP()
+PUPnP::PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
{
- // JAMI_DBG("PUPnP: Creating instance [%p] ...", this);
- runOnPUPnPQueue([this] {
- threadId_ = getCurrentThread();
- // JAMI_DBG("PUPnP: Instance [%p] created", this);
- });
+ // JAMI_LOG("PUPnP: Creating instance [{}] ...", fmt::ptr(this));
}
PUPnP::~PUPnP()
@@ -142,10 +140,10 @@
ip_address6 = UpnpGetServerIp6Address();
port6 = UpnpGetServerPort6();
#endif
- if (ip_address6 and port6)
- // JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
+ /*if (ip_address6 and port6)
+ JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
else
- // JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);
+ JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);*/
// Relax the parser to allow malformed XML text.
ixmlRelaxParser(1);
@@ -165,8 +163,6 @@
{
assert(not clientRegistered_);
- CHECK_VALID_THREAD();
-
// Register Upnp control point.
int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_);
if (upnp_err != UPNP_E_SUCCESS) {
@@ -180,17 +176,6 @@
void
PUPnP::setObserver(UpnpMappingObserver* obs)
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak(), obs] {
- if (auto upnpThis = w.lock()) {
- upnpThis->setObserver(obs);
- }
- });
- return;
- }
-
- // JAMI_DBG("PUPnP: Setting observer to %p", obs);
-
observer_ = obs;
}
@@ -236,10 +221,8 @@
std::unique_lock<std::mutex> lk(pupnpMutex_);
std::condition_variable cv {};
- runOnPUPnPQueue([w = weak(), &cv = cv] {
- if (auto upnpThis = w.lock()) {
- upnpThis->terminate(cv);
- }
+ ioContext->dispatch([&] {
+ terminate(cv);
});
if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -254,8 +237,6 @@
void
PUPnP::searchForDevices()
{
- CHECK_VALID_THREAD();
-
// JAMI_DBG("PUPnP: Send IGD search request");
// Send out search for multiple types of devices, as some routers may possibly
@@ -293,19 +274,9 @@
void
PUPnP::clearIgds()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->clearIgds();
- }
- });
- return;
- }
-
// JAMI_DBG("PUPnP: clearing IGDs and devices lists");
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
+ searchForIgdTimer_.cancel();
igdSearchCounter_ = 0;
@@ -324,15 +295,6 @@
void
PUPnP::searchForIgd()
{
- if (not isValidThread()) {
- runOnPUPnPQueue([w = weak()] {
- if (auto upnpThis = w.lock()) {
- upnpThis->searchForIgd();
- }
- });
- return;
- }
-
// Update local address before searching.
updateHostAddress();
@@ -375,15 +337,13 @@
// The connectivity change may be received while the the local
// interface is not fully setup. The rescheduling typically
// usefull to mitigate this race.
- if (searchForIgdTimer_)
- searchForIgdTimer_->cancel();
-
- searchForIgdTimer_ = getUpnContextScheduler()->scheduleIn(
- [w = weak()] {
+ searchForIgdTimer_.expires_after(PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ searchForIgdTimer_.async_wait([w = weak()] (const asio::error_code& ec) {
+ if (not ec) {
if (auto upnpThis = w.lock())
upnpThis->searchForIgd();
- },
- PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+ }
+ });
}
std::list<std::shared_ptr<IGD>>
@@ -453,8 +413,6 @@
bool
PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr)
{
- CHECK_VALID_THREAD();
-
assert(doc_container_ptr != nullptr);
XMLDocument document(doc_container_ptr, ixmlDocument_free);
@@ -574,7 +532,7 @@
}
// Report to the listener.
- runOnUpnpContextQueue([w = weak(), igd_candidate] {
+ ioContext->post([w = weak(), igd_candidate] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED);
@@ -587,7 +545,7 @@
void
PUPnP::requestMappingAdd(const Mapping& mapping)
{
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->post([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
if (not upnpThis->isRunning())
return;
@@ -609,7 +567,7 @@
PUPnP::requestMappingRemove(const Mapping& mapping)
{
// Send remove request using the matching IGD
- runOnPUPnPQueue([w = weak(), mapping] {
+ ioContext->dispatch([w = weak(), mapping] {
if (auto upnpThis = w.lock()) {
// Abort if we are shutting down.
if (not upnpThis->isRunning())
@@ -650,12 +608,10 @@
void
PUPnP::processAddMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
if (upnpThis->observer_)
upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map));
@@ -666,12 +622,10 @@
void
PUPnP::processRequestMappingFailure(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([w = weak(), map] {
+ ioContext->post([w = weak(), map] {
if (auto upnpThis = w.lock()) {
// JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str());
if (upnpThis->observer_)
@@ -683,12 +637,10 @@
void
PUPnP::processRemoveMapAction(const Mapping& map)
{
- CHECK_VALID_THREAD();
-
if (observer_ == nullptr)
return;
- runOnUpnpContextQueue([map, obs = observer_] {
+ ioContext->post([map, obs = observer_] {
// JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str());
obs->onMappingRemoved(map.getIgd(), std::move(map));
});
@@ -779,8 +731,6 @@
const std::string& igdLocationUrl,
const IpAddr& dstAddr)
{
- CHECK_VALID_THREAD();
-
// Update host address if needed.
if (not hasValidHostAddress())
updateHostAddress();
@@ -833,12 +783,12 @@
int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr);
if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) {
- // JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s",
- // locationUrl.c_str(),
- // UpnpGetErrorMessage(upnp_err));
+ if(logger_) logger_->warn("PUPnP: Error downloading device XML document from {} -> {}",
+ locationUrl,
+ UpnpGetErrorMessage(upnp_err));
} else {
- // JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str());
- runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] {
+ if(logger_) logger_->debug("PUPnP: Succeeded to download device XML document from {}", locationUrl);
+ ioContext->post([w = weak(), url = locationUrl, doc_container_ptr] {
if (auto upnpThis = w.lock()) {
upnpThis->validateIgd(url, doc_container_ptr);
}
@@ -849,8 +799,6 @@
void
PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId)
{
- CHECK_VALID_THREAD();
-
discoveredIgdList_.erase(cpDeviceId);
std::shared_ptr<IGD> igd;
@@ -882,8 +830,6 @@
void
PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl)
{
- CHECK_VALID_THREAD();
-
std::lock_guard<std::mutex> lk(pupnpMutex_);
for (auto& it : validIgdList_) {
if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) {
@@ -925,7 +871,7 @@
std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)};
std::string location {UpnpDiscovery_get_Location_cstr(d_event)};
IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event)));
- runOnPUPnPQueue([w = weak(),
+ ioContext->post([w = weak(),
deviceId = std::move(deviceId),
location = std::move(location),
dstAddr = std::move(dstAddr)] {
@@ -941,7 +887,7 @@
std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] {
+ ioContext->post([w = weak(), deviceId = std::move(deviceId)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoveryAdvertisementByebye(deviceId);
}
@@ -971,7 +917,7 @@
std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event));
// Process the response on the main thread.
- runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
+ ioContext->post([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
if (auto upnpThis = w.lock()) {
upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl);
}
@@ -1332,7 +1278,7 @@
break;
} else {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- JAMI_ERROR("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
+ if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
errorCode,
errorDescription);
break;
@@ -1356,7 +1302,7 @@
std::string transport(getFirstDocItem(response.get(), "NewProtocol"));
if (port_internal.empty() || port_external.empty() || transport.empty()) {
- // JAMI_ERR("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
+ // if (logger_) logger_->e("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
// entry_idx);
continue;
}
@@ -1372,9 +1318,9 @@
mapList.emplace(map.getMapKey(), std::move(map));
}
- JAMI_DEBUG("PUPnP: Found {:d} allocated mappings on IGD {:s}",
- mapList.size(),
- upnpIgd->toString());
+ // if (logger_) logger_->debug("PUPnP: Found {:d} allocated mappings on IGD {:s}",
+ // mapList.size(),
+ // upnpIgd->toString());
return mapList;
}
@@ -1385,22 +1331,23 @@
if (not(clientRegistered_ and igd->getLocalIp()))
return;
- // JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s",
- // igd->toString().c_str(),
+ // if (logger_) logger_->debug("PUPnP: Remove all mappings (if any) on IGD {} matching descr prefix {}",
+ // igd->toString(),
// Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX);
- auto mapList = getMappingsListByDescr(igd, description);
-
- for (auto const& [_, map] : mapList) {
- requestMappingRemove(map);
- }
+ ioContext->post([w=weak(), igd, description]{
+ if (auto sthis = w.lock()) {
+ auto mapList = sthis->getMappingsListByDescr(igd, description);
+ for (auto const& [_, map] : mapList) {
+ sthis->requestMappingRemove(map);
+ }
+ }
+ });
}
bool
PUPnP::actionAddPortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1475,13 +1422,13 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s",
+ // if (logger_) logger_->warn("PUPnP: Failed to send action {} for mapping {}. {:d}: {}",
// ACTION_ADD_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
+ // if (logger_) logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // if (logger_) logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
success = false;
}
@@ -1496,7 +1443,7 @@
errorDescription = getFirstDocItem(response.get(), "errorDescription");
}
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s} {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s} {:s}",
// ACTION_ADD_PORT_MAPPING,
// errorCode,
// errorDescription);
@@ -1507,8 +1454,6 @@
bool
PUPnP::actionDeletePortMapping(const Mapping& mapping)
{
- CHECK_VALID_THREAD();
-
if (not clientRegistered_)
return false;
@@ -1558,19 +1503,20 @@
bool success = true;
if (upnp_err != UPNP_E_SUCCESS) {
- // JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s",
+ // if (logger_) {
+ // logger_->warn("PUPnP: Failed to send action {} for mapping from {}. {:d}: {}",
// ACTION_DELETE_PORT_MAPPING,
- // mapping.toString().c_str(),
+ // mapping.toString(),
// upnp_err,
// UpnpGetErrorMessage(upnp_err));
- // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
- // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
-
+ // logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+ // logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
+ // }
success = false;
}
if (not response) {
- // JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING);
+ // if (logger_) logger_->warn("PUPnP: Failed to get response for {}", ACTION_DELETE_PORT_MAPPING);
success = false;
}
@@ -1578,7 +1524,7 @@
auto errorCode = getFirstDocItem(response.get(), "errorCode");
if (not errorCode.empty()) {
auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
- // JAMI_WARNING("PUPnP: {:s} returned with error: {:s}: {:s}",
+ // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s}: {:s}",
// ACTION_DELETE_PORT_MAPPING,
// errorCode,
// errorDescription);
diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h
index d45fdde..4c0ea78 100644
--- a/src/upnp/protocol/pupnp/pupnp.h
+++ b/src/upnp/protocol/pupnp/pupnp.h
@@ -24,11 +24,7 @@
#include "../upnp_protocol.h"
#include "../igd.h"
#include "upnp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
#include <upnp/upnp.h>
#include <upnp/upnptools.h>
@@ -68,7 +64,7 @@
GET_EXTERNAL_IP_ADDRESS
};
- PUPnP();
+ PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
~PUPnP();
// Set the observer
@@ -113,18 +109,8 @@
void terminate() override;
private:
- NON_COPYABLE(PUPnP);
-
- // Helpers to run tasks on PUPNP private execution queue.
- ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; }
- template<typename Callback>
- void runOnPUPnPQueue(Callback&& cb)
- {
- pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
- }
-
- // Helper to run tasks on UPNP context execution queue.
- ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
+ PUPnP& operator=(const PUPnP&) = delete;
+ PUPnP(const PUPnP&) = delete;
void terminate(std::condition_variable& cv);
@@ -224,15 +210,13 @@
std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); }
- // Execution queue to run lib upnp actions
- ScheduledExecutor pupnpScheduler_ {"pupnp"};
-
// Initialization status.
std::atomic_bool initialized_ {false};
// Client registration status.
std::atomic_bool clientRegistered_ {false};
- std::shared_ptr<Task> searchForIgdTimer_ {};
+ std::shared_ptr<asio::io_context> ioContext;
+ asio::steady_timer searchForIgdTimer_;
unsigned int igdSearchCounter_ {0};
// List of discovered IGDs.
diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h
index 9f33505..5ea7fe5 100644
--- a/src/upnp/protocol/pupnp/upnp_igd.h
+++ b/src/upnp/protocol/pupnp/upnp_igd.h
@@ -16,10 +16,8 @@
*/
#pragma once
-#include "connectivity/upnp/protocol/igd.h"
-
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "../igd.h"
+#include "ip_utils.h"
#include <map>
#include <string>