QRingBuffer: avoid reallocations of the data

Since its initial implementation, QRingBuffer had the following
fragilities in the architecture:

  - it does not guarantee validity of the pointers, if new data will
    be appended. As an example, passing an address of the QRingBuffer
    chunk as a parameter to the WriteFileEx() function on Windows
    requires the stability of the pointer. So, we can't add new data
    to the QRingBuffer until the overlapped operation completed
    (related issues were fixed for QWindowsPipeWriter and QSerialPort
    in 5.6 branch by introducing an intermediate byte array);
  - inefficient reallocations in reserve(), if a shared chunk was
    inserted in the queue (we can get a reallocation in the place
    where we don't expect it:

      char *writePtr = buffers.last().data() + tail;  <-  line #133

    ).

Proposed solution is to avoid reallocation by allocating a new
block instead. That was accomplished by introducing a QRingChunk
class which operates on a fixed byte array and implements head/tail
pointers strategy for each individual buffer in the queue. So,
QRingBuffer is no longer dependent on QByteArray's internal
shrink/growth algorithms.

Change-Id: I05abab0ad78e22e4815a196037dfc6eff85325d1
Reviewed-by: Oswald Buddenhagen <oswald.buddenhagen@qt.io>
Reviewed-by: Thiago Macieira <thiago.macieira@intel.com>
This commit is contained in:
Alex Trotsenko 2016-07-11 14:06:49 +03:00
parent 48ea14080e
commit 89b0364cde
3 changed files with 244 additions and 109 deletions

View File

