QThreadPool::cancel() to remove individual jobs from the job queue.
[ChangeLog][QtCore][QThreadPool] Added QThreadPool::cancel() which allows removing from the job queue a job that hasn't been started yet. Change-Id: Ib8f1c1f32a34f5eec8338c641d820b928e470164 Reviewed-by: Nick Shaforostoff <shafff@ukr.net> Reviewed-by: Olivier Goffart <ogoffart@woboq.com>
This commit is contained in:
parent
6475462c6f
commit
5b11e43e9f
@ -292,7 +292,7 @@ void QFutureInterfaceBase::waitForResult(int resultIndex)
|
||||
|
||||
// To avoid deadlocks and reduce the number of threads used, try to
|
||||
// run the runnable in the current thread.
|
||||
d->pool()->d_func()->stealRunnable(d->runnable);
|
||||
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
|
||||
|
||||
lock.relock();
|
||||
|
||||
@ -313,7 +313,7 @@ void QFutureInterfaceBase::waitForFinished()
|
||||
lock.unlock();
|
||||
|
||||
if (!alreadyFinished) {
|
||||
d->pool()->d_func()->stealRunnable(d->runnable);
|
||||
d->pool()->d_func()->stealAndRunRunnable(d->runnable);
|
||||
|
||||
lock.relock();
|
||||
|
||||
|
@ -311,14 +311,12 @@ void QThreadPoolPrivate::clear()
|
||||
/*!
|
||||
\internal
|
||||
Searches for \a runnable in the queue, removes it from the queue and
|
||||
runs it if found. This function does not return until the runnable
|
||||
has completed.
|
||||
returns \c true if it was found in the queue
|
||||
*/
|
||||
void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
||||
bool QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
||||
{
|
||||
if (runnable == 0)
|
||||
return;
|
||||
bool found = false;
|
||||
return false;
|
||||
{
|
||||
QMutexLocker locker(&mutex);
|
||||
QList<QPair<QRunnable *, int> >::iterator it = queue.begin();
|
||||
@ -326,17 +324,26 @@ void QThreadPoolPrivate::stealRunnable(QRunnable *runnable)
|
||||
|
||||
while (it != end) {
|
||||
if (it->first == runnable) {
|
||||
found = true;
|
||||
queue.erase(it);
|
||||
break;
|
||||
return true;
|
||||
}
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
if (!found)
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
/*!
|
||||
\internal
|
||||
Searches for \a runnable in the queue, removes it from the queue and
|
||||
runs it if found. This function does not return until the runnable
|
||||
has completed.
|
||||
*/
|
||||
void QThreadPoolPrivate::stealAndRunRunnable(QRunnable *runnable)
|
||||
{
|
||||
if (!stealRunnable(runnable))
|
||||
return;
|
||||
const bool autoDelete = runnable->autoDelete();
|
||||
bool del = autoDelete && !--runnable->ref;
|
||||
|
||||
@ -628,6 +635,25 @@ void QThreadPool::clear()
|
||||
d->clear();
|
||||
}
|
||||
|
||||
/*!
|
||||
\since 5.5
|
||||
|
||||
Removes the specified \a runnable from the queue if it is not yet started.
|
||||
The runnables for which \l{QRunnable::autoDelete()}{runnable->autoDelete()}
|
||||
returns \c true are deleted.
|
||||
|
||||
\sa start()
|
||||
*/
|
||||
void QThreadPool::cancel(QRunnable *runnable)
|
||||
{
|
||||
Q_D(QThreadPool);
|
||||
if (!d->stealRunnable(runnable))
|
||||
return;
|
||||
if (runnable->autoDelete() && !--runnable->ref) {
|
||||
delete runnable;
|
||||
}
|
||||
}
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
||||
#endif
|
||||
|
@ -77,6 +77,7 @@ public:
|
||||
bool waitForDone(int msecs = -1);
|
||||
|
||||
void clear();
|
||||
void cancel(QRunnable *runnable);
|
||||
};
|
||||
|
||||
QT_END_NAMESPACE
|
||||
|
@ -76,7 +76,8 @@ public:
|
||||
void reset();
|
||||
bool waitForDone(int msecs);
|
||||
void clear();
|
||||
void stealRunnable(QRunnable *);
|
||||
bool stealRunnable(QRunnable *runnable);
|
||||
void stealAndRunRunnable(QRunnable *runnable);
|
||||
|
||||
mutable QMutex mutex;
|
||||
QSet<QThreadPoolThread *> allThreads;
|
||||
|
@ -92,6 +92,7 @@ private slots:
|
||||
void priorityStart();
|
||||
void waitForDone();
|
||||
void clear();
|
||||
void cancel();
|
||||
void waitForDoneTimeout();
|
||||
void destroyingWaitsForTasksToFinish();
|
||||
void stressTest();
|
||||
@ -958,6 +959,56 @@ void tst_QThreadPool::clear()
|
||||
QCOMPARE(count.load(), threadPool.maxThreadCount());
|
||||
}
|
||||
|
||||
void tst_QThreadPool::cancel()
|
||||
{
|
||||
QSemaphore sem(0);
|
||||
class BlockingRunnable : public QRunnable
|
||||
{
|
||||
public:
|
||||
QSemaphore & sem;
|
||||
int & dtorCounter;
|
||||
int & runCounter;
|
||||
int dummy;
|
||||
BlockingRunnable(QSemaphore & s, int & c, int & r) : sem(s), dtorCounter(c), runCounter(r){}
|
||||
~BlockingRunnable(){dtorCounter++;}
|
||||
void run()
|
||||
{
|
||||
runCounter++;
|
||||
sem.acquire();
|
||||
count.ref();
|
||||
}
|
||||
};
|
||||
typedef BlockingRunnable* BlockingRunnablePtr;
|
||||
|
||||
QThreadPool threadPool;
|
||||
threadPool.setMaxThreadCount(3);
|
||||
int runs = 2 * threadPool.maxThreadCount();
|
||||
BlockingRunnablePtr* runnables = new BlockingRunnablePtr[runs];
|
||||
count.store(0);
|
||||
int dtorCounter = 0;
|
||||
int runCounter = 0;
|
||||
for (int i = 0; i < runs; i++) {
|
||||
runnables[i] = new BlockingRunnable(sem, dtorCounter, runCounter);
|
||||
runnables[i]->setAutoDelete(i != 0 && i != (runs-1)); //one which will run and one which will not
|
||||
threadPool.cancel(runnables[i]); //verify NOOP for jobs not in the queue
|
||||
threadPool.start(runnables[i]);
|
||||
}
|
||||
for (int i = 0; i < runs; i++) {
|
||||
threadPool.cancel(runnables[i]);
|
||||
}
|
||||
runnables[0]->dummy = 0; //valgrind will catch this if cancel() is crazy enough to delete currently running jobs
|
||||
runnables[runs-1]->dummy = 0;
|
||||
QCOMPARE(dtorCounter, runs-threadPool.maxThreadCount()-1);
|
||||
sem.release(threadPool.maxThreadCount());
|
||||
threadPool.waitForDone();
|
||||
QCOMPARE(runCounter, threadPool.maxThreadCount());
|
||||
QCOMPARE(count.load(), threadPool.maxThreadCount());
|
||||
QCOMPARE(dtorCounter, runs-2);
|
||||
delete runnables[0]; //if the pool deletes them then we'll get double-free crash
|
||||
delete runnables[runs-1];
|
||||
delete[] runnables;
|
||||
}
|
||||
|
||||
void tst_QThreadPool::destroyingWaitsForTasksToFinish()
|
||||
{
|
||||
QTime total, pass;
|
||||
|
Loading…
Reference in New Issue
Block a user