tests: add testManyChannels

Change-Id: I768da190ea9dc891aba059ab9b44664897a9fffe
diff --git a/tests/connectionManager.cpp b/tests/connectionManager.cpp
index 8391442..0bbe5bf 100644
--- a/tests/connectionManager.cpp
+++ b/tests/connectionManager.cpp
@@ -83,6 +83,7 @@
 
     void testConnectDevice();
     void testAcceptConnection();
+    void testManyChannels();
     void testMultipleChannels();
     void testMultipleChannelsOneDeclined();
     void testMultipleChannelsSameName();
@@ -109,6 +110,7 @@
     CPPUNIT_TEST(testIsConnecting);
     CPPUNIT_TEST(testAcceptConnection);
     CPPUNIT_TEST(testDeclineConnection);
+    CPPUNIT_TEST(testManyChannels);
     CPPUNIT_TEST(testMultipleChannels);
     CPPUNIT_TEST(testMultipleChannelsOneDeclined);
     CPPUNIT_TEST(testMultipleChannelsSameName);
@@ -332,6 +334,140 @@
     CPPUNIT_ASSERT(!receiverConnected);
 }
 
+
+void
+ConnectionManagerTest::testManyChannels()
+{
+    bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
+    alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
+
+    std::condition_variable cv;
+    size_t successfullyConnected = 0;
+    size_t accepted = 0;
+    size_t receiverConnected = 0;
+    size_t successfullyReceived = 0;
+    size_t shutdownCount = 0;
+
+    auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
+        if (name.empty()) return false;
+        std::lock_guard<std::mutex> lk {mtx};
+        accepted++;
+        cv.notify_one();
+        return true;
+    };
+    bob->connectionManager->onChannelRequest(acceptAll);
+    alice->connectionManager->onChannelRequest(acceptAll);
+
+    auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
+        if (not socket or name.empty()) return;
+        if (socket->isInitiator())
+            return;
+        socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) {
+            rxbuf->insert(rxbuf->end(), data, data + size);
+            if (rxbuf->size() == 32) {
+                if (auto socket = w.lock()) {
+                    std::error_code ec;
+                    socket->write(rxbuf->data(), rxbuf->size(), ec);
+                    CPPUNIT_ASSERT(!ec);
+                    socket->shutdown();
+                }
+            }
+            return size;
+        });
+        std::lock_guard<std::mutex> lk {mtx};
+        receiverConnected++;
+        cv.notify_one();
+    };
+    bob->connectionManager->onConnectionReady(onReady);
+    alice->connectionManager->onConnectionReady(onReady);
+
+    // max supported number of channels per side (64k - 2 reserved channels)
+    static constexpr size_t N = 1024 * 32 - 1;
+
+    auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
+        CPPUNIT_ASSERT(socket);
+        if (socket) {
+            std::lock_guard<std::mutex> lk {mtx};
+            successfullyConnected++;
+            cv.notify_one();
+        }
+        auto data_sent = dht::PkId::get(socket->name());
+        socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) {
+            rxbuf->insert(rxbuf->end(), data, data + size);
+            if (rxbuf->size() == 32) {
+                CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size()));
+                std::lock_guard<std::mutex> lk {mtx};
+                successfullyReceived++;
+                cv.notify_one();
+            }
+            return size;
+        });
+        socket->onShutdown([&]() {
+            std::lock_guard<std::mutex> lk {mtx};
+            shutdownCount++;
+            cv.notify_one();
+        });
+        std::error_code ec;
+        socket->write(data_sent.data(), data_sent.size(), ec);
+        CPPUNIT_ASSERT(!ec);
+    };
+
+    for (size_t i = 0; i < N; ++i) {
+        alice->connectionManager->connectDevice(bob->id.second,
+                                                fmt::format("git://{}", i+1),
+                                                onConnect);
+
+        bob->connectionManager->connectDevice(alice->id.second,
+                                                fmt::format("sip://{}", i+1),
+                                                onConnect);
+
+        if (i % 128 == 0)
+           std::this_thread::sleep_for(5ms);
+    }
+
+    std::unique_lock<std::mutex> lk {mtx};
+    cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; });
+    CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected);
+    cv.wait_for(lk, 30s, [&] { return accepted == N * 2; });
+    CPPUNIT_ASSERT_EQUAL(N * 2, accepted);
+    cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; });
+    CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected);
+    cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; });
+    CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived);
+    cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; });
+    CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount);
+    lk.unlock();
+
+    // Wait a bit to let at least some channels shutdown
+    std::this_thread::sleep_for(10ms);
+
+    // Second time to make sure we can re-use the channels after shutdown
+    for (size_t i = 0; i < N; ++i) {
+        alice->connectionManager->connectDevice(bob->id.second,
+                                                fmt::format("git://{}", N+i+1),
+                                                onConnect);
+
+        bob->connectionManager->connectDevice(alice->id.second,
+                                                fmt::format("sip://{}", N+i+1),
+                                                onConnect);
+
+        if (i % 128 == 0)
+           std::this_thread::sleep_for(5ms);
+    }
+
+    lk.lock();
+    cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; });
+    CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected);
+    cv.wait_for(lk, 30s, [&] { return accepted == N * 4; });
+    CPPUNIT_ASSERT_EQUAL(N * 4, accepted);
+    cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; });
+    CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected);
+    cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; });
+    CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived);
+    cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; });
+    CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount);
+}
+
 void
 ConnectionManagerTest::testMultipleChannels()
 {