tools: add dsh (distributed shell)
Change-Id: Ied91b88933ae5cb4be054a167ef803edf9ea6554
diff --git a/tools/dsh/dsh.cpp b/tools/dsh/dsh.cpp
new file mode 100644
index 0000000..cd9df82
--- /dev/null
+++ b/tools/dsh/dsh.cpp
@@ -0,0 +1,259 @@
+/*
+ * Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "dsh.h"
+#include "../common.h"
+#include <opendht/log.h>
+#include <opendht/crypto.h>
+
+#include <asio/io_context.hpp>
+#include <sys/types.h>
+#include <sys/wait.h>
+namespace dhtnet {
+
+const int READ_END = 0;
+const int WRITE_END = 1;
+
+void
+create_pipe(int pipe[2])
+{
+ if (pipe2(pipe, O_CLOEXEC) == -1) {
+ perror("pipe2");
+ exit(EXIT_FAILURE);
+ }
+}
+void
+child_proc(const int in_pipe[2],
+ const int out_pipe[2],
+ const int error_pipe[2],
+ const std::string& name)
+{
+ // close unused write end of input pipe and read end of output pipe
+ close(in_pipe[WRITE_END]);
+ close(out_pipe[READ_END]);
+ close(error_pipe[READ_END]);
+
+ // replace stdin with input pipe
+ if (dup2(in_pipe[READ_END], STDIN_FILENO) == -1) {
+ perror("dup2: error replacing stdin");
+ exit(EXIT_FAILURE);
+ }
+
+ // replace stdout with output pipe
+ if (dup2(out_pipe[WRITE_END], STDOUT_FILENO) == -1) {
+ perror("dup2: error replacing stdout");
+ exit(EXIT_FAILURE);
+ }
+ // replace stderr with error pipe
+ if (dup2(error_pipe[WRITE_END], STDERR_FILENO) == -1) {
+ perror("dup2: error replacing stderr");
+ exit(EXIT_FAILURE);
+ }
+
+ // prepare arguments
+ const char* args[] = {name.c_str(), NULL};
+ // execute subprocess
+ execvp(args[0], const_cast<char* const*>(args));
+
+ // if execv returns, an error occurred
+ perror("execvp");
+ exit(EXIT_FAILURE);
+}
+
+dhtnet::Dsh::Dsh(const std::filesystem::path& path,
+ dht::crypto::Identity identity,
+ const std::string& bootstrap)
+ : logger(dht::log::getStdLogger())
+ // , std::shared_ptr<tls::CertificateStore>(path / "certstore", logger)
+{
+ auto certStore = std::make_shared<tls::CertificateStore>(path / "certstore", logger);
+
+ ioContext = std::make_shared<asio::io_context>();
+ ioContextRunner = std::thread([context = ioContext, logger = logger] {
+ try {
+ auto work = asio::make_work_guard(*context);
+ context->run();
+ } catch (const std::exception& ex) {
+ if (logger)
+ logger->error("Error in ioContextRunner: {}", ex.what());
+ }
+ });
+ // Build a server
+ auto config = connectionManagerConfig(path,
+ identity,
+ bootstrap,
+ logger,
+ certStore,
+ ioContext,
+ factory);
+ // create a connection manager
+ connectionManager = std::make_unique<ConnectionManager>(std::move(config));
+
+ connectionManager->onDhtConnected(identity.first->getPublicKey());
+ connectionManager->onICERequest([this](const dht::Hash<32>&) { // handle ICE request
+ if (logger)
+ logger->debug("ICE request received");
+ return true;
+ });
+
+ std::mutex mtx;
+ std::unique_lock<std::mutex> lk {mtx};
+
+ connectionManager->onChannelRequest(
+ [&](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& name) {
+ // handle channel request
+ if (logger)
+ logger->debug("Channel request received");
+ return true;
+ });
+
+ connectionManager->onConnectionReady([&](const DeviceId&,
+ const std::string& name,
+ std::shared_ptr<ChannelSocket> socket) {
+ // handle connection ready
+ try {
+ // Create a pipe for communication with the subprocess
+ // create pipes
+ int in_pipe[2], out_pipe[2], error_pipe[2];
+ create_pipe(in_pipe);
+ create_pipe(out_pipe);
+ create_pipe(error_pipe);
+
+ ioContext->notify_fork(asio::io_context::fork_prepare);
+
+ // Fork to create a child process
+ pid_t pid = fork();
+ if (pid == -1) {
+ perror("fork");
+ return EXIT_FAILURE;
+ } else if (pid == 0) { // Child process
+ ioContext->notify_fork(asio::io_context::fork_child);
+ child_proc(in_pipe, out_pipe, error_pipe, name);
+ return EXIT_SUCCESS; // never reached
+ } else {
+ ioContext->notify_fork(asio::io_context::fork_parent);
+
+ // close unused read end of input pipe and write end of output pipe
+ close(in_pipe[READ_END]);
+ close(out_pipe[WRITE_END]);
+ close(error_pipe[WRITE_END]);
+
+ asio::io_context& ioContextRef = *ioContext;
+ // create stream descriptors
+ auto inStream
+ = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
+ in_pipe[WRITE_END]);
+ auto outStream
+ = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
+ out_pipe[READ_END]);
+ auto errorStream
+ = std::make_shared<asio::posix::stream_descriptor>(ioContextRef.get_executor(),
+ error_pipe[READ_END]);
+
+ if (socket) {
+ socket->setOnRecv([this, socket, inStream](const uint8_t* data, size_t size) {
+ auto data_copy = std::make_shared<std::vector<uint8_t>>(data, data + size);
+ // write on pipe to sub child
+ std::error_code ec;
+ asio::async_write(*inStream,
+ asio::buffer(*data_copy),
+ [data_copy, this](const std::error_code& error,
+ std::size_t bytesWritten) {
+ if (error) {
+ if (logger)
+ logger->error("Write error: {}",
+ error.message());
+ }
+ });
+ return size;
+ });
+
+ // read from pipe to sub child
+
+ // Create a buffer to read data into
+ auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
+
+ // Create a shared_ptr to the stream_descriptor
+ readFromPipe(socket, outStream, buffer);
+ readFromPipe(socket, errorStream, buffer);
+
+ return EXIT_SUCCESS;
+ }
+ }
+
+ } catch (const std::exception& e) {
+ if (logger)
+ logger->error("Error: {}", e.what());
+ }
+ return 0;
+ });
+}
+
+dhtnet::Dsh::Dsh(const std::filesystem::path& path,
+ dht::crypto::Identity identity,
+ const std::string& bootstrap,
+ dht::InfoHash peer_id,
+ const std::string& binary)
+ : Dsh(path, identity, bootstrap)
+{
+ // Build a client
+ std::condition_variable cv;
+ connectionManager->connectDevice(
+ peer_id, binary, [&](std::shared_ptr<ChannelSocket> socket, const dht::InfoHash&) {
+ if (socket) {
+ socket->setOnRecv([this, socket](const uint8_t* data, size_t size) {
+ std::cout.write((const char*) data, size);
+ std::cout.flush();
+ return size;
+ });
+ // Create a buffer to read data into
+ auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
+
+ // Create a shared_ptr to the stream_descriptor
+ auto stdinPipe = std::make_shared<asio::posix::stream_descriptor>(*ioContext,
+ ::dup(
+ STDIN_FILENO));
+ readFromPipe(socket, stdinPipe, buffer);
+
+ socket->onShutdown([this]() {
+ if (logger)
+ logger->debug("Exit program");
+ ioContext->stop();
+ });
+ }
+ });
+
+ connectionManager->onConnectionReady([&](const DeviceId&,
+ const std::string& name,
+ std::shared_ptr<ChannelSocket> socket_received) {
+ if (logger)
+ logger->debug("Connected!");
+ });
+}
+
+void
+dhtnet::Dsh::run()
+{
+ ioContext->run();
+}
+
+dhtnet::Dsh::~Dsh()
+{
+ ioContext->stop();
+ ioContextRunner.join();
+}
+
+} // namespace dhtnet
\ No newline at end of file
diff --git a/tools/dsh/dsh.h b/tools/dsh/dsh.h
new file mode 100644
index 0000000..09ab147
--- /dev/null
+++ b/tools/dsh/dsh.h
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "connectionmanager.h"
+#include "multiplexed_socket.h"
+#include "ice_transport_factory.h"
+#include "certstore.h"
+
+#include <asio.hpp>
+
+namespace dhtnet {
+
+class Dsh
+{
+public:
+ // Build a server
+ Dsh(const std::filesystem::path& path,
+ dht::crypto::Identity identity,
+ const std::string& bootstrap);
+ // Build a client
+ Dsh(const std::filesystem::path& path,
+ dht::crypto::Identity identity,
+ const std::string& bootstrap,
+ dht::InfoHash peer_id,
+ const std::string& binary);
+ ~Dsh();
+ void run();
+
+private:
+ std::unique_ptr<ConnectionManager> connectionManager;
+ std::shared_ptr<Logger> logger;
+ std::shared_ptr<tls::CertificateStore> certStore {nullptr};
+ std::shared_ptr<dhtnet::IceTransportFactory> factory {nullptr};
+ std::shared_ptr<asio::io_context> ioContext;
+ std::thread ioContextRunner;
+};
+
+} // namespace dhtnet
diff --git a/tools/dsh/main.cpp b/tools/dsh/main.cpp
new file mode 100644
index 0000000..dd9cf6f
--- /dev/null
+++ b/tools/dsh/main.cpp
@@ -0,0 +1,140 @@
+/*
+ * Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "dsh.h"
+#include "../common.h"
+#include <string>
+#include <vector>
+#include <iostream>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <netinet/in.h>
+
+struct dhtsh_params
+{
+ bool help {false};
+ bool version {false};
+ bool listen {false};
+ std::filesystem::path path {};
+ std::string bootstrap {};
+ dht::InfoHash peer_id {};
+ std::string binary {};
+};
+
+static const constexpr struct option long_options[] = {{"help", no_argument, nullptr, 'h'},
+ {"version", no_argument, nullptr, 'v'},
+ {"listen", no_argument, nullptr, 'l'},
+ {"bootstrap", required_argument, nullptr, 'b'},
+ {"binary", required_argument, nullptr, 's'},
+ {"id_path", required_argument, nullptr, 'I'},
+ {nullptr, 0, nullptr, 0}};
+
+dhtsh_params
+parse_args(int argc, char** argv)
+{
+ dhtsh_params params;
+ int opt;
+ while ((opt = getopt_long(argc, argv, "hvlsI:p:i:", long_options, nullptr)) != -1) {
+ switch (opt) {
+ case 'h':
+ params.help = true;
+ break;
+ case 'v':
+ params.version = true;
+ break;
+ case 'l':
+ params.listen = true;
+ break;
+ case 'b':
+ params.bootstrap = optarg;
+ break;
+ case 's':
+ params.binary = optarg;
+ break;
+ case 'I':
+ params.path = optarg;
+ break;
+ default:
+ std::cerr << "Invalid option" << std::endl;
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ // If not listening, the peer_id argument is required
+ if (!params.listen) {
+ if (optind < argc) {
+ params.peer_id = dht::InfoHash(argv[optind]);
+ optind++; // Move to the next argument
+ } else {
+ std::cerr << "Error: Missing peer_id argument.\n";
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ // default values
+ if (params.bootstrap.empty())
+ params.bootstrap = "bootstrap.jami.net";
+ if (params.binary.empty())
+ params.binary = "bash";
+ if (params.path.empty())
+ params.path = std::filesystem::path(getenv("HOME")) / ".dhtnet";
+ return params;
+}
+
+static void
+setSipLogLevel()
+{
+ char* envvar = getenv("SIPLOGLEVEL");
+
+ int level = 0;
+
+ if (envvar != nullptr) {
+ level = std::stoi(envvar);
+
+ // From 0 (min) to 6 (max)
+ level = std::max(0, std::min(level, 6));
+ }
+
+ pj_log_set_level(level);
+ pj_log_set_log_func([](int level, const char* data, int /*len*/) {});
+}
+
+int
+main(int argc, char** argv)
+{
+ fmt::print("DSH 1.0\n");
+ setSipLogLevel();
+ auto params = parse_args(argc, argv);
+ auto identity = dhtnet::loadIdentity(params.path);
+ fmt::print("Loaded identity: {} from {}\n", identity.second->getId(), params.path);
+
+ std::unique_ptr<dhtnet::Dsh> dhtsh;
+ if (params.listen) {
+ // create dnc instance
+ dhtsh = std::make_unique<dhtnet::Dsh>(params.path, identity, params.bootstrap);
+ } else {
+ dhtsh = std::make_unique<dhtnet::Dsh>(params.path,
+ identity,
+ params.bootstrap,
+ params.peer_id,
+ params.binary);
+ }
+
+ dhtsh->run();
+ return EXIT_SUCCESS;
+
+}