CTF: Add zstd support for streaming

Adds zstd support for CTF streaming. The client can select which
compression scheme to use in the trace request.

Change-Id: I21423eda2c8223fd5d23eb5adaf216eb61a25501
Reviewed-by: Hatem ElKharashy <hatem.elkharashy@qt.io>
Reviewed-by: Janne Koskinen <janne.p.koskinen@qt.io>
This commit is contained in:
Antti Määttä 2023-06-19 12:31:06 +03:00
parent 4721721f60
commit 1909e1e14c
3 changed files with 105 additions and 27 deletions

View File

@ -22,3 +22,7 @@ qt_internal_add_plugin(QCtfTracePlugin
Qt6::Core Qt6::CorePrivate Qt6::Network Qt6::Core Qt6::CorePrivate Qt6::Network
) )
qt_internal_extend_target(QCtfTracePlugin CONDITION QT_FEATURE_zstd
LIBRARIES
WrapZSTD::WrapZSTD
)

View File

@ -4,31 +4,34 @@
#include <qloggingcategory.h> #include <qloggingcategory.h>
#include "qctfserver_p.h" #include "qctfserver_p.h"
#if QT_CONFIG(zstd)
#include <zstd.h>
#endif
using namespace Qt::Literals::StringLiterals; using namespace Qt::Literals::StringLiterals;
Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg) Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg)
TracePacket &TracePacket::writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression) #if QT_CONFIG(zstd)
static QByteArray zstdCompress(ZSTD_CCtx *&context, const QByteArray &data, int compression)
{ {
cbor.startMap(4); if (context == nullptr)
cbor.append("magic"_L1); context = ZSTD_createCCtx();
cbor.append(packet.PacketMagicNumber); qsizetype size = data.size();
cbor.append("name"_L1); size = ZSTD_COMPRESSBOUND(size);
cbor.append(QString::fromUtf8(packet.stream_name)); QByteArray compressed(size, Qt::Uninitialized);
cbor.append("flags"_L1); char *dst = compressed.data();
cbor.append(packet.flags); size_t n = ZSTD_compressCCtx(context, dst, size,
data.constData(), data.size(),
cbor.append("data"_L1); compression);
if (compression > 0) { if (ZSTD_isError(n)) {
QByteArray compressed = qCompress(packet.stream_data, compression); qCWarning(lcCtfInfoTrace) << "Compression with zstd failed: " << QString::fromUtf8(ZSTD_getErrorName(n));
cbor.append(compressed); return {};
} else {
cbor.append(packet.stream_data);
} }
compressed.truncate(n);
cbor.endMap(); return compressed;
return packet;
} }
#endif
QCtfServer::QCtfServer(QObject *parent) QCtfServer::QCtfServer(QObject *parent)
: QThread(parent) : QThread(parent)
@ -38,7 +41,15 @@ QCtfServer::QCtfServer(QObject *parent)
<< "sessionName"_L1 << "sessionName"_L1
<< "sessionTracepoints"_L1 << "sessionTracepoints"_L1
<< "flags"_L1 << "flags"_L1
<< "bufferSize"_L1; << "bufferSize"_L1
<< "compressionScheme"_L1;
}
QCtfServer::~QCtfServer()
{
#if QT_CONFIG(zstd)
ZSTD_freeCCtx(m_zstdCCtx);
#endif
} }
void QCtfServer::setHost(const QString &address) void QCtfServer::setHost(const QString &address)
@ -129,6 +140,9 @@ void QCtfServer::handleString(QCborStreamReader &cbor)
case RequestSessionTracepoints: case RequestSessionTracepoints:
m_req.sessionTracepoints = readString(cbor); m_req.sessionTracepoints = readString(cbor);
break; break;
case RequestCompressionScheme:
m_requestedCompressionScheme = readString(cbor);
break;
default: default:
// handle error // handle error
break; break;
@ -201,6 +215,47 @@ void QCtfServer::readCbor(QCborStreamReader &cbor)
} }
} }
void QCtfServer::writePacket(TracePacket &packet, QCborStreamWriter &cbor)
{
cbor.startMap(4);
cbor.append("magic"_L1);
cbor.append(packet.PacketMagicNumber);
cbor.append("name"_L1);
cbor.append(QString::fromUtf8(packet.stream_name));
cbor.append("flags"_L1);
cbor.append(packet.flags);
cbor.append("data"_L1);
if (m_compression > 0) {
QByteArray compressed;
#if QT_CONFIG(zstd)
if (m_requestedCompressionScheme == QStringLiteral("zstd"))
compressed = zstdCompress(m_zstdCCtx, packet.stream_data, m_compression);
else
#endif
compressed = qCompress(packet.stream_data, m_compression);
cbor.append(compressed);
} else {
cbor.append(packet.stream_data);
}
cbor.endMap();
}
bool QCtfServer::recognizedCompressionScheme() const
{
if (m_requestedCompressionScheme.isEmpty())
return true;
#if QT_CONFIG(zstd)
if (m_requestedCompressionScheme == QStringLiteral("zstd"))
return true;
#endif
if (m_requestedCompressionScheme == QStringLiteral("zlib"))
return true;
return false;
}
void QCtfServer::run() void QCtfServer::run()
{ {
m_server = new QTcpServer(); m_server = new QTcpServer();
@ -250,10 +305,21 @@ void QCtfServer::run()
m_socket->close(); m_socket->close();
} else { } else {
m_compression = m_req.flags & CompressionMask; m_compression = m_req.flags & CompressionMask;
#if QT_CONFIG(zstd)
m_compression = qMin(m_compression, ZSTD_maxCLevel());
#else
m_compression = qMin(m_compression, 9);
#endif
m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle); m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u); m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
if (!recognizedCompressionScheme()) {
qCWarning(lcCtfInfoTrace) << "Client requested unrecognized compression scheme: " << m_requestedCompressionScheme;
m_requestedCompressionScheme.clear();
m_compression = 0;
}
qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints; qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints;
m_cb->handleSessionChange(); m_cb->handleSessionChange();
@ -273,7 +339,7 @@ void QCtfServer::run()
cbor.append(resp.serverName); cbor.append(resp.serverName);
if (m_compression) { if (m_compression) {
cbor.append("compressionScheme"_L1); cbor.append("compressionScheme"_L1);
cbor.append("zlib"_L1); cbor.append(m_requestedCompressionScheme);
} }
cbor.endMap(); cbor.endMap();
} }
@ -292,7 +358,7 @@ void QCtfServer::run()
{ {
QCborStreamWriter cbor(m_socket); QCborStreamWriter cbor(m_socket);
for (TracePacket &packet : packets) { for (TracePacket &packet : packets) {
TracePacket::writePacket(packet, cbor, m_compression); writePacket(packet, cbor);
if (!waitSocket()) if (!waitSocket())
break; break;
} }

View File

@ -28,8 +28,11 @@
#include <qcborstreamwriter.h> #include <qcborstreamwriter.h>
#include <qlist.h> #include <qlist.h>
typedef struct ZSTD_CCtx_s ZSTD_CCtx;
QT_BEGIN_NAMESPACE QT_BEGIN_NAMESPACE
class QCtfServer;
struct TracePacket struct TracePacket
{ {
static constexpr quint32 PacketMagicNumber = 0x100924da; static constexpr quint32 PacketMagicNumber = 0x100924da;
@ -66,8 +69,6 @@ struct TracePacket
flags = t.flags; flags = t.flags;
return *this; return *this;
} }
static TracePacket &writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression);
}; };
auto constexpr operator""_MB(quint64 s) -> quint64 auto constexpr operator""_MB(quint64 s) -> quint64
@ -115,8 +116,8 @@ public:
}; };
enum ServerFlags enum ServerFlags
{ {
CompressionMask = 15, CompressionMask = 255,
DontBufferOnIdle = 16, // not set -> the server is buffering even without client connection DontBufferOnIdle = 256, // not set -> the server is buffering even without client connection
// set -> the server is buffering only when client is connected // set -> the server is buffering only when client is connected
}; };
enum RequestIds enum RequestIds
@ -127,6 +128,7 @@ public:
RequestSessionTracepoints, RequestSessionTracepoints,
RequestFlags, RequestFlags,
RequestBufferSize, RequestBufferSize,
RequestCompressionScheme,
}; };
struct ServerCallback struct ServerCallback
@ -135,6 +137,7 @@ public:
virtual void handleStatusChange(ServerStatus status) = 0; virtual void handleStatusChange(ServerStatus status) = 0;
}; };
QCtfServer(QObject *parent = nullptr); QCtfServer(QObject *parent = nullptr);
~QCtfServer();
void setCallback(ServerCallback *cb); void setCallback(ServerCallback *cb);
void setHost(const QString &address); void setHost(const QString &address);
void setPort(int port); void setPort(int port);
@ -154,8 +157,9 @@ private:
void readCbor(QCborStreamReader &cbor); void readCbor(QCborStreamReader &cbor);
void handleString(QCborStreamReader &cbor); void handleString(QCborStreamReader &cbor);
void handleFixedWidth(QCborStreamReader &cbor); void handleFixedWidth(QCborStreamReader &cbor);
bool recognizedCompressionScheme() const;
void setStatusAndNotify(ServerStatus status); void setStatusAndNotify(ServerStatus status);
void writePacket(TracePacket &packet, QCborStreamWriter &cbor);
QMutex m_mutex; QMutex m_mutex;
QWaitCondition m_bufferHasData; QWaitCondition m_bufferHasData;
@ -176,6 +180,10 @@ private:
QAtomicInt m_stopping; QAtomicInt m_stopping;
bool m_bufferOnIdle = true; bool m_bufferOnIdle = true;
QString m_currentKey; QString m_currentKey;
QString m_requestedCompressionScheme;
#if QT_CONFIG(zstd)
ZSTD_CCtx *m_zstdCCtx = nullptr;
#endif
static constexpr quint32 ServerId = 1; static constexpr quint32 ServerId = 1;
static constexpr quint32 DefaultMaxPackets = 256; // 1 MB static constexpr quint32 DefaultMaxPackets = 256; // 1 MB