cdxcore.jcpool#
Simple multi-processing convenience wrapper around the (already great) joblib.Parallel().
The minor additions are that parallel processing is a tad more convenient for dictionaries,
and that it supports routing cdxcore.verbose.Context messaging via a
multiprocessing.Queue to a single printing thread.
Import#
from cdxcore.jcpool import JCPool
Documentation#
Classes

| JCPool | Parallel Job Context Pool. |
| ParallelContextChannel | Lightweight cdxcore.verbose.Context channel which is pickle'able. |
- class cdxcore.jcpool.JCPool(num_workers=1, threading=False, tmp_root_dir='!/.cdxmp', *, verbose=<cdxcore.verbose.Context object>, parallel_kwargs={})[source]#
Bases: object

Parallel Job Context Pool.
Simple wrapper around joblib.Parallel() which allows worker processes to use
cdxcore.verbose.Context to report progress updates. For this purpose, cdxcore.verbose.Context will send output messages via a multiprocessing.Queue to the main process, where a separate thread prints these messages out. Using a fixed central pool object in your code base avoids relaunching processes.
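The routing mechanism described above, worker messages funnelled through a queue to a single printing thread, can be sketched with the standard library alone. The following illustration uses threads and queue.Queue instead of processes and multiprocessing.Queue for brevity; none of the names below are part of cdxcore:

```python
import queue
import threading

def printer(msg_queue: queue.Queue) -> None:
    """Single consumer thread: prints every message until a None sentinel arrives."""
    while True:
        msg = msg_queue.get()
        if msg is None:
            break
        print(msg)

def worker(worker_id: int, msg_queue: queue.Queue) -> None:
    """Stand-in for a pooled job: reports progress by enqueueing messages."""
    msg_queue.put(f"worker {worker_id}: done")

msg_queue: queue.Queue = queue.Queue()
printer_thread = threading.Thread(target=printer, args=(msg_queue,))
printer_thread.start()

workers = [threading.Thread(target=worker, args=(i, msg_queue)) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

msg_queue.put(None)   # sentinel: shut the printer thread down
printer_thread.join()
```

JCPool performs the same funnelling across process boundaries, which is why its channel object must be pickle'able.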
Functions passed to cdxcore.jcpool.JCPool.parallel() and related functions must be decorated with cdxcore.jcpool.JCPool.delayed().

List/Generator Usage
The following code is a standard prototype for using cdxcore.jcpool.JCPool.parallel(), following closely the joblib paradigm:

```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):   # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata =\
{
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for tx, ty in pool.parallel(
                pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                for ticker, tdata in tickerdata.items() ):
            verbose.report(1, f"Returned {tx:.2f}, {ty:.2f}")
    verbose.write(f"Analysis done; this took {tme}.")
```
The output from this code is asynchronous:
```
00: Launching analysis
02: Result for SPY: -0.43, -0.39
01: Returned -0.43, -0.39
02: Result for BTC: -0.39, -0.45
01: Returned -0.39, -0.45
02: Result for GLD: -0.41, -0.43
01: Returned -0.41, -0.43
00: Analysis done; this took 0.73s.
```
Dict
Considering the asynchronous nature of the returned data, it is often desirable to keep track of results by some identifier. In the above example, ticker was not available in the main loop. This pattern is automated with the dictionary usage pattern:

```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):   # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata =\
{
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for ticker, tx, ty in pool.parallel(
                { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                  for ticker, tdata in tickerdata.items() } ):
            verbose.report(1, f"Returned {ticker} {tx:.2f}, {ty:.2f}")
    verbose.write(f"Analysis done; this took {tme}.")
```
This generates the following output:
```
00: Launching analysis
02: Result for SPY: -0.34, -0.41
01: Returned SPY -0.34, -0.41
02: Result for GLD: -0.38, -0.41
01: Returned GLD -0.38, -0.41
02: Result for BTC: -0.34, -0.32
01: Returned BTC -0.34, -0.32
00: Analysis done; this took 5s.
```
Note that cdxcore.jcpool.JCPool.parallel(), when applied to a dictionary, does not return a dictionary but a sequence of tuples. As in the example, this also works if the called function itself returns tuples; in this case the returned tuple is extended by the key of the dictionary provided.

In order to retrieve a dictionary, use cdxcore.jcpool.JCPool.parallel_to_dict():

```python
verbose = Context("all")
with pool.context( verbose ) as verbose:
    r = pool.parallel_to_dict( { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose )
                                 for ticker, tdata in tickerdata.items() } )
```
Note that in this case the function returns only after all jobs have been processed.
- Parameters:
  - num_workers : int, optional
    The number of workers. If num_workers is 1, then no parallel process or thread is started. Just as for joblib, you can use a negative num_workers to set the number of workers to the number of CPUs + num_workers + 1. For example, a num_workers of -2 will use as many workers as CPUs are present, less one. If num_workers is negative, the effective number of workers will be at least 1.
    Default is 1.
  - threading : bool, optional
    If False, the default, then the pool will act as a "loky" multi-process pool, with the associated overhead of managing data across processes.
    If True, then the pool is a "threading" pool. This helps for functions whose code releases Python's global interpreter lock, for example when engaged in heavy I/O or in compiled code such as numpy, pandas, or code generated with numba.
  - tmp_root_dir : str | SubDir, optional
    Temporary directory for memory-mapping large arrays. This is a root directory; the function will create a temporary sub-directory with a name generated from the current state of the system. This sub-directory will be deleted upon destruction of JCPool or when cdxcore.jcpool.JCPool.terminate() is called.
    This parameter can also be None, in which case the default behaviour of joblib.Parallel is used.
    Default is "!/.cdxmp".
  - verbose : Context, optional
    A cdxcore.verbose.Context object used to print out multi-processing/threading information. This is not the Context provided to child processes/threads.
    Default is quiet.
  - parallel_kwargs : dict, optional
    Additional keywords for joblib.Parallel.
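The negative num_workers rule above can be sketched as follows. effective_workers is a hypothetical helper name used only for illustration, not part of the cdxcore API:

```python
import os

def effective_workers(num_workers: int) -> int:
    """Sketch of the worker-count rule described above (hypothetical helper):
    negative values mean cpu_count + num_workers + 1, clamped to at least 1."""
    if num_workers >= 1:
        return num_workers
    cpus = os.cpu_count() or 1
    return max(1, cpus + num_workers + 1)

print(effective_workers(4))    # explicit worker count is used as-is
print(effective_workers(-1))   # all CPUs
print(effective_workers(-2))   # all CPUs, less one; never below 1
```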
- __call__(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool.
All functions used in jobs must have been decorated using cdxcore.jcpool.JCPool.delayed(). This function returns an iterator which yields results as soon as they are computed.

If jobs is a Sequence, you can also use cdxcore.jcpool.JCPool.parallel_to_list() to retrieve a list of all results upon completion of the last job. Similarly, if jobs is a Mapping, use cdxcore.jcpool.JCPool.parallel_to_dict() to retrieve a dict of results upon completion of the last job.

- Parameters:
  - jobs : Sequence | Mapping
    Can be a Sequence containing Callable functions, or a Mapping whose values are Callable functions. Each Callable used as part of either must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
  - parallel : Iterator
    An iterator which yields results as soon as they are available. If jobs is a Mapping, then the resulting iterator will generate tuples with the first element equal to the mapping key of the respective function job. This function will not return a dictionary.
- context(verbose, verbose_interval=None)[source]#
Parallel processing Context object.

This function returns a cdxcore.verbose.Context object whose channel is a queue towards a utility thread which will output all messages to verbose. As a result, a worker process is able to use verbose as if it were in-process.

A standard usage pattern is:
```python
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( x, verbose : Context ):
    verbose.write(f"Found {x}")  # <- text "Found 1" etc will be sent
    return x                     #    to main thread via Queue

verbose = Context("all")
with pool.context( verbose ) as verbose:
    for x in pool.parallel( pool.delayed(f)( x=x, verbose=verbose(1) )
                            for x in [1,2,3,4] ):
        verbose.write(f"Returned {x}")
```
See cdxcore.jcpool.JCPool for more usage patterns.
- static cpu_count(only_physical_cores=False)[source]#
Return the number of CPUs.
- Parameters:
  - only_physical_cores : bool, optional
    If True, does not take hyperthreading / SMT logical cores into account. Default is False.
- Returns:
  - cpus : int
    The CPU count.
- delayed(F)[source]#
Decorate a function for parallel execution.
This decorator adds minor syntactical sugar on top of joblib.delayed() (which in turn is discussed here).

When called, this decorator checks that no cdxcore.verbose.Context arguments are passed to the pooled function which have no ParallelContextChannel present. In other words, the function detects if the user forgot to use cdxcore.jcpool.JCPool.context().

- Parameters:
  - F : Callable
    Function.
- Returns:
  - wrapped F : Callable
    Decorated function.
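joblib.delayed() itself simply captures the function together with its call arguments so that a pool can ship them to a worker and execute them later. A minimal sketch of that idea (an illustration, not the cdxcore implementation):

```python
def delayed(function):
    """Minimal sketch of a joblib-style 'delayed' wrapper: calling the wrapped
    function does not run it, but returns a (function, args, kwargs) triple
    that a pool can execute later on a worker."""
    def delayed_function(*args, **kwargs):
        return function, args, kwargs
    return delayed_function

def f(x, y):
    return x + y

job = delayed(f)(2, y=3)        # nothing is executed yet
func, args, kwargs = job
print(func(*args, **kwargs))    # a worker would run the call like this
```

JCPool.delayed() adds an argument check on top of this capture step, as described above.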
- property is_no_pool: bool#
Whether jobs are executed in-process rather than in an actual pool (i.e. the pool was constructed without parallel workers).
- parallel(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool.
All functions used in jobs must have been decorated using cdxcore.jcpool.JCPool.delayed(). This function returns an iterator which yields results as soon as they are computed.

If jobs is a Sequence, you can also use cdxcore.jcpool.JCPool.parallel_to_list() to retrieve a list of all results upon completion of the last job. Similarly, if jobs is a Mapping, use cdxcore.jcpool.JCPool.parallel_to_dict() to retrieve a dict of results upon completion of the last job.

- Parameters:
  - jobs : Sequence | Mapping
    Can be a Sequence containing Callable functions, or a Mapping whose values are Callable functions. Each Callable used as part of either must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
  - parallel : Iterator
    An iterator which yields results as soon as they are available. If jobs is a Mapping, then the resulting iterator will generate tuples with the first element equal to the mapping key of the respective function job. This function will not return a dictionary.
- parallel_to_dict(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.
This function awaits the calculation of all elements of jobs and returns a dict with the results.

- Parameters:
  - jobs : Mapping
    A dictionary where all (function) values must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
  - results : dict
    A dictionary with results.
    If jobs is an OrderedDict, then this function will return an OrderedDict with the same order as jobs. Otherwise, the elements of the dict returned by this function are in completion order.
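The completion-order behaviour described above can be mimicked with the standard library alone. This sketch uses concurrent.futures rather than cdxcore and is only an illustration of the pattern:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def f(x):
    return x * x

jobs = {"a": 1, "b": 2, "c": 3}

with ThreadPoolExecutor(max_workers=3) as pool:
    # map each submitted future back to its dictionary key
    futures = {pool.submit(f, v): k for k, v in jobs.items()}
    # dict entries are inserted as jobs *complete*, not in submission order
    results = {futures[fut]: fut.result() for fut in as_completed(futures)}

print(results)  # e.g. {'b': 4, 'a': 1, 'c': 9}; the key set is fixed, the order is not
```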
- parallel_to_list(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.
This function awaits the calculation of all elements of jobs and returns a list with the results.

- Parameters:
  - jobs : Sequence
    A sequence of Callable functions, each of which must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
  - results : list
    A list with results, in the order of jobs.
- class cdxcore.jcpool.ParallelContextChannel(*, cid, maintid, queue, f_verbose)[source]#
Bases: Context

Lightweight cdxcore.verbose.Context channel which is pickle'able.

This channel sends messages it receives to a multiprocessing.Queue.
multiprocessing.Queue.- __call__(msg, flush)[source]#
Sends msg via a multiprocessing.Queue to the main thread for printing.