add upnp/natpmp support

Change-Id: I4945a7df3a30cb39d81a33fc7a32e9fea600bdff
diff --git a/src/upnp/protocol/pupnp/pupnp.cpp b/src/upnp/protocol/pupnp/pupnp.cpp
index e6ad85d..df68696 100644
--- a/src/upnp/protocol/pupnp/pupnp.cpp
+++ b/src/upnp/protocol/pupnp/pupnp.cpp
@@ -15,6 +15,7 @@
  *  along with this program. If not, see <https://www.gnu.org/licenses/>.
  */
 #include "pupnp.h"
+#include "string_utils.h"
 
 #include <opendht/thread_pool.h>
 #include <opendht/http.h>
@@ -95,13 +96,10 @@
 
 // UPNP class implementation
 
-PUPnP::PUPnP()
+PUPnP::PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger)
+ : UPnPProtocol(logger), ioContext(ctx), searchForIgdTimer_(*ctx)
 {
-    // JAMI_DBG("PUPnP: Creating instance [%p] ...", this);
-    runOnPUPnPQueue([this] {
-        threadId_ = getCurrentThread();
-        // JAMI_DBG("PUPnP: Instance [%p] created", this);
-    });
+    // JAMI_LOG("PUPnP: Creating instance [{}] ...", fmt::ptr(this));
 }
 
 PUPnP::~PUPnP()
@@ -142,10 +140,10 @@
     ip_address6 = UpnpGetServerIp6Address();
     port6 = UpnpGetServerPort6();
 #endif
-    if (ip_address6 and port6)
-        // JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
+    /*if (ip_address6 and port6)
+        JAMI_DBG("PUPnP: Initialized on %s:%u | %s:%u", ip_address, port, ip_address6, port6);
     else
-        // JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);
+        JAMI_DBG("PUPnP: Initialized on %s:%u", ip_address, port);*/
 
     // Relax the parser to allow malformed XML text.
     ixmlRelaxParser(1);
