cdxcore.jcpool#
Simple multi-processing convenience wrapper around the (already great) joblib.Parallel().
The minor additions are that parallel processing is a tad more convenient for dictionaries,
and that it supports routing cdxcore.verbose.Context messaging via a
multiprocessing.Queue to a single printing thread.
Import#
```python
from cdxcore.jcpool import JCPool
```
Documentation#
Classes

| Class | Description |
|---|---|
| `JCPool` | Parallel Job Context Pool. |
| `ParallelContextChannel` | Lightweight `cdxcore.verbose.Context` channel. |
- class cdxcore.jcpool.JCPool(num_workers=1, threading=False, tmp_root_dir='!/.cdxmp', *, mem_leak_enforce=True, mem_leak_max_memory=300000000, mem_leak_timer=1.0, logging_level=40, verbose=<cdxcore.verbose.Context object>, parallel_kwargs={})[source]#
Bases: `object`

Parallel Job Context Pool.

Simple wrapper around `joblib.Parallel()` which allows worker processes to use `cdxcore.verbose.Context` to report progress updates. For this purpose, `cdxcore.verbose.Context` sends output messages via a `multiprocessing.Queue` to the main process, where a separate thread prints these messages out. Using a fixed central pool object in your code base avoids relaunching processes.
Functions passed to `cdxcore.jcpool.JCPool.parallel()` and related functions must be decorated with `cdxcore.jcpool.JCPool.delayed()`.

**List/Generator Usage**
The following code is a standard prototype for using `cdxcore.jcpool.JCPool.parallel()`, following closely the joblib paradigm:

```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata = {
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for tx, ty in pool.parallel(
                pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                for ticker, tdata in tickerdata.items() ):
            verbose.report(1, lambda : f"Returned {tx:.2f}, {ty:.2f}")
    verbose.write(lambda : f"Analysis done; this took {tme}.")
```
The output from this code is asynchronous:

```
00: Launching analysis
02: Result for SPY: -0.43, -0.39
01: Returned -0.43, -0.39
02: Result for BTC: -0.39, -0.45
01: Returned -0.39, -0.45
02: Result for GLD: -0.41, -0.43
01: Returned -0.41, -0.43
00: Analysis done; this took 0.73s.
```
**Dict Usage**

Considering the asynchronous nature of the returned data, it is often desirable to keep track of results by some identifier. In the above example, `ticker` was not available in the main loop. This pattern is automated with the dictionary usage pattern:

```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata = {
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for ticker, tx, ty in pool.parallel(
                { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                  for ticker, tdata in tickerdata.items() } ):
            verbose.report(1, f"Returned {ticker} {tx:.2f}, {ty:.2f}")
    verbose.write(f"Analysis done; this took {tme}.")
```
This generates the following output:

```
00: Launching analysis
02: Result for SPY: -0.34, -0.41
01: Returned SPY -0.34, -0.41
02: Result for GLD: -0.38, -0.41
01: Returned GLD -0.38, -0.41
02: Result for BTC: -0.34, -0.32
01: Returned BTC -0.34, -0.32
00: Analysis done; this took 5s.
```
Note that `cdxcore.jcpool.JCPool.parallel()`, when applied to a dictionary, does not return a dictionary but a sequence of tuples. As in the example, this also works if the function being called itself returns tuples; in this case the returned data is extended by the key of the dictionary provided.

In order to retrieve a dictionary, use `cdxcore.jcpool.JCPool.parallel_to_dict()`:

```python
verbose = Context("all")
with pool.context( verbose ) as verbose:
    r = pool.parallel_to_dict(
            { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose )
              for ticker, tdata in tickerdata.items() } )
```

Note that in this case the function returns only after all jobs have been processed.
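The key-extension convention described above can be sketched in plain Python. This is a minimal illustration, not cdxcore's code; `extend_with_key` is a hypothetical helper:

```python
def extend_with_key(key, result):
    # Hypothetical helper illustrating the convention described above
    # (not part of cdxcore): results that are already tuples are prefixed
    # with the dictionary key; other results become (key, result) pairs.
    if isinstance(result, tuple):
        return (key, *result)
    return (key, result)

extend_with_key("SPY", (-0.43, -0.39))   # ('SPY', -0.43, -0.39)
extend_with_key("SPY", 0.5)              # ('SPY', 0.5)
```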
**Loky Memory Leak Detection**
This monitoring consists of two components:

- After an initial period (usually the first task), Loky assesses the initial memory used by the process using `psutil`.
- Before each subsequent task, if a minimum time period has passed (one second by default), it:
  1. Checks whether the process has allocated more than the initial number plus (by default) 300MB.
  2. If that happens, Loky calls `gc.collect()` (a very expensive operation).
  3. Then performs the above test again. If memory still exceeds the initial number plus 300MB, it kills the process.
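The two-step test above can be sketched as follows. This is a simplified model, not Loky's actual code; `measure_rss` stands in for the `psutil` memory query:

```python
import gc

def leak_check(measure_rss, initial_rss, threshold=300_000_000):
    # Simplified model of the test described above (not Loky's actual
    # code). Returns True if the worker should be killed.
    if measure_rss() <= initial_rss + threshold:
        return False              # within budget: keep the worker
    gc.collect()                  # expensive full garbage collection
    # re-measure after collection; kill only if still above the threshold
    return measure_rss() > initial_rss + threshold

# a memory spike that gc.collect() resolves: the worker survives
readings = iter([400_000_000, 100_000_000])
leak_check(lambda: next(readings), initial_rss=0)   # False
```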
The corresponding code can be found in Loky's GitHub repository.
For most intensive machine learning tasks, such as data pipeline processing, this will lead to killing the process early on. By default, Loky is quiet about killing a process which it thinks leaks memory. The default implementation of `JCPool` changes Loky's INFO to FATAL and reports to `stderr` if this happens. This can be turned off by setting `logging_level` to `None`.

To manually check whether a Loky joblib process is killed due to a perceived memory leak, use:
```python
import multiprocessing.util as mp_util
mp_util.log_to_stderr(level=mp_util.INFO)
```
Check for `"Memory leak detected: shutting down worker"` and then `"Exit due to memory leak"`.

You can either modify this behaviour to a more moderate setting, or turn it off:

- Turn off: set `mem_leak_enforce` to `False` to turn off memory checking. However, in this case Loky will still regularly call `gc.collect()`, by default if a second or more has passed since the last task. You can change that delay using `mem_leak_timer`.
- Modify: set the memory threshold to a number bigger than 300MB using `mem_leak_max_memory`; this can be a `float` representing the percentage of total physical memory. You can change the delay of checking against the new threshold using `mem_leak_timer`.
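The int/float convention for `mem_leak_max_memory` can be sketched as follows. This is an assumption-laden sketch: `resolve_max_memory` is a hypothetical helper, and it assumes floats are interpreted as a fraction of total physical memory, floored at `MIN_MAX_MEMORY`:

```python
def resolve_max_memory(spec, total_physical_memory,
                       min_max_memory=10_000_000):
    # Hypothetical helper illustrating the convention described above
    # (not part of cdxcore). Assumes a float means a fraction of total
    # physical memory, and that the result is floored at MIN_MAX_MEMORY.
    if isinstance(spec, float):
        spec = int(total_physical_memory * spec)
    return max(min_max_memory, spec)

resolve_max_memory(300_000_000, 16_000_000_000)   # 300000000 bytes
resolve_max_memory(0.05, 16_000_000_000)          # 5% -> 800000000 bytes
```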
- Parameters:
- num_workers : int, default `1`

  The number of workers. If `num_workers` is `1`, then no parallel process or thread is started, per default joblib functionality.

  Just as for joblib, you can use a negative `num_workers` to set the number of workers to the number of CPUs + `num_workers` + 1. For example, a `num_workers` of `-2` will use as many jobs as there are CPUs, less one. If `num_workers` is negative, the effective number of workers will be at least `1`.

  For debugging, you can set the number of workers to zero to bypass `joblib` entirely.

  Default is `1`.

- threading : bool, default `False`

  If `False`, the default, then the pool acts as a `"loky"` multi-process pool, with the associated overhead of managing data across processes.

  If `True`, then the pool is a `"threading"` pool. This helps for functions whose code releases Python's global interpreter lock, for example when engaged in heavy I/O, or in compiled code such as `numpy` or `pandas`, or code generated with numba.

- tmp_root_dir : str | SubDir, default `"!/.cdxmp"`

  Temporary directory for memory-mapping large arrays. This is a root directory; the function will create a temporary sub-directory with a name generated from the current state of the system. This sub-directory is deleted upon destruction of `JCPool` or when `cdxcore.jcpool.JCPool.terminate()` is called. This parameter uses `cdxcore.subdir.SubDir` and therefore supports, for example, the root directories `!/` (local temporary directory), `?/` (a temporary directory root), and `~/` (the home directory).

  This parameter can also be `None`, in which case the default behaviour of `joblib.Parallel` is used.

  Default is `"!/.cdxmp"`, which creates a temporary directory using `tempfile.gettempdir()` and places a sub-directory `".cdxmp"` inside it.

- mem_leak_enforce : bool, default `True`

  Controls whether Loky is allowed to detect memory leaks when multi-processing is used. See the section on "Loky Memory Leak Detection" above for background.

  This parameter has no effect if `threading` is `True`.

- mem_leak_max_memory : int | float, default `DEFAULT_MEM_LEAK_MAX_MEMORY`

  Sets the memory threshold after which Loky assumes a process leaks memory and kills it. See the section on "Loky Memory Leak Detection" above for background.

  If `mem_leak_max_memory` is an `int`, it specifies the amount of memory in bytes a process may allocate before it is killed. It must be above `cdxcore.jcpool.JCPool.MIN_MAX_MEMORY`.

  If `mem_leak_max_memory` is a `float`, it specifies the amount of memory a process may allocate as a percentage of total available physical memory. This will be floored by `cdxcore.jcpool.JCPool.MIN_MAX_MEMORY`.

  The default, `cdxcore.jcpool.JCPool.DEFAULT_MEM_LEAK_MAX_MEMORY`, is typically 300MB.

  This parameter has no effect if `threading` is `True`.

- mem_leak_timer : float, default `DEFAULT_MEM_LEAK_TIMER`

  Controls how many seconds Loky waits before detecting memory leaks (if `mem_leak_enforce` is `True`), or how often it calls `gc.collect()` (if `mem_leak_enforce` is `False`). See the section on "Loky Memory Leak Detection" above for background.

  The default, `cdxcore.jcpool.JCPool.DEFAULT_MEM_LEAK_TIMER`, is one second.

  This parameter has no effect if `threading` is `True`.

- logging_level : int | None, default `logging.ERROR`

  Sets the global level for `multiprocessing` upon which to print log messages to `stderr`. Essentially, if not `None`, then `multiprocessing.util.log_to_stderr()` is called.

- verbose : `cdxcore.verbose.Context`, default `cdxcore.verbose.Context.quiet`

  A `cdxcore.verbose.Context` object used to print out multi-processing/threading information. This is not the `Context` provided to child processes/threads.

  Default is `quiet`, a context which does not print anything.

- parallel_kwargs : dict, default empty

  Additional keywords for `joblib.Parallel`.
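The negative `num_workers` convention can be sketched in plain Python. This is a sketch assuming cdxcore follows the joblib rule quoted above; `effective_workers` is a hypothetical helper:

```python
import os

def effective_workers(num_workers):
    # Hypothetical helper illustrating the joblib-style rule described
    # above: negative values count back from the CPU count (-1 -> all
    # CPUs, -2 -> all but one), floored at 1.
    if num_workers < 0:
        return max(1, os.cpu_count() + num_workers + 1)
    return num_workers

effective_workers(-1)   # == os.cpu_count()
```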
- DEFAULT_MEM_LEAK_MAX_MEMORY = 300000000#
Default Loky memory leak size, usually 300MB.
- DEFAULT_MEM_LEAK_TIMER = 1.0#
Default Loky memory leak and `gc.collect()` timer in seconds, usually 1.
- MIN_MAX_MEMORY = 10000000#
Lower bound for memory leak detection per process. See the discussion on "Loky Memory Leak Detection" above.
- __call__(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool.
All functions used in `jobs` must have been decorated using `cdxcore.jcpool.JCPool.delayed()`.

This function returns an iterator which yields results as soon as they are computed.

If `jobs` is a `Sequence`, you can also use `cdxcore.jcpool.JCPool.parallel_to_list()` to retrieve a `list` of all results upon completion of the last job. Similarly, if `jobs` is a `Mapping`, use `cdxcore.jcpool.JCPool.parallel_to_dict()` to retrieve a `dict` of results upon completion of the last job.

- Parameters:
- jobs : Sequence | Mapping

  Can be a `Sequence` containing `Callable` functions, or a `Mapping` whose values are `Callable` functions.

  Each `Callable` used as part of either must have been decorated with `cdxcore.jcpool.JCPool.delayed()`.
- Returns:

- parallel : Iterator

  An iterator which yields results as soon as they are available. If `jobs` is a `Mapping`, then the resulting iterator will generate tuples with the first element equal to the mapping key of the respective function job. This function will not return a dictionary.
- context(verbose, verbose_interval=None)[source]#
Parallel processing `Context` object.

This function returns a `cdxcore.verbose.Context` object whose `channel` is a queue towards a utility thread which will output all messages to `verbose`. As a result, a worker process is able to use `verbose` as if it were in-process.

A standard usage pattern is:
```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( x, verbose : Context ):
    verbose.write(f"Found {x}")   # <- text "Found 1" etc will be sent
    return x                      #    to main thread via Queue

verbose = Context("all")
with pool.context( verbose ) as verbose:
    for x in pool.parallel( pool.delayed(f)( x=x, verbose=verbose(1) )
                            for x in [1,2,3,4] ):
        verbose.write(f"Returned {x}")
```
See `cdxcore.jcpool.JCPool` for more usage patterns.
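The mechanism behind `context()` can be illustrated with a stripped-down sketch. This is not cdxcore's implementation: it uses a plain `queue.Queue` and a list sink, where the real code uses a `multiprocessing.Queue` and prints via the parent `Context`:

```python
import queue
import threading

def drain(q, sink):
    # Single consumer loop: receives messages until the None sentinel
    # arrives. JCPool.context() uses the same pattern so that all worker
    # output is handled by one thread in the main process.
    while (msg := q.get()) is not None:
        sink.append(msg)

q   = queue.Queue()
out = []
t = threading.Thread(target=drain, args=(q, out))
t.start()
q.put("02: Found 1")
q.put("02: Found 2")
q.put(None)          # sentinel shuts the consumer down
t.join()
# out == ["02: Found 1", "02: Found 2"]
```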
- static cpu_count(only_physical_cores=False)[source]#
Return the number of CPUs.
- Parameters:
- only_physical_cores : bool, optional

  If `True`, does not take hyperthreading / SMT logical cores into account. Default is `False`.
- Returns:
- cpusint
CPU count.
- delayed(F)[source]#
Decorate a function for parallel execution.
This decorator adds minor syntactic sugar on top of `joblib.delayed()` (which is discussed in the joblib documentation).

When called, this decorator checks that no `cdxcore.verbose.Context` arguments are passed to the pooled function which have no `ParallelContextChannel` present. In other words, the function detects whether the user forgot to use `cdxcore.jcpool.JCPool.context()`.

- Parameters:
- FCallable
Function.
- Returns:
- wrapped FCallable
Decorated function.
- property is_no_pool: bool#
Whether this is not an actual pool (i.e. the pool was constructed with zero workers).
- property mem_leak_max_memory: int | None#
Returns the effective `mem_leak_max_memory` used by the pool as an integer, or `None` if not used.
- parallel(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool.
All functions used in `jobs` must have been decorated using `cdxcore.jcpool.JCPool.delayed()`.

This function returns an iterator which yields results as soon as they are computed.

If `jobs` is a `Sequence`, you can also use `cdxcore.jcpool.JCPool.parallel_to_list()` to retrieve a `list` of all results upon completion of the last job. Similarly, if `jobs` is a `Mapping`, use `cdxcore.jcpool.JCPool.parallel_to_dict()` to retrieve a `dict` of results upon completion of the last job.

- Parameters:
- jobs : Sequence | Mapping

  Can be a `Sequence` containing `Callable` functions, or a `Mapping` whose values are `Callable` functions.

  Each `Callable` used as part of either must have been decorated with `cdxcore.jcpool.JCPool.delayed()`.
- Returns:

- parallel : Iterator

  An iterator which yields results as soon as they are available. If `jobs` is a `Mapping`, then the resulting iterator will generate tuples with the first element equal to the mapping key of the respective function job. This function will not return a dictionary.
- parallel_to_dict(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.
This function awaits the calculation of all elements of `jobs` and returns a `dict` with the results.

- Parameters:
- jobs : Mapping

  A dictionary where all (function) values must have been decorated with `cdxcore.jcpool.JCPool.delayed()`.
- Returns:
- Results : dict

  A dictionary with results.

  If `jobs` is an `OrderedDict`, then this function will return an `OrderedDict` with the same order as `jobs`. Otherwise the elements of the `dict` returned by this function are in completion order.
- parallel_to_list(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.
This function awaits the calculation of all elements of `jobs` and returns a `list` with the results.

- Parameters:
- jobs : Sequence

  A sequence of `Callable` functions, each of which must have been decorated with `cdxcore.jcpool.JCPool.delayed()`.
- Returns:
- Results : list

  A list with results, in the order of `jobs`.
- class cdxcore.jcpool.ParallelContextChannel(*, cid, maintid, queue, f_verbose)[source]#
Bases: `Context`

Lightweight `cdxcore.verbose.Context` `channel` which is picklable.

This channel sends messages it receives to a `multiprocessing.Queue`.
multiprocessing.Queue.- __call__(msg, flush)[source]#
Sends `msg` via a `multiprocessing.Queue` to the main thread for printing.