zstdmt via compress_generic: reduce opportunity to free/create mtctx

`zstreamtest --newapi` (and `--opaqueapi`) create and destroy way too many threads
resulting in failure of tsan tests,
and potentially connected to the qemu flaky tests.

This is because, at each test, the nb of threads can be changed (random).

The `--no-big-tests` directive reduce this choice to 1/2 threads,
in order to limit memory usage, especially for qemu and 32-bits builds.
Unfortunately, swapping between 1 and 2 threads is enough to constantly create/destroy new mtctx.

This patch takes advantage of the following property :
via compress_generic, no internal mtctx is needed for nbThreads < 2.
As a consequence, when nbThreads == 2, the currently active mtctx is necessarily good.

This dramatically reduces the nb of thread creations when invoking `zstreamtest --newapi --no-big-tests`
(only when parent cctx itself is created, which is randomized to 1/256 tests).

Expected outcome :
- at a minimum : tsan tests shall now work continuously without exploding the thread counter
- at best : flaky qemu tests on `zstreamtest --newapi --no-big-tests` may stop being flaky, due to less stress from constant thread creation/destruction

Real world impact :
minimal, I don't expect users to constantly change `nbThreads` between each invocation.
If `nbThreads` remains stable, existing implementation re-uses existing mtctx.

Also : `zstreamtest --newapi` but without `--no-big-tests` doesn't benefit as much,
since this test can select a random `nbThreads` value between 1 and 4.
The current patch only reduces opportunity to free/create mtctx (for example : 2->1->2 doesn't need a new mtctx)
but doesn't completely eliminate it, since `nbThreads` can still change between 2/3/4.
A more complete solution could be to only use 2 out of 4 allocated threads, thus keeping the pool at a constant size.
This would require a larger change to `POOL_*` api though.
This commit is contained in:
Yann Collet 2017-12-16 12:48:13 -08:00
parent 569e06b91e
commit 5c2f2ebfdb
3 changed files with 18 additions and 2 deletions

View File

@ -2851,7 +2851,9 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN)
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */
if (params.nbThreads > 1) {
if (cctx->mtctx == NULL || cctx->appliedParams.nbThreads != params.nbThreads) {
if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) {
DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u (previous: %u)",
params.nbThreads, ZSTDMT_getNbThreads(cctx->mtctx));
ZSTDMT_freeCCtx(cctx->mtctx);
cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem);
if (cctx->mtctx == NULL) return ERROR(memory_allocation);

View File

@ -459,6 +459,15 @@ size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThread
return nbThreads;
}
/* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx.
* mtctx must be valid */
size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx)
{
assert(mtctx != NULL);
return mtctx->params.nbThreads;
}
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
{
ZSTDMT_CCtx* mtctx;

View File

@ -114,9 +114,14 @@ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_param
/* ZSTDMT_CCtxParam_setNbThreads()
* Set nbThreads, and clamp it correctly,
* but also reset jobSize and overlapLog */
* also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads);
/* ZSTDMT_getNbThreads():
* @return nb threads currently active in mtctx.
* mtctx must be valid */
size_t ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx);
/*! ZSTDMT_initCStream_internal() :
* Private use only. Init streaming operation.
* expects params to be valid.