[PATCH] encodedstream: Deprecate setActive() and replace with an explicit API
authorArjen Hiemstra <ahiemstra@heimr.nl>
Tue, 8 Oct 2024 12:03:20 +0000 (14:03 +0200)
committerAurélien COUDERC <coucouf@debian.org>
Sun, 18 May 2025 22:58:37 +0000 (00:58 +0200)
setActive() currently implies starting/stopping the recording process.
However, this is somewhat awkward as everything is rather asynchronous
with multiple threads involved, which means calling setActive() may mean
things are not actually active or may still be active.

To avoid this awkwardness, deprecate setActive() and replace it with an
explicit start() and stop() method that are clearly documented to be
purely requests, with the real active/inactive state matching the stream
state.

This also makes the "Rendering" state more explicit, when "Stop" is
called we immediately switch to the "Rendering" state to indicate we may
still be processing frames but are no longer receiving new frames.

Gbp-Pq: Name upstream_0ac4aa41_encodedstream-Deprecate-setActive-and-replace-with-an-explicit-API.patch

src/pipewirebaseencodedstream.cpp
src/pipewirebaseencodedstream.h
src/pipewireproduce.cpp
src/pipewireproduce_p.h
src/pipewirerecord.cpp
tests/HeadlessTest.cpp

index 67c5445ef52666c332de30cb89932077aeb9177f..27ef1989a004bb42b1bb4021c3bb37d81735621e 100644 (file)
@@ -17,6 +17,8 @@ extern "C" {
 }
 #include <unistd.h>
 
+#include <QThread>
+
 #include "pipewireproduce_p.h"
 #include "vaapiutils_p.h"
 
@@ -29,6 +31,7 @@ struct PipeWireEncodedStreamPrivate {
     PipeWireBaseEncodedStream::Encoder m_encoder;
     std::optional<quint8> m_quality;
     PipeWireBaseEncodedStream::EncodingPreference m_encodingPreference;
+    PipeWireBaseEncodedStream::State m_state = PipeWireBaseEncodedStream::Idle;
 
     std::unique_ptr<QThread> m_produceThread;
     std::unique_ptr<PipeWireProduce> m_produce;
@@ -36,13 +39,7 @@ struct PipeWireEncodedStreamPrivate {
 
 PipeWireBaseEncodedStream::State PipeWireBaseEncodedStream::state() const
 {
-    if (isActive()) {
-        return Recording;
-    } else if (d->m_produceThread && d->m_produce->m_deactivated && d->m_produceThread->isRunning()) {
-        return Rendering;
-    }
-
-    return Idle;
+    return d->m_state;
 }
 
 PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent)
@@ -65,10 +62,10 @@ PipeWireBaseEncodedStream::PipeWireBaseEncodedStream(QObject *parent)
 
 PipeWireBaseEncodedStream::~PipeWireBaseEncodedStream()
 {
-    setActive(false);
+    stop();
 
-    if (d->m_fd) {
-        close(*d->m_fd);
+    if (d->m_produceThread) {
+        d->m_produceThread->wait();
     }
 }
 
@@ -78,7 +75,6 @@ void PipeWireBaseEncodedStream::setNodeId(uint nodeId)
         return;
 
     d->m_nodeId = nodeId;
-    refresh();
     Q_EMIT nodeIdChanged(nodeId);
 }
 
@@ -91,7 +87,6 @@ void PipeWireBaseEncodedStream::setFd(uint fd)
         close(*d->m_fd);
     }
     d->m_fd = fd;
-    refresh();
     Q_EMIT fdChanged(fd);
 }
 
@@ -141,50 +136,84 @@ int PipeWireBaseEncodedStream::maxBufferSize() const
 
 void PipeWireBaseEncodedStream::setActive(bool active)
 {
-    if (d->m_active == active)
-        return;
+    if (active) {
+        start();
+    } else {
+        stop();
 
-    d->m_active = active;
-    refresh();
-    Q_EMIT activeChanged(active);
+        if (d->m_produceThread) {
+            d->m_produceThread->wait();
+        }
+    }
 }
 
-std::optional<quint8> PipeWireBaseEncodedStream::quality() const
+void PipeWireBaseEncodedStream::start()
 {
-    return d->m_quality;
-}
+    if (d->m_nodeId == 0) {
+        qCWarning(PIPEWIRERECORD_LOGGING) << "Cannot start recording on a stream without a node ID";
+        return;
+    }
 
