CTF: Add streaming support

Implement TCP server to stream trace data. Use the QTRACE_LOCATION
environment variable to configure the server address and port.

The rest of the configuration is done in the client TraceRequest.
The user can configure the compression level, buffer size and
whether the server is buffering when client doesn't have connection.

The server responds with basic response and then begins sending packets
until the client disconnects or the app closes.
Each packet contains stream name, stream data and whether to overwrite
the stream, which is used with metadata streams.

Task-number: QTBUG-105976
Change-Id: I41c708a4c7988666d59f0c6093dd41e8ccd88441
Reviewed-by: Janne Koskinen <janne.p.koskinen@qt.io>
This commit is contained in:
Antti Määttä 2023-03-02 13:21:01 +02:00
parent c6ab516f71
commit 74dd646e82
5 changed files with 686 additions and 78 deletions

View File

@ -17,7 +17,8 @@ qt_internal_add_plugin(QCtfTracePlugin
PLUGIN_TYPE tracing
SOURCES
qctflib_p.h qctflib.cpp metadata_template.txt qctfplugin.cpp qctfplugin_p.h
qctfserver_p.h qctfserver.cpp
LIBRARIES
Qt6::Core Qt6::CorePrivate
Qt6::Core Qt6::CorePrivate Qt6::Network
)

View File

