Integrate QProcess into QIODevice's multistreaming infrastructure

As a result, this patch eliminates double-buffering in QProcess.

Change-Id: I436faa4a5ffc28ce77f959dd6089bef400ac39f6
Reviewed-by: Allan Sandfeld Jensen <allan.jensen@theqtcompany.com>
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@theqtcompany.com>
This commit is contained in:
Alex Trotsenko 2015-07-22 10:56:06 +03:00
parent 8f92baf5c9
commit 7ff655360f
7 changed files with 64 additions and 115 deletions

View File

@ -866,7 +866,8 @@ void QProcessPrivate::Channel::clear()
*/
QProcessPrivate::QProcessPrivate()
{
processChannel = QProcess::StandardOutput;
readBufferChunkSize = QRINGBUFFER_CHUNKSIZE;
writeBufferChunkSize = QRINGBUFFER_CHUNKSIZE;
processChannelMode = QProcess::SeparateChannels;
inputChannelMode = QProcess::ManagedInputChannel;
processError = QProcess::UnknownError;
@ -1017,10 +1018,15 @@ bool QProcessPrivate::tryReadFromChannel(Channel *channel)
if (available == 0)
available = 1; // always try to read at least one byte
char *ptr = channel->buffer.reserve(available);
QProcess::ProcessChannel channelIdx = (channel == &stdoutChannel
? QProcess::StandardOutput
: QProcess::StandardError);
Q_ASSERT(readBuffers.size() > int(channelIdx));
QRingBuffer &readBuffer = readBuffers[int(channelIdx)];
char *ptr = readBuffer.reserve(available);
qint64 readBytes = readFromChannel(channel, ptr, available);
if (readBytes <= 0)
channel->buffer.chop(available);
readBuffer.chop(available);
if (readBytes == -2) {
// EWOULDBLOCK
return false;
@ -1048,18 +1054,17 @@ bool QProcessPrivate::tryReadFromChannel(Channel *channel)
#endif
if (channel->closed) {
channel->buffer.chop(readBytes);
readBuffer.chop(readBytes);
return false;
}
channel->buffer.chop(available - readBytes);
readBuffer.chop(available - readBytes);
bool didRead = false;
bool isStdout = channel == &stdoutChannel;
if (readBytes == 0) {
if (channel->notifier)
channel->notifier->setEnabled(false);
} else if ((processChannel == QProcess::StandardOutput) == isStdout) {
} else if (currentReadChannel == channelIdx) {
didRead = true;
if (!emittedReadyRead) {
emittedReadyRead = true;
@ -1067,7 +1072,8 @@ bool QProcessPrivate::tryReadFromChannel(Channel *channel)
emittedReadyRead = false;
}
}
if (isStdout)
emit q->channelReadyRead(int(channelIdx));
if (channelIdx == QProcess::StandardOutput)
emit q->readyReadStandardOutput(QProcess::QPrivateSignal());
else
emit q->readyReadStandardError(QProcess::QPrivateSignal());
@ -1099,15 +1105,14 @@ bool QProcessPrivate::_q_canWrite()
if (stdinChannel.notifier)
stdinChannel.notifier->setEnabled(false);
if (stdinChannel.buffer.isEmpty()) {
if (writeBuffer.isEmpty()) {
#if defined QPROCESS_DEBUG
qDebug("QProcessPrivate::canWrite(), not writing anything (empty write buffer).");
#endif
return false;
}
qint64 written = writeToStdin(stdinChannel.buffer.readPointer(),
stdinChannel.buffer.nextDataBlockSize());
qint64 written = writeToStdin(writeBuffer.readPointer(), writeBuffer.nextDataBlockSize());
if (written < 0) {
closeChannel(&stdinChannel);
setErrorAndEmit(QProcess::WriteError);
@ -1119,16 +1124,17 @@ bool QProcessPrivate::_q_canWrite()
#endif
if (written != 0) {
stdinChannel.buffer.free(written);
writeBuffer.free(written);
if (!emittedBytesWritten) {
emittedBytesWritten = true;
emit q->bytesWritten(written);
emittedBytesWritten = false;
}
emit q->channelBytesWritten(0, written);
}
if (stdinChannel.notifier && !stdinChannel.buffer.isEmpty())
if (stdinChannel.notifier && !writeBuffer.isEmpty())
stdinChannel.notifier->setEnabled(true);
if (stdinChannel.buffer.isEmpty() && stdinChannel.closed)
if (writeBuffer.isEmpty() && stdinChannel.closed)
closeWriteChannel();
return true;
}
@ -1387,7 +1393,7 @@ void QProcess::setInputChannelMode(InputChannelMode mode)
QProcess::ProcessChannel QProcess::readChannel() const
{
Q_D(const QProcess);
return d->processChannel;
return ProcessChannel(d->currentReadChannel);
}
/*!
@ -1400,20 +1406,7 @@ QProcess::ProcessChannel QProcess::readChannel() const
*/
void QProcess::setReadChannel(ProcessChannel channel)
{
Q_D(QProcess);
if (d->transactionStarted) {
qWarning("QProcess::setReadChannel: Failed due to the active read transaction");
return;
}
if (d->processChannel != channel) {
QRingBuffer *buffer = (d->processChannel == QProcess::StandardOutput)
? &d->stdoutChannel.buffer
: &d->stderrChannel.buffer;
d->buffer.read(buffer->reserveFront(d->buffer.size()), d->buffer.size());
}
d->processChannel = channel;
QIODevice::setCurrentReadChannel(int(channel));
}
/*!
@ -1458,7 +1451,7 @@ void QProcess::closeWriteChannel()
{
Q_D(QProcess);
d->stdinChannel.closed = true; // closing
if (d->stdinChannel.buffer.isEmpty())
if (d->writeBuffer.isEmpty())
d->closeWriteChannel();
}
@ -1719,11 +1712,7 @@ qint64 QProcess::processId() const
*/
bool QProcess::canReadLine() const
{
Q_D(const QProcess);
const QRingBuffer *readBuffer = (d->processChannel == QProcess::StandardError)
? &d->stderrChannel.buffer
: &d->stdoutChannel.buffer;
return readBuffer->canReadLine() || QIODevice::canReadLine();
return QIODevice::canReadLine();
}
/*!
@ -1733,11 +1722,13 @@ bool QProcess::canReadLine() const
*/
void QProcess::close()
{
Q_D(QProcess);
emit aboutToClose();
while (waitForBytesWritten(-1))
;
kill();
waitForFinished(-1);
d->setWriteChannelCount(0);
QIODevice::close();
}
@ -1748,11 +1739,7 @@ void QProcess::close()
*/
bool QProcess::atEnd() const
{
Q_D(const QProcess);
const QRingBuffer *readBuffer = (d->processChannel == QProcess::StandardError)
? &d->stderrChannel.buffer
: &d->stdoutChannel.buffer;
return QIODevice::atEnd() && (!isOpen() || readBuffer->isEmpty());
return QIODevice::atEnd();
}
/*! \reimp
@ -1766,25 +1753,16 @@ bool QProcess::isSequential() const
*/
qint64 QProcess::bytesAvailable() const
{
Q_D(const QProcess);
const QRingBuffer *readBuffer = (d->processChannel == QProcess::StandardError)
? &d->stderrChannel.buffer
: &d->stdoutChannel.buffer;
#if defined QPROCESS_DEBUG
qDebug("QProcess::bytesAvailable() == %i (%s)", readBuffer->size(),
(d->processChannel == QProcess::StandardError) ? "stderr" : "stdout");
#endif
return readBuffer->size() + QIODevice::bytesAvailable();
return QIODevice::bytesAvailable();
}
/*! \reimp
*/
qint64 QProcess::bytesToWrite() const
{
Q_D(const QProcess);
qint64 size = d->stdinChannel.buffer.size();
qint64 size = QIODevice::bytesToWrite();
#ifdef Q_OS_WIN
size += d->pipeWriterBytesToWrite();
size += d_func()->pipeWriterBytesToWrite();
#endif
return size;
}
@ -1923,9 +1901,9 @@ bool QProcess::waitForReadyRead(int msecs)
if (d->processState == QProcess::NotRunning)
return false;
if (d->processChannel == QProcess::StandardOutput && d->stdoutChannel.closed)
if (d->currentReadChannel == QProcess::StandardOutput && d->stdoutChannel.closed)
return false;
if (d->processChannel == QProcess::StandardError && d->stderrChannel.closed)
if (d->currentReadChannel == QProcess::StandardError && d->stderrChannel.closed)
return false;
return d->waitForReadyRead(msecs);
}
@ -2024,47 +2002,12 @@ void QProcess::setupChildProcess()
qint64 QProcess::readData(char *data, qint64 maxlen)
{
Q_D(QProcess);
Q_UNUSED(data);
if (!maxlen)
return 0;
QRingBuffer *readBuffer = (d->processChannel == QProcess::StandardError)
? &d->stderrChannel.buffer
: &d->stdoutChannel.buffer;
if (maxlen == 1 && !readBuffer->isEmpty()) {
int c = readBuffer->getChar();
if (c == -1) {
#if defined QPROCESS_DEBUG
qDebug("QProcess::readData(%p \"%s\", %d) == -1",
data, qt_prettyDebug(data, 1, maxlen).constData(), 1);
#endif
return -1;
}
*data = (char) c;
#if defined QPROCESS_DEBUG
qDebug("QProcess::readData(%p \"%s\", %d) == 1",
data, qt_prettyDebug(data, 1, maxlen).constData(), 1);
#endif
return 1;
}
qint64 bytesToRead = qMin(readBuffer->size(), maxlen);
qint64 readSoFar = 0;
while (readSoFar < bytesToRead) {
const char *ptr = readBuffer->readPointer();
qint64 bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar,
readBuffer->nextDataBlockSize());
memcpy(data + readSoFar, ptr, bytesToReadFromThisBlock);
readSoFar += bytesToReadFromThisBlock;
readBuffer->free(bytesToReadFromThisBlock);
}
#if defined QPROCESS_DEBUG
qDebug("QProcess::readData(%p \"%s\", %lld) == %lld",
data, qt_prettyDebug(data, readSoFar, 16).constData(), maxlen, readSoFar);
#endif
if (!readSoFar && d->processState == QProcess::NotRunning)
if (d->processState == QProcess::NotRunning)
return -1; // EOF
return readSoFar;
return 0;
}
/*! \reimp
@ -2098,7 +2041,7 @@ qint64 QProcess::writeData(const char *data, qint64 len)
#endif
if (len == 1) {
d->stdinChannel.buffer.putChar(*data);
d->writeBuffer.putChar(*data);
#ifdef Q_OS_WIN
if (!d->stdinWriteTrigger->isActive())
d->stdinWriteTrigger->start();
@ -2113,7 +2056,7 @@ qint64 QProcess::writeData(const char *data, qint64 len)
return 1;
}
char *dest = d->stdinChannel.buffer.reserve(len);
char *dest = d->writeBuffer.reserve(len);
memcpy(dest, data, len);
#ifdef Q_OS_WIN
if (!d->stdinWriteTrigger->isActive())
@ -2271,10 +2214,6 @@ void QProcessPrivate::start(QIODevice::OpenMode mode)
qDebug() << "QProcess::start(" << program << ',' << arguments << ',' << mode << ')';
#endif
stdinChannel.buffer.clear();
stdoutChannel.buffer.clear();
stderrChannel.buffer.clear();
if (stdinChannel.type != QProcessPrivate::Channel::Normal)
mode &= ~QIODevice::WriteOnly; // not open for writing
if (stdoutChannel.type != QProcessPrivate::Channel::Normal &&
@ -2295,6 +2234,9 @@ void QProcessPrivate::start(QIODevice::OpenMode mode)
q->QIODevice::open(mode);
if (q->isReadable() && processChannelMode != QProcess::MergedChannels)
setReadChannelCount(2);
stdinChannel.closed = false;
stdoutChannel.closed = false;
stderrChannel.closed = false;

View File

@ -235,12 +235,12 @@ public:
QProcess::ExitStatus exitStatus() const;
// QIODevice
qint64 bytesAvailable() const Q_DECL_OVERRIDE;
qint64 bytesAvailable() const Q_DECL_OVERRIDE; // ### Qt6: remove trivial override
qint64 bytesToWrite() const Q_DECL_OVERRIDE;
bool isSequential() const Q_DECL_OVERRIDE;
bool canReadLine() const Q_DECL_OVERRIDE;
bool canReadLine() const Q_DECL_OVERRIDE; // ### Qt6: remove trivial override
void close() Q_DECL_OVERRIDE;
bool atEnd() const Q_DECL_OVERRIDE;
bool atEnd() const Q_DECL_OVERRIDE; // ### Qt6: remove trivial override
static int execute(const QString &program, const QStringList &arguments);
static int execute(const QString &command);

View File

@ -56,7 +56,6 @@
#include "QtCore/qstringlist.h"
#include "QtCore/qhash.h"
#include "QtCore/qshareddata.h"
#include "private/qringbuffer_p.h"
#include "private/qiodevice_p.h"
#ifdef Q_OS_UNIX
#include <QtCore/private/qorderedmutexlocker_p.h>
@ -293,7 +292,6 @@ public:
QWindowsPipeWriter *writer;
};
#endif
QRingBuffer buffer;
Q_PIPE pipe[2];
unsigned type : 2;
@ -311,7 +309,6 @@ public:
bool _q_startupNotification();
bool _q_processDied();
QProcess::ProcessChannel processChannel;
QProcess::ProcessChannelMode processChannelMode;
QProcess::InputChannelMode inputChannelMode;
QProcess::ProcessError processError;

View File

@ -877,7 +877,7 @@ bool QProcessPrivate::waitForReadyRead(int msecs)
if (stderrChannel.pipe[0] != -1)
add_fd(nfds, stderrChannel.pipe[0], &fdread);
if (!stdinChannel.buffer.isEmpty() && stdinChannel.pipe[1] != -1)
if (!writeBuffer.isEmpty() && stdinChannel.pipe[1] != -1)
add_fd(nfds, stdinChannel.pipe[1], &fdwrite);
int timeout = qt_subtract_from_timeout(msecs, stopWatch.elapsed());
@ -899,12 +899,12 @@ bool QProcessPrivate::waitForReadyRead(int msecs)
bool readyReadEmitted = false;
if (stdoutChannel.pipe[0] != -1 && FD_ISSET(stdoutChannel.pipe[0], &fdread)) {
bool canRead = _q_canReadStandardOutput();
if (processChannel == QProcess::StandardOutput && canRead)
if (currentReadChannel == QProcess::StandardOutput && canRead)
readyReadEmitted = true;
}
if (stderrChannel.pipe[0] != -1 && FD_ISSET(stderrChannel.pipe[0], &fdread)) {
bool canRead = _q_canReadStandardError();
if (processChannel == QProcess::StandardError && canRead)
if (currentReadChannel == QProcess::StandardError && canRead)
readyReadEmitted = true;
}
if (readyReadEmitted)
@ -930,7 +930,7 @@ bool QProcessPrivate::waitForBytesWritten(int msecs)
QElapsedTimer stopWatch;
stopWatch.start();
while (!stdinChannel.buffer.isEmpty()) {
while (!writeBuffer.isEmpty()) {
fd_set fdread;
fd_set fdwrite;
@ -949,7 +949,7 @@ bool QProcessPrivate::waitForBytesWritten(int msecs)
add_fd(nfds, stderrChannel.pipe[0], &fdread);
if (!stdinChannel.buffer.isEmpty() && stdinChannel.pipe[1] != -1)
if (!writeBuffer.isEmpty() && stdinChannel.pipe[1] != -1)
add_fd(nfds, stdinChannel.pipe[1], &fdwrite);
int timeout = qt_subtract_from_timeout(msecs, stopWatch.elapsed());
@ -1015,7 +1015,7 @@ bool QProcessPrivate::waitForFinished(int msecs)
if (processState == QProcess::Running && forkfd != -1)
add_fd(nfds, forkfd, &fdread);
if (!stdinChannel.buffer.isEmpty() && stdinChannel.pipe[1] != -1)
if (!writeBuffer.isEmpty() && stdinChannel.pipe[1] != -1)
add_fd(nfds, stdinChannel.pipe[1], &fdwrite);
int timeout = qt_subtract_from_timeout(msecs, stopWatch.elapsed());

View File

@ -652,7 +652,7 @@ bool QProcessPrivate::waitForReadyRead(int msecs)
QIncrementalSleepTimer timer(msecs);
forever {
if (!stdinChannel.buffer.isEmpty() && !_q_canWrite())
if (!writeBuffer.isEmpty() && !_q_canWrite())
return false;
if (stdinChannel.writer && stdinChannel.writer->waitForWrite(0))
timer.resetIncrements();
@ -690,7 +690,7 @@ bool QProcessPrivate::waitForBytesWritten(int msecs)
// If we don't have pending data, and our write buffer is
// empty, we fail.
if (!pendingDataInPipe && stdinChannel.buffer.isEmpty())
if (!pendingDataInPipe && writeBuffer.isEmpty())
return false;
// If we don't have pending data and we do have data in our
@ -754,7 +754,7 @@ bool QProcessPrivate::waitForFinished(int msecs)
QIncrementalSleepTimer timer(msecs);
forever {
if (!stdinChannel.buffer.isEmpty() && !_q_canWrite())
if (!writeBuffer.isEmpty() && !_q_canWrite())
return false;
if (stdinChannel.writer && stdinChannel.writer->waitForWrite(0))
timer.resetIncrements();

View File

@ -56,10 +56,14 @@
QT_BEGIN_NAMESPACE
#ifndef QRINGBUFFER_CHUNKSIZE
#define QRINGBUFFER_CHUNKSIZE 4096
#endif
class QRingBuffer
{
public:
explicit inline QRingBuffer(int growth = 4096) :
explicit inline QRingBuffer(int growth = QRINGBUFFER_CHUNKSIZE) :
head(0), tail(0), tailBuffer(0), basicBlockSize(growth), bufferSize(0) { }
inline qint64 nextDataBlockSize() const {

View File

@ -493,9 +493,11 @@ void tst_QProcess::echoTest2()
QCOMPARE(process.error(), QProcess::Timedout);
process.write("Hello");
QSignalSpy spy0(&process, &QProcess::channelReadyRead);
QSignalSpy spy1(&process, &QProcess::readyReadStandardOutput);
QSignalSpy spy2(&process, &QProcess::readyReadStandardError);
QVERIFY(spy0.isValid());
QVERIFY(spy1.isValid());
QVERIFY(spy2.isValid());
@ -514,6 +516,7 @@ void tst_QProcess::echoTest2()
break;
}
QVERIFY(spy0.count() > 0);
QVERIFY(spy1.count() > 0);
QVERIFY(spy2.count() > 0);
@ -985,6 +988,9 @@ public:
this, &SoftExitProcess::terminateSlot);
break;
case 4:
setReadChannelMode(QProcess::MergedChannels);
connect(this, SIGNAL(channelReadyRead(int)), this, SLOT(terminateSlot()));
break;
default:
connect(this, &QProcess::stateChanged,
this, &SoftExitProcess::terminateSlot);
@ -1057,7 +1063,7 @@ void tst_QProcess::softExitInSlots()
{
QFETCH(QString, appName);
for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 6; ++i) {
SoftExitProcess proc(i);
proc.writeAfterStart("OLEBOLE", 8); // include the \0
proc.start(appName);