Add a way of notifying QFutureWatcher when pause is in effect

Because setting QFutureInterface to paused state does not mean that
the computations that are already in progress will stop immediately,
it may be useful to get notified when pause actually takes effect.

Introduced the QFutureWatcher::suspended() signal, to be emitted when
there are no more computations in progress, and no more result ready
or progress reporting signals will be emitted, i.e. when pause took
effect. Added {QFuture, QFutureWatcher}::isSuspended() methods for
checking if pause took effect.

QtConcurrent will now to send QFutureCallOutEvent::Suspended event
when the state is paused and there are no more active threads.

[ChangeLog][QtCore][QFutureWatcher] Added a new QFutureWatcher::suspended()
signal, to be emitted when pause took effect, meaning that there are no
more computations in progress. Added {QFuture, QFutureWatcher}::isSuspended()
methods for checking if pause took effect.

Fixes: QTBUG-12152
Change-Id: I88f2ad24d800cd6293dec63977d45bd35f9a09f0
Reviewed-by: Jarek Kobus <jaroslaw.kobus@qt.io>
This commit is contained in:
Sona Kurazyan 2020-05-20 11:39:39 +02:00
parent 9b0e23ef8a
commit 2f15927f01
11 changed files with 226 additions and 17 deletions

View File

@ -219,6 +219,12 @@ void ThreadEngineBase::acquireBarrierSemaphore()
barrier.acquire();
}
void ThreadEngineBase::reportIfPausedDone() const
{
if (futureInterface && futureInterface->isPaused())
futureInterface->reportSuspended();
}
bool ThreadEngineBase::isCanceled()
{
if (futureInterface)
@ -304,8 +310,15 @@ void ThreadEngineBase::run() // implements QRunnable.
// struct wants to be throttled by making a worker thread exit.
// Respect that request unless this is the only worker thread left
// running, in which case it has to keep going.
if (threadThrottleExit())
if (threadThrottleExit()) {
return;
} else {
// If the last worker thread is throttled and the state is paused,
// it means that pause has been requested, and it is already
// in effect (because all previous threads have already exited).
// Report the "Suspended" state.
reportIfPausedDone();
}
}
#ifndef QT_NO_EXCEPTIONS

View File

@ -99,6 +99,7 @@ public:
void setProgressValue(int progress);
void setProgressRange(int minimum, int maximum);
void acquireBarrierSemaphore();
void reportIfPausedDone() const;
protected: // The user overrides these:
virtual void start() {}

View File

@ -111,6 +111,7 @@ public:
void setPaused(bool paused) { d.setPaused(paused); }
bool isPaused() const { return d.isPaused(); }
bool isSuspended() const { return d.isSuspended(); }
void pause() { setPaused(true); }
void resume() { setPaused(false); }
void togglePaused() { d.togglePaused(); }

View File

