New discovery algorithm: Parallel PROPFIND
authorOlivier Goffart <ogoffart@woboq.com>
Fri, 12 Oct 2018 12:44:33 +0000 (14:44 +0200)
committerKevin Ottens <kevin.ottens@nextcloud.com>
Tue, 15 Dec 2020 09:58:08 +0000 (10:58 +0100)
src/gui/folder.cpp
src/libsync/discovery.cpp
src/libsync/discovery.h
src/libsync/discoveryphase.cpp
src/libsync/discoveryphase.h
src/libsync/owncloudpropagator.cpp
src/libsync/syncoptions.h
test/testsyncengine.cpp

index 1ff7e8af7303411d3468918a58564b5520fb7756..73da3162adb8dbca23d195462f66538fc2833f16 100644 (file)
@@ -750,6 +750,9 @@ void Folder::setSyncOptions()
         opt._maxChunkSize = cfgFile.maxChunkSize();
     }
 
+    int maxParallel = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
+    opt._parallelNetworkJobs = maxParallel ? maxParallel : _accountState->account()->isHttp2Supported() ? 20 : 6;
+
     // Previously min/max chunk size values didn't exist, so users might
     // have setups where the chunk size exceeds the new min/max default
     // values. To cope with this, adjust min/max to always include the
index b6f8531565c296039daa1482be8ae2bec6ce741a..0a3367dcd01aa3cd2839e38a5a74b82bf18655e6 100644 (file)
@@ -38,7 +38,11 @@ void ProcessDirectoryJob::start()
         serverJob = new DiscoverySingleDirectoryJob(_discoveryData->_account,
             _discoveryData->_remoteFolder + _currentFolder._server, this);
         connect(serverJob, &DiscoverySingleDirectoryJob::etag, this, &ProcessDirectoryJob::etag);
+        _discoveryData->_currentlyActiveJobs++;
+        _pendingAsyncJobs++;
         connect(serverJob, &DiscoverySingleDirectoryJob::finished, this, [this, serverJob](const auto &results) {
+            _discoveryData->_currentlyActiveJobs--;
+            _pendingAsyncJobs--;
             if (results) {
                 _serverEntries = *results;
                 _hasServerEntries = true;
@@ -252,8 +256,7 @@ void ProcessDirectoryJob::process()
         }
         processFile(std::move(path), localEntry, serverEntry, record);
     }
-
-    progress();
+    QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
 }
 
 bool ProcessDirectoryJob::handleExcluded(const QString &path, bool isDirectory, bool isHidden, bool isSymlink)
@@ -444,7 +447,7 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
                         if (!result) {
                             processFileAnalyzeLocalInfo(item, path, localEntry, serverEntry, dbEntry, _queryServer);
                         }
-                        progress();
+                        QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
                     });
                 return;
             }
@@ -567,12 +570,12 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
                     auto job = new RequestEtagJob(_discoveryData->_account, originalPath, this);
                     connect(job, &RequestEtagJob::finishedWithResult, this, [=](const Result<QString> &etag) mutable {
                         _pendingAsyncJobs--;
+                        QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
                         if (etag.errorCode() != 404 ||
                             // Somehow another item claimed this original path, consider as if it existed
                             _discoveryData->_renamedItems.contains(originalPath)) {
                             // If the file exist or if there is another error, consider it is a new file.
                             postProcessServerNew();
-                            progress();
                             return;
                         }
 
@@ -588,7 +591,6 @@ void ProcessDirectoryJob::processFileAnalyzeRemoteInfo(
 
                         postProcessRename(path);
                         processFileFinalize(item, path, item->isDirectory(), item->_instruction == CSYNC_INSTRUCTION_RENAME ? NormalQuery : ParentDontExist, _queryServer);
-                        progress();
                     });
                     job->start();
                     done = true; // Ideally, if the origin still exist on the server, we should continue searching...  but that'd be difficult
@@ -939,7 +941,7 @@ void ProcessDirectoryJob::processFileAnalyzeLocalInfo(
                         }
                         processFileFinalize(item, path, item->isDirectory(), NormalQuery, recurseQueryServer);
                         _pendingAsyncJobs--;
-                        progress();
+                        QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
                     });
                     job->start();
                     return;
@@ -1173,23 +1175,13 @@ void ProcessDirectoryJob::subJobFinished()
     int count = _runningJobs.removeAll(job);
     ASSERT(count == 1);
     job->deleteLater();
-    progress();
+    QTimer::singleShot(0, _discoveryData, &DiscoveryPhase::scheduleMoreJobs);
 }
 
