if (WIN32)
set(libsync_SRCS ${libsync_SRCS}
vfs/cfapi/cfapiwrapper.cpp
+ vfs/cfapi/hydrationjob.cpp
vfs/cfapi/vfs_cfapi.cpp
)
add_definitions(-D_WIN32_WINNT=_WIN32_WINNT_WIN10)
#include "cfapiwrapper.h"
#include "common/utility.h"
+#include "vfs_cfapi.h"
#include <QDir>
#include <QFileInfo>
+#include <QLocalSocket>
#include <QLoggingCategory>
#include <cfapi.h>
#include <comdef.h>
+#include <ntstatus.h>
Q_LOGGING_CATEGORY(lcCfApiWrapper, "nextcloud.sync.vfs.cfapi.wrapper", QtInfoMsg)
+#define FIELD_SIZE( type, field ) ( sizeof( ( (type*)0 )->field ) )
+#define CF_SIZE_OF_OP_PARAM( field ) \
+ ( FIELD_OFFSET( CF_OPERATION_PARAMETERS, field ) + \
+ FIELD_SIZE( CF_OPERATION_PARAMETERS, field ) )
+
namespace {
-void CALLBACK cfApiFetchDataCallback(_In_ CONST CF_CALLBACK_INFO* callbackInfo, _In_ CONST CF_CALLBACK_PARAMETERS* callbackParameters)
+void cfApiSendTransferInfo(const CF_CONNECTION_KEY &connectionKey, const CF_TRANSFER_KEY &transferKey, NTSTATUS status, void *buffer, qint64 offset, qint64 length)
+{
+
+ CF_OPERATION_INFO opInfo = { 0 };
+ CF_OPERATION_PARAMETERS opParams = { 0 };
+
+ opInfo.StructSize = sizeof(opInfo);
+ opInfo.Type = CF_OPERATION_TYPE_TRANSFER_DATA;
+ opInfo.ConnectionKey = connectionKey;
+ opInfo.TransferKey = transferKey;
+ opParams.ParamSize = CF_SIZE_OF_OP_PARAM(TransferData);
+ opParams.TransferData.CompletionStatus = status;
+ opParams.TransferData.Buffer = buffer;
+ opParams.TransferData.Offset.QuadPart = offset;
+ opParams.TransferData.Length.QuadPart = length;
+
+ const qint64 result = CfExecute(&opInfo, &opParams);
+ Q_ASSERT(result == S_OK);
+ if (result != S_OK) {
+ qCCritical(lcCfApiWrapper) << "Couldn't send transfer info" << QString::number(transferKey.QuadPart, 16) << ":" << _com_error(result).ErrorMessage();
+ }
+}
+
+
+void CALLBACK cfApiFetchDataCallback(const CF_CALLBACK_INFO *callbackInfo, const CF_CALLBACK_PARAMETERS *callbackParameters)
{
- qCCritical(lcCfApiWrapper()) << "Got in!";
- Q_ASSERT(false);
+ const auto sendTransferError = [=] {
+ cfApiSendTransferInfo(callbackInfo->ConnectionKey,
+ callbackInfo->TransferKey,
+ STATUS_UNSUCCESSFUL,
+ nullptr,
+ callbackParameters->FetchData.RequiredFileOffset.QuadPart,
+ callbackParameters->FetchData.RequiredLength.QuadPart);
+ };
+
+ const auto sendTransferInfo = [=](QByteArray &data, qint64 offset) {
+ cfApiSendTransferInfo(callbackInfo->ConnectionKey,
+ callbackInfo->TransferKey,
+ STATUS_SUCCESS,
+ data.data(),
+ offset,
+ data.length());
+ };
+
+ auto vfs = reinterpret_cast<OCC::VfsCfApi *>(callbackInfo->CallbackContext);
+ Q_ASSERT(vfs->metaObject()->className() == QByteArrayLiteral("OCC::VfsCfApi"));
+ const auto path = QString(QString::fromWCharArray(callbackInfo->VolumeDosName) + QString::fromWCharArray(callbackInfo->NormalizedPath));
+ const auto requestId = QString::number(callbackInfo->TransferKey.QuadPart, 16);
+
+ const auto invokeResult = QMetaObject::invokeMethod(vfs, [=] { vfs->requestHydration(requestId, path); }, Qt::QueuedConnection);
+ if (!invokeResult) {
+ qCCritical(lcCfApiWrapper) << "Failed to trigger hydration for" << path << requestId;
+ sendTransferError();
+ return;
+ }
+
+ // Block and wait for vfs to signal back the hydration is ready
+ bool hydrationRequestResult = false;
+ QEventLoop loop;
+ QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestReady, &loop, [&](const QString &id) {
+ if (requestId == id) {
+ hydrationRequestResult = true;
+ loop.quit();
+ }
+ });
+ QObject::connect(vfs, &OCC::VfsCfApi::hydrationRequestFailed, &loop, [&](const QString &id) {
+ if (requestId == id) {
+ hydrationRequestResult = false;
+ loop.quit();
+ }
+ });
+ loop.exec();
+ qCInfo(lcCfApiWrapper) << "VFS replied for hydration of" << path << requestId << "status was:" << hydrationRequestResult;
+
+ if (!hydrationRequestResult) {
+ sendTransferError();
+ return;
+ }
+
+ QLocalSocket socket;
+ socket.connectToServer(requestId);
+ const auto connectResult = socket.waitForConnected();
+ if (!connectResult) {
+ qCWarning(lcCfApiWrapper) << "Couldn't connect the socket" << requestId << socket.error() << socket.errorString();
+ sendTransferError();
+ return;
+ }
+
+ qint64 offset = 0;
+ while (socket.waitForReadyRead()) {
+ auto data = socket.readAll();
+ if (data.isEmpty()) {
+ qCWarning(lcCfApiWrapper) << "Unexpected empty data received" << requestId;
+ sendTransferError();
+ break;
+ }
+ sendTransferInfo(data, offset);
+ offset += data.length();
+ }
+
+ qCInfo(lcCfApiWrapper) << "Hydration done for" << path << requestId;
}
CF_CALLBACK_REGISTRATION cfApiCallbacks[] = {
--- /dev/null
+/*
+ * Copyright (C) by Kevin Ottens <kevin.ottens@nextcloud.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+
+#include "hydrationjob.h"
+
+#include "common/syncjournaldb.h"
+#include "propagatedownload.h"
+
+#include <QLocalServer>
+#include <QLocalSocket>
+
+Q_LOGGING_CATEGORY(lcHydration, "nextcloud.sync.vfs.hydrationjob", QtInfoMsg)
+
+OCC::HydrationJob::HydrationJob(QObject *parent)
+ : QObject(parent)
+{
+ connect(this, &HydrationJob::finished, this, &HydrationJob::deleteLater);
+}
+
+OCC::AccountPtr OCC::HydrationJob::account() const
+{
+ return _account;
+}
+
+void OCC::HydrationJob::setAccount(const AccountPtr &account)
+{
+ _account = account;
+}
+
+QString OCC::HydrationJob::remotePath() const
+{
+ return _remotePath;
+}
+
+void OCC::HydrationJob::setRemotePath(const QString &remotePath)
+{
+ _remotePath = remotePath;
+}
+
+QString OCC::HydrationJob::localPath() const
+{
+ return _localPath;
+}
+
+void OCC::HydrationJob::setLocalPath(const QString &localPath)
+{
+ _localPath = localPath;
+}
+
+OCC::SyncJournalDb *OCC::HydrationJob::journal() const
+{
+ return _journal;
+}
+
+void OCC::HydrationJob::setJournal(SyncJournalDb *journal)
+{
+ _journal = journal;
+}
+
+QString OCC::HydrationJob::requestId() const
+{
+ return _requestId;
+}
+
+void OCC::HydrationJob::setRequestId(const QString &requestId)
+{
+ _requestId = requestId;
+}
+
+QString OCC::HydrationJob::folderPath() const
+{
+ return _folderPath;
+}
+
+void OCC::HydrationJob::setFolderPath(const QString &folderPath)
+{
+ _folderPath = folderPath;
+}
+
+void OCC::HydrationJob::start()
+{
+ Q_ASSERT(_account);
+ Q_ASSERT(_journal);
+ Q_ASSERT(!_remotePath.isEmpty() && !_localPath.isEmpty());
+ Q_ASSERT(!_requestId.isEmpty() && !_folderPath.isEmpty());
+
+ Q_ASSERT(_remotePath.endsWith('/'));
+ Q_ASSERT(_localPath.endsWith('/'));
+ Q_ASSERT(!_folderPath.startsWith('/'));
+
+ _server = new QLocalServer(this);
+ const auto listenResult = _server->listen(_requestId);
+ if (!listenResult) {
+ qCCritical(lcHydration) << "Couldn't get server to listen" << _requestId << _localPath << _folderPath;
+ emitFinished();
+ return;
+ }
+
+ qCInfo(lcHydration) << "Server ready, waiting for connections" << _requestId << _localPath << _folderPath;
+ connect(_server, &QLocalServer::newConnection, this, &HydrationJob::onNewConnection);
+}
+
+void OCC::HydrationJob::emitFinished()
+{
+ _socket->disconnectFromServer();
+ connect(_socket, &QLocalSocket::disconnected, this, [=]{
+ _socket->close();
+ emit finished(this);
+ });
+}
+
+void OCC::HydrationJob::onNewConnection()
+{
+ Q_ASSERT(!_socket);
+ Q_ASSERT(!_job);
+
+ qCInfo(lcHydration) << "Got new connection starting GETFileJob" << _requestId << _folderPath;
+ _socket = _server->nextPendingConnection();
+ _job = new GETFileJob(_account, _remotePath + _folderPath, _socket, {}, {}, 0, this);
+ connect(_job, &GETFileJob::finishedSignal, this, &HydrationJob::onGetFinished);
+ _job->start();
+}
+
+void OCC::HydrationJob::onGetFinished()
+{
+ qCInfo(lcHydration) << "GETFileJob finished" << _requestId << _folderPath << _job->errorStatus() << _job->errorString();
+
+ if (_job->errorStatus() != SyncFileItem::NoStatus && _job->errorStatus() != SyncFileItem::Success) {
+ emitFinished();
+ return;
+ }
+
+ SyncJournalFileRecord record;
+ _journal->getFileRecord(_folderPath, &record);
+ Q_ASSERT(record.isValid());
+ if (!record.isValid()) {
+ qCWarning(lcHydration) << "Couldn't find record to update after hydration" << _requestId << _folderPath;
+ emitFinished();
+ return;
+ }
+
+ record._type = ItemTypeFile;
+ _journal->setFileRecord(record);
+ emitFinished();
+}
--- /dev/null
+/*
+ * Copyright (C) by Kevin Ottens <kevin.ottens@nextcloud.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
+ */
+#pragma once
+
+#include <QObject>
+
+#include "account.h"
+
+class QLocalServer;
+class QLocalSocket;
+
+namespace OCC {
+class GETFileJob;
+class SyncJournalDb;
+
+class OWNCLOUDSYNC_EXPORT HydrationJob : public QObject
+{
+ Q_OBJECT
+public:
+ explicit HydrationJob(QObject *parent = nullptr);
+
+ AccountPtr account() const;
+ void setAccount(const AccountPtr &account);
+
+ QString remotePath() const;
+ void setRemotePath(const QString &remotePath);
+
+ QString localPath() const;
+ void setLocalPath(const QString &localPath);
+
+ SyncJournalDb *journal() const;
+ void setJournal(SyncJournalDb *journal);
+
+ QString requestId() const;
+ void setRequestId(const QString &requestId);
+
+ QString folderPath() const;
+ void setFolderPath(const QString &folderPath);
+
+ void start();
+
+signals:
+ void finished(HydrationJob *job);
+
+private:
+ void emitFinished();
+
+ void onNewConnection();
+ void onGetFinished();
+
+ AccountPtr _account;
+ QString _remotePath;
+ QString _localPath;
+ SyncJournalDb *_journal = nullptr;
+
+ QString _requestId;
+ QString _folderPath;
+
+ QLocalServer *_server = nullptr;
+ QLocalSocket *_socket = nullptr;
+ GETFileJob *_job = nullptr;
+};
+
+} // namespace OCC
#include <QFile>
#include "cfapiwrapper.h"
+#include "hydrationjob.h"
#include "syncfileitem.h"
#include "filesystem.h"
#include "common/syncjournaldb.h"
class VfsCfApiPrivate
{
public:
+ QList<HydrationJob *> hydrationJobs;
cfapi::ConnectionKey connectionKey;
};
bool VfsCfApi::isHydrating() const
{
- return false;
+ return !d->hydrationJobs.isEmpty();
}
Result<void, QString> VfsCfApi::updateMetadata(const QString &filePath, time_t modtime, qint64 size, const QByteArray &fileId)
return AvailabilityError::NoSuchItem;
}
+void VfsCfApi::requestHydration(const QString &requestId, const QString &path)
+{
+ qCInfo(lcCfApi) << "Received request to hydrate" << path << requestId;
+ const auto root = QDir::toNativeSeparators(params().filesystemPath);
+ Q_ASSERT(path.startsWith(root));
+
+ const auto relativePath = QDir::fromNativeSeparators(path.mid(root.length()));
+ const auto journal = params().journal;
+
+ // Set in the database that we should download the file
+ SyncJournalFileRecord record;
+ journal->getFileRecord(relativePath, &record);
+ if (!record.isValid()) {
+ qCInfo(lcCfApi) << "Couldn't hydrate, did not find file in db";
+ emit hydrationRequestFailed(requestId);
+ return;
+ }
+
+ if (!record.isVirtualFile()) {
+ qCInfo(lcCfApi) << "Couldn't hydrate, the file is not virtual";
+ emit hydrationRequestFailed(requestId);
+ return;
+ }
+
+ // This is impossible to handle with CfAPI since the file size is generally different
+ // between the encrypted and the decrypted file which would make CfAPI reject the hydration
+ // of the placeholder with decrypted data
+ if (record._isE2eEncrypted || !record._e2eMangledName.isEmpty()) {
+ qCInfo(lcCfApi) << "Couldn't hydrate, the file is E2EE this is not supported";
+ emit hydrationRequestFailed(requestId);
+ return;
+ }
+
+ // All good, let's hydrate now
+ scheduleHydrationJob(requestId, relativePath);
+}
+
void VfsCfApi::fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus)
{
Q_UNUSED(systemFileName);
Q_UNUSED(fileStatus);
}
+void VfsCfApi::scheduleHydrationJob(const QString &requestId, const QString &folderPath)
+{
+ Q_ASSERT(std::none_of(std::cbegin(d->hydrationJobs), std::cend(d->hydrationJobs), [=](HydrationJob *job) {
+ return job->requestId() == requestId || job->folderPath() == folderPath;
+ }));
+
+ if (d->hydrationJobs.isEmpty()) {
+ emit beginHydrating();
+ }
+
+ auto job = new HydrationJob(this);
+ job->setAccount(params().account);
+ job->setRemotePath(params().remotePath);
+ job->setLocalPath(params().filesystemPath);
+ job->setJournal(params().journal);
+ job->setRequestId(requestId);
+ job->setFolderPath(folderPath);
+ connect(job, &HydrationJob::finished, this, &VfsCfApi::onHydrationJobFinished);
+ d->hydrationJobs << job;
+ job->start();
+ emit hydrationRequestReady(requestId);
+}
+
+void VfsCfApi::onHydrationJobFinished(HydrationJob *job)
+{
+ Q_ASSERT(d->hydrationJobs.contains(job));
+ d->hydrationJobs.removeAll(job);
+ if (d->hydrationJobs.isEmpty()) {
+ emit doneHydrating();
+ }
+}
+
VfsCfApi::HydratationAndPinStates VfsCfApi::computeRecursiveHydrationAndPinStates(const QString &folderPath, const Optional<PinState> &basePinState)
{
Q_ASSERT(!folderPath.endsWith('/'));
#include "common/vfs.h"
namespace OCC {
+class HydrationJob;
class VfsCfApiPrivate;
class VfsCfApi : public Vfs
AvailabilityResult availability(const QString &folderPath) override;
public slots:
+ void requestHydration(const QString &requestId, const QString &path);
void fileStatusChanged(const QString &systemFileName, SyncFileStatus fileStatus) override;
+signals:
+ void hydrationRequestReady(const QString &requestId);
+ void hydrationRequestFailed(const QString &requestId);
+
protected:
void startImpl(const VfsSetupParams ¶ms) override;
private:
+ void scheduleHydrationJob(const QString &requestId, const QString &folderPath);
+ void onHydrationJobFinished(HydrationJob *job);
+
struct HasHydratedDehydrated {
bool hasHydrated = false;
bool hasDehydrated = false;
CFVERIFY_NONVIRTUAL(fakeFolder, "online/file1");
CFVERIFY_VIRTUAL(fakeFolder, "local/file1");
}
+
+ void testOpeningOnlineFileTriggersDownload()
+ {
+ FakeFolder fakeFolder{ FileInfo() };
+ setupVfs(fakeFolder);
+ QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
+
+ fakeFolder.remoteModifier().mkdir("online");
+ fakeFolder.remoteModifier().mkdir("online/sub");
+ QVERIFY(fakeFolder.syncOnce());
+ QCOMPARE(fakeFolder.currentLocalState(), fakeFolder.currentRemoteState());
+
+ setPinState(fakeFolder.localPath() + "online", PinState::OnlineOnly, cfapi::Recurse);
+
+ fakeFolder.remoteModifier().insert("online/sub/file1", 10 * 1024 * 1024);
+ QVERIFY(fakeFolder.syncOnce());
+
+ CFVERIFY_VIRTUAL(fakeFolder, "online/sub/file1");
+
+ // Simulate another process requesting the open
+ QEventLoop loop;
+ bool openResult = false;
+ bool readResult = false;
+ std::thread t([&] {
+ QFile file(fakeFolder.localPath() + "online/sub/file1");
+ openResult = file.open(QFile::ReadOnly);
+ readResult = !file.readAll().isEmpty();
+ file.close();
+ QMetaObject::invokeMethod(&loop, &QEventLoop::quit, Qt::QueuedConnection);
+ });
+ loop.exec();
+ t.join();
+
+ CFVERIFY_NONVIRTUAL(fakeFolder, "online/sub/file1");
+
+ // Nothing should change
+ ItemCompletedSpy completeSpy(fakeFolder);
+ QVERIFY(fakeFolder.syncOnce());
+ QVERIFY(completeSpy.isEmpty());
+
+ CFVERIFY_NONVIRTUAL(fakeFolder, "online/sub/file1");
+ }
};
QTEST_GUILESS_MAIN(TestSyncCfApi)