skia2/experimental/Networking/SkSockets.cpp
rmistry@google.com d6176b0dca Result of running tools/sanitize_source_files.py (which was added in https://codereview.appspot.com/6465078/)
This CL is part II of IV (I broke down the 1280 files into 4 CLs).
Review URL: https://codereview.appspot.com/6474054

git-svn-id: http://skia.googlecode.com/svn/trunk@5263 2bbb7eff-a529-9590-31e7-b0007b416f81
2012-08-23 18:14:13 +00:00

359 lines
11 KiB
C++

/*
* Copyright 2011 Google Inc.
*
* Use of this source code is governed by a BSD-style license that can be
* found in the LICENSE file.
*/
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include "SkSockets.h"
#include "SkData.h"
SkSocket::SkSocket() {
fMaxfd = 0;
FD_ZERO(&fMasterSet);
fConnected = false;
fReady = false;
fReadSuspended = false;
fWriteSuspended = false;
fSockfd = this->createSocket();
}
SkSocket::~SkSocket() {
this->closeSocket(fSockfd);
shutdown(fSockfd, 2); //stop sending/receiving
}
int SkSocket::createSocket() {
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0) {
SkDebugf("ERROR opening socket\n");
return -1;
}
int reuse = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)) < 0) {
SkDebugf("error: %s\n", strerror(errno));
return -1;
}
#ifdef NONBLOCKING_SOCKETS
this->setNonBlocking(sockfd);
#endif
//SkDebugf("Opened fd:%d\n", sockfd);
fReady = true;
return sockfd;
}
void SkSocket::closeSocket(int sockfd) {
if (!fReady)
return;
close(sockfd);
//SkDebugf("Closed fd:%d\n", sockfd);
if (FD_ISSET(sockfd, &fMasterSet)) {
FD_CLR(sockfd, &fMasterSet);
if (sockfd >= fMaxfd) {
while (FD_ISSET(fMaxfd, &fMasterSet) == false && fMaxfd > 0)
fMaxfd -= 1;
}
}
if (0 == fMaxfd)
fConnected = false;
}
void SkSocket::onFailedConnection(int sockfd) {
this->closeSocket(sockfd);
}
void SkSocket::setNonBlocking(int sockfd) {
int flags = fcntl(sockfd, F_GETFL);
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
}
void SkSocket::addToMasterSet(int sockfd) {
FD_SET(sockfd, &fMasterSet);
if (sockfd > fMaxfd)
fMaxfd = sockfd;
}
int SkSocket::readPacket(void (*onRead)(int, const void*, size_t, DataType,
void*), void* context) {
if (!fConnected || !fReady || NULL == onRead || NULL == context
|| fReadSuspended)
return -1;
int totalBytesRead = 0;
char packet[PACKET_SIZE];
for (int i = 0; i <= fMaxfd; ++i) {
if (!FD_ISSET (i, &fMasterSet))
continue;
memset(packet, 0, PACKET_SIZE);
SkDynamicMemoryWStream stream;
int attempts = 0;
bool failure = false;
int bytesReadInTransfer = 0;
int bytesReadInPacket = 0;
header h;
h.done = false;
h.bytes = 0;
while (!h.done && fConnected && !failure) {
int retval = read(i, packet + bytesReadInPacket,
PACKET_SIZE - bytesReadInPacket);
++attempts;
if (retval < 0) {
#ifdef NONBLOCKING_SOCKETS
if (errno == EWOULDBLOCK || errno == EAGAIN) {
if (bytesReadInPacket > 0 || bytesReadInTransfer > 0)
continue; //incomplete packet or frame, keep tring
else
break; //nothing to read
}
#endif
//SkDebugf("Read() failed with error: %s\n", strerror(errno));
failure = true;
break;
}
if (retval == 0) {
//SkDebugf("Peer closed connection or connection failed\n");
failure = true;
break;
}
SkASSERT(retval > 0);
bytesReadInPacket += retval;
if (bytesReadInPacket < PACKET_SIZE) {
//SkDebugf("Read %d/%d\n", bytesReadInPacket, PACKET_SIZE);
continue; //incomplete packet, keep trying
}
SkASSERT((bytesReadInPacket == PACKET_SIZE) && !failure);
memcpy(&h.done, packet, sizeof(bool));
memcpy(&h.bytes, packet + sizeof(bool), sizeof(int));
memcpy(&h.type, packet + sizeof(bool) + sizeof(int), sizeof(DataType));
if (h.bytes > CONTENT_SIZE || h.bytes <= 0) {
//SkDebugf("bad packet\n");
failure = true;
break;
}
//SkDebugf("read packet(done:%d, bytes:%d) from fd:%d in %d tries\n",
// h.done, h.bytes, fSockfd, attempts);
stream.write(packet + HEADER_SIZE, h.bytes);
bytesReadInPacket = 0;
attempts = 0;
bytesReadInTransfer += h.bytes;
}
if (failure) {
onRead(i, NULL, 0, h.type, context);
this->onFailedConnection(i);
continue;
}
if (bytesReadInTransfer > 0) {
SkData* data = stream.copyToData();
SkASSERT(data->size() == bytesReadInTransfer);
onRead(i, data->data(), data->size(), h.type, context);
data->unref();
totalBytesRead += bytesReadInTransfer;
}
}
return totalBytesRead;
}
int SkSocket::writePacket(void* data, size_t size, DataType type) {
if (size < 0|| NULL == data || !fConnected || !fReady || fWriteSuspended)
return -1;
int totalBytesWritten = 0;
header h;
char packet[PACKET_SIZE];
for (int i = 0; i <= fMaxfd; ++i) {
if (!FD_ISSET (i, &fMasterSet))
continue;
int bytesWrittenInTransfer = 0;
int bytesWrittenInPacket = 0;
int attempts = 0;
bool failure = false;
while (bytesWrittenInTransfer < size && fConnected && !failure) {
memset(packet, 0, PACKET_SIZE);
h.done = (size - bytesWrittenInTransfer <= CONTENT_SIZE);
h.bytes = (h.done) ? size - bytesWrittenInTransfer : CONTENT_SIZE;
h.type = type;
memcpy(packet, &h.done, sizeof(bool));
memcpy(packet + sizeof(bool), &h.bytes, sizeof(int));
memcpy(packet + sizeof(bool) + sizeof(int), &h.type, sizeof(DataType));
memcpy(packet + HEADER_SIZE, (char*)data + bytesWrittenInTransfer,
h.bytes);
int retval = write(i, packet + bytesWrittenInPacket,
PACKET_SIZE - bytesWrittenInPacket);
attempts++;
if (retval < 0) {
if (errno == EPIPE) {
//SkDebugf("broken pipe, client closed connection");
failure = true;
break;
}
#ifdef NONBLOCKING_SOCKETS
else if (errno == EWOULDBLOCK || errno == EAGAIN) {
if (bytesWrittenInPacket > 0 || bytesWrittenInTransfer > 0)
continue; //incomplete packet or frame, keep trying
else
break; //client not available, skip current transfer
}
#endif
else {
//SkDebugf("write(%d) failed with error:%s\n", i,
// strerror(errno));
failure = true;
break;
}
}
bytesWrittenInPacket += retval;
if (bytesWrittenInPacket < PACKET_SIZE)
continue; //incomplete packet, keep trying
SkASSERT(bytesWrittenInPacket == PACKET_SIZE);
//SkDebugf("wrote to packet(done:%d, bytes:%d) to fd:%d in %d tries\n",
// h.done, h.bytes, i, attempts);
bytesWrittenInTransfer += h.bytes;
bytesWrittenInPacket = 0;
attempts = 0;
}
if (failure)
this->onFailedConnection(i);
totalBytesWritten += bytesWrittenInTransfer;
}
return totalBytesWritten;
}
////////////////////////////////////////////////////////////////////////////////
SkTCPServer::SkTCPServer(int port) {
sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(port);
if (bind(fSockfd, (sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
SkDebugf("ERROR on binding: %s\n", strerror(errno));
fReady = false;
}
}
SkTCPServer::~SkTCPServer() {
this->disconnectAll();
}
int SkTCPServer::acceptConnections() {
if (!fReady)
return -1;
listen(fSockfd, MAX_WAITING_CLIENTS);
int newfd;
for (int i = 0; i < MAX_WAITING_CLIENTS; ++i) {
#ifdef NONBLOCKING_SOCKETS
fd_set workingSet;
FD_ZERO(&workingSet);
FD_SET(fSockfd, &workingSet);
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
int sel = select(fSockfd + 1, &workingSet, NULL, NULL, &timeout);
if (sel < 0) {
SkDebugf("select() failed with error %s\n", strerror(errno));
continue;
}
if (sel == 0) //select() timed out
continue;
#endif
sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
newfd = accept(fSockfd, (struct sockaddr*)&clientAddr, &clientLen);
if (newfd< 0) {
SkDebugf("accept() failed with error %s\n", strerror(errno));
continue;
}
SkDebugf("New incoming connection - %d\n", newfd);
fConnected = true;
#ifdef NONBLOCKING_SOCKETS
this->setNonBlocking(newfd);
#endif
this->addToMasterSet(newfd);
}
return 0;
}
int SkTCPServer::disconnectAll() {
if (!fConnected || !fReady)
return -1;
for (int i = 0; i <= fMaxfd; ++i) {
if (FD_ISSET(i, &fMasterSet))
this->closeSocket(i);
}
fConnected = false;
return 0;
}
////////////////////////////////////////////////////////////////////////////////
SkTCPClient::SkTCPClient(const char* hostname, int port) {
//Add fSockfd since the client will be using it to read/write
this->addToMasterSet(fSockfd);
hostent* server = gethostbyname(hostname);
if (server) {
fServerAddr.sin_family = AF_INET;
memcpy((char*)&fServerAddr.sin_addr.s_addr, (char*)server->h_addr,
server->h_length);
fServerAddr.sin_port = htons(port);
}
else {
//SkDebugf("ERROR, no such host\n");
fReady = false;
}
}
void SkTCPClient::onFailedConnection(int sockfd) { //cleanup and recreate socket
SkASSERT(sockfd == fSockfd);
this->closeSocket(fSockfd);
fSockfd = this->createSocket();
//Add fSockfd since the client will be using it to read/write
this->addToMasterSet(fSockfd);
}
int SkTCPClient::connectToServer() {
if (!fReady)
return -1;
if (fConnected)
return 0;
int conn = connect(fSockfd, (sockaddr*)&fServerAddr, sizeof(fServerAddr));
if (conn < 0) {
#ifdef NONBLOCKING_SOCKETS
if (errno == EINPROGRESS || errno == EALREADY)
return conn;
#endif
if (errno != EISCONN) {
//SkDebugf("error: %s\n", strerror(errno));
this->onFailedConnection(fSockfd);
return conn;
}
}
fConnected = true;
SkDebugf("Succesfully reached server\n");
return 0;
}