3
QfX              
   @   s  d Z ddlmZ ddlZddlZddlZddlZddlmZ ddl	Z	ddl
Z
ddlZddlZddlmZ yddlZW n   ddlZY nX ddlmZ edk	rddlmZ dd	lmZ dd
lmZmZ ddlmZmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddgZ$dZ%dZ&dZ'e(edrDej)j*ddj+ p4dZ,ej-e,dZ.ndZ.G dd de/Z0dd Z1dd Z2G dd de3Z4G d d! d!e/Z5d+d#d$Z6G d%d& d&e/Z7G d'd( d(e/Z8G d)d* d*eZ9dS ),z+
Helpers for embarrassingly parallel code.
    )divisionN)sqrt)Integral   )mp)MemmapingPool)
ThreadPool)
format_excformat_outer_frames)Loggershort_format_time)TransportableException_mk_exception)memstr_to_kbytes)_basestringmultiprocessing	threadingZ__JOBLIB_SPAWNED_PARALLEL__g?   get_contextZJOBLIB_START_METHOD )methodc               @   s(   e Zd ZdZdd Zdd Zdd ZdS )	BatchedCallszCWrap a sequence of (func, args, kwargs) tuples as a single callablec             C   s   t || _t| j| _d S )N)listitemslen_size)selfZiterator_slice r   V/home/psgendb/BIRCHDEV/pkg/SPAdes-3.15.4/linux-x86_64/share/spades/joblib3/parallel.py__init__C   s    
zBatchedCalls.__init__c             C   s   dd | j D S )Nc             S   s   g | ]\}}}|||qS r   r   ).0funcargskwargsr   r   r   
<listcomp>H   s    z)BatchedCalls.__call__.<locals>.<listcomp>)r   )r   r   r   r   __call__G   s    zBatchedCalls.__call__c             C   s   | j S )N)r   )r   r   r   r   __len__J   s    zBatchedCalls.__len__N)__name__
__module____qualname____doc__r   r%   r&   r   r   r   r   r   @   s   r   c               C   s   t dkrdS t j S )z  Return the number of CPUs.
    Nr   )r   	cpu_countr   r   r   r   r+   Q   s    r+   c             C   s\   |sdS |dkrdS | dkr dS dd| d  }t | | }t | d | }t|t|kS )	z Returns False for indices increasingly apart, the distance
        depending on the value of verbose.

        We use a lag increasing as the square of index
    T
   Fr   g      ?   r   r   )r   int)indexverboseZscaleZ
next_scaler   r   r   _verbosity_filter\   s    r1   c               @   s   e Zd ZdZdS )WorkerInterruptza An exception that is not KeyboardInterrupt to allow subprocesses
        to be interrupted.
    N)r'   r(   r)   r*   r   r   r   r   r2   o   s   r2   c               @   s    e Zd ZdZdd Zdd ZdS )SafeFunctionz Wraps a function to make it exception with full traceback in
        their representation.
        Useful for parallel computing with multiprocessing, for which
        exceptions cannot be captured.
    c             C   s
   || _ d S )N)r!   )r   r!   r   r   r   r   }   s    zSafeFunction.__init__c             O   sn   y| j ||S  tk
r&   t Y nD   tj \}}}t|||ddd}t|trZ n
t||Y nX d S )Nr,   r   )contextZ	tb_offset)r!   KeyboardInterruptr2   sysexc_infor	   
issubclassr   )r   r"   r#   Ze_typeZe_valueZe_tbtextr   r   r   r%      s    


zSafeFunction.__call__N)r'   r(   r)   r*   r   r%   r   r   r   r   r3   w   s   r3   Tc                sF   |rt j   fdd}ytj |}W n tk
r@   Y nX |S )a!  Decorator used to capture the arguments of a function.

    Pass `check_pickle=False` when:

    - performing a possibly repeated check is too costly and has been done
      already once outside of the call to delayed.

    - when used in conjunction `Parallel(backend='threading')`.

    c                 s
    | |fS )Nr   )r"   r#   )functionr   r   delayed_function   s    z!delayed.<locals>.delayed_function)pickledumps	functoolswrapsAttributeError)r:   Zcheck_pickler;   r   )r:   r   delayed   s    
