From a79b957f3912d253a6cf3c057dcfe38961f169e2 Mon Sep 17 00:00:00 2001 From: Arjen Hiemstra Date: Tue, 8 Oct 2024 14:03:20 +0200 Subject: [PATCH] [PATCH] encodedstream: Deprecate setActive() and replace with an explicit API setActive() currently implies starting/stopping the recording process. However, this is somewhat awkward as everything is rather asynchronous with multiple threads involved, which means calling setActive() may mean things are not actually active or may still be active. To avoid this awkwardness, deprecate setActive() and replace it with an explicit start() and stop() method that are clearly documented to be purely requests, with the real active/inactive state matching the stream state. This also makes the "Rendering" state more explicit, when "Stop" is called we immediately switch to the "Rendering" state to indicate we may still be processing frames but are no longer receiving new frames. Gbp-Pq: Name upstream_0ac4aa41_encodedstream-Deprecate-setActive-and-replace-with-an-explicit-API.patch --- src/pipewirebaseencodedstream.cpp | 111 +++++++++++++++++++----------- src/pipewirebaseencodedstream.h | 40 ++++++++++- src/pipewireproduce.cpp | 2 + src/pipewireproduce_p.h | 2 + src/pipewirerecord.cpp | 1 - tests/HeadlessTest.cpp | 20 ++++-- 6 files changed, 127 insertions(+), 49 deletions(-) diff --git a/src/pipewirebaseencodedstream.cpp b/src/pipewirebaseencodedstream.cpp index 67c5445..27ef198 100644 --- a/src/pipewirebaseencodedstream.cpp +++ b/src/pipewirebaseencodedstream.cpp @@ -17,6 +17,8 @@ extern "C" { } #include +#include + #include "pipewireproduce_p.h" #include "vaapiutils_p.h" @@ -29,6 +31,7 @@ struct PipeWireEncodedStreamPrivate { PipeWireBaseEncodedStream::Encoder m_encoder; std::optional m_quality; PipeWireBaseEncodedStream::EncodingPreference m_encodingPreference; + PipeWireBaseEncodedStream::State m_state = PipeWireBaseEncodedStream::Idle; std::unique_ptr m_produceThread; std::unique_ptr m_produce; @@ -36,13 +39,7 @@ struct PipeWireEncodedStreamPrivate { PipeWireBaseEncodedStream::State PipeWireBaseEncodedStream::state() const { - if (isActive()) { - return Recording; - } else if (d->m_produceThread && d->m_produce->m_deactivated && d->m_produceThread->isRunning()) { - return Rendering; - } - - return Idle; + return d->m_state; } PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent) @@ -65,10 +62,10 @@ PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent) PipeWireBaseEncodedStream::~PipeWireBaseEncodedStream() { - setActive(false); + stop(); - if (d->m_fd) { - close(*d->m_fd); + if (d->m_produceThread) { + d->m_produceThread->wait(); } } @@ -78,7 +75,6 @@ void PipeWireBaseEncodedStream::setNodeId(uint nodeId) return; d->m_nodeId = nodeId; - refresh(); Q_EMIT nodeIdChanged(nodeId); } @@ -91,7 +87,6 @@ void PipeWireBaseEncodedStream::setFd(uint fd) close(*d->m_fd); } d->m_fd = fd; - refresh(); Q_EMIT fdChanged(fd); } @@ -141,50 +136,84 @@ int PipeWireBaseEncodedStream::maxBufferSize() const void PipeWireBaseEncodedStream::setActive(bool active) { - if (d->m_active == active) - return; + if (active) { + start(); + } else { + stop(); - d->m_active = active; - refresh(); - Q_EMIT activeChanged(active); + if (d->m_produceThread) { + d->m_produceThread->wait(); + } + } } -std::optional PipeWireBaseEncodedStream::quality() const +void PipeWireBaseEncodedStream::start() { - return d->m_quality; -} + if (d->m_nodeId == 0) { + qCWarning(PIPEWIRERECORD_LOGGING) << "Cannot start recording on a stream without a node ID"; + return; + } -void PipeWireBaseEncodedStream::setQuality(quint8 quality) -{ - d->m_quality = quality; - if (d->m_produce) { - d->m_produce->setQuality(d->m_quality); + if (d->m_produceThread || d->m_state != Idle) { + return; } + + d->m_produceThread = std::make_unique(); + d->m_produceThread->setObjectName("PipeWireProduce::input"); + d->m_produce = makeProduce(); + d->m_produce->setQuality(d->m_quality); + d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames); + d->m_produce->setEncodingPreference(d->m_encodingPreference); + d->m_produce->moveToThread(d->m_produceThread.get()); + d->m_produceThread->start(); + QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection); + + connect(d->m_produce.get(), &PipeWireProduce::started, this, [this]() { + d->m_active = true; + Q_EMIT activeChanged(true); + d->m_state = Recording; + Q_EMIT stateChanged(); + }); + + connect(d->m_produce.get(), &PipeWireProduce::finished, this, [this]() { + d->m_active = false; + Q_EMIT activeChanged(false); + d->m_state = Idle; + Q_EMIT stateChanged(); + }); + + connect(d->m_produceThread.get(), &QThread::finished, this, [this]() { + d->m_produce.reset(); + d->m_produceThread.reset(); + d->m_nodeId = 0; + + if (d->m_fd) { + close(d->m_fd.value()); + } + }); } -void PipeWireBaseEncodedStream::refresh() +void PipeWireBaseEncodedStream::stop() { if (d->m_produceThread) { QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::deactivate, Qt::QueuedConnection); - d->m_produceThread->wait(); - - d->m_produce.reset(); - d->m_produceThread.reset(); } - if (d->m_active && d->m_nodeId > 0) { - d->m_produceThread = std::make_unique(); - d->m_produceThread->setObjectName("PipeWireProduce::input"); - d->m_produce = makeProduce(); + d->m_state = PipeWireBaseEncodedStream::Rendering; + Q_EMIT stateChanged(); +} + +std::optional PipeWireBaseEncodedStream::quality() const +{ + return d->m_quality; +} + +void PipeWireBaseEncodedStream::setQuality(quint8 quality) +{ + d->m_quality = quality; + if (d->m_produce) { d->m_produce->setQuality(d->m_quality); - d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames); - d->m_produce->setEncodingPreference(d->m_encodingPreference); - d->m_produce->moveToThread(d->m_produceThread.get()); - d->m_produceThread->start(); - QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection); } - - Q_EMIT stateChanged(); } void PipeWireBaseEncodedStream::setEncoder(Encoder encoder) diff --git a/src/pipewirebaseencodedstream.h b/src/pipewirebaseencodedstream.h index 9f4c0fd..8728252 100644 --- a/src/pipewirebaseencodedstream.h +++ b/src/pipewirebaseencodedstream.h @@ -25,7 +25,7 @@ class KPIPEWIRE_EXPORT PipeWireBaseEncodedStream : public QObject * Transfers the ownership of the fd, will close it when it's done with it. */ Q_PROPERTY(uint fd READ fd WRITE setFd NOTIFY fdChanged) - Q_PROPERTY(bool active READ isActive WRITE setActive NOTIFY activeChanged) + Q_PROPERTY(bool active READ isActive NOTIFY activeChanged) Q_PROPERTY(State state READ state NOTIFY stateChanged) Q_PROPERTY(Encoder encoder READ encoder WRITE setEncoder NOTIFY encoderChanged) @@ -67,7 +67,42 @@ public: int maxBufferSize() const; bool isActive() const; - void setActive(bool active); + /** + * Set the active state of recording. + * + * @deprecated Since 6.4, use the separate `start()`/`stop()`calls instead. + * This function now just calls `start()`/`stop()`. + * + * @note When calling `setActive(false)`, unlike `stop()`, this function will + * block until the internal encoding threads are finished. + */ + KPIPEWIRE_DEPRECATED void setActive(bool active); + + /** + * Request to start recording. + * + * This will create everything required to perform recording, like a PipeWire + * stream and an encoder, then start receiving frames from the stream and + * encoding those. + * + * This requires a valid node ID to be set and that the current state is Idle. + * + * Note that recording all happens on separate threads, this method only + * starts the process, only when state() returns Recording is recording + * actually happening. + */ + Q_INVOKABLE void start(); + /** + * Request to stop recording. + * + * This will terminate receiving frames from PipeWire and do any cleanup + * necessary to fully terminate recording after that. + * + * Note that after this request, there may still be some processing required + * due to internal queues. As long as state() does not return Idle processing + * is still happening and teardown has not been completed. + */ + Q_INVOKABLE void stop(); /** * The quality used for encoding. @@ -127,6 +162,5 @@ protected: virtual std::unique_ptr makeProduce() = 0; EncodingPreference encodingPreference(); - void refresh(); QScopedPointer d; }; diff --git a/src/pipewireproduce.cpp b/src/pipewireproduce.cpp index 10878d1..610931d 100644 --- a/src/pipewireproduce.cpp +++ b/src/pipewireproduce.cpp @@ -201,6 +201,7 @@ void PipeWireProduce::deactivate() if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) { QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection); } + Q_EMIT started(); } void PipeWireProduce::destroy() @@ -231,6 +232,7 @@ void PipeWireProduce::destroy() qCDebug(PIPEWIRERECORD_LOGGING) << "finished"; cleanup(); + Q_EMIT finished(); QThread::currentThread()->quit(); } diff --git a/src/pipewireproduce_p.h b/src/pipewireproduce_p.h index b87fd00..5830c48 100644 --- a/src/pipewireproduce_p.h +++ b/src/pipewireproduce_p.h @@ -151,6 +151,8 @@ public: Q_SIGNALS: void producedFrames(); + void started(); + void finished(); private: void initFiltersVaapi(); diff --git a/src/pipewirerecord.cpp b/src/pipewirerecord.cpp index b8cd042..e29051e 100644 --- a/src/pipewirerecord.cpp +++ b/src/pipewirerecord.cpp @@ -70,7 +70,6 @@ void PipeWireRecord::setOutput(const QString &_output) return; d->m_output = output; - refresh(); Q_EMIT outputChanged(output); } diff --git a/tests/HeadlessTest.cpp b/tests/HeadlessTest.cpp index 33db742..6d31f77 100644 --- a/tests/HeadlessTest.cpp +++ b/tests/HeadlessTest.cpp @@ -55,16 +55,29 @@ void createStream(int nodeId, std::optional fd = {}) } encoded->setEncoder(enc); } - encoded->setActive(true); + encoded->start(); QObject::connect(encoded, &PipeWireEncodedStream::newPacket, qGuiApp, [](const PipeWireEncodedStream::Packet &packet) { qDebug() << "packet received" << packet.data().size() << "key:" << packet.isKeyFrame(); }); QObject::connect(encoded, &PipeWireEncodedStream::cursorChanged, qGuiApp, [](const PipeWireCursor &cursor) { qDebug() << "cursor received. position:" << cursor.position << "hotspot:" << cursor.hotspot << "image:" << cursor.texture; }); + QObject::connect(encoded, &PipeWireEncodedStream::stateChanged, qGuiApp, [encoded]() { + switch (encoded->state()) { + case PipeWireEncodedStream::Recording: + qDebug() << "Started recording"; + break; + case PipeWireEncodedStream::Rendering: + qDebug() << "Stopped recording, flushing remaining frames"; + break; + case PipeWireEncodedStream::Idle: + qDebug() << "Recording finished, quitting"; + exit(0); + break; + } + }); QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, encoded, [encoded] { - encoded->setActive(false); - exit(0); + encoded->stop(); }); return; } @@ -96,7 +109,6 @@ void createStream(int nodeId, std::optional fd = {}) }); QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, pwStream, [pwStream] { pwStream->setActive(false); - exit(0); }); } -- 2.30.2