cdxcore.jcpool#

A simple multi-processing convenience wrapper around the (already great) joblib.Parallel().

The minor additions are that parallel processing is a little more convenient for dictionaries, and that cdxcore.verbose.Context messages can be routed via a multiprocessing.Queue to a single printing thread.

Import#

from cdxcore.jcpool import JCPool

Documentation#

Classes

JCPool([num_workers, threading, ...])

Parallel Job Context Pool.

ParallelContextChannel(*, cid, maintid, ...)

Lightweight cdxcore.verbose.Context channel which is pickle'able.

class cdxcore.jcpool.JCPool(num_workers=1, threading=False, tmp_root_dir='!/.cdxmp', *, verbose=<cdxcore.verbose.Context object>, parallel_kwargs={})[source]#

Bases: object

Parallel Job Context Pool.

Simple wrapper around joblib.Parallel() which allows worker processes to use cdxcore.verbose.Context to report progress updates. For this purpose, cdxcore.verbose.Context sends output messages via a multiprocessing.Queue to the main process, where a separate thread prints them out.

Using a fixed central pool object in your code base avoids relaunching processes.

Functions passed to cdxcore.jcpool.JCPool.parallel() and related functions must be decorated with cdxcore.jcpool.JCPool.delayed().

List/Generator Usage

The following code is a standard prototype for using cdxcore.jcpool.JCPool.parallel() following closely the joblib paradigm:

from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time 
import numpy as np

pool    = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata = {
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2)),
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for tx, ty in pool.parallel(
                    pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                    for ticker, tdata in tickerdata.items() ):
            verbose.report(1,f"Returned {tx:.2f}, {ty:.2f}")
verbose.write(f"Analysis done; this took {tme}.")

The output from this code is asynchronous:

00: Launching analysis
02:     Result for SPY: -0.43, -0.39
01:   Returned -0.43, -0.39
02:     Result for BTC: -0.39, -0.45
01:   Returned -0.39, -0.45
02:     Result for GLD: -0.41, -0.43
01:   Returned -0.41, -0.43
00: Analysis done; this took 0.73s.        

Dict

Given the asynchronous nature of the returned data, it is often desirable to keep track of results by some identifier. In the above example, ticker was not available in the main loop. This pattern is automated by the dictionary usage pattern:

from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time
import numpy as np

pool    = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( ticker, tdata, verbose : Context ):
    # some made up function
    q  = np.quantile( tdata, 0.35, axis=0 )
    tx = q[0]
    ty = q[1]
    time.sleep(0.5)
    verbose.write(f"Result for {ticker}: {tx:.2f}, {ty:.2f}")
    return tx, ty

tickerdata = {
    'SPY': np.random.normal(size=(1000,2)),
    'GLD': np.random.normal(size=(1000,2)),
    'BTC': np.random.normal(size=(1000,2)),
}

verbose = Context("all")
with verbose.write_t("Launching analysis") as tme:
    with pool.context( verbose ) as verbose:
        for ticker, tx, ty in pool.parallel(
                { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose(2) )
                  for ticker, tdata in tickerdata.items() } ):
            verbose.report(1,f"Returned {ticker} {tx:.2f}, {ty:.2f}")
verbose.write(f"Analysis done; this took {tme}.")

This generates the following output:

00: Launching analysis
02:     Result for SPY: -0.34, -0.41
01:   Returned SPY -0.34, -0.41
02:     Result for GLD: -0.38, -0.41
01:   Returned GLD -0.38, -0.41
02:     Result for BTC: -0.34, -0.32
01:   Returned BTC -0.34, -0.32
00: Analysis done; this took 5s.

Note that cdxcore.jcpool.JCPool.parallel(), when applied to a dictionary, does not return a dictionary but a sequence of tuples. As the example shows, this also works if the called function itself returns a tuple: in that case the returned tuple is extended by the dictionary key.
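The key-extension convention can be sketched in plain Python. The helper below is hypothetical (not part of cdxcore); it illustrates one plausible reading of the behaviour inferred from the unpacking `for ticker, tx, ty in pool.parallel({...})` in the example above:

```python
def extend_with_key(key, result):
    """Hypothetical illustration, not part of cdxcore: prepend the mapping
    key to a tuple result, or pair a non-tuple result with its key."""
    if isinstance(result, tuple):
        return (key,) + result
    return (key, result)

# Tuple results are flattened together with the key:
print(extend_with_key("SPY", (-0.43, -0.39)))  # ('SPY', -0.43, -0.39)
```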

In order to retrieve a dictionary use cdxcore.jcpool.JCPool.parallel_to_dict():

verbose = Context("all")
with pool.context( verbose ) as verbose:
    r = pool.parallel_to_dict( { ticker: pool.delayed(f)( ticker=ticker, tdata=tdata, verbose=verbose )
                                 for ticker, tdata in tickerdata.items() } )

Note that in this case the function returns only after all jobs have been processed.

Parameters:
num_workersint, optional

The number of workers. If num_workers is 1, no parallel process or thread is started. Just as for joblib, you can pass a negative num_workers to set the number of workers to the number of CPUs + num_workers + 1. For example, num_workers=-2 uses one worker fewer than there are CPUs. If num_workers is negative, the effective number of workers is at least 1.

Default is 1.
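The negative-value rule above can be written out as a small stand-alone sketch. The function below is hypothetical (not part of cdxcore or joblib); it only restates the documented arithmetic, with the CPU count passed in explicitly so the rule is easy to check:

```python
def effective_workers(num_workers: int, cpu_count: int) -> int:
    """Hypothetical illustration of the documented rule: positive values
    are taken as-is; negative values resolve to cpu_count + num_workers + 1,
    floored at 1."""
    if num_workers >= 1:
        return num_workers
    return max(1, cpu_count + num_workers + 1)

# On a machine with 8 CPUs: -1 -> 8 workers, -2 -> 7 workers.
print(effective_workers(-2, 8))  # 7
```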

threadingbool, optional

If False, the default, the pool acts as a "loky" multi-process pool, with the associated overhead of managing data across processes.

If True, the pool is a "threading" pool. This helps for functions whose code releases Python’s global interpreter lock, for example during heavy I/O or in compiled code such as numpy, pandas, or code generated with numba.

tmp_root_dirstr | SubDir, optional

Temporary directory for memory mapping large arrays. This is a root directory; the function will create a temporary sub-directory with a name generated from the current state of the system. This sub-directory will be deleted upon destruction of JCPool or when cdxcore.jcpool.JCPool.terminate() is called.

This parameter can also be None in which case the default behaviour of joblib.Parallel is used.

Default is "!/.cdxmp".

verboseContext, optional

A cdxcore.verbose.Context object used to print out multi-processing/threading information. This is not the Context provided to child processes/threads.

Default is quiet.

parallel_kwargsdict, optional

Additional keywords for joblib.Parallel.

Attributes:
is_threading

Whether we are threading or multi-processing.

tmp_path

Path to the temporary directory for this object.

Methods

context(verbose[, verbose_interval])

Parallel processing Context object.

cpu_count([only_physical_cores])

Return the number of physical CPUs.

delayed(F)

Decorate a function for parallel execution.

parallel(jobs)

Process a number of jobs in parallel using the current multiprocessing pool.

parallel_to_dict(jobs)

Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.

parallel_to_list(jobs)

Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.

terminate()

Stop the current parallel pool, and delete any temporary files (if managed by JCPool).

context(verbose, verbose_interval=None)[source]#

Parallel processing Context object.

This function returns a cdxcore.verbose.Context object whose channel is a queue towards a utility thread which outputs all messages to verbose. As a result, a worker process is able to use verbose as if it were in-process.

A standard usage pattern is:

from cdxcore.verbose import Context
from cdxcore.jcpool import JCPool
import time as time 
import numpy as np

pool    = JCPool( num_workers=4 )   # global pool. Reuse where possible

def f( x, verbose : Context ):
    verbose.write(f"Found {x}")     # <- text "Found 1" etc will be sent
    return x                        #    to main thread via Queue 

verbose = Context("all")
with pool.context( verbose ) as verbose:
    for x in pool.parallel( pool.delayed(f)( x=x, verbose=verbose(1) ) for x in [1,2,3,4] ):
        verbose.write(f"Returned {x}")                    

See cdxcore.jcpool.JCPool for more usage patterns.

static cpu_count(only_physical_cores=False)[source]#

Return the number of physical CPUs.

Parameters:
only_physical_coresboolean, optional

If True, does not take hyperthreading / SMT logical cores into account. Default is False.

Returns:
cpusint

The CPU count.

delayed(F)[source]#

Decorate a function for parallel execution.

This decorator adds minor syntactic sugar on top of joblib.delayed().

When called, the decorator checks that every cdxcore.verbose.Context argument passed to the pooled function has a ParallelContextChannel attached. In other words, it detects whether the user forgot to use cdxcore.jcpool.JCPool.context().

Parameters:
FCallable

Function.

Returns:
wrapped FCallable

Decorated function.

property is_threading: bool#

Whether we are threading or multi-processing.

parallel(jobs)[source]#

Process a number of jobs in parallel using the current multiprocessing pool.

All functions used in jobs must have been decorated using cdxcore.jcpool.JCPool.delayed().

This function returns an iterator which yields results as soon as they are computed.

If jobs is a Sequence you can also use cdxcore.jcpool.JCPool.parallel_to_list() to retrieve a list of all results upon completion of the last job. Similarly, if jobs is a Mapping, use cdxcore.jcpool.JCPool.parallel_to_dict() to retrieve a dict of results upon completion of the last job.

Parameters:
jobsSequence | Mapping

Can be a Sequence containing Callable functions, or a Mapping whose values are Callable functions.

Each Callable used as part of either must have been decorated with cdxcore.jcpool.JCPool.delayed().

Returns:
parallelIterator

An iterator which yields results as soon as they are available. If jobs is a Mapping, the resulting iterator generates tuples whose first element is the mapping key of the respective job. This function does not return a dictionary.

parallel_to_dict(jobs)[source]#

Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a dictionary upon completion.

This function awaits the calculation of all elements of jobs and returns a dict with the results.

Parameters:
jobsMapping

A dictionary where all (function) values must have been decorated with cdxcore.jcpool.JCPool.delayed().

Returns:
Resultsdict

A dictionary with results.

If jobs is an OrderedDict, then this function will return an OrderedDict with the same order as jobs. Otherwise the elements of the dict returned by this function are in completion order.

parallel_to_list(jobs)[source]#

Process a number of jobs in parallel using the current multiprocessing pool, and return all results in a list upon completion.

This function awaits the calculation of all elements of jobs and returns a list with the results.

Parameters:
jobsSequence

A sequence of Callable functions, each of which must have been decorated with cdxcore.jcpool.JCPool.delayed().

Returns:
Resultslist

A list with results, in the order of jobs.
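The contrast with parallel() is the ordering contract: parallel_to_list() returns results in the order of jobs, while parallel() yields them in completion order. The sketch below illustrates that contract with the standard library only (a stand-in, not cdxcore's implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x: int) -> int:
    return x * x

jobs = [3, 1, 4, 1]
with ThreadPoolExecutor(max_workers=4) as ex:
    # Executor.map preserves submission order, mirroring parallel_to_list();
    # as_completed() would instead yield in completion order, like parallel().
    results = list(ex.map(square, jobs))

print(results)  # [9, 1, 16, 1]
```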

terminate()[source]#

Stop the current parallel pool, and delete any temporary files (if managed by JCPool).

property tmp_path: str | None#

Path to the temporary directory for this object.

class cdxcore.jcpool.ParallelContextChannel(*, cid, maintid, queue, f_verbose)[source]#

Bases: Context

Lightweight cdxcore.verbose.Context channel which is pickle’able.

This channel sends messages it receives to a multiprocessing.Queue.

Attributes:
as_quiet

Return a Context at the same current reporting level as self with zero visibility.

as_verbose

Return a Context at the same current reporting level as self with full visibility.

is_quiet

Whether the current context is "quiet".

Methods

__call__(msg, flush)

Sends msg via a multiprocessing.Queue to the main thread for printing.

apply_channel(channel)

Advanced Use

fmt(level, message, *args[, head])

Formats message with the formatting arguments at the current context level plus level.

quiet

report(level, message, *args[, end, head])

Report message at current level plus level.

shall_report([add_level])

Returns whether to print something at current level plus add_level.

str_indent([add_level])

Returns the string indentation for the current level plus add_level.

timer()

Returns a new cdxcore.util.Timer object to measure time spent in a block of code.

write(message, *args[, end, head])

Report message at current level.

write_t(message, *args[, end, head])

Reports message subject to string formatting at current level if visible and returns a cdxcore.util.Timer object which can be used to measure time elapsed since write_t() was called.

all

__call__(msg, flush)[source]#

Sends msg via a multiprocessing.Queue to the main thread for printing.