rA   c               @   s    e Zd ZdZdd Zdd ZdS )ImmediateComputeBatchzSequential computation of a batch of tasks.

    This replicates the async computation API but actually does not delay
    the computations when joblib.Parallel runs in sequential mode.

    c             C   s   | | _ d S )N)results)r   batchr   r   r   r      s    zImmediateComputeBatch.__init__c             C   s   | j S )N)rC   )r   r   r   r   get   s    zImmediateComputeBatch.getN)r'   r(   r)   r*   r   rE   r   r   r   r   rB      s   rB   c               @   s    e Zd ZdZdd Zdd ZdS )BatchCompletionCallBacka_  Callback used by joblib.Parallel's multiprocessing backend.

    This callable is executed by the parent process whenever a worker process
    has returned the results of a batch of tasks.

    It is used for progress reporting, to update estimate of the batch
    processing duration and to schedule the next batch of tasks to be
    processed.

    c             C   s   || _ || _|| _d S )N)dispatch_timestamp
batch_sizeparallel)r   rG   rH   rI   r   r   r   r      s    z BatchCompletionCallBack.__init__c             C   s   | j  j| j7  _tj | j }| j jdkrh| j| j jkrh| j j}|dkrP|}nd| d|  }|| j _| j j  | j jd k	r| j j	  d S )Nautor   g?g?)
rI   n_completed_tasksrH   timerG   _effective_batch_size_smoothed_batch_durationprint_progress_original_iteratordispatch_next)r   outZthis_batch_durationZold_durationZnew_durationr   r   r   r%      s    
z BatchCompletionCallBack.__call__N)r'   r(   r)   r*   r   r%   r   r   r   r   rF      s   
rF   c               @   s   e Zd ZdZd&d
dZdd Zdd Zdd Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% ZdS )'ParallelaG%   Helper class for readable parallel mapping.

        Parameters
        -----------
        n_jobs: int, default: 1
            The maximum number of concurrently running jobs, such as the number
            of Python worker processes when backend="multiprocessing"
            or the size of the thread-pool when backend="threading".
            If -1 all CPUs are used. If 1 is given, no parallel computing code
            is used at all, which is useful for debugging. For n_jobs below -1,
            (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all
            CPUs but one are used.
        backend: str or None, default: 'multiprocessing'
            Specify the parallelization backend implementation.
            Supported backends are:
              - "multiprocessing" used by default, can induce some
                communication and memory overhead when exchanging input and
                output data with the with the worker Python processes.
              - "threading" is a very low-overhead backend but it suffers
                from the Python Global Interpreter Lock if the called function
                relies a lot on Python objects. "threading" is mostly useful
                when the execution bottleneck is a compiled extension that
                explicitly releases the GIL (for instance a Cython loop wrapped
                in a "with nogil" block or an expensive call to a library such
                as NumPy).
        verbose: int, optional
            The verbosity level: if non zero, progress messages are
            printed. Above 50, the output is sent to stdout.
            The frequency of the messages increases with the verbosity level.
            If it more than 10, all iterations are reported.
        pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
            The number of batches (of tasks) to be pre-dispatched.
            Default is '2*n_jobs'. When batch_size="auto" this is reasonable
            default and the multiprocessing workers shoud never starve.
        batch_size: int or 'auto', default: 'auto'
            The number of atomic tasks to dispatch at once to each
            worker. When individual evaluations are very fast, multiprocessing
            can be slower than sequential computation because of the overhead.
            Batching fast computations together can mitigate this.
            The ``'auto'`` strategy keeps track of the time it takes for a batch
            to complete, and dynamically adjusts the batch size to keep the time
            on the order of half a second, using a heuristic. The initial batch
            size is 1.
            ``batch_size="auto"`` with ``backend="threading"`` will dispatch
            batches of a single task at a time as the threading backend has
            very little overhead and using larger batch size has not proved to
            bring any gain in that case.
        temp_folder: str, optional
            Folder to be used by the pool for memmaping large arrays
            for sharing memory with worker processes. If None, this will try in
            order:
            - a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
            - /dev/shm if the folder exists and is writable: this is a RAMdisk
              filesystem available by default on modern Linux distributions,
            - the default system temporary folder that can be overridden
              with TMP, TMPDIR or TEMP environment variables, typically /tmp
              under Unix operating systems.
            Only active when backend="multiprocessing".
        max_nbytes int, str, or None, optional, 1M by default
            Threshold on the size of arrays passed to the workers that
            triggers automated memory mapping in temp_folder. Can be an int
            in Bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
            Use None to disable memmaping of large arrays.
            Only active when backend="multiprocessing".

        Notes
        -----

        This object uses the multiprocessing module to compute in
        parallel the application of a function to many different
        arguments. The main functionality it brings in addition to
        using the raw multiprocessing API are (see examples for details):

            * More readable code, in particular since it avoids
              constructing list of arguments.

            * Easier debugging:
                - informative tracebacks even when the error happens on
                  the client side
                - using 'n_jobs=1' enables to turn off parallel computing
                  for debugging without changing the codepath
                - early capture of pickling errors

            * An optional progress meter.

            * Interruption of multiprocesses jobs with 'Ctrl-C'

            * Flexible pickling control for the communication to and from
              the worker processes.

            * Ability to use shared memory efficiently with worker
              processes for large numpy-based datastructures.

        Examples
        --------

        A simple example:

        >>> from math import sqrt
        >>> from joblib import Parallel, delayed
        >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
        [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Reshaping the output when the function has several return
        values:

        >>> from math import modf
        >>> from joblib import Parallel, delayed
        >>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
        >>> res, i = zip(*r)
        >>> res
        (0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
        >>> i
        (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)

        The progress meter: the higher the value of `verbose`, the more
        messages::

            >>> from time import sleep
            >>> from joblib import Parallel, delayed
            >>> r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(10)) #doctest: +SKIP
            [Parallel(n_jobs=2)]: Done   1 out of  10 | elapsed:    0.1s remaining:    0.9s
            [Parallel(n_jobs=2)]: Done   3 out of  10 | elapsed:    0.2s remaining:    0.5s
            [Parallel(n_jobs=2)]: Done   6 out of  10 | elapsed:    0.3s remaining:    0.2s
            [Parallel(n_jobs=2)]: Done   9 out of  10 | elapsed:    0.5s remaining:    0.1s
            [Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    0.5s finished

        Traceback example, note how the line of the error is indicated
        as well as the values of the parameter passed to the function that
        triggered the exception, even though the traceback happens in the
        child process::

         >>> from heapq import nlargest
         >>> from joblib import Parallel, delayed
         >>> Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3)) #doctest: +SKIP
         #...
         ---------------------------------------------------------------------------
         Sub-process traceback:
         ---------------------------------------------------------------------------
         TypeError                                          Mon Nov 12 11:37:46 2012
         PID: 12934                                    Python 2.7.3: /usr/bin/python
         ...........................................................................
         /usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
             419         if n >= size:
             420             return sorted(iterable, key=key, reverse=True)[:n]
             421
             422     # When key is none, use simpler decoration
             423     if key is None:
         --> 424         it = izip(iterable, count(0,-1))                    # decorate
             425         result = _nlargest(n, it)
             426         return map(itemgetter(0), result)                   # undecorate
             427
             428     # General case, slowest method

         TypeError: izip argument #1 must support iteration
         ___________________________________________________________________________


        Using pre_dispatch in a producer/consumer situation, where the
        data is generated on the fly. Note how the producer is first
        called a 3 times before the parallel loop is initiated, and then
        called to generate new data on the fly. In this case the total
        number of iterations cannot be reported in the progress messages::

         >>> from math import sqrt
         >>> from joblib import Parallel, delayed

         >>> def producer():
         ...     for i in range(6):
         ...         print('Produced %s' % i)
         ...         yield i

         >>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
         ...                         delayed(sqrt)(i) for i in producer()) #doctest: +SKIP
         Produced 0
         Produced 1
         Produced 2
         [Parallel(n_jobs=2)]: Done   1 jobs       | elapsed:    0.0s
         Produced 3
         [Parallel(n_jobs=2)]: Done   2 jobs       | elapsed:    0.0s
         Produced 4
         [Parallel(n_jobs=2)]: Done   3 jobs       | elapsed:    0.0s
         Produced 5
         [Parallel(n_jobs=2)]: Done   4 jobs       | elapsed:    0.0s
         [Parallel(n_jobs=2)]: Done   5 out of   6 | elapsed:    0.0s remaining:    0.0s
         [Parallel(n_jobs=2)]: Done   6 out of   6 | elapsed:    0.0s finished
    r   r   r   
