wasm: refactor event dispatcher wait logic

We support blocking the event dispatcher thread in two cases:
 - secondary threads: wait on a wait condition
 - main thread: asyncify suspend

Create abstraction which implements both cases under a common API:
  bool QEventDispatcherWasm::wait(int timeout)
  void QEventDispatcherWasm::wakeUpDispatcherThread()

Make QEventDispatcherWasm::wakeUp() use the new API.

Also refactor the runOnMainThread functionality to provide several variants
  - runAsync(fn): makes an async call on the main thread, from the main thread.
  - runOnMainThread(fn): runs fn on the main thread, synchronously if
      called from the main thread.
  - runOnMainThreadAsync(fn): runs a fn on the main thread, always asynchronously

Change-Id: Ia6ac21a162e6b8ea2d71bacf6085beb9567588b5
Reviewed-by: Lorn Potter <lorn.potter@gmail.com>
This commit is contained in:
Morten Sørvig 2022-05-23 10:18:45 +02:00
parent 86103f3af5
commit 8c5b1f39f7
2 changed files with 154 additions and 92 deletions

View File

@ -20,6 +20,12 @@ QT_BEGIN_NAMESPACE
Q_LOGGING_CATEGORY(lcEventDispatcher, "qt.eventdispatcher");
Q_LOGGING_CATEGORY(lcEventDispatcherTimers, "qt.eventdispatcher.timers");
#if QT_CONFIG(thread)
#define LOCK_GUARD(M) std::lock_guard<std::mutex> lock(M)
#else
#define LOCK_GUARD(M)
#endif
#ifdef QT_HAVE_EMSCRIPTEN_ASYNCIFY
// Emscripten asyncify currently supports one level of suspend -
@ -86,7 +92,7 @@ bool qt_asyncify_yield()
Q_CONSTINIT QEventDispatcherWasm *QEventDispatcherWasm::g_mainThreadEventDispatcher = nullptr;
#if QT_CONFIG(thread)
Q_CONSTINIT QVector<QEventDispatcherWasm *> QEventDispatcherWasm::g_secondaryThreadEventDispatchers;
Q_CONSTINIT std::mutex QEventDispatcherWasm::g_secondaryThreadEventDispatchersMutex;
Q_CONSTINIT std::mutex QEventDispatcherWasm::g_staticDataMutex;
#endif
// ### dynamic initialization:
std::multimap<int, QSocketNotifier *> QEventDispatcherWasm::g_socketNotifiers;
@ -116,7 +122,7 @@ QEventDispatcherWasm::QEventDispatcherWasm()
g_mainThreadEventDispatcher = this;
} else {
#if QT_CONFIG(thread)
std::lock_guard<std::mutex> lock(g_secondaryThreadEventDispatchersMutex);
std::lock_guard<std::mutex> lock(g_staticDataMutex);
g_secondaryThreadEventDispatchers.append(this);
#endif
}
@ -130,7 +136,7 @@ QEventDispatcherWasm::~QEventDispatcherWasm()
#if QT_CONFIG(thread)
if (isSecondaryThreadEventDispatcher()) {
std::lock_guard<std::mutex> lock(g_secondaryThreadEventDispatchersMutex);
std::lock_guard<std::mutex> lock(g_staticDataMutex);
g_secondaryThreadEventDispatchers.remove(g_secondaryThreadEventDispatchers.indexOf(this));
} else
#endif
@ -156,6 +162,18 @@ bool QEventDispatcherWasm::isSecondaryThreadEventDispatcher()
return this != g_mainThreadEventDispatcher;
}
bool QEventDispatcherWasm::isValidEventDispatcherPointer(QEventDispatcherWasm *eventDispatcher)
{
if (eventDispatcher == g_mainThreadEventDispatcher)
return true;
#if QT_CONFIG(thread)
Q_ASSERT(!g_staticDataMutex.try_lock()); // caller must lock mutex
if (g_secondaryThreadEventDispatchers.contains(eventDispatcher))
return true;
#endif
return false;
}
bool QEventDispatcherWasm::processEvents(QEventLoop::ProcessEventsFlags flags)
{
emit awake();
@ -178,7 +196,7 @@ bool QEventDispatcherWasm::processEvents(QEventLoop::ProcessEventsFlags flags)
hasPendingEvents = qGlobalPostedEventsCount() > 0;
if (!hasPendingEvents && (flags & QEventLoop::WaitForMoreEvents))
waitForForEvents();
wait();
if (m_interrupted) {
m_interrupted = false;
@ -317,40 +335,22 @@ void QEventDispatcherWasm::interrupt()
void QEventDispatcherWasm::wakeUp()
{
#if QT_CONFIG(thread)
if (isSecondaryThreadEventDispatcher()) {
std::lock_guard<std::mutex> lock(m_mutex);
m_wakeUpCalled = true;
m_moreEvents.notify_one();
return;
}
#endif
#ifdef QT_HAVE_EMSCRIPTEN_ASYNCIFY
// The main thread may be asyncify-blocked in processEvents(). If so resume it.
if (qt_asyncify_resume()) // ### safe to call from secondary thread?
return;
#endif
{
#if QT_CONFIG(thread)
// This function can be called from any thread (via wakeUp()),
// so we need to lock access to m_pendingProcessEvents.
std::lock_guard<std::mutex> lock(m_mutex);
#endif
if (m_pendingProcessEvents)
return;
m_pendingProcessEvents = true;
}
#if QT_CONFIG(thread)
if (!emscripten_is_main_runtime_thread()) {
runOnMainThread([this](){
// The event dispatcher thread may be blocked or suspended by
// wait(), or control may have been returned to the browser's
// event loop. Make sure the thread is unblocked or make it
// process events.
bool wasBlocked = wakeEventDispatcherThread();
if (!wasBlocked && isMainThreadEventDispatcher()) {
{
LOCK_GUARD(m_mutex);
if (m_pendingProcessEvents)
return;
m_pendingProcessEvents = true;
}
runOnMainThreadAsync([this](){
QEventDispatcherWasm::callProcessEvents(this);
});
} else
#endif
emscripten_async_call(&QEventDispatcherWasm::callProcessEvents, this, 0);
}
}
void QEventDispatcherWasm::handleApplicationExec()
@ -381,7 +381,7 @@ void QEventDispatcherWasm::handleDialogExec()
<< "is leaked, and the exec() call may interfere with input event processing";
emscripten_sleep(1); // This call never returns
#endif
// For the asyncify case we do nothing here and wait for events in waitForForEvents()
// For the asyncify case we do nothing here and wait for events in wait()
}
void QEventDispatcherWasm::pollForNativeEvents()
@ -399,34 +399,62 @@ void QEventDispatcherWasm::pollForNativeEvents()
#endif
}
// Waits for more events. This is possible in two cases:
// - On a secondary thread
// - On the main thread iff asyncify is used
// Returns true if waiting was possible (at which point it
// has already happened).
bool QEventDispatcherWasm::waitForForEvents()
// Blocks/suspends the calling thread. This is possible in two cases:
// - Caller is a secondary thread: block on m_moreEvents
// - Caller is the main thread and asyncify is enabled: suspend using qt_asyncify_suspend()
// Returns false if the wait timed out.
bool QEventDispatcherWasm::wait(int timeout)
{
#if QT_CONFIG(thread)
using namespace std::chrono_literals;
Q_ASSERT(QThread::currentThread() == thread());
if (isSecondaryThreadEventDispatcher()) {
std::unique_lock<std::mutex> lock(m_mutex);
m_wakeUpCalled = false;
auto wait_time = timeout > 0 ? timeout * 1ms : std::chrono::duration<int, std::micro>::max();
bool wakeUpCalled = m_moreEvents.wait_for(lock, wait_time, [=] { return m_wakeUpCalled; });
return wakeUpCalled;
}
#endif
Q_ASSERT(emscripten_is_main_runtime_thread());
Q_ASSERT(isMainThreadEventDispatcher());
#ifdef QT_HAVE_EMSCRIPTEN_ASYNCIFY
if (timeout > 0)
qWarning() << "QEventDispatcherWasm asyncify wait with timeout is not supported; timeout will be ignored"; // FIXME
bool didSuspend = qt_asyncify_suspend();
if (!didSuspend)
qWarning("QEventDispatcherWasm: current thread is already suspended; could not asyncify wait for events");
#else
qWarning("QEventLoop::WaitForMoreEvents is not supported on the main thread without asyncify");
Q_UNUSED(timeout);
#endif
return false;
}
// Wakes a blocked/suspended event dispatcher thread. Returns true if the
// thread is unblocked or was resumed, false if the thread state could not
// be determined.
bool QEventDispatcherWasm::wakeEventDispatcherThread()
{
#if QT_CONFIG(thread)
if (isSecondaryThreadEventDispatcher()) {
std::unique_lock<std::mutex> lock(m_mutex);
m_moreEvents.wait(lock, [=] { return m_wakeUpCalled; });
m_wakeUpCalled = false;
std::lock_guard<std::mutex> lock(m_mutex);
m_wakeUpCalled = true;
m_moreEvents.notify_one();
return true;
}
#endif
Q_ASSERT(emscripten_is_main_runtime_thread());
Q_ASSERT(isMainThreadEventDispatcher());
#ifdef QT_HAVE_EMSCRIPTEN_ASYNCIFY
// We can block on the main thread using asyncify:
bool didSuspend = qt_asyncify_suspend();
if (!didSuspend)
qWarning("QEventDispatcherWasm: current thread is already suspended; could not asyncify wait for events");
return didSuspend;
#else
qWarning("QEventLoop::WaitForMoreEvents is not supported on the main thread without asyncify");
return false;
if (g_is_asyncify_suspended) {
runOnMainThread([]{ qt_asyncify_resume(); });
return true;
}
#endif
return false;
}
// Process event activation callbacks for the main thread event dispatcher.
@ -446,9 +474,7 @@ void QEventDispatcherWasm::callProcessEvents(void *context)
return;
{
#if QT_CONFIG(thread)
std::lock_guard<std::mutex> lock(g_mainThreadEventDispatcher->m_mutex);
#endif
LOCK_GUARD(g_mainThreadEventDispatcher->m_mutex);
g_mainThreadEventDispatcher->m_pendingProcessEvents = false;
}
g_mainThreadEventDispatcher->processEvents(QEventLoop::AllEvents);
@ -507,23 +533,17 @@ void QEventDispatcherWasm::updateNativeTimer()
// Update the native timer for this thread/dispatcher. This must be
// done on the main thread where we have access to native API.
runOnMainThread([this, maintainNativeTimer]() {
Q_ASSERT(emscripten_is_main_runtime_thread());
#if QT_CONFIG(thread)
if (isSecondaryThreadEventDispatcher()) {
runOnMainThread([this, maintainNativeTimer]() {
Q_ASSERT(emscripten_is_main_runtime_thread());
// "this" may have been deleted, or may be about to be deleted.
// Check if the pointer we have is still a valid event dispatcher,
// and keep the mutex locked while updating the native timer to
// prevent it from being deleted.
std::lock_guard<std::mutex> lock(g_secondaryThreadEventDispatchersMutex);
if (g_secondaryThreadEventDispatchers.contains(this))
maintainNativeTimer();
});
} else
#endif
maintainNativeTimer();
// "this" may have been deleted, or may be about to be deleted.
// Check if the pointer we have is still a valid event dispatcher,
// and keep the mutex locked while updating the native timer to
// prevent it from being deleted.
LOCK_GUARD(g_staticDataMutex);
if (isValidEventDispatcherPointer(this))
maintainNativeTimer();
});
}
// Static timer activation callback. Must be called on the main thread
@ -533,7 +553,6 @@ void QEventDispatcherWasm::callProcessTimers(void *context)
{
Q_ASSERT(emscripten_is_main_runtime_thread());
// Note: "context" may be a stale pointer here,
// take care before casting and dereferencing!
@ -546,7 +565,7 @@ void QEventDispatcherWasm::callProcessTimers(void *context)
// Wake and process timers on the secondary thread if this a secondary thread dispatcher
#if QT_CONFIG(thread)
std::lock_guard<std::mutex> lock(g_secondaryThreadEventDispatchersMutex);
std::lock_guard<std::mutex> lock(g_staticDataMutex);
if (g_secondaryThreadEventDispatchers.contains(context)) {
QEventDispatcherWasm *eventDispatcher = reinterpret_cast<QEventDispatcherWasm *>(context);
eventDispatcher->m_timerTargetTime = 0;
@ -660,22 +679,57 @@ void QEventDispatcherWasm::socketClose(int socket, void *context)
}
}
#if QT_CONFIG(thread)
namespace {
void trampoline(void *context) {
std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(context);
(*fn)();
delete fn;
auto async_fn = [](void *context){
std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(context);
(*fn)();
delete fn;
};
emscripten_async_call(async_fn, context, 0);
}
}
// Runs a function on the main thread
// Runs a function right away
void QEventDispatcherWasm::run(std::function<void(void)> fn)
{
fn();
}
// Runs a function asynchronously. Main thread only.
void QEventDispatcherWasm::runAsync(std::function<void(void)> fn)
{
trampoline(new std::function<void(void)>(fn));
}
// Runs a function on the main thread. The function runs synchronusly if
// the calling thread is then main thread.
void QEventDispatcherWasm::runOnMainThread(std::function<void(void)> fn)
{
void *context = new std::function<void(void)>(fn);
emscripten_async_run_in_main_runtime_thread_(EM_FUNC_SIG_VI, reinterpret_cast<void *>(trampoline), context);
}
#if QT_CONFIG(thread)
if (!emscripten_is_main_runtime_thread()) {
void *context = new std::function<void(void)>(fn);
emscripten_async_run_in_main_runtime_thread_(EM_FUNC_SIG_VI, reinterpret_cast<void *>(trampoline), context);
return;
}
#endif
fn();
}
// Runs a function on the main thread. The function alwas runs asynchronously,
// also if the calling thread is the main thread.
void QEventDispatcherWasm::runOnMainThreadAsync(std::function<void(void)> fn)
{
void *context = new std::function<void(void)>(fn);
#if QT_CONFIG(thread)
if (!emscripten_is_main_runtime_thread()) {
emscripten_async_run_in_main_runtime_thread_(EM_FUNC_SIG_VI, reinterpret_cast<void *>(trampoline), context);
return;
}
#endif
trampoline(context);
}
QT_END_NAMESPACE

