Use waitid with WNOWAIT in forkfd

The previous implementation required one syscall per child we're waiting
on to see which one exited. That means the algorithm was O(n).

This implementation uses WNOWAIT to find out which child exited and then
goes straight to that one. So it's O(1) on the number of children, but
runs 2 * number_of_children_that_exited + 1 syscalls, assuming there are
no race conditions with other threads. If there are or if a child not
started by forkfd exits, we'll still iterate over each child we're
managing to see which one exited.

It modifies the existing code so that it will do a waitid() with WNOWAIT
to check on the status of the child: if the child has exited, we'll try
to lock the entry so only one thread will do the final wait(). In the
case we read the PID, then the child exited, was reaped by another
thread, the PID got recycled and that child exited again, we'll fail to
lock the ProcessInfo entry so no harm comes. If by an absurd coincidence
this other child was started by forkfd() and its ProcessInfo is exactly
the one we are looking at, then we'll succeed in locking but that's a
benign race: we'll do what the other thread was trying to do and the
other thread will give up.

Future improvements to the algorithm are discussed in the Gerrit change.

Change-Id: Ie74836dbc388cd9b3fa375a41a8d944602a32df1
Reviewed-by: Thiago Macieira <thiago.macieira@intel.com>
This commit is contained in:
Thiago Macieira 2014-01-23 16:01:25 -08:00
parent 39482e6e0f
commit c8849dd8d5
2 changed files with 105 additions and 16 deletions

View File

