From: Matthieu Gallien Date: Wed, 8 Sep 2021 10:10:01 +0000 (+0200) Subject: implement bulk upload X-Git-Tag: archive/raspbian/3.16.7-1_deb13u1+rpi1~1^2~12^2~18^2~88^2 X-Git-Url: https://dgit.raspbian.org/?a=commitdiff_plain;h=c194605c356a0ffcc31f752e7792b6b6cd014911;p=nextcloud-desktop.git implement bulk upload add PutMultiFileJob to send many files at once use it in BulkPropagatorJob to implement bulk upload feature Signed-off-by: Matthieu Gallien --- diff --git a/src/libsync/CMakeLists.txt b/src/libsync/CMakeLists.txt index d58e27d21..6618f28e5 100644 --- a/src/libsync/CMakeLists.txt +++ b/src/libsync/CMakeLists.txt @@ -40,6 +40,8 @@ set(libsync_SRCS propagateupload.cpp propagateuploadv1.cpp propagateuploadng.cpp + bulkpropagatorjob.cpp + putmultifilejob.cpp propagateremotedelete.cpp propagateremotedeleteencrypted.cpp propagateremotedeleteencryptedrootfolder.cpp diff --git a/src/libsync/abstractnetworkjob.cpp b/src/libsync/abstractnetworkjob.cpp index 6bb00a6b2..feac00601 100644 --- a/src/libsync/abstractnetworkjob.cpp +++ b/src/libsync/abstractnetworkjob.cpp @@ -148,6 +148,17 @@ QNetworkReply *AbstractNetworkJob::sendRequest(const QByteArray &verb, const QUr return reply; } +QNetworkReply *AbstractNetworkJob::sendRequest(const QByteArray &verb, + const QUrl &url, + QNetworkRequest req, + QHttpMultiPart *requestBody) +{ + auto reply = _account->sendRawRequest(verb, url, req, requestBody); + _requestBody = nullptr; + adoptRequest(reply); + return reply; +} + void AbstractNetworkJob::adoptRequest(QNetworkReply *reply) { addTimer(reply); diff --git a/src/libsync/abstractnetworkjob.h b/src/libsync/abstractnetworkjob.h index 04dadb49e..aadbc3872 100644 --- a/src/libsync/abstractnetworkjob.h +++ b/src/libsync/abstractnetworkjob.h @@ -138,6 +138,9 @@ protected: QNetworkRequest req = QNetworkRequest(), QIODevice *requestBody = nullptr); + QNetworkReply *sendRequest(const QByteArray &verb, const QUrl &url, + QNetworkRequest req, QHttpMultiPart *requestBody); + /** Makes this job drive a pre-made QNetworkReply * * This reply cannot have a QIODevice request body because we can't get diff --git a/src/libsync/account.cpp b/src/libsync/account.cpp index b6be1b111..5242861ad 100644 --- a/src/libsync/account.cpp +++ b/src/libsync/account.cpp @@ -47,6 +47,7 @@ #include #include #include +#include #include #include @@ -360,6 +361,18 @@ QNetworkReply *Account::sendRawRequest(const QByteArray &verb, const QUrl &url, return _am->sendCustomRequest(req, verb, data); } +QNetworkReply *Account::sendRawRequest(const QByteArray &verb, const QUrl &url, QNetworkRequest req, QHttpMultiPart *data) +{ + req.setUrl(url); + req.setSslConfiguration(this->getOrCreateSslConfig()); + if (verb == "PUT") { + return _am->put(req, data); + } else if (verb == "POST") { + return _am->post(req, data); + } + return _am->sendCustomRequest(req, verb, data); +} + SimpleNetworkJob *Account::sendRequest(const QByteArray &verb, const QUrl &url, QNetworkRequest req, QIODevice *data) { auto job = new SimpleNetworkJob(sharedFromThis()); diff --git a/src/libsync/account.h b/src/libsync/account.h index 334425853..91cd36b15 100644 --- a/src/libsync/account.h +++ b/src/libsync/account.h @@ -154,6 +154,9 @@ public: QNetworkReply *sendRawRequest(const QByteArray &verb, const QUrl &url, QNetworkRequest req, const QByteArray &data); + QNetworkReply *sendRawRequest(const QByteArray &verb, + const QUrl &url, QNetworkRequest req, QHttpMultiPart *data); + /** Create and start network job for a simple one-off request. * * More complicated requests typically create their own job types. diff --git a/src/libsync/bulkpropagatorjob.cpp b/src/libsync/bulkpropagatorjob.cpp new file mode 100644 index 000000000..3803bd5ee --- /dev/null +++ b/src/libsync/bulkpropagatorjob.cpp @@ -0,0 +1,679 @@ +/* + * Copyright 2021 (c) Matthieu Gallien + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "bulkpropagatorjob.h" + +#include "putmultifilejob.h" +#include "owncloudpropagator_p.h" +#include "syncfileitem.h" +#include "syncengine.h" +#include "propagateupload.h" +#include "propagatorjobs.h" +#include "filesystem.h" +#include "account.h" +#include "common/utility.h" +#include "common/checksums.h" +#include "networkjobs.h" + +#include +#include +#include +#include +#include +#include + +namespace OCC { + +Q_LOGGING_CATEGORY(lcBulkPropagatorJob, "nextcloud.sync.propagator.bulkupload", QtInfoMsg) + +} + +namespace { + +QByteArray getEtagFromJsonReply(const QJsonObject &reply) +{ + const auto ocEtag = OCC::parseEtag(reply.value("OC-ETag").toString().toLatin1()); + const auto ETag = OCC::parseEtag(reply.value("ETag").toString().toLatin1()); + const auto etag = OCC::parseEtag(reply.value("etag").toString().toLatin1()); + QByteArray ret = ocEtag; + if (ret.isEmpty()) { + ret = ETag; + } + if (ret.isEmpty()) { + ret = etag; + } + if (ocEtag.length() > 0 && ocEtag != etag && ocEtag != ETag) { + qCDebug(OCC::lcBulkPropagatorJob) << "Quite peculiar, we have an etag != OC-Etag [no problem!]" << etag << ETag << ocEtag; + } + return ret; +} + +QByteArray getHeaderFromJsonReply(const QJsonObject &reply, const QByteArray &headerName) +{ + return reply.value(headerName).toString().toLatin1(); +} + +} + +namespace OCC { + +BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, + const std::deque &items) + : PropagatorJob(propagator) + , _items(items) +{ + _filesToUpload.reserve(100); +} + +bool BulkPropagatorJob::scheduleSelfOrChild() +{ + if (_items.empty()) { + return false; + } + + _state = Running; + for(int i = 0; i < 100 && !_items.empty(); ++i) { + auto currentItem = _items.front(); + _items.pop_front(); + _pendingChecksumFiles.insert(currentItem->_file); + QMetaObject::invokeMethod(this, [this, currentItem] () { + UploadFileInfo fileToUpload; + fileToUpload._file = currentItem->_file; + fileToUpload._size = currentItem->_size; + fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file); + startUploadFile(currentItem, fileToUpload); + }); // We could be in a different thread (neon jobs) + } + + return _items.empty() && _filesToUpload.empty(); +} + +PropagatorJob::JobParallelism BulkPropagatorJob::parallelism() +{ + return PropagatorJob::JobParallelism::WaitForFinished; +} + +void BulkPropagatorJob::startUploadFile(SyncFileItemPtr item, UploadFileInfo fileToUpload) +{ + if (propagator()->_abortRequested) { + return; + } + + // Check if the specific file can be accessed + if (propagator()->hasCaseClashAccessibilityProblem(fileToUpload._file)) { + done(item, SyncFileItem::NormalError, tr("File %1 cannot be uploaded because another file with the same name, differing only in case, exists").arg(QDir::toNativeSeparators(item->_file))); + return; + } + + return slotComputeTransmissionChecksum(item, fileToUpload); +} + +void BulkPropagatorJob::doStartUpload(SyncFileItemPtr item, + UploadFileInfo fileToUpload, + QByteArray transmissionChecksumHeader) +{ + if (propagator()->_abortRequested) { + return; + } + + // write the checksum in the database, so if the POST is sent + // to the server, but the connection drops before we get the etag, we can check the checksum + // in reconcile (issue #5106) + SyncJournalDb::UploadInfo pi; + pi._valid = true; + pi._chunk = 0; + pi._transferid = 0; // We set a null transfer id because it is not chunked. + pi._modtime = item->_modtime; + pi._errorCount = 0; + pi._contentChecksum = item->_checksumHeader; + pi._size = item->_size; + propagator()->_journal->setUploadInfo(item->_file, pi); + propagator()->_journal->commit("Upload info"); + + auto currentHeaders = headers(item); + currentHeaders[QByteArrayLiteral("Content-Length")] = QByteArray::number(fileToUpload._size); + + if (!item->_renameTarget.isEmpty() && item->_file != item->_renameTarget) { + // Try to rename the file + const auto originalFilePathAbsolute = propagator()->fullLocalPath(item->_file); + const auto newFilePathAbsolute = propagator()->fullLocalPath(item->_renameTarget); + const auto renameSuccess = QFile::rename(originalFilePathAbsolute, newFilePathAbsolute); + if (!renameSuccess) { + done(item, SyncFileItem::NormalError, "File contains trailing spaces and couldn't be renamed"); + return; + } + qCWarning(lcBulkPropagatorJob()) << item->_file << item->_renameTarget; + fileToUpload._file = item->_file = item->_renameTarget; + fileToUpload._path = propagator()->fullLocalPath(fileToUpload._file); + item->_modtime = FileSystem::getModTime(newFilePathAbsolute); + } + + const auto remotePath = propagator()->fullRemotePath(fileToUpload._file); + + currentHeaders["X-File-MD5"] = transmissionChecksumHeader; + + BulkUploadItem newUploadFile{propagator()->account(), item, fileToUpload, + remotePath, fileToUpload._path, + fileToUpload._size, currentHeaders}; + + qCInfo(lcBulkPropagatorJob) << remotePath << "transmission checksum" << transmissionChecksumHeader << fileToUpload._path; + _filesToUpload.push_back(std::move(newUploadFile)); + _pendingChecksumFiles.remove(item->_file); + + if (_pendingChecksumFiles.empty()) { + triggerUpload(); + } +} + +void BulkPropagatorJob::triggerUpload() +{ + auto uploadParametersData = std::vector{}; + uploadParametersData.reserve(_filesToUpload.size()); + + int timeout = 0; + for(auto &singleFile : _filesToUpload) { + // job takes ownership of device via a QScopedPointer. Job deletes itself when finishing + auto device = std::make_unique( + singleFile._localPath, 0, singleFile._fileSize, &propagator()->_bandwidthManager); + if (!device->open(QIODevice::ReadOnly)) { + qCWarning(lcBulkPropagatorJob) << "Could not prepare upload device: " << device->errorString(); + + // If the file is currently locked, we want to retry the sync + // when it becomes available again. + if (FileSystem::isFileLocked(singleFile._localPath)) { + emit propagator()->seenLockedFile(singleFile._localPath); + } + // Soft error because this is likely caused by the user modifying his files while syncing + abortWithError(singleFile._item, SyncFileItem::SoftError, device->errorString()); + return; + } + singleFile._headers["X-File-Path"] = singleFile._remotePath.toUtf8(); + uploadParametersData.push_back({std::move(device), singleFile._headers}); + timeout += singleFile._fileSize; + } + + const auto bulkUploadUrl = Utility::concatUrlPath(propagator()->account()->url(), QStringLiteral("/remote.php/dav/bulk")); + auto job = std::make_unique(propagator()->account(), bulkUploadUrl, std::move(uploadParametersData), this); + connect(job.get(), &PutMultiFileJob::finishedSignal, this, &BulkPropagatorJob::slotPutFinished); + + for(auto &singleFile : _filesToUpload) { + connect(job.get(), &PutMultiFileJob::uploadProgress, + this, [this, singleFile] (qint64 sent, qint64 total) { + slotUploadProgress(singleFile._item, sent, total); + }); + } + + adjustLastJobTimeout(job.get(), timeout); + _jobs.append(job.get()); + job.release()->start(); +} + +void BulkPropagatorJob::slotComputeTransmissionChecksum(SyncFileItemPtr item, + UploadFileInfo fileToUpload) +{ + // Reuse the content checksum as the transmission checksum if possible + const auto supportedTransmissionChecksums = + propagator()->account()->capabilities().supportedChecksumTypes(); + + // Compute the transmission checksum. + auto computeChecksum = std::make_unique(this); + if (uploadChecksumEnabled()) { + computeChecksum->setChecksumType("MD5" /*propagator()->account()->capabilities().uploadChecksumType()*/); + } else { + computeChecksum->setChecksumType(QByteArray()); + } + + connect(computeChecksum.get(), &ComputeChecksum::done, + this, [this, item, fileToUpload] (const QByteArray &contentChecksumType, const QByteArray &contentChecksum) { + slotStartUpload(item, fileToUpload, contentChecksumType, contentChecksum); + }); + connect(computeChecksum.get(), &ComputeChecksum::done, + computeChecksum.get(), &QObject::deleteLater); + computeChecksum.release()->start(fileToUpload._path); +} + +void BulkPropagatorJob::slotStartUpload(SyncFileItemPtr item, + UploadFileInfo fileToUpload, + const QByteArray &transmissionChecksumType, + const QByteArray &transmissionChecksum) +{ + const auto transmissionChecksumHeader = makeChecksumHeader(transmissionChecksumType, transmissionChecksum); + + item->_checksumHeader = transmissionChecksumHeader; + + const QString fullFilePath = fileToUpload._path; + const QString originalFilePath = propagator()->fullLocalPath(item->_file); + + if (!FileSystem::fileExists(fullFilePath)) { + return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("File Removed (start upload) %1").arg(fullFilePath)); + } + const time_t prevModtime = item->_modtime; // the _item value was set in PropagateUploadFile::start() + // but a potential checksum calculation could have taken some time during which the file could + // have been changed again, so better check again here. + + item->_modtime = FileSystem::getModTime(originalFilePath); + if (prevModtime != item->_modtime) { + propagator()->_anotherSyncNeeded = true; + qDebug() << "trigger another sync after checking modified time of item" << item->_file << "prevModtime" << prevModtime << "Curr" << item->_modtime; + return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during syncing. It will be resumed.")); + } + + fileToUpload._size = FileSystem::getSize(fullFilePath); + item->_size = FileSystem::getSize(originalFilePath); + + // But skip the file if the mtime is too close to 'now'! + // That usually indicates a file that is still being changed + // or not yet fully copied to the destination. + if (fileIsStillChanging(*item)) { + propagator()->_anotherSyncNeeded = true; + return slotOnErrorStartFolderUnlock(item, SyncFileItem::SoftError, tr("Local file changed during sync.")); + } + + doStartUpload(item, fileToUpload, transmissionChecksum); +} + +void BulkPropagatorJob::slotOnErrorStartFolderUnlock(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &errorString) +{ + qCInfo(lcBulkPropagatorJob()) << status << errorString; + done(item, status, errorString); +} + +void BulkPropagatorJob::slotPutFinishedOneFile(const BulkUploadItem &singleFile, + PutMultiFileJob *job, + const QJsonObject &fullReplyObject) +{ + bool finished = false; + + const auto fileReply = fullReplyObject.value(QChar('/') + singleFile._item->_file).toObject(); + qCInfo(lcBulkPropagatorJob()) << singleFile._item->_file << "file headers" << fileReply; + + if (!fileReply[QStringLiteral("error")].toBool()) { + singleFile._item->_httpErrorCode = static_cast(200); + } else { + singleFile._item->_httpErrorCode = static_cast(412); + } + + singleFile._item->_responseTimeStamp = job->responseTimestamp(); + singleFile._item->_requestId = job->requestId(); + if (singleFile._item->_httpErrorCode != 200) { + commonErrorHandling(singleFile._item); + return; + } + + singleFile._item->_status = SyncFileItem::Success; + + // Check the file again post upload. + // Two cases must be considered separately: If the upload is finished, + // the file is on the server and has a changed ETag. In that case, + // the etag has to be properly updated in the client journal, and because + // of that we can bail out here with an error. But we can reschedule a + // sync ASAP. + // But if the upload is ongoing, because not all chunks were uploaded + // yet, the upload can be stopped and an error can be displayed, because + // the server hasn't registered the new file yet. + const auto etag = getEtagFromJsonReply(fileReply); + finished = etag.length() > 0; + + const auto fullFilePath(propagator()->fullLocalPath(singleFile._item->_file)); + + // Check if the file still exists + if (!checkFileStillExists(singleFile._item, finished, fullFilePath)) { + return; + } + + // Check whether the file changed since discovery. the file check here is the original and not the temporary. + if (!checkFileChanged(singleFile._item, finished, fullFilePath)) { + return; + } + + // the file id should only be empty for new files up- or downloaded + computeFileId(singleFile._item, fileReply); + + singleFile._item->_etag = etag; + + if (getHeaderFromJsonReply(fileReply, "X-OC-MTime") != "accepted") { + // X-OC-MTime is supported since owncloud 5.0. But not when chunking. + // Normally Owncloud 6 always puts X-OC-MTime + qCWarning(lcBulkPropagatorJob) << "Server does not support X-OC-MTime" << getHeaderFromJsonReply(fileReply, "X-OC-MTime"); + // Well, the mtime was not set + } +} + +void BulkPropagatorJob::slotPutFinished() +{ + auto *job = qobject_cast(sender()); + Q_ASSERT(job); + + slotJobDestroyed(job); // remove it from the _jobs list + + const auto replyData = job->reply()->readAll(); + const auto replyJson = QJsonDocument::fromJson(replyData); + const auto fullReplyObject = replyJson.object(); + + for (const auto &oneFile : _filesToUpload) { + slotPutFinishedOneFile(oneFile, job, fullReplyObject); + } + + finalize(); +} + +void BulkPropagatorJob::slotUploadProgress(SyncFileItemPtr item, qint64 sent, qint64 total) +{ + // Completion is signaled with sent=0, total=0; avoid accidentally + // resetting progress due to the sent being zero by ignoring it. + // finishedSignal() is bound to be emitted soon anyway. + // See https://bugreports.qt.io/browse/QTBUG-44782. + if (sent == 0 && total == 0) { + return; + } + propagator()->reportProgress(*item, sent - total); +} + +void BulkPropagatorJob::slotJobDestroyed(QObject *job) +{ + _jobs.erase(std::remove(_jobs.begin(), _jobs.end(), job), _jobs.end()); +} + +void BulkPropagatorJob::adjustLastJobTimeout(AbstractNetworkJob *job, qint64 fileSize) const +{ + constexpr double threeMinutes = 3.0 * 60 * 1000; + + job->setTimeout(qBound( + job->timeoutMsec(), + // Calculate 3 minutes for each gigabyte of data + qRound64(threeMinutes * static_cast(fileSize) / 1e9), + // Maximum of 30 minutes + static_cast(30 * 60 * 1000))); +} + +void BulkPropagatorJob::finalizeOneFile(const BulkUploadItem &oneFile) +{ + // Update the database entry + const auto result = propagator()->updateMetadata(*oneFile._item); + if (!result) { + done(oneFile._item, SyncFileItem::FatalError, tr("Error updating metadata: %1").arg(result.error())); + return; + } else if (*result == Vfs::ConvertToPlaceholderResult::Locked) { + done(oneFile._item, SyncFileItem::SoftError, tr("The file %1 is currently in use").arg(oneFile._item->_file)); + return; + } + + // Files that were new on the remote shouldn't have online-only pin state + // even if their parent folder is online-only. + if (oneFile._item->_instruction == CSYNC_INSTRUCTION_NEW + || oneFile._item->_instruction == CSYNC_INSTRUCTION_TYPE_CHANGE) { + auto &vfs = propagator()->syncOptions()._vfs; + const auto pin = vfs->pinState(oneFile._item->_file); + if (pin && *pin == PinState::OnlineOnly && !vfs->setPinState(oneFile._item->_file, PinState::Unspecified)) { + qCWarning(lcBulkPropagatorJob) << "Could not set pin state of" << oneFile._item->_file << "to unspecified"; + } + } + + // Remove from the progress database: + propagator()->_journal->setUploadInfo(oneFile._item->_file, SyncJournalDb::UploadInfo()); + propagator()->_journal->commit("upload file start"); +} + +void BulkPropagatorJob::finalize() +{ + for(const auto &oneFile : _filesToUpload) { + if (!oneFile._item->hasErrorStatus()) { + finalizeOneFile(oneFile); + } + + done(oneFile._item, oneFile._item->_status, {}); + } + + Q_ASSERT(!_filesToUpload.empty()); + _filesToUpload.clear(); + + if (_items.empty()) { + if (!_jobs.empty()) { + // just wait for the other job to finish. + return; + } + if (!_pendingChecksumFiles.empty()) { + // just wait for the other job to finish. + return; + } + + qCInfo(lcBulkPropagatorJob) << "final status" << _finalStatus; + emit finished(_finalStatus); + propagator()->scheduleNextJob(); + } else { + scheduleSelfOrChild(); + } +} + +void BulkPropagatorJob::done(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &errorString) +{ + item->_status = status; + item->_errorString = errorString; + + qCInfo(lcBulkPropagatorJob) << "Item completed" << item->destination() << item->_status << item->_instruction << item->_errorString; + + handleFileRestoration(item, errorString); + + if (propagator()->_abortRequested && (item->_status == SyncFileItem::NormalError + || item->_status == SyncFileItem::FatalError)) { + // an abort request is ongoing. Change the status to Soft-Error + item->_status = SyncFileItem::SoftError; + } + + if (item->_status != SyncFileItem::Success) { + // Blacklist handling + handleBulkUploadBlackList(item); + propagator()->_anotherSyncNeeded = true; + } + + handleJobDoneErrors(item, status); + + emit propagator()->itemCompleted(item); +} + +QMap BulkPropagatorJob::headers(SyncFileItemPtr item) const +{ + QMap headers; + headers[QByteArrayLiteral("Content-Type")] = QByteArrayLiteral("application/octet-stream"); + headers[QByteArrayLiteral("X-File-Mtime")] = QByteArray::number(qint64(item->_modtime)); + if (qEnvironmentVariableIntValue("OWNCLOUD_LAZYOPS")) { + headers[QByteArrayLiteral("OC-LazyOps")] = QByteArrayLiteral("true"); + } + + if (item->_file.contains(QLatin1String(".sys.admin#recall#"))) { + // This is a file recall triggered by the admin. Note: the + // recall list file created by the admin and downloaded by the + // client (.sys.admin#recall#) also falls into this category + // (albeit users are not supposed to mess up with it) + + // We use a special tag header so that the server may decide to store this file away in some admin stage area + // And not directly in the user's area (which would trigger redownloads etc). + headers["OC-Tag"] = ".sys.admin#recall#"; + } + + if (!item->_etag.isEmpty() && item->_etag != "empty_etag" + && item->_instruction != CSYNC_INSTRUCTION_NEW // On new files never send a If-Match + && item->_instruction != CSYNC_INSTRUCTION_TYPE_CHANGE) { + // We add quotes because the owncloud server always adds quotes around the etag, and + // csync_owncloud.c's owncloud_file_id always strips the quotes. + headers[QByteArrayLiteral("If-Match")] = '"' + item->_etag + '"'; + } + + // Set up a conflict file header pointing to the original file + auto conflictRecord = propagator()->_journal->conflictRecord(item->_file.toUtf8()); + if (conflictRecord.isValid()) { + headers[QByteArrayLiteral("OC-Conflict")] = "1"; + if (!conflictRecord.initialBasePath.isEmpty()) { + headers[QByteArrayLiteral("OC-ConflictInitialBasePath")] = conflictRecord.initialBasePath; + } + if (!conflictRecord.baseFileId.isEmpty()) { + headers[QByteArrayLiteral("OC-ConflictBaseFileId")] = conflictRecord.baseFileId; + } + if (conflictRecord.baseModtime != -1) { + headers[QByteArrayLiteral("OC-ConflictBaseMtime")] = QByteArray::number(conflictRecord.baseModtime); + } + if (!conflictRecord.baseEtag.isEmpty()) { + headers[QByteArrayLiteral("OC-ConflictBaseEtag")] = conflictRecord.baseEtag; + } + } + + return headers; +} + +void BulkPropagatorJob::abortWithError(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &error) +{ + abort(AbortType::Synchronous); + done(item, status, error); +} + +void BulkPropagatorJob::checkResettingErrors(SyncFileItemPtr item) const +{ + if (item->_httpErrorCode == 412 + || propagator()->account()->capabilities().httpErrorCodesThatResetFailingChunkedUploads().contains(item->_httpErrorCode)) { + auto uploadInfo = propagator()->_journal->getUploadInfo(item->_file); + uploadInfo._errorCount += 1; + if (uploadInfo._errorCount > 3) { + qCInfo(lcBulkPropagatorJob) << "Reset transfer of" << item->_file + << "due to repeated error" << item->_httpErrorCode; + uploadInfo = SyncJournalDb::UploadInfo(); + } else { + qCInfo(lcBulkPropagatorJob) << "Error count for maybe-reset error" << item->_httpErrorCode + << "on file" << item->_file + << "is" << uploadInfo._errorCount; + } + propagator()->_journal->setUploadInfo(item->_file, uploadInfo); + propagator()->_journal->commit("Upload info"); + } +} + +void BulkPropagatorJob::commonErrorHandling(SyncFileItemPtr item) +{ + // Ensure errors that should eventually reset the chunked upload are tracked. + checkResettingErrors(item); + + abortWithError(item, SyncFileItem::NormalError, tr("Error")); +} + +bool BulkPropagatorJob::checkFileStillExists(SyncFileItemPtr item, + const bool finished, + const QString &fullFilePath) +{ + if (!FileSystem::fileExists(fullFilePath)) { + if (!finished) { + abortWithError(item, SyncFileItem::SoftError, tr("The local file was removed during sync.")); + return false; + } else { + propagator()->_anotherSyncNeeded = true; + } + } + + return true; +} + +bool BulkPropagatorJob::checkFileChanged(SyncFileItemPtr item, + const bool finished, + const QString &fullFilePath) +{ + if (!FileSystem::verifyFileUnchanged(fullFilePath, item->_size, item->_modtime)) { + propagator()->_anotherSyncNeeded = true; + if (!finished) { + abortWithError(item, SyncFileItem::SoftError, tr("Local file changed during sync.")); + // FIXME: the legacy code was retrying for a few seconds. + // and also checking that after the last chunk, and removed the file in case of INSTRUCTION_NEW + return false; + } + } + + return true; +} + +void BulkPropagatorJob::computeFileId(SyncFileItemPtr item, + const QJsonObject &fileReply) const +{ + const auto fid = getHeaderFromJsonReply(fileReply, "OC-FileID"); + if (!fid.isEmpty()) { + if (!item->_fileId.isEmpty() && item->_fileId != fid) { + qCWarning(lcBulkPropagatorJob) << "File ID changed!" << item->_fileId << fid; + } + item->_fileId = fid; + } +} + +void BulkPropagatorJob::handleFileRestoration(SyncFileItemPtr item, + const QString &errorString) const +{ + if (item->_isRestoration) { + if (item->_status == SyncFileItem::Success + || item->_status == SyncFileItem::Conflict) { + item->_status = SyncFileItem::Restoration; + } else { + item->_errorString += tr("; Restoration Failed: %1").arg(errorString); + } + } else { + if (item->_errorString.isEmpty()) { + item->_errorString = errorString; + } + } +} + +void BulkPropagatorJob::handleBulkUploadBlackList(SyncFileItemPtr item) const +{ + propagator()->addToBulkUploadBlackList(item->_file); +} + +void BulkPropagatorJob::handleJobDoneErrors(SyncFileItemPtr item, + SyncFileItem::Status status) +{ + if (item->hasErrorStatus()) { + qCWarning(lcPropagator) << "Could not complete propagation of" << item->destination() << "by" << this << "with status" << item->_status << "and error:" << item->_errorString; + } else { + qCInfo(lcPropagator) << "Completed propagation of" << item->destination() << "by" << this << "with status" << item->_status; + } + + if (item->_status == SyncFileItem::FatalError) { + // Abort all remaining jobs. + propagator()->abort(); + } + + switch (item->_status) + { + case SyncFileItem::BlacklistedError: + case SyncFileItem::Conflict: + case SyncFileItem::FatalError: + case SyncFileItem::FileIgnored: + case SyncFileItem::FileLocked: + case SyncFileItem::FileNameInvalid: + case SyncFileItem::NoStatus: + case SyncFileItem::NormalError: + case SyncFileItem::Restoration: + case SyncFileItem::SoftError: + _finalStatus = SyncFileItem::NormalError; + qCInfo(lcBulkPropagatorJob) << "modify final status NormalError" << _finalStatus << status; + break; + case SyncFileItem::DetailError: + _finalStatus = SyncFileItem::DetailError; + qCInfo(lcBulkPropagatorJob) << "modify final status DetailError" << _finalStatus << status; + break; + case SyncFileItem::Success: + break; + } +} + +} diff --git a/src/libsync/bulkpropagatorjob.h b/src/libsync/bulkpropagatorjob.h new file mode 100644 index 000000000..bee2e29fa --- /dev/null +++ b/src/libsync/bulkpropagatorjob.h @@ -0,0 +1,164 @@ +/* + * Copyright 2021 (c) Matthieu Gallien + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#pragma once + +#include "owncloudpropagator.h" +#include "abstractnetworkjob.h" + +#include +#include +#include +#include +#include + +namespace OCC { + +Q_DECLARE_LOGGING_CATEGORY(lcBulkPropagatorJob) + +class ComputeChecksum; +class PutMultiFileJob; + +class BulkPropagatorJob : public PropagatorJob +{ + Q_OBJECT + + /* This is a minified version of the SyncFileItem, + * that holds only the specifics about the file that's + * being uploaded. + * + * This is needed if we wanna apply changes on the file + * that's being uploaded while keeping the original on disk. + */ + struct UploadFileInfo { + QString _file; /// I'm still unsure if I should use a SyncFilePtr here. + QString _path; /// the full path on disk. + qint64 _size; + }; + + struct BulkUploadItem + { + AccountPtr _account; + SyncFileItemPtr _item; + UploadFileInfo _fileToUpload; + QString _remotePath; + QString _localPath; + qint64 _fileSize; + QMap _headers; + }; + +public: + explicit BulkPropagatorJob(OwncloudPropagator *propagator, + const std::deque &items); + + bool scheduleSelfOrChild() override; + + JobParallelism parallelism() override; + +private slots: + void startUploadFile(SyncFileItemPtr item, UploadFileInfo fileToUpload); + + // Content checksum computed, compute the transmission checksum + void slotComputeTransmissionChecksum(SyncFileItemPtr item, + UploadFileInfo fileToUpload); + + // transmission checksum computed, prepare the upload + void slotStartUpload(SyncFileItemPtr item, + UploadFileInfo fileToUpload, + const QByteArray &transmissionChecksumType, + const QByteArray &transmissionChecksum); + + // invoked on internal error to unlock a folder and faile + void slotOnErrorStartFolderUnlock(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &errorString); + + void slotPutFinished(); + + void slotUploadProgress(SyncFileItemPtr item, qint64 sent, qint64 total); + + void slotJobDestroyed(QObject *job); + +private: + void doStartUpload(SyncFileItemPtr item, + UploadFileInfo fileToUpload, + QByteArray transmissionChecksumHeader); + + void adjustLastJobTimeout(AbstractNetworkJob *job, + qint64 fileSize) const; + + void finalize(); + + void finalizeOneFile(const BulkUploadItem &oneFile); + + void slotPutFinishedOneFile(const BulkUploadItem &singleFile, + OCC::PutMultiFileJob *job, + const QJsonObject &fullReplyObject); + + void done(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &errorString); + + /** Bases headers that need to be sent on the PUT, or in the MOVE for chunking-ng */ + QMap headers(SyncFileItemPtr item) const; + + void abortWithError(SyncFileItemPtr item, + SyncFileItem::Status status, + const QString &error); + + /** + * Checks whether the current error is one that should reset the whole + * transfer if it happens too often. If so: Bump UploadInfo::errorCount + * and maybe perform the reset. + */ + void checkResettingErrors(SyncFileItemPtr item) const; + + /** + * Error handling functionality that is shared between jobs. + */ + void commonErrorHandling(SyncFileItemPtr item); + + bool checkFileStillExists(SyncFileItemPtr item, + const bool finished, + const QString &fullFilePath); + + bool checkFileChanged(SyncFileItemPtr item, + const bool finished, + const QString &fullFilePath); + + void computeFileId(SyncFileItemPtr item, + const QJsonObject &fileReply) const; + + void handleFileRestoration(SyncFileItemPtr item, + const QString &errorString) const; + + void handleBulkUploadBlackList(SyncFileItemPtr item) const; + + void handleJobDoneErrors(SyncFileItemPtr item, + SyncFileItem::Status status); + + void triggerUpload(); + + std::deque _items; + + QVector _jobs; /// network jobs that are currently in transit + + QSet _pendingChecksumFiles; + + std::vector _filesToUpload; + + SyncFileItem::Status _finalStatus = SyncFileItem::Status::NoStatus; +}; + +} diff --git a/src/libsync/owncloudpropagator.cpp b/src/libsync/owncloudpropagator.cpp index b36365780..f33f309d6 100644 --- a/src/libsync/owncloudpropagator.cpp +++ b/src/libsync/owncloudpropagator.cpp @@ -21,6 +21,7 @@ #include "propagateremotedelete.h" #include "propagateremotemove.h" #include "propagateremotemkdir.h" +#include "bulkpropagatorjob.h" #include "propagatorjobs.h" #include "filesystem.h" #include "common/utility.h" @@ -173,7 +174,7 @@ static SyncJournalErrorBlacklistRecord createBlacklistEntry( * * May adjust the status or item._errorString. */ -static void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item) +void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item) { SyncJournalErrorBlacklistRecord oldEntry = journal->errorBlacklistEntry(item._file); @@ -396,6 +397,8 @@ std::unique_ptr OwncloudPropagator::createUploadJob(S job->setDeleteExisting(deleteExisting); + removeFromBulkUploadBlackList(item->_file); + return job; } @@ -861,7 +864,7 @@ Result OwncloudPropagator::staticUpdat bool OwncloudPropagator::isDelayedUploadItem(const SyncFileItemPtr &item) const { - return account()->capabilities().bulkUpload() && !_scheduleDelayedTasks && !item->_isEncrypted && _syncOptions._minChunkSize > item->_size; + return account()->capabilities().bulkUpload() && !_scheduleDelayedTasks && !item->_isEncrypted && _syncOptions._minChunkSize > item->_size && !isInBulkUploadBlackList(item->_file); } void OwncloudPropagator::setScheduleDelayedTasks(bool active) @@ -874,6 +877,23 @@ void OwncloudPropagator::clearDelayedTasks() _delayedTasks.clear(); } +void OwncloudPropagator::addToBulkUploadBlackList(const QString &file) +{ + qCDebug(lcPropagator) << "black list for bulk upload" << file; + _bulkUploadBlackList.insert(file); +} + +void OwncloudPropagator::removeFromBulkUploadBlackList(const QString &file) +{ + qCDebug(lcPropagator) << "black list for bulk upload" << file; + _bulkUploadBlackList.remove(file); +} + +bool OwncloudPropagator::isInBulkUploadBlackList(const QString &file) const +{ + return _bulkUploadBlackList.contains(file); +} + // ================================================================================ PropagatorJob::PropagatorJob(OwncloudPropagator *propagator) @@ -1304,13 +1324,4 @@ QString OwncloudPropagator::remotePath() const return _remoteFolder; } -BulkPropagatorJob::BulkPropagatorJob(OwncloudPropagator *propagator, const QVector &items) - : PropagatorCompositeJob(propagator) - , _items(items) -{ - for(const auto &oneItemJob : _items) { - appendTask(oneItemJob); - } - _items.clear(); -} } diff --git a/src/libsync/owncloudpropagator.h b/src/libsync/owncloudpropagator.h index 98bbbfeab..28f3994b0 100644 --- a/src/libsync/owncloudpropagator.h +++ b/src/libsync/owncloudpropagator.h @@ -31,6 +31,8 @@ #include "accountfwd.h" #include "syncoptions.h" +#include + namespace OCC { Q_DECLARE_LOGGING_CATEGORY(lcPropagator) @@ -46,6 +48,8 @@ qint64 criticalFreeSpaceLimit(); */ qint64 freeSpaceLimit(); +void blacklistUpdate(SyncJournalDb *journal, SyncFileItem &item); + class SyncJournalDb; class OwncloudPropagator; class PropagatorCompositeJob; @@ -380,19 +384,6 @@ private: bool scheduleDelayedJobs(); }; -class BulkPropagatorJob : public PropagatorCompositeJob -{ - Q_OBJECT -public: - - explicit BulkPropagatorJob(OwncloudPropagator *propagator, - const QVector &items); - -private: - - QVector _items; -}; - /** * @brief Dummy job that just mark it as completed and ignored * @ingroup libsync @@ -431,7 +422,8 @@ public: public: OwncloudPropagator(AccountPtr account, const QString &localDir, - const QString &remoteFolder, SyncJournalDb *progressDb) + const QString &remoteFolder, SyncJournalDb *progressDb, + QSet &bulkUploadBlackList) : _journal(progressDb) , _finishedEmited(false) , _bandwidthManager(this) @@ -440,6 +432,7 @@ public: , _account(account) , _localDir((localDir.endsWith(QChar('/'))) ? localDir : localDir + '/') , _remoteFolder((remoteFolder.endsWith(QChar('/'))) ? remoteFolder : remoteFolder + '/') + , _bulkUploadBlackList(bulkUploadBlackList) { qRegisterMetaType("PropagatorJob::AbortType"); } @@ -611,7 +604,7 @@ public: Q_REQUIRED_RESULT bool isDelayedUploadItem(const SyncFileItemPtr &item) const; - Q_REQUIRED_RESULT const QVector& delayedTasks() const + Q_REQUIRED_RESULT const std::deque& delayedTasks() const { return _delayedTasks; } @@ -620,6 +613,12 @@ public: void clearDelayedTasks(); + void addToBulkUploadBlackList(const QString &file); + + void removeFromBulkUploadBlackList(const QString &file); + + bool isInBulkUploadBlackList(const QString &file) const; + private slots: void abortTimeout() @@ -674,8 +673,12 @@ private: const QString _localDir; // absolute path to the local directory. ends with '/' const QString _remoteFolder; // remote folder, ends with '/' - QVector _delayedTasks; + std::deque _delayedTasks; bool _scheduleDelayedTasks = false; + + QSet &_bulkUploadBlackList; + + static bool _allowDelayedUpload; }; diff --git a/src/libsync/owncloudpropagator_p.h b/src/libsync/owncloudpropagator_p.h index d6ae07a3a..e203c57fe 100644 --- a/src/libsync/owncloudpropagator_p.h +++ b/src/libsync/owncloudpropagator_p.h @@ -18,9 +18,33 @@ #include "owncloudpropagator.h" #include "syncfileitem.h" #include "networkjobs.h" +#include "syncengine.h" #include #include +namespace { + +/** + * We do not want to upload files that are currently being modified. + * To avoid that, we don't upload files that have a modification time + * that is too close to the current time. + * + * This interacts with the msBetweenRequestAndSync delay in the folder + * manager. If that delay between file-change notification and sync + * has passed, we should accept the file for upload here. + */ +inline bool fileIsStillChanging(const OCC::SyncFileItem &item) +{ + const auto modtime = OCC::Utility::qDateTimeFromTime_t(item._modtime); + const qint64 msSinceMod = modtime.msecsTo(QDateTime::currentDateTimeUtc()); + + return std::chrono::milliseconds(msSinceMod) < OCC::SyncEngine::minimumFileAgeForUpload + // if the mtime is too much in the future we *do* upload the file + && msSinceMod > -10000; +} + +} + namespace OCC { inline QByteArray getEtagFromReply(QNetworkReply *reply) diff --git a/src/libsync/propagatedownload.h b/src/libsync/propagatedownload.h index fc4942c4e..ba9fff195 100644 --- a/src/libsync/propagatedownload.h +++ b/src/libsync/propagatedownload.h @@ -96,7 +96,7 @@ public: void giveBandwidthQuota(qint64 q); qint64 currentDownloadPosition(); - QString errorString() const; + QString errorString() const override; void setErrorString(const QString &s) { _errorString = s; } SyncFileItem::Status errorStatus() { return _errorStatus; } diff --git a/src/libsync/propagateupload.cpp b/src/libsync/propagateupload.cpp index cd9e9db1c..0ec9116e4 100644 --- a/src/libsync/propagateupload.cpp +++ b/src/libsync/propagateupload.cpp @@ -49,25 +49,6 @@ Q_LOGGING_CATEGORY(lcPropagateUpload, "nextcloud.sync.propagator.upload", QtInfo Q_LOGGING_CATEGORY(lcPropagateUploadV1, "nextcloud.sync.propagator.upload.v1", QtInfoMsg) Q_LOGGING_CATEGORY(lcPropagateUploadNG, "nextcloud.sync.propagator.upload.ng", QtInfoMsg) -/** - * We do not want to upload files that are currently being modified. - * To avoid that, we don't upload files that have a modification time - * that is too close to the current time. - * - * This interacts with the msBetweenRequestAndSync delay in the folder - * manager. If that delay between file-change notification and sync - * has passed, we should accept the file for upload here. - */ -static bool fileIsStillChanging(const SyncFileItem &item) -{ - const QDateTime modtime = Utility::qDateTimeFromTime_t(item._modtime); - const qint64 msSinceMod = modtime.msecsTo(QDateTime::currentDateTimeUtc()); - - return std::chrono::milliseconds(msSinceMod) < SyncEngine::minimumFileAgeForUpload - // if the mtime is too much in the future we *do* upload the file - && msSinceMod > -10000; -} - PUTFileJob::~PUTFileJob() { // Make sure that we destroy the QNetworkReply before our _device of which it keeps an internal pointer. diff --git a/src/libsync/putmultifilejob.cpp b/src/libsync/putmultifilejob.cpp new file mode 100644 index 000000000..42f397d69 --- /dev/null +++ b/src/libsync/putmultifilejob.cpp @@ -0,0 +1,70 @@ +/* + * Copyright 2021 (c) Matthieu Gallien + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#include "putmultifilejob.h" + +#include + +namespace OCC { + +Q_LOGGING_CATEGORY(lcPutMultiFileJob, "nextcloud.sync.networkjob.put.multi", QtInfoMsg) + +PutMultiFileJob::~PutMultiFileJob() = default; + +void PutMultiFileJob::start() +{ + QNetworkRequest req; + + for(auto &oneDevice : _devices) { + auto onePart = QHttpPart{}; + + onePart.setBodyDevice(oneDevice._device.get()); + + for (QMap::const_iterator it = oneDevice._headers.begin(); it != oneDevice._headers.end(); ++it) { + onePart.setRawHeader(it.key(), it.value()); + } + + req.setPriority(QNetworkRequest::LowPriority); // Long uploads must not block non-propagation jobs. + + _body.append(onePart); + } + + sendRequest("POST", _url, req, &_body); + + if (reply()->error() != QNetworkReply::NoError) { + qCWarning(lcPutMultiFileJob) << " Network error: " << reply()->errorString(); + } + + connect(reply(), &QNetworkReply::uploadProgress, this, &PutMultiFileJob::uploadProgress); + connect(this, &AbstractNetworkJob::networkActivity, account().data(), &Account::propagatorNetworkActivity); + _requestTimer.start(); + AbstractNetworkJob::start(); +} + +bool PutMultiFileJob::finished() +{ + for(const auto &oneDevice : _devices) { + oneDevice._device->close(); + } + + qCInfo(lcPutMultiFileJob) << "POST of" << reply()->request().url().toString() << path() << "FINISHED WITH STATUS" + << replyStatusString() + << reply()->attribute(QNetworkRequest::HttpStatusCodeAttribute) + << reply()->attribute(QNetworkRequest::HttpReasonPhraseAttribute); + + emit finishedSignal(); + return true; +} + +} diff --git a/src/libsync/putmultifilejob.h b/src/libsync/putmultifilejob.h new file mode 100644 index 000000000..a43f2805f --- /dev/null +++ b/src/libsync/putmultifilejob.h @@ -0,0 +1,94 @@ +/* + * Copyright 2021 (c) Matthieu Gallien + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + */ + +#pragma once + +#include "abstractnetworkjob.h" + +#include "propagateupload.h" +#include "account.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +class QIODevice; + +namespace OCC { + +Q_DECLARE_LOGGING_CATEGORY(lcPutMultiFileJob) + +struct SingleUploadFileData +{ + std::unique_ptr _device; + QMap _headers; +}; + +/** + * @brief The PutMultiFileJob class + * @ingroup libsync + */ +class OWNCLOUDSYNC_EXPORT PutMultiFileJob : public AbstractNetworkJob +{ + Q_OBJECT + +public: + explicit PutMultiFileJob(AccountPtr account, const QUrl &url, + std::vector devices, QObject *parent = nullptr) + : AbstractNetworkJob(account, {}, parent) + , _devices(std::move(devices)) + , _url(url) + { + _body.setContentType(QHttpMultiPart::RelatedType); + for(auto &singleDevice : _devices) { + singleDevice._device->setParent(this); + connect(this, &PutMultiFileJob::uploadProgress, + singleDevice._device.get(), &UploadDevice::slotJobUploadProgress); + } + } + + ~PutMultiFileJob() override; + + void start() override; + + bool finished() override; + + QString errorString() const override + { + return _errorString.isEmpty() ? AbstractNetworkJob::errorString() : _errorString; + } + + std::chrono::milliseconds msSinceStart() const + { + return std::chrono::milliseconds(_requestTimer.elapsed()); + } + +signals: + void finishedSignal(); + void uploadProgress(qint64, qint64); + +private: + QHttpMultiPart _body; + std::vector _devices; + QString _errorString; + QUrl _url; + QElapsedTimer _requestTimer; +}; + +} diff --git a/src/libsync/syncengine.cpp b/src/libsync/syncengine.cpp index 25a2826bb..3ce782e3c 100644 --- a/src/libsync/syncengine.cpp +++ b/src/libsync/syncengine.cpp @@ -711,7 +711,7 @@ void SyncEngine::slotDiscoveryFinished() _journal->commit(QStringLiteral("post treewalk")); _propagator = QSharedPointer( - new OwncloudPropagator(_account, _localPath, _remotePath, _journal)); + new OwncloudPropagator(_account, _localPath, _remotePath, _journal, _bulkUploadBlackList)); _propagator->setSyncOptions(_syncOptions); connect(_propagator.data(), &OwncloudPropagator::itemCompleted, this, &SyncEngine::slotItemCompleted); diff --git a/src/libsync/syncengine.h b/src/libsync/syncengine.h index 9d0bada92..87b0cf4df 100644 --- a/src/libsync/syncengine.h +++ b/src/libsync/syncengine.h @@ -241,6 +241,8 @@ private: QScopedPointer _discoveryPhase; QSharedPointer _propagator; + QSet _bulkUploadBlackList; + // List of all files with conflicts QSet _seenConflictFiles; diff --git a/test/syncenginetestutils.cpp b/test/syncenginetestutils.cpp index 8b8f58871..83af262be 100644 --- a/test/syncenginetestutils.cpp +++ b/test/syncenginetestutils.cpp @@ -9,6 +9,10 @@ #include "httplogger.h" #include "accessmanager.h" +#include +#include +#include +#include #include @@ -416,6 +420,109 @@ void FakePutReply::abort() emit finished(); } +FakePutMultiFileReply::FakePutMultiFileReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, const QString &contentType, const QByteArray &putPayload, QObject *parent) + : FakeReply { parent } +{ + setRequest(request); + setUrl(request.url()); + setOperation(op); + open(QIODevice::ReadOnly); + _allFileInfo = performMultiPart(remoteRootFileInfo, request, putPayload, contentType); + QMetaObject::invokeMethod(this, "respond", Qt::QueuedConnection); +} + +QVector FakePutMultiFileReply::performMultiPart(FileInfo &remoteRootFileInfo, const QNetworkRequest &request, const QByteArray &putPayload, const QString &contentType) +{ + QVector result; + + auto stringPutPayload = QString::fromUtf8(putPayload); + constexpr int boundaryPosition = sizeof("multipart/related; boundary="); + const QString boundaryValue = QStringLiteral("--") + contentType.mid(boundaryPosition, contentType.length() - boundaryPosition - 1) + QStringLiteral("\r\n"); + auto stringPutPayloadRef = QString{stringPutPayload}.left(stringPutPayload.size() - 2 - boundaryValue.size()); + auto allParts = stringPutPayloadRef.split(boundaryValue, Qt::SkipEmptyParts); + for (const auto &onePart : allParts) { + auto headerEndPosition = onePart.indexOf(QStringLiteral("\r\n\r\n")); + auto onePartHeaderPart = onePart.left(headerEndPosition); + auto onePartBody = onePart.mid(headerEndPosition + 4, onePart.size() - headerEndPosition - 6); + auto onePartHeaders = onePartHeaderPart.split(QStringLiteral("\r\n")); + QMap allHeaders; + for(auto oneHeader : onePartHeaders) { + auto headerParts = oneHeader.split(QStringLiteral(": ")); + allHeaders[headerParts.at(0)] = headerParts.at(1); + } + auto fileName = allHeaders[QStringLiteral("X-File-Path")]; + Q_ASSERT(!fileName.isEmpty()); + FileInfo *fileInfo = remoteRootFileInfo.find(fileName); + if (fileInfo) { + fileInfo->size = onePartBody.size(); + fileInfo->contentChar = onePartBody.at(0).toLatin1(); + } else { + // Assume that the file is filled with the same character + fileInfo = remoteRootFileInfo.create(fileName, onePartBody.size(), onePartBody.at(0).toLatin1()); + } + fileInfo->lastModified = OCC::Utility::qDateTimeFromTime_t(request.rawHeader("X-OC-Mtime").toLongLong()); + remoteRootFileInfo.find(fileName, /*invalidateEtags=*/true); + result.push_back(fileInfo); + } + return result; +} + +void FakePutMultiFileReply::respond() +{ + QJsonDocument reply; + QJsonObject allFileInfoReply; + + qint64 totalSize = 0; + std::for_each(_allFileInfo.begin(), _allFileInfo.end(), [&totalSize](const auto &fileInfo) { + totalSize += fileInfo->size; + }); + + for(auto fileInfo : qAsConst(_allFileInfo)) { + QJsonObject fileInfoReply; + fileInfoReply.insert("error", QStringLiteral("false")); + fileInfoReply.insert("OC-OperationStatus", fileInfo->operationStatus); + fileInfoReply.insert("X-File-Path", fileInfo->path()); + fileInfoReply.insert("OC-ETag", QLatin1String{fileInfo->etag}); + fileInfoReply.insert("ETag", QLatin1String{fileInfo->etag}); + fileInfoReply.insert("etag", QLatin1String{fileInfo->etag}); + fileInfoReply.insert("OC-FileID", QLatin1String{fileInfo->fileId}); + fileInfoReply.insert("X-OC-MTime", "accepted"); // Prevents Q_ASSERT(!_runningNow) since we'll call PropagateItemJob::done twice in that case. + emit uploadProgress(fileInfo->size, totalSize); + allFileInfoReply.insert(fileInfo->path(), fileInfoReply); + } + reply.setObject(allFileInfoReply); + _payload = reply.toJson(); + + setAttribute(QNetworkRequest::HttpStatusCodeAttribute, 200); + + setFinished(true); + if (bytesAvailable()) { + emit readyRead(); + } + + emit metaDataChanged(); + emit finished(); +} + +void FakePutMultiFileReply::abort() +{ + setError(OperationCanceledError, QStringLiteral("abort")); + emit finished(); +} + +qint64 FakePutMultiFileReply::bytesAvailable() const +{ + return _payload.size() + QIODevice::bytesAvailable(); +} + +qint64 FakePutMultiFileReply::readData(char *data, qint64 maxlen) +{ + qint64 len = std::min(qint64 { _payload.size() }, maxlen); + std::copy(_payload.cbegin(), _payload.cbegin() + len, data); + _payload.remove(0, static_cast(len)); + return len; +} + FakeMkcolReply::FakeMkcolReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, QObject *parent) : FakeReply { parent } { @@ -813,43 +920,77 @@ FakeQNAM::FakeQNAM(FileInfo initialRoot) setCookieJar(new OCC::CookieJar); } +QJsonObject FakeQNAM::forEachReplyPart(QIODevice *outgoingData, + const QString &contentType, + std::function &)> replyFunction) +{ + auto fullReply = QJsonObject{}; + auto putPayload = outgoingData->peek(outgoingData->bytesAvailable()); + outgoingData->reset(); + auto stringPutPayload = QString::fromUtf8(putPayload); + constexpr int boundaryPosition = sizeof("multipart/related; boundary="); + const QString boundaryValue = QStringLiteral("--") + contentType.mid(boundaryPosition, contentType.length() - boundaryPosition - 1) + QStringLiteral("\r\n"); + auto stringPutPayloadRef = QString{stringPutPayload}.left(stringPutPayload.size() - 2 - boundaryValue.size()); + auto allParts = stringPutPayloadRef.split(boundaryValue, Qt::SkipEmptyParts); + for (const auto &onePart : qAsConst(allParts)) { + auto headerEndPosition = onePart.indexOf(QStringLiteral("\r\n\r\n")); + auto onePartHeaderPart = onePart.left(headerEndPosition); + auto onePartHeaders = onePartHeaderPart.split(QStringLiteral("\r\n")); + QMap allHeaders; + for(const auto &oneHeader : qAsConst(onePartHeaders)) { + auto headerParts = oneHeader.split(QStringLiteral(": ")); + allHeaders[headerParts.at(0)] = headerParts.at(1).toLatin1(); + } + + auto reply = replyFunction(allHeaders); + if (reply.contains(QStringLiteral("error")) && + reply.contains(QStringLiteral("etag"))) { + fullReply.insert(allHeaders[QStringLiteral("X-File-Path")], reply); + } + } + + return fullReply; +} + QNetworkReply *FakeQNAM::createRequest(QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) { QNetworkReply *reply = nullptr; auto newRequest = request; newRequest.setRawHeader("X-Request-ID", OCC::AccessManager::generateRequestId()); + auto contentType = request.header(QNetworkRequest::ContentTypeHeader).toString(); if (_override) { if (auto _reply = _override(op, newRequest, outgoingData)) { reply = _reply; } } if (!reply) { - const QString fileName = getFilePathFromUrl(newRequest.url()); - Q_ASSERT(!fileName.isNull()); - if (_errorPaths.contains(fileName)) { - reply = new FakeErrorReply { op, newRequest, this, _errorPaths[fileName] }; - } + reply = overrideReplyWithError(getFilePathFromUrl(newRequest.url()), op, newRequest); } - if (!reply) { const bool isUpload = newRequest.url().path().startsWith(sUploadUrl.path()); + if (!reply) { + const bool isUpload = newRequest.url().path().startsWith(sUploadUrl.path()); FileInfo &info = isUpload ? _uploadFileInfo : _remoteRootFileInfo; auto verb = newRequest.attribute(QNetworkRequest::CustomVerbAttribute); - if (verb == QLatin1String("PROPFIND")) + if (verb == QLatin1String("PROPFIND")) { // Ignore outgoingData always returning somethign good enough, works for now. reply = new FakePropfindReply { info, op, newRequest, this }; - else if (verb == QLatin1String("GET") || op == QNetworkAccessManager::GetOperation) + } else if (verb == QLatin1String("GET") || op == QNetworkAccessManager::GetOperation) { reply = new FakeGetReply { info, op, newRequest, this }; - else if (verb == QLatin1String("PUT") || op == QNetworkAccessManager::PutOperation) + } else if (verb == QLatin1String("PUT") || op == QNetworkAccessManager::PutOperation) { reply = new FakePutReply { info, op, newRequest, outgoingData->readAll(), this }; - else if (verb == QLatin1String("MKCOL")) + } else if (verb == QLatin1String("MKCOL")) { reply = new FakeMkcolReply { info, op, newRequest, this }; - else if (verb == QLatin1String("DELETE") || op == QNetworkAccessManager::DeleteOperation) + } else if (verb == QLatin1String("DELETE") || op == QNetworkAccessManager::DeleteOperation) { reply = new FakeDeleteReply { info, op, newRequest, this }; - else if (verb == QLatin1String("MOVE") && !isUpload) + } else if (verb == QLatin1String("MOVE") && !isUpload) { reply = new FakeMoveReply { info, op, newRequest, this }; - else if (verb == QLatin1String("MOVE") && isUpload) + } else if (verb == QLatin1String("MOVE") && isUpload) { reply = new FakeChunkMoveReply { info, _remoteRootFileInfo, op, newRequest, this }; - else { + } else if (verb == QLatin1String("POST") || op == QNetworkAccessManager::PostOperation) { + if (contentType.startsWith(QStringLiteral("multipart/related; boundary="))) { + reply = new FakePutMultiFileReply { info, op, newRequest, contentType, outgoingData->readAll(), this }; + } + } else { qDebug() << verb << outgoingData; Q_UNREACHABLE(); } @@ -858,6 +999,18 @@ QNetworkReply *FakeQNAM::createRequest(QNetworkAccessManager::Operation op, cons return reply; } +QNetworkReply * FakeQNAM::overrideReplyWithError(QString fileName, QNetworkAccessManager::Operation op, QNetworkRequest newRequest) +{ + QNetworkReply *reply = nullptr; + + Q_ASSERT(!fileName.isNull()); + if (_errorPaths.contains(fileName)) { + reply = new FakeErrorReply { op, newRequest, this, _errorPaths[fileName] }; + } + + return reply; +} + FakeFolder::FakeFolder(const FileInfo &fileTemplate, const OCC::Optional &localFileInfo, const QString &remotePath) : _localModifier(_tempDir.path()) { @@ -1079,3 +1232,12 @@ FakeReply::FakeReply(QObject *parent) } FakeReply::~FakeReply() = default; + +FakeJsonErrorReply::FakeJsonErrorReply(QNetworkAccessManager::Operation op, + const QNetworkRequest &request, + QObject *parent, + int httpErrorCode, + const QJsonDocument &reply) + : FakeErrorReply{ op, request, parent, httpErrorCode, reply.toJson() } +{ +} diff --git a/test/syncenginetestutils.h b/test/syncenginetestutils.h index 0fdfb22df..c434925cc 100644 --- a/test/syncenginetestutils.h +++ b/test/syncenginetestutils.h @@ -28,6 +28,8 @@ #include #include +class QJsonDocument; + /* * TODO: In theory we should use QVERIFY instead of Q_ASSERT for testing, but this * only works when directly called from a QTest :-( @@ -148,6 +150,7 @@ public: void fixupParentPathRecursively(); QString name; + int operationStatus = 200; bool isDir = true; bool isShared = false; OCC::RemotePermissions permissions; // When uset, defaults to everything @@ -214,6 +217,27 @@ public: qint64 readData(char *, qint64) override { return 0; } }; +class FakePutMultiFileReply : public FakeReply +{ + Q_OBJECT +public: + FakePutMultiFileReply(FileInfo &remoteRootFileInfo, QNetworkAccessManager::Operation op, const QNetworkRequest &request, const QString &contentType, const QByteArray &putPayload, QObject *parent); + + static QVector performMultiPart(FileInfo &remoteRootFileInfo, const QNetworkRequest &request, const QByteArray &putPayload, const QString &contentType); + + Q_INVOKABLE virtual void respond(); + + void abort() override; + + qint64 bytesAvailable() const override; + qint64 readData(char *data, qint64 maxlen) override; + +private: + QVector _allFileInfo; + + QByteArray _payload; +}; + class FakeMkcolReply : public FakeReply { Q_OBJECT @@ -354,6 +378,17 @@ public: QByteArray _body; }; +class FakeJsonErrorReply : public FakeErrorReply +{ + Q_OBJECT +public: + FakeJsonErrorReply(QNetworkAccessManager::Operation op, + const QNetworkRequest &request, + QObject *parent, + int httpErrorCode, + const QJsonDocument &reply = QJsonDocument()); +}; + // A reply that never responds class FakeHangingReply : public FakeReply { @@ -409,6 +444,12 @@ public: void setOverride(const Override &override) { _override = override; } + QJsonObject forEachReplyPart(QIODevice *outgoingData, + const QString &contentType, + std::function &)> replyFunction); + + QNetworkReply *overrideReplyWithError(QString fileName, Operation op, QNetworkRequest newRequest); + protected: QNetworkReply *createRequest(Operation op, const QNetworkRequest &request, QIODevice *outgoingData = nullptr) override; @@ -467,6 +508,11 @@ public: }; ErrorList serverErrorPaths() { return {_fakeQnam}; } void setServerOverride(const FakeQNAM::Override &override) { _fakeQnam->setOverride(override); } + QJsonObject forEachReplyPart(QIODevice *outgoingData, + const QString &contentType, + std::function&)> replyFunction) { + return _fakeQnam->forEachReplyPart(outgoingData, contentType, replyFunction); + } QString localPath() const; diff --git a/test/testsyncengine.cpp b/test/testsyncengine.cpp index e17e7ac31..328e813ee 100644 --- a/test/testsyncengine.cpp +++ b/test/testsyncengine.cpp @@ -41,6 +41,18 @@ bool itemDidCompleteSuccessfullyWithExpectedRank(const ItemCompletedSpy &spy, co return false; } +int itemSuccessfullyCompletedGetRank(const ItemCompletedSpy &spy, const QString &path) +{ + auto itItem = std::find_if(spy.begin(), spy.end(), [&path] (auto currentItem) { + auto item = currentItem[0].template value(); + return item->destination() == path; + }); + if (itItem != spy.end()) { + return itItem - spy.begin(); + } + return -1; +} + class TestSyncEngine : public QObject { Q_OBJECT @@ -106,12 +118,18 @@ private slots: fakeFolder.syncOnce(); QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y", 0)); QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z", 1)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Y/d0", 2)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "Z/d0", 3)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "A/a0", 4)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "B/b0", 5)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r0", 6)); - QVERIFY(itemDidCompleteSuccessfullyWithExpectedRank(completeSpy, "r1", 7)); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "Y/d0")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "Y/d0") > 1); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "Z/d0")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "Z/d0") > 1); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "A/a0")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "A/a0") > 1); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "B/b0")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "B/b0") > 1); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "r0")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "r0") > 1); + QVERIFY(itemDidCompleteSuccessfully(completeSpy, "r1")); + QVERIFY(itemSuccessfullyCompletedGetRank(completeSpy, "r1") > 1); QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState()); } @@ -492,7 +510,9 @@ private slots: int remoteQuota = 1000; int n507 = 0, nPUT = 0; QObject parent; - fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *) -> QNetworkReply * { + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * { + Q_UNUSED(outgoingData) + if (op == QNetworkAccessManager::PutOperation) { nPUT++; if (request.rawHeader("OC-Total-Length").toInt() > remoteQuota) { @@ -778,6 +798,95 @@ private slots: QCOMPARE(QFileInfo(fakeFolder.localPath() + "foo").lastModified(), datetime); } + + /** + * Checks whether subsequent large uploads are skipped after a 507 error + */ + void testErrorsWithBulkUpload() + { + FakeFolder fakeFolder{ FileInfo::A12_B12_C12_S12() }; + fakeFolder.syncEngine().account()->setCapabilities({ { "dav", QVariantMap{ {"bulkupload", "1.0"} } } }); + + // Disable parallel uploads + SyncOptions syncOptions; + syncOptions._parallelNetworkJobs = 0; + fakeFolder.syncEngine().setSyncOptions(syncOptions); + + int nPUT = 0; + int nPOST = 0; + fakeFolder.setServerOverride([&](QNetworkAccessManager::Operation op, const QNetworkRequest &request, QIODevice *outgoingData) -> QNetworkReply * { + auto contentType = request.header(QNetworkRequest::ContentTypeHeader).toString(); + if (op == QNetworkAccessManager::PostOperation) { + ++nPOST; + if (contentType.startsWith(QStringLiteral("multipart/related; boundary="))) { + auto jsonReplyObject = fakeFolder.forEachReplyPart(outgoingData, contentType, [] (const QMap &allHeaders) -> QJsonObject { + auto reply = QJsonObject{}; + const auto fileName = allHeaders[QStringLiteral("X-File-Path")]; + if (fileName.endsWith("A/big2") || + fileName.endsWith("A/big3") || + fileName.endsWith("A/big4") || + fileName.endsWith("A/big5") || + fileName.endsWith("A/big7") || + fileName.endsWith("B/big8")) { + reply.insert(QStringLiteral("error"), true); + reply.insert(QStringLiteral("etag"), {}); + return reply; + } else { + reply.insert(QStringLiteral("error"), false); + reply.insert(QStringLiteral("etag"), {}); + } + return reply; + }); + if (jsonReplyObject.size()) { + auto jsonReply = QJsonDocument{}; + jsonReply.setObject(jsonReplyObject); + return new FakeJsonErrorReply{op, request, this, 200, jsonReply}; + } + return nullptr; + } + } else if (op == QNetworkAccessManager::PutOperation) { + ++nPUT; + const auto fileName = getFilePathFromUrl(request.url()); + if (fileName.endsWith("A/big2") || + fileName.endsWith("A/big3") || + fileName.endsWith("A/big4") || + fileName.endsWith("A/big5") || + fileName.endsWith("A/big7") || + fileName.endsWith("B/big8")) { + return new FakeErrorReply(op, request, this, 412); + } + return nullptr; + } + return nullptr; + }); + + fakeFolder.localModifier().insert("A/big", 1); + QVERIFY(fakeFolder.syncOnce()); + QCOMPARE(nPUT, 0); + QCOMPARE(nPOST, 1); + nPUT = 0; + nPOST = 0; + + fakeFolder.localModifier().insert("A/big1", 1); // ok + fakeFolder.localModifier().insert("A/big2", 1); // ko + fakeFolder.localModifier().insert("A/big3", 1); // ko + fakeFolder.localModifier().insert("A/big4", 1); // ko + fakeFolder.localModifier().insert("A/big5", 1); // ko + fakeFolder.localModifier().insert("A/big6", 1); // ok + fakeFolder.localModifier().insert("A/big7", 1); // ko + fakeFolder.localModifier().insert("A/big8", 1); // ok + fakeFolder.localModifier().insert("B/big8", 1); // ko + + QVERIFY(!fakeFolder.syncOnce()); + QCOMPARE(nPUT, 0); + QCOMPARE(nPOST, 1); + nPUT = 0; + nPOST = 0; + + QVERIFY(!fakeFolder.syncOnce()); + QCOMPARE(nPUT, 6); + QCOMPARE(nPOST, 0); + } }; QTEST_GUILESS_MAIN(TestSyncEngine)