@ -44,6 +44,46 @@
QT_BEGIN_NAMESPACE
void QRingChunk::allocate(int alloc)
{
Q_ASSERT(alloc > 0 && size() == 0);
if (chunk.size() < alloc || isShared())
chunk = QByteArray(alloc, Qt::Uninitialized);
}
void QRingChunk::detach()
{
Q_ASSERT(isShared());
const int chunkSize = size();
QByteArray x(chunkSize, Qt::Uninitialized);
::memcpy(x.data(), chunk.constData() + headOffset, chunkSize);
chunk = qMove(x);
headOffset = 0;
tailOffset = chunkSize;
}
QByteArray QRingChunk::toByteArray()
{
if (headOffset != 0 || tailOffset != chunk.size()) {
if (isShared())
return chunk.mid(headOffset, size());
if (headOffset != 0) {
char *ptr = chunk.data();
::memmove(ptr, ptr + headOffset, size());
tailOffset -= headOffset;
headOffset = 0;
}
chunk.reserve(0); // avoid that resizing needlessly reallocates
chunk.resize(tailOffset);
}
return chunk;
}
/*!
\internal
@ -54,12 +94,11 @@ QT_BEGIN_NAMESPACE
const char *QRingBuffer::readPointerAtPosition(qint64 pos, qint64 &length) const
{
if (pos >= 0) {
pos += head;
for (int i = 0; i < buffers.size(); ++i) {
length = (i == tailBuffer ? tail : buffers[i].size());
for (const QRingChunk &chunk : buffers) {
length = chunk.size();
if (length > pos) {
length -= pos;
return buffers[i].constData() + pos;
return chunk.data() + pos;
}
pos -= length;
}
@ -74,32 +113,31 @@ void QRingBuffer::free(qint64 bytes)
Q_ASSERT(bytes <= bufferSize);
while (bytes > 0) {
const qint64 blockSize = buffers.constFirst().size() - head;
const qint64 chunkSize = buffers.constFirst().size();
if (tailBuffer == 0 || blockSize > bytes) {
if (buffers.size() == 1 || chunkSize > bytes) {
QRingChunk &chunk = buffers.first();
// keep a single block around if it does not exceed
// the basic block size, to avoid repeated allocations
// between uses of the buffer
if (bufferSize <= bytes) {
if (buffers.constFirst().size() <= basicBlockSize) {
if (chunk.capacity() <= basicBlockSize && !chunk.isShared()) {
chunk.reset();
bufferSize = 0;
head = tail = 0;
} else {
clear(); // try to minify/squeeze us
}
} else {
Q_ASSERT(bytes < MaxByteArraySize);
head += int(bytes);
chunk.advance(bytes);
bufferSize -= bytes;
}
return;
}
bufferSize -= blockSize;
bytes -= blockSize;
bufferSize -= chunkSize;
bytes -= chunkSize;
buffers.removeFirst();
--tailBuffer;
head = 0;
}
}
@ -108,32 +146,25 @@ char *QRingBuffer::reserve(qint64 bytes)
if (bytes <= 0 || bytes >= MaxByteArraySize)
return 0;
const int chunkSize = qMax(basicBlockSize, int(bytes));
int tail = 0;
if (bufferSize == 0) {
if (buffers.isEmpty())
buffers.append(QByteArray(qMax(basicBlockSize, int(bytes)), Qt::Uninitialized));
buffers.append(QRingChunk(chunkSize));
else
buffers.first().resize(qMax(basicBlockSize, int(bytes)));
buffers.first().allocate(chunkSize);
} else {
const qint64 newSize = bytes + tail;
const QRingChunk &chunk = buffers.constLast();
// if need a new buffer
if (basicBlockSize == 0 || (newSize > buffers.constLast().capacity()
&& (tail >= basicBlockSize || newSize >= MaxByteArraySize))) {
// shrink this buffer to its current size
buffers.last().resize(tail);
// create a new QByteArray
buffers.append(QByteArray(qMax(basicBlockSize, int(bytes)), Qt::Uninitialized));
++tailBuffer;
tail = 0;
} else if (newSize > buffers.constLast().size()) {
buffers.last().resize(qMax(basicBlockSize, int(newSize)));
}
if (basicBlockSize == 0 || chunk.isShared() || bytes > chunk.available())
buffers.append(QRingChunk(chunkSize));
else
tail = chunk.size();
}
char *writePtr = buffers.last().data() + tail;
buffers.last().grow(bytes);
bufferSize += bytes;
tail += int(bytes);
return writePtr;
return buffers.last().data() + tail;
}
/*!
@ -146,29 +177,28 @@ char *QRingBuffer::reserveFront(qint64 bytes)
if (bytes <= 0 || bytes >= MaxByteArraySize)
return 0;
if (head < bytes || basicBlockSize == 0) {
if (head > 0) {
buffers.first().remove(0, head);
if (tailBuffer == 0)
tail -= head;
}
head = qMax(basicBlockSize, int(bytes));
if (bufferSize == 0) {
if (buffers.isEmpty())
buffers.prepend(QByteArray(head, Qt::Uninitialized));
else
buffers.first().resize(head);
tail = head;
const int chunkSize = qMax(basicBlockSize, int(bytes));
if (bufferSize == 0) {
if (buffers.isEmpty())
buffers.prepend(QRingChunk(chunkSize));
else
buffers.first().allocate(chunkSize);
buffers.first().grow(chunkSize);
buffers.first().advance(chunkSize - bytes);
} else {
const QRingChunk &chunk = buffers.constFirst();
// if need a new buffer
if (basicBlockSize == 0 || chunk.isShared() || bytes > chunk.head()) {
buffers.prepend(QRingChunk(chunkSize));
buffers.first().grow(chunkSize);
buffers.first().advance(chunkSize - bytes);
} else {
buffers.prepend(QByteArray(head, Qt::Uninitialized));
++tailBuffer;
buffers.first().advance(-bytes);
}
}
head -= int(bytes);
bufferSize += bytes;
return buffers.first().data() + head;
return buffers.first().data();
}
void QRingBuffer::chop(qint64 bytes)
@ -176,30 +206,31 @@ void QRingBuffer::chop(qint64 bytes)
Q_ASSERT(bytes <= bufferSize);
while (bytes > 0) {
if (tailBuffer == 0 || tail > bytes) {
const qint64 chunkSize = buffers.constLast().size();
if (buffers.size() == 1 || chunkSize > bytes) {
QRingChunk &chunk = buffers.last();
// keep a single block around if it does not exceed
// the basic block size, to avoid repeated allocations
// between uses of the buffer
if (bufferSize == bytes) {
if (buffers.constFirst().size() <= basicBlockSize) {
if (chunk.capacity() <= basicBlockSize && !chunk.isShared()) {
chunk.reset();
bufferSize = 0;
head = tail = 0;
} else {
clear(); // try to minify/squeeze us
}
} else {
Q_ASSERT(bytes < MaxByteArraySize);
tail -= int(bytes);
chunk.grow(-bytes);
bufferSize -= bytes;
}
return;
}
bufferSize -= tail;
bytes -= tail;
bufferSize -= chunkSize;
bytes -= chunkSize;
buffers.removeLast();
--tailBuffer;
tail = buffers.constLast().size();
}
}
@ -210,9 +241,6 @@ void QRingBuffer::clear()
buffers.erase(buffers.begin() + 1, buffers.end());
buffers.first().clear();
head = tail = 0;
tailBuffer = 0;
bufferSize = 0;
}
@ -221,13 +249,12 @@ qint64 QRingBuffer::indexOf(char c, qint64 maxLength, qint64 pos) const
if (maxLength <= 0 || pos < 0)
return -1;
qint64 index = -(pos + head);
for (int i = 0; i < buffers.size(); ++i) {
const qint64 nextBlockIndex = qMin(index + (i == tailBuffer ? tail : buffers[i].size()),
maxLength);
qint64 index = -pos;
for (const QRingChunk &chunk : buffers) {
const qint64 nextBlockIndex = qMin(index + chunk.size(), maxLength);
if (nextBlockIndex > 0) {
const char *ptr = buffers[i].constData();
const char *ptr = chunk.data();
if (index < 0) {
ptr -= index;
index = 0;
@ -271,19 +298,8 @@ QByteArray QRingBuffer::read()
if (bufferSize == 0)
return QByteArray();
QByteArray qba(buffers.takeFirst());
qba.reserve(0); // avoid that resizing needlessly reallocates
if (tailBuffer == 0) {
qba.resize(tail);
tail = 0;
} else {
--tailBuffer;
}
qba.remove(0, head); // does nothing if head is 0
head = 0;
bufferSize -= qba.size();
return qba;
bufferSize -= buffers.constFirst().size();
return buffers.takeFirst().toByteArray();
}
/*!
@ -296,13 +312,12 @@ qint64 QRingBuffer::peek(char *data, qint64 maxLength, qint64 pos) const
qint64 readSoFar = 0;
if (pos >= 0) {
pos += head;
for (int i = 0; readSoFar < maxLength && i < buffers.size(); ++i) {
qint64 blockLength = (i == tailBuffer ? tail : buffers[i].size());
qint64 blockLength = buffers[i].size();
if (pos < blockLength) {
blockLength = qMin(blockLength - pos, maxLength - readSoFar);
memcpy(data + readSoFar, buffers[i].constData() + pos, blockLength);
memcpy(data + readSoFar, buffers[i].data() + pos, blockLength);
readSoFar += blockLength;
pos = 0;
} else {
@ -335,18 +350,11 @@ void QRingBuffer::append(const char *data, qint64 size)
*/
void QRingBuffer::append(const QByteArray &qba)
{
if (tail == 0) {
if (buffers.isEmpty())
buffers.append(qba);
else
buffers.last() = qba;
} else {
buffers.last().resize(tail);
buffers.append(qba);
++tailBuffer;
}
tail = qba.size();
bufferSize += tail;
if (bufferSize != 0 || buffers.isEmpty())
buffers.append(QRingChunk(qba));
else
buffers.last().assign(qba);
bufferSize += qba.size();
}
qint64 QRingBuffer::readLine(char *data, qint64 maxLength)