@ -160,6 +160,14 @@ static ProcessInfo *allocateInfo(Header **header)
return info; return info;
} }
#ifdef HAVE_WAITID
static int isChildReady(pid_t pid, siginfo_t *info)
{
info->si_pid = 0;
return waitid(P_PID, pid, info, WEXITED | WNOHANG | WNOWAIT) == 0 && info->si_pid == pid;
}
#endif
static int tryReaping(pid_t pid, siginfo_t *info) static int tryReaping(pid_t pid, siginfo_t *info)
{ {
/* reap the child */ /* reap the child */
@ -226,9 +234,74 @@ static void sigchld_handler(int signum)
siginfo_t info; siginfo_t info;
int i; int i;
#ifdef HAVE_WAITID
/* be optimistic: try to see if we can get the child that exited */
search_next_child:
/* waitid returns -1 ECHILD if there are no further children at all;
* it returns 0 and sets si_pid to 0 if there are children but they are not ready
* to be waited (we're passing WNOHANG). We should not get EINTR because
* we're passing WNOHANG and we should definitely not get EINVAL or anything else.
* That means we can actually ignore the return code and only inspect si_pid.
*/
info.si_pid = 0;
waitid(P_ALL, 0, &info, WNOHANG | WNOWAIT | WEXITED);
if (info.si_pid == 0) {
/* there are no further un-waited-for children, so we can just exit.
* But before, transfer control to the chained SIGCHLD handler.
*/
goto chain_handler;
}
for (i = 0; i < (int)sizeofarray(children.entries); ++i) {
/* acquire the child first: swap the PID with -1 to indicate it's busy */
int pid = info.si_pid;
if (ffd_atomic_compare_exchange(&children.entries[i].pid, &pid, -1,
FFD_ATOMIC_ACQUIRE, FFD_ATOMIC_RELAXED)) {
/* this is our child, send notification and free up this entry */
/* ### FIXME: what if tryReaping returns false? */
if (tryReaping(pid, &info))
notifyAndFreeInfo(&children.header, &children.entries[i], &info);
goto search_next_child;
}
}
/* try the arrays */
array = ffd_atomic_load(&children.header.nextArray, FFD_ATOMIC_ACQUIRE);
while (array != NULL) {
for (i = 0; i < (int)sizeofarray(array->entries); ++i) {
int pid = info.si_pid;
if (ffd_atomic_compare_exchange(&array->entries[i].pid, &pid, -1,
FFD_ATOMIC_ACQUIRE, FFD_ATOMIC_RELAXED)) {
/* this is our child, send notification and free up this entry */
/* ### FIXME: what if tryReaping returns false? */
if (tryReaping(pid, &info))
notifyAndFreeInfo(&array->header, &array->entries[i], &info);
goto search_next_child;
}
}
array = ffd_atomic_load(&array->header.nextArray, FFD_ATOMIC_ACQUIRE);
}
/* if we got here, we couldn't find this child in our list. That means this child
* belongs to one of the chained SIGCHLD handlers. However, there might be another
* child that exited and does belong to us, so we need to check each one individually.
*/
#endif
for (i = 0; i < (int)sizeofarray(children.entries); ++i) { for (i = 0; i < (int)sizeofarray(children.entries); ++i) {
int pid = ffd_atomic_load(&children.entries[i].pid, FFD_ATOMIC_ACQUIRE); int pid = ffd_atomic_load(&children.entries[i].pid, FFD_ATOMIC_ACQUIRE);
if (pid > 0 && tryReaping(pid, &info)) { if (pid <= 0)
continue;
#ifdef HAVE_WAITID
/* The child might have been reaped by the block above in another thread,
* so first check if it's ready and, if it is, lock it */
if (!isChildReady(pid, &info) ||
!ffd_atomic_compare_exchange(&children.entries[i].pid, &pid, -1,
FFD_ATOMIC_RELAXED, FFD_ATOMIC_RELAXED))
continue;
#endif
if (tryReaping(pid, &info)) {
/* this is our child, send notification and free up this entry */ /* this is our child, send notification and free up this entry */
notifyAndFreeInfo(&children.header, &children.entries[i], &info); notifyAndFreeInfo(&children.header, &children.entries[i], &info);
} }
@ -239,7 +312,17 @@ static void sigchld_handler(int signum)
while (array != NULL) { while (array != NULL) {
for (i = 0; i < (int)sizeofarray(array->entries); ++i) { for (i = 0; i < (int)sizeofarray(array->entries); ++i) {
int pid = ffd_atomic_load(&array->entries[i].pid, FFD_ATOMIC_ACQUIRE); int pid = ffd_atomic_load(&array->entries[i].pid, FFD_ATOMIC_ACQUIRE);
if (pid > 0 && tryReaping(pid, &info)) { if (pid <= 0)
continue;
#ifdef HAVE_WAITID
/* The child might have been reaped by the block above in another thread,
* so first check if it's ready and, if it is, lock it */
if (!isChildReady(pid, &info) ||
!ffd_atomic_compare_exchange(&array->entries[i].pid, &pid, -1,
FFD_ATOMIC_RELAXED, FFD_ATOMIC_RELAXED))
continue;
#endif
if (tryReaping(pid, &info)) {
/* this is our child, send notification and free up this entry */ /* this is our child, send notification and free up this entry */
notifyAndFreeInfo(&array->header, &array->entries[i], &info); notifyAndFreeInfo(&array->header, &array->entries[i], &info);
} }
@ -249,6 +332,9 @@ static void sigchld_handler(int signum)
} }
} }
#ifdef HAVE_WAITID
chain_handler:
#endif
if (old_sigaction.sa_handler != SIG_IGN && old_sigaction.sa_handler != SIG_DFL) if (old_sigaction.sa_handler != SIG_IGN && old_sigaction.sa_handler != SIG_DFL)
old_sigaction.sa_handler(signum); old_sigaction.sa_handler(signum);
} }

View File

@ -1223,21 +1223,24 @@ void tst_QProcess::processInAThread()
void tst_QProcess::processesInMultipleThreads() void tst_QProcess::processesInMultipleThreads()
{ {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
TestThread thread1; // run from 1 to 10 threads, but run at least some tests
TestThread thread2; // with more threads than the ideal
TestThread thread3; int threadCount = i;
if (i > 7)
threadCount = qMax(threadCount, QThread::idealThreadCount() + 2);
thread1.start(); QVector<TestThread *> threads(threadCount);
thread2.start(); for (int j = 0; j < threadCount; ++j)
thread3.start(); threads[j] = new TestThread;
for (int j = 0; j < threadCount; ++j)
QVERIFY(thread2.wait(10000)); threads[j]->start();
QVERIFY(thread3.wait(10000)); for (int j = 0; j < threadCount; ++j) {
QVERIFY(thread1.wait(10000)); QVERIFY(threads[j]->wait(10000));
}
QCOMPARE(thread1.code(), 0); for (int j = 0; j < threadCount; ++j) {
QCOMPARE(thread2.code(), 0); QCOMPARE(threads[j]->code(), 0);
QCOMPARE(thread3.code(), 0); }
qDeleteAll(threads);
} }
} }