Checkpoint work to replace the Qt event loop and I/O code.

This commit is contained in:
Ryan Prichard 2012-03-13 22:14:39 -07:00
parent 39e07ca4d2
commit c949e03164
10 changed files with 526 additions and 75 deletions

View File

@ -1,29 +1,25 @@
#include "Agent.h"
#include "Win32Console.h"
#include "Terminal.h"
#include "NamedPipe.h"
#include "../Shared/DebugClient.h"
#include "../Shared/AgentMsg.h"
#include "../Shared/Buffer.h"
#include <QCoreApplication>
#include <QLocalSocket>
#include <QtDebug>
#include <QTimer>
#include <QSize>
#include <QRect>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <windows.h>
#include <vector>
#include <string>
const int SC_CONSOLE_MARK = 0xFFF2;
const int SC_CONSOLE_SELECT_ALL = 0xFFF5;
const int SYNC_MARKER_LEN = 16;
Agent::Agent(const QString &controlPipeName,
const QString &dataPipeName,
Agent::Agent(LPCWSTR controlPipeName,
LPCWSTR dataPipeName,
int initialCols,
int initialRows,
QObject *parent) :
QObject(parent),
int initialRows) :
m_terminal(NULL),
m_timer(NULL),
m_childProcess(NULL),
@ -40,39 +36,43 @@ Agent::Agent(const QString &controlPipeName,
m_controlSocket = makeSocket(controlPipeName);
m_dataSocket = makeSocket(dataPipeName);
m_terminal = new Terminal(m_dataSocket, this);
m_terminal = new Terminal(m_dataSocket);
resetConsoleTracking(false);
connect(m_controlSocket, SIGNAL(readyRead()), SLOT(controlSocketReadyRead()));
connect(m_controlSocket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
connect(m_dataSocket, SIGNAL(readyRead()), SLOT(dataSocketReadyRead()));
//connect(m_controlSocket, SIGNAL(readyRead()), SLOT(controlSocketReadyRead()));
//connect(m_controlSocket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
//connect(m_dataSocket, SIGNAL(readyRead()), SLOT(dataSocketReadyRead()));
m_timer = new QTimer(this);
m_timer->setSingleShot(false);
connect(m_timer, SIGNAL(timeout()), SLOT(pollTimeout()));
m_timer->start(25);
//m_timer = new QTimer(this);
//m_timer->setSingleShot(false);
//connect(m_timer, SIGNAL(timeout()), SLOT(pollTimeout()));
//m_timer->start(25);
setPollInterval(25);
Trace("agent starting...");
}
Agent::~Agent()
{
// TODO: review how shut down and cleanup work.
m_console->postCloseMessage();
delete m_console;
delete [] m_bufferData;
delete m_console;
delete m_terminal;
}
QLocalSocket *Agent::makeSocket(const QString &pipeName)
NamedPipe *Agent::makeSocket(LPCWSTR pipeName)
{
// Connect to the named pipe.
QLocalSocket *socket = new QLocalSocket(this);
socket->connectToServer(pipeName);
if (!socket->waitForConnected())
qFatal("Could not connect to %s", pipeName.toStdString().c_str());
socket->setReadBufferSize(64 * 1024);
return socket;
NamedPipe *pipe = createNamedPipe();
if (!pipe->connectToServer(pipeName)) {
Trace("error: could not connect to %ls", pipeName);
::exit(1);
}
pipe->setReadBufferSize(64 * 1024);
return pipe;
}
void Agent::resetConsoleTracking(bool sendClear)
@ -87,6 +87,16 @@ void Agent::resetConsoleTracking(bool sendClear)
m_terminal->reset(sendClear, m_scrapedLineCount);
}
void Agent::onPipeIo()
{
controlSocketReadyRead();
dataSocketReadyRead();
// TODO: Is it possible that one or more pipe has closed when this
// function returns? Will the agent shut down correctly? I don't see any
// calls to EventLoop::exit.
}
void Agent::controlSocketReadyRead()
{
while (true) {
@ -94,15 +104,16 @@ void Agent::controlSocketReadyRead()
int size = m_controlSocket->peek((char*)&packetSize, sizeof(int32_t));
if (size < (int)sizeof(int32_t))
break;
int32_t totalSize = sizeof(int32_t) + packetSize;
int totalSize = sizeof(int32_t) + packetSize;
if (m_controlSocket->bytesAvailable() < totalSize) {
if (m_controlSocket->readBufferSize() < totalSize)
m_controlSocket->setReadBufferSize(totalSize);
break;
}
QByteArray packetData = m_controlSocket->read(totalSize);
Q_ASSERT(packetData.length() == totalSize);
ReadBuffer buffer(std::string(packetData.constData() + 4, packetSize));
std::string packetData = m_controlSocket->read(totalSize);
assert((int)packetData.size() == totalSize);
ReadBuffer buffer(packetData);
buffer.getInt(); // Discard the size.
Trace("read packet of %d total bytes", totalSize);
handlePacket(buffer);
}
@ -122,6 +133,8 @@ void Agent::handlePacket(ReadBuffer &packet)
case AgentMsg::GetExitCode:
packet.assertEof();
result = m_childExitCode;
default:
Trace("Unrecognized message, id:%d", type);
}
m_controlSocket->write((char*)&result, sizeof(result));
}
@ -171,6 +184,7 @@ int Agent::handleStartProcessPacket(ReadBuffer &packet)
int Agent::handleSetSizePacket(ReadBuffer &packet)
{
Trace("SetSize msg");
int cols = packet.getInt();
int rows = packet.getInt();
packet.assertEof();
@ -182,8 +196,8 @@ void Agent::dataSocketReadyRead()
{
// TODO: This is an incomplete hack...
Trace("socketReadyRead -- %d bytes available", m_dataSocket->bytesAvailable());
QByteArray data = m_dataSocket->readAll();
for (int i = 0; i < data.length(); ++i) {
std::string data = m_dataSocket->readAll();
for (size_t i = 0; i < data.size(); ++i) {
char ch = data[i];
const short vk = VkKeyScan(ch);
if (vk != -1) {
@ -202,12 +216,12 @@ void Agent::dataSocketReadyRead()
void Agent::socketDisconnected()
{
QCoreApplication::exit(0);
//QCoreApplication::exit(0);
}
void Agent::pollTimeout()
void Agent::onPollTimeout()
{
if (m_dataSocket->state() == QLocalSocket::ConnectedState)
if (/*TODO: stop scraping as we're shutting down*/true)
scrapeOutput();
if (m_childProcess != NULL) {
@ -217,7 +231,9 @@ void Agent::pollTimeout()
m_childExitCode = exitCode;
CloseHandle(m_childProcess);
m_childProcess = NULL;
m_dataSocket->disconnectFromServer();
// TODO: review how exiting/disconnecting work...
//m_dataSocket->disconnectFromServer();
m_dataSocket->closePipe();
}
}
}
@ -309,7 +325,7 @@ void Agent::scrapeOutput()
Trace("Sync marker has disappeared -- resetting the terminal");
resetConsoleTracking();
} else if (markerRow != m_syncRow) {
Q_ASSERT(markerRow < m_syncRow);
assert(markerRow < m_syncRow);
m_scrolledCount += (m_syncRow - markerRow);
m_syncRow = markerRow;
// If the buffer has scrolled, then the entire window is dirty.
@ -408,7 +424,7 @@ void Agent::syncMarkerText(CHAR_INFO *output)
int Agent::findSyncMarker()
{
Q_ASSERT(m_syncRow >= 0);
assert(m_syncRow >= 0);
CHAR_INFO marker[SYNC_MARKER_LEN];
CHAR_INFO column[BUFFER_LINE_COUNT];
syncMarkerText(marker);

View File

@ -4,32 +4,31 @@
#include <QObject>
#include <QPoint>
#include <windows.h>
#include "EventLoop.h"
class Win32Console;
class QLocalSocket;
class Terminal;
class QTimer;
class ReadBuffer;
class NamedPipe;
const int BUFFER_LINE_COUNT = 3000; // TODO: Use something like 9000.
const int MAX_CONSOLE_WIDTH = 500;
class Agent : public QObject
class Agent : public EventLoop
{
Q_OBJECT
public:
explicit Agent(const QString &controlPipeName,
const QString &dataPipeName,
int initialCols, int initialRows,
QObject *parent = 0);
virtual ~Agent();
Agent(LPCWSTR controlPipeName,
LPCWSTR dataPipeName,
int initialCols,
int initialRows);
~Agent();
private:
QLocalSocket *makeSocket(const QString &pipeName);
NamedPipe *makeSocket(LPCWSTR pipeName);
void resetConsoleTracking(bool sendClear = true);
signals:
private slots:
void controlSocketReadyRead();
void handlePacket(ReadBuffer &packet);
@ -37,7 +36,10 @@ private slots:
int handleSetSizePacket(ReadBuffer &packet);
void dataSocketReadyRead();
void socketDisconnected();
void pollTimeout();
protected:
virtual void onPollTimeout();
virtual void onPipeIo();
private:
void markEntireWindowDirty();
@ -52,8 +54,8 @@ private:
private:
Win32Console *m_console;
QLocalSocket *m_controlSocket;
QLocalSocket *m_dataSocket;
NamedPipe *m_controlSocket;
NamedPipe *m_dataSocket;
Terminal *m_terminal;
QTimer *m_timer;
HANDLE m_childProcess;

85
agent/EventLoop.cc Normal file
View File

@ -0,0 +1,85 @@
#include "EventLoop.h"
#include "NamedPipe.h"
#include "../Shared/DebugClient.h"
#include <assert.h>
EventLoop::EventLoop() : m_exiting(false), m_pollInterval(0)
{
}
EventLoop::~EventLoop()
{
for (size_t i = 0; i < m_pipes.size(); ++i)
delete m_pipes[i];
}
void EventLoop::run()
{
std::vector<HANDLE> waitHandles;
DWORD pollTime = GetTickCount();
while (!m_exiting) {
Trace("poll...");
waitHandles.reserve(m_pipes.size() * 2);
waitHandles.clear();
for (size_t i = 0; i < m_pipes.size(); ++i) {
HANDLE pipe1 = m_pipes[i]->getWaitEvent1();
HANDLE pipe2 = m_pipes[i]->getWaitEvent2();
if (pipe1 != NULL)
waitHandles.push_back(pipe1);
if (pipe2 != NULL)
waitHandles.push_back(pipe2);
}
DWORD timeout = INFINITE;
if (m_pollInterval > 0) {
int elapsed = GetTickCount() - pollTime;
if (elapsed < m_pollInterval)
timeout = m_pollInterval - elapsed;
else
timeout = 0;
}
Trace("poll... timeout is %d ms", (int)timeout);
DWORD result = WaitForMultipleObjects(waitHandles.size(),
waitHandles.data(),
FALSE,
timeout);
Trace("poll... result is 0x%x", result);
assert(result != WAIT_FAILED);
if (result != WAIT_TIMEOUT) {
for (size_t i = 0; i < m_pipes.size(); ++i)
m_pipes[i]->poll();
onPipeIo();
}
if (m_pollInterval > 0) {
int elapsed = GetTickCount() - pollTime;
if (elapsed >= m_pollInterval) {
onPollTimeout();
pollTime = GetTickCount();
}
}
}
}
NamedPipe *EventLoop::createNamedPipe()
{
NamedPipe *ret = new NamedPipe();
m_pipes.push_back(ret);
return ret;
}
void EventLoop::setPollInterval(int ms)
{
m_pollInterval = ms;
}
void EventLoop::exit()
{
m_exiting = true;
}
void EventLoop::onPollTimeout()
{
}
void EventLoop::onPipeIo()
{
}

28
agent/EventLoop.h Normal file
View File

@ -0,0 +1,28 @@
#ifndef EVENTLOOP_H
#define EVENTLOOP_H
#include <vector>
class NamedPipe;
class EventLoop
{
public:
EventLoop();
virtual ~EventLoop();
void run();
protected:
NamedPipe *createNamedPipe();
void setPollInterval(int ms);
void exit();
virtual void onPollTimeout();
virtual void onPipeIo();
private:
bool m_exiting;
std::vector<NamedPipe*> m_pipes;
int m_pollInterval;
};
#endif // EVENTLOOP_H

View File

@ -3,8 +3,8 @@ include ../config-mingw.mk
include ../config-qt.mk
PROGRAM = pconsole-agent
HEADERS = Agent.h Terminal.h
OBJECTS = Agent.o Terminal.o Win32Console.o AgentDebugClient.o main.o
HEADERS =
OBJECTS = EventLoop.o NamedPipe.o Agent.o Terminal.o Win32Console.o AgentDebugClient.o main.o
OBJECTS += $(patsubst %.h,moc_%.o,$(HEADERS))
CXXFLAGS += -I$(QT_INCLUDE)/QtNetwork -D_WIN32_WINNT=0x0501

229
agent/NamedPipe.cc Normal file
View File

@ -0,0 +1,229 @@
#include "NamedPipe.h"
#include "EventLoop.h"
#include "../Shared/DebugClient.h"
#include <string.h>
#include <assert.h>
NamedPipe::NamedPipe() :
m_readBufferSize(64 * 1024),
m_handle(NULL),
m_inputWorker(this),
m_outputWorker(this)
{
}
NamedPipe::~NamedPipe()
{
closePipe();
}
HANDLE NamedPipe::getWaitEvent1()
{
return m_inputWorker.getWaitEvent();
}
HANDLE NamedPipe::getWaitEvent2()
{
return m_outputWorker.getWaitEvent();
}
void NamedPipe::poll()
{
m_inputWorker.service();
m_outputWorker.service();
}
NamedPipe::IoWorker::IoWorker(NamedPipe *namedPipe) :
m_namedPipe(namedPipe),
m_pending(false)
{
m_event = CreateEvent(NULL, TRUE, FALSE, NULL);
assert(m_event != NULL);
}
NamedPipe::IoWorker::~IoWorker()
{
// TODO: Does it matter if an I/O is currently pending?
CloseHandle(m_event);
}
void NamedPipe::IoWorker::service()
{
if (m_namedPipe->isClosed()) {
m_pending = false;
ResetEvent(m_event);
return;
}
if (m_pending) {
DWORD actual;
BOOL ret = GetOverlappedResult(m_namedPipe->m_handle, &m_over, &actual, FALSE);
if (!ret) {
if (GetLastError() == ERROR_IO_INCOMPLETE) {
// There is a pending I/O.
return;
} else {
// Pipe error. Close the pipe.
ResetEvent(m_event);
m_pending = false;
m_namedPipe->closePipe();
return;
}
}
ResetEvent(m_event);
m_pending = false;
completeIo(actual);
}
int nextSize;
bool isRead;
while (shouldIssueIo(&nextSize, &isRead)) {
DWORD actual = 0;
memset(&m_over, 0, sizeof(m_over));
m_over.hEvent = m_event;
Trace("[startio] isread:%d size:%d", isRead, nextSize);
BOOL ret = isRead
? ReadFile(m_namedPipe->m_handle, m_buffer, nextSize, &actual, &m_over)
: WriteFile(m_namedPipe->m_handle, m_buffer, nextSize, &actual, &m_over);
if (!ret) {
if (GetLastError() == ERROR_IO_PENDING) {
// There is a pending I/O.
m_pending = true;
return;
} else {
// Pipe error. Close the pipe.
m_namedPipe->closePipe();
return;
}
}
completeIo(actual);
}
}
HANDLE NamedPipe::IoWorker::getWaitEvent()
{
return m_pending ? m_event : NULL;
}
void NamedPipe::InputWorker::completeIo(int size)
{
m_namedPipe->m_inQueue.append(m_buffer, size);
}
bool NamedPipe::InputWorker::shouldIssueIo(int *size, bool *isRead)
{
*isRead = true;
if (m_namedPipe->isClosed()) {
return false;
} else if ((int)m_namedPipe->m_inQueue.size() < m_namedPipe->readBufferSize()) {
*size = kIoSize;
return true;
} else {
return false;
}
}
void NamedPipe::OutputWorker::completeIo(int size)
{
}
bool NamedPipe::OutputWorker::shouldIssueIo(int *size, bool *isRead)
{
*isRead = false;
if (!m_namedPipe->m_outQueue.empty()) {
int writeSize = std::min((int)m_namedPipe->m_outQueue.size(), (int)kIoSize);
memcpy(m_buffer, m_namedPipe->m_outQueue.data(), writeSize);
m_namedPipe->m_outQueue.erase(0, writeSize);
*size = writeSize;
return true;
} else {
return false;
}
}
bool NamedPipe::connectToServer(LPCWSTR pipeName)
{
assert(m_handle == NULL);
m_handle = CreateFile(pipeName,
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
NULL);
Trace("connection to [%ls], handle == 0x%x", pipeName, m_handle);
if (m_handle == INVALID_HANDLE_VALUE)
return false;
m_inputWorker.service();
// TODO: I suppose the user could call write before calling
// connectToServer. I think that would work, but I'm not sure.
m_outputWorker.service();
return true;
}
void NamedPipe::write(const void *data, int size)
{
m_outQueue.append((const char*)data, size);
m_outputWorker.service();
}
void NamedPipe::write(const char *text)
{
write(text, strlen(text));
}
int NamedPipe::readBufferSize()
{
return m_readBufferSize;
}
void NamedPipe::setReadBufferSize(int size)
{
m_readBufferSize = size;
m_inputWorker.service();
}
int NamedPipe::bytesAvailable()
{
return m_inQueue.size();
}
int NamedPipe::peek(void *data, int size)
{
int ret = std::min(size, (int)m_inQueue.size());
memcpy(data, m_inQueue.data(), ret);
return ret;
}
std::string NamedPipe::read(int size)
{
int retSize = std::min(size, (int)m_inQueue.size());
std::string ret = m_inQueue.substr(0, retSize);
m_inQueue.erase(0, retSize);
m_inputWorker.service();
return ret;
}
std::string NamedPipe::readAll()
{
std::string ret = m_inQueue;
m_inQueue.clear();
m_inputWorker.service();
return ret;
}
void NamedPipe::closePipe()
{
// TODO: Use CancelIo, ResetEvent, etc, to ensure that the socket is in
// a completely shut down state when this function returns.
if (m_handle == NULL)
return;
CloseHandle(m_handle);
m_handle = NULL;
}
bool NamedPipe::isClosed()
{
return m_handle == NULL;
}

80
agent/NamedPipe.h Normal file
View File

@ -0,0 +1,80 @@
#ifndef NAMEDPIPE_H
#define NAMEDPIPE_H
#include <windows.h>
#include <string>
class EventLoop;
class NamedPipe
{
private:
// The EventLoop uses these private members.
friend class EventLoop;
NamedPipe();
~NamedPipe();
HANDLE getWaitEvent1();
HANDLE getWaitEvent2();
void poll();
private:
class IoWorker
{
public:
IoWorker(NamedPipe *namedPipe);
virtual ~IoWorker();
void service();
HANDLE getWaitEvent();
protected:
NamedPipe *m_namedPipe;
bool m_pending;
HANDLE m_event;
OVERLAPPED m_over;
enum { kIoSize = 64 * 1024 };
char m_buffer[kIoSize];
virtual void completeIo(int size) = 0;
virtual bool shouldIssueIo(int *size, bool *isRead) = 0;
};
class InputWorker : public IoWorker
{
public:
InputWorker(NamedPipe *namedPipe) : IoWorker(namedPipe) {}
protected:
virtual void completeIo(int size);
virtual bool shouldIssueIo(int *size, bool *isRead);
};
class OutputWorker : public IoWorker
{
public:
OutputWorker(NamedPipe *namedPipe) : IoWorker(namedPipe) {}
protected:
virtual void completeIo(int size);
virtual bool shouldIssueIo(int *size, bool *isRead);
};
public:
bool connectToServer(LPCWSTR pipeName);
void write(const void *data, int size);
void write(const char *text);
int readBufferSize();
void setReadBufferSize(int size);
int bytesAvailable();
int peek(void *data, int size);
std::string read(int size);
std::string readAll();
void closePipe();
bool isClosed();
private:
// Input/output buffers
int m_readBufferSize;
std::string m_inQueue;
std::string m_outQueue;
HANDLE m_handle;
InputWorker m_inputWorker;
OutputWorker m_outputWorker;
};
#endif // NAMEDPIPE_H

View File

@ -1,7 +1,9 @@
#include "Terminal.h"
#include <QIODevice>
#include "NamedPipe.h"
#include <windows.h>
#include <stdio.h>
#include <string.h>
#include <string>
#define CSI "\x1b["
@ -27,8 +29,7 @@ const int TERMINAL_BLUE = 4;
const int TERMINAL_FOREGROUND = 30;
const int TERMINAL_BACKGROUND = 40;
Terminal::Terminal(QIODevice *output, QObject *parent) :
QObject(parent),
Terminal::Terminal(NamedPipe *output) :
m_output(output),
m_remoteLine(0),
m_cursorHidden(false),
@ -54,7 +55,7 @@ void Terminal::sendLine(int line, CHAR_INFO *lineData, int width)
// Erase in Line -- erase entire line.
m_output->write(CSI"2K");
QByteArray termLine;
std::string termLine;
termLine.reserve(width + 32);
int length = 0;
@ -85,15 +86,14 @@ void Terminal::sendLine(int line, CHAR_INFO *lineData, int width)
// TODO: Unicode
char ch = lineData[i].Char.AsciiChar;
if (ch == ' ') {
termLine.append(' ');
termLine.push_back(' ');
} else {
termLine.append(isprint(ch) ? ch : '?');
termLine.push_back(isprint(ch) ? ch : '?');
length = termLine.size();
}
}
termLine.truncate(length);
m_output->write(termLine);
m_output->write(termLine.data(), termLine.size());
}
void Terminal::finishOutput(const Coord &newCursorPos)

View File

@ -1,18 +1,15 @@
#ifndef TERMINAL_H
#define TERMINAL_H
#include <QObject>
#include <QPoint>
#include <windows.h>
#include "Coord.h"
class QIODevice;
class NamedPipe;
class Terminal : public QObject
class Terminal
{
Q_OBJECT
public:
explicit Terminal(QIODevice *output, QObject *parent = 0);
explicit Terminal(NamedPipe *output);
void reset(bool sendClearFirst, int newLine);
void sendLine(int line, CHAR_INFO *lineData, int width);
void finishOutput(const Coord &newCursorPos);
@ -22,7 +19,7 @@ private:
void moveTerminalToLine(int line);
private:
QIODevice *m_output;
NamedPipe *m_output;
int m_remoteLine;
bool m_cursorHidden;
Coord m_cursorPos;

View File

@ -1,10 +1,24 @@
#include "Agent.h"
#include <QCoreApplication>
#include <assert.h>
#include <stdlib.h>
wchar_t *heapMbsToWcs(const char *text)
{
size_t len = mbstowcs(NULL, text, 0);
if (len == (size_t)-1)
return NULL;
wchar_t *ret = new wchar_t[len + 1];
size_t len2 = mbstowcs(ret, text, len + 1);
assert(len == len2);
return ret;
}
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
Q_ASSERT(argc == 5);
Agent agent(argv[1], argv[2], atoi(argv[3]), atoi(argv[4]));
return a.exec();
assert(argc == 5);
Agent agent(heapMbsToWcs(argv[1]),
heapMbsToWcs(argv[2]),
atoi(argv[3]),
atoi(argv[4]));
agent.run();
}