Use blocking sockets from non-main threads in wxProtocol.

Non-blocking sockets can't work in worker threads without additional locking
as they generate events that can be dispatched from the main thread after the
socket object, created in the worker thread, is already destroyed, so don't
even attempt to use them if wxProtocol object is created from non-main thread.

Also simplify the code by removing the calls to SetFlags(), Notify() and
{Save,Restore}State() and simply put the socket from the beginning in
blocking, wait all mode that it needs to be in.

This, with the fixes in the previous commit, allows wxHTTP and wxFTP to work
from worker threads too.

Test using wxHTTP from a worker thread in the socket client sample.

Closes #17031.
This commit is contained in:
Vadim Zeitlin 2015-07-05 16:18:58 +02:00
parent e18c8fd29a
commit d421373c2e
4 changed files with 57 additions and 39 deletions

View File

@ -31,6 +31,7 @@
#include "wx/socket.h"
#include "wx/url.h"
#include "wx/sstream.h"
#include "wx/thread.h"
#include <memory>
// --------------------------------------------------------------------------
@ -575,23 +576,13 @@ void MyFrame::OnDatagram(wxCommandEvent& WXUNUSED(event))
#if wxUSE_URL
void MyFrame::OnTestURL(wxCommandEvent& WXUNUSED(event))
void DoDownload(const wxString& urlname)
{
// Ask for the URL
static wxString s_urlname("http://www.google.com/");
wxString urlname = wxGetTextFromUser
(
_("Enter an URL to get"),
_("URL:"),
s_urlname
);
if ( urlname.empty() )
return; // cancelled by user
wxString testname("URL");
if ( !wxIsMainThread() )
testname += " in worker thread";
s_urlname = urlname;
TestLogger logtest("URL");
TestLogger logtest(testname);
// Parse the URL
wxURL url(urlname);
@ -626,6 +617,51 @@ void MyFrame::OnTestURL(wxCommandEvent& WXUNUSED(event))
urlname, sout.GetString());
}
void MyFrame::OnTestURL(wxCommandEvent& WXUNUSED(event))
{
// Ask for the URL
static wxString s_urlname("http://www.google.com/");
wxString urlname = wxGetTextFromUser
(
_("Enter an URL to get"),
_("URL:"),
s_urlname
);
if ( urlname.empty() )
return; // cancelled by user
s_urlname = urlname;
// First do it in this thread.
DoDownload(urlname);
// And then also in a worker thread.
#if wxUSE_THREADS
class DownloadThread : public wxThread
{
public:
explicit DownloadThread(const wxString& url): m_url(url)
{
Run();
}
virtual void* Entry() wxOVERRIDE
{
DoDownload(m_url);
return NULL;
}
private:
const wxString m_url;
};
// NB: there is a race condition here, we don't check for this thread
// termination before exiting the application, don't do this in real code!
new DownloadThread(urlname);
#endif // wxUSE_THREADS
}
#endif // wxUSE_URL
void MyFrame::OnSocketEvent(wxSocketEvent& event)

View File

@ -79,8 +79,6 @@ wxFTP::wxFTP()
m_username = wxT("anonymous");
m_password << wxGetUserId() << wxT('@') << wxGetFullHostName();
SetNotify(0);
SetFlags(wxSOCKET_NOWAIT);
m_bPassive = true;
m_bEncounteredError = false;
}
@ -782,8 +780,6 @@ wxInputStream *wxFTP::GetInputStream(const wxString& path)
return NULL;
}
sock->SetFlags(wxSOCKET_WAITALL);
m_streaming = true;
wxInputFTPStream *in_stream = new wxInputFTPStream(this, sock);

View File

@ -48,8 +48,6 @@ wxHTTP::wxHTTP()
m_read = false;
m_proxy_mode = false;
m_http_response = 0;
SetNotify(wxSOCKET_LOST_FLAG);
}
wxHTTP::~wxHTTP()
@ -370,16 +368,6 @@ bool wxHTTP::BuildRequest(const wxString& path, const wxString& method)
SetHeader(wxT("Authorization"), GenerateAuthString(m_username, m_password));
}
SaveState();
// we may use non blocking sockets only if we can dispatch events from them
int flags = wxIsMainThread() && wxApp::IsMainLoopRunning() ? wxSOCKET_NONE
: wxSOCKET_BLOCK;
// and we must use wxSOCKET_WAITALL to ensure that all data is sent
flags |= wxSOCKET_WAITALL;
SetFlags(flags);
Notify(false);
wxString buf;
buf.Printf(wxT("%s %s HTTP/1.0\r\n"), method, path);
const wxWX2MBbuf pathbuf = buf.mb_str();
@ -395,10 +383,8 @@ bool wxHTTP::BuildRequest(const wxString& path, const wxString& method)
wxString tmp_str;
m_lastError = ReadLine(this, tmp_str);
if (m_lastError != wxPROTO_NOERR) {
RestoreState();
if (m_lastError != wxPROTO_NOERR)
return false;
}
if (!tmp_str.Contains(wxT("HTTP/"))) {
// TODO: support HTTP v0.9 which can have no header.
@ -441,7 +427,7 @@ bool wxHTTP::BuildRequest(const wxString& path, const wxString& method)
m_lastError = wxPROTO_NOERR;
ret_value = ParseHeaders();
RestoreState();
return ret_value;
}
@ -540,9 +526,6 @@ wxInputStream *wxHTTP::GetInputStream(const wxString& path)
inp_stream->m_read_bytes = 0;
Notify(false);
SetFlags(wxSOCKET_BLOCK | wxSOCKET_WAITALL);
// no error; reset m_lastError
m_lastError = wxPROTO_NOERR;
return inp_stream;

View File

@ -63,7 +63,10 @@ wxIMPLEMENT_ABSTRACT_CLASS(wxProtocol, wxObject);
wxProtocol::wxProtocol()
#if wxUSE_SOCKETS
: wxSocketClient()
// Only use non blocking sockets if we can dispatch events.
: wxSocketClient((wxIsMainThread() && wxApp::IsMainLoopRunning()
? wxSOCKET_NONE
: wxSOCKET_BLOCK) | wxSOCKET_WAITALL)
#endif
{
m_lastError = wxPROTO_NOERR;