Source code for miniutils.progress_bar

import itertools
import multiprocessing as mp
try:
    from nose.plugins.multiprocess import TimedOutException
except ImportError:
    TimedOutException = TimeoutError
import random
import warnings

try:
    from tqdm import tqdm as _tqdm
    try:  # pragma: nocover
        # Check if we're in a Jupyter notebook... if so, use the ipywidgets progress bar instead
        from IPython import get_ipython
        if type(get_ipython()).__module__.startswith('ipykernel.'):
            from tqdm import tqdm_notebook as _tqdm
    except (ImportError, NameError):  # pragma: nocover
        # IPython isn't even installed, or we're not in it
        pass
except ImportError:  # pragma: nocover
    # noinspection PyUnusedLocal
    def _tqdm(iterable, *a, **kw):
        return iterable


[docs]def progbar(iterable, *a, verbose=True, **kw): """Prints a progress bar as the iterable is iterated over :param iterable: The iterator to iterate over :param a: Arguments to get passed to tqdm (or tqdm_notebook, if in a Jupyter notebook) :param verbose: Whether or not to print the progress bar at all :param kw: Keyword arguments to get passed to tqdm :return: The iterable that will report a progress bar """ iterable = range(iterable) if isinstance(iterable, int) else iterable if verbose: return _tqdm(iterable, *a, **kw) else: return iterable
def _fun(f, q_in, q_out, flatten, star): # pragma: no cover try: while True: i, x = q_in.get() if i is None: break out = f(*x) if star else f(x) if flatten: for j, o in enumerate(out): q_out.put(((i, j), o)) q_out.put((None, None)) else: q_out.put((i, out)) except BaseException as ex: q_out.put((None, ex)) def _parallel_progbar_launch(mapper, iterable, nprocs=None, starmap=False, flatmap=False, shuffle=False, verbose=True, verbose_flatmap=None, max_cache=-1, timeout=1, **kwargs): # Shuffle the iterable if requested, to make the parallel execution potentially more uniform in runtime enumerated_iterable = enumerate(iterable) if shuffle: enumerated_iterable = list(enumerated_iterable) # ids = [i for i in sorted(range(len(iterable)), key=lambda x: random.random())] # iterable = (iterable[i] for i in ids) random.shuffle(enumerated_iterable) # Is this going to be expensive for large lists of large objects? # Check that we don't launch more processes than there are elements to map (if that's knowable) nprocs = nprocs or mp.cpu_count() try: nprocs = max(1, min(len(iterable), nprocs)) except TypeError: pass # Set up multiprocessing management for mapping q_in = mp.Queue() q_out = mp.Queue(max_cache) procs = [mp.Process(target=_fun, args=(mapper, q_in, q_out, flatmap, starmap)) for _ in range(nprocs)] for p in procs: p.daemon = True p.start() # Doing it this way prevents us from storing locally an entire list of the input values unnecessarily, and still # gets us the number of elements sent for processing sent = (q_in.put((i, x)) for i, x in enumerated_iterable) num_sent = sum(1 for _ in sent) for _ in range(nprocs): # Send out a flag for each process to terminate once all elements are processed q_in.put((None, None)) # Fetch the mapped results from the output queue, printing a progress bar as you go if flatmap: # If we're flat mapping, then we'll keep separate progress of all returned results (an unknown number) and how # many inputs are complete (a known number). The outer loop will track the latter, and the inner loop the former results = (q_out.get() for _ in progbar(itertools.count(), verbose=verbose if verbose_flatmap is None else verbose_flatmap, **kwargs)) for _ in progbar(num_sent, verbose=verbose): for i, x in results: # When we're flagged that an input is done being returned in the queue, break the inner loop to make # the "completed inputs" progress bar tick if i is None: if x is None: break else: raise x yield i, x else: for i, x in (q_out.get() for _ in progbar(num_sent, verbose=verbose, **kwargs)): if i is None: raise x yield i, x # Clean up for p in procs: try: p.join(timeout) except (TimeoutError, mp.TimeoutError, TimedOutException): # pragma: nocover warnings.warn("parallel_progbar mapping process failed to close properly (check error output)")
[docs]def parallel_progbar(*args, **kwargs): """Performs a parallel mapping of the given iterable, reporting a progress bar as values get returned :param mapper: The mapping function to apply to elements of the iterable :param iterable: The iterable to map :param nprocs: The number of processes (defaults to the number of cpu's) :param starmap: If true, the iterable is expected to contain tuples and the mapper function gets each element of a tuple as an argument :param flatmap: If true, flatten out the returned values if the mapper function returns a list of objects :param shuffle: If true, randomly sort the elements before processing them. This might help provide more uniform runtimes if processing different objects takes different amounts of time. :param verbose: Whether or not to print the progress bar :param verbose_flatmap: If performing a flatmap, whether or not to report each object as it's returned :param timeout: The number of seconds to wait for each worker process after completing :param kwargs: Any other keyword arguments to pass to the progress bar (see ``progbar``) :return: A list of the returned objects, in the same order as provided """ results = _parallel_progbar_launch(*args, **kwargs) return [x for i, x in sorted(results, key=lambda p: p[0])]
[docs]def iparallel_progbar(*args, **kwargs): """Performs a parallel mapping of the given iterable, reporting a progress bar as values get returned. Yields objects as soon as they're computed, but does not guarantee that they'll be in the correct order. :param mapper: The mapping function to apply to elements of the iterable :param iterable: The iterable to map :param nprocs: The number of processes (defaults to the number of cpu's) :param starmap: If true, the iterable is expected to contain tuples and the mapper function gets each element of a tuple as an argument :param flatmap: If true, flatten out the returned values if the mapper function returns a list of objects :param shuffle: If true, randomly sort the elements before processing them. This might help provide more uniform runtimes if processing different objects takes different amounts of time. :param verbose: Whether or not to print the progress bar :param verbose_flatmap: If performing a flatmap, whether or not to report each object as it's returned :param max_cache: Maximum number of mapped objects to permit in the queue at once :param timeout: The number of seconds to wait for each worker process after completing :param kwargs: Any other keyword arguments to pass to the progress bar (see ``progbar``) :return: A list of the returned objects, in whatever order they're done being computed """ results = _parallel_progbar_launch(*args, **kwargs) return (x for i, x in results)