@@ -165,8 +163,6 @@
 {
     assert(not clientRegistered_);
 
-    CHECK_VALID_THREAD();
-
     // Register Upnp control point.
     int upnp_err = UpnpRegisterClient(ctrlPtCallback, this, &ctrlptHandle_);
     if (upnp_err != UPNP_E_SUCCESS) {
@@ -180,17 +176,6 @@
 void
 PUPnP::setObserver(UpnpMappingObserver* obs)
 {
-    if (not isValidThread()) {
-        runOnPUPnPQueue([w = weak(), obs] {
-            if (auto upnpThis = w.lock()) {
-                upnpThis->setObserver(obs);
-            }
-        });
-        return;
-    }
-
-    // JAMI_DBG("PUPnP: Setting observer to %p", obs);
-
     observer_ = obs;
 }
 
@@ -236,10 +221,8 @@
     std::unique_lock<std::mutex> lk(pupnpMutex_);
     std::condition_variable cv {};
 
-    runOnPUPnPQueue([w = weak(), &cv = cv] {
-            if (auto upnpThis = w.lock()) {
-                upnpThis->terminate(cv);
-            }
+    ioContext->dispatch([&] {
+        terminate(cv);
     });
 
     if (cv.wait_for(lk, std::chrono::seconds(10), [this] { return shutdownComplete_; })) {
@@ -254,8 +237,6 @@
 void
 PUPnP::searchForDevices()
 {
-    CHECK_VALID_THREAD();
-
     // JAMI_DBG("PUPnP: Send IGD search request");
 
     // Send out search for multiple types of devices, as some routers may possibly
@@ -293,19 +274,9 @@
 void
 PUPnP::clearIgds()
 {
-    if (not isValidThread()) {
-        runOnPUPnPQueue([w = weak()] {
-            if (auto upnpThis = w.lock()) {
-                upnpThis->clearIgds();
-            }
-        });
-        return;
-    }
-
     // JAMI_DBG("PUPnP: clearing IGDs and devices lists");
 
-    if (searchForIgdTimer_)
-        searchForIgdTimer_->cancel();
+    searchForIgdTimer_.cancel();
 
     igdSearchCounter_ = 0;
 
@@ -324,15 +295,6 @@
 void
 PUPnP::searchForIgd()
 {
-    if (not isValidThread()) {
-        runOnPUPnPQueue([w = weak()] {
-            if (auto upnpThis = w.lock()) {
-                upnpThis->searchForIgd();
-            }
-        });
-        return;
-    }
-
     // Update local address before searching.
     updateHostAddress();
 
@@ -375,15 +337,13 @@
     // The connectivity change may be received while the the local
     // interface is not fully setup. The rescheduling typically
     // usefull to mitigate this race.
-    if (searchForIgdTimer_)
-        searchForIgdTimer_->cancel();
-
-    searchForIgdTimer_ = getUpnContextScheduler()->scheduleIn(
-        [w = weak()] {
+    searchForIgdTimer_.expires_after(PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+    searchForIgdTimer_.async_wait([w = weak()] (const asio::error_code& ec) {
+        if (not ec) {
             if (auto upnpThis = w.lock())
                 upnpThis->searchForIgd();
-        },
-        PUPNP_SEARCH_RETRY_UNIT * igdSearchCounter_);
+        }
+    });
 }
 
 std::list<std::shared_ptr<IGD>>
@@ -453,8 +413,6 @@
 bool
 PUPnP::validateIgd(const std::string& location, IXML_Document* doc_container_ptr)
 {
-    CHECK_VALID_THREAD();
-
     assert(doc_container_ptr != nullptr);
 
     XMLDocument document(doc_container_ptr, ixmlDocument_free);
@@ -574,7 +532,7 @@
     }
 
     // Report to the listener.
-    runOnUpnpContextQueue([w = weak(), igd_candidate] {
+    ioContext->post([w = weak(), igd_candidate] {
         if (auto upnpThis = w.lock()) {
             if (upnpThis->observer_)
                 upnpThis->observer_->onIgdUpdated(igd_candidate, UpnpIgdEvent::ADDED);
@@ -587,7 +545,7 @@
 void
 PUPnP::requestMappingAdd(const Mapping& mapping)
 {
-    runOnPUPnPQueue([w = weak(), mapping] {
+    ioContext->post([w = weak(), mapping] {
         if (auto upnpThis = w.lock()) {
             if (not upnpThis->isRunning())
                 return;
@@ -609,7 +567,7 @@
 PUPnP::requestMappingRemove(const Mapping& mapping)
 {
     // Send remove request using the matching IGD
-    runOnPUPnPQueue([w = weak(), mapping] {
+    ioContext->dispatch([w = weak(), mapping] {
         if (auto upnpThis = w.lock()) {
             // Abort if we are shutting down.
             if (not upnpThis->isRunning())
@@ -650,12 +608,10 @@
 void
 PUPnP::processAddMapAction(const Mapping& map)
 {
-    CHECK_VALID_THREAD();
-
     if (observer_ == nullptr)
         return;
 
-    runOnUpnpContextQueue([w = weak(), map] {
+    ioContext->post([w = weak(), map] {
         if (auto upnpThis = w.lock()) {
             if (upnpThis->observer_)
                 upnpThis->observer_->onMappingAdded(map.getIgd(), std::move(map));
@@ -666,12 +622,10 @@
 void
 PUPnP::processRequestMappingFailure(const Mapping& map)
 {
-    CHECK_VALID_THREAD();
-
     if (observer_ == nullptr)
         return;
 
-    runOnUpnpContextQueue([w = weak(), map] {
+    ioContext->post([w = weak(), map] {
         if (auto upnpThis = w.lock()) {
             // JAMI_DBG("PUPnP: Failed to request mapping %s", map.toString().c_str());
             if (upnpThis->observer_)
@@ -683,12 +637,10 @@
 void
 PUPnP::processRemoveMapAction(const Mapping& map)
 {
-    CHECK_VALID_THREAD();
-
     if (observer_ == nullptr)
         return;
 
-    runOnUpnpContextQueue([map, obs = observer_] {
+    ioContext->post([map, obs = observer_] {
         // JAMI_DBG("PUPnP: Closed mapping %s", map.toString().c_str());
         obs->onMappingRemoved(map.getIgd(), std::move(map));
     });
@@ -779,8 +731,6 @@
                                     const std::string& igdLocationUrl,
                                     const IpAddr& dstAddr)
 {
-    CHECK_VALID_THREAD();
-
     // Update host address if needed.
     if (not hasValidHostAddress())
         updateHostAddress();
@@ -833,12 +783,12 @@
     int upnp_err = UpnpDownloadXmlDoc(locationUrl.c_str(), &doc_container_ptr);
 
     if (upnp_err != UPNP_E_SUCCESS or not doc_container_ptr) {
-        // JAMI_WARN("PUPnP: Error downloading device XML document from %s -> %s",
-        //           locationUrl.c_str(),
-        //           UpnpGetErrorMessage(upnp_err));
+        if(logger_) logger_->warn("PUPnP: Error downloading device XML document from {} -> {}",
+                  locationUrl,
+                  UpnpGetErrorMessage(upnp_err));
     } else {
-        // JAMI_DBG("PUPnP: Succeeded to download device XML document from %s", locationUrl.c_str());
-        runOnPUPnPQueue([w = weak(), url = locationUrl, doc_container_ptr] {
+        if(logger_) logger_->debug("PUPnP: Succeeded to download device XML document from {}", locationUrl);
+        ioContext->post([w = weak(), url = locationUrl, doc_container_ptr] {
             if (auto upnpThis = w.lock()) {
                 upnpThis->validateIgd(url, doc_container_ptr);
             }
@@ -849,8 +799,6 @@
 void
 PUPnP::processDiscoveryAdvertisementByebye(const std::string& cpDeviceId)
 {
-    CHECK_VALID_THREAD();
-
     discoveredIgdList_.erase(cpDeviceId);
 
     std::shared_ptr<IGD> igd;
@@ -882,8 +830,6 @@
 void
 PUPnP::processDiscoverySubscriptionExpired(Upnp_EventType event_type, const std::string& eventSubUrl)
 {
-    CHECK_VALID_THREAD();
-
     std::lock_guard<std::mutex> lk(pupnpMutex_);
     for (auto& it : validIgdList_) {
         if (auto igd = std::dynamic_pointer_cast<UPnPIGD>(it)) {
@@ -925,7 +871,7 @@
         std::string deviceId {UpnpDiscovery_get_DeviceID_cstr(d_event)};
         std::string location {UpnpDiscovery_get_Location_cstr(d_event)};
         IpAddr dstAddr(*(const pj_sockaddr*) (UpnpDiscovery_get_DestAddr(d_event)));
-        runOnPUPnPQueue([w = weak(),
+        ioContext->post([w = weak(),
                          deviceId = std::move(deviceId),
                          location = std::move(location),
                          dstAddr = std::move(dstAddr)] {
@@ -941,7 +887,7 @@
         std::string deviceId(UpnpDiscovery_get_DeviceID_cstr(d_event));
 
         // Process the response on the main thread.
-        runOnPUPnPQueue([w = weak(), deviceId = std::move(deviceId)] {
+        ioContext->post([w = weak(), deviceId = std::move(deviceId)] {
             if (auto upnpThis = w.lock()) {
                 upnpThis->processDiscoveryAdvertisementByebye(deviceId);
             }
@@ -971,7 +917,7 @@
         std::string publisherUrl(UpnpEventSubscribe_get_PublisherUrl_cstr(es_event));
 
         // Process the response on the main thread.
-        runOnPUPnPQueue([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
+        ioContext->post([w = weak(), event_type, publisherUrl = std::move(publisherUrl)] {
             if (auto upnpThis = w.lock()) {
                 upnpThis->processDiscoverySubscriptionExpired(event_type, publisherUrl);
             }
@@ -1332,7 +1278,7 @@
                 break;
             } else {
                 auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
-                JAMI_ERROR("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
+                if (logger_) logger_->error("PUPnP: GetGenericPortMappingEntry returned with error: {:s}: {:s}",
                          errorCode,
                          errorDescription);
                 break;
@@ -1356,7 +1302,7 @@
         std::string transport(getFirstDocItem(response.get(), "NewProtocol"));
 
         if (port_internal.empty() || port_external.empty() || transport.empty()) {
-            // JAMI_ERR("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
+            // if (logger_) logger_->e("PUPnP: GetGenericPortMappingEntry returned an invalid entry at index %i",
             //          entry_idx);
             continue;
         }
@@ -1372,9 +1318,9 @@
         mapList.emplace(map.getMapKey(), std::move(map));
     }
 
-    JAMI_DEBUG("PUPnP: Found {:d} allocated mappings on IGD {:s}",
-             mapList.size(),
-             upnpIgd->toString());
+    // if (logger_) logger_->debug("PUPnP: Found {:d} allocated mappings on IGD {:s}",
+    //          mapList.size(),
+    //          upnpIgd->toString());
 
     return mapList;
 }
@@ -1385,22 +1331,23 @@
     if (not(clientRegistered_ and igd->getLocalIp()))
         return;
 
-    // JAMI_DBG("PUPnP: Remove all mappings (if any) on IGD %s matching descr prefix %s",
-    //          igd->toString().c_str(),
+    // if (logger_) logger_->debug("PUPnP: Remove all mappings (if any) on IGD {} matching descr prefix {}",
+    //          igd->toString(),
     //          Mapping::UPNP_MAPPING_DESCRIPTION_PREFIX);
 
-    auto mapList = getMappingsListByDescr(igd, description);
-
-    for (auto const& [_, map] : mapList) {
-        requestMappingRemove(map);
-    }
+    ioContext->post([w=weak(), igd, description]{
+        if (auto sthis = w.lock()) {
+            auto mapList = sthis->getMappingsListByDescr(igd, description);
+            for (auto const& [_, map] : mapList) {
+                sthis->requestMappingRemove(map);
+            }
+        }
+    });
 }
 
 bool
 PUPnP::actionAddPortMapping(const Mapping& mapping)
 {
-    CHECK_VALID_THREAD();
-
     if (not clientRegistered_)
         return false;
 
@@ -1475,13 +1422,13 @@
     bool success = true;
 
     if (upnp_err != UPNP_E_SUCCESS) {
-        // JAMI_WARN("PUPnP: Failed to send action %s for mapping %s. %d: %s",
+        // if (logger_) logger_->warn("PUPnP: Failed to send action {} for mapping {}. {:d}: {}",
         //           ACTION_ADD_PORT_MAPPING,
-        //           mapping.toString().c_str(),
+        //           mapping.toString(),
         //           upnp_err,
         //           UpnpGetErrorMessage(upnp_err));
-        // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
-        // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
+        // if (logger_) logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+        // if (logger_) logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
 
         success = false;
     }
@@ -1496,7 +1443,7 @@
             errorDescription = getFirstDocItem(response.get(), "errorDescription");
         }
 
-        // JAMI_WARNING("PUPnP: {:s} returned with error: {:s} {:s}",
+        // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s} {:s}",
         //           ACTION_ADD_PORT_MAPPING,
         //           errorCode,
         //           errorDescription);
@@ -1507,8 +1454,6 @@
 bool
 PUPnP::actionDeletePortMapping(const Mapping& mapping)
 {
-    CHECK_VALID_THREAD();
-
     if (not clientRegistered_)
         return false;
 
@@ -1558,19 +1503,20 @@
     bool success = true;
 
     if (upnp_err != UPNP_E_SUCCESS) {
-        // JAMI_WARN("PUPnP: Failed to send action %s for mapping from %s. %d: %s",
+        // if (logger_) {
+        //     logger_->warn("PUPnP: Failed to send action {} for mapping from {}. {:d}: {}",
         //           ACTION_DELETE_PORT_MAPPING,
-        //           mapping.toString().c_str(),
+        //           mapping.toString(),
         //           upnp_err,
         //           UpnpGetErrorMessage(upnp_err));
-        // JAMI_WARN("PUPnP: IGD ctrlUrl %s", igd->getControlURL().c_str());
-        // JAMI_WARN("PUPnP: IGD service type %s", igd->getServiceType().c_str());
-
+        //     logger_->warn("PUPnP: IGD ctrlUrl {}", igd->getControlURL());
+        //     logger_->warn("PUPnP: IGD service type {}", igd->getServiceType());
+        // }
         success = false;
     }
 
     if (not response) {
-        // JAMI_WARN("PUPnP: Failed to get response for %s", ACTION_DELETE_PORT_MAPPING);
+        // if (logger_) logger_->warn("PUPnP: Failed to get response for {}", ACTION_DELETE_PORT_MAPPING);
         success = false;
     }
 
@@ -1578,7 +1524,7 @@
     auto errorCode = getFirstDocItem(response.get(), "errorCode");
     if (not errorCode.empty()) {
         auto errorDescription = getFirstDocItem(response.get(), "errorDescription");
-        // JAMI_WARNING("PUPnP: {:s} returned with error: {:s}: {:s}",
+        // if (logger_) logger_->warn("PUPnP: {:s} returned with error: {:s}: {:s}",
         //           ACTION_DELETE_PORT_MAPPING,
         //           errorCode,
         //           errorDescription);
diff --git a/src/upnp/protocol/pupnp/pupnp.h b/src/upnp/protocol/pupnp/pupnp.h
index d45fdde..4c0ea78 100644
--- a/src/upnp/protocol/pupnp/pupnp.h
+++ b/src/upnp/protocol/pupnp/pupnp.h
@@ -24,11 +24,7 @@
 #include "../upnp_protocol.h"
 #include "../igd.h"
 #include "upnp_igd.h"
-
-#include "logger.h"
-#include "connectivity/ip_utils.h"
-#include "noncopyable.h"
-#include "compiler_intrinsics.h"
+#include "ip_utils.h"
 
 #include <upnp/upnp.h>
 #include <upnp/upnptools.h>
@@ -68,7 +64,7 @@
         GET_EXTERNAL_IP_ADDRESS
     };
 
-    PUPnP();
+    PUPnP(const std::shared_ptr<asio::io_context>& ctx, const std::shared_ptr<dht::log::Logger>& logger);
     ~PUPnP();
 
     // Set the observer
@@ -113,18 +109,8 @@
     void terminate() override;
 
 private:
-    NON_COPYABLE(PUPnP);
-
-    // Helpers to run tasks on PUPNP private execution queue.
-    ScheduledExecutor* getPUPnPScheduler() { return &pupnpScheduler_; }
-    template<typename Callback>
-    void runOnPUPnPQueue(Callback&& cb)
-    {
-        pupnpScheduler_.run([cb = std::forward<Callback>(cb)]() mutable { cb(); });
-    }
-
-    // Helper to run tasks on UPNP context execution queue.
-    ScheduledExecutor* getUpnContextScheduler() { return UpnpThreadUtil::getScheduler(); }
+    PUPnP& operator=(const PUPnP&) = delete;
+    PUPnP(const PUPnP&) = delete;
 
     void terminate(std::condition_variable& cv);
 
@@ -224,15 +210,13 @@
 
     std::weak_ptr<PUPnP> weak() { return std::static_pointer_cast<PUPnP>(shared_from_this()); }
 
-    // Execution queue to run lib upnp actions
-    ScheduledExecutor pupnpScheduler_ {"pupnp"};
-
     // Initialization status.
     std::atomic_bool initialized_ {false};
     // Client registration status.
     std::atomic_bool clientRegistered_ {false};
 
-    std::shared_ptr<Task> searchForIgdTimer_ {};
+    std::shared_ptr<asio::io_context> ioContext;
+    asio::steady_timer searchForIgdTimer_;
     unsigned int igdSearchCounter_ {0};
 
     // List of discovered IGDs.
diff --git a/src/upnp/protocol/pupnp/upnp_igd.h b/src/upnp/protocol/pupnp/upnp_igd.h
index 9f33505..5ea7fe5 100644
--- a/src/upnp/protocol/pupnp/upnp_igd.h
+++ b/src/upnp/protocol/pupnp/upnp_igd.h
@@ -16,10 +16,8 @@
  */
 #pragma once
 
-#include "connectivity/upnp/protocol/igd.h"
-
-#include "noncopyable.h"
-#include "connectivity/ip_utils.h"
+#include "../igd.h"
+#include "ip_utils.h"
 
 #include <map>
 #include <string>