Skip to content

Commit

Permalink
Implement the TUS resumebale upload protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
TheOneRing committed Jun 22, 2020
1 parent dea019e commit 2763c78
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 4 deletions.
7 changes: 7 additions & 0 deletions changelog/unreleased/product-19
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Change: Add support for the TUS resumeable upload protocol

With the support of the TUS protocol we are now able to easily and reliably
upload files to ocis.


https://github.com/owncloud/product/issues/19
1 change: 1 addition & 0 deletions src/libsync/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ set(libsync_SRCS
propagateupload.cpp
propagateuploadv1.cpp
propagateuploadng.cpp
propagateuploadtus.cpp
propagateremotedelete.cpp
propagateremotemove.cpp
propagateremotemkdir.cpp
Expand Down
23 changes: 23 additions & 0 deletions src/libsync/capabilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Capabilities::Capabilities(const QVariantMap &capabilities)
: _capabilities(capabilities)
, _fileSharingCapabilities(_capabilities.value(QStringLiteral("files_sharing")).toMap())
, _fileSharingPublicCapabilities(_fileSharingCapabilities.value(QStringLiteral("public"), {}).toMap())
, _tusSupport(_capabilities.value(QLatin1String("files")).toMap().value(QStringLiteral("tus_support")).toMap())
{
}

Expand Down Expand Up @@ -163,6 +164,11 @@ bool Capabilities::bigfilechunkingEnabled() const
return _capabilities.value("files").toMap().value(QStringLiteral("bigfilechunking"), true).toBool();
}

const TusSupport &Capabilities::tusSupport() const
{
return _tusSupport;
}

bool Capabilities::chunkingParallelUploadDisabled() const
{
return _capabilities.value("dav").toMap().value("chunkingParallelUploadDisabled").toBool();
Expand Down Expand Up @@ -216,4 +222,21 @@ QStringList Capabilities::blacklistedFiles() const
{
return _capabilities.value("files").toMap().value("blacklisted_files").toStringList();
}

TusSupport::TusSupport(const QVariantMap &tus_support)
{
if (tus_support.isEmpty()) {
return;
}
version = QVersionNumber::fromString(tus_support.value(QStringLiteral("version")).toString());
resumable = QVersionNumber::fromString(tus_support.value(QStringLiteral("resumable")).toString());
extensions = tus_support.value(QStringLiteral("extension")).toString().split(QLatin1Char(','), QString::SkipEmptyParts);
max_chunk_size = tus_support.value(QStringLiteral("max_chunk_size")).value<quint64>();
http_method_override = tus_support.value(QStringLiteral("http_method_override")).toString();
}

bool TusSupport::isValid() const
{
return !version.isNull();
}
}
26 changes: 26 additions & 0 deletions src/libsync/capabilities.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,32 @@

#include <QVariantMap>
#include <QStringList>
#include <QVersionNumber>

