tools: add dsh (distributed shell)

Change-Id: Ied91b88933ae5cb4be054a167ef803edf9ea6554
diff --git a/tools/dsh/dsh.cpp b/tools/dsh/dsh.cpp
new file mode 100644
index 0000000..cd9df82
--- /dev/null
+++ b/tools/dsh/dsh.cpp
@@ -0,0 +1,259 @@
+/*
+ *  Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "dsh.h"
+#include "../common.h"
+#include <opendht/log.h>
+#include <opendht/crypto.h>
+#include <asio/io_context.hpp>
+#include <sys/types.h>
+#include <sys/wait.h>
+namespace dhtnet {
+// Index of each end in the two-element descriptor array filled by pipe2().
+const int READ_END = 0;
+const int WRITE_END = 1;
+
+/**
+ * Create a pipe whose two descriptors are both flagged close-on-exec.
+ *
+ * On failure the error is reported with perror() and the whole process is
+ * terminated with EXIT_FAILURE, so a caller never sees an invalid pair.
+ *
+ * @param pipe  Output array: pipe[READ_END] receives the read end and
+ *              pipe[WRITE_END] the write end.
+ */
+void
+create_pipe(int pipe[2])
+{
+    if (pipe2(pipe, O_CLOEXEC) == -1) {
+        perror("pipe2");
+        exit(EXIT_FAILURE);
+    }
+}
+/**
+ * Child side of the fork: plug the three pipes onto stdin/stdout/stderr and
+ * replace the process image with the requested program.
+ *
+ * This function never returns: either execvp() succeeds, or the child is
+ * terminated with EXIT_FAILURE after reporting the error with perror().
+ *
+ * @param in_pipe     Pipe written by the parent; its read end becomes stdin.
+ * @param out_pipe    Pipe read by the parent; its write end becomes stdout.
+ * @param error_pipe  Pipe read by the parent; its write end becomes stderr.
+ * @param name        Program to execute. It is resolved by execvp() and is
+ *                    started with no argument besides argv[0].
+ */
+void
+child_proc(const int in_pipe[2],
+           const int out_pipe[2],
+           const int error_pipe[2],
+           const std::string& name)
+{
+    // The child only needs its own side of each pipe.
+    close(in_pipe[WRITE_END]);
+    close(out_pipe[READ_END]);
+    close(error_pipe[READ_END]);
+
+    // _exit() is used on every failure path below: this is a forked copy of
+    // the parent, and exit() would run the parent's atexit handlers and static
+    // destructors a second time from inside the child.
+
+    // replace stdin with input pipe
+    if (dup2(in_pipe[READ_END], STDIN_FILENO) == -1) {
+        perror("dup2: error replacing stdin");
+        _exit(EXIT_FAILURE);
+    }
+    // replace stdout with output pipe
+    if (dup2(out_pipe[WRITE_END], STDOUT_FILENO) == -1) {
+        perror("dup2: error replacing stdout");
+        _exit(EXIT_FAILURE);
+    }
+    // replace stderr with error pipe
+    if (dup2(error_pipe[WRITE_END], STDERR_FILENO) == -1) {
+        perror("dup2: error replacing stderr");
+        _exit(EXIT_FAILURE);
+    }
+
+    // argv for the new program: just its own name, null-terminated.
+    const char* args[] = {name.c_str(), nullptr};
+    execvp(args[0], const_cast<char* const*>(args));
+
+    // execvp() only returns when it failed.
+    perror("execvp");
+    _exit(EXIT_FAILURE);
+}
+/**
+ * Build a server.
+ *
+ * Starts a background thread running the asio io_context, creates the
+ * ConnectionManager and installs its callbacks. Every channel request is
+ * accepted, and for every ready connection a child process is forked that
+ * executes the program named by the channel; its stdin/stdout/stderr are
+ * bridged to the channel socket through three pipes.
+ *
+ * @param path       Base directory; the certificate store lives in
+ *                   path / "certstore" and the path is also handed to
+ *                   connectionManagerConfig().
+ * @param identity   Local identity; its public key is passed to
+ *                   ConnectionManager::onDhtConnected().
+ * @param bootstrap  Forwarded to connectionManagerConfig() (presumably the
+ *                   DHT bootstrap node -- confirm against that helper).
+ */
+dhtnet::Dsh::Dsh(const std::filesystem::path& path,
+                 dht::crypto::Identity identity,
+                 const std::string& bootstrap)
+    : logger(dht::log::getStdLogger())
+{
+    // Fill the member (the original created a local that shadowed it and left
+    // the member null).
+    certStore = std::make_shared<tls::CertificateStore>(path / "certstore", logger);
+    ioContext = std::make_shared<asio::io_context>();
+    ioContextRunner = std::thread([context = ioContext, logger = logger] {
+        try {
+            // The work guard keeps run() from returning while no handler is pending.
+            auto work = asio::make_work_guard(*context);
+            context->run();
+        } catch (const std::exception& ex) {
+            if (logger)
+                logger->error("Error in ioContextRunner: {}", ex.what());
+        }
+    });
+
+    auto config = connectionManagerConfig(path,
+                                          identity,
+                                          bootstrap,
+                                          logger,
+                                          certStore,
+                                          ioContext,
+                                          factory);
+    // create a connection manager
+    connectionManager = std::make_unique<ConnectionManager>(std::move(config));
+    connectionManager->onDhtConnected(identity.first->getPublicKey());
+
+    // The callbacks below are stored by the connection manager and run after
+    // this constructor has returned, so they capture `this` explicitly and
+    // never a constructor local.
+    connectionManager->onICERequest([this](const dht::Hash<32>&) { // handle ICE request
+        if (logger)
+            logger->debug("ICE request received");
+        return true;
+    });
+
+    // NOTE(review): every channel request from every peer is accepted, and the
+    // channel name is later executed as a program. Confirm that peers are
+    // filtered elsewhere before exposing this to untrusted networks.
+    connectionManager->onChannelRequest(
+        [this](const std::shared_ptr<dht::crypto::Certificate>&, const std::string& /*name*/) {
+            // handle channel request
+            if (logger)
+                logger->debug("Channel request received");
+            return true;
+        });
+
+    connectionManager->onConnectionReady([this](const DeviceId&,
+                                                const std::string& name,
+                                                std::shared_ptr<ChannelSocket> socket) {
+        // handle connection ready
+        try {
+            // One pipe per standard stream of the subprocess.
+            int in_pipe[2], out_pipe[2], error_pipe[2];
+            create_pipe(in_pipe);
+            create_pipe(out_pipe);
+            create_pipe(error_pipe);
+
+            ioContext->notify_fork(asio::io_context::fork_prepare);
+            // Fork to create a child process
+            const pid_t pid = fork();
+            if (pid == -1) {
+                perror("fork");
+                // Still in the parent: finish the notify_fork sequence and
+                // release the pipes that will never be used.
+                ioContext->notify_fork(asio::io_context::fork_parent);
+                for (const int* fds : {in_pipe, out_pipe, error_pipe}) {
+                    close(fds[READ_END]);
+                    close(fds[WRITE_END]);
+                }
+                return EXIT_FAILURE;
+            }
+            if (pid == 0) { // Child process
+                ioContext->notify_fork(asio::io_context::fork_child);
+                child_proc(in_pipe, out_pipe, error_pipe, name);
+                return EXIT_SUCCESS; // never reached
+            }
+
+            // Parent process
+            ioContext->notify_fork(asio::io_context::fork_parent);
+            // close unused read end of input pipe and write end of output pipes
+            close(in_pipe[READ_END]);
+            close(out_pipe[WRITE_END]);
+            close(error_pipe[WRITE_END]);
+
+            // The stream descriptors take ownership of the parent's pipe ends.
+            const auto executor = ioContext->get_executor();
+            auto inStream = std::make_shared<asio::posix::stream_descriptor>(executor,
+                                                                             in_pipe[WRITE_END]);
+            auto outStream = std::make_shared<asio::posix::stream_descriptor>(executor,
+                                                                              out_pipe[READ_END]);
+            auto errorStream
+                = std::make_shared<asio::posix::stream_descriptor>(executor,
+                                                                   error_pipe[READ_END]);
+            if (socket) {
+                // Data from the peer -> stdin of the subprocess.
+                // NOTE(review): `socket` is captured by its own callback, as in
+                // the original; confirm this does not keep the socket alive
+                // forever. Overlapping async_write calls on the same stream are
+                // also possible if data arrives faster than it is written.
+                socket->setOnRecv([this, socket, inStream](const uint8_t* data, size_t size) {
+                    // The incoming buffer is only valid during this call: copy
+                    // it and keep the copy alive until the write completes.
+                    auto data_copy = std::make_shared<std::vector<uint8_t>>(data, data + size);
+                    asio::async_write(*inStream,
+                                      asio::buffer(*data_copy),
+                                      [data_copy, this](const std::error_code& error,
+                                                        std::size_t /*bytesWritten*/) {
+                                          if (error) {
+                                              if (logger)
+                                                  logger->error("Write error: {}",
+                                                                error.message());
+                                          }
+                                      });
+                    return size;
+                });
+
+                // stdout / stderr of the subprocess -> peer. Each read loop gets
+                // its own buffer so that the two streams cannot overwrite each
+                // other's pending data.
+                auto outBuffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
+                auto errorBuffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
+                readFromPipe(socket, outStream, outBuffer);
+                readFromPipe(socket, errorStream, errorBuffer);
+                return EXIT_SUCCESS;
+            }
+        } catch (const std::exception& e) {
+            if (logger)
+                logger->error("Error: {}", e.what());
+        }
+        return 0;
+    });
+}
+/**
+ * Build a client.
+ *
+ * Delegates to the server constructor for the common setup, then asks the
+ * connection manager to open a channel named `binary` to `peer_id`. Once the
+ * channel is up, data received from the peer is written to std::cout and the
+ * local standard input is forwarded to the peer.
+ *
+ * @param path       See the server constructor.
+ * @param identity   See the server constructor.
+ * @param bootstrap  See the server constructor.
+ * @param peer_id    Peer to connect to.
+ * @param binary     Channel name; the server side executes it as a program.
+ */
+dhtnet::Dsh::Dsh(const std::filesystem::path& path,
+                 dht::crypto::Identity identity,
+                 const std::string& bootstrap,
+                 dht::InfoHash peer_id,
+                 const std::string& binary)
+    : Dsh(path, identity, bootstrap)
+{
+    // The callbacks run after this constructor has returned, so they capture
+    // `this` explicitly instead of every local by reference.
+    connectionManager->connectDevice(
+        peer_id, binary, [this](std::shared_ptr<ChannelSocket> socket, const dht::InfoHash&) {
+            if (!socket) {
+                if (logger)
+                    logger->error("Unable to connect to peer");
+                return;
+            }
+            // Peer -> local terminal.
+            // NOTE(review): `socket` is captured by its own callback, as in the
+            // original; confirm this does not keep the socket alive forever.
+            socket->setOnRecv([socket](const uint8_t* data, size_t size) {
+                std::cout.write(reinterpret_cast<const char*>(data),
+                                static_cast<std::streamsize>(size));
+                std::cout.flush();
+                return size;
+            });
+            // Leave the event loop when the channel is closed.
+            socket->onShutdown([this]() {
+                if (logger)
+                    logger->debug("Exit program");
+                ioContext->stop();
+            });
+            // Local stdin -> peer. A duplicate of the descriptor is handed to
+            // asio so that closing the stream leaves STDIN_FILENO itself open.
+            const int stdinFd = ::dup(STDIN_FILENO);
+            if (stdinFd == -1) {
+                perror("dup");
+                return;
+            }
+            auto buffer = std::make_shared<std::vector<uint8_t>>(BUFFER_SIZE);
+            auto stdinPipe = std::make_shared<asio::posix::stream_descriptor>(*ioContext,
+                                                                              stdinFd);
+            readFromPipe(socket, stdinPipe, buffer);
+        });
+
+    // Replaces the handler installed by the server constructor: a client only
+    // logs incoming connections.
+    connectionManager->onConnectionReady(
+        [this](const DeviceId&, const std::string&, std::shared_ptr<ChannelSocket>) {
+            if (logger)
+                logger->debug("Connected!");
+        });
+}
+
+/**
+ * Process asio handlers on the calling thread until the io_context is
+ * stopped. The background runner thread started by the constructor runs the
+ * same io_context.
+ */
+void
+dhtnet::Dsh::run()
+{
+    ioContext->run();
+}
+
+/** Stop the event loop and wait for the background runner thread. */
+dhtnet::Dsh::~Dsh()
+{
+    ioContext->stop();
+    if (ioContextRunner.joinable())
+        ioContextRunner.join();
+}
+} // namespace dhtnet
\ No newline at end of file
diff --git a/tools/dsh/dsh.h b/tools/dsh/dsh.h
new file mode 100644
index 0000000..09ab147
--- /dev/null
+++ b/tools/dsh/dsh.h
@@ -0,0 +1,51 @@
+/*
+ *  Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "connectionmanager.h"
+#include "multiplexed_socket.h"
+#include "ice_transport_factory.h"
+#include "certstore.h"
+#include <asio.hpp>
+namespace dhtnet {
+/**
+ * Distributed shell endpoint built on top of a ConnectionManager.
+ *
+ * The three-argument constructor builds a server; the five-argument one
+ * builds a client that connects to a given peer. After construction, call
+ * run() to process events on the calling thread.
+ */
+class Dsh
+{
+public:
+    // Build a server
+    Dsh(const std::filesystem::path& path,
+        dht::crypto::Identity identity,
+        const std::string& bootstrap);
+    // Build a client
+    Dsh(const std::filesystem::path& path,
+        dht::crypto::Identity identity,
+        const std::string& bootstrap,
+        dht::InfoHash peer_id,
+        const std::string& binary);
+    ~Dsh();
+    // Process events on the calling thread.
+    void run();
+
+private:
+    // Owns the connections and dispatches the channel/connection callbacks.
+    std::unique_ptr<ConnectionManager> connectionManager;
+    // May be null; every use is guarded by a null check.
+    std::shared_ptr<Logger> logger;
+    std::shared_ptr<tls::CertificateStore> certStore {nullptr};
+    // Passed to connectionManagerConfig(); left null by this class.
+    std::shared_ptr<dhtnet::IceTransportFactory> factory {nullptr};
+    // Event loop shared by the connection manager and the pipe/stdin streams.
+    std::shared_ptr<asio::io_context> ioContext;
+    // Background thread running ioContext.
+    std::thread ioContextRunner;
+};
+} // namespace dhtnet
diff --git a/tools/dsh/main.cpp b/tools/dsh/main.cpp
new file mode 100644
index 0000000..dd9cf6f
--- /dev/null
+++ b/tools/dsh/main.cpp
@@ -0,0 +1,140 @@
+/*
+ *  Copyright (C) 2023 Savoir-faire Linux Inc.
+ *
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include "dsh.h"
+#include "../common.h"
+#include <string>
+#include <vector>
+#include <iostream>
+#include <unistd.h>
+#include <getopt.h>
+#include <netinet/in.h>
+/** Command-line settings collected by parse_args(). */
+struct dhtsh_params
+{
+    bool help {false};             // -h / --help
+    bool version {false};          // -v / --version
+    bool listen {false};           // -l / --listen: run as a server
+    std::filesystem::path path {}; // -I / --id_path: identity directory
+    std::string bootstrap {};      // -b / --bootstrap
+    dht::InfoHash peer_id {};      // positional argument, required unless listening
+    std::string binary {};         // -s / --binary: program to run on the peer
+};
+// Long options accepted by dsh. The last field of each entry is the value
+// returned by getopt_long(), i.e. the matching short option handled in
+// parse_args(); keep both in sync.
+static constexpr struct option long_options[] = {{"help", no_argument, nullptr, 'h'},
+                                                 {"version", no_argument, nullptr, 'v'},
+                                                 {"listen", no_argument, nullptr, 'l'},
+                                                 {"bootstrap", required_argument, nullptr, 'b'},
+                                                 {"binary", required_argument, nullptr, 's'},
+                                                 {"id_path", required_argument, nullptr, 'I'},
+                                                 {nullptr, 0, nullptr, 0}};
+
+/**
+ * Parse the command line.
+ *
+ * Exits the process with EXIT_FAILURE on an unknown option, when the peer id
+ * is missing in client mode, or when no identity path is given and HOME is
+ * not set.
+ *
+ * @param argc  Argument count, as received by main().
+ * @param argv  Argument vector, as received by main().
+ * @return The parsed settings with defaults applied: binary "bash" and
+ *         identity path $HOME/.dhtnet.
+ */
+dhtsh_params
+parse_args(int argc, char** argv)
+{
+    dhtsh_params params;
+    int opt;
+    // 'b', 's' and 'I' take a value, exactly as declared in long_options.
+    // Without the ':' getopt leaves optarg null and assigning it to a
+    // std::string is undefined behaviour.
+    while ((opt = getopt_long(argc, argv, "hvlb:s:I:", long_options, nullptr)) != -1) {
+        switch (opt) {
+        case 'h':
+            params.help = true;
+            break;
+        case 'v':
+            params.version = true;
+            break;
+        case 'l':
+            params.listen = true;
+            break;
+        case 'b':
+            params.bootstrap = optarg;
+            break;
+        case 's':
+            params.binary = optarg;
+            break;
+        case 'I':
+            params.path = optarg;
+            break;
+        default:
+            std::cerr << "Invalid option" << std::endl;
+            exit(EXIT_FAILURE);
+        }
+    }
+    // If not listening, the peer_id argument is required
+    if (!params.listen) {
+        if (optind < argc) {
+            params.peer_id = dht::InfoHash(argv[optind]);
+            optind++; // Move to the next argument
+        } else {
+            std::cerr << "Error: Missing peer_id argument.\n";
+            exit(EXIT_FAILURE);
+        }
+    }
+    // default values
+    // NOTE(review): the default bootstrap is an empty string, so this is a
+    // no-op; a real default bootstrap host presumably belongs here.
+    if (params.bootstrap.empty())
+        params.bootstrap = "";
+    if (params.binary.empty())
+        params.binary = "bash";
+    if (params.path.empty()) {
+        // getenv() returns null when HOME is unset; building a path from a
+        // null pointer is undefined behaviour.
+        const char* home = getenv("HOME");
+        if (home == nullptr) {
+            std::cerr << "Error: HOME is not set, use --id_path to choose the identity directory.\n";
+            exit(EXIT_FAILURE);
+        }
+        params.path = std::filesystem::path(home) / ".dhtnet";
+    }
+    return params;
+}
+/**
+ * Configure the pj_log verbosity from the SIPLOGLEVEL environment variable.
+ *
+ * The value is clamped to [0, 6]. When the variable is unset, or does not
+ * hold a number, level 0 is used. A no-op log function is installed, so the
+ * messages handed to it are discarded.
+ */
+static void
+setSipLogLevel()
+{
+    int level = 0;
+    if (const char* envvar = getenv("SIPLOGLEVEL")) {
+        try {
+            // From 0 (min) to 6 (max)
+            level = std::max(0, std::min(std::stoi(envvar), 6));
+        } catch (const std::exception&) {
+            // std::stoi throws on non-numeric or out-of-range input: keep the
+            // default instead of terminating the program.
+            level = 0;
+        }
+    }
+    pj_log_set_level(level);
+    pj_log_set_log_func([](int /*level*/, const char* /*data*/, int /*len*/) {});
+}
+/**
+ * Entry point: print the banner, parse the command line, load the identity
+ * and run dsh either as a server (--listen) or as a client connecting to the
+ * given peer.
+ *
+ * NOTE(review): params.help and params.version are filled by parse_args() but
+ * nothing here acts on them.
+ */
+int
+main(int argc, char** argv)
+{
+    fmt::print("DSH 1.0\n");
+    setSipLogLevel();
+    auto params = parse_args(argc, argv);
+    auto identity = dhtnet::loadIdentity(params.path);
+    fmt::print("Loaded identity: {} from {}\n", identity.second->getId(), params.path);
+    std::unique_ptr<dhtnet::Dsh> dhtsh;
+    if (params.listen) {
+        // create a dsh server instance
+        dhtsh = std::make_unique<dhtnet::Dsh>(params.path, identity, params.bootstrap);
+    } else {
+        // create a dsh client instance connected to the requested peer
+        dhtsh = std::make_unique<dhtnet::Dsh>(params.path,
+                                              identity,
+                                              params.bootstrap,
+                                              params.peer_id,
+                                              params.binary);
+    }
+    dhtsh->run();
+    return EXIT_SUCCESS;
+}