QThreadPool::cancel() to remove individual jobs from the job queue.

[ChangeLog][QtCore][QThreadPool] Added QThreadPool::cancel() which allows
removing from the job queue a job that hasn't been started yet.

Change-Id: Ib8f1c1f32a34f5eec8338c641d820b928e470164
Reviewed-by: Nick Shaforostoff <shafff@ukr.net>
Reviewed-by: Olivier Goffart <ogoffart@woboq.com>
This commit is contained in:
Nick Shaforostoff 2014-06-16 18:50:12 +03:00
parent 6475462c6f
commit 5b11e43e9f
5 changed files with 91 additions and 12 deletions

View File

@ -292,7 +292,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
// To avoid deadlocks and reduce the number of threads used, try to
// run the runnable in the current thread.
d->pool()->d_func()->stealRunnable(d->runnable);
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
lock.relock();
@ -313,7 +313,7 @@ void QFutureInterfaceBase::waitForFinished()
lock.unlock();
if (!alreadyFinished) {
d->pool()->d_func()->stealRunnable(d->runnable);
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
lock.relock();

View File

@ -311,14 +311,12 @@ void QThreadPoolPrivate::clear()
/*!
\internal
Searches for \a runnable in the queue, removes it from the queue and
runs it if found. This function does not return until the runnable
has completed.
returns \c true if it was found in the queue
*/
void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
bool QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
{
if (runnable == 0)
return;
bool found = false;
return false;
{
QMutexLocker locker(&mutex);
QList<QPair<QRunnable *, int> >::iterator it = queue.begin();
@ -326,17 +324,26 @@ void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
while (it != end) {
if (it->first == runnable) {
found = true;
queue.erase(it);
break;
return true;
}
++it;
}
}
if (!found)
return;
return false;
}
/*!
\internal
Searches for \a runnable in the queue, removes it from the queue and
runs it if found. This function does not return until the runnable
has completed.
*/
void QThreadPoolPrivate::stealAndRunRunnable(QRunnable *runnable)
{
if (!stealRunnable(runnable))
return;
const bool autoDelete = runnable->autoDelete();
bool del = autoDelete && !--runnable->ref;
@ -628,6 +635,25 @@ void QThreadPool::clear()
d->clear();
}
/*!
\since 5.5
Removes the specified \a runnable from the queue if it is not yet started.
The runnables for which \l{QRunnable::autoDelete()}{runnable->autoDelete()}
returns \c true are deleted.
\sa start()
*/
void QThreadPool::cancel(QRunnable *runnable)
{
Q_D(QThreadPool);
if (!d->stealRunnable(runnable))
return;
if (runnable->autoDelete() && !--runnable->ref) {
delete runnable;
}
}
QT_END_NAMESPACE
#endif

View File

@ -77,6 +77,7 @@ public:
bool waitForDone(int msecs = -1);
void clear();
void cancel(QRunnable *runnable);
};
QT_END_NAMESPACE

View File

@ -76,7 +76,8 @@ public:
void reset();
bool waitForDone(int msecs);
void clear();
void stealRunnable(QRunnable *);
bool stealRunnable(QRunnable *runnable);
void stealAndRunRunnable(QRunnable *runnable);
mutable QMutex mutex;
QSet<QThreadPoolThread *> allThreads;

View File

@ -92,6 +92,7 @@ private slots:
void priorityStart();
void waitForDone();
void clear();
void cancel();
void waitForDoneTimeout();
void destroyingWaitsForTasksToFinish();
void stressTest();
@ -958,6 +959,56 @@ void tst_QThreadPool::clear()
QCOMPARE(count.load(), threadPool.maxThreadCount());
}
void tst_QThreadPool::cancel()
{
QSemaphore sem(0);
class BlockingRunnable : public QRunnable
{
public:
QSemaphore & sem;
int & dtorCounter;
int & runCounter;
int dummy;
BlockingRunnable(QSemaphore & s, int & c, int & r) : sem(s), dtorCounter(c), runCounter(r){}
~BlockingRunnable(){dtorCounter++;}
void run()
{
runCounter++;
sem.acquire();
count.ref();
}
};
typedef BlockingRunnable* BlockingRunnablePtr;
QThreadPool threadPool;
threadPool.setMaxThreadCount(3);
int runs = 2 * threadPool.maxThreadCount();
BlockingRunnablePtr* runnables = new BlockingRunnablePtr[runs];
count.store(0);
int dtorCounter = 0;
int runCounter = 0;
for (int i = 0; i < runs; i++) {
runnables[i] = new BlockingRunnable(sem, dtorCounter, runCounter);
runnables[i]->setAutoDelete(i != 0 && i != (runs-1)); //one which will run and one which will not
threadPool.cancel(runnables[i]); //verify NOOP for jobs not in the queue
threadPool.start(runnables[i]);
}
for (int i = 0; i < runs; i++) {
threadPool.cancel(runnables[i]);
}
runnables[0]->dummy = 0; //valgrind will catch this if cancel() is crazy enough to delete currently running jobs
runnables[runs-1]->dummy = 0;
QCOMPARE(dtorCounter, runs-threadPool.maxThreadCount()-1);
sem.release(threadPool.maxThreadCount());
threadPool.waitForDone();
QCOMPARE(runCounter, threadPool.maxThreadCount());
QCOMPARE(count.load(), threadPool.maxThreadCount());
QCOMPARE(dtorCounter, runs-2);
delete runnables[0]; //if the pool deletes them then we'll get double-free crash
delete runnables[runs-1];
delete[] runnables;
}
void tst_QThreadPool::destroyingWaitsForTasksToFinish()
{
QTime total, pass;