-void PipeWireBaseEncodedStream::setQuality(quint8 quality)
-{
-    d->m_quality = quality;
-    if (d->m_produce) {
-        d->m_produce->setQuality(d->m_quality);
+    if (d->m_produceThread || d->m_state != Idle) {
+        return;
     }
+
+    d->m_produceThread = std::make_unique<QThread>();
+    d->m_produceThread->setObjectName("PipeWireProduce::input");
+    d->m_produce = makeProduce();
+    d->m_produce->setQuality(d->m_quality);
+    d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames);
+    d->m_produce->setEncodingPreference(d->m_encodingPreference);
+    d->m_produce->moveToThread(d->m_produceThread.get());
+    d->m_produceThread->start();
+    QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection);
+
+    connect(d->m_produce.get(), &PipeWireProduce::started, this, [this]() {
+        d->m_active = true;
+        Q_EMIT activeChanged(true);
+        d->m_state = Recording;
+        Q_EMIT stateChanged();
+    });
+
+    connect(d->m_produce.get(), &PipeWireProduce::finished, this, [this]() {
+        d->m_active = false;
+        Q_EMIT activeChanged(false);
+        d->m_state = Idle;
+        Q_EMIT stateChanged();
+    });
+
+    connect(d->m_produceThread.get(), &QThread::finished, this, [this]() {
+        d->m_produce.reset();
+        d->m_produceThread.reset();
+        d->m_nodeId = 0;
+
+        if (d->m_fd) {
+            close(d->m_fd.value());
+        }
+    });
 }
 
-void PipeWireBaseEncodedStream::refresh()
+void PipeWireBaseEncodedStream::stop()
 {
     if (d->m_produceThread) {
         QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::deactivate, Qt::QueuedConnection);
-        d->m_produceThread->wait();
-
-        d->m_produce.reset();
-        d->m_produceThread.reset();
     }
 
-    if (d->m_active && d->m_nodeId > 0) {
-        d->m_produceThread = std::make_unique<QThread>();
-        d->m_produceThread->setObjectName("PipeWireProduce::input");
-        d->m_produce = makeProduce();
+    d->m_state = PipeWireBaseEncodedStream::Rendering;
+    Q_EMIT stateChanged();
+}
+
+std::optional<quint8> PipeWireBaseEncodedStream::quality() const
+{
+    return d->m_quality;
+}
+
+void PipeWireBaseEncodedStream::setQuality(quint8 quality)
+{
+    d->m_quality = quality;
+    if (d->m_produce) {
         d->m_produce->setQuality(d->m_quality);
-        d->m_produce->setMaxPendingFrames(d->m_maxPendingFrames);
-        d->m_produce->setEncodingPreference(d->m_encodingPreference);
-        d->m_produce->moveToThread(d->m_produceThread.get());
-        d->m_produceThread->start();
-        QMetaObject::invokeMethod(d->m_produce.get(), &PipeWireProduce::initialize, Qt::QueuedConnection);
     }
-
-    Q_EMIT stateChanged();
 }
 
 void PipeWireBaseEncodedStream::setEncoder(Encoder encoder)
index 9f4c0fd7b92fb1e53fd462047071e9f6b22381ba..872825291e4e9222017697d94fb0e42d4f4c80f8 100644 (file)
@@ -25,7 +25,7 @@ class KPIPEWIRE_EXPORT PipeWireBaseEncodedStream : public QObject
      * Transfers the ownership of the fd, will close it when it's done with it.
      */
     Q_PROPERTY(uint fd READ fd WRITE setFd NOTIFY fdChanged)
-    Q_PROPERTY(bool active READ isActive WRITE setActive NOTIFY activeChanged)
+    Q_PROPERTY(bool active READ isActive NOTIFY activeChanged)
     Q_PROPERTY(State state READ state NOTIFY stateChanged)
     Q_PROPERTY(Encoder encoder READ encoder WRITE setEncoder NOTIFY encoderChanged)
 
@@ -67,7 +67,42 @@ public:
     int maxBufferSize() const;
 
     bool isActive() const;
