Source code for neurotools.jobs.cache

#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
Functions related to disk caching (memoization)
'''
from __future__ import absolute_import
from __future__ import with_statement
from __future__ import division
from __future__ import nested_scopes
from __future__ import generators
from __future__ import unicode_literals
from __future__ import print_function

import os,sys
__PYTHON_2__ = sys.version_info<(3, 0)

import numpy as np
import scipy.io
import inspect
import ast
import types
import time
import subprocess
import warnings
import traceback
import errno
import pickle
import json
import base64
import zlib
import hashlib
import shutil

from collections import defaultdict
from pickle import UnpicklingError

# TODO: we should use the same pickle library as multiprocessing uses
# for better compatibility with parallelism and multiprocessing
try:
    from cPickle import PicklingError
except ImportError:
    from pickle import PicklingError

import neurotools.util.tools
import neurotools.util.time
import neurotools.jobs
import neurotools.jobs.ndecorator
from   neurotools.jobs.closure   import verify_function_closure
from   neurotools.jobs.filenames import is_dangerous_filename
from   neurotools.jobs.filenames import check_filename

from pathlib import Path

def get_source(f):
    '''
    Extracts and returns the source code of a function
    (if it exists).

    Parameters
    ----------
    f: function
        Function for which to extract source code

    Returns
    -------
    :str
        String containing the source code of the passed function
    '''
    g = neurotools.jobs.ndecorator.unwrap(f)
    try:
        return inspect.getsource(g)
    except (OSError, IOError):
        if hasattr(f, '__source__'):
            return f.__source__
        return inspect.getsource(f)
    raise ValueError('Cannot get function source')
@neurotools.jobs.ndecorator.memoize
def function_hash_no_subroutines(f):
    '''
    See ``function_hash_with_subroutines``. This hash value is
    based on the

    1. Undecorated source code
    2. Docstring
    3. Function name
    4. Module name
    5. Function argument specification

    This function cannot detect changes in function behavior
    as a result of changes in subroutines, global variables,
    or closures over mutable objects.

    Parameters
    ----------
    f: function
        Function for which to generate a hash value

    Returns
    -------
    :str
        Hash value that depends on the function. Hash is
        constructed such that changes in function source code
        and some dependencies will also generate a different
        hash.
    '''
    source    = get_source(f)
    docstring = inspect.getdoc(f)
    name      = f.__name__
    module    = f.__module__
    argspec   = neurotools.jobs.ndecorator.sanitize(
        inspect.getargspec(f))
    # Note: unlike function_hash_with_subroutines, this hash
    # deliberately excludes subroutines.
    return hash((module, name, docstring, source, argspec))
def function_signature(f):
    '''
    Generates a string identifying the cache folder for
    function ``f``.

    We want to cache results to disk. However, these cached
    results will be invalid if the source code changes. It is
    hard to detect this accurately in Python. Cache entries
    can also become invalid if the behavior of subroutines
    changes. To address this, the cache folder name includes
    a hash that depends on the function's

    - module,
    - name,
    - argspec,
    - source, and
    - file.

    If any of these change, the cache folder will as well.
    This reduces the chances of retrieving stale / invalid
    cached results.

    Parameters
    ----------
    f: function

    Returns
    -------
    :str
        name+'.'+code
    '''
    # The one thing the decorator module can't fake is where
    # the function is defined. So we can't see the source code
    # directly if we're passed a wrapped function. We can
    # however detect this case and peel away the layers to get
    # to the underlying source. The decorator module will
    # leave the wrapped function in a variable called
    # __wrapped__, so we can follow this back to the source
    # code.
    g = f
    source    = get_source(f)
    docstring = inspect.getdoc(f)
    name      = f.__name__
    module    = f.__module__
    try:
        argspec = inspect.getargspec(f)
    except DeprecationWarning:
        result    = inspect.getfullargspec(f)
        named     = result.args
        vargname  = result.varargs
        kwargname = result.varkw
        defaults  = result.defaults
        argspec   = (named, vargname, kwargname, defaults)
    argspec   = neurotools.jobs.ndecorator.sanitize(argspec)
    identity  = (module, name)
    signature = (docstring, source, argspec)
    name      = '.'.join(identity)
    code      = base64hash10bytes((identity, signature))
    return name + '.' + code
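# --- Illustrative sketch (added for documentation; not part of the original
# module). The cache folder name is the dotted module/function name plus a
# short hash that changes when the source, docstring, or argument spec
# changes. The helper ``_demo_function_signature`` and toy function ``_f``
# are hypothetical.
def _demo_function_signature():
    def _f(a, b=1):
        '''toy function'''
        return a - b
    # Returns something like '<module>._f.<short base64 hash>'
    return function_signature(_f)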
def signature_to_file_string(f, sig,
    mode         = 'repr',
    compressed   = True,
    base64encode = True,
    truncate     = True):
    '''
    Converts an argument signature to a string if possible.

    This can be used to store cached results in a human-
    readable format. Alternatively, we may want to encode the
    value of the argument signature in a string that is
    compatible with most file systems. This does not append
    the file extension.

    Reasonable restrictions for compatibility:

    - No more than 4096 characters in path string
    - No more than 255 characters in file string
    - For Windows compatibility try to limit it to 260
      characters total path length
    - These characters should be avoided:
      ``\/<>:"|?*,@#={}'&`!%$. ASCII 0..31``

    The easiest way to avoid problematic characters without
    restricting the input is to re-encode as base 64.

    **The following modes are supported:**

    **repr:**
        Uses ``repr`` and ``ast.literal_eval(node_or_string)``
        to serialize the argument signature. This is safe, but
        restricts the types permitted as parameters.

    **json:**
        Uses json to serialize the argument signature.
        Argument signatures cannot be uniquely recovered,
        because tuples and lists both map to lists in the json
        representation. Restricting the types used in the
        argument signature may circumvent this.

    **pickle:**
        Uses pickle to serialize the argument signature. This
        should uniquely store argument signatures that can be
        recovered, but takes more space. Use this with
        caution, since changes to the pickle serialization
        protocol between versions will make the encoded data
        irretrievable.

    **human:**
        Attempts a human-readable format. Experimental.

    Compression is on by default.
    Signatures are base64 encoded by default.

    Parameters
    ----------
    f: str
        Function being called
    sig:
        Cleaned-up function arguments created by
        ``neurotools.jobs.ndecorator.argument_signature()``
        A tuple of:
            args: tuple
                A tuple consisting of a list of
                (argument_name, argument_value) tuples.
            vargs:
                A tuple containing extra variable arguments
                ("varargs"), if any.

    Other Parameters
    ----------------
    mode: str; default 'repr'
        Can be ``'repr'`` ``'json'`` ``'pickle'`` ``'human'``.
    compressed: boolean; default True
        Compress the resulting signature using zlib?
    base64encode: boolean; default True
        Base-64 encode the resulting signature?
    truncate: boolean; default True
        Truncate file names that are too long? This will
        discard data, but the truncated signature may still
        serve as an identifier with a low collision
        probability.

    Returns
    -------
    filename: str
    '''
    sig = neurotools.jobs.ndecorator.sanitize(sig)

    if compressed and not base64encode:
        raise ValueError(
            'To use compression set base64encode=True')

    # A hash value gives us good distribution to control the
    # complexity of the directory tree used to manage the
    # cache, but is not unique.
    hsh = base64hash10bytes(sig)

    # We also need to store some information about which
    # function this is for. We'll get a human-readable name
    # identifying the function, and a shorter hash-value to
    # make sure we invalidate the cache if the source code or
    # function definition changes.
    fname = function_signature(f)

    # The argument spec can be mapped uniquely to a file name
    # by converting it to text, then converting this text to
    # base64 to avoid issues with special characters. Passing
    # the text representation through zlib preserves the
    # uniqueness of the key, while reducing the overall size.
    # This improves performance.

    # Convert key to an encoded string
    if   mode=='repr'  : key = repr(sig)
    elif mode=='json'  : key = json.dumps(sig)
    elif mode=='pickle': key = pickle.dumps(sig)
    elif mode=='human' : key = human_encode(sig)
    else:
        raise ValueError(
            'I support coding modes repr, json, and pickle. '
            'I don\'t recognize coding mode %s'%mode)

    # Compress and base64 encode string
    # (pickle already returns bytes; only encode text keys)
    if not isinstance(key, bytes):
        key = key.encode('UTF-8')
    if compressed   : key = zlib.compress(key)
    if base64encode : key = base64.urlsafe_b64encode(key)

    # Path will be a joining of the hash and the key. The hash
    # should give good distribution, while the key means we
    # can recover the arguments from the file name.
    filename = '%s.%s.%s'%(fname, hsh, key.decode())

    # If for some reason the path is too long, complain
    if len(filename)>255:
        if truncate:
            # Hash the key if it is too long and truncation is
            # enabled
            s  = key.decode()
            kh = base64hash(s)
            filename = '%s.%s.%s'%(fname, hsh, kh)
            filename = filename[:255]
        else:
            raise ValueError(
                'Argument specification exceeds maximum path '
                'length. Function probably accepts data as an '
                'argument, rather than a key to locate data. '
                'See Joblib for a caching framework that uses '
                'cryptographic hashes to solve this problem. '
                'For now, we skip the cache. The offending '
                'filename is '+filename)

    if __PYTHON_2__:
        try:
            ascii = filename.encode("utf8","ignore")
            assert unicode(ascii)==filename
            filename = ascii
        except UnicodeDecodeError:
            pass

    check_filename(filename)
    return filename
def file_string_to_signature(
    filename,
    mode         = 'repr',
    compressed   = True,
    base64encode = True):
    '''
    Extracts the argument key from the compressed
    representation in a cache filename entry. Inverse of
    ``signature_to_file_string()``.

    The ``filename`` should be provided as a string, without
    the file extension.

    The following modes are supported:

    **repr:**
        Uses repr and ast.literal_eval(node_or_string) to
        serialize the argument signature. This is safe, but
        restricts the types permitted as parameters.

    **json:**
        Uses json to serialize the argument signature.
        Argument signatures cannot be uniquely recovered,
        because tuples and lists both map to lists in the json
        representation. Restricting the types used in the
        argument signature may circumvent this.

    **pickle:**
        Uses pickle to serialize the argument signature. This
        should uniquely store argument signatures that can be
        recovered, but takes more space. Use this with
        caution, since changes to the pickle serialization
        protocol between versions will make the encoded data
        irretrievable.

    **human:**
        Attempts a human-readable format. Experimental.

    Compression is on by default.
    Signatures are base64 encoded by default.

    Parameters
    ----------
    filename: str
        Encoded filename, as a string, *without* the file
        extension

    Other Parameters
    ----------------
    mode: str; default 'repr'
        Can be ``'repr'`` ``'json'`` ``'pickle'`` ``'human'``.
    compressed: boolean; default True
        Whether ``zlib`` was used to compress this function
        call signature
    base64encode: boolean; default True
        Whether this function call signature was base-64
        encoded.

    Returns
    -------
    sig: nested tuple
        Function arguments created by
        ``neurotools.jobs.ndecorator.argument_signature()``
        A tuple of:
            args: tuple
                A tuple consisting of a list of
                (argument_name, argument_value) tuples.
            vargs:
                A tuple containing extra variable arguments
                ("varargs"), if any.
    '''
    pieces = filename.split('.')
    key    = pieces[-1]
    hsh    = pieces[-2]
    name   = '.'.join(pieces[:-3])
    # The argument spec can be mapped uniquely to a file name
    # by converting it to text, then converting this text to
    # base64 to avoid issues with special characters. Passing
    # the text representation through zlib preserves the
    # uniqueness of the key, while reducing the overall size.
    # This improves performance.
    if base64encode:
        key = base64.urlsafe_b64decode(
            (key+'='*10).encode('UTF-8'))
    if compressed:
        key = zlib.decompress(key)
    key = key.decode()
    if   mode=='repr'  : sig = ast.literal_eval(key)
    elif mode=='json'  : sig = json.loads(key)
    elif mode=='pickle': sig = pickle.loads(key)
    elif mode=='human' : sig = human_decode(key)
    else:
        raise ValueError((
            'I support coding modes repr, json, and pickle;'
            ' I don\'t recognize coding mode %s')%mode)
    sig = neurotools.jobs.ndecorator.sanitize(sig)
    return sig
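# --- Illustrative sketch (added for documentation; not part of the original
# module). Shows an argument signature round-tripping through
# ``signature_to_file_string`` and ``file_string_to_signature``. The helper
# ``_demo_signature_roundtrip`` and the toy function ``_f`` are hypothetical.
def _demo_signature_roundtrip():
    def _f(a, b=2):
        return a + b
    sig = neurotools.jobs.ndecorator.argument_signature(_f, 1, b=3)
    # Encode the call signature into a filesystem-safe string
    # (no file extension is appended) ...
    fn  = signature_to_file_string(_f, sig,
        mode='repr', compressed=True, base64encode=True)
    # ... and decode it back into a nested argument tuple.
    recovered = file_string_to_signature(fn,
        mode='repr', compressed=True, base64encode=True)
    return fn, recovered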
def human_encode(sig):
    '''
    Formats an argument signature for saving as a file name.

    Parameters
    ----------
    sig: nested tuple
        Argument signature as a safe nested tuple

    Returns
    -------
    result: str
        Human-readable argument-signature filename
    '''
    sig = neurotools.jobs.ndecorator.sanitize(
        sig, mode='strict')
    named, vargs = sig
    if not vargs is None:
        raise ValueError(
            'Currently variable arguments are not permitted'
            ' in the human-readable format')
    result = ','.join(
        ['%s=%s'%(k, repr(v)) for (k, v) in named])
    return result
def human_decode(key):
    '''
    Decodes a human-readable argument-signature filename back
    into an argument signature.

    Parameters
    ----------
    key: str
        Human-readable argument-signature filename

    Returns
    -------
    sig: nested tuple
        Argument signature as a nested tuple
    '''
    params = [k.split('=') for k in key.split(',')]
    params = tuple((n, ast.literal_eval(v)) for n, v in params)
    sig = (params, None)
    sig = neurotools.jobs.ndecorator.sanitize(
        sig, mode='strict')
    return sig
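# --- Illustrative sketch (added for documentation): the human-readable
# encoding is a comma-separated list of ``name=repr(value)`` pairs, so a
# signature with no varargs should round-trip through these two helpers.
# The helper ``_demo_human_roundtrip`` is hypothetical.
def _demo_human_roundtrip():
    sig = ((('a', 1), ('b', 'text')), None)  # (named arguments, no varargs)
    key = human_encode(sig)                  # e.g. "a=1,b='text'"
    return human_decode(key)                 # back to a nested tuple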
def get_cache_path(cache_root, f, *args, **kwargs):
    '''
    Locate the directory path for function ``f`` within the
    ``__neurotools_cache__`` path ``cache_root``.

    Parameters
    ----------
    cache_root: str
        Path to root of the ``__neurotools__`` cache
    f: function
        Cached function object

    Returns
    -------
    path: str
    '''
    sig = neurotools.jobs.ndecorator.argument_signature(
        f, *args, **kwargs)
    fn  = signature_to_file_string(f, sig,
        mode         = 'repr',
        compressed   = True,
        base64encode = True)
    pieces = fn.split('.')
    # First two words used as directories
    path = cache_root + os.sep + os.sep.join(pieces[:-2]) + os.sep
    return path
def locate_cached(cache_root, f, method, *args, **kwargs):
    '''
    Locate a specific cache entry within ``cache_root`` for
    function ``f`` cached with method ``method``, and called
    with arguments ``*args`` and keyword arguments
    ``**kwargs``.

    Parameters
    ----------
    cache_root: str
        directory/path as string
    f: function
        Function being cached
    method: str
        Cache file extension e.g. ``"npy"``, ``"mat"``, etc.
    args: iterable
        function parameters
    kwargs: dict
        function keyword arguments

    Returns
    -------
    fn: str
        File name of cache entry without extension
    sig: tuple
        Tuple of (args,kwargs) info from
        ``argument_signature()``
    path: str
        Directory containing cache file
    filename: str
        File name with extension
    location: str
        Full absolute path to cache entry
    '''
    while method.startswith('.'):
        method = method[1:]
    sig = neurotools.jobs.ndecorator.argument_signature(
        f, *args, **kwargs)
    fn  = signature_to_file_string(f, sig,
        mode         = 'repr',
        compressed   = True,
        base64encode = True)
    pieces = fn.split('.')
    # First two words used as directories
    path = cache_root + os.sep + os.sep.join(pieces[:-2]) + os.sep
    # Remaining pieces form the filename
    filename = '.'.join(pieces[-2:]) + '.' + method
    location = path + filename
    return fn, sig, path, filename, location
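# --- Illustrative sketch (added for documentation): resolving where the
# cache entry for a particular call would live on disk. The helper
# ``_demo_locate``, the toy function ``_f``, and the cache root path are
# hypothetical.
def _demo_locate():
    def _f(x):
        return x**2
    fn, sig, path, filename, location = locate_cached(
        '/tmp/__neurotools_cache__', _f, 'npy', 3)
    # ``location`` is the absolute path of the (possibly not yet existing)
    # cache file; ``path`` is its directory and ``filename`` the base name.
    return location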
def validate_for_matfile(x):
    '''
    Verify that the nested tuple ``x``, which contains the
    arguments to a function call, can be safely stored in a
    Matlab matfile (``.mat``).

    .. table:: Numpy types: these should be compatible
        :widths: auto

        ========== ========================================
        Type       Description
        ========== ========================================
        bool       Boolean (True or False) stored as a byte
        int8       Byte (-128 to 127)
        int16      Integer (-32768 to 32767)
        int32      Integer (-2147483648 to 2147483647)
        int64      Integer (-9223372036854775808 to 9223372036854775807)
        uint8      Unsigned integer (0 to 255)
        uint16     Unsigned integer (0 to 65535)
        uint32     Unsigned integer (0 to 4294967295)
        uint64     Unsigned integer (0 to 18446744073709551615)
        float16    Half precision float: sign bit, 5 bits exponent, 10 bits mantissa
        float32    Single precision float: sign bit, 8 bits exponent, 23 bits mantissa
        float64    Double precision float: sign bit, 11 bits exponent, 52 bits mantissa
        complex64  Complex number, represented by two float32
        complex128 Complex number, represented by two float64
        ========== ========================================

    Parameters
    ----------
    x: nested tuple
        Arguments to a function

    Returns
    -------
    :boolean
    '''
    safe = (np.bool_   , np.int8    , np.int16 , np.int32 ,
            np.int64   , np.uint8   , np.uint16, np.uint32,
            np.uint64  , np.float32 , np.float64,
            np.complex64, np.complex128)
    if not type(x) == np.ndarray:
        x = np.array(x)
    if len(x.shape) < 2:
        raise ValueError(
            "One-dimensional arrays cannot be stored safely in matfiles")
    if x.dtype == object:
        # Object arrays will be converted to cell arrays; we
        # need to make sure each cell can be stored safely
        return map(validate_for_matfile, x)
    if not x.dtype in safe:
        raise ValueError(
            "Numpy type %s is not on the list of compatible types"%x.dtype)
    return True
def validate_for_numpy(x):
    '''
    Check whether an array-like object can safely be stored
    in a numpy archive.

    .. table:: Numpy types: these should be compatible
        :widths: auto

        ========== ========================================
        Type       Description
        ========== ========================================
        bool       Boolean (True or False) stored as a byte
        int8       Byte (-128 to 127)
        int16      Integer (-32768 to 32767)
        int32      Integer (-2147483648 to 2147483647)
        int64      Integer (-9223372036854775808 to 9223372036854775807)
        uint8      Unsigned integer (0 to 255)
        uint16     Unsigned integer (0 to 65535)
        uint32     Unsigned integer (0 to 4294967295)
        uint64     Unsigned integer (0 to 18446744073709551615)
        float16    Half precision float: sign bit, 5 bits exponent, 10 bits mantissa
        float32    Single precision float: sign bit, 8 bits exponent, 23 bits mantissa
        float64    Double precision float: sign bit, 11 bits exponent, 52 bits mantissa
        complex64  Complex number, represented by two float32
        complex128 Complex number, represented by two float64
        ========== ========================================

    Parameters
    ----------
    x: object
        array-like object

    Returns
    -------
    :boolean
        True if the data in ``x`` can be safely stored in a
        Numpy archive
    '''
    safe = (np.bool_   , np.int8    , np.int16 , np.int32 ,
            np.int64   , np.uint8   , np.uint16, np.uint32,
            np.uint64  , np.float32 , np.float64,
            np.complex64, np.complex128)
    warnings.filterwarnings(
        "ignore", category=np.VisibleDeprecationWarning)
    if not isinstance(x, np.ndarray):
        try:
            x = np.array(x)
        except:
            # Fall back to a one-dimensional object array
            x  = [*x]
            _x = np.empty(len(x), dtype=object)
            for i, xi in enumerate(x):
                _x[i] = x[i]
            x = _x
    if x.dtype == object:
        # Object arrays will be converted to cell arrays; we
        # need to make sure each cell can be stored safely
        try:
            ix = iter(x)
        except TypeError as te:
            raise ValueError('%r is not iterable'%(x,))
        return map(validate_for_numpy, x)
    if not x.dtype in safe:
        raise ValueError(
            "Numpy type %s is not on the list"
            " of compatible types"%x.dtype)
    return True
def read_cache_entry(location, method):
    '''
    Read a cached value from ``location`` on disk, stored in
    format ``method`` (``'pickle'``, ``'mat'``, or ``'npy'``).
    '''
    if method == 'pickle':
        with open(location, 'rb') as openfile:
            return pickle.load(openfile)
    elif method == 'mat':
        return scipy.io.loadmat(location)['varargout']
    elif method == 'npy':
        return np.load(location, allow_pickle=True)
def disk_cacher(
    cache_location,
    method     = 'npy',
    write_back = True,
    skip_fast  = False,
    verbose    = False,
    allow_mutable_bindings = False,
    cache_identifier       = '__neurotools_cache__'):
    '''
    Decorator to memoize functions to disk. Uses a currying
    pattern: calling ``disk_cacher`` with a ``cache_location``
    creates a decorator.

    write_back:

        True: Default. Computed results are saved to disk.

        False: Computed results are not saved to disk. In this
        case of hierarchical caches mapped to the filesystem,
        a background rsync loop can handle asynchronous
        write-back.

    method:

        p: Use pickle to store cache. Can serialize all
        objects but seriously slow! May not get ANY speedup
        due to the time cost of pickling and disk IO.

        mat: Use scipy.io.savemat and scipy.io.loadmat. Nice
        because it's compatible with matlab. Unfortunately,
        can only store numpy types and data that can be
        converted to numpy types. Data conversion may alter
        the types of the return arguments when retrieved from
        the cache.

        npy: Use built-in numpy.save functionality.

        hdf5: Not yet implemented.

    Parameters
    ----------
    cache_location: str
        Path to disk cache

    Other Parameters
    ----------------
    method: str; default 'npy'
        Storage format for caches. Can be 'pickle', 'mat' or
        'npy'
    write_back: boolean; default True
        Whether to copy new cache values back to the disk
        cache. If False, then previously cached values can be
        read but new entries will not be created.
    skip_fast: boolean; default False
        Attempt to simply re-compute values which are taking
        too long to retrieve from the cache. Experimental, do
        not use.
    verbose: boolean; default False
        Whether to print detailed logging information
    allow_mutable_bindings: boolean; default False
        Whether to allow caching of functions that close over
        mutable scope. Such functions are more likely to
        return different results for the same arguments,
        leading to invalid cached values.
    cache_identifier: str; default '__neurotools_cache__'
        Subdirectory name for the disk cache.

    Returns
    -------
    cached: disk cacher object
        TODO
    '''
    VALID_METHODS = ('pickle','mat','npy')
    assert method in VALID_METHODS
    cache_location = os.path.abspath(cache_location) + os.sep
    cache_root     = cache_location + cache_identifier
    neurotools.util.tools.ensure_dir(cache_location)
    neurotools.util.tools.ensure_dir(cache_root)
    def cached(f):
        '''
        The ``disk_cacher`` function constructs a decorator
        ``cached`` that can be used to wrap functions to
        memoize their results to disk.

        ``cached`` returns the ``decorated`` object which is
        constructed by calling the inner function ``wrapped``.

            cached           <-- disk_cacher(location,...)
            caching_function <-- cached(somefunction)
        '''
        if not allow_mutable_bindings:
            verify_function_closure(f)
        # Patch for 2/3 compatibility
        if __PYTHON_2__:
            FileError = IOError
        else:
            FileError = FileNotFoundError

        @neurotools.jobs.ndecorator.robust_decorator
        def wrapped(f, *args, **kwargs):
            '''
            This is a wrapper for memoizing results to disk.
            This docstring should be overwritten by the
            docstring of the wrapped function.
            '''
            t0 = neurotools.util.time.current_milli_time()
            # Store parameters; these will be saved in the
            # cached result
            params = (args, tuple(list(kwargs.items())))
            try:
                fn, sig, path, filename, location = \
                    locate_cached(
                        cache_root, f, method, *args, **kwargs)
            except ValueError as exc:
                print('Generating cache key failed')
                traceback.print_exc()
                time, result = f(*args, **kwargs)
                return result
            result = None
            if os.path.isfile(location):
                try:
                    result = read_cache_entry(location, method)
                    if verbose:
                        print('Retrieved cache at ', path)
                        print('  %s.%s'%(f.__module__, f.__name__))
                        print('  %s'%neurotools.jobs.ndecorator.print_signature(sig))
                except (ValueError, EOFError, OSError, IOError,
                        FileError, UnpicklingError) as exc:
                    if verbose:
                        print('  File reading failed')
            if not result is None:
                params, result = result
            else:
                if verbose:
                    print('Recomputing cache at %s'%cache_location)
                    print('  %s.%s'%(f.__module__, f.__name__))
                    print('  %s'%neurotools.jobs.ndecorator.print_signature(sig))
                # Evaluate function
                time, result = f(*args, **kwargs)
                if verbose:
                    print('  %s'%path)
                    print('  Took %d milliseconds'%time)
                # Save cached output to disk
                if write_back:
                    savedata = (params, result)
                    neurotools.util.tools.ensure_dir(path)
                    Path(location).touch()
                    if verbose:
                        print('Writing cache at ', path)
                    try:
                        if method == 'pickle':
                            with open(location, 'wb') as openfile:
                                pickle.dump(savedata, openfile,
                                    protocol=pickle.HIGHEST_PROTOCOL)
                        elif method == 'mat':
                            validated_result = validate_for_matfile(savedata)
                            if validated_result is None:
                                raise ValueError(
                                    'Error: return value cannot be safely packaged in a matfile')
                            scipy.io.savemat(location, {'varargout':savedata})
                        elif method == 'npy':
                            validated_result = validate_for_numpy(savedata)
                            if validated_result is None:
                                raise ValueError(
                                    'Error: return value cannot be safely packaged in a numpy file')
                            sd = np.empty(2, dtype=object)
                            sd[0] = savedata[0]
                            sd[1] = savedata[1]
                            np.save(location, sd)
                    except (ValueError, IOError, PicklingError) as exc2:
                        if verbose:
                            print('Saving cache at %s FAILED'%cache_location)
                            print('  %s.%s'%(f.__module__, f.__name__))
                            print('  %s'%\
                                neurotools.jobs.ndecorator.print_signature(sig))
                            print('  '+'\n  '.join(\
                                traceback.format_exc().split('\n')))
                    if verbose:
                        try:
                            print('Wrote cache at ', path)
                            print('  For function %s.%s'%\
                                (f.__module__, f.__name__))
                            print('  Argument signature %s'%\
                                neurotools.jobs.ndecorator.print_signature(sig))
                            st = os.stat(location)
                            du = st.st_blocks * st.st_blksize
                            t1 = neurotools.util.time.current_milli_time()
                            overhead  = float(t1-t0) - time
                            io        = float(du)/(1+overhead)
                            recompute = float(du)/(1+time)
                            boost     = (recompute-io)
                            saved     = time - overhead
                            quality   = boost/(1+float(du))
                            print('  Size on disk is %d'%du)
                            print('  IO overhead %d milliseconds'%overhead)
                            print('  Cached performance %0.4f'%io)
                            print('  Recompute cost %0.4f'%recompute)
                            print('  Expected boost %0.4f'%boost)
                            print('  Time-space quality %0.4f'%quality)
                        except (OSError) as exc3:
                            print('\n  '.join(\
                                traceback.format_exc().split('\n')))
            # Skipping when the cache is slower than
            # recomputing is not yet supported
            # if skip_fast and boost<0:
            #     if verbose:
            #         print('  WARNING DISK IO MORE EXPENSIVE THAN RECOMPUTING!')
            #         print('  We should really do something about this?')
            #         print('  Zeroing out the file, hopefully that causes it to crash on load?')
            #     with open(location, 'w'): pass
            return result

        def purge(*args, **kwargs):
            '''
            Delete cache entries matching arguments. This is a
            destructive operation, execute with care.

            Parameters
            ----------
            *args
                Arguments forwarded to the ``locate_cached``
                function. Matching cache entries will be
                deleted.
            **kwargs
                Keyword arguments forwarded to the
                ``locate_cached`` function. Matching cache
                entries will be deleted.
            '''
            for method in VALID_METHODS:
                fn, sig, path, filename, location = \
                    locate_cached(
                        cache_root, f, method, *args, **kwargs)
                print('Deleting %s'%location)
                try:
                    os.remove(location)
                    print('Deleted %s'%location)
                except OSError as ee:
                    if ee.errno == 2:
                        print('%s does not exist'%location)
                    else:
                        raise

        def lscache(verbose=False):
            '''
            List all files associated with cached invocations
            of the wrapped function ("cache entries").
            '''
            path = cache_root + os.sep +\
                os.sep.join(function_signature(f).split('.'))
            try:
                files = os.listdir(path)
            except:
                files = []
            if verbose:
                print('Cache %s contains:'%path)
                print('\n  '+'\n  '.join([
                    fi[:20]+'…' for fi in files]))
            return path, files

        @neurotools.jobs.ndecorator.robust_decorator
        def locate(f, *args, **kwargs):
            '''
            A version of the decorator that simply locates the
            cache file. The result of ``locate_cached`` is
            returned directly. It is a tuple:
            (fn, sig, path, filename, location)

            Returns
            -------
            fn: str
                File name of cache entry without extension
            sig: tuple
                Tuple of (args,kwargs) info from
                ``argument_signature()``
            path: str
                Directory containing cache file
            filename: str
                File name with extension
            location: str
                Full absolute path to cache entry
            '''
            return locate_cached(cache_root, f, method, *args, **kwargs)

        # Build decorated function and save additional methods
        # associated with the decorated object
        decorated            = wrapped(
            neurotools.jobs.ndecorator.timed(f))
        decorated.purge      = purge
        decorated.cache_root = cache_root
        decorated.lscache    = lscache
        decorated.locate     = locate(f)
        return decorated

    cached.cache_root = cache_root
    return cached
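# --- Illustrative usage sketch (added for documentation; not part of the
# original module). The cache directory, the helper ``_demo_disk_cacher``,
# and the decorated toy function are hypothetical; results are memoized to
# .npy files under /tmp/cache/__neurotools_cache__/ on the first call and
# read back from disk afterwards.
def _demo_disk_cacher():
    cached = disk_cacher('/tmp/cache', method='npy', verbose=True)

    @cached
    def slow_square(x):
        return x**2

    a = slow_square(4)         # computed and written to disk
    b = slow_square(4)         # retrieved from the disk cache
    slow_square.lscache(True)  # list this function's cache entries
    slow_square.purge(4)       # delete the cache entry for this call
    return a, b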
def hierarchical_cacher(fast_to_slow,
    method     = 'npy',
    write_back = True,
    verbose    = False,
    allow_mutable_bindings = False,
    cache_identifier       = 'neurotools_cache'):
    '''
    Construct a filesystem cache defined in terms of a
    hierarchy from faster to slower (fallback) caches.

    Parameters
    ----------
    fast_to_slow: tuple of strings
        list of filesystem paths for disk caches in order from
        the fast (default or main) cache to slower.

    Other Parameters
    ----------------
    method: string, default ``'npy'``
        cache storing method;
    write_back: bool, default True
        whether to automatically copy newly computed cache
        values to the slower caches
    verbose: bool, default ``False``
        whether to print detailed logging information to
        standard out when manipulating the cache
    allow_mutable_bindings: bool, default False
        If true, then "unsafe" namespace bindings, for example
        user-defined functions, will be allowed in disk-cached
        functions. If a cached function calls subroutines, and
        those subroutines change, the disk cacher cannot
        detect the implementation difference. Consequently, it
        cannot tell whether old cached values are invalid.
    cache_identifier: str, default 'neurotools_cache'
        (sub)folder name to store cached results

    Returns
    -------
    hierarchical: decorator
        A hierarchical disk-caching decorator that can be used
        to memoize functions to the specified disk caching
        hierarchy.
    '''
    slow_to_fast = fast_to_slow[::-1] # reverse it
    all_cachers  = []
    def hierarchical(f):
        # disable write-back on the slow caches
        for location in slow_to_fast[:-1]:
            f = disk_cacher(location,
                method                 = method,
                write_back             = write_back,
                verbose                = verbose,
                allow_mutable_bindings = allow_mutable_bindings,
                cache_identifier       = cache_identifier)(f)
            all_cachers.append(f)
        # use write-back only on the fast cache
        location = slow_to_fast[-1]
        f = neurotools.jobs.cache.disk_cacher(location,
            method                 = method,
            write_back             = True,
            verbose                = verbose,
            allow_mutable_bindings = allow_mutable_bindings,
            cache_identifier       = cache_identifier)(f)
        def purge(*args, **kwargs):
            '''
            Purge each of the constituent cachers
            '''
            for cacher in all_cachers:
                if hasattr(cacher, 'purge'):
                    cacher.purge(*args, **kwargs)
        f.purge = purge
        return f
    return hierarchical
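# --- Illustrative sketch (added for documentation): a two-level hierarchy
# with a fast local cache backed by a slower (e.g. network-mounted) cache.
# The helper ``_demo_hierarchical`` and both cache paths are hypothetical.
def _demo_hierarchical():
    hierarchical = hierarchical_cacher(
        ('/fast/ssd/cache', '/slow/network/cache'),
        method='npy', verbose=False)

    @hierarchical
    def expensive(x):
        return x + 1

    y = expensive(10)    # checks fast then slow cache, recomputes if missing
    expensive.purge(10)  # purge this entry from every level of the hierarchy
    return y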
def scan_cachedir(
    cachedir,
    method  = "npy",
    verbose = False,
    **kw):
    '''
    Retrieve all entries in ``cachedir``, unpacking their
    encoded arguments.

    Parameters
    ----------
    cachedir: str
        Cache directory to scan, e.g.
        ``__neurotools_cache__/…/…/…/somefunction``

    Other Parameters
    ----------------
    method: str; default ``'npy'``
        Can be ``'npy'`` or ``'mat'``
    verbose: boolean; default False
    **kw:
        Forwarded to ``file_string_to_signature()``;
        See ``file_string_to_signature()`` for details.

    Returns
    -------
    :dict
        ``filename -> (args,varargs)`` dictionary, where
        ``args`` is a ``parameter_name -> value`` dictionary
        and ``varargs`` is a list of extra arguments, if any.
    '''
    if not method.startswith('.'):
        method = '.'+method
    argnames = None
    results  = {}
    invalid  = []
    for f in os.listdir(cachedir):
        name, ext = os.path.splitext(f)
        if not ext == method:
            continue
        # If this fails we can try to recover from the cached
        # contents
        try:
            args, varargs = file_string_to_signature(
                name, **kw)
            if len(args) == 2 and isinstance(args[0], str):
                args = (args,)
            # Remember argument names; we might need these to
            # recover signatures from files whose
            # filename-based decoding fails
            _argnames, _ = zip(*args)
            if argnames is None:
                argnames = _argnames
            elif not argnames == _argnames:
                raise ValueError(('File %s argument '
                    'names %s differ from previous '
                    'argument names %s')%(
                    f, _argnames, argnames))
            # Save arguments as dictionary
            args = dict(args)
            results[f] = (args, varargs)
        except zlib.error as e:
            invalid.append(f)
    if len(invalid):
        if verbose:
            warnings.warn(
                'The following files could not be decoded:'+
                '\n  '+'\n  '.join(invalid))
        else:
            warnings.warn(
                '%d files could not be decoded'%len(invalid))
        # Try to recover
        if method == '.npy':
            if argnames is None:
                raise ValueError('No valid reference cache '
                    'entry was available for identifying '
                    'the function arguments; I would need '
                    'the original function used to produce '
                    'this cache to proceed.')
            warnings.warn(
                'Format is .npy; I will try to recover'
                ' by inspecting file contents')
            double_failed = []
            for f in invalid:
                try:
                    args, varargs = np.load(
                        cachedir+os.sep+f,
                        allow_pickle=True)[0]
                    args = dict(zip(argnames, args))
                    results[f] = (args, varargs)
                except:
                    double_failed.append(f)
            warnings.warn('%d/%d recovered'%(
                len(invalid)-len(double_failed),
                len(invalid)))
            if len(double_failed):
                warnings.warn(
                    '%d files irrecoverable'%\
                    len(double_failed))
    return results
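# --- Illustrative sketch (added for documentation): listing every cached
# invocation of a function by decoding the argument signatures embedded in
# the cache file names. The helper ``_demo_scan`` and the directory path are
# hypothetical.
def _demo_scan():
    entries = scan_cachedir(
        '/tmp/cache/__neurotools_cache__/some/module/somefunction',
        method='npy')
    for filename, (args, varargs) in entries.items():
        print(filename, args, varargs)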
def hashit(obj):
    '''
    Convert an object to bytes (via UTF-8 encoding, falling
    back to ``repr``) and return its SHA-224 digest.
    '''
    if not isinstance(obj, bytes):
        try:
            obj = obj.encode('UTF-8')
        except:
            obj = repr(obj).encode('UTF-8')
    return hashlib.sha224(obj).digest()
def base64hash(obj):
    '''
    Retrieve a base-64 encoded hash for an object.

    This uses the built-in ``encode`` function to convert an
    object to ``utf-8``, then calls ``.sha224(obj).digest()``
    to create a hash, finally packaging the result in base-64.

    Parameters
    ----------
    obj: object

    Returns
    -------
    code: str
    '''
    code = base64.urlsafe_b64encode(
        hashit(obj)).decode().replace('=','')
    return code
def base64hash10bytes(obj):
    '''
    Retrieve the first ten bytes of a base-64 encoded hash of
    an object.

    Parameters
    ----------
    obj: object

    Returns
    -------
    code: str
    '''
    code = base64.urlsafe_b64encode(
        hashit(obj)[:10]).decode().replace('=','')
    return code
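# --- Illustrative sketch (added for documentation): the same object always
# maps to the same short, filesystem-safe code, which is what makes these
# helpers usable as cache-key components. The helper ``_demo_hashes`` is
# hypothetical.
def _demo_hashes():
    full  = base64hash(('alpha', 1, 2.0))         # full SHA-224 digest, base64
    short = base64hash10bytes(('alpha', 1, 2.0))  # first 10 bytes only
    assert base64hash(('alpha', 1, 2.0)) == full  # deterministic
    return full, short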
@neurotools.jobs.ndecorator.memoize
def function_hash_with_subroutines(f, force=False):
    '''
    Functions may change if their subroutines change. This
    function computes a hash value that is sensitive to
    changes in the source code, docstring, argument
    specification, name, module, and subroutines.

    This is a recursive procedure with a fair amount of
    overhead. To allow for the possibility of mutual
    recursion, subroutines are excluded from the hash if the
    function has already been visited.

    This does not use the built-in hash function for
    functions in Python.

    **Ongoing development notes**

    *Is memoization possible?* Making memoization compatible
    with graceful handling of potentially complex mutually
    recurrent call structures is tricky. Each function
    generates a call tree, which does not expand a node if it
    is already present in the call tree structure. Therefore
    there are many possible hash values for an intermediate
    function depending on how far its call tree gets expanded,
    which depends on what has been expanded and encountered so
    far. Therefore, we cannot cache these intermediate values.

    *Note:* the topology of a mutually recurrent call
    structure cannot change without changing the source code
    of at least one function in the call graph. So it suffices
    to (1) hash the subroutines, (2) expand the call graph
    (potentially excluding standard and system library
    functions), (3) grab the non-recursive hash for each of
    these functions, and (4) then generate the
    subroutine-dependent hash by combining the non-recursive
    hash with the hash of a datastructure representing the
    subroutine "profile" obtained from the call graph.

    We assume that any decorators wrapping the function do not
    modify its computation, and can safely be stripped.

    Note that this function cannot detect changes in effective
    function behavior that result from changes in global
    variables or mutable scope that has been closed over.

    Parameters
    ----------
    force: boolean
        force must be true, otherwise this function will fail
        with a warning.

    Returns
    -------
    :str
        Hash of function
    '''
    if not force:
        raise NotImplementedError(
            'It is not possible to hash a function reliably')
    # Repeatedly expand the list of subroutines
    to_expand = {f}
    expanded  = set()
    while len(to_expand) > 0:
        new_subroutines = set()
        for g in to_expand:
            new_subroutines |= get_subroutines(g)
        expanded  |= to_expand
        to_expand  = new_subroutines - expanded
    # We now have a set; we need to provide some ordering over
    # that set, so sort the hash values and hash that.
    return hash(tuple(sorted(map(
        function_hash_no_subroutines, expanded))))
def combine_caches(cache_root, f):
    '''
    Merge all cache folders for function ``f`` by copying
    cache files into the current cache folder.

    Usually, the existence of multiple cache folders indicates
    that cache files were generated using versions of ``f``
    with different source code. However, you may want to merge
    caches if you are certain that such source changes did not
    alter the function's behavior.

    Parameters
    ----------
    cache_root: str
        path to the top-level cache directory
    f: function
        cached function to merge
    '''
    fs      = function_signature(f)
    copy_to = fs.split('.')[-1]
    parent  = os.path.join(
        cache_root, os.sep.join(fs.split('.')[:2]))
    copy_from = {*os.listdir(parent)} - {copy_to}
    for fr in copy_from:
        for fn in os.listdir(parent+os.sep+fr):
            fto = parent+os.sep+copy_to+os.sep+fn
            ffr = parent+os.sep+fr     +os.sep+fn
            if not os.path.exists(fto):
                shutil.copy2(ffr, fto)
    return copy_to
def exists(cache_root, f, method, *args, **kwargs):
    '''
    Check if a cached result for ``f(*args,**kwargs)`` of type
    ``method`` exists in cache ``cache_root``.

    Parameters
    ----------
    cache_root: str
        directory/path as string
    f: function
        Function being cached
    method: str
        Cache file extension e.g. ``"npy"``, ``"mat"``, etc.
    args: iterable
        function parameters
    kwargs: dict
        function keyword arguments

    Returns
    -------
    :boolean
        True if the cache file exists
    '''
    # Forward keyword arguments as well; omitting them would
    # check the wrong cache entry.
    return os.path.exists(
        locate_cached(
            cache_root, f, method, *args, **kwargs)[-1])
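# --- Illustrative sketch (added for documentation): checking for a cached
# result before deciding whether to trigger an expensive recomputation. The
# helper ``_demo_exists``, the toy function ``_f``, and the cache root are
# hypothetical.
def _demo_exists():
    def _f(x):
        return x * 10
    if not exists('/tmp/__neurotools_cache__', _f, 'npy', 7):
        print('no cached result for _f(7) yet')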