namespace OCC {


struct TusSupport
{
/**
<tus_support>
<version>1.0.0</version>
<resumable>1.0.0</resumable>
<extension>creation,creation-with-upload</extension>
<max_chunk_size>0</max_chunk_size>
<http_method_override/>
</tus_support>
*/
TusSupport(const QVariantMap &tus_support);
QVersionNumber version;
QVersionNumber resumable;
QStringList extensions;
quint64 max_chunk_size;
QString http_method_override;

bool isValid() const;
};

/**
* @brief The Capabilities class represents the capabilities of an ownCloud
* server
Expand Down Expand Up @@ -62,6 +85,8 @@ class OWNCLOUDSYNC_EXPORT Capabilities
/// Wheter to use chunking
bool bigfilechunkingEnabled() const;

const TusSupport &tusSupport() const;

/// disable parallel upload in chunking
bool chunkingParallelUploadDisabled() const;

Expand Down Expand Up @@ -154,6 +179,7 @@ class OWNCLOUDSYNC_EXPORT Capabilities
QVariantMap _capabilities;
QVariantMap _fileSharingCapabilities;
QVariantMap _fileSharingPublicCapabilities;
TusSupport _tusSupport;
};
}

Expand Down
13 changes: 9 additions & 4 deletions src/libsync/owncloudpropagator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "common/syncjournalfilerecord.h"
#include "propagatedownload.h"
#include "propagateupload.h"
#include "propagateuploadtus.h"
#include "propagateremotedelete.h"
#include "propagateremotemove.h"
#include "propagateremotemkdir.h"
Expand Down Expand Up @@ -335,11 +336,15 @@ PropagateItemJob *OwncloudPropagator::createJob(const SyncFileItemPtr &item)
return job;
} else {
PropagateUploadFileCommon *job = nullptr;
if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
// Item is above _initialChunkSize, thus will be classified as to be chunked
job = new PropagateUploadFileNG(this, item);
if (account()->capabilities().tusSupport().isValid()) {
job = new PropagateUploadFileTUS(this, item);
} else {
job = new PropagateUploadFileV1(this, item);
if (item->_size > syncOptions()._initialChunkSize && account()->capabilities().chunkingNg()) {
// Item is above _initialChunkSize, thus will be classified as to be chunked
job = new PropagateUploadFileNG(this, item);
} else {
job = new PropagateUploadFileV1(this, item);
}
}
job->setDeleteExisting(deleteExisting);
return job;
Expand Down
259 changes: 259 additions & 0 deletions src/libsync/propagateuploadtus.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* Copyright (C) by Hannah von Reth <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/

#include "account.h"
#include "common/asserts.h"
#include "common/checksums.h"
#include "common/syncjournaldb.h"
#include "common/syncjournalfilerecord.h"
#include "common/utility.h"
#include "filesystem.h"
#include "httplogger.h"
#include "networkjobs.h"
#include "owncloudpropagator_p.h"
#include "propagateremotedelete.h"
#include "propagateupload.h"
#include "propagateuploadtus.h"
#include "propagatorjobs.h"
#include "syncengine.h"

#include <QNetworkAccessManager>
#include <QFileInfo>
#include <QDir>
#include <cmath>
#include <cstring>
#include <memory>

namespace {
QUrl uploadURL(const OCC::AccountPtr &account)
{
return OCC::Utility::concatUrlPath(account->url(), QStringLiteral("remote.php/dav/files/%1/").arg(account->davUser()));
}

QByteArray uploadOffset()
{
return QByteArrayLiteral("Upload-Offset");
}

void setTusVersionHeader(QNetworkRequest &req){
req.setRawHeader(QByteArrayLiteral("Tus-Resumable"), QByteArrayLiteral("1.0.0"));
}
}

namespace OCC {
// be very verbose for now
Q_LOGGING_CATEGORY(lcPropagateUploadTUS, "sync.propagator.upload.tus", QtDebugMsg)


UploadDevice *PropagateUploadFileTUS::prepareDevice(const quint64 &chunkSize)
{
const QString localFileName = propagator()->getFilePath(_item->_file);
auto device = new UploadDevice(localFileName, _currentOffset, chunkSize, &propagator()->_bandwidthManager);
if (!device->open(QIODevice::ReadOnly)) {
qCWarning(lcPropagateUploadTUS) << "Could not prepare upload device: " << device->errorString();

// If the file is currently locked, we want to retry the sync
// when it becomes available again.
if (FileSystem::isFileLocked(localFileName)) {
emit propagator()->seenLockedFile(localFileName);
}
// Soft error because this is likely caused by the user modifying his files while syncing
abortWithError(SyncFileItem::SoftError, device->errorString());
return nullptr;
}
return device;
}


SimpleNetworkJob *PropagateUploadFileTUS::makeCreationWithUploadJob(QNetworkRequest *request, UploadDevice *device)
{
Q_ASSERT(propagator()->account()->capabilities().tusSupport().extensions.contains(QStringLiteral("creation-with-upload")));
// in difference to the old protocol the algrithm and the value are space seperated
const auto checkSum = _transmissionChecksumHeader.replace(':', ' ').toBase64();
request->setRawHeader(QByteArrayLiteral("Upload-Metadata"), "filename " + _item->_file.toUtf8().toBase64() + ",checksum " + checkSum);
request->setRawHeader(QByteArrayLiteral("Upload-Length"), QByteArray::number(_item->_size));
return propagator()->account()->sendRequest("POST", uploadURL(propagator()->account()), *request, device);
}

QNetworkRequest PropagateUploadFileTUS::prepareRequest(const quint64 &chunkSize)
{
QNetworkRequest request;
const auto headers = PropagateUploadFileCommon::headers();
for (auto it = headers.cbegin(); it != headers.cend(); ++it) {
request.setRawHeader(it.key(), it.value());
}

request.setHeader(QNetworkRequest::ContentTypeHeader, QByteArrayLiteral("application/offset+octet-stream"));
request.setHeader(QNetworkRequest::ContentLengthHeader, QByteArray::number(chunkSize));
request.setRawHeader(uploadOffset(), QByteArray::number(_currentOffset));
setTusVersionHeader(request);
return request;
}

PropagateUploadFileTUS::PropagateUploadFileTUS(OwncloudPropagator *propagator, const SyncFileItemPtr &item)
: PropagateUploadFileCommon(propagator, item)
{
}

void PropagateUploadFileTUS::doStartUpload()
{
const SyncJournalDb::UploadInfo progressInfo = propagator()->_journal->getUploadInfo(_item->_file);
propagator()->reportProgress(*_item, 0);
startNextChunk();
propagator()->_activeJobList.append(this);
}

void PropagateUploadFileTUS::startNextChunk()
{
if (propagator()->_abortRequested)
return;
const quint64 chunkSize = [&] {
auto chunkSize = _item->_size - _currentOffset;
if (propagator()->account()->capabilities().tusSupport().max_chunk_size) {
chunkSize = qMin(chunkSize - _currentOffset, propagator()->account()->capabilities().tusSupport().max_chunk_size);
}
return chunkSize;
}();

QNetworkRequest req = prepareRequest(chunkSize);
auto device = prepareDevice(chunkSize);
if (!device) {
return;
}

SimpleNetworkJob *job;
if (_currentOffset != 0) {
job = propagator()->account()->sendRequest("PATCH", _location, req, device);
} else {
job = makeCreationWithUploadJob(&req, device);
}

_jobs.append(job);
connect(job, &SimpleNetworkJob::finishedSignal, this, &PropagateUploadFileTUS::slotChunkFinished);
connect(job, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed);
job->start();
}

void PropagateUploadFileTUS::slotChunkFinished()
{
SimpleNetworkJob *job = qobject_cast<SimpleNetworkJob *>(sender());
slotJobDestroyed(job); // remove it from the _jobs list
ASSERT(job);

_item->_httpErrorCode = job->reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
_item->_responseTimeStamp = job->responseTimestamp();
_item->_requestId = job->requestId();

QNetworkReply::NetworkError err = job->reply()->error();
if (err != QNetworkReply::NoError) {
// try to get the offset if possible, only try once
if (err == QNetworkReply::TimeoutError && !_location.isEmpty() && HttpLogger::requestVerb(*job->reply()) != "HEAD")
{
QNetworkRequest req;
setTusVersionHeader(req);
auto updateJob = propagator()->account()->sendRequest("HEAD", _location, req);
_jobs.append(updateJob);
connect(updateJob, &SimpleNetworkJob::finishedSignal, this, &PropagateUploadFileTUS::slotChunkFinished);
connect(updateJob, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed);
updateJob->start();
return;

}
commonErrorHandling(job);
return;
}

const int offset = job->reply()->rawHeader(uploadOffset()).toInt();
propagator()->reportProgress(*_item, offset);
_currentOffset = offset;
// first response after a POST request
if (_location.isEmpty()) {
_location = job->reply()->header(QNetworkRequest::LocationHeader).toUrl();
}


_finished = offset == _item->_size;

// Check if the file still exists
const QString fullFilePath(propagator()->getFilePath(_item->_file));
if (!FileSystem::fileExists(fullFilePath)) {
if (!_finished) {
abortWithError(SyncFileItem::SoftError, tr("The local file was removed during sync."));
return;
} else {
propagator()->_anotherSyncNeeded = true;
}
}

// Check whether the file changed since discovery.
if (!FileSystem::verifyFileUnchanged(fullFilePath, _item->_size, _item->_modtime)) {
propagator()->_anotherSyncNeeded = true;
if (!_finished) {
abortWithError(SyncFileItem::SoftError, tr("Local file changed during sync."));
// FIXME: the legacy code was retrying for a few seconds.
// and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW
return;
}
}
if (!_finished) {
qCDebug(lcPropagateUploadTUS) << "we need to patch";
startNextChunk();
return;
}
const QByteArray etag = getEtagFromReply(job->reply());

_finished = !etag.isEmpty();
if (!_finished) {
auto check = new PropfindJob(propagator()->account(), _item->_file);
_jobs.append(check);
check->setProperties({ "http://owncloud.org/ns:fileid", "getetag" });
connect(check, &PropfindJob::result, this, [this, check](const QVariantMap &map) {
_finished = true;
finalize(Utility::normalizeEtag(map.value("getetag").toByteArray()), map.value("fileid").toByteArray());
slotJobDestroyed(check);
});
connect(check, &QObject::destroyed, this, &PropagateUploadFileCommon::slotJobDestroyed);
check->start();
return;
}
// the file id should only be empty for new files up- or downloaded
finalize(etag, job->reply()->rawHeader("OC-FileID"));
}

void PropagateUploadFileTUS::finalize(const QByteArray &etag, const QByteArray &fileId)
{
ASSERT(_finished);
qCDebug(lcPropagateUploadTUS) << _item->_etag << etag << fileId;
_item->_etag = etag;
if (!fileId.isEmpty()) {
if (!_item->_fileId.isEmpty() && _item->_fileId != fileId) {
qCWarning(lcPropagateUploadTUS) << "File ID changed!" << _item->_fileId << fileId;
}
_item->_fileId = fileId;
}
propagator()->_activeJobList.removeOne(this);
PropagateUploadFileCommon::finalize();
}

void PropagateUploadFileTUS::abort(PropagatorJob::AbortType abortType)
{
abortNetworkJobs(
abortType,
[](AbstractNetworkJob *) {
// TODO
return true;
});
}

}
Loading

0 comments on commit 2763c78

Please sign in to comment.