Make QThreadPool::waitForDone more atomic

Avoid having the reset in waitForDone interfere with other uses of the
thread-pool by locking the mutex higher, and maintaining the state
so the queues doesn't have threads not in allThreads.

Task-number: QTBUG-62865
Change-Id: I17ee95d5f0e138ec15e785c6d61bb0fe064d3659
Reviewed-by: Edward Welbourne <edward.welbourne@qt.io>
This commit is contained in:
Allan Sandfeld Jensen 2018-09-25 15:40:48 +02:00 committed by Liang Qi
parent a6a5e81cd6
commit 7581858148
2 changed files with 44 additions and 41 deletions

View File

@ -39,7 +39,7 @@
#include "qthreadpool.h"
#include "qthreadpool_p.h"
#include "qelapsedtimer.h"
#include "qdeadlinetimer.h"
#include <algorithm>
@ -130,11 +130,6 @@ void QThreadPoolThread::run()
}
} while (true);
if (manager->isExiting) {
registerThreadInactive();
break;
}
// if too many threads are active, expire this thread
bool expired = manager->tooManyThreadsActive();
if (!expired) {
@ -145,6 +140,10 @@ void QThreadPoolThread::run()
++manager->activeThreads;
if (manager->waitingThreads.removeOne(this))
expired = true;
if (!manager->allThreads.contains(this)) {
registerThreadInactive();
break;
}
}
if (expired) {
manager->expiredThreads.enqueue(this);
@ -267,7 +266,7 @@ void QThreadPoolPrivate::startThread(QRunnable *runnable)
QScopedPointer <QThreadPoolThread> thread(new QThreadPoolThread(this));
thread->setObjectName(QLatin1String("Thread (pooled)"));
Q_ASSERT(!allThreads.contains(thread.data())); // if this assert hits, we have an ABA problem (deleted threads don't get removed here)
allThreads.append(thread.data());
allThreads.insert(thread.data());
++activeThreads;
if (runnable->autoDelete())
@ -278,49 +277,54 @@ void QThreadPoolPrivate::startThread(QRunnable *runnable)
/*!
\internal
Makes all threads exit, waits for each thread to exit and deletes it.
Helper function only to be called from waitForDone(int)
*/
void QThreadPoolPrivate::reset()
{
QMutexLocker locker(&mutex);
isExiting = true;
// move the contents of the set out so that we can iterate without the lock
QSet<QThreadPoolThread *> allThreadsCopy;
allThreadsCopy.swap(allThreads);
expiredThreads.clear();
waitingThreads.clear();
mutex.unlock();
while (!allThreads.empty()) {
// move the contents of the set out so that we can iterate without the lock
QList<QThreadPoolThread *> allThreadsCopy;
allThreadsCopy.swap(allThreads);
locker.unlock();
for (QThreadPoolThread *thread : qAsConst(allThreadsCopy)) {
for (QThreadPoolThread *thread: qAsConst(allThreadsCopy)) {
if (!thread->isFinished()) {
thread->runnableReady.wakeAll();
thread->wait();
delete thread;
}
locker.relock();
// repeat until all newly arrived threads have also completed
delete thread;
}
waitingThreads.clear();
expiredThreads.clear();
mutex.lock();
}
isExiting = false;
/*!
\internal
Helper function only to be called from waitForDone(int)
*/
bool QThreadPoolPrivate::waitForDone(const QDeadlineTimer &timer)
{
while (!(queue.isEmpty() && activeThreads == 0) && !timer.hasExpired())
noActiveThreads.wait(&mutex, timer);
return queue.isEmpty() && activeThreads == 0;
}
bool QThreadPoolPrivate::waitForDone(int msecs)
{
QMutexLocker locker(&mutex);
if (msecs < 0) {
while (!(queue.isEmpty() && activeThreads == 0))
noActiveThreads.wait(locker.mutex());
} else {
QElapsedTimer timer;
timer.start();
int t;
while (!(queue.isEmpty() && activeThreads == 0) &&
((t = msecs - timer.elapsed()) > 0))
noActiveThreads.wait(locker.mutex(), t);
}
QDeadlineTimer timer(msecs);
do {
if (!waitForDone(timer))
return false;
reset();
// More threads can be started during reset(), in that case continue
// waiting if we still have time left.
} while ((!queue.isEmpty() || activeThreads) && !timer.hasExpired());
return queue.isEmpty() && activeThreads == 0;
}
@ -686,10 +690,7 @@ void QThreadPool::releaseThread()
bool QThreadPool::waitForDone(int msecs)
{
Q_D(QThreadPool);
bool rc = d->waitForDone(msecs);
if (rc)
d->reset();
return rc;
return d->waitForDone(msecs);
}
/*!

View File

@ -63,6 +63,8 @@ QT_REQUIRE_CONFIG(thread);
QT_BEGIN_NAMESPACE
class QDeadlineTimer;
class QueuePage {
public:
enum {
@ -163,12 +165,13 @@ public:
void startThread(QRunnable *runnable = 0);
void reset();
bool waitForDone(int msecs);
bool waitForDone(const QDeadlineTimer &timer);
void clear();
void stealAndRunRunnable(QRunnable *runnable);
void deletePageIfFinished(QueuePage *page);
mutable QMutex mutex;
QList<QThreadPoolThread *> allThreads;
QSet<QThreadPoolThread *> allThreads;
QQueue<QThreadPoolThread *> waitingThreads;
QQueue<QThreadPoolThread *> expiredThreads;
QVector<QueuePage*> queue;
@ -179,7 +182,6 @@ public:
int reservedThreads = 0;
int activeThreads = 0;
uint stackSize = 0;
bool isExiting = false;
};
QT_END_NAMESPACE