3
QfXV]                 @   s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddlZddl	Z	yddl
mZ ddl
mZ W n4 ek
r   ddlmZ ddlmZ ddlZY nX ddlmZ ddlmZ ddlmZ d	d
lmZmZ ddlmZ yddlZddlmZ W n ek
r   dZY nX d	dlmZ d	dlmZ d	dl m!Z! dZ"ej#ej$B ej%B Z&ej#ej$B Z'dd Z(dd Z)dd Z*dd Z+dd Z,G dd de-Z.G dd deZ/G dd  d e-Z0G d!d" d"eZ1d#d$ Z2G d%d& d&e1Z3dS )'a  Custom implementation of multiprocessing.Pool with custom pickler

This module provides efficient ways of working with data stored in
shared memory with numpy.memmap arrays without inducing any memory
copy between the parent and child processes.

This module should not be imported if multiprocessing is not
available as it implements subclasses of multiprocessing Pool
that uses a custom alternative to SimpleQueue.

    )mmapN)loads)dumps)Pickler)HIGHEST_PROTOCOL)BytesIO   )mpassert_spawning)Pool)
as_strided)load)dump)hashz/dev/shmc             C   s2   t | dd}|dkrdS t|tr&| S t|S dS )z?Recursively look up the original np.memmap instance base if anybaseN)getattr
isinstancer   _get_backing_memmap)ab r   R/home/psgendb/BIRCHDEV/pkg/SPAdes-3.15.4/linux-x86_64/share/spades/joblib3/pool.pyr   G   s    
r   c             C   s   t | dk	S )z>Return True if a is backed by some mmap buffer directly or notN)r   )r   r   r   r   has_shareable_memoryX   s    r   c       	      C   sR   |dkrd}|dkr*t j| |||||dS t j| |||||d}t|||dS dS )z2Reconstruct an array view on a memmory mapped filezw+zr+N)dtypeshapemodeoffsetorder)r   strides)npmemmapr   )	filenamer   r   r   r   r   r   total_buffer_lenr   r   r   r   _strided_from_memmap]   s    

r#   c       	   	   C   s   t j| \}}t j|d }|| }||j7 }|jd r>d}nd}| jd sV| jd r`d}d}n| j}|| | j }t|j| j|j	||| j
||ffS )a  Pickling reduction for memmap backed arrays

    a is expected to be an instance of np.ndarray (or np.memmap)
    m is expected to be an instance of np.memmap on the top of the ``base``
    attribute ancestry of a. ``m.base`` should be the real python mmap object.
    r   ZF_CONTIGUOUSFCZC_CONTIGUOUSN)r   Zbyte_boundsr   flagsr   itemsizer#   r!   r   r   r   )	r   mZa_startZa_endZm_startr   r   r   r"   r   r   r   _reduce_memmap_backedp   s    

r)   c             C   s6   t | }|dk	rt| |S tttj| tdffS dS )zBPickle the descriptors of a memmap instance to reopen on same fileN)protocol)r   r)   r   r   r   asarrayr   )r   r(   r   r   r   reduce_memmap   s    
r,   c               @   s"   e Zd ZdZd	ddZdd ZdS )
ArrayMemmapReducera  Reducer callable to dump large arrays to memmap files.

    Parameters
    ----------
    max_nbytes: int
        Threshold to trigger memmaping of large arrays to files created
        a folder.
    temp_folder: str
        Path of a folder where files for backing memmaped arrays are created.
    mmap_mode: 'r', 'r+' or 'c'
        Mode for the created memmap datastructure. See the documentation of
        numpy.memmap for more details. Note: 'w+' is coerced to 'r+'
        automatically to avoid zeroing the data on unpickling.
    verbose: int, optional, 0 by default
        If verbose > 0, memmap creations are logged.
        If verbose > 1, both memmap creations, reuse and array pickling are
        logged.
    context_id: int, optional, None by default
        Set to a value identifying a call context to spare costly hashing of
        the content of the input arrays when it is safe to assume that each
        array will not be mutated by the parent process for the duration of the
        dispatch process. This is the case when using the high level Parallel
        API. It might not be the case when using the MemmapingPool API
        directly.
    prewarm: bool, optional, False by default.
        Force a read on newly memmaped array to make sure that OS pre-cache it
        memory. This can be useful to avoid concurrent disk access when the
        same data array is passed to different worker processes.
    r   NTc             C   s,   || _ || _|| _t|| _|| _|| _d S )N)_max_nbytes_temp_folder