View File

@ -56,11 +56,13 @@ protected:
private:
bool isMainThreadEventDispatcher();
bool isSecondaryThreadEventDispatcher();
static bool isValidEventDispatcherPointer(QEventDispatcherWasm *eventDispatcher);
void handleApplicationExec();
void handleDialogExec();
void pollForNativeEvents();
bool waitForForEvents();
bool wait(int timeout = -1);
bool wakeEventDispatcherThread();
static void callProcessEvents(void *eventDispatcher);
void processTimers();
@ -76,9 +78,10 @@ private:
static void socketMessage(int fd, void *context);
static void socketClose(int fd, void *context);
#if QT_CONFIG(thread)
void runOnMainThread(std::function<void(void)> fn);
#endif
static void run(std::function<void(void)> fn);
static void runAsync(std::function<void(void)> fn);
static void runOnMainThread(std::function<void(void)> fn);
static void runOnMainThreadAsync(std::function<void(void)> fn);
static QEventDispatcherWasm *g_mainThreadEventDispatcher;
@ -96,8 +99,13 @@ private:
std::condition_variable m_moreEvents;
static QVector<QEventDispatcherWasm *> g_secondaryThreadEventDispatchers;
static std::mutex g_secondaryThreadEventDispatchersMutex;
static std::mutex g_staticDataMutex;
// Note on mutex usage: the global g_staticDataMutex protects the global (g_ prefixed) data,
// while the per eventdispatcher m_mutex protects the state accociated with blocking and waking
// that eventdispatcher thread. The locking order is g_staticDataMutex first, then m_mutex.
#endif
static std::multimap<int, QSocketNotifier *> g_socketNotifiers;
};