View File

@ -53,7 +53,7 @@
#include <QtCore/private/qglobal_p.h>
#include <QtCore/qbytearray.h>
#include <QtCore/qlist.h>
#include <QtCore/qvector.h>
QT_BEGIN_NAMESPACE
@ -61,11 +61,129 @@ QT_BEGIN_NAMESPACE
#define QRINGBUFFER_CHUNKSIZE 4096
#endif
class QRingChunk
{
public:
// initialization and cleanup
inline QRingChunk() Q_DECL_NOTHROW :
headOffset(0), tailOffset(0)
{
}
inline QRingChunk(const QRingChunk &other) Q_DECL_NOTHROW :
chunk(other.chunk), headOffset(other.headOffset), tailOffset(other.tailOffset)
{
}
explicit inline QRingChunk(int alloc) :
chunk(alloc, Qt::Uninitialized), headOffset(0), tailOffset(0)
{
}
explicit inline QRingChunk(const QByteArray &qba) Q_DECL_NOTHROW :
chunk(qba), headOffset(0), tailOffset(qba.size())
{
}
inline QRingChunk &operator=(const QRingChunk &other) Q_DECL_NOTHROW
{
chunk = other.chunk;
headOffset = other.headOffset;
tailOffset = other.tailOffset;
return *this;
}
inline QRingChunk(QRingChunk &&other) Q_DECL_NOTHROW :
chunk(other.chunk), headOffset(other.headOffset), tailOffset(other.tailOffset)
{
other.headOffset = other.tailOffset = 0;
}
inline QRingChunk &operator=(QRingChunk &&other) Q_DECL_NOTHROW
{
swap(other);
return *this;
}
inline void swap(QRingChunk &other) Q_DECL_NOTHROW
{
chunk.swap(other.chunk);
qSwap(headOffset, other.headOffset);
qSwap(tailOffset, other.tailOffset);
}
// allocating and sharing
void allocate(int alloc);
inline bool isShared() const
{
return !chunk.isDetached();
}
Q_CORE_EXPORT void detach();
QByteArray toByteArray();
// getters
inline int head() const
{
return headOffset;
}
inline int size() const
{
return tailOffset - headOffset;
}
inline int capacity() const
{
return chunk.size();
}
inline int available() const
{
return chunk.size() - tailOffset;
}
inline const char *data() const
{
return chunk.constData() + headOffset;
}
inline char *data()
{
if (isShared())
detach();
return chunk.data() + headOffset;
}
// array management
inline void advance(int offset)
{
Q_ASSERT(headOffset + offset >= 0);
Q_ASSERT(size() - offset > 0);
headOffset += offset;
}
inline void grow(int offset)
{
Q_ASSERT(size() + offset > 0);
Q_ASSERT(head() + size() + offset <= capacity());
tailOffset += offset;
}
inline void assign(const QByteArray &qba)
{
chunk = qba;
headOffset = 0;
tailOffset = qba.size();
}
inline void reset()
{
headOffset = tailOffset = 0;
}
inline void clear()
{
assign(QByteArray());
}
private:
QByteArray chunk;
int headOffset, tailOffset;
};
class QRingBuffer
{
public:
explicit inline QRingBuffer(int growth = QRINGBUFFER_CHUNKSIZE) :
head(0), tail(0), tailBuffer(0), basicBlockSize(growth), bufferSize(0) { }
bufferSize(0), basicBlockSize(growth) { }
inline void setChunkSize(int size) {
basicBlockSize = size;
@ -76,11 +194,11 @@ public:
}
inline qint64 nextDataBlockSize() const {
return (tailBuffer == 0 ? tail : buffers.first().size()) - head;
return bufferSize == 0 ? Q_INT64_C(0) : buffers.first().size();
}
inline const char *readPointer() const {
return bufferSize == 0 ? nullptr : (buffers.first().constData() + head);
return bufferSize == 0 ? nullptr : buffers.first().data();
}
Q_CORE_EXPORT const char *readPointerAtPosition(qint64 pos, qint64 &length) const;
@ -114,14 +232,8 @@ public:
void ungetChar(char c)
{
if (head > 0) {
--head;
buffers.first()[head] = c;
++bufferSize;
} else {
char *ptr = reserveFront(1);
*ptr = c;
}
char *ptr = reserveFront(1);
*ptr = c;
}
@ -152,13 +264,12 @@ public:
}
private:
QList<QByteArray> buffers;
int head, tail;
int tailBuffer; // always buffers.size() - 1
int basicBlockSize;
QVector<QRingChunk> buffers;
qint64 bufferSize;
int basicBlockSize;
};
Q_DECLARE_SHARED(QRingChunk)
Q_DECLARE_TYPEINFO(QRingBuffer, Q_MOVABLE_TYPE);
QT_END_NAMESPACE

View File

@ -48,6 +48,7 @@ private slots:
void reserveAndReadInPacketMode();
void reserveFrontAndRead();
void chop();
void readPointerValidity();
void ungetChar();
void indexOf();
void appendAndRead();
@ -303,6 +304,21 @@ void tst_QRingBuffer::chop()
QVERIFY(memcmp(ringBuffer.readPointer(), "0123", 4) == 0);
}
void tst_QRingBuffer::readPointerValidity()
{
QRingBuffer ringBuffer(16);
QByteArray ba("Hello world!");
ringBuffer.append(ba);
const char *ptr = ringBuffer.readPointer();
ba.clear();
ringBuffer.reserve(32);
QVERIFY(ptr == ringBuffer.readPointer());
ringBuffer.reserveFront(32);
qint64 dummy;
QVERIFY(ptr == ringBuffer.readPointerAtPosition(32, dummy));
}
void tst_QRingBuffer::ungetChar()
{
QRingBuffer ringBuffer(16);