_mmap_modeintverbose_context_id_prewarm)self
max_nbytestemp_folder	mmap_moder2   
context_idprewarmr   r   r   __init__   s    
zArrayMemmapReducer.__init__c             C   s  t |}|d k	rt||S |jj o8| jd k	o8|j| jkrzytj| j tj	| jt
 W n2 tk
r } z|jtjkr||W Y d d }~X nX | jd k	r| j}nt|}dtj ttj t||f }tjj| j|}tjj|sF| jdkrtd|j|j|f  x t||D ]}tj	|t qW | jrht|| jdj  n"| jdkrhtd|j|j|f  t t|| jdS | jdkrtd|j|jf  t!t"|t#dffS d S )	Nz%d-%d-%d-%s.pklr   z-Memmaping (shape=%r, dtype=%s) to new file %s)r8   r   z-Memmaping (shape=%s, dtype=%s) to old file %sz$Pickling array (shape=%r, dtype=%s).)r*   )$r   r)   r   	hasobjectr.   nbytesosmakedirsr/   chmodFOLDER_PERMISSIONSOSErrorerrnoEEXISTr3   r   getpidid	threadingcurrent_threadpathjoinexistsr2   printr   r   FILE_PERMISSIONSr4   r   r0   maxr,   r   r   r   )r5   r   r(   eZmarkerbasenamer!   Zdumped_filenamer   r   r   __call__   sD    



zArrayMemmapReducer.__call__)r   NT)__name__
__module____qualname____doc__r;   rQ   r   r   r   r   r-      s    
r-   c               @   s&   e Zd ZdZdefddZdd ZdS )CustomizablePicklera  Pickler that accepts custom reducers.

    HIGHEST_PROTOCOL is selected by default as this pickler is used
    to pickle ephemeral datastructures for interprocess communication
    hence no backward compatibility is required.

    `reducers` is expected expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that
    give an instance of `type` will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method. See the
    standard library documentation on pickling for more details.

    Nc             C   sf   t j| ||d |d kri }tt dr4t jj | _ntjj | _x |j D ]\}}| j|| qJW d S )N)r*   dispatch)	r   r;   hasattrrW   copycopyregdispatch_tableitemsregister)r5   writerreducersr*   typereduce_funcr   r   r   r;     s    
zCustomizablePickler.__init__c                s0   t tdr" fdd}|| j|< n
 | j|< d S )NrW   c                s    |}| j |d|i d S )Nobj)save_reduce)r5   rb   Zreduced)ra   r   r   