@ -62,6 +62,39 @@ void QCtfLibImpl::cleanup()
s_instance = nullptr;
}
void QCtfLibImpl::handleSessionChange()
{
m_sessionChanged = true;
}
void QCtfLibImpl::handleStatusChange(QCtfServer::ServerStatus status)
{
switch (status) {
case QCtfServer::Error: {
m_serverClosed = true;
} break;
default:
break;
}
}
void QCtfLibImpl::buildMetadata()
{
const QString mhn = QSysInfo::machineHostName();
QString metadata = QString::fromUtf8(traceMetadataTemplate, traceMetadataSize);
metadata.replace(QStringLiteral("$TRACE_UUID"), s_TraceUuid.toString(QUuid::WithoutBraces));
metadata.replace(QStringLiteral("$ARC_BIT_WIDTH"), QString::number(Q_PROCESSOR_WORDSIZE * 8));
metadata.replace(QStringLiteral("$SESSION_NAME"), m_session.name);
metadata.replace(QStringLiteral("$CREATION_TIME"), m_datetime.toString(Qt::ISODate));
metadata.replace(QStringLiteral("$HOST_NAME"), mhn);
metadata.replace(QStringLiteral("$CLOCK_FREQUENCY"), QStringLiteral("1000000000"));
metadata.replace(QStringLiteral("$CLOCK_NAME"), QStringLiteral("monotonic"));
metadata.replace(QStringLiteral("$CLOCK_TYPE"), QStringLiteral("Monotonic clock"));
metadata.replace(QStringLiteral("$CLOCK_OFFSET"), QString::number(m_datetime.toMSecsSinceEpoch() * 1000000));
metadata.replace(QStringLiteral("$ENDIANNESS"), QSysInfo::ByteOrder == QSysInfo::BigEndian ? u"be"_s : u"le"_s);
writeMetadata(metadata, true);
}
QCtfLibImpl::QCtfLibImpl()
{
QString location = qEnvironmentVariable("QTRACE_LOCATION");
@ -70,71 +103,71 @@ QCtfLibImpl::QCtfLibImpl()
return;
}
// Check if the location is writable
if (QT_ACCESS(qPrintable(location), W_OK) != 0) {
qCWarning(lcDebugTrace) << "Unable to write to location";
return;
}
const QString filename = location + QStringLiteral("/session.json");
FILE *file = openFile(qPrintable(filename), "rb"_L1);
if (!file) {
qCWarning(lcDebugTrace) << "unable to open session file: "
<< filename << ", " << qt_error_string();
m_location = location;
if (location.startsWith(QStringLiteral("tcp"))) {
QUrl url(location);
m_server.reset(new QCtfServer());
m_server->setCallback(this);
m_server->setHost(url.host());
m_server->setPort(url.port());
m_server->startServer();
m_streaming = true;
m_session.tracepoints.append(QStringLiteral("all"));
m_session.name = QStringLiteral("default");
} else {
QT_STATBUF stat;
if (QT_FSTAT(QT_FILENO(file), &stat) != 0) {
qCWarning(lcDebugTrace) << "Unable to stat session file, " << qt_error_string();
// Check if the location is writable
if (QT_ACCESS(qPrintable(location), W_OK) != 0) {
qCWarning(lcDebugTrace) << "Unable to write to location";
return;
}
qsizetype filesize = qMin(stat.st_size, std::numeric_limits<qsizetype>::max());
QByteArray data(filesize, Qt::Uninitialized);
qsizetype size = static_cast<qsizetype>(fread(data.data(), 1, filesize, file));
fclose(file);
if (size != filesize)
return;
QJsonDocument json(QJsonDocument::fromJson(data));
QJsonObject obj = json.object();
bool valid = false;
if (!obj.isEmpty()) {
const auto it = obj.begin();
if (it.value().isArray()) {
m_session.name = it.key();
for (auto var : it.value().toArray())
m_session.tracepoints.append(var.toString());
valid = true;
}
}
if (!valid) {
qCWarning(lcDebugTrace) << "Session file is not valid";
const QString filename = location + QStringLiteral("/session.json");
FILE *file = openFile(qPrintable(filename), "rb"_L1);
if (!file) {
qCWarning(lcDebugTrace) << "unable to open session file: "
<< filename << ", " << qt_error_string();
m_location = location;
m_session.tracepoints.append(QStringLiteral("all"));
m_session.name = QStringLiteral("default");
} else {
QT_STATBUF stat;
if (QT_FSTAT(QT_FILENO(file), &stat) != 0) {
qCWarning(lcDebugTrace) << "Unable to stat session file, " << qt_error_string();
return;
}
qsizetype filesize = qMin(stat.st_size, std::numeric_limits<qsizetype>::max());
QByteArray data(filesize, Qt::Uninitialized);
qsizetype size = static_cast<qsizetype>(fread(data.data(), 1, filesize, file));
fclose(file);
if (size != filesize)
return;
QJsonDocument json(QJsonDocument::fromJson(data));
QJsonObject obj = json.object();
bool valid = false;
if (!obj.isEmpty()) {
const auto it = obj.begin();
if (it.value().isArray()) {
m_session.name = it.key();
for (auto var : it.value().toArray())
m_session.tracepoints.append(var.toString());
valid = true;
}
}
if (!valid) {
qCWarning(lcDebugTrace) << "Session file is not valid";
m_session.tracepoints.append(QStringLiteral("all"));
m_session.name = QStringLiteral("default");
}
m_location = location + QStringLiteral("/ust");
std::filesystem::create_directory(qPrintable(m_location), qPrintable(location));
}
m_location = location + QStringLiteral("/ust");
std::filesystem::create_directory(qPrintable(m_location), qPrintable(location));
clearLocation();
}
m_session.all = m_session.tracepoints.contains(QStringLiteral("all"));
auto datetime = QDateTime::currentDateTime().toUTC();
const QString mhn = QSysInfo::machineHostName();
QString metadata = QString::fromUtf8(traceMetadataTemplate, traceMetadataSize);
metadata.replace(QStringLiteral("$TRACE_UUID"), s_TraceUuid.toString(QUuid::WithoutBraces));
metadata.replace(QStringLiteral("$ARC_BIT_WIDTH"), QString::number(Q_PROCESSOR_WORDSIZE * 8));
metadata.replace(QStringLiteral("$SESSION_NAME"), m_session.name);
metadata.replace(QStringLiteral("$CREATION_TIME"), datetime.toString(Qt::ISODate));
metadata.replace(QStringLiteral("$HOST_NAME"), mhn);
metadata.replace(QStringLiteral("$CLOCK_FREQUENCY"), QStringLiteral("1000000000"));
metadata.replace(QStringLiteral("$CLOCK_NAME"), QStringLiteral("monotonic"));
metadata.replace(QStringLiteral("$CLOCK_TYPE"), QStringLiteral("Monotonic clock"));
metadata.replace(QStringLiteral("$CLOCK_OFFSET"), QString::number(datetime.toMSecsSinceEpoch() * 1000000));
metadata.replace(QStringLiteral("$ENDIANNESS"), QSysInfo::ByteOrder == QSysInfo::BigEndian ? u"be"_s : u"le"_s);
writeMetadata(metadata, true);
m_session.all = m_session.tracepoints.contains(QStringLiteral("all"));
// Get datetime to when the timer was started to store the offset to epoch time for the traces
m_datetime = QDateTime::currentDateTime().toUTC();
m_timer.start();
if (!m_streaming)
buildMetadata();
}
void QCtfLibImpl::clearLocation()
@ -163,25 +196,32 @@ void QCtfLibImpl::clearLocation()
void QCtfLibImpl::writeMetadata(const QString &metadata, bool overwrite)
{
FILE *file = nullptr;
file = openFile(qPrintable(m_location + "/metadata"_L1), overwrite ? "w+b"_L1: "ab"_L1);
if (!file)
return;
if (m_streaming) {
auto mt = metadata.toUtf8();
mt.resize(mt.size() - 1);
m_server->bufferData(QStringLiteral("metadata"), mt, overwrite);
} else {
FILE *file = nullptr;
file = openFile(qPrintable(m_location + "/metadata"_L1), overwrite ? "w+b"_L1: "ab"_L1);
if (!file)
return;
if (!overwrite)
fputs("\n", file);
if (!overwrite)
fputs("\n", file);
// data contains zero at the end, hence size - 1.
const QByteArray data = metadata.toUtf8();
fwrite(data.data(), data.size() - 1, 1, file);
fclose(file);
// data contains zero at the end, hence size - 1.
const QByteArray data = metadata.toUtf8();
fwrite(data.data(), data.size() - 1, 1, file);
fclose(file);
}
}
void QCtfLibImpl::writeCtfPacket(QCtfLibImpl::Channel &ch)
{
FILE *file = nullptr;
file = openFile(ch.channelName, "ab"_L1);
if (file) {
if (!m_streaming)
file = openFile(ch.channelName, "ab"_L1);
if (file || m_streaming) {
/* Each packet contains header and context, which are defined in the metadata.txt */
QByteArray packet;
packet << s_CtfHeaderMagic;
@ -201,29 +241,56 @@ void QCtfLibImpl::writeCtfPacket(QCtfLibImpl::Channel &ch)
Q_ASSERT(ch.data.size() + packetHeaderSize + ch.threadNameLength <= packetSize);
Q_ASSERT(packet.size() == qsizetype(packetHeaderSize + ch.threadNameLength));
fwrite(packet.data(), packet.size(), 1, file);
ch.data.resize(packetSize - packet.size(), 0);
fwrite(ch.data.data(), ch.data.size(), 1, file);
fclose(file);
if (m_streaming) {
ch.data.resize(packetSize - packet.size(), 0);
packet += ch.data;
m_server->bufferData(QString::fromLatin1(ch.channelName), packet, false);
} else {
fwrite(packet.data(), packet.size(), 1, file);
ch.data.resize(packetSize - packet.size(), 0);
fwrite(ch.data.data(), ch.data.size(), 1, file);
fclose(file);
}
}
}
QCtfLibImpl::Channel::~Channel()
{
impl->writeCtfPacket(*this);
}
QCtfLibImpl::~QCtfLibImpl()
{
if (!m_server.isNull())
m_server->stopServer();
qDeleteAll(m_eventPrivs);
}
bool QCtfLibImpl::tracepointEnabled(const QCtfTracePointEvent &point)
{
if (m_sessionChanged) {
const QMutexLocker lock(&m_mutex);
buildMetadata();
m_session.name = m_server->sessionName();
m_session.tracepoints = m_server->sessionTracepoints().split(';');
m_session.all = m_session.tracepoints.contains(QStringLiteral("all"));
m_sessionChanged = false;
for (const auto &meta : m_additionalMetadata)
writeMetadata(meta->metadata);
for (auto *priv : m_eventPrivs)
writeMetadata(priv->metadata);
quint64 timestamp = m_timer.nsecsElapsed();
for (auto *ch : m_channels) {
writeCtfPacket(*ch);
ch->data.clear();
ch->minTimestamp = ch->maxTimestamp = timestamp;
}
}
if (m_streaming && (m_serverClosed || (!m_server->bufferOnIdle() && m_server->status() == QCtfServer::Idle)))
return false;
return m_session.all || m_session.tracepoints.contains(point.provider.provider);
}
QCtfLibImpl::Channel::~Channel()
{
if (data.size())
QCtfLibImpl::writeCtfPacket(*this);
}
static QString toMetadata(const QString &provider, const QString &name, const QString &metadata, quint32 eventId)
{
/*
@ -268,6 +335,8 @@ void QCtfLibImpl::doTracepoint(const QCtfTracePointEvent &point, const QByteArra
QCtfTracePointPrivate *priv = point.d;
quint64 timestamp = 0;
QThread *thread = nullptr;
if (m_streaming && m_serverClosed)
return;
{
QMutexLocker lock(&m_mutex);
if (!priv->metadataWritten) {
@ -300,6 +369,8 @@ void QCtfLibImpl::doTracepoint(const QCtfTracePointEvent &point, const QByteArra
Channel &ch = m_threadData.localData();
if (ch.channelName[0] == 0) {
ch.impl = this;
m_channels.append(&ch);
m_threadIndices.insert(thread, m_threadIndices.size());
ch.minTimestamp = ch.maxTimestamp = timestamp;
ch.thread = thread;

View File

@ -26,6 +26,7 @@
#include <qthreadstorage.h>
#include <qthread.h>
#include <qloggingcategory.h>
#include "qctfserver_p.h"
QT_BEGIN_NAMESPACE
@ -39,7 +40,7 @@ struct QCtfTracePointPrivate
bool metadataWritten = false;
};
class QCtfLibImpl : public QCtfLib
class QCtfLibImpl : public QCtfLib, public QCtfServer::ServerCallback
{
struct Session
{
@ -59,6 +60,7 @@ class QCtfLibImpl : public QCtfLib
QByteArray threadName;
quint32 threadNameLength = 0;
bool locked = false;
QCtfLibImpl *impl = nullptr;
Channel()
{
memset(channelName, 0, sizeof(channelName));
@ -90,7 +92,10 @@ private:
void updateMetadata(const QCtfTracePointEvent &point);
void writeMetadata(const QString &metadata, bool overwrite = false);
void clearLocation();
static void writeCtfPacket(Channel &ch);
void handleSessionChange() override;
void handleStatusChange(QCtfServer::ServerStatus status) override;
void writeCtfPacket(Channel &ch);
void buildMetadata();
static constexpr QUuid s_TraceUuid = QUuid(0x3e589c95, 0xed11, 0xc159, 0x42, 0x02, 0x6a, 0x9b, 0x02, 0x00, 0x12, 0xac);
static constexpr quint32 s_CtfHeaderMagic = 0xC1FC1FC1;
@ -102,9 +107,16 @@ private:
Session m_session;
QHash<QThread*, quint32> m_threadIndices;
QThreadStorage<Channel> m_threadData;
QList<Channel *> m_channels;
QHash<QString, const QCtfTraceMetadata *> m_additionalMetadata;
QSet<QString> m_newAdditionalMetadata;
QDateTime m_datetime;
int m_eventId = 0;
bool m_streaming = false;
std::atomic_bool m_sessionChanged = false;
std::atomic_bool m_serverClosed = false;
QScopedPointer<QCtfServer> m_server;
friend struct Channel;
};
QT_END_NAMESPACE

View File

@ -0,0 +1,338 @@
// Copyright (C) 2023 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
#include <qloggingcategory.h>
#include "qctfserver_p.h"
using namespace Qt::Literals::StringLiterals;
Q_LOGGING_CATEGORY(lcCtfInfoTrace, "qt.core.ctfserver", QtWarningMsg)
TracePacket &TracePacket::writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression)
{
cbor.startMap(4);
cbor.append("magic"_L1);
cbor.append(packet.PacketMagicNumber);
cbor.append("name"_L1);
cbor.append(QString::fromUtf8(packet.stream_name));
cbor.append("flags"_L1);
cbor.append(packet.flags);
cbor.append("data"_L1);
if (compression > 0) {
QByteArray compressed = qCompress(packet.stream_data, compression);
cbor.append(compressed);
} else {
cbor.append(packet.stream_data);
}
cbor.endMap();
return packet;
}
QCtfServer::QCtfServer(QObject *parent)
: QThread(parent)
{
m_keySet << "cliendId"_L1
<< "clientVersion"_L1
<< "sessionName"_L1
<< "sessionTracepoints"_L1
<< "flags"_L1
<< "bufferSize"_L1;
}
void QCtfServer::setHost(const QString &address)
{
m_address = address;
}
void QCtfServer::setPort(int port)
{
m_port = port;
}
void QCtfServer::setCallback(ServerCallback *cb)
{
m_cb = cb;
}
QString QCtfServer::sessionName() const
{
return m_req.sessionName;
}
QString QCtfServer::sessionTracepoints() const
{
return m_req.sessionTracepoints;
}
bool QCtfServer::bufferOnIdle() const
{
return m_bufferOnIdle;
}
QCtfServer::ServerStatus QCtfServer::status() const
{
return m_status;
}
void QCtfServer::setStatusAndNotify(ServerStatus status)
{
m_status = status;
m_cb->handleStatusChange(status);
}
void QCtfServer::bytesWritten(qint64 size)
{
m_writtenSize += size;
if (m_writtenSize >= m_waitWriteSize && m_eventLoop)
m_eventLoop->exit();
}
void QCtfServer::initWrite()
{
m_waitWriteSize = 0;
m_writtenSize = 0;
}
bool QCtfServer::waitSocket()
{
if (m_eventLoop)
m_eventLoop->exec();
return m_socket->state() == QTcpSocket::ConnectedState;
}
void QCtfServer::handleString(QCborStreamReader &cbor)
{
const auto readString = [](QCborStreamReader &cbor) -> QString {
QString result;
auto r = cbor.readString();
while (r.status == QCborStreamReader::Ok) {
result += r.data;
r = cbor.readString();
}
if (r.status == QCborStreamReader::Error) {
// handle error condition
result.clear();
}
return result;
};
do {
if (m_currentKey.isEmpty()) {
m_currentKey = readString(cbor);
} else {
switch (m_keySet.indexOf(m_currentKey)) {
case RequestSessionName:
m_req.sessionName = readString(cbor);
break;
case RequestSessionTracepoints:
m_req.sessionTracepoints = readString(cbor);
break;
default:
// handle error
break;
}
m_currentKey.clear();
}
if (cbor.lastError() == QCborError::EndOfFile) {
if (!waitSocket())
return;
cbor.reparse();
}
} while (cbor.lastError() == QCborError::EndOfFile);
}
void QCtfServer::handleFixedWidth(QCborStreamReader &cbor)
{
switch (m_keySet.indexOf(m_currentKey)) {
case RequestClientId:
if (!cbor.isUnsignedInteger())
return;
m_req.clientId = cbor.toUnsignedInteger();
break;
case RequestClientVersion:
if (!cbor.isUnsignedInteger())
return;
m_req.clientVersion = cbor.toUnsignedInteger();
break;
case RequestFlags:
if (!cbor.isUnsignedInteger())
return;
m_req.flags = cbor.toUnsignedInteger();
break;
case RequestBufferSize:
if (!cbor.isUnsignedInteger())
return;
m_req.bufferSize = cbor.toUnsignedInteger();
break;
default:
// handle error
break;
}
m_currentKey.clear();
}
void QCtfServer::readCbor(QCborStreamReader &cbor)
{
switch (cbor.type()) {
case QCborStreamReader::UnsignedInteger:
case QCborStreamReader::NegativeInteger:
case QCborStreamReader::SimpleType:
case QCborStreamReader::Float16:
case QCborStreamReader::Float:
case QCborStreamReader::Double:
handleFixedWidth(cbor);
cbor.next();
break;
case QCborStreamReader::ByteArray:
case QCborStreamReader::String:
handleString(cbor);
break;
case QCborStreamReader::Array:
case QCborStreamReader::Map:
cbor.enterContainer();
while (cbor.lastError() == QCborError::NoError && cbor.hasNext())
readCbor(cbor);
if (cbor.lastError() == QCborError::NoError)
cbor.leaveContainer();
default:
break;
}
}
void QCtfServer::run()
{
m_server = new QTcpServer();
QHostAddress addr;
if (m_address.isEmpty())
addr = QHostAddress(QHostAddress::Any);
else
addr = QHostAddress(m_address);
qCInfo(lcCtfInfoTrace) << "Starting CTF server: " << m_address << ", port: " << m_port;
while (m_stopping == 0) {
if (!m_server->isListening()) {
if (!m_server->listen(addr, m_port)) {
qCInfo(lcCtfInfoTrace) << "Unable to start server";
m_stopping = 1;
setStatusAndNotify(Error);
}
}
setStatusAndNotify(Idle);
if (m_server->waitForNewConnection(-1)) {
qCInfo(lcCtfInfoTrace) << "client connection";
m_eventLoop = new QEventLoop();
m_socket = m_server->nextPendingConnection();
QObject::connect(m_socket, &QTcpSocket::readyRead, [&](){
if (m_eventLoop) m_eventLoop->exit();
});
QObject::connect(m_socket, &QTcpSocket::bytesWritten, this, &QCtfServer::bytesWritten);
QObject::connect(m_socket, &QTcpSocket::disconnected, [&](){
if (m_eventLoop) m_eventLoop->exit();
});
m_server->close(); // Do not wait for more connections
setStatusAndNotify(Connected);
if (waitSocket())
{
QCborStreamReader cbor(m_socket);
m_req = {};
while (cbor.hasNext() && cbor.lastError() == QCborError::NoError)
readCbor(cbor);
if (!m_req.isValid()) {
qCInfo(lcCtfInfoTrace) << "Invalid trace request.";
m_socket->close();
} else {
m_compression = m_req.flags & CompressionMask;
m_bufferOnIdle = !(m_req.flags & DontBufferOnIdle);
m_maxPackets = qMax(m_req.bufferSize / TracePacket::PacketSize, 16u);
qCInfo(lcCtfInfoTrace) << "request received: " << m_req.sessionName << ", " << m_req.sessionTracepoints;
m_cb->handleSessionChange();
{
TraceResponse resp;
resp.serverId = ServerId;
resp.serverVersion = 1;
resp.serverName = QStringLiteral("Ctf Server");
QCborStreamWriter cbor(m_socket);
cbor.startMap(m_compression ? 4 : 3);
cbor.append("serverId"_L1);
cbor.append(resp.serverId);
cbor.append("serverVersion"_L1);
cbor.append(resp.serverVersion);
cbor.append("serverName"_L1);
cbor.append(resp.serverName);
if (m_compression) {
cbor.append("compressionScheme"_L1);
cbor.append("zlib"_L1);
}
cbor.endMap();
}
qCInfo(lcCtfInfoTrace) << "response sent, sending data";
if (waitSocket()) {
while (m_socket->state() == QTcpSocket::ConnectedState) {
QList<TracePacket> packets;
{
QMutexLocker lock(&m_mutex);
while (m_packets.size() == 0)
m_bufferHasData.wait(&m_mutex);
packets = std::exchange(m_packets, {});
}
{
QCborStreamWriter cbor(m_socket);
for (TracePacket &packet : packets) {
TracePacket::writePacket(packet, cbor, m_compression);
if (!waitSocket())
break;
}
}
qCInfo(lcCtfInfoTrace) << packets.size() << " packets written";
}
}
qCInfo(lcCtfInfoTrace) << "client connection closed";
}
}
delete m_eventLoop;
m_eventLoop = nullptr;
} else {
qCInfo(lcCtfInfoTrace) << "error: " << m_server->errorString();
m_stopping = 1;
setStatusAndNotify(Error);
}
}
}
void QCtfServer::startServer()
{
start();
}
void QCtfServer::stopServer()
{
this->m_stopping = 1;
wait();
}
void QCtfServer::bufferData(const QString &stream, const QByteArray &data, quint32 flags)
{
QMutexLocker lock(&m_mutex);
TracePacket packet;
packet.stream_name = stream.toUtf8();
packet.stream_data = data;
packet.flags = flags;
m_packets.append(packet);
if (m_packets.size() > m_maxPackets)
m_packets.pop_front();
m_bufferHasData.wakeOne();
}

View File

@ -0,0 +1,186 @@
// Copyright (C) 2023 The Qt Company Ltd.
// SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
#ifndef QT_CTFSERVER_H
#define QT_CTFSERVER_H
//
// W A R N I N G
// -------------
//
// This file is not part of the Qt API. It exists purely as an
// implementation detail. This header file may change from version to
// version without notice, or even be removed.
//
// We mean it.
//
//
#include <qbytearray.h>
#include <qdatastream.h>
#include <qthread.h>
#include <qmutex.h>
#include <qwaitcondition.h>
#include <qeventloop.h>
#include <QtNetwork/qtcpserver.h>
#include <QtNetwork/qtcpsocket.h>
#include <qcborstreamreader.h>
#include <qcborstreamwriter.h>
#include <qlist.h>
QT_BEGIN_NAMESPACE
struct TracePacket
{
static constexpr quint32 PacketMagicNumber = 0x100924da;
static constexpr quint32 PacketSize = 4096 + 9;
QByteArray stream_name;
QByteArray stream_data;
quint32 flags = 0;
TracePacket() = default;
TracePacket(const TracePacket &t)
{
stream_name = t.stream_name;
stream_data = t.stream_data;
flags = t.flags;
}
TracePacket &operator = (const TracePacket &t)
{
stream_name = t.stream_name;
stream_data = t.stream_data;
flags = t.flags;
return *this;
}
TracePacket(TracePacket &&t)
{
stream_name = std::move(t.stream_name);
stream_data = std::move(t.stream_data);
flags = t.flags;
}
TracePacket &operator = (TracePacket &&t)
{
stream_name = std::move(t.stream_name);
stream_data = std::move(t.stream_data);
flags = t.flags;
return *this;
}
static TracePacket &writePacket(TracePacket &packet, QCborStreamWriter &cbor, int compression);
};
auto constexpr operator""_MB(quint64 s) -> quint64
{
return s * 1024ul * 1024ul;
}
struct TraceRequest
{
quint32 clientId;
quint32 clientVersion;
quint32 flags;
quint32 bufferSize;
QString sessionName;
QString sessionTracepoints;
static constexpr quint32 MaxBufferSize = 1024_MB;
bool isValid() const
{
if (clientId != 0 && clientVersion != 0 && !sessionName.isEmpty()
&& !sessionTracepoints.isEmpty() && bufferSize < MaxBufferSize)
return true;
return false;
}
};
struct TraceResponse
{
quint32 serverId;
quint32 serverVersion;
QString serverName;
};
class QCtfServer : public QThread
{
Q_OBJECT
public:
enum ServerStatus
{
Uninitialized,
Idle,
Connected,
Error,
};
enum ServerFlags
{
CompressionMask = 15,
DontBufferOnIdle = 16, // not set -> the server is buffering even without client connection
// set -> the server is buffering only when client is connected
};
enum RequestIds
{
RequestClientId = 0,
RequestClientVersion,
RequestSessionName,
RequestSessionTracepoints,
RequestFlags,
RequestBufferSize,
};
struct ServerCallback
{
virtual void handleSessionChange() = 0;
virtual void handleStatusChange(ServerStatus status) = 0;
};
QCtfServer(QObject *parent = nullptr);
void setCallback(ServerCallback *cb);
void setHost(const QString &address);
void setPort(int port);
void run() override;
void startServer();
void stopServer();
void bufferData(const QString &stream, const QByteArray &data, quint32 flags);
QString sessionName() const;
QString sessionTracepoints() const;
bool bufferOnIdle() const;
ServerStatus status() const;
private:
void initWrite();
void bytesWritten(qint64 size);
bool waitSocket();
void readCbor(QCborStreamReader &cbor);
void handleString(QCborStreamReader &cbor);
void handleFixedWidth(QCborStreamReader &cbor);
void setStatusAndNotify(ServerStatus status);
QMutex m_mutex;
QWaitCondition m_bufferHasData;
QList<TracePacket> m_packets;
QString m_address;
QTcpServer *m_server = nullptr;
QTcpSocket *m_socket = nullptr;
QEventLoop *m_eventLoop = nullptr;
QList<QString> m_keySet;
TraceRequest m_req;
ServerCallback *m_cb = nullptr;
ServerStatus m_status = Uninitialized;
qint64 m_waitWriteSize = 0;
qint64 m_writtenSize = 0;
int m_port;
int m_compression = 0;
int m_maxPackets = DefaultMaxPackets;
QAtomicInt m_stopping;
bool m_bufferOnIdle = true;
QString m_currentKey;
static constexpr quint32 ServerId = 1;
static constexpr quint32 DefaultMaxPackets = 256; // 1 MB
};
QT_END_NAMESPACE
#endif