broadway: Change input handling to use polling rather than async reads

We do this because we need to be able to switch to using sync reads
when we're doing a roundtripping call, and that is too complex
with the async code.
This commit is contained in:
Alexander Larsson 2011-03-10 21:22:19 +01:00
parent d4e2414d4a
commit 0fa32e98a4
2 changed files with 126 additions and 36 deletions

View File

@ -116,12 +116,12 @@ gdk_broadway_display_init_input (GdkDisplay *display)
g_list_free (list);
}
struct HttpRequest {
typedef struct HttpRequest {
GdkDisplay *display;
GSocketConnection *connection;
GDataInputStream *data;
GString *request;
};
} HttpRequest;
static void
http_request_free (HttpRequest *request)
@ -132,6 +132,101 @@ http_request_free (HttpRequest *request)
g_free (request);
}
struct BroadwayInput {
GdkDisplay *display;
GSocketConnection *connection;
GByteArray *buffer;
GSource *source;
};
static void
broadway_input_free (BroadwayInput *input)
{
g_object_unref (input->connection);
g_byte_array_free (input->buffer, FALSE);
g_source_destroy (input->source);
g_free (input);
}
static gboolean
process_input (BroadwayInput *input)
{
char *buf, *ptr;
gsize len;
buf = (char *)input->buffer->data;
len = input->buffer->len;
if (len == 0)
return TRUE;
if (buf[0] != 0)
return FALSE;
while ((ptr = memchr (buf, 0xff, len)) != NULL)
{
*ptr = 0;
ptr++;
_gdk_broadway_events_got_input (input->display, buf + 1);
len -= ptr - buf;
buf = ptr;
if (len > 0 &&buf[0] != 0)
return FALSE;
}
g_byte_array_remove_range (input->buffer, 0, buf - (char *)input->buffer->data);
return TRUE;
}
static gboolean
input_data_cb (GObject *stream,
BroadwayInput *input)
{
GInputStream *in;
gssize res;
guint8 buffer[1024];
GError *error;
in = g_io_stream_get_input_stream (G_IO_STREAM (input->connection));
error = NULL;
res = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (in),
buffer, sizeof (buffer), NULL, &error);
if (res <= 0)
{
if (res < 0 &&
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_error_free (error);
return TRUE;
}
GDK_BROADWAY_DISPLAY (input->display)->input = NULL;
broadway_input_free (input);
if (res < 0)
{
g_print ("input error %s", error->message);
g_error_free (error);
}
return FALSE;
}
g_byte_array_append (input->buffer, buffer, res);
if (!process_input (input))
{
GDK_BROADWAY_DISPLAY (input->display)->input = NULL;
broadway_input_free (input);
return FALSE;
}
return TRUE;
}
#include <unistd.h>
#include <fcntl.h>
static void
@ -163,34 +258,6 @@ parse_line (char *line, char *key)
p++;
return p;
}
static void
got_input (GInputStream *stream,
GAsyncResult *result,
HttpRequest *request)
{
GError *error;
char *message;
gsize len;
error = NULL;
message = g_data_input_stream_read_upto_finish (G_DATA_INPUT_STREAM (stream), result, &len, &error);
if (message == NULL)
{
GDK_BROADWAY_DISPLAY (request->display)->input = NULL;
http_request_free (request);
return;
}
g_assert (message[0] == 0);
_gdk_broadway_events_got_input (request->display, message + 1);
/* Skip past ending 0xff */
g_data_input_stream_read_byte (request->data, NULL, NULL);
g_data_input_stream_read_upto_async (request->data, "\xff", 1, 0, NULL,
(GAsyncReadyCallback)got_input, request);
}
static void
send_error (HttpRequest *request,
int error_code,
@ -226,6 +293,10 @@ start_input (HttpRequest *request)
GChecksum *checksum;
char *origin, *host;
GdkBroadwayDisplay *broadway_display;
BroadwayInput *input;
const void *data_buffer;
gsize data_buffer_size;
GInputStream *in;
broadway_display = GDK_BROADWAY_DISPLAY (request->display);
@ -323,17 +394,36 @@ start_input (HttpRequest *request)
"\r\n",
origin, host);
/* TODO: This should really be async */
g_output_stream_write_all (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
res, strlen (res), NULL, NULL, NULL);
g_free (res);
g_output_stream_write_all (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
challenge, 16, NULL, NULL, NULL);
broadway_display->input = request;
input = g_new0 (BroadwayInput, 1);
g_data_input_stream_read_upto_async (request->data, "\xff", 1, 0, NULL,
(GAsyncReadyCallback)got_input, request);
input->display = request->display;
input->connection = g_object_ref (request->connection);
data_buffer = g_buffered_input_stream_peek_buffer (G_BUFFERED_INPUT_STREAM (request->data), &data_buffer_size);
input->buffer = g_byte_array_sized_new (data_buffer_size);
g_byte_array_append (input->buffer, data_buffer, data_buffer_size);
broadway_display->input = input;
/* This will free and close the data input stream, but we got all the buffered content already */
http_request_free (request);
in = g_io_stream_get_input_stream (G_IO_STREAM (input->connection));
input->source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (in), NULL);
g_source_set_callback (input->source, (GSourceFunc)input_data_cb, input, NULL);
g_source_attach (input->source, NULL);
if (!process_input (input))
{
GDK_BROADWAY_DISPLAY (input->display)->input = NULL;
broadway_input_free (input);
}
g_strfreev (lines);
}

View File

@ -36,7 +36,7 @@ G_BEGIN_DECLS
typedef struct _GdkBroadwayDisplay GdkBroadwayDisplay;
typedef struct _GdkBroadwayDisplayClass GdkBroadwayDisplayClass;
typedef struct HttpRequest HttpRequest;
typedef struct BroadwayInput BroadwayInput;
#define GDK_TYPE_BROADWAY_DISPLAY (gdk_broadway_display_get_type())
#define GDK_BROADWAY_DISPLAY(object) (G_TYPE_CHECK_INSTANCE_CAST ((object), GDK_TYPE_BROADWAY_DISPLAY, GdkBroadwayDisplay))
@ -82,7 +82,7 @@ struct _GdkBroadwayDisplay
GSocketService *service;
BroadwayOutput *output;
guint32 saved_serial;
HttpRequest *input;
BroadwayInput *input;
};
struct _GdkBroadwayDisplayClass