@ -116,8 +116,8 @@
the computation to finish, ensuring that all results are available.
The state of the computation represented by a QFuture can be queried using
the isCanceled(), isStarted(), isFinished(), isRunning(), or isPaused()
functions.
the isCanceled(), isStarted(), isFinished(), isRunning(), isPaused()
or isSuspended() functions.
QFuture is a lightweight reference counted class that can be passed by
value.
@ -229,9 +229,18 @@
pause() function; otherwise returns \c false.
Be aware that the computation may still be running even though this
function returns \c true. See setPaused() for more details.
function returns \c true. See setPaused() for more details. To check
if pause actually took effect, use isSuspended() instead.
\sa setPaused(), togglePaused()
\sa setPaused(), togglePaused(), isSuspended()
*/
/*! \fn template <typename T> bool QFuture<T>::isSuspended() const
Returns \c true if a paused asynchronous computation has been suspended,
and no more results or progress changes are expected.
\sa setPaused(), togglePaused(), isPaused()
*/
/*! \fn template <typename T> void QFuture<T>::pause()

View File

@ -110,7 +110,7 @@ void QFutureInterfaceBase::cancel()
if (d->state.loadRelaxed() & Canceled)
return;
switch_from_to(d->state, Paused, Canceled);
switch_from_to(d->state, Paused | Suspended, Canceled);
d->waitCondition.wakeAll();
d->pausedWaitCondition.wakeAll();
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Canceled));
@ -124,7 +124,7 @@ void QFutureInterfaceBase::setPaused(bool paused)
switch_on(d->state, Paused);
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
} else {
switch_off(d->state, Paused);
switch_off(d->state, Paused | Suspended);
d->pausedWaitCondition.wakeAll();
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
}
@ -134,7 +134,7 @@ void QFutureInterfaceBase::togglePaused()
{
QMutexLocker locker(&d->m_mutex);
if (d->state.loadRelaxed() & Paused) {
switch_off(d->state, Paused);
switch_off(d->state, Paused | Suspended);
d->pausedWaitCondition.wakeAll();
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Resumed));
} else {
@ -143,6 +143,20 @@ void QFutureInterfaceBase::togglePaused()
}
}
void QFutureInterfaceBase::reportSuspended() const
{
// Needs to be called when pause is in effect,
// i.e. no more events will be reported.
QMutexLocker locker(&d->m_mutex);
const int state = d->state;
if (!(state & Paused) || (state & Suspended))
return;
switch_on(d->state, Suspended);
d->sendCallOut(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
}
void QFutureInterfaceBase::setThrottled(bool enable)
{
QMutexLocker lock(&d->m_mutex);
@ -181,6 +195,11 @@ bool QFutureInterfaceBase::isPaused() const
return queryState(Paused);
}
bool QFutureInterfaceBase::isSuspended() const
{
return queryState(Suspended);
}
bool QFutureInterfaceBase::isThrottled() const
{
return queryState(Throttled);
@ -612,7 +631,9 @@ void QFutureInterfaceBasePrivate::connectOutputInterface(QFutureCallOutInterface
it.batchedAdvance();
}
if (state.loadRelaxed() & QFutureInterfaceBase::Paused)
if (state.loadRelaxed() & QFutureInterfaceBase::Suspended)
interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Suspended));
else if (state.loadRelaxed() & QFutureInterfaceBase::Paused)
interface->postCallOutEvent(QFutureCallOutEvent(QFutureCallOutEvent::Paused));
if (state.loadRelaxed() & QFutureInterfaceBase::Canceled)

View File

@ -85,7 +85,8 @@ public:
Paused = 0x10,
Throttled = 0x20,
// Pending means that the future depends on another one, which is not finished yet
Pending = 0x40
Pending = 0x40,
Suspended = 0x80
};
QFutureInterfaceBase(State initialState = NoState);
@ -125,6 +126,7 @@ public:
bool isCanceled() const;
bool isFinished() const;
bool isPaused() const;
bool isSuspended() const;
bool isThrottled() const;
bool isResultReadyAt(int index) const;
bool isValid() const;
@ -132,6 +134,7 @@ public:
void cancel();
void setPaused(bool paused);
void togglePaused();
void reportSuspended() const;
void setThrottled(bool enable);
void waitForFinished();

View File

@ -74,7 +74,8 @@ public:
Resumed,
Progress,
ProgressRange,
ResultsReady
ResultsReady,
Suspended
};
QFutureCallOutEvent()

View File

@ -69,8 +69,8 @@ QT_BEGIN_NAMESPACE
QFutureWatcher.
Status changes are reported via the started(), finished(), canceled(),
paused(), resumed(), resultReadyAt(), and resultsReadyAt() signals.
Progress information is provided from the progressRangeChanged(),
paused(), resumed(), suspended(), resultReadyAt(), and resultsReadyAt()
signals. Progress information is provided from the progressRangeChanged(),
void progressValueChanged(), and progressTextChanged() signals.
Throttling control is provided by the setPendingResultsLimit() function.
@ -292,15 +292,28 @@ bool QFutureWatcherBase::isCanceled() const
pause() function; otherwise returns \c false.
Be aware that the computation may still be running even though this
function returns \c true. See setPaused() for more details.
function returns \c true. See setPaused() for more details. To check
if pause actually took effect, use isSuspended() instead.
\sa setPaused(), togglePaused()
\sa setPaused(), togglePaused(), isSuspended()
*/
bool QFutureWatcherBase::isPaused() const
{
return futureInterface().queryState(QFutureInterfaceBase::Paused);
}
/*! \fn template <typename T> bool QFutureWatcher<T>::isSuspended() const
Returns \c true if a paused asynchronous computation has been suspended,
and no more results or progress changes are expected.
\sa suspended(), paused(), isPaused()
*/
bool QFutureWatcherBase::isSuspended() const
{
return futureInterface().isSuspended();
}
/*! \fn template <typename T> void QFutureWatcher<T>::waitForFinished()
Waits for the asynchronous computation to finish (including cancel()ed
@ -431,6 +444,11 @@ void QFutureWatcherBasePrivate::sendCallOutEvent(QFutureCallOutEvent *event)
break;
emit q->paused();
break;
case QFutureCallOutEvent::Suspended:
if (q->futureInterface().isCanceled())
break;
emit q->suspended();
break;
case QFutureCallOutEvent::Resumed:
if (q->futureInterface().isCanceled())
break;
@ -529,9 +547,18 @@ void QFutureWatcherBasePrivate::sendCallOutEvent(QFutureCallOutEvent *event)
\note This signal only informs that pause has been requested. It
doesn't indicate that all background operations are stopped. Signals
for computations that were in progress at the moment of pausing will
still be delivered.
still be delivered. To to be informed when pause() actually
took effect, use the suspended() signal.
\sa setPaused(), pause()
\sa setPaused(), pause(), suspended()
*/
/*! \fn template <typename T> void QFutureWatcher<T>::suspended()
This signal is emitted when pause() took effect, meaning that there are
no more running computations. After receiving this signal no more result
ready or progress reporting signals are expected.
\sa setPaused(), pause(), paused()
*/
/*! \fn template <typename T> void QFutureWatcher<T>::resumed()

View File

@ -70,6 +70,7 @@ public:
bool isRunning() const;
bool isCanceled() const;
bool isPaused() const;
bool isSuspended() const;
void waitForFinished();
@ -82,6 +83,7 @@ Q_SIGNALS:
void finished();
void canceled();
void paused();
void suspended();
void resumed();
void resultReadyAt(int resultIndex);
void resultsReadyAt(int beginIndex, int endIndex);
@ -147,6 +149,7 @@ public:
bool isRunning() const;
bool isCanceled() const;
bool isPaused() const;
bool isSuspended() const
void waitForFinished();
@ -157,6 +160,7 @@ Q_SIGNALS:
void finished();
void canceled();
void paused();
void suspended();
void resumed();
void resultReadyAt(int resultIndex);
void resultsReadyAt(int beginIndex, int endIndex);

View File

@ -113,6 +113,7 @@ private slots:
void iterators();
void iteratorsThread();
void pause();
void suspend();
void throttling();
void voidConversions();
#ifndef QT_NO_EXCEPTIONS
@ -1334,6 +1335,43 @@ void tst_QFuture::pause()
Interface.reportFinished();
}
void tst_QFuture::suspend()
{
QFutureInterface<void> interface;
interface.reportStarted();
QFuture<void> f = interface.future();
QVERIFY(!interface.isSuspended());
interface.reportSuspended();
QVERIFY(!interface.isSuspended());
// pause
interface.togglePaused();
QVERIFY(!interface.isSuspended());
QVERIFY(interface.isPaused());
interface.reportSuspended();
QVERIFY(interface.isSuspended());
QVERIFY(interface.isPaused());
// resume
interface.togglePaused();
QVERIFY(!interface.isSuspended());
QVERIFY(!interface.isPaused());
// pause again
interface.togglePaused();
interface.reportSuspended();
interface.reportCanceled();
QVERIFY(!interface.isSuspended());
QVERIFY(!interface.isPaused());
QVERIFY(interface.isCanceled());
interface.reportFinished();
}
class ResultObject : public QObject
{
Q_OBJECT

View File

@ -58,6 +58,8 @@ private slots:
void changeFuture();
void cancelEvents();
void pauseEvents();
void suspended();
void suspendedEvents();
void finishedState();
void throttling();
void incrementalMapResults();
@ -723,6 +725,95 @@ void tst_QFutureWatcher::pauseEvents()
}
}
void tst_QFutureWatcher::suspended()
{
QFutureWatcher<void> watcher;
QSignalSpy resultReadySpy(&watcher, &QFutureWatcher<int>::resultReadyAt);
QSignalSpy pausedSpy(&watcher, &QFutureWatcher<int>::paused);
QSignalSpy suspendedSpy(&watcher, &QFutureWatcher<int>::suspended);
QSignalSpy finishedSpy(&watcher, &QFutureWatcher<int>::finished);
const int numValues = 25;
std::vector<int> values(numValues, 0);
std::atomic_int count = 0;
QThreadPool pool;
pool.setMaxThreadCount(3);
QFuture<int> future = QtConcurrent::mapped(&pool, values, [&](int value) {
++count;
// Sleep, to make sure not all threads will start at once.
QThread::msleep(50);
return value;
});
watcher.setFuture(future);
// Allow some threads to start before pausing.
QThread::msleep(200);
watcher.pause();
watcher.pause();
QTRY_COMPARE(suspendedSpy.count(), 1); // suspended() should be emitted only once
QCOMPARE(pausedSpy.count(), 2); // paused() is emitted as many times as requested
// Make sure QFutureWatcher::resultReadyAt() is emitted only for already started threads.
const auto resultReadyAfterPaused = resultReadySpy.count();
QCOMPARE(resultReadyAfterPaused, count);
// Make sure no more results are reported before resuming.
QThread::msleep(200);
QCOMPARE(resultReadyAfterPaused, resultReadySpy.count());
resultReadySpy.clear();
watcher.resume();
QTRY_COMPARE(finishedSpy.count(), 1);
// Make sure that no more suspended() signals have been emitted.
QCOMPARE(suspendedSpy.count(), 1);
// Make sure the rest of results were reported after resume.
QCOMPARE(resultReadySpy.count(), numValues - resultReadyAfterPaused);
}
void tst_QFutureWatcher::suspendedEvents()
{
QFutureInterface<void> iface;
iface.reportStarted();
QFutureWatcher<void> watcher;
QSignalSpy pausedSpy(&watcher, &QFutureWatcher<void>::paused);
QVERIFY(pausedSpy.isValid());
QSignalSpy suspendedSpy(&watcher, &QFutureWatcher<void>::suspended);
QVERIFY(suspendedSpy.isValid());
bool pausedBeforeSuspended = false;
bool notSuspendedBeforePasused = false;
connect(&watcher, &QFutureWatcher<void>::paused,
[&] { notSuspendedBeforePasused = (suspendedSpy.count() == 0); });
connect(&watcher, &QFutureWatcher<void>::suspended,
[&] { pausedBeforeSuspended = (pausedSpy.count() == 1); });
watcher.setFuture(iface.future());
iface.reportSuspended();
// Make sure reportPaused() is ignored if the state is not paused
pausedSpy.wait(100);
QCOMPARE(pausedSpy.count(), 0);
QCOMPARE(suspendedSpy.count(), 0);
iface.setPaused(true);
iface.reportSuspended();
QTRY_COMPARE(suspendedSpy.count(), 1);
QCOMPARE(pausedSpy.count(), 1);
QVERIFY(notSuspendedBeforePasused);
QVERIFY(pausedBeforeSuspended);
iface.reportFinished();
}
// Test that the finished state for the watcher gets
// set when the finished event is delivered.
// This means it will lag the finished state for the future,