cdxcore.jcpool#
Simple multi-processing convenience wrapper around the (already great) joblib.Parallel().
The main additions are that parallel processing is a tad more convenient for dictionaries, and that cdxcore.verbose.Context messaging can be routed via a multiprocessing.Queue to a single thread.
Import#
from cdxcore.jcpool import JCPool
Documentation#
Classes

JCPool
Parallel Job Context Pool.

ParallelContextChannel
Lightweight cdxcore.verbose.Context channel which is pickle'able.
- class cdxcore.jcpool.JCPool(num_workers=1, threading=False, tmp_root_dir='!/.cdxmp', *, verbose=<cdxcore.verbose.Context object>, parallel_kwargs={})[source]#
Bases:
object
Parallel Job Context Pool.
Simple wrapper around joblib.Parallel() which allows worker processes to use cdxcore.verbose.Context to report progress updates. For this purpose, cdxcore.verbose.Context sends output messages via a multiprocessing.Queue to the main process, where a separate thread prints them out. Using a fixed central pool object in your code base avoids relaunching processes.
Functions passed to cdxcore.jcpool.JCPool.parallel() and related functions must be decorated with cdxcore.jcpool.JCPool.delayed().
List/Generator Usage
The following code is a standard prototype for using cdxcore.jcpool.JCPool.parallel(), following closely the joblib paradigm:

from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata =\
{
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for tx, ty in pool.parallel(
                pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                for ticker, tdata in tickerdata.items() ):
            verbose.report(1, f"Returned {tx:.2f}, {ty:.2f}")
    verbose.write(f"Analysis done; this took {tme}.")
The output from this code is asynchronous:

00: Launching analysis
02: Result for SPY: -0.43, -0.39
01: Returned -0.43, -0.39
02: Result for BTC: -0.39, -0.45
01: Returned -0.39, -0.45
02: Result for GLD: -0.41, -0.43
01: Returned -0.41, -0.43
00: Analysis done; this took 0.73s.
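The message routing behind this output can be sketched in plain Python. This is a simplified illustration of the assumed mechanism, not cdxcore's actual implementation: the real pool uses a multiprocessing.Queue, while a queue.Queue is used here so the sketch runs in a single process. A single printer thread in the main process drains messages in arrival order:

```python
import threading
import queue

# Stand-in for the pool's message queue. The real implementation
# (per the description above) uses a multiprocessing.Queue so that
# worker *processes* can send messages; queue.Queue suffices to show
# the drain loop within one process.
q = queue.Queue()
printed = []

def printer():
    # Drain messages until the sentinel (None) arrives.
    while True:
        msg = q.get()
        if msg is None:
            break
        printed.append(msg)    # the real thread would print(msg)

t = threading.Thread(target=printer)
t.start()
q.put("02: Result for SPY: -0.43, -0.39")   # messages from "workers"
q.put("01: Returned -0.43, -0.39")
q.put(None)                                 # sentinel stops the printer
t.join()
# printed now holds both messages in arrival order
```

Because only one thread prints, messages from concurrently running workers never interleave mid-line.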
Dict
Considering the asynchronous nature of the returned data, it is often desirable to keep track of results by some identifier. In the above example, ticker was not available in the main loop. This pattern is automated with the dictionary usage pattern:

from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata =\
{
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2))
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for ticker, tx, ty in pool.parallel(
                { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                  for ticker, tdata in tickerdata.items() } ):
            verbose.report(1, f"Returned {ticker} {tx:.2f}, {ty:.2f}")
    verbose.write(f"Analysis done; this took {tme}.")
This generates the following output:

00: Launching analysis
02: Result for SPY: -0.34, -0.41
01: Returned SPY -0.34, -0.41
02: Result for GLD: -0.38, -0.41
01: Returned GLD -0.38, -0.41
02: Result for BTC: -0.34, -0.32
01: Returned BTC -0.34, -0.32
00: Analysis done; this took 5s.
Note that cdxcore.jcpool.JCPool.parallel(), when applied to a dictionary, does not return a dictionary but a sequence of tuples. As the example shows, this also works if the called function itself returns a tuple; in this case the returned tuple is extended by the key of the dictionary provided. In order to retrieve a dictionary, use cdxcore.jcpool.JCPool.parallel_to_dict():

verbose = Context("all")
with pool.context( verbose ) as verbose:
    r = pool.parallel_to_dict(
            { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose )
              for ticker, tdata in self.data.items() } )
Note that in this case the function returns only after all jobs have been processed.
- Parameters:
- num_workers : int, optional
The number of workers. If num_workers is 1, then no parallel process or thread is started. Just as for joblib, you can use a negative num_workers to set the number of workers to number of CPUs + num_workers + 1. For example, a num_workers of -2 will use one fewer worker than there are CPUs. If num_workers is negative, the effective number of workers will be at least 1.
Default is 1.
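The negative num_workers rule above can be sketched as follows. Here effective_workers is a hypothetical helper, not part of cdxcore, that mirrors the documented formula:

```python
import os

def effective_workers(num_workers: int) -> int:
    # Hypothetical helper mirroring the documented rule:
    # positive values are taken as-is; negative values map to
    # (number of CPUs + num_workers + 1), floored at 1.
    if num_workers >= 1:
        return num_workers
    return max(1, (os.cpu_count() or 1) + num_workers + 1)

# On an 8-CPU machine, effective_workers(-2) would be 7:
# one fewer worker than there are CPUs.
```

Note that num_workers=0 also falls through to the negative branch here; the exact treatment of 0 by JCPool itself is not specified above.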
- threading : bool, optional
If False, the default, the pool acts as a "loky" multi-process pool, with the associated overhead of managing data across processes.
If True, the pool is a "threading" pool. This helps for functions whose code releases Python's global interpreter lock, for example when engaged in heavy I/O or compiled code such as numpy, pandas, or code generated with numba.
, or generated with numba.- tmp_root_dirstr | SubDir, optional
Temporary directory for memory mapping large arrays. This is a root directory; the function will create a temporary sub-directory with a name generated from the current state of the system. This sub-directory will be deleted upon destruction of
JCPool
or whencdxcore.jcpool.JCPool.terminate()
is called.This parameter can also be
None
in which case the default behaviour ofjoblib.Parallel
is used.Default is
"!/.cdxmp"
- verbose : Context, optional
A cdxcore.verbose.Context object used to print out multi-processing/threading information. This is not the Context provided to child processes/threads.
Default is quiet.
- parallel_kwargs : dict, optional
Additional keywords for joblib.Parallel.
- Attributes:
is_threading
Whether we are threading or multi-processing.
tmp_path
Path to the temporary directory for this object.
Methods

context(verbose[, verbose_interval])
Parallel processing Context object.

cpu_count([only_physical_cores])
Return the number of physical CPUs.

delayed(F)
Decorate a function for parallel execution.

parallel(jobs)
Process a number of jobs in parallel using the current multiprocessing pool.

parallel_to_dict(jobs)
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.

parallel_to_list(jobs)
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.

terminate()
Stop the current parallel pool, and delete any temporary files (if managed by JCPool).
).- context(verbose, verbose_interval=None)[source]#
Parallel processing Context object.
This function returns a cdxcore.verbose.Context object whose channel is a queue towards a utility thread which will output all messages to verbose. As a result, a worker process is able to use verbose as if it were in-process.
A standard usage pattern is:
from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( x, verbose : Context ):
    verbose.write(f"Found {x}")  # <- text "Found 1" etc will be sent
    return x                     #    to main thread via Queue

verbose = Context("all")
with pool.context( verbose ) as verbose:
    for x in pool.parallel( pool.delayed(f)( x=x, verbose=verbose(1) ) for x in [1,2,3,4] ):
        verbose.write(f"Returned {x}")
See cdxcore.jcpool.JCPool for more usage patterns.
- static cpu_count(only_physical_cores=False)[source]#
Return the number of physical CPUs.
- Parameters:
- only_physical_cores : bool, optional
If True, does not take hyperthreading / SMT logical cores into account. Default is False.
- Returns:
- cpus : int
The CPU count.
- delayed(F)[source]#
Decorate a function for parallel execution.
This decorator adds minor syntactical sugar on top of joblib.delayed() (which is discussed in the joblib documentation).
When called, this decorator checks that no cdxcore.verbose.Context arguments are passed to the pooled function which have no ParallelContextChannel present. In other words, the function detects whether the user forgot to use cdxcore.jcpool.JCPool.context().
- Parameters:
- F : Callable
Function.
- Returns:
- wrapped F : Callable
Decorated function.
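The underlying joblib-style contract can be illustrated with a minimal stand-in. This delayed is a hypothetical sketch, not cdxcore's implementation, and omits the Context check described above: calling the decorated function captures the call instead of executing it, so the pool can ship it to a worker later.

```python
def delayed(f):
    # Minimal sketch of the joblib-style ``delayed`` contract:
    # calling the decorated function returns (f, args, kwargs)
    # rather than executing f immediately.
    def capture(*args, **kwargs):
        return (f, args, kwargs)
    return capture

job = delayed(pow)(2, 10)     # nothing is computed yet
func, args, kwargs = job
result = func(*args, **kwargs)  # a worker would execute the call here
```

This is why jobs can be built lazily inside a generator or dict comprehension and only run once handed to parallel().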
- parallel(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool.
All functions used in jobs must have been decorated using cdxcore.jcpool.JCPool.delayed().
This function returns an iterator which yields results as soon as they are computed.
If jobs is a Sequence, you can also use cdxcore.jcpool.JCPool.parallel_to_list() to retrieve a list of all results upon completion of the last job. Similarly, if jobs is a Mapping, use cdxcore.jcpool.JCPool.parallel_to_dict() to retrieve a dict of results upon completion of the last job.
of results upon completion of the last job.- Parameters:
- jobsSequence | Mapping
Can be a
Sequence
containingCallable
functions, or aMapping
whose values areCallable
functions.Each
Callable
used as part of either must have been decorated withcdxcore.jcpool.JCPool.delayed()
.
- Returns:
- parallel : Iterator
An iterator which yields results as soon as they are available. If jobs is a Mapping, then the resulting iterator will generate tuples whose first element equals the mapping key of the respective function job. This function will not return a dictionary.
- parallel_to_dict(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.
This function awaits the calculation of all elements of jobs and returns a dict with the results.
- Parameters:
- jobs : Mapping
A dictionary where all (function) values must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
- Results : dict
A dictionary with results.
If jobs is an OrderedDict, then this function will return an OrderedDict with the same order as jobs. Otherwise the elements of the dict returned by this function are in completion order.
- parallel_to_list(jobs)[source]#
Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.
This function awaits the calculation of all elements of jobs and returns a list with the results.
- Parameters:
- jobs : Sequence
A sequence of Callable functions, each of which must have been decorated with cdxcore.jcpool.JCPool.delayed().
- Returns:
- Results : list
A list with results, in the order of jobs.
- class cdxcore.jcpool.ParallelContextChannel(*, cid, maintid, queue, f_verbose)[source]#
Bases:
Context
Lightweight cdxcore.verbose.Context channel which is pickle'able.
This channel sends messages it receives to a multiprocessing.Queue.
- Attributes:
as_quiet
Return a Context at the same current reporting level as self with zero visibility.
as_verbose
Return a Context at the same current reporting level as self with full visibility.
is_quiet
Whether the current context is "quiet".
Methods

__call__(msg, flush)
Sends msg via a multiprocessing.Queue to the main thread for printing.

apply_channel(channel)
Advanced Use

fmt(level, message, *args[, head])
Formats message with the formatting arguments at current context level plus level.

quiet

report(level, message, *args[, end, head])
Report message at current level plus level.

shall_report([add_level])
Returns whether to print something at current level plus add_level.

str_indent([add_level])
Returns the string indentation for the current level plus add_level.

timer()
Returns a new cdxcore.util.Timer object to measure time spent in a block of code.

write(message, *args[, end, head])
Report message at current level.

write_t(message, *args[, end, head])
Reports message subject to string formatting at current level if visible, and returns a cdxcore.util.Timer object which can be used to measure time elapsed since write_t() was called.

all
- __call__(msg, flush)[source]#
Sends msg via a multiprocessing.Queue to the main thread for printing.