broadwayd: Read using socket API

This changes nothing, but it allows us to later recieve
unix messages and thus fd passing
This commit is contained in:
Alexander Larsson 2017-11-17 15:22:06 +01:00
parent 620d3cf402
commit 43a02da07b

View File

@ -48,7 +48,9 @@ typedef struct {
typedef struct {
guint32 id;
GSocketConnection *connection;
GBufferedInputStream *in;
GInputStream *in;
GString *buffer;
GSource *source;
GSList *serial_mappings;
GList *windows;
guint disconnect_idle;
@ -62,6 +64,7 @@ client_free (BroadwayClient *client)
clients = g_list_remove (clients, client);
g_object_unref (client->connection);
g_object_unref (client->in);
g_string_free (client->buffer, TRUE);
g_slist_free_full (client->serial_mappings, g_free);
g_free (client);
}
@ -77,6 +80,12 @@ client_disconnected (BroadwayClient *client)
client->disconnect_idle = 0;
}
if (client->source != 0)
{
g_source_destroy (client->source);
client->source = 0;
}
for (l = client->windows; l != NULL; l = l->next)
broadway_server_destroy_window (server,
GPOINTER_TO_UINT (l->data));
@ -320,54 +329,62 @@ client_handle_request (BroadwayClient *client,
before_serial - 1);
}
static void
client_fill_cb (GObject *source_object,
GAsyncResult *result,
gpointer user_data)
#define INPUT_BUFFER_SIZE 8192
static gboolean
client_input_cb (GPollableInputStream *stream,
gpointer user_data)
{
BroadwayClient *client = user_data;
GSocket *socket = g_socket_connection_get_socket (client->connection);
gssize res;
gsize old_len;
guchar *buffer;
gsize buffer_len;
res = g_buffered_input_stream_fill_finish (client->in, result, NULL);
old_len = client->buffer->len;
if (res > 0)
/* Ensure we have at least INPUT_BUFFER_SIZE extra space */
g_string_set_size (client->buffer, old_len + INPUT_BUFFER_SIZE);
g_string_set_size (client->buffer, old_len);
res = g_socket_receive_with_blocking (socket, client->buffer->str + old_len,
client->buffer->allocated_len - client->buffer->len -1,
FALSE, NULL, NULL);
if (res <= 0)
{
client->source = NULL;
client_disconnected (client);
return G_SOURCE_REMOVE;
}
g_string_set_size (client->buffer, old_len + res);
buffer = (guchar *)client->buffer->str;
buffer_len = client->buffer->len;
while (buffer_len >= sizeof (guint32))
{
guint32 size;
gsize count, remaining;
guint8 *buffer;
buffer = (guint8 *)g_buffered_input_stream_peek_buffer (client->in, &count);
memcpy (&size, buffer, sizeof (guint32));
if (size <= buffer_len)
{
client_handle_request (client, (BroadwayRequest *)buffer);
remaining = count;
while (remaining >= sizeof (guint32))
{
memcpy (&size, buffer, sizeof (guint32));
if (size <= remaining)
{
client_handle_request (client, (BroadwayRequest *)buffer);
remaining -= size;
buffer += size;
}
}
/* This is guaranteed not to block */
g_input_stream_skip (G_INPUT_STREAM (client->in), count - remaining, NULL, NULL);
g_buffered_input_stream_fill_async (client->in,
4*1024,
0,
NULL,
client_fill_cb, client);
}
else
{
client_disconnected (client);
buffer_len -= size;
buffer += size;
}
else
break;
}
g_string_erase (client->buffer, 0, client->buffer->len - buffer_len);
return G_SOURCE_CONTINUE;
}
static gboolean
incoming_client (GSocketService *service,
GSocketConnection *connection,
@ -382,16 +399,15 @@ incoming_client (GSocketService *service,
client->connection = g_object_ref (connection);
input = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
client->in = (GBufferedInputStream *)g_buffered_input_stream_new (input);
client->in = input;
client->buffer = g_string_sized_new (INPUT_BUFFER_SIZE);
client->source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (input), NULL);
g_source_set_callback (client->source, (GSourceFunc) client_input_cb, client, NULL);
g_source_attach (client->source, NULL);
clients = g_list_prepend (clients, client);
g_buffered_input_stream_fill_async (client->in,
4*1024,
0,
NULL,
client_fill_cb, client);
/* Send initial resize notify */
ev.base.type = BROADWAY_EVENT_SCREEN_SIZE_CHANGED;
ev.base.serial = broadway_server_get_next_serial (server) - 1;