tests: add testManyChannels
Change-Id: I768da190ea9dc891aba059ab9b44664897a9fffe
diff --git a/tests/connectionManager.cpp b/tests/connectionManager.cpp
index 8391442..0bbe5bf 100644
--- a/tests/connectionManager.cpp
+++ b/tests/connectionManager.cpp
@@ -83,6 +83,7 @@
void testConnectDevice();
void testAcceptConnection();
+ void testManyChannels();
void testMultipleChannels();
void testMultipleChannelsOneDeclined();
void testMultipleChannelsSameName();
@@ -109,6 +110,7 @@
CPPUNIT_TEST(testIsConnecting);
CPPUNIT_TEST(testAcceptConnection);
CPPUNIT_TEST(testDeclineConnection);
+ CPPUNIT_TEST(testManyChannels);
CPPUNIT_TEST(testMultipleChannels);
CPPUNIT_TEST(testMultipleChannelsOneDeclined);
CPPUNIT_TEST(testMultipleChannelsSameName);
@@ -332,6 +334,140 @@
CPPUNIT_ASSERT(!receiverConnected);
}
+
+void
+ConnectionManagerTest::testManyChannels()
+{
+ bob->connectionManager->onICERequest([](const DeviceId&) { return true; });
+ alice->connectionManager->onICERequest([](const DeviceId&) { return true; });
+
+ std::condition_variable cv;
+ size_t successfullyConnected = 0;
+ size_t accepted = 0;
+ size_t receiverConnected = 0;
+ size_t successfullyReceived = 0;
+ size_t shutdownCount = 0;
+
+ auto acceptAll = [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
+ if (name.empty()) return false;
+ std::lock_guard<std::mutex> lk {mtx};
+ accepted++;
+ cv.notify_one();
+ return true;
+ };
+ bob->connectionManager->onChannelRequest(acceptAll);
+ alice->connectionManager->onChannelRequest(acceptAll);
+
+ auto onReady = [&](const DeviceId&, const std::string& name, std::shared_ptr<ChannelSocket> socket) {
+ if (not socket or name.empty()) return;
+ if (socket->isInitiator())
+ return;
+ socket->setOnRecv([rxbuf = std::make_shared<std::vector<uint8_t>>(), w = std::weak_ptr(socket)](const uint8_t* data, size_t size) {
+ rxbuf->insert(rxbuf->end(), data, data + size);
+ if (rxbuf->size() == 32) {
+ if (auto socket = w.lock()) {
+ std::error_code ec;
+ socket->write(rxbuf->data(), rxbuf->size(), ec);
+ CPPUNIT_ASSERT(!ec);
+ socket->shutdown();
+ }
+ }
+ return size;
+ });
+ std::lock_guard<std::mutex> lk {mtx};
+ receiverConnected++;
+ cv.notify_one();
+ };
+ bob->connectionManager->onConnectionReady(onReady);
+ alice->connectionManager->onConnectionReady(onReady);
+
+ // max supported number of channels per side (64k - 2 reserved channels)
+ static constexpr size_t N = 1024 * 32 - 1;
+
+ auto onConnect = [&](std::shared_ptr<ChannelSocket> socket, const DeviceId&) {
+ CPPUNIT_ASSERT(socket);
+ if (socket) {
+ std::lock_guard<std::mutex> lk {mtx};
+ successfullyConnected++;
+ cv.notify_one();
+ }
+ auto data_sent = dht::PkId::get(socket->name());
+ socket->setOnRecv([&, data_sent, rxbuf = std::make_shared<std::vector<uint8_t>>()](const uint8_t* data, size_t size) {
+ rxbuf->insert(rxbuf->end(), data, data + size);
+ if (rxbuf->size() == 32) {
+ CPPUNIT_ASSERT(!std::memcmp(data_sent.data(), rxbuf->data(), data_sent.size()));
+ std::lock_guard<std::mutex> lk {mtx};
+ successfullyReceived++;
+ cv.notify_one();
+ }
+ return size;
+ });
+ socket->onShutdown([&]() {
+ std::lock_guard<std::mutex> lk {mtx};
+ shutdownCount++;
+ cv.notify_one();
+ });
+ std::error_code ec;
+ socket->write(data_sent.data(), data_sent.size(), ec);
+ CPPUNIT_ASSERT(!ec);
+ };
+
+ for (size_t i = 0; i < N; ++i) {
+ alice->connectionManager->connectDevice(bob->id.second,
+ fmt::format("git://{}", i+1),
+ onConnect);
+
+ bob->connectionManager->connectDevice(alice->id.second,
+ fmt::format("sip://{}", i+1),
+ onConnect);
+
+ if (i % 128 == 0)
+ std::this_thread::sleep_for(5ms);
+ }
+
+ std::unique_lock<std::mutex> lk {mtx};
+ cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 2; });
+ CPPUNIT_ASSERT_EQUAL(N * 2, successfullyConnected);
+ cv.wait_for(lk, 30s, [&] { return accepted == N * 2; });
+ CPPUNIT_ASSERT_EQUAL(N * 2, accepted);
+ cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 2; });
+ CPPUNIT_ASSERT_EQUAL(N * 2, receiverConnected);
+ cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 2; });
+ CPPUNIT_ASSERT_EQUAL(N * 2, successfullyReceived);
+ cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 2; });
+ CPPUNIT_ASSERT_EQUAL(N * 2, shutdownCount);
+ lk.unlock();
+
+ // Wait a bit to let at least some channels shutdown
+ std::this_thread::sleep_for(10ms);
+
+ // Second time to make sure we can re-use the channels after shutdown
+ for (size_t i = 0; i < N; ++i) {
+ alice->connectionManager->connectDevice(bob->id.second,
+ fmt::format("git://{}", N+i+1),
+ onConnect);
+
+ bob->connectionManager->connectDevice(alice->id.second,
+ fmt::format("sip://{}", N+i+1),
+ onConnect);
+
+ if (i % 128 == 0)
+ std::this_thread::sleep_for(5ms);
+ }
+
+ lk.lock();
+ cv.wait_for(lk, 30s, [&] { return successfullyConnected == N * 4; });
+ CPPUNIT_ASSERT_EQUAL(N * 4, successfullyConnected);
+ cv.wait_for(lk, 30s, [&] { return accepted == N * 4; });
+ CPPUNIT_ASSERT_EQUAL(N * 4, accepted);
+ cv.wait_for(lk, 20s, [&] { return receiverConnected == N * 4; });
+ CPPUNIT_ASSERT_EQUAL(N * 4, receiverConnected);
+ cv.wait_for(lk, 60s, [&] { return successfullyReceived == N * 4; });
+ CPPUNIT_ASSERT_EQUAL(N * 4, successfullyReceived);
+ cv.wait_for(lk, 60s, [&] { return shutdownCount == N * 4; });
+ CPPUNIT_ASSERT_EQUAL(N * 4, shutdownCount);
+}
+
void
ConnectionManagerTest::testMultipleChannels()
{