| /* |
| * Copyright (C) 2004-2021 Savoir-faire Linux Inc. |
| * |
| * Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> |
| * |
| * This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation; either version 3 of the License, or |
| * (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with this program; if not, write to the Free Software |
| * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| */ |
| |
| #include "data_transfer.h" |
| |
| #include "base64.h" |
| #include "client/ring_signal.h" |
| #include "fileutils.h" |
| #include "jamidht/jamiaccount.h" |
| #include "jamidht/p2p.h" |
| #include "manager.h" |
| #include "map_utils.h" |
| #include "peer_connection.h" |
| #include "string_utils.h" |
| |
| #include <thread> |
| #include <stdexcept> |
| #include <fstream> |
| #include <sstream> |
| #include <ios> |
| #include <iostream> |
| #include <unordered_map> |
| #include <mutex> |
| #include <future> |
| #include <atomic> |
| #include <charconv> // std::from_chars |
| #include <cstdlib> // mkstemp |
| #include <filesystem> |
| |
| #include <opendht/rng.h> |
| #include <opendht/thread_pool.h> |
| |
| namespace jami { |
| |
| DRing::DataTransferId |
| generateUID() |
| { |
| thread_local dht::crypto::random_device rd; |
| return std::uniform_int_distribution<DRing::DataTransferId> {1, JAMI_ID_MAX_VAL}(rd); |
| } |
| |
| constexpr const uint32_t MAX_BUFFER_SIZE {65534}; /* Channeled max packet size */ |
| //============================================================================== |
| |
| class DataTransfer : public Stream |
| { |
| public: |
| DataTransfer(DRing::DataTransferId id, InternalCompletionCb cb = {}) |
| : Stream() |
| , id {id} |
| , internalCompletionCb_ {std::move(cb)} |
| {} |
| |
| virtual ~DataTransfer() = default; |
| |
| DRing::DataTransferId getId() const override { return id; } |
| |
| virtual void accept(const std::string&, std::size_t) {}; |
| |
| virtual bool start() |
| { |
| wasStarted_ = true; |
| bool expected = false; |
| return started_.compare_exchange_strong(expected, true); |
| } |
| |
| virtual bool hasBeenStarted() const { return wasStarted_; } |
| |
| void close() noexcept override { started_ = false; } |
| |
| void bytesProgress(int64_t& total, int64_t& progress) const |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| total = info_.totalSize; |
| progress = info_.bytesProgress; |
| } |
| |
| void setBytesProgress(int64_t progress) const |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info_.bytesProgress = progress; |
| } |
| |
| void info(DRing::DataTransferInfo& info) const |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info = info_; |
| } |
| |
| bool isFinished() const |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| return info_.lastEvent >= DRing::DataTransferEventCode::finished; |
| } |
| |
| DRing::DataTransferInfo info() const { return info_; } |
| |
| virtual void emit(DRing::DataTransferEventCode code) const; |
| |
| const DRing::DataTransferId id; |
| |
| virtual void cancel() {} |
| |
| void setOnStateChangedCb(const OnStateChangedCb& cb) override; |
| |
| protected: |
| mutable std::mutex infoMutex_; |
| mutable DRing::DataTransferInfo info_; |
| mutable std::atomic_bool started_ {false}; |
| std::atomic_bool wasStarted_ {false}; |
| InternalCompletionCb internalCompletionCb_ {}; |
| OnStateChangedCb stateChangedCb_ {}; |
| }; |
| |
| void |
| DataTransfer::emit(DRing::DataTransferEventCode code) const |
| { |
| std::string accountId, to; |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info_.lastEvent = code; |
| accountId = info_.accountId; |
| to = info_.conversationId; |
| if (to.empty()) |
| to = info_.peer; |
| } |
| if (stateChangedCb_) |
| stateChangedCb_(id, code); |
| if (internalCompletionCb_) |
| return; // VCard transfer is just for the daemon |
| runOnMainThread([id = id, code, accountId, to]() { |
| emitSignal<DRing::DataTransferSignal::DataTransferEvent>(accountId, |
| "", |
| "", |
| std::to_string(id), |
| uint32_t(code)); |
| }); |
| } |
| |
| void |
| DataTransfer::setOnStateChangedCb(const OnStateChangedCb& cb) |
| { |
| stateChangedCb_ = std::move(cb); |
| } |
| |
| //============================================================================== |
| |
| /** |
| * This class is used as a sort of buffer between the OutgoingFileTransfer |
| * used by clients to represent a transfer between the user and a contact |
| * and SubOutgoingFileTransfer representing the transfer between the user and |
| * each peer devices. It gives the optimistic view of a transfer (show a failure) |
| * only if all related transfer has failed. If one transfer succeed, ignore failures. |
| */ |
| class OptimisticMetaOutgoingInfo |
| { |
| public: |
| OptimisticMetaOutgoingInfo(const DataTransfer* parent, const DRing::DataTransferInfo& info); |
| /** |
| * Update the DataTransferInfo of the parent if necessary (if the event is more *interesting* |
| * for the user) |
| * @param info the last modified linked info (for example if a subtransfer is accepted, it will |
| * gives as a parameter its info) |
| */ |
| void updateInfo(const DRing::DataTransferInfo& info) const; |
| /** |
| * Add a subtransfer as a linked transfer |
| * @param linked |
| */ |
| void addLinkedTransfer(DataTransfer* linked) const; |
| /** |
| * Return the optimistic representation of the transfer |
| */ |
| const DRing::DataTransferInfo& info() const; |
| |
| private: |
| const DataTransfer* parent_; |
| mutable std::mutex infoMutex_; |
| mutable DRing::DataTransferInfo info_; |
| mutable std::vector<DataTransfer*> linkedTransfers_; |
| }; |
| |
| OptimisticMetaOutgoingInfo::OptimisticMetaOutgoingInfo(const DataTransfer* parent, |
| const DRing::DataTransferInfo& info) |
| : parent_(parent) |
| , info_(info) |
| {} |
| |
| void |
| OptimisticMetaOutgoingInfo::updateInfo(const DRing::DataTransferInfo& info) const |
| { |
| bool emitCodeChanged = false; |
| bool checkOngoing = false; |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| if (info_.lastEvent > DRing::DataTransferEventCode::timeout_expired) { |
| info_.lastEvent = DRing::DataTransferEventCode::invalid; |
| } |
| if (info.lastEvent >= DRing::DataTransferEventCode::created |
| && info.lastEvent <= DRing::DataTransferEventCode::finished |
| && info.lastEvent > info_.lastEvent) { |
| // Show the more advanced info |
| info_.lastEvent = info.lastEvent; |
| emitCodeChanged = true; |
| } |
| |
| if (info.lastEvent >= DRing::DataTransferEventCode::closed_by_host |
| && info.lastEvent <= DRing::DataTransferEventCode::timeout_expired |
| && info_.lastEvent < DRing::DataTransferEventCode::finished) { |
| // if not finished show error if all failed |
| // if the transfer was ongoing and canceled, we should go to the best status |
| bool isAllFailed = true; |
| checkOngoing = info_.lastEvent == DRing::DataTransferEventCode::ongoing; |
| DRing::DataTransferEventCode bestEvent {DRing::DataTransferEventCode::invalid}; |
| for (const auto* transfer : linkedTransfers_) { |
| const auto& i = transfer->info(); |
| if (i.lastEvent >= DRing::DataTransferEventCode::created |
| && i.lastEvent <= DRing::DataTransferEventCode::finished) { |
| isAllFailed = false; |
| if (checkOngoing) |
| bestEvent = bestEvent > i.lastEvent ? bestEvent : i.lastEvent; |
| else |
| break; |
| } |
| } |
| if (isAllFailed) { |
| info_.lastEvent = info.lastEvent; |
| emitCodeChanged = true; |
| } else if (checkOngoing && bestEvent != DRing::DataTransferEventCode::invalid) { |
| info_.lastEvent = bestEvent; |
| emitCodeChanged = true; |
| } |
| } |
| |
| int64_t bytesProgress {0}; |
| for (const auto* transfer : linkedTransfers_) { |
| const auto& i = transfer->info(); |
| if (i.bytesProgress > bytesProgress) { |
| bytesProgress = i.bytesProgress; |
| } |
| } |
| if (bytesProgress > info_.bytesProgress) { |
| info_.bytesProgress = bytesProgress; |
| parent_->setBytesProgress(info_.bytesProgress); |
| } |
| if (checkOngoing && info_.lastEvent != DRing::DataTransferEventCode::invalid) { |
| parent_->setBytesProgress(0); |
| } |
| } |
| |
| if (emitCodeChanged) { |
| parent_->emit(info_.lastEvent); |
| } |
| } |
| |
| void |
| OptimisticMetaOutgoingInfo::addLinkedTransfer(DataTransfer* linked) const |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| linkedTransfers_.emplace_back(linked); |
| } |
| |
| const DRing::DataTransferInfo& |
| OptimisticMetaOutgoingInfo::info() const |
| { |
| return info_; |
| } |
| |
| /** |
| * Represent a outgoing file transfer between a user and a device |
| */ |
| class SubOutgoingFileTransfer final : public DataTransfer |
| { |
| public: |
| SubOutgoingFileTransfer(DRing::DataTransferId tid, |
| const std::string& peerUri, |
| const InternalCompletionCb& cb, |
| std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo); |
| ~SubOutgoingFileTransfer(); |
| |
| void close() noexcept override; |
| void closeAndEmit(DRing::DataTransferEventCode code) const noexcept; |
| bool write(std::string_view) override; |
| void emit(DRing::DataTransferEventCode code) const override; |
| const std::string& peer() { return peerUri_; } |
| |
| void cancel() override |
| { |
| if (auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId)) |
| account->closePeerConnection(id); |
| } |
| |
| void setOnRecv(std::function<void(std::string_view)>&& cb) override |
| { |
| bool send = false; |
| { |
| std::lock_guard<std::mutex> lock(onRecvCbMtx_); |
| if (cb) |
| send = true; |
| onRecvCb_ = std::move(cb); |
| } |
| if (send) { |
| sendHeader(); // Pass headers to the new callback |
| } |
| } |
| |
| private: |
| SubOutgoingFileTransfer() = delete; |
| |
| void sendHeader() const |
| { |
| auto header = fmt::format("Content-Length: {}\n" |
| "Display-Name: {}\n" |
| "Offset: 0\n\n", |
| info_.totalSize, |
| info_.displayName); |
| headerSent_ = true; |
| emit(DRing::DataTransferEventCode::wait_peer_acceptance); |
| if (onRecvCb_) |
| onRecvCb_(header); |
| } |
| |
| void sendFile() const |
| { |
| dht::ThreadPool::io().run([this]() { |
| std::vector<char> buf; |
| while (!input_.eof() && onRecvCb_) { |
| buf.resize(MAX_BUFFER_SIZE); |
| |
| input_.read(&buf[0], buf.size()); |
| buf.resize(input_.gcount()); |
| if (buf.size()) { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info_.bytesProgress += buf.size(); |
| metaInfo_->updateInfo(info_); |
| } |
| if (onRecvCb_) |
| onRecvCb_(std::string_view(buf.data(), buf.size())); |
| } |
| JAMI_DBG() << "FTP#" << getId() << ": sent " << info_.bytesProgress << " bytes"; |
| |
| if (info_.bytesProgress != info_.totalSize) |
| emit(DRing::DataTransferEventCode::closed_by_peer); |
| else { |
| if (internalCompletionCb_) |
| internalCompletionCb_(info_.path); |
| emit(DRing::DataTransferEventCode::finished); |
| } |
| }); |
| } |
| |
| mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; |
| mutable std::ifstream input_; |
| mutable bool headerSent_ {false}; |
| bool peerReady_ {false}; |
| const std::string peerUri_; |
| mutable std::shared_ptr<Task> timeoutTask_; |
| std::mutex onRecvCbMtx_; |
| std::function<void(std::string_view)> onRecvCb_ {}; |
| }; |
| |
| SubOutgoingFileTransfer::SubOutgoingFileTransfer(DRing::DataTransferId tid, |
| const std::string& peerUri, |
| const InternalCompletionCb& cb, |
| std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo) |
| : DataTransfer(tid, cb) |
| , metaInfo_(std::move(metaInfo)) |
| , peerUri_(peerUri) |
| { |
| info_ = metaInfo_->info(); |
| fileutils::openStream(input_, info_.path, std::ios::in | std::ios::binary); |
| if (!input_) |
| throw std::runtime_error("input file open failed"); |
| metaInfo_->addLinkedTransfer(this); |
| } |
| |
| SubOutgoingFileTransfer::~SubOutgoingFileTransfer() |
| { |
| if (timeoutTask_) |
| timeoutTask_->cancel(); |
| } |
| |
| void |
| SubOutgoingFileTransfer::close() noexcept |
| { |
| closeAndEmit(DRing::DataTransferEventCode::closed_by_host); |
| } |
| |
| void |
| SubOutgoingFileTransfer::closeAndEmit(DRing::DataTransferEventCode code) const noexcept |
| { |
| started_ = false; // NOTE: replace DataTransfer::close(); which is non const |
| input_.close(); |
| |
| if (info_.lastEvent < DRing::DataTransferEventCode::finished) |
| emit(code); |
| } |
| |
| bool |
| SubOutgoingFileTransfer::write(std::string_view buffer) |
| { |
| if (buffer.empty()) |
| return true; |
| if (not peerReady_ and headerSent_) { |
| // detect GO or NGO msg |
| if (buffer.size() == 3 and buffer[0] == 'G' and buffer[1] == 'O' and buffer[2] == '\n') { |
| peerReady_ = true; |
| emit(DRing::DataTransferEventCode::ongoing); |
| if (onRecvCb_) |
| sendFile(); |
| } else { |
| // consider any other response as a cancel msg |
| JAMI_WARN() << "FTP#" << getId() << ": refused by peer"; |
| emit(DRing::DataTransferEventCode::closed_by_peer); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| void |
| SubOutgoingFileTransfer::emit(DRing::DataTransferEventCode code) const |
| { |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info_.lastEvent = code; |
| } |
| if (stateChangedCb_) |
| stateChangedCb_(id, code); |
| metaInfo_->updateInfo(info_); |
| if (code == DRing::DataTransferEventCode::wait_peer_acceptance) { |
| if (timeoutTask_) |
| timeoutTask_->cancel(); |
| timeoutTask_ = Manager::instance().scheduleTaskIn( |
| [this]() { |
| JAMI_WARN() << "FTP#" << getId() << ": timeout. Cancel"; |
| closeAndEmit(DRing::DataTransferEventCode::timeout_expired); |
| }, |
| std::chrono::minutes(10)); |
| } else if (timeoutTask_) { |
| timeoutTask_->cancel(); |
| timeoutTask_.reset(); |
| } |
| } |
| |
| /** |
| * Represent a file transfer between a user and a peer (all of its device) |
| */ |
| class OutgoingFileTransfer final : public DataTransfer |
| { |
| public: |
| OutgoingFileTransfer(DRing::DataTransferId tid, |
| const DRing::DataTransferInfo& info, |
| const InternalCompletionCb& cb = {}); |
| ~OutgoingFileTransfer() {} |
| |
| std::shared_ptr<DataTransfer> startNewOutgoing(const std::string& peer_uri) |
| { |
| auto newTransfer = std::make_shared<SubOutgoingFileTransfer>(id, |
| peer_uri, |
| internalCompletionCb_, |
| metaInfo_); |
| newTransfer->setOnStateChangedCb(stateChangedCb_); |
| subtransfer_.emplace_back(newTransfer); |
| newTransfer->start(); |
| return newTransfer; |
| } |
| |
| bool hasBeenStarted() const override |
| { |
| // Started if one subtransfer is started |
| for (const auto& subtransfer : subtransfer_) |
| if (subtransfer->hasBeenStarted()) |
| return true; |
| return false; |
| } |
| |
| void close() noexcept override; |
| |
| bool cancelWithPeer(const std::string& peer) |
| { |
| auto allFinished = true; |
| for (const auto& subtransfer : subtransfer_) { |
| if (subtransfer->peer() == peer) |
| subtransfer->cancel(); |
| else if (!subtransfer->isFinished()) |
| allFinished = false; |
| } |
| return allFinished; |
| } |
| |
| private: |
| OutgoingFileTransfer() = delete; |
| |
| mutable std::shared_ptr<OptimisticMetaOutgoingInfo> metaInfo_; |
| mutable std::ifstream input_; |
| mutable std::vector<std::shared_ptr<SubOutgoingFileTransfer>> subtransfer_; |
| }; |
| |
| OutgoingFileTransfer::OutgoingFileTransfer(DRing::DataTransferId tid, |
| const DRing::DataTransferInfo& info, |
| const InternalCompletionCb& cb) |
| : DataTransfer(tid, cb) |
| { |
| fileutils::openStream(input_, info.path, std::ios::binary); |
| if (!input_) |
| throw std::runtime_error("input file open failed"); |
| |
| info_ = info; |
| info_.flags &= ~((uint32_t) 1 << int(DRing::DataTransferFlags::direction)); // outgoing |
| |
| // File size? |
| input_.seekg(0, std::ios_base::end); |
| info_.totalSize = input_.tellg(); |
| input_.close(); |
| |
| metaInfo_ = std::make_shared<OptimisticMetaOutgoingInfo>(this, this->info_); |
| } |
| |
| void |
| OutgoingFileTransfer::close() noexcept |
| { |
| for (const auto& subtransfer : subtransfer_) |
| subtransfer->close(); |
| } |
| |
| //============================================================================== |
| |
| class IncomingFileTransfer final : public DataTransfer |
| { |
| public: |
| IncomingFileTransfer(const DRing::DataTransferInfo&, |
| DRing::DataTransferId, |
| const InternalCompletionCb& cb = {}); |
| |
| bool start() override; |
| |
| void close() noexcept override; |
| |
| void requestFilename(const std::function<void(const std::string&)>& cb); |
| |
| void accept(const std::string&, std::size_t offset) override; |
| |
| bool write(std::string_view data) override; |
| |
| void setFilename(const std::string& filename); |
| |
| void cancel() override |
| { |
| auto account = Manager::instance().getAccount<JamiAccount>(info_.accountId); |
| if (account) |
| account->closePeerConnection(internalId_); |
| } |
| |
| private: |
| IncomingFileTransfer() = delete; |
| |
| DRing::DataTransferId internalId_; |
| |
| std::ofstream fout_; |
| std::mutex cbMtx_ {}; |
| std::function<void(const std::string&)> onFilenameCb_ {}; |
| }; |
| |
| IncomingFileTransfer::IncomingFileTransfer(const DRing::DataTransferInfo& info, |
| DRing::DataTransferId internalId, |
| const InternalCompletionCb& cb) |
| : DataTransfer(internalId, cb) |
| , internalId_(internalId) |
| { |
| JAMI_WARN() << "[FTP] incoming transfert of " << info.totalSize |
| << " byte(s): " << info.displayName; |
| |
| info_ = info; |
| info_.flags |= (uint32_t) 1 << int(DRing::DataTransferFlags::direction); // incoming |
| } |
| |
| void |
| IncomingFileTransfer::setFilename(const std::string& filename) |
| { |
| info_.path = filename; |
| } |
| |
| void |
| IncomingFileTransfer::requestFilename(const std::function<void(const std::string&)>& cb) |
| { |
| if (!internalCompletionCb_) { |
| std::lock_guard<std::mutex> lk(cbMtx_); |
| onFilenameCb_ = cb; |
| } |
| |
| emit(DRing::DataTransferEventCode::wait_host_acceptance); |
| |
| if (internalCompletionCb_) { |
| std::string filename = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + std::to_string(id); |
| fileutils::ofstream(filename); |
| if (not fileutils::isFile(filename)) |
| throw std::system_error(errno, std::generic_category()); |
| info_.path = filename; |
| cb(filename); |
| } |
| } |
| |
| bool |
| IncomingFileTransfer::start() |
| { |
| if (!DataTransfer::start()) |
| return false; |
| |
| fileutils::openStream(fout_, &info_.path[0], std::ios::binary); |
| if (!fout_) { |
| JAMI_ERR() << "[FTP] Can't open file " << info_.path; |
| return false; |
| } |
| |
| emit(DRing::DataTransferEventCode::ongoing); |
| return true; |
| } |
| |
| void |
| IncomingFileTransfer::close() noexcept |
| { |
| { |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| if (info_.lastEvent >= DRing::DataTransferEventCode::finished) |
| return; |
| } |
| DataTransfer::close(); |
| |
| decltype(onFilenameCb_) cb; |
| { |
| std::lock_guard<std::mutex> lk(cbMtx_); |
| cb = std::move(onFilenameCb_); |
| } |
| if (cb) |
| cb(""); |
| |
| fout_.close(); |
| |
| JAMI_DBG() << "[FTP] file closed, rx " << info_.bytesProgress << " on " << info_.totalSize; |
| if (info_.bytesProgress >= info_.totalSize) { |
| if (internalCompletionCb_) |
| internalCompletionCb_(info_.path); |
| emit(DRing::DataTransferEventCode::finished); |
| } else |
| emit(DRing::DataTransferEventCode::closed_by_host); |
| } |
| |
| void |
| IncomingFileTransfer::accept(const std::string& filename, std::size_t offset) |
| { |
| // TODO: offset? |
| (void) offset; |
| |
| info_.path = filename; |
| decltype(onFilenameCb_) cb; |
| { |
| std::lock_guard<std::mutex> lk(cbMtx_); |
| cb = std::move(onFilenameCb_); |
| } |
| if (cb) |
| cb(filename); |
| } |
| |
| bool |
| IncomingFileTransfer::write(std::string_view buffer) |
| { |
| if (buffer.empty()) |
| return true; |
| fout_ << buffer; |
| if (!fout_) |
| return false; |
| std::lock_guard<std::mutex> lk {infoMutex_}; |
| info_.bytesProgress += buffer.size(); |
| return true; |
| } |
| |
| //============================================================================== |
| // With Swarm |
| //============================================================================== |
| |
| FileInfo::FileInfo(const std::shared_ptr<ChannelSocket>& channel, |
| const std::string& fileId, |
| const std::string& interactionId, |
| const DRing::DataTransferInfo& info) |
| : fileId_(fileId) |
| , interactionId_(interactionId) |
| , info_(info) |
| , channel_(channel) |
| {} |
| |
| void |
| FileInfo::emit(DRing::DataTransferEventCode code) |
| { |
| if (finishedCb_ && code >= DRing::DataTransferEventCode::finished) |
| finishedCb_(uint32_t(code)); |
| if (interactionId_ != "") { |
| // Else it's an internal transfer |
| runOnMainThread([info = info_, iid = interactionId_, fid = fileId_, code]() { |
| emitSignal<DRing::DataTransferSignal::DataTransferEvent>(info.accountId, |
| info.conversationId, |
| iid, |
| fid, |
| uint32_t(code)); |
| }); |
| } |
| } |
| |
| OutgoingFile::OutgoingFile(const std::shared_ptr<ChannelSocket>& channel, |
| const std::string& fileId, |
| const std::string& interactionId, |
| const DRing::DataTransferInfo& info, |
| size_t start, |
| size_t end) |
| : FileInfo(channel, fileId, interactionId, info) |
| , start_(start) |
| , end_(end) |
| { |
| if (!fileutils::isFile(info_.path)) { |
| channel_->shutdown(); |
| return; |
| } |
| fileutils::openStream(stream_, info_.path, std::ios::binary | std::ios::in); |
| if (!stream_ || !stream_.is_open()) { |
| channel_->shutdown(); |
| return; |
| } |
| } |
| |
| OutgoingFile::~OutgoingFile() |
| { |
| if (stream_ && stream_.is_open()) |
| stream_.close(); |
| if (channel_) |
| channel_->shutdown(); |
| } |
| |
| void |
| OutgoingFile::process() |
| { |
| if (!channel_ or !stream_ or !stream_.is_open()) |
| return; |
| auto correct = false; |
| stream_.seekg(start_, std::ios::beg); |
| try { |
| std::vector<char> buffer(UINT16_MAX, 0); |
| std::error_code ec; |
| auto pos = start_; |
| while (!stream_.eof()) { |
| stream_.read(buffer.data(), |
| end_ > start_ ? std::min(end_ - pos, buffer.size()) : buffer.size()); |
| auto gcount = stream_.gcount(); |
| pos += gcount; |
| channel_->write(reinterpret_cast<const uint8_t*>(buffer.data()), gcount, ec); |
| if (ec) |
| break; |
| } |
| if (!ec) |
| correct = true; |
| stream_.close(); |
| } catch (...) { |
| } |
| if (!isUserCancelled_) { |
| // NOTE: emit(code) MUST be changed to improve handling of multiple destinations |
| // But for now, we can just avoid to emit errors to the client, because for outgoing |
| // transfer in a swarm, for outgoingFiles, we know that the file is ok. And the peer |
| // will retry the transfer if they need, so we don't need to show errors. |
| if (!interactionId_.empty() && !correct) |
| return; |
| auto code = correct ? DRing::DataTransferEventCode::finished |
| : DRing::DataTransferEventCode::closed_by_peer; |
| emit(code); |
| } |
| } |
| |
| void |
| OutgoingFile::cancel() |
| { |
| // Remove link, not original file |
| auto path = fileutils::get_data_dir() + DIR_SEPARATOR_STR + "conversation_data" |
| + DIR_SEPARATOR_STR + info_.accountId + DIR_SEPARATOR_STR + info_.conversationId |
| + DIR_SEPARATOR_STR + fileId_; |
| if (fileutils::isSymLink(path)) |
| fileutils::remove(path); |
| isUserCancelled_ = true; |
| emit(DRing::DataTransferEventCode::closed_by_host); |
| } |
| |
| IncomingFile::IncomingFile(const std::shared_ptr<ChannelSocket>& channel, |
| const DRing::DataTransferInfo& info, |
| const std::string& fileId, |
| const std::string& interactionId, |
| const std::string& sha3Sum) |
| : FileInfo(channel, fileId, interactionId, info) |
| , sha3Sum_(sha3Sum) |
| { |
| fileutils::openStream(stream_, info_.path, std::ios::binary | std::ios::out); |
| if (!stream_) |
| return; |
| |
| emit(DRing::DataTransferEventCode::ongoing); |
| } |
| |
| IncomingFile::~IncomingFile() |
| { |
| if (channel_) |
| channel_->setOnRecv({}); |
| if (stream_ && stream_.is_open()) |
| stream_.close(); |
| if (channel_) |
| channel_->shutdown(); |
| } |
| |
| void |
| IncomingFile::cancel() |
| { |
| isUserCancelled_ = true; |
| emit(DRing::DataTransferEventCode::closed_by_peer); |
| if (channel_) |
| channel_->shutdown(); |
| } |
| |
| void |
| IncomingFile::process() |
| { |
| channel_->setOnRecv([w = weak()](const uint8_t* buf, size_t len) { |
| if (auto shared = w.lock()) { |
| if (shared->stream_.is_open()) |
| shared->stream_.write(reinterpret_cast<const char*>(buf), len); |
| shared->info_.bytesProgress = shared->stream_.tellp(); |
| } |
| return len; |
| }); |
| channel_->onShutdown([w = weak()] { |
| auto shared = w.lock(); |
| if (!shared) |
| return; |
| auto correct = shared->sha3Sum_.empty(); |
| if (!correct) { |
| if (shared->stream_ && shared->stream_.is_open()) |
| shared->stream_.close(); |
| // Verify shaSum |
| auto sha3Sum = fileutils::sha3File(shared->info_.path); |
| if (shared->sha3Sum_ == sha3Sum) { |
| JAMI_INFO() << "New file received: " << shared->info_.path; |
| correct = true; |
| } else { |
| JAMI_WARN() << "Remove file, invalid sha3sum detected for " << shared->info_.path; |
| fileutils::remove(shared->info_.path, true); |
| } |
| } |
| if (shared->isUserCancelled_) |
| return; |
| auto code = correct ? DRing::DataTransferEventCode::finished |
| : DRing::DataTransferEventCode::closed_by_host; |
| shared->emit(code); |
| }); |
| } |
| |
| //============================================================================== |
| |
| class TransferManager::Impl |
| { |
| public: |
| Impl(const std::string& accountId, const std::string& to) |
| : accountId_(accountId) |
| , to_(to) |
| { |
| if (!to_.empty()) { |
| conversationDataPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + accountId_ |
| + DIR_SEPARATOR_STR + "conversation_data" + DIR_SEPARATOR_STR |
| + to_; |
| fileutils::check_dir(conversationDataPath_.c_str()); |
| waitingPath_ = conversationDataPath_ + DIR_SEPARATOR_STR + "waiting"; |
| } |
| profilesPath_ = fileutils::get_data_dir() + DIR_SEPARATOR_STR + accountId_ |
| + DIR_SEPARATOR_STR + "profiles"; |
| loadWaiting(); |
| } |
| |
| ~Impl() |
| { |
| std::lock_guard<std::mutex> lk {mapMutex_}; |
| for (const auto& [channel, _of] : outgoings_) { |
| channel->shutdown(); |
| } |
| outgoings_.clear(); |
| incomings_.clear(); |
| vcards_.clear(); |
| } |
| |
| void loadWaiting() |
| { |
| try { |
| // read file |
| auto file = fileutils::loadFile(waitingPath_); |
| // load values |
| msgpack::object_handle oh = msgpack::unpack((const char*) file.data(), file.size()); |
| std::lock_guard<std::mutex> lk {mapMutex_}; |
| oh.get().convert(waitingIds_); |
| } catch (const std::exception& e) { |
| return; |
| } |
| } |
| void saveWaiting() |
| { |
| std::ofstream file(waitingPath_, std::ios::trunc | std::ios::binary); |
| msgpack::pack(file, waitingIds_); |
| } |
| |
| std::string accountId_ {}; |
| std::string to_ {}; |
| std::string waitingPath_ {}; |
| std::string profilesPath_ {}; |
| std::string conversationDataPath_ {}; |
| |
| // Pre swarm |
| std::map<DRing::DataTransferId, std::shared_ptr<OutgoingFileTransfer>> oMap_ {}; |
| std::map<DRing::DataTransferId, std::shared_ptr<IncomingFileTransfer>> iMap_ {}; |
| |
| std::mutex mapMutex_ {}; |
| std::map<std::string, WaitingRequest> waitingIds_ {}; |
| std::map<std::shared_ptr<ChannelSocket>, std::shared_ptr<OutgoingFile>> outgoings_ {}; |
| std::map<std::string, std::shared_ptr<IncomingFile>> incomings_ {}; |
| std::map<std::pair<std::string, std::string>, std::shared_ptr<IncomingFile>> vcards_ {}; |
| }; |
| |
| TransferManager::TransferManager(const std::string& accountId, const std::string& to) |
| : pimpl_ {std::make_unique<Impl>(accountId, to)} |
| {} |
| |
| TransferManager::~TransferManager() {} |
| |
| DRing::DataTransferId |
| TransferManager::sendFile(const std::string& path, |
| const std::string& peer, |
| const InternalCompletionCb& icb) |
| { |
| // IMPLEMENTATION NOTE: requestPeerConnection() may call the given callback a multiple time. |
| // This happen when multiple agents handle communications of the given peer for the given |
| // account. Example: Jami account supports multi-devices, each can answer to the request. |
| auto account = Manager::instance().getAccount<JamiAccount>(pimpl_->accountId_); |
| if (!account) { |
| return {}; |
| } |
| |
| auto tid = generateUID(); |
| std::size_t found = path.find_last_of(DIR_SEPARATOR_CH); |
| auto filename = path.substr(found + 1); |
| |
| DRing::DataTransferInfo info; |
| info.accountId = pimpl_->accountId_; |
| info.author = account->getUsername(); |
| info.peer = peer; |
| info.path = path; |
| info.displayName = filename; |
| info.bytesProgress = 0; |
| |
| auto transfer = std::make_shared<OutgoingFileTransfer>(tid, info, icb); |
| { |
| std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; |
| auto it = pimpl_->oMap_.find(tid); |
| if (it != pimpl_->oMap_.end()) { |
| // If the transfer is already in progress (aka not finished) |
| // we do not need to send the request and can ignore it. |
| if (!it->second->isFinished()) { |
| JAMI_DBG("Can't send request for %lu. Already sending the file", tid); |
| return {}; |
| } |
| pimpl_->oMap_.erase(it); |
| } |
| pimpl_->oMap_.emplace(tid, transfer); |
| } |
| transfer->emit(DRing::DataTransferEventCode::created); |
| |
| try { |
| account->requestConnection( |
| info, |
| tid, |
| static_cast<bool>(icb), |
| [transfer](const std::shared_ptr<ChanneledOutgoingTransfer>& out) { |
| if (out) |
| out->linkTransfer(transfer->startNewOutgoing(out->peer())); |
| }, |
| [transfer](const std::string& peer) { |
| auto allFinished = transfer->cancelWithPeer(peer); |
| if (allFinished and not transfer->hasBeenStarted()) { |
| transfer->emit(DRing::DataTransferEventCode::unjoinable_peer); |
| transfer->cancel(); |
| transfer->close(); |
| } |
| }); |
| } catch (const std::exception& ex) { |
| JAMI_ERR() << "[XFER] exception during sendFile(): " << ex.what(); |
| return {}; |
| } |
| |
| return tid; |
| } |
| |
| bool |
| TransferManager::acceptFile(const DRing::DataTransferId& id, const std::string& path) |
| { |
| std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; |
| auto it = pimpl_->iMap_.find(id); |
| if (it == pimpl_->iMap_.end()) { |
| JAMI_WARN("Cannot accept %lu, request not found", id); |
| return false; |
| } |
| it->second->accept(path, 0); |
| return true; |
| } |
| |
| void |
| TransferManager::transferFile(const std::shared_ptr<ChannelSocket>& channel, |
| const std::string& fileId, |
| const std::string& interactionId, |
| const std::string& path, |
| size_t start, |
| size_t end) |
| { |
| std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; |
| if (pimpl_->outgoings_.find(channel) != pimpl_->outgoings_.end()) |
| return; |
| DRing::DataTransferInfo info; |
| info.accountId = pimpl_->accountId_; |
| info.conversationId = pimpl_->to_; |
| info.path = path; |
| auto f = std::make_shared<OutgoingFile>(channel, fileId, interactionId, info, start, end); |
| f->onFinished([w = weak(), channel](uint32_t) { |
| // schedule destroy outgoing transfer as not needed |
| dht::ThreadPool().computation().run([w, channel] { |
| if (auto sthis_ = w.lock()) { |
| auto& pimpl = sthis_->pimpl_; |
| std::lock_guard<std::mutex> lk {pimpl->mapMutex_}; |
| auto itO = pimpl->outgoings_.find(channel); |
| if (itO != pimpl->outgoings_.end()) |
| pimpl->outgoings_.erase(itO); |
| } |
| }); |
| }); |
| pimpl_->outgoings_.emplace(channel, f); |
| dht::ThreadPool::io().run([w = std::weak_ptr<OutgoingFile>(f)] { |
| if (auto of = w.lock()) |
| of->process(); |
| }); |
| } |
| |
| bool |
| TransferManager::cancel(const std::string& fileId) |
| { |
| std::shared_ptr<ChannelSocket> channel; |
| std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; |
| if (!pimpl_->to_.empty()) { |
| // Remove from waiting, this avoid auto-download |
| auto itW = pimpl_->waitingIds_.find(fileId); |
| if (itW != pimpl_->waitingIds_.end()) { |
| pimpl_->waitingIds_.erase(itW); |
| JAMI_DBG() << "Cancel " << fileId; |
| pimpl_->saveWaiting(); |
| } |
| // Note: For now, there is no cancel for outgoings. |
| // The client can just remove the file. |
| auto itC = pimpl_->incomings_.find(fileId); |
| if (itC == pimpl_->incomings_.end()) |
| return false; |
| itC->second->cancel(); |
| return true; |
| } |
| // Else, this is fallack. |
| try { |
| auto it = pimpl_->iMap_.find(std::stoull(fileId)); |
| if (it != pimpl_->iMap_.end()) { |
| if (it->second) |
| it->second->close(); |
| return true; |
| } |
| auto itO = pimpl_->oMap_.find(std::stoull(fileId)); |
| if (itO != pimpl_->oMap_.end()) { |
| if (itO->second) |
| itO->second->close(); |
| return true; |
| } |
| } catch (...) { |
| JAMI_ERR() << "Invalid fileId: " << fileId; |
| } |
| return false; |
| } |
| |
| bool |
| TransferManager::info(const DRing::DataTransferId& id, DRing::DataTransferInfo& info) const noexcept |
| { |
| std::unique_lock<std::mutex> lk {pimpl_->mapMutex_}; |
| if (!pimpl_->to_.empty()) |
| return false; |
| // Else it's fallback |
| if (auto it = pimpl_->iMap_.find(id); it != pimpl_->iMap_.end()) { |
| if (it->second) |
| it->second->info(info); |
| return true; |
| } |
| if (auto it = pimpl_->oMap_.find(id); it != pimpl_->oMap_.end()) { |
| if (it->second) |
| it->second->info(info); |
| return true; |
| } |
| return false; |
| } |
| |
| bool |
| TransferManager::info(const std::string& fileId, |
| std::string& path, |
| int64_t& total, |
| int64_t& progress) const noexcept |
| { |
| std::unique_lock<std::mutex> lk {pimpl_->mapMutex_}; |
| if (pimpl_->to_.empty()) |
| return false; |
| |
| auto itI = pimpl_->incomings_.find(fileId); |
| auto itW = pimpl_->waitingIds_.find(fileId); |
| path = this->path(fileId); |
| if (itI != pimpl_->incomings_.end()) { |
| total = itI->second->info().totalSize; |
| progress = itI->second->info().bytesProgress; |
| return true; |
| } else if (fileutils::isFile(path)) { |
| std::ifstream transfer(path, std::ios::binary); |
| transfer.seekg(0, std::ios::end); |
| progress = transfer.tellg(); |
| if (itW != pimpl_->waitingIds_.end()) { |
| total = itW->second.totalSize; |
| } else { |
| // If not waiting it's finished |
| total = progress; |
| } |
| return true; |
| } else if (itW != pimpl_->waitingIds_.end()) { |
| total = itW->second.totalSize; |
| progress = 0; |
| return true; |
| } |
| // Else we don't know infos there. |
| progress = 0; |
| return false; |
| } |
| |
| void |
| TransferManager::onIncomingFileRequest(const DRing::DataTransferInfo& info, |
| const DRing::DataTransferId& id, |
| const std::function<void(const IncomingFileInfo&)>& cb, |
| const InternalCompletionCb& icb) |
| { |
| auto transfer = std::make_shared<IncomingFileTransfer>(info, id, icb); |
| { |
| std::lock_guard<std::mutex> lk {pimpl_->mapMutex_}; |
| pimpl_->iMap_.emplace(id, transfer); |
| } |
| transfer->emit(DRing::DataTransferEventCode::created); |
| transfer->requestFilename([transfer, id, cb = std::move(cb)](const std::string& filename) { |
| if (!filename.empty() && transfer->start()) |
| cb({id, std::static_pointer_cast<Stream>(transfer)}); |
| else |
| cb({id, nullptr}); |
| }); |
| } |
| |
| void |
| TransferManager::waitForTransfer(const std::string& fileId, |
| const std::string& interactionId, |
| const std::string& sha3sum, |
| const std::string& path, |
| std::size_t total) |
| { |
| std::unique_lock<std::mutex> lk(pimpl_->mapMutex_); |
| auto itW = pimpl_->waitingIds_.find(fileId); |
| if (itW != pimpl_->waitingIds_.end()) |
| return; |
| pimpl_->waitingIds_[fileId] = {fileId, interactionId, sha3sum, path, total}; |
| JAMI_DBG() << "Wait for " << fileId; |
| if (!pimpl_->to_.empty()) |
| pimpl_->saveWaiting(); |
| lk.unlock(); |
| emitSignal<DRing::DataTransferSignal::DataTransferEvent>( |
| pimpl_->accountId_, |
| pimpl_->to_, |
| interactionId, |
| fileId, |
| uint32_t(DRing::DataTransferEventCode::wait_peer_acceptance)); |
| } |
| |
| void |
| TransferManager::onIncomingFileTransfer(const std::string& fileId, |
| const std::shared_ptr<ChannelSocket>& channel) |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->mapMutex_); |
| // Check if not already an incoming file for this id and that we are waiting this file |
| auto itC = pimpl_->incomings_.find(fileId); |
| if (itC != pimpl_->incomings_.end()) { |
| channel->shutdown(); |
| return; |
| } |
| auto itW = pimpl_->waitingIds_.find(fileId); |
| if (itW == pimpl_->waitingIds_.end()) { |
| channel->shutdown(); |
| return; |
| } |
| |
| DRing::DataTransferInfo info; |
| info.accountId = pimpl_->accountId_; |
| info.conversationId = pimpl_->to_; |
| info.path = itW->second.path; |
| info.totalSize = itW->second.totalSize; |
| info.bytesProgress = 0; |
| |
| // Generate the file path within the conversation data directory |
| // using the file id if no path has been specified, otherwise create |
| // a symlink(Note: this will not work on Windows). |
| auto filePath = path(fileId); |
| if (info.path.empty()) { |
| info.path = filePath; |
| } else { |
| // We don't need to check if this is an existing symlink here, as |
| // the attempt to create one should report the error string correctly. |
| fileutils::createFileLink(filePath, info.path); |
| } |
| |
| auto ifile = std::make_shared<IncomingFile>(std::move(channel), |
| info, |
| fileId, |
| itW->second.interactionId, |
| itW->second.sha3sum); |
| auto res = pimpl_->incomings_.emplace(fileId, std::move(ifile)); |
| if (res.second) { |
| res.first->second->onFinished([w = weak(), fileId](uint32_t code) { |
| // schedule destroy transfer as not needed |
| dht::ThreadPool().computation().run([w, fileId, code] { |
| if (auto sthis_ = w.lock()) { |
| auto& pimpl = sthis_->pimpl_; |
| std::lock_guard<std::mutex> lk {pimpl->mapMutex_}; |
| auto itO = pimpl->incomings_.find(fileId); |
| if (itO != pimpl->incomings_.end()) |
| pimpl->incomings_.erase(itO); |
| if (code == uint32_t(DRing::DataTransferEventCode::finished)) { |
| auto itW = pimpl->waitingIds_.find(fileId); |
| if (itW != pimpl->waitingIds_.end()) { |
| pimpl->waitingIds_.erase(itW); |
| pimpl->saveWaiting(); |
| } |
| } |
| } |
| }); |
| }); |
| res.first->second->process(); |
| } |
| } |
| |
| std::string |
| TransferManager::path(const std::string& fileId) const |
| { |
| return pimpl_->conversationDataPath_ + DIR_SEPARATOR_STR + fileId; |
| } |
| |
| void |
| TransferManager::onIncomingProfile(const std::shared_ptr<ChannelSocket>& channel) |
| { |
| if (!channel) |
| return; |
| |
| auto name = channel->name(); |
| auto lastSep = name.find_last_of('/'); |
| auto fileId = name.substr(lastSep + 1); |
| |
| auto deviceId = channel->deviceId().toString(); |
| auto cert = channel->peerCertificate(); |
| if (!cert || !cert->issuer || fileId.find(".vcf") == std::string::npos) |
| return; |
| |
| auto uri = fileId == "profile.vcf" ? cert->issuer->getId().toString() |
| : fileId.substr(0, fileId.size() - 4 /*.vcf*/); |
| |
| std::lock_guard<std::mutex> lk(pimpl_->mapMutex_); |
| auto idx = std::pair<std::string, std::string> {deviceId, uri}; |
| // Check if not already an incoming file for this id and that we are waiting this file |
| auto itV = pimpl_->vcards_.find(idx); |
| if (itV != pimpl_->vcards_.end()) { |
| channel->shutdown(); |
| return; |
| } |
| |
| auto tid = generateUID(); |
| DRing::DataTransferInfo info; |
| info.accountId = pimpl_->accountId_; |
| info.conversationId = pimpl_->to_; |
| info.path = fileutils::get_cache_dir() + DIR_SEPARATOR_STR + pimpl_->accountId_ |
| + DIR_SEPARATOR_STR + "vcard" + DIR_SEPARATOR_STR + deviceId + "_" + uri + "_" |
| + std::to_string(tid); |
| |
| auto ifile = std::make_shared<IncomingFile>(std::move(channel), info, "profile.vcf", ""); |
| auto res = pimpl_->vcards_.emplace(idx, std::move(ifile)); |
| if (res.second) { |
| res.first->second->onFinished([w = weak(), |
| uri = std::move(uri), |
| deviceId = std::move(deviceId), |
| accountId = pimpl_->accountId_, |
| cert = std::move(cert), |
| path = info.path](uint32_t code) { |
| // schedule destroy transfer as not needed |
| dht::ThreadPool().computation().run([w, |
| uri = std::move(uri), |
| deviceId = std::move(deviceId), |
| accountId = std::move(accountId), |
| cert = std::move(cert), |
| path = std::move(path), |
| code] { |
| if (auto sthis_ = w.lock()) { |
| auto& pimpl = sthis_->pimpl_; |
| std::lock_guard<std::mutex> lk {pimpl->mapMutex_}; |
| auto itO = pimpl->vcards_.find({deviceId, uri}); |
| if (itO != pimpl->vcards_.end()) |
| pimpl->vcards_.erase(itO); |
| if (code == uint32_t(DRing::DataTransferEventCode::finished)) |
| emitSignal<DRing::ConfigurationSignal::ProfileReceived>(accountId, |
| uri, |
| path); |
| } |
| }); |
| }); |
| res.first->second->process(); |
| } |
| } |
| |
| std::string |
| TransferManager::profilePath(const std::string& contactId) const |
| { |
| // TODO Android? iOS? |
| return pimpl_->profilesPath_ + DIR_SEPARATOR_STR + base64::encode(contactId) + ".vcf"; |
| } |
| |
| std::vector<WaitingRequest> |
| TransferManager::waitingRequests() const |
| { |
| std::vector<WaitingRequest> res; |
| std::lock_guard<std::mutex> lk(pimpl_->mapMutex_); |
| for (const auto& [fileId, req] : pimpl_->waitingIds_) { |
| auto itC = pimpl_->incomings_.find(fileId); |
| if (itC == pimpl_->incomings_.end()) |
| res.emplace_back(req); |
| } |
| return res; |
| } |
| |
| bool |
| TransferManager::isWaiting(const std::string& fileId) const |
| { |
| std::lock_guard<std::mutex> lk(pimpl_->mapMutex_); |
| return pimpl_->waitingIds_.find(fileId) != pimpl_->waitingIds_.end(); |
| } |
| |
| } // namespace jami |