dispatcher2  s    z0CustomizablePickler.register.<locals>.dispatcher)rX   r   rW   r[   )r5   r`   ra   rd   r   )ra   r   r]   .  s    
zCustomizablePickler.register)rR   rS   rT   rU   r   r;   r]   r   r   r   r   rV   	  s   rV   c               @   s:   e Zd ZdZdddZdd Zdd Zd	d
 Zdd ZdS )CustomizablePicklingQueuea  Locked Pipe implementation that uses a customizable pickler.

    This class is an alternative to the multiprocessing implementation
    of SimpleQueue in order to make it possible to pass custom
    pickling reducers, for instance to avoid memory copy when passing
    memmory mapped datastructures.

    `reducers` is expected expected to be a dictionary with key/values
    being `(type, callable)` pairs where `callable` is a function that
    give an instance of `type` will return a tuple `(constructor,
    tuple_of_objects)` to rebuild an instance out of the pickled
    `tuple_of_objects` as would return a `__reduce__` method. See the
    standard library documentation on pickling for more details.
    Nc             C   sL   || _ |jdd\| _| _|j | _tjdkr6d | _n
|j | _| j	  d S )NF)duplexwin32)
	_reducersPipe_reader_writerLock_rlocksysplatform_wlock_make_methods)r5   contextr_   r   r   r   r;   J  s    


z"CustomizablePicklingQueue.__init__c             C   s    t |  | j| j| j| j| jfS )N)r
   rj   rk   rm   rp   rh   )r5   r   r   r   __getstate__T  s    z&CustomizablePicklingQueue.__getstate__c             C   s$   |\| _ | _| _| _| _| j  d S )N)rj   rk   rm   rp   rh   rq   )r5   stater   r   r   __setstate__Y  s    z&CustomizablePicklingQueue.__setstate__c             C   s   | j j  S )N)rj   poll)r5   r   r   r   empty^  s    zCustomizablePicklingQueue.emptyc                s   j j _jjjj   fdd}|_jrPfdd_nj	j
 _jd krp_n(jjjj fdd}|_d S )Nc            
      s      z S   X d S )Nr   r   )racquirerecvrreleaser   r   gete  s    z4CustomizablePicklingQueue._make_methods.<locals>.getc                s,   t  }t| jj|   jj|j  d S )N)r   rV   rh   r   rk   Z
send_bytesgetvalue)rb   buffer)r5   r   r   sendo  s    z5CustomizablePicklingQueue._make_methods.<locals>.sendc          
      s     z | S   X d S )Nr   )rb   )r~   wlock_acquirewlock_releaser   r   put}  s    z4CustomizablePicklingQueue._make_methods.<locals>.put)rj   ry   _recvrm   acquirereleaser{   rh   _sendrk   r~   rp   r   )r5   r{   r   r   )rx   ry   rz   r5   r~   r   r   r   rq   a  s    
z'CustomizablePicklingQueue._make_methods)N)	rR   rS   rT   rU   r;   rs   ru   rw   rq   r   r   r   r   re   :  s   

re   c                   s*   e Zd ZdZd fdd	Zdd Z  ZS )PicklingPoola  Pool implementation with customizable pickling reducers.

    This is useful to control how data is shipped between processes
    and makes it possible to use shared memory without useless
    copies induces by the default pickling methods of the original
    objects passed as arguments to dispatch.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that give an instance of `type` will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as would return a `__reduce__`
    method. See the standard library documentation on pickling for more
    details.

    Nc                sR   |d krt  }|d krt  }|| _|| _t |d}|j| tt| jf | d S )N)	processes)dict_forward_reducers_backward_reducersupdatesuperr   r;   )r5   r   forward_reducersbackward_reducerskwargspoolargs)	__class__r   r   r;     s    

zPicklingPool.__init__c             C   s@   t | dt}t|| j| _t|| j| _| jj| _| jj	| _
d S )N_ctx)r   r	   re   r   _inqueuer   	_outqueuer   
_quick_putr   
_quick_get)r5   rr   r   r   r   _setup_queues  s    


zPicklingPool._setup_queues)NNN)rR   rS   rT   rU   r;   r   __classcell__r   r   )r   r   r     s    r   c             C   s   t jj| rtj|  dS )z@Utility function to cleanup a temporary folder if still existingN)r>   rI   rK   shutilrmtree)Zfolder_pathr   r   r   delete_folder  s    r   c            	       s.   e Zd ZdZd fdd	Z fd	d
Z  ZS )MemmapingPoolaD  Process pool that shares large arrays to avoid memory copy.

    This drop-in replacement for `multiprocessing.pool.Pool` makes
    it possible to work efficiently with shared memory in a numpy
    context.

    Existing instances of numpy.memmap are preserved: the child
    suprocesses will have access to the same shared memory in the
    original mode except for the 'w+' mode that is automatically
    transformed as 'r+' to avoid zeroing the original data upon
    instantiation.

    Furthermore large arrays from the parent process are automatically
    dumped to a temporary folder on the filesystem such as child
    processes to access their content via memmaping (file system
    backed shared memory).

    Note: it is important to call the terminate method to collect
    the temporary folder used by the pool.

    Parameters
    ----------
    processes: int, optional
        Number of worker processes running concurrently in the pool.
    initializer: callable, optional
        Callable executed on worker process creation.
    initargs: tuple, optional
        Arguments passed to the initializer callable.
    temp_folder: str, optional
        Folder to be used by the pool for memmaping large arrays
        for sharing memory with worker processes. If None, this will try in
        order:
        - a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
        - /dev/shm if the folder exists and is writable: this is a RAMdisk
          filesystem available by default on modern Linux distributions,
        - the default system temporary folder that can be overridden
          with TMP, TMPDIR or TEMP environment variables, typically /tmp
          under Unix operating systems.
    max_nbytes int or None, optional, 1e6 by default
        Threshold on the size of arrays passed to the workers that
        triggers automated memmory mapping in temp_folder.
        Use None to disable memmaping of large arrays.
    forward_reducers: dictionary, optional
        Reducers used to pickle objects passed from master to worker
        processes: see below.
    backward_reducers: dictionary, optional
        Reducers used to pickle return values from workers back to the
        master process.
    verbose: int, optional
        Make it possible to monitor how the communication of numpy arrays
        with the subprocess is handled (pickling or memmaping)
    context_id: int, optional, None by default
        Set to a value identifying a call context to spare costly hashing of
        the content of the input arrays when it is safe to assume that each
        array will not be mutated by the parent process for the duration of the
        dispatch process. This is the case when using the high level Parallel
        API.
    prewarm: bool or str, optional, "auto" by default.
        If True, force a read on newly memmaped array to make sure that OS pre-
        cache it in memory. This can be useful to avoid concurrent disk access
        when the same data array is passed to different worker processes.
        If "auto" (by default), prewarm is set to True, unless the Linux shared
        memory partition /dev/shm is available and used as temp_folder.

    `forward_reducers` and `backward_reducers` are expected to be
    dictionaries with key/values being `(type, callable)` pairs where
    `callable` is a function that give an instance of `type` will return
    a tuple `(constructor, tuple_of_objects)` to rebuild an instance out
    of the pickled `tuple_of_objects` as would return a `__reduce__`
    method. See the standard library documentation on pickling for more
    details.

    N    .Arr   Fc
                s  |d krt  }|d krt  }d}dtj t| f }|d krJtjjdd }|d krtjjtry0t}tjj	|| tjj stj
  d}W n tk
r   d }Y nX |d krtj }tjjtjj|}tjj	||  | _tj fdd td k	rV|	dkr| }	t| ||||	d}||tj< t|tj< td  ||}||tj< t|tj< t |||d	}|j|
 tt| jf | d S )
NFzjoblib_memmaping_pool_%d_%dZJOBLIB_TEMP_FOLDERTc                  s   t  S )N)r   r   )pool_folderr   r   <lambda>(  s    z(MemmapingPool.__init__.<locals>.<lambda>auto)r9   r:   )r   r   r   )r   r>   rE   rF   environr{   rI   rK   SYSTEM_SHARED_MEM_FSrJ   r?   IOErrortempfile
gettempdirabspath
expanduserr/   atexitr]   r   r-   ndarrayr,   r    r   r   r   r;   )r5   r   r7   r6   r8   r   r   r2   r9   r:   r   Zuse_shared_memZpool_folder_nameZforward_reduce_ndarrayZbackward_reduce_ndarrayr   )r   )r   r   r;      sV    









zMemmapingPool.__init__c                s   t t| j  t| j d S )N)r   r   	terminater   r/   )r5   )r   r   r   r   F  s    zMemmapingPool.terminate)	NNr   r   NNr   NF)rR   rS   rT   rU   r;   r   r   r   r   )r   r   r     s
   I  Dr   )4rU   r   rC   r>   statrn   rG   r   r   r   cPickler   r   ImportErrorpicklerZ   r   r   ior   _multiprocessing_helpersr	   r
   multiprocessing.poolr   numpyr   Znumpy.lib.stride_tricksr   numpy_pickler   r   hashingr   r   S_IRUSRS_IWUSRS_IXUSRrA   rM   r   r   r#   r)   r,   objectr-   rV   re   r   r   r   r   r   r   r   <module>   sV   
$g1M(