-    void setActive(bool active);
+    /**
+     * Set the active state of recording.
+     *
+     * @deprecated Since 6.4, use the separate `start()`/`stop()`calls instead.
+     * This function now just calls `start()`/`stop()`.
+     *
+     * @note When calling `setActive(false)`, unlike `stop()`, this function will
+     * block until the internal encoding threads are finished.
+     */
+    KPIPEWIRE_DEPRECATED void setActive(bool active);
+
+    /**
+     * Request to start recording.
+     *
+     * This will create everything required to perform recording, like a PipeWire
+     * stream and an encoder, then start receiving frames from the stream and
+     * encoding those.
+     *
+     * This requires a valid node ID to be set and that the current state is Idle.
+     *
+     * Note that recording all happens on separate threads, this method only
+     * starts the process, only when state() returns Recording is recording
+     * actually happening.
+     */
+    Q_INVOKABLE void start();
+    /**
+     * Request to stop recording.
+     *
+     * This will terminate receiving frames from PipeWire and do any cleanup
+     * necessary to fully terminate recording after that.
+     *
+     * Note that after this request, there may still be some processing required
+     * due to internal queues. As long as state() does not return Idle processing
+     * is still happening and teardown has not been completed.
+     */
+    Q_INVOKABLE void stop();
 
     /**
      * The quality used for encoding.
@@ -127,6 +162,5 @@ protected:
     virtual std::unique_ptr<PipeWireProduce> makeProduce() = 0;
     EncodingPreference encodingPreference();
 
-    void refresh();
     QScopedPointer<PipeWireEncodedStreamPrivate> d;
 };
index 10878d1125f146584ede4e606a8ad7cf03f5dfb2..610931d59e133ebfb40494ecf91f6a950009fd4f 100644 (file)
@@ -201,6 +201,7 @@ void PipeWireProduce::deactivate()
     if (!m_encoder || streamState != PW_STREAM_STATE_STREAMING) {
         QMetaObject::invokeMethod(this, &PipeWireProduce::destroy, Qt::QueuedConnection);
     }
+    Q_EMIT started();
 }
 
 void PipeWireProduce::destroy()
@@ -231,6 +232,7 @@ void PipeWireProduce::destroy()
 
     qCDebug(PIPEWIRERECORD_LOGGING) << "finished";
     cleanup();
+    Q_EMIT finished();
     QThread::currentThread()->quit();
 }
 
index b87fd00715ac743f971bfb05e999afee78fcbe40..5830c482cb709941c783ac777f638185fb571ea5 100644 (file)
@@ -151,6 +151,8 @@ public:
 
 Q_SIGNALS:
     void producedFrames();
+    void started();
+    void finished();
 
 private:
     void initFiltersVaapi();
index b8cd0425eabb090fdbbad2cde24baaec754a4cfa..e29051e21e0c8e46bf6d07a88fabfe51775660fd 100644 (file)
@@ -70,7 +70,6 @@ void PipeWireRecord::setOutput(const QString &_output)
         return;
 
     d->m_output = output;
-    refresh();
     Q_EMIT outputChanged(output);
 }
 
index 33db7423acc8dbde234f5f16924cff00ebda39d6..6d31f770704aa2e2e660576b3028ff39066ba312 100644 (file)
@@ -55,16 +55,29 @@ void createStream(int nodeId, std::optional<int> fd = {})
             }
             encoded->setEncoder(enc);
         }
-        encoded->setActive(true);
+        encoded->start();
         QObject::connect(encoded, &PipeWireEncodedStream::newPacket, qGuiApp, [](const PipeWireEncodedStream::Packet &packet) {
             qDebug() << "packet received" << packet.data().size() << "key:" << packet.isKeyFrame();
         });
         QObject::connect(encoded, &PipeWireEncodedStream::cursorChanged, qGuiApp, [](const PipeWireCursor &cursor) {
             qDebug() << "cursor received. position:" << cursor.position << "hotspot:" << cursor.hotspot << "image:" << cursor.texture;
         });
+        QObject::connect(encoded, &PipeWireEncodedStream::stateChanged, qGuiApp, [encoded]() {
+            switch (encoded->state()) {
+            case PipeWireEncodedStream::Recording:
+                qDebug() << "Started recording";
+                break;
+            case PipeWireEncodedStream::Rendering:
+                qDebug() << "Stopped recording, flushing remaining frames";
+                break;
+            case PipeWireEncodedStream::Idle:
+                qDebug() << "Recording finished, quitting";
+                exit(0);
+                break;
+            }
+        });
         QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, encoded, [encoded] {
-            encoded->setActive(false);
-            exit(0);
+            encoded->stop();
         });
         return;
     }
@@ -96,7 +109,6 @@ void createStream(int nodeId, std::optional<int> fd = {})
     });
     QObject::connect(KSignalHandler::self(), &KSignalHandler::signalReceived, pwStream, [pwStream] {
         pwStream->setActive(false);
-        exit(0);
     });
 }