2 * n_jobsrJ   N1Mrc	       	      C   s   || _ t| _|d krd}nt|dr8t|dr8|| _d}|tkrPtd|tf || _|| _|dksvt|t	r~|dkr~|| _
ntd| || _|| _t|trdt| | _n|| _|| _d | _d | _t | _d	| _tj | _d S )
Nr   ZPoolLockz'Invalid backend: %s, expected one of %rrJ   r   z8batch_size must be 'auto' or a positive integer, got: %ri   F)r0   DEFAULT_MP_CONTEXT_mp_contexthasattrVALID_BACKENDS
ValueErrorbackendn_jobs
isinstancer   rH   pre_dispatch_temp_folderr   r   _max_nbytes
_mmap_mode_pool_outputr   _jobs_managed_poolr   rW   _lock)	r   r^   r]   r0   r`   rH   temp_folder
max_nbytes	mmap_moder   r   r   r     s:    
zParallel.__init__c             C   s   d| _ | j  | S )NT)rg   _initialize_pool)r   r   r   r   	__enter__  s    zParallel.__enter__c             C   s   | j   d| _d S )NF)_terminate_poolrg   )r   exc_type	exc_value	tracebackr   r   r   __exit__  s    zParallel.__exit__c             C   sN   | j }|dkrtdn2td ks(|d kr,dS |dk rJttj d | d}|S )Nr   z&n_jobs == 0 in Parallel has no meaningr   )r^   r\   r   maxr+   )r   r^   r   r   r   _effective_n_jobs  s    
zParallel._effective_n_jobsc             C   s&  | j  }tg| _|dkr"d| _n | jdkr8t|| _n| jdkrtj jrfd| _t	j
ddd dS tj jdkrd| _t	j
d	dd dS ttjjtd
}|rtddtjt< tj  t| j| j| jtd
| jd d
d}| jdk	r| j|d< t|f|| _| jjtt g nt!d| j |S )z?Build a process or thread pool and return the number of workersr   Nr   r   zHMultiprocessing-backed parallel loops cannot be nested, setting n_jobs=1   )
stacklevel
MainThreadzVMultiprocessing backed parallel loops cannot be nested below threads, setting n_jobs=1r   a%  [joblib] Attempting to do parallel computing without protecting your import on a system that does not support forking. To use parallel-computing in a script, you must protect your main loop using "if __name__ == '__main__'". Please see the joblib documentation on Parallel for more information12   )rj   rk   ri   r0   Z
context_idr4   zUnsupported backend: %s)"rt   r   
exceptionsrd   r]   r   r   current_processdaemonwarningswarnr   current_threadnamer.   osenvironrE   JOBLIB_SPAWNED_PROCESSImportErrorgcZcollectdictrb   rc   ra   rs   r0   rY   r   extendr5   r2   r\   )r   r^   Zalready_forkedZpoolargsr   r   r   rl     sH    


	


zParallel._initialize_poolc             C   s@   | j d k	r<| j j  | j j  d | _ | jdkr<tjjtd d S )Nr   r   )rd   close	terminater]   r   r   popr   )r   r   r   r   rn   &  s    



zParallel._terminate_poolc             C   s   | j r
dS | jdkrt|}| jj| |  jd7  _|  jt|7  _|  jt|7  _t	| j| j
s| jd| jttj | j f nXtj }t|t|| }| jjt||d}| jj| |  jt|7  _|  jd7  _dS )zQueue the batch for computing, with or without multiprocessing

        WARNING: this method is not thread-safe: it should be only called
        indirectly via dispatch_one_batch.

        Nr   z"Done %3i tasks       | elapsed: %s)callback)	_abortingrd   rB   rf   appendn_dispatched_batchesn_dispatched_tasksr   rK   r1   r0   _printr   rL   _start_timerF   Zapply_asyncr3   )r   rD   jobrG   cbr   r   r   	_dispatch.  s$    
zParallel._dispatchc             C   s   | j | jsd| _d| _dS )a  Dispatch more data for parallel processing

        This method is meant to be called concurrently by the multiprocessing
        callback. We rely on the thread-safety of dispatch_one_batch to protect
        against concurrent consumption of the unprotected iterator.

        FN)dispatch_one_batchrP   
_iterating)r   r   r   r   rQ   L  s    zParallel.dispatch_nextc             C   s  | j dkr| jdkrd}n| j dkr| j}| j}|dkr|tk rt|t | }td| d}|| _| jdkr| jd||f n>|t	kr|dkr|d  | _}| jdkr| jd||f n|}||krd| _n| j }| j
, ttj||}|sd	S | j| d
S W dQ R X dS )aT  Prefetch the tasks for the next batch and dispatch them.

        The effective size of the batch is computed here.
        If there are no more jobs to dispatch, return False, else return True.

        The iterator consumption and dispatching is protected by the same
        lock so calling this function should be thread safe.

        rJ   r   r   r   r   r,   z:Batch computation too fast (%.4fs.) Setting batch_size=%d.z:Batch computation too slow (%.2fs.) Setting batch_size=%d.FTN)rH   r]   rM   rN   MIN_IDEAL_BATCH_DURATIONr.   rs   r0   r   MAX_IDEAL_BATCH_DURATIONrh   r   	itertoolsislicer   )r   iteratorrH   Zold_batch_sizeZbatch_durationZideal_batch_sizeZtasksr   r   r   r   X  s<    




zParallel.dispatch_one_batchc             C   sB   | j s
dS | j dk rtjj}ntjj}|| }|d| |f  dS )z=Display the message on stout or stderr depending on verbosityNry   z	[%s]: %s
)r0   r6   stderrwritestdout)r   msgZmsg_argswriterr   r   r   r     s    

zParallel._printc             C   s   | j s
dS tj | j }| jrHt| j| j r0dS | jd| jt|f n| j}| j	}|dks|| d | j
 }|| j  d }|d |k}|s|| rdS ||d  | j	| d  }| jd|d |t|t|f dS )zvDisplay the process of the parallel execution only a fraction
           of time, controlled by self.verbose.
        Nz!Done %3i tasks      | elapsed: %sr   r   g      ?z/Done %3i out of %3i | elapsed: %s remaining: %s)r0   rL   r   rP   r1   r   r   rK   r   r   _pre_dispatch_amount)r   elapsed_timer/   Ztotal_tasksZcursorZ	frequencyZis_last_itemZremaining_timer   r   r   rO     s0    
zParallel.print_progressc             C   s  t  | _x| js t| jdkrt| jdkr:tjd q
| j | jjd}W d Q R X y| jj	|j
  W q
 t| jk
r } zrd| _t|trtddd}d||jf }t|jd }||}| j | j  | jr| j  W d Q R X |W Y d d }~X q
X q
W d S )Nr   g{Gz?Tr,   r   )r4   Zstack_startzMultiprocessing exception:
%s
---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
%s)r   re   r   r   rf   rL   sleeprh   r   r   rE   tuplerz   r   r_   r   r
   messager   etypern   rg   rl   )r   r   	exceptionZthis_reportZreportZexception_typer   r   r   retrieve  s,    

zParallel.retrievec             C   sP  | j rtdd| _| js$| j }n| j }| jdkr<d| _t|}| j	}|dksZ|dkrhd | _
d| _n2|| _
t|drt|}t| | _}tj||}tj | _d| _d| _d| _d| _zhd	| _x| j|rqW |dks|dkrd| _| j  tj | j }| jd
t| jt| jt|f W d | js6| j  t | _ X | j}d | _|S )Nz)This Parallel instance is already runningFrJ   r   allr   endswithg        Tz*Done %3i out of %3i | elapsed: %s finished) rf   r\   r   rg   rl   rt   rH   rM   iterr`   rP   r   rZ   evalr.   r   r   rL   r   r   r   rK   rN   r   r   r   r   r   re   r   rn   r   )r   iterabler^   r   r`   r   outputr   r   r   r%     sP    




zParallel.__call__c             C   s   d| j j| jf S )Nz%s(n_jobs=%s))	__class__r'   r^   )r   r   r   r   __repr__:  s    zParallel.__repr__)r   r   r   rT   rJ   NrU   rV   )r'   r(   r)   r*   r   rm   rr   rt   rl   rn   r   rQ   r   r   rO   r   r%   r   r   r   r   r   rS      s$    <  
,@A(.>rS   )T):r*   
__future__r   r   r6   r   r}   mathr   r>   rL   r   r   numbersr   cPickler<   Z_multiprocessing_helpersr   poolr   Zmultiprocessing.poolr   format_stackr	   r
   loggerr   r   Zmy_exceptionsr   r   diskr   _compatr   r[   r   r   r   rZ   r   rE   stripr   r   rX   objectr   r+   r1   	Exceptionr2   r3   rA   rB   rF   rS   r   r   r   r   <module>   sR   
)