-void ProcessDirectoryJob::progress()
+int ProcessDirectoryJob::progress(int nbJobs)
 {
-    int maxRunning = 3; // FIXME
-    if (_pendingAsyncJobs + _runningJobs.size() > maxRunning)
-        return;
-
-    if (!_queuedJobs.empty()) {
-        auto f = _queuedJobs.front();
-        _queuedJobs.pop_front();
-        _runningJobs.push_back(f);
-        f->start();
-        return;
-    }
-    if (_runningJobs.empty() && _pendingAsyncJobs == 0) {
+    if (_queuedJobs.empty() && _runningJobs.empty() && _pendingAsyncJobs == 0) {
+        _pendingAsyncJobs = -1; // We're finished, we don't want to emit finished again
         if (_dirItem) {
             if (_childModified && _dirItem->_instruction == CSYNC_INSTRUCTION_REMOVE) {
                 // re-create directory that has modified contents
@@ -1209,6 +1201,22 @@ void ProcessDirectoryJob::progress()
         }
         emit finished();
     }
+
+    int started = 0;
+    foreach (auto *rj, _runningJobs) {
+        started += rj->progress(nbJobs - started);
+        if (started >= nbJobs)
+            return started;
+    }
+
+    while (started < nbJobs && !_queuedJobs.empty()) {
+        auto f = _queuedJobs.front();
+        _queuedJobs.pop_front();
+        _runningJobs.push_back(f);
+        f->start();
+        started++;
+    }
+    return started;
 }
 
 void ProcessDirectoryJob::dbError()
index 783280f774707ac04b02640f455d1220470e0810..c914d5c876ccca9d9ca0b970b28db2054c87f922 100644 (file)
@@ -46,6 +46,8 @@ public:
     {
     }
     void start();
+    /** Start up to nbJobs, return the number of job started  */
+    int progress(int nbJobs);
 
     SyncFileItemPtr _dirItem;
 
@@ -83,7 +85,6 @@ private:
     bool checkPermissions(const SyncFileItemPtr &item);
     void processBlacklisted(const PathTuple &, const LocalInfo &, const SyncJournalFileRecord &dbEntry);
     void subJobFinished();
-    void progress();
 
     /** An DB operation failed */
     void dbError();
index 65491f60035b8660b3533bba4687aedd81ee2f33..db9a7c8d44ec8e162b5dc611d36de4fb94cdbec0 100644 (file)
@@ -148,6 +148,7 @@ QString DiscoveryPhase::adjustRenamedPath(const QString &original) const
 void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
 {
     connect(job, &ProcessDirectoryJob::finished, this, [this, job] {
+        _currentRootJob = nullptr;
         if (job->_dirItem)
             emit itemDiscovered(job->_dirItem);
         job->deleteLater();
@@ -158,9 +159,18 @@ void DiscoveryPhase::startJob(ProcessDirectoryJob *job)
             emit finished();
         }
     });
+    _currentRootJob = job;
     job->start();
 }
 
+void DiscoveryPhase::scheduleMoreJobs()
+{
+    auto limit = qMax(1, _syncOptions._parallelNetworkJobs);
+    if (_currentRootJob && _currentlyActiveJobs < limit) {
+        _currentRootJob->progress(limit - _currentlyActiveJobs);
+    }
+}
+
 DiscoverySingleDirectoryJob::DiscoverySingleDirectoryJob(const AccountPtr &account, const QString &path, QObject *parent)
     : QObject(parent)
     , _subPath(path)
index c447a595bd6956428cc0c4dd0bc12ea5ac236ba3..486d8bce7168692db405632ffa5741be6e2270f9 100644 (file)
@@ -122,6 +122,9 @@ public:
 class DiscoveryPhase : public QObject
 {
     Q_OBJECT
+
+    ProcessDirectoryJob *_currentRootJob = nullptr;
+
 public:
     QString _localDir; // absolute path to the local directory. ends with '/'
     QString _remoteFolder; // remote folder, ends with '/'
@@ -132,6 +135,7 @@ public:
     QStringList _selectiveSyncWhiteList;
     ExcludedFiles *_excludes;
     QString _invalidFilenamePattern; // FIXME: maybe move in ExcludedFiles
+    int _currentlyActiveJobs = 0;
     bool _ignoreHiddenFiles = false;
     std::function<bool(const QString &)> _shouldDiscoverLocaly;
 
@@ -151,6 +155,7 @@ public:
 
     QByteArray _dataFingerprint;
 
+    void scheduleMoreJobs();
 signals:
     void fatalError(const QString &errorString);
     void itemDiscovered(const SyncFileItemPtr &item);
index e1b83ac33801746051b14698ee80086da0cf63ef..bb23e28432087001084d9245b9ea464a529a26e5 100644 (file)
@@ -84,7 +84,7 @@ int OwncloudPropagator::maximumActiveTransferJob()
         // disable parallelism when there is a network limit.
         return 1;
     }
-    return qMin(3, qCeil(hardMaximumActiveJob() / 2.));
+    return qMin(3, qCeil(_syncOptions._parallelNetworkJobs / 2.));
 }
 
 /* The maximum number of active jobs in parallel  */
@@ -92,12 +92,7 @@ int OwncloudPropagator::hardMaximumActiveJob()
 {
     if (!_syncOptions._parallelNetworkJobs)
         return 1;
-    static int max = qgetenv("OWNCLOUD_MAX_PARALLEL").toUInt();
-    if (max)
-        return max;
-    if (_account->isHttp2Supported())
-        return 20;
-    return 6; // (Qt cannot do more anyway)
+    return _syncOptions._parallelNetworkJobs;
 }
 
 PropagateItemJob::~PropagateItemJob()
index 43ef3bf189614be56f86bf91d6ea02bcf0d5eac8..3d74b8e51b4de56242b84ef5cd97e55e1ce490f0 100644 (file)
@@ -61,8 +61,8 @@ struct SyncOptions
      */
     std::chrono::milliseconds _targetChunkUploadDuration = std::chrono::minutes(1);
 
-    /** Whether parallel network jobs are allowed. */
-    bool _parallelNetworkJobs = true;
+    /** The maximum number of active jobs in parallel  */
+    int _parallelNetworkJobs = 6;
 };
 
 
index 7071eef87b0114a85ea30c15f7aa53d41162d283..64b49fb5c5536325ffee9e8f9338694ecaaea44f 100644 (file)
@@ -433,7 +433,7 @@ private slots:
 
         // Disable parallel uploads
         SyncOptions syncOptions;
-        syncOptions._parallelNetworkJobs = false;
+        syncOptions._parallelNetworkJobs = 0;
         fakeFolder.syncEngine().setSyncOptions(syncOptions);
 
         // Produce an error based on upload size