openquake.baselib package#
general#
Utility functions of general interest.
- class openquake.baselib.general.AccumDict(dic=None, accum=None, keys=())[source]#
Bases:
dict
An accumulating dictionary, useful to accumulate variables:
>>> acc = AccumDict()
>>> acc += {'a': 1}
>>> acc += {'a': 1, 'b': 1}
>>> acc
{'a': 2, 'b': 1}
>>> {'a': 1} + acc
{'a': 3, 'b': 1}
>>> acc + 1
{'a': 3, 'b': 2}
>>> 1 - acc
{'a': -1, 'b': 0}
>>> acc - 1
{'a': 1, 'b': 0}
The multiplication has been defined:
>>> prob1 = AccumDict(dict(a=0.4, b=0.5))
>>> prob2 = AccumDict(dict(b=0.5))
>>> prob1 * prob2
{'a': 0.4, 'b': 0.25}
>>> prob1 * 1.2
{'a': 0.48, 'b': 0.6}
>>> 1.2 * prob1
{'a': 0.48, 'b': 0.6}
And even the power:
>>> prob2 ** 2
{'b': 0.25}
It is very common to use an AccumDict of accumulators; here is an example using the empty list as accumulator:
>>> acc = AccumDict(accum=[])
>>> acc['a'] += [1]
>>> acc['b'] += [2]
>>> sorted(acc.items())
[('a', [1]), ('b', [2])]
The implementation is smart enough to make (deep) copies of the accumulator, therefore each key has a different accumulator, which initially is the empty list (in this case).
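The copy-per-key behavior described above can be illustrated with a small sketch in plain Python. The class name `DeepCopyDefaultDict` is hypothetical and this is not the AccumDict implementation; it only shows why deep-copying the accumulator keeps the keys independent.

```python
import copy


class DeepCopyDefaultDict(dict):
    """Hypothetical stand-in: a dict giving each new key its own accumulator."""

    def __init__(self, accum):
        super().__init__()
        self.accum = accum  # template accumulator, never handed out directly

    def __missing__(self, key):
        # Deep-copy the template so that different keys never share state
        value = self[key] = copy.deepcopy(self.accum)
        return value


acc = DeepCopyDefaultDict(accum=[])
acc['a'] += [1]
acc['b'] += [2]
print(sorted(acc.items()))  # [('a', [1]), ('b', [2])]
print(acc.accum)            # [] (the template is untouched)
```

Without the deep copy, every key would append to the same list.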
- class openquake.baselib.general.Cache[source]#
Bases:
dict
- property hit#
- miss = 0#
- property speedup#
- tot = 0#
- class openquake.baselib.general.CallableDict(keyfunc=<function CallableDict.<lambda>>, keymissing=None)[source]#
Bases:
dict
A callable object built on top of a dictionary of functions, used as a smart registry or as a poor man's generic function dispatching on the first argument. It is typically used to implement converters. Here is an example:
>>> format_attrs = CallableDict() # dict of functions (fmt, obj) -> str
>>> @format_attrs.add('csv')  # implementation for csv
... def format_attrs_csv(fmt, obj):
...     items = sorted(vars(obj).items())
...     return '\n'.join('%s,%s' % item for item in items)
>>> @format_attrs.add('json')  # implementation for json
... def format_attrs_json(fmt, obj):
...     return json.dumps(vars(obj))
format_attrs(fmt, obj) calls the correct underlying function depending on the fmt key. If the format is unknown a KeyError is raised. It is also possible to set a keymissing function to specify what to return if the key is missing.
For a more practical example see the implementation of the exporters in openquake.calculators.export
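The dispatch-on-first-argument idea can be sketched without the library. The `Registry` and `Point` classes below are hypothetical illustrations, not the CallableDict implementation; they assume only what the description states: functions are registered under a key and a missing key raises a KeyError.

```python
import json


class Registry(dict):
    """Hypothetical minimal registry dispatching on the first argument."""

    def add(self, *keys):
        def decorator(func):
            for key in keys:
                self[key] = func
            return func
        return decorator

    def __call__(self, key, *args, **kwargs):
        # An unknown key raises KeyError, exactly as a plain dict lookup would
        return self[key](key, *args, **kwargs)


class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y


format_attrs = Registry()


@format_attrs.add('csv')
def format_attrs_csv(fmt, obj):
    items = sorted(vars(obj).items())
    return '\n'.join('%s,%s' % item for item in items)


@format_attrs.add('json')
def format_attrs_json(fmt, obj):
    return json.dumps(vars(obj))


print(format_attrs('csv', Point(1, 2)))   # two lines: x,1 and y,2
print(format_attrs('json', Point(1, 2)))  # {"x": 1, "y": 2}
```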
- class openquake.baselib.general.Deduplicate(objects, check_one=False)[source]#
Bases:
Sequence
Deduplicate lists containing duplicated objects
- exception openquake.baselib.general.DeprecationWarning[source]#
Bases:
UserWarning
Raised the first time a deprecated function is called
- class openquake.baselib.general.DictArray(imtls)[source]#
Bases:
Mapping
A small wrapper over a dictionary of arrays with the same lengths.
- class openquake.baselib.general.Param(**defaults)[source]#
Bases:
object
Container class for a set of parameters with defaults
>>> p = Param(a=1, b=2)
>>> p.a = 3
>>> p.a, p.b
(3, 2)
>>> p.c = 4
Traceback (most recent call last):
  ...
AttributeError: Unknown parameter c
- class openquake.baselib.general.RecordBuilder(**defaults)[source]#
Bases:
object
Builder for numpy records or arrays.
>>> rb = RecordBuilder(a=numpy.int64(0), b=1., c="2")
>>> rb.dtype
dtype([('a', '<i8'), ('b', '<f8'), ('c', 'S1')])
>>> rb()
(0, 1., b'2')
- class openquake.baselib.general.WeightedSequence(seq=())[source]#
Bases:
MutableSequence
A wrapper over a sequence of weighted items with a total weight attribute. Adding items automatically increases the weight.
- classmethod merge(ws_list)[source]#
Merge a set of WeightedSequence objects.
- Parameters:
ws_list – a sequence of openquake.baselib.general.WeightedSequence instances
- Returns:
an openquake.baselib.general.WeightedSequence instance
- openquake.baselib.general.add_columns(a, b, on, cols=None)[source]#
>>> a_dt = [('aid', numpy.int64), ('eid', numpy.int64), ('loss', float)]
>>> b_dt = [('ordinal', numpy.int64), ('custom_site_id', numpy.int64)]
>>> a = numpy.array([(1, 0, 2.4), (2, 0, 2.2),
...                  (1, 1, 2.1), (2, 1, 2.3)], a_dt)
>>> b = numpy.array([(0, 20126), (1, 20127), (2, 20128)], b_dt)
>>> add_columns(a, b, 'aid', ['custom_site_id'])
array([(1, 0, 2.4, 20127), (2, 0, 2.2, 20128), (1, 1, 2.1, 20127),
       (2, 1, 2.3, 20128)],
      dtype=[('aid', '<i8'), ('eid', '<i8'), ('loss', '<f8'), ('custom_site_id', '<i8')])
- openquake.baselib.general.agg_probs(*probs)[source]#
Aggregate probabilities with the usual formula 1 - (1 - P1) … (1 - Pn)
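As a worked illustration of the formula, here is a minimal scalar sketch. The function name `aggregate_probabilities` is hypothetical, and the sketch assumes plain floats. The formula gives the probability that at least one of several independent events occurs.

```python
import math


def aggregate_probabilities(*probs):
    """Return 1 - (1 - P1) * ... * (1 - Pn) for scalar probabilities."""
    return 1.0 - math.prod(1.0 - p for p in probs)


# 1 - 0.9 * 0.8 is approximately 0.28
print(aggregate_probabilities(0.1, 0.2))
```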
- openquake.baselib.general.all_equals(inputs)[source]#
- Parameters:
inputs – a list of arrays or strings
- Returns:
True if all values are equal, False otherwise
- openquake.baselib.general.around(vec, value, delta)[source]#
- Parameters:
vec – a numpy vector or pandas column
value – a float value
delta – a positive float
- Returns:
array of booleans for the range [value-delta, value+delta]
- openquake.baselib.general.assert_close(a, b, rtol=1e-07, atol=0, context=None)[source]#
Compare two composite objects, which may contain floats, for equality up to a given precision. NB: if the objects are or contain generators, they are exhausted.
- Parameters:
a – an object
b – another object
rtol – relative tolerance
atol – absolute tolerance
- openquake.baselib.general.assert_independent(package, *packages)[source]#
- Parameters:
package – Python name of a module/package
packages – Python names of modules/packages
Make sure the package does not depend on the packages.
- openquake.baselib.general.block_splitter(items, max_weight, weight=<function <lambda>>, key=<function nokey>, sort=False)[source]#
- Parameters:
items – an iterator over items
max_weight – the max weight to split on
weight – a function returning the weight of a given item
key – a function returning the kind of a given item
sort – if True, sort the items by reverse weight before splitting
Group together items of the same kind until the total weight exceeds the max_weight and yield WeightedSequence instances. Items with weight zero are ignored.
For instance
>>> items = 'ABCDE'
>>> list(block_splitter(items, 3))
[<WeightedSequence ['A', 'B', 'C'], weight=3>, <WeightedSequence ['D', 'E'], weight=2>]
The default weight is 1 for all items. Here is an example leveraging the key to group together results:
>>> items = ['A1', 'C2', 'D2', 'E2']
>>> list(block_splitter(items, 2, key=operator.itemgetter(1)))
[<WeightedSequence ['A1'], weight=1>, <WeightedSequence ['C2', 'D2'], weight=2>, <WeightedSequence ['E2'], weight=1>]
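The grouping rule can be sketched as a greedy generator. This is an assumption-laden illustration, not the library code: `split_by_weight` is a hypothetical name, it yields plain `(items, total_weight)` tuples instead of WeightedSequence instances, and it ignores the sort option. It closes a block when the key changes or when adding the next item would exceed max_weight.

```python
import operator


def split_by_weight(items, max_weight, weight=lambda item: 1,
                    key=lambda item: None):
    """Yield (block, total_weight) pairs of homogeneous key."""
    block, total, prev_key = [], 0, None
    for item in items:
        w = weight(item)
        if w <= 0:
            continue  # items with zero weight are ignored
        k = key(item)
        # Close the current block on a key change or on weight overflow
        if block and (k != prev_key or total + w > max_weight):
            yield block, total
            block, total = [], 0
        block.append(item)
        total += w
        prev_key = k
    if block:
        yield block, total


print(list(split_by_weight('ABCDE', 3)))
# [(['A', 'B', 'C'], 3), (['D', 'E'], 2)]
print(list(split_by_weight(['A1', 'C2', 'D2', 'E2'], 2,
                           key=operator.itemgetter(1))))
# [(['A1'], 1), (['C2', 'D2'], 2), (['E2'], 1)]
```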
- openquake.baselib.general.cached_property(method)[source]#
- Parameters:
method – a method without arguments except self
- Returns:
a cached property
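One common way to build such a decorator is a non-data descriptor that stores the computed value in the instance dictionary. The sketch below shows that general technique under a hypothetical name (`cached_property_sketch`); the source does not say how the library implements it.

```python
class cached_property_sketch:
    """Hypothetical non-data descriptor caching the result on the instance."""

    def __init__(self, method):
        self.method = method
        self.name = method.__name__

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self
        value = self.method(obj)
        # Stored in the instance __dict__: later lookups bypass the descriptor
        obj.__dict__[self.name] = value
        return value


class Circle:
    calls = 0  # counts how many times the area is actually computed

    def __init__(self, radius):
        self.radius = radius

    @cached_property_sketch
    def area(self):
        Circle.calls += 1
        return 3.14159 * self.radius ** 2


c = Circle(2)
c.area
c.area
print(Circle.calls)  # 1
```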
- openquake.baselib.general.categorize(values, nchars=2)[source]#
Take an array with duplicate values and categorize it, i.e. replace the values with codes of length nchars in BASE183. With nchars=2, 33856 unique values can be encoded; if there are more, nchars must be increased, otherwise a ValueError is raised.
- Parameters:
values – an array of V non-unique values
nchars – number of characters in BASE183 for each code
- Returns:
an array of V non-unique codes
>>> categorize([1,2,2,3,4,1,1,2])  # 8 values, 4 unique ones
array([b'AA', b'AB', b'AB', b'AC', b'AD', b'AA', b'AA', b'AB'], dtype='|S2')
- openquake.baselib.general.check_dependencies()[source]#
Print a warning if we forgot to update the dependencies. Works only for development installations.
- openquake.baselib.general.check_extension(fnames)[source]#
Make sure all file names have the same extension
- openquake.baselib.general.compose_arrays(**kwarrays)[source]#
Compose multiple 1D and 2D arrays into a single composite array. For instance
>>> mag = numpy.array([5.5, 5.6])
>>> mea = numpy.array([[-4.5, -4.6], [-4.45, -4.55]])
>>> compose_arrays(mag=mag, mea=mea)
array([(5.5, -4.5 , -4.6 ), (5.6, -4.45, -4.55)],
      dtype=[('mag', '<f8'), ('mea0', '<f8'), ('mea1', '<f8')])
- openquake.baselib.general.copyobj(obj, **kwargs)[source]#
- Returns:
a shallow copy of obj with some changed attributes
- openquake.baselib.general.countby(array, *kfields)[source]#
- Returns:
a dict kfields -> number of records with that key
- openquake.baselib.general.detach_process()[source]#
Detach the current process from the controlling terminal by using a double fork. Can be used only on platforms with fork (no Windows).
- openquake.baselib.general.dumpa(obj)[source]#
Dump a Python object as an array of uint8:
>>> dumpa(23)
array([128, 5, 75, 23, 46], dtype=uint8)
- openquake.baselib.general.duplicated(items)[source]#
- Returns:
the list of duplicated keys, possibly empty
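A duplicate check of this kind can be sketched with `collections.Counter`. The name `duplicated_keys` is hypothetical, and returning the keys in order of first appearance is an assumption of this sketch, not something the source states.

```python
from collections import Counter


def duplicated_keys(items):
    """Return the items appearing more than once, possibly an empty list."""
    counts = Counter(items)
    return [key for key, n in counts.items() if n > 1]


print(duplicated_keys(['a', 'b', 'a', 'c', 'b']))  # ['a', 'b']
print(duplicated_keys([1, 2, 3]))                  # []
```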
- openquake.baselib.general.engine_version()[source]#
- Returns:
__version__ + <short git hash> if Git repository found
- openquake.baselib.general.fast_agg(indices, values=None, axis=0, factor=None, M=None)[source]#
- Parameters:
indices – N indices in the range 0 … M - 1 with M < N
values – N values (can be arrays)
factor – if given, a multiplicative factor (or weight) for the values
M – maximum index; if None, use max(indices) + 1
- Returns:
M aggregated values (can be arrays)
>>> values = numpy.array([[.1, .11], [.2, .22], [.3, .33], [.4, .44]])
>>> fast_agg([0, 1, 1, 0], values)
array([[0.5 , 0.55], [0.5 , 0.55]])
- openquake.baselib.general.fast_agg2(tags, values=None, axis=0)[source]#
- Parameters:
tags – N non-unique tags out of M
values – N values (can be arrays)
- Returns:
(M unique tags, M aggregated values)
>>> values = numpy.array([[.1, .11], [.2, .22], [.3, .33], [.4, .44]])
>>> fast_agg2(['A', 'B', 'B', 'A'], values)
(array(['A', 'B'], dtype='<U1'), array([[0.5 , 0.55], [0.5 , 0.55]]))
It can also be used to count the number of tags:
>>> fast_agg2(['A', 'B', 'B', 'A', 'A'])
(array(['A', 'B'], dtype='<U1'), array([3., 2.]))
- openquake.baselib.general.fast_agg3(structured_array, kfield, vfields=None, factor=None)[source]#
Aggregate a structured array with a key field (the kfield) and some value fields (the vfields). If vfields is not passed, use all fields except the kfield.
>>> data = numpy.array([(1, 2.4), (1, 1.6), (2, 2.5)],
...                    [('aid', U16), ('val', F32)])
>>> fast_agg3(data, 'aid')
array([(1, 4. ), (2, 2.5)], dtype=[('aid', '<u2'), ('val', '<f4')])
- openquake.baselib.general.gen_slices(start, stop, blocksize)[source]#
Yields slices of length at most blocksize.
>>> list(gen_slices(1, 6, 2))
[slice(1, 3, None), slice(3, 5, None), slice(5, 6, None)]
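The slicing logic can be sketched in a few lines. The generator below is a hypothetical re-creation (named `gen_slices_sketch`) that reproduces the documented example; it is not necessarily how the library does it.

```python
def gen_slices_sketch(start, stop, blocksize):
    """Yield consecutive slices covering [start, stop) in steps of blocksize."""
    for lo in range(start, stop, blocksize):
        # The last slice is truncated at stop
        yield slice(lo, min(lo + blocksize, stop))


print(list(gen_slices_sketch(1, 6, 2)))
# [slice(1, 3, None), slice(3, 5, None), slice(5, 6, None)]
```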
- openquake.baselib.general.gen_subclasses(cls)[source]#
- Returns:
the subclasses of cls, ordered by name
- openquake.baselib.general.get_array(array, **kw)[source]#
Extract a subarray by filtering on the given keyword arguments
- openquake.baselib.general.get_bins(values, nbins, key=None, minval=None, maxval=None)[source]#
- Parameters:
values – an array of N floats (or arrays)
- Returns:
an array of N bin indices plus an array of B bins
- openquake.baselib.general.get_duplicates(array, *fields)[source]#
- Returns:
a dictionary {key: num_dupl} for duplicate records
- openquake.baselib.general.get_nbytes_msg(sizedict, size=8)[source]#
- Parameters:
sizedict – mapping name -> num_dimensions
- Returns:
(size of the array in bytes, descriptive message)
>>> get_nbytes_msg(dict(nsites=2, nbins=5))
(80, '(nsites=2) * (nbins=5) * 8 bytes = 80 B')
- openquake.baselib.general.getsizeof(o, ids=None)[source]#
Find the memory footprint of a Python object recursively, see https://code.tutsplus.com/tutorials/understand-how-much-memory-your-python-objects-use–cms-25609
- Parameters:
o – the object
- Returns:
the size in bytes
- openquake.baselib.general.gettemp(content=None, dir=None, prefix='tmp', suffix='tmp', remove=True)[source]#
Create temporary file with the given content.
Please note: the temporary file can be deleted by the caller or not.
- Parameters:
content (string) – the content to write to the temporary file.
dir (string) – directory where the file should be created
prefix (string) – file name prefix
suffix (string) – file name suffix
remove (bool) – True by default, meaning the file will be automatically removed at the exit of the program
- Returns:
a string with the path to the temporary file
- openquake.baselib.general.group_array(array, *kfields)[source]#
Convert an array into a dict kfields -> array
- openquake.baselib.general.groupby(objects, key, reducegroup=<class 'list'>)[source]#
- Parameters:
objects – a sequence of objects with a key value
key – the key function to extract the key value
reducegroup – the function to apply to each group
- Returns:
a dict {key value: map(reducegroup, group)}
>>> groupby(['A1', 'A2', 'B1', 'B2', 'B3'], lambda x: x[0],
...         lambda group: ''.join(x[1] for x in group))
{'A': '12', 'B': '123'}
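The same grouping can be sketched on top of `itertools.groupby`. The function `groupby_sketch` is a hypothetical illustration that sorts by key first (an assumption of the sketch) and then reduces each group.

```python
import itertools


def groupby_sketch(objects, key, reducegroup=list):
    """Return a dict {key value: reducegroup(group)}."""
    ordered = sorted(objects, key=key)  # itertools.groupby needs sorted input
    return {k: reducegroup(group)
            for k, group in itertools.groupby(ordered, key)}


result = groupby_sketch(['A1', 'A2', 'B1', 'B2', 'B3'], lambda x: x[0],
                        lambda group: ''.join(x[1] for x in group))
print(result)  # {'A': '12', 'B': '123'}
```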
- openquake.baselib.general.groupby2(records, kfield, vfield)[source]#
- Parameters:
records – a sequence of records with positional or named fields
kfield – the index/name/tuple specifying the field to use as a key
vfield – the index/name/tuple specifying the field to use as a value
- Returns:
a list of pairs of the form (key, [value, …]).
>>> groupby2(['A1', 'A2', 'B1', 'B2', 'B3'], 0, 1)
[('A', ['1', '2']), ('B', ['1', '2', '3'])]
Here is an example where the keyfield is a tuple of integers:
>>> groupby2(['A11', 'A12', 'B11', 'B21'], (0, 1), 2)
[(('A', '1'), ['1', '2']), (('B', '1'), ['1']), (('B', '2'), ['1'])]
- openquake.baselib.general.groupby_bin(values, nbins, key=None, minval=None, maxval=None)[source]#
>>> values = numpy.arange(10)
>>> for group in groupby_bin(values, 3):
...     print(group)
[0, 1, 2]
[3, 4, 5]
[6, 7, 8, 9]
- openquake.baselib.general.groupby_grid(xs, ys, deltax, deltay)[source]#
- Parameters:
xs – an array of P abscissas
ys – an array of P ordinates
deltax – grid spacing on the x-axis
deltay – grid spacing on the y-axis
- Returns:
dictionary centroid -> indices (of the points around each centroid)
- openquake.baselib.general.humansize(nbytes, suffixes=('B', 'KB', 'MB', 'GB', 'TB', 'PB'))[source]#
Return file size in a human-friendly format
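A human-friendly size formatter can be sketched as follows. The function name `human_size`, the divisor of 1024 and the output format are assumptions of this sketch; the library's exact rounding and formatting may differ.

```python
def human_size(nbytes, suffixes=('B', 'KB', 'MB', 'GB', 'TB', 'PB')):
    """Format a number of bytes using the largest suitable suffix."""
    value = float(nbytes)
    index = 0
    # Divide by 1024 until the value fits or the suffixes run out
    while value >= 1024 and index < len(suffixes) - 1:
        value /= 1024.0
        index += 1
    # Two decimals at most, dropping trailing zeros and a dangling dot
    text = ('%.2f' % value).rstrip('0').rstrip('.')
    return '%s %s' % (text, suffixes[index])


print(human_size(1023))       # 1023 B
print(human_size(1536))       # 1.5 KB
print(human_size(1024 ** 2))  # 1 MB
```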
- openquake.baselib.general.idxs_by_tag(tags)[source]#
>>> idxs_by_tag([2, 1, 1, 2])
{2: array([0, 3], dtype=uint32), 1: array([1, 2], dtype=uint32)}
- openquake.baselib.general.import_all(module_or_package)[source]#
If module_or_package is a module, just import it; if it is a package, recursively imports all the modules it contains. Returns the names of the modules that were imported as a set. The set can be empty if the modules were already in sys.modules.
- openquake.baselib.general.loada(arr)[source]#
Convert an array of uint8 into a Python object:
>>> loada(numpy.array([128, 5, 75, 23, 46], numpy.uint8))
23
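The leading values 128 and 5 in the documented array are consistent with a pickle stream written with protocol 5. The sketch below assumes, without confirmation from the source, that the round trip is pickle-based; it uses plain lists of integers instead of uint8 arrays, and the names `dump_bytes` and `load_bytes` are hypothetical.

```python
import pickle


def dump_bytes(obj):
    """Serialize obj and return the byte values as a list of integers."""
    return list(pickle.dumps(obj, protocol=5))


def load_bytes(codes):
    """Rebuild the Python object from a list of byte values."""
    return pickle.loads(bytes(codes))


codes = dump_bytes(23)
print(codes[:2])          # [128, 5]: the PROTO opcode and the protocol number
print(load_bytes(codes))  # 23
```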
- openquake.baselib.general.multi_index(shape, axis=None)[source]#
- Parameters:
shape – a shape of length L
axis – None or an integer in the range 0 … L - 1
- Yields:
tuples of indices with a slice(None) at the axis position (if any)
>>> for slc in multi_index((2, 3), 0): print(slc)
(slice(None, None, None), 0, 0)
(slice(None, None, None), 0, 1)
(slice(None, None, None), 0, 2)
(slice(None, None, None), 1, 0)
(slice(None, None, None), 1, 1)
(slice(None, None, None), 1, 2)
- openquake.baselib.general.not_equal(array_or_none1, array_or_none2)[source]#
Compare two arrays that can also be None or have different shapes and return a boolean.
>>> a1 = numpy.array([1])
>>> a2 = numpy.array([2])
>>> a3 = numpy.array([2, 3])
>>> not_equal(a1, a2)
True
>>> not_equal(a1, a3)
True
>>> not_equal(a1, None)
True
- class openquake.baselib.general.pack(dic, attrs=())[source]#
Bases:
dict
Compact a dictionary of lists into a dictionary of arrays. If attrs are given, consider those keys as attributes. For instance,
>>> p = pack(dict(x=[1], a=[0]), ['a'])
>>> p
{'x': array([1])}
>>> p.a
array([0])
- openquake.baselib.general.println(msg)[source]#
Convenience function to print messages on a single line in the terminal
- openquake.baselib.general.random_choice(array, num_samples, offset=0, seed=42)[source]#
Extract num_samples from an array. It has the fundamental property of splittability, i.e. if the seed is the same and || means array concatenation:
choice(a, N) = choice(a, n, 0) || choice(a, N-n, n)
This property makes random_choice suitable for parallelization, while random.choice is not. It is also absurdly fast.
- openquake.baselib.general.random_filter(objects, reduction_factor, seed=42)[source]#
Given a list of objects, return a sublist by randomly extracting some elements. The reduction factor (< 1) tells how small the extracted list is compared to the original list.
- openquake.baselib.general.random_histogram(counts, nbins_or_binweights, seed)[source]#
Distribute a total number of counts over a set of bins. If the weights of the bins are equal you can just pass the number of the bins and a faster algorithm will be used. Otherwise pass the weights. Here are a few examples:
>>> list(random_histogram(1, 2, seed=42))
[0, 1]
>>> list(random_histogram(100, 5, seed=42))
[22, 17, 21, 26, 14]
>>> list(random_histogram(10000, 5, seed=42))
[2034, 2000, 2014, 1998, 1954]
>>> list(random_histogram(1000, [.3, .3, .4], seed=42))
[308, 295, 397]
- openquake.baselib.general.rmsdiff(a, b)[source]#
- Parameters:
a – an array of shape (N, …)
b – an array with the same shape as a
- Returns:
an array of shape (N,) with the root mean squares of a-b
- openquake.baselib.general.run_in_process(code, *args)[source]#
Run in an external process the given Python code and return the output as a Python object. If there are arguments, then code is taken as a template and traditional string interpolation is performed.
- Parameters:
code – string or template describing Python code
args – arguments to be used for interpolation
- Returns:
the output of the process, as a Python object
- openquake.baselib.general.safeprint(*args, **kwargs)[source]#
Convert and print characters using the proper encoding
- openquake.baselib.general.shortlist(lst)[source]#
>>> shortlist([1, 2, 3, 4, 5, 6, 7, 8])
'[1, 2, 3, ..., 6, 7, 8]'
- openquake.baselib.general.smart_concat(arrays)[source]#
Concatenate structured arrays by considering only the common fields
- openquake.baselib.general.socket_ready(hostport)[source]#
- Parameters:
hostport – a pair (host, port) or a string (tcp://)host:port
- Returns:
True if the socket is ready and False otherwise
- openquake.baselib.general.split_in_blocks(sequence, hint, weight=<function <lambda>>, key=<function nokey>)[source]#
Split the sequence in a number of WeightedSequences close to hint.
- Parameters:
sequence – a finite sequence of items
hint – an integer suggesting the number of subsequences to generate
weight – a function returning the weight of a given item
key – a function returning the key of a given item
The WeightedSequences are of homogeneous key and they try to be balanced in weight. For instance
>>> items = 'ABCDE'
>>> list(split_in_blocks(items, 3))
[<WeightedSequence ['A', 'B'], weight=2>, <WeightedSequence ['C', 'D'], weight=2>, <WeightedSequence ['E'], weight=1>]
- openquake.baselib.general.split_in_slices(number, num_slices)[source]#
- Parameters:
number – a positive number to split in slices
num_slices – the number of slices to return (at most)
- Returns:
a list of slices
>>> split_in_slices(4, 2)
[slice(0, 2, None), slice(2, 4, None)]
>>> split_in_slices(5, 1)
[slice(0, 5, None)]
>>> split_in_slices(5, 2)
[slice(0, 3, None), slice(3, 5, None)]
>>> split_in_slices(2, 4)
[slice(0, 1, None), slice(1, 2, None)]
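The four documented results are reproduced by a simple rule: take a block size equal to the ceiling of number / num_slices and cut consecutive slices of that size. The sketch below (hypothetical name `split_in_slices_sketch`) implements that rule; whether the library uses the same rule for all inputs is an assumption.

```python
import math


def split_in_slices_sketch(number, num_slices):
    """Return at most num_slices consecutive slices covering range(number)."""
    blocksize = math.ceil(number / num_slices)
    return [slice(lo, min(lo + blocksize, number))
            for lo in range(0, number, blocksize)]


print(split_in_slices_sketch(5, 2))  # [slice(0, 3, None), slice(3, 5, None)]
print(split_in_slices_sketch(2, 4))  # [slice(0, 1, None), slice(1, 2, None)]
```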
hdf5#
- class openquake.baselib.hdf5.ArrayWrapper(array, attrs, extra=('value',))[source]#
Bases:
object
A pickleable and serializable wrapper over an array, HDF5 dataset or group
- Parameters:
array – an array (or the empty tuple)
attrs – metadata of the array (or dictionary of arrays)
- property dtype#
dtype of the underlying array
- property shape#
shape of the underlying array
- to_dframe(skip_zeros=True)[source]#
Convert an ArrayWrapper with shape (D1, …, DN) and attributes (T1, …, TN), which are lists of tags of lengths (D1, …, DN), into a DataFrame with rows (tag1, …, tagN, value) of maximum length D1 * … * DN. Zero values are discarded.
>>> from pprint import pprint
>>> dic = dict(shape_descr=['taxonomy', 'occupancy'],
...            taxonomy=['RC', 'WOOD'],
...            occupancy=['RES', 'IND', 'COM'])
>>> arr = numpy.zeros((2, 3))
>>> arr[0, 0] = 2000
>>> arr[0, 1] = 5000
>>> arr[1, 0] = 500
>>> aw = ArrayWrapper(arr, dic)
>>> pprint(aw.to_dframe())
  taxonomy occupancy   value
0       RC       RES  2000.0
1       RC       IND  5000.0
2     WOOD       RES   500.0
It is also possible to pass M > 1 extra fields and convert an array of shape (D1, …, DN, M) and attributes (T1, …, TN) into a DataFrame with rows (tag1, …, tagN, value1, …, valueM).
>>> dic = dict(shape_descr=['taxonomy'], taxonomy=['RC', 'WOOD'])
>>> aw = ArrayWrapper(arr, dic, ['RES', 'IND', 'COM'])
>>> pprint(aw.to_dframe())
  taxonomy     RES     IND  COM
0       RC  2000.0  5000.0  0.0
1     WOOD   500.0     0.0  0.0
- class openquake.baselib.hdf5.ByteCounter(nbytes=0)[source]#
Bases:
object
A visitor used to measure the dimensions of a HDF5 dataset or group. Use it as ByteCounter.get_nbytes(dset_or_group).
- class openquake.baselib.hdf5.CSVFile(fname: str, header: list[str], fields: list[str], size: int, skip: int)[source]#
Bases:
object
- fields: list[str]#
- fname: str#
- header: list[str]#
- size: int#
- skip: int#
- class openquake.baselib.hdf5.File(name, mode='r', driver=None, libver='latest', userblock_size=None, rdcc_nslots=None, rdcc_nbytes=None, rdcc_w0=None, track_order=None, **kwds)[source]#
Bases:
File
Subclass of h5py.File able to store and retrieve objects conforming to the HDF5 protocol used by the OpenQuake software. It works recursively also for dictionaries of the form name->obj.
>>> f = File('/tmp/x.h5', 'w')
>>> f['dic'] = dict(a=dict(x=1, y=2), b=3)
>>> dic = f['dic']
>>> dic['a']['x'][()]
1
>>> dic['b'][()]
3
>>> f.close()
- create_df(key, nametypes, compression=None, **kw)[source]#
Create a HDF5 datagroup readable as a pandas DataFrame
- Parameters:
key – name of the dataset
nametypes – list of pairs (name, dtype) or (name, array) or DataFrame
compression – the kind of HDF5 compression to use
kw – extra attributes to store
- read_df(key, index=None, sel=(), slc=slice(None, None, None), slices=())[source]#
- Parameters:
key – name of the structured dataset
index – pandas index (or multi-index), possibly None
sel – dictionary used to select subsets of the dataset
slc – slice object to extract a slice of the dataset
slices – an array of shape (N, 2) with start,stop indices
- Returns:
pandas DataFrame associated to the dataset
- class openquake.baselib.hdf5.Group(items, attrs)[source]#
Bases:
Mapping
A mock for a h5py group object
- openquake.baselib.hdf5.array_of_vstr(lst)[source]#
- Parameters:
lst – a list of strings or bytes
- Returns:
an array of variable length ASCII strings
- openquake.baselib.hdf5.build_dt(dtypedict, names, fname)[source]#
Build a composite dtype for a list of names and dictionary name -> dtype with a None entry corresponding to the default dtype.
- openquake.baselib.hdf5.check_length(field, size)[source]#
- Parameters:
field – a bytes field in the exposure
size – maximum size of the field
- Returns:
a function checking that the value is below the size
- openquake.baselib.hdf5.cls2dotname(cls)[source]#
The full Python name (i.e. pkg.subpkg.mod.cls) of a class
- openquake.baselib.hdf5.create(hdf5, name, dtype, shape=(None,), compression=None, fillvalue=0, attrs=None)[source]#
- Parameters:
hdf5 – a h5py.File object
name – an hdf5 key string
dtype – dtype of the dataset (usually composite)
shape – shape of the dataset (can be extendable)
compression – None or 'gzip' are recommended
attrs – dictionary of attributes of the dataset
- Returns:
a HDF5 dataset
- openquake.baselib.hdf5.dotname2cls(dotname)[source]#
The class associated to the given dotname (i.e. pkg.subpkg.mod.cls)
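The mapping between a class and its dotted name, and back, can be sketched with `importlib`. The helper names `class_to_dotname` and `dotname_to_class` are hypothetical, and the sketch assumes module-level (non-nested) classes.

```python
import importlib
from fractions import Fraction


def class_to_dotname(cls):
    """Return the full Python name of a class, i.e. module.classname."""
    return '%s.%s' % (cls.__module__, cls.__name__)


def dotname_to_class(dotname):
    """Import the module part of the dotname and return the class."""
    modname, clsname = dotname.rsplit('.', 1)
    return getattr(importlib.import_module(modname), clsname)


name = class_to_dotname(Fraction)
print(name)                                # fractions.Fraction
print(dotname_to_class(name) is Fraction)  # True
```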
- openquake.baselib.hdf5.dset2df(dset, indexfield, filterdict)[source]#
Converts an HDF5 dataset with an attribute shape_descr into a Pandas dataframe. NB: this is very slow for large datasets.
- openquake.baselib.hdf5.dumps(dic)[source]#
Dump a dictionary in json. Extend json.dumps to work on numpy objects.
- openquake.baselib.hdf5.extend(dset, array, **attrs)[source]#
Extend an extensible dataset with an array of a compatible dtype.
- Parameters:
dset – an h5py dataset
array – an array of length L
- Returns:
the total length of the dataset (i.e. initial length + L)
- openquake.baselib.hdf5.extract_cols(datagrp, sel, slices, columns)[source]#
- Parameters:
datagrp – something like an HDF5 data group
sel – dictionary column name -> value specifying a selection
slices – list of slices
columns – the full list of column names
- Returns:
a dictionary col -> array of values
- openquake.baselib.hdf5.find_error(fname, errors, dtype)[source]#
Given a CSV file with an error, parse it with the csv.reader and get a better exception including the first line with an error
- openquake.baselib.hdf5.get_nbytes(dset)[source]#
- Parameters:
dset – an HDF5 group or dataset
- Returns:
the size of the underlying array or None if the dataset is actually a group.
- openquake.baselib.hdf5.get_shape_descr(json_string)[source]#
- Parameters:
json_string – JSON string containing the shape_descr
- Returns:
a dictionary field -> values extracted from the shape_descr
- openquake.baselib.hdf5.json_to_obj(js)[source]#
- Parameters:
js – a JSON string of the form {"cls": {"arg1": …}}
- Returns:
an instance cls(arg1, …)
- openquake.baselib.hdf5.obj_to_json(obj)[source]#
- Parameters:
obj – a Python object with a .__dict__
- Returns:
a JSON string
- openquake.baselib.hdf5.parse_comment(comment)[source]#
Parse a comment of the form investigation_time=50.0, imt="PGA", … and return it as a list of (name, value) pairs:
>>> parse_comment('''path=['b1'], time=50.0, imt="PGA"''')
[('path', ['b1']), ('time', 50.0), ('imt', 'PGA')]
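One way to parse such a comment with the standard library is to treat it as the keyword arguments of a call and evaluate each value as a Python literal. The function `parse_comment_sketch` below is a hypothetical illustration of that approach and is not taken from the library source.

```python
import ast


def parse_comment_sketch(comment):
    """Parse 'name=literal, ...' into a list of (name, value) pairs."""
    # Wrap the comment in a fake call so that ast can parse the keywords
    call = ast.parse('dict(%s)' % comment, mode='eval').body
    return [(kw.arg, ast.literal_eval(kw.value)) for kw in call.keywords]


pairs = parse_comment_sketch('''path=['b1'], time=50.0, imt="PGA"''')
print(pairs)  # [('path', ['b1']), ('time', 50.0), ('imt', 'PGA')]
```

Using `ast.literal_eval` keeps the parsing safe, since only literals are evaluated and no arbitrary code is executed.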
- openquake.baselib.hdf5.read_csv(fname, dtypedict={None: <class 'float'>}, renamedict={}, sep=', ', index=None, errors=None, usecols=None)[source]#
- Parameters:
fname – a CSV file with a header and float fields
dtypedict – a dictionary fieldname -> dtype, None -> default
renamedict – aliases for the fields to rename
sep – separator (default comma)
index – if not None, returns a pandas DataFrame
errors – passed to the underlying open function (default None)
usecols – columns to read
- Returns:
an ArrayWrapper, unless there is an index
- openquake.baselib.hdf5.sanitize(value)[source]#
Sanitize the value so that it can be stored as an HDF5 attribute
- openquake.baselib.hdf5.save_npz(obj, path)[source]#
- Parameters:
obj – object to serialize
path – an .npz pathname
- openquake.baselib.hdf5.sel(dset, filterdict)[source]#
Select a dataset with shape_descr. For instance dstore.sel('hcurves', imt='PGA', sid=2)
node#
This module defines a Node class, together with a few conversion functions which are able to convert NRML files into hierarchical objects (DOM). That makes it easier to read and write XML from Python and vice versa. Such features are used in the command-line conversion tools. The Node class is kept intentionally similar to an Element class; however, it overcomes a limitation of ElementTree: a node can manage a lazy iterable of subnodes, whereas ElementTree wants to keep everything in memory. Moreover, the Node class provides a convenient dot notation to access subnodes.
The Node class is instantiated with four arguments:
the node tag (a mandatory string)
the node attributes (a dictionary)
the node value (a string or None)
the subnodes (an iterable over nodes)
If a node has subnodes, its value should be None.
For instance, here is an example of instantiating a root node with two subnodes a and b:
>>> from openquake.baselib.node import Node
>>> a = Node('a', {}, 'A1')
>>> b = Node('b', {'attrb': 'B'}, 'B1')
>>> root = Node('root', nodes=[a, b])
>>> root
<root {} None ...>
Node objects can be converted into nicely indented strings:
>>> print(root.to_str())
root
  a 'A1'
  b{attrb='B'} 'B1'
The subnodes can be retrieved with the dot notation:
>>> root.a
<a {} A1 >
The value of a node can be extracted with the ~ operator:
>>> ~root.a
'A1'
If there are multiple subnodes with the same name
>>> root.append(Node('a', {}, 'A2')) # add another 'a' node
the dot notation will retrieve the first node.
It is possible to retrieve the other nodes from the ordinal index:
>>> root[0], root[1], root[2]
(<a {} A1 >, <b {'attrb': 'B'} B1 >, <a {} A2 >)
The list of all subnodes with a given name can be retrieved as follows:
>>> list(root.getnodes('a'))
[<a {} A1 >, <a {} A2 >]
It is also possible to delete a node given its index:
>>> del root[2]
A node is an iterable object yielding its subnodes:
>>> list(root)
[<a {} A1 >, <b {'attrb': 'B'} B1 >]
The attributes of a node can be retrieved with the square bracket notation:
>>> root.b['attrb']
'B'
It is possible to add and remove attributes freely:
>>> root.b['attr'] = 'new attr'
>>> del root.b['attr']
Node objects can be easily converted into ElementTree objects:
>>> node_to_elem(root)
<Element 'root' at ...>
Then it is trivial to generate the XML representation of a node:
>>> from xml.etree import ElementTree
>>> print(ElementTree.tostring(node_to_elem(root)).decode('utf-8'))
<root><a>A1</a><b attrb="B">B1</b></root>
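For comparison, the same XML string can be produced with the standard library alone. This sketch builds the ElementTree elements by hand instead of converting a Node, so it uses only `xml.etree.ElementTree` calls.

```python
from xml.etree import ElementTree

# Build the same tree directly with ElementTree elements
root = ElementTree.Element('root')
a = ElementTree.SubElement(root, 'a')
a.text = 'A1'
b = ElementTree.SubElement(root, 'b', {'attrb': 'B'})
b.text = 'B1'

xml = ElementTree.tostring(root).decode('utf-8')
print(xml)  # <root><a>A1</a><b attrb="B">B1</b></root>
```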
Generating XML files larger than the available memory requires some care. The trick is to use a node generator, such that it is not necessary to keep the entire tree in memory. Here is an example:
>>> def gen_many_nodes(N):
...     for i in range(N):
...         yield Node('a', {}, 'Text for node %d' % i)
>>> lazytree = Node('lazytree', {}, nodes=gen_many_nodes(10))
The lazytree object defined here consumes no memory, because the nodes are not created at instantiation time. They are created as soon as you start iterating on the lazytree. In particular list(lazytree) will generate all of them. If your goal is to store the tree on the filesystem in XML format, you should use a writing routine converting one subnode at a time, without requiring the full list of them. The routines provided by ElementTree are no good; however, baselib.writers provides a StreamingXMLWriter just for that purpose.
Lazy trees should not be used unless it is absolutely necessary in order to save memory. The problem is that if you use a lazy tree the slice notation will not work (the underlying generator will not accept it); moreover, it will not be possible to iterate twice on the subnodes, since the generator will be exhausted. Notice that even accessing a subnode with the dot notation will advance the generator. Finally, nodes containing lazy nodes will not be pickleable.
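These limitations come from plain Python generators, and can be seen without the Node class. The sketch below uses a hypothetical generator of strings (`gen_items`) in place of a generator of nodes.

```python
def gen_items(n):
    """Hypothetical stand-in for a generator of subnodes."""
    for i in range(n):
        yield 'Text for node %d' % i


lazy = gen_items(3)
first_pass = list(lazy)   # consumes the generator
second_pass = list(lazy)  # nothing left: the generator is exhausted
print(len(first_pass), len(second_pass))  # 3 0

try:
    gen_items(3)[0:2]     # generators do not support slicing
except TypeError as exc:
    print('slicing failed:', exc)
```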
- class openquake.baselib.node.Node(fulltag, attrib=None, text=None, nodes=None, lineno=None)[source]#
Bases:
object
A class to make it easy to edit hierarchical structures with attributes, such as XML files. Node objects must be pickleable and must consume as little memory as possible. Moreover they must be easily converted from and to ElementTree objects. The advantage over ElementTree objects is that subnodes can be lazily generated and that they can be accessed with the dot notation.
- attrib#
- lineno#
- nodes#
- tag#
- text#
- to_str(expandattrs=True, expandvals=True, striptags=True, shortentags=False)[source]#
Convert the node into a string, intended for testing/debugging purposes
- Parameters:
expandattrs – print the values of the attributes if True, else print only the names
expandvals – print the values if True, else print only the tag names
striptags (bool) – do not display fully qualified tag names
shortentags (bool) – display a shorter representation of the namespace (overriding the striptags parameter)
- class openquake.baselib.node.SourceLineParser[source]#
Bases:
XMLParser
A custom parser managing line numbers: works for Python <= 3.3
- class openquake.baselib.node.StreamingXMLWriter(bytestream, indent=4, encoding='utf-8', nsmap=None)[source]#
Bases:
object
A binary stream XML writer. The typical usage is something like this:
with StreamingXMLWriter(output_file) as writer:
    writer.start_tag('root')
    for node in nodegenerator():
        writer.serialize(node)
    writer.end_tag('root')
- class openquake.baselib.node.ValidatingXmlParser(validators, stop=None)[source]#
Bases:
object
Validating XML Parser based on Expat. It has two methods .parse_file and .parse_bytes returning a validated Node object.
- Parameters:
validators – a dictionary of validation functions
stop – the tag where to stop the parsing (if any)
- exception Exit[source]#
Bases:
Exception
Raised when the parsing is stopped before the end on purpose
- openquake.baselib.node.context(fname, node)[source]#
Context manager managing exceptions and adding line number of the current node and name of the current file to the error message.
- Parameters:
fname – the current file being processed
node – the current node being processed
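As an illustration of the idea, here is a minimal sketch of a context manager that enriches error messages with a file name and a line number. The names `FakeNode` and `error_context` and the message layout are assumptions made for this example; the actual message format used by `context` is not documented here.

```python
from contextlib import contextmanager


class FakeNode:
    """Hypothetical node carrying only a tag and a line number."""

    def __init__(self, tag, lineno):
        self.tag = tag
        self.lineno = lineno


@contextmanager
def error_context(fname, node):
    # Re-raise any exception with file name and line number appended
    try:
        yield node
    except Exception as exc:
        msg = '%s: %s, line %s of %s' % (node.tag, exc, node.lineno, fname)
        raise type(exc)(msg) from exc


try:
    with error_context('model.xml', FakeNode('magnitude', 42)):
        float('not-a-number')  # raises ValueError
except ValueError as exc:
    message = str(exc)
```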
- openquake.baselib.node.floatformat(fmt_string)[source]#
Context manager to change the default format string for the function openquake.baselib.writers.write_csv().
- Parameters:
fmt_string – the format to use; for instance ‘%13.9E’
- openquake.baselib.node.iterparse(source, events=('end',), remove_comments=True, **kw)[source]#
Thin wrapper around ElementTree.iterparse
- openquake.baselib.node.node_copy(node, nodefactory=<class 'openquake.baselib.node.Node'>)[source]#
Make a deep copy of the node
- openquake.baselib.node.node_display(root, expandattrs=False, expandvals=False, output=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, striptags=True, shortentags=False, nsmap=None)[source]#
Write an indented representation of the Node object on the output; this is intended for testing/debugging purposes.
- Parameters:
root – a Node object
expandattrs (bool) – if True, the values of the attributes are also printed, not only the names
expandvals (bool) – if True, the values of the tags are also printed, not only the names.
output – stream where to write the string representation of the node
striptags (bool) – do not display fully qualified tag names
shortentags (bool) – display a shorter representation of the namespace (overriding the striptags parameter)
nsmap (dict) – map of namespaces (keys are full names, values are the corresponding aliases)
- openquake.baselib.node.node_from_dict(dic, nodefactory=<class 'openquake.baselib.node.Node'>)[source]#
Convert a (nested) dictionary into a Node object.
- openquake.baselib.node.node_from_elem(elem, nodefactory=<class 'openquake.baselib.node.Node'>, lazy=())[source]#
Convert (recursively) an ElementTree object into a Node object.
- openquake.baselib.node.node_from_ini(ini_file, nodefactory=<class 'openquake.baselib.node.Node'>, root_name='ini')[source]#
Convert a .ini file into a Node object.
- Parameters:
ini_file – a filename or a file like object in read mode
- openquake.baselib.node.node_from_xml(xmlfile, nodefactory=<class 'openquake.baselib.node.Node'>)[source]#
Convert a .xml file into a Node object.
- Parameters:
xmlfile – a file name or file object open for reading
- openquake.baselib.node.node_to_dict(node)[source]#
Convert a Node object into a (nested) dictionary with attributes tag, attrib, text, nodes.
- Parameters:
node – a Node-compatible object
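The shape of such a nested dictionary can be sketched with a plain ElementTree element. The `elem_to_dict` helper is hypothetical, and omitting empty keys is an assumption of this sketch rather than documented behaviour of `node_to_dict`.

```python
import xml.etree.ElementTree as ET


def elem_to_dict(elem):
    # Recursively convert an Element into a nested dictionary
    # with keys tag, attrib, text, nodes (empty ones are omitted)
    dic = {'tag': elem.tag}
    if elem.attrib:
        dic['attrib'] = dict(elem.attrib)
    if elem.text and elem.text.strip():
        dic['text'] = elem.text.strip()
    subnodes = [elem_to_dict(sub) for sub in elem]
    if subnodes:
        dic['nodes'] = subnodes
    return dic


root = ET.fromstring('<source id="1"><name>fault</name><mag>6.5</mag></source>')
dic = elem_to_dict(root)
```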
- openquake.baselib.node.node_to_elem(root)[source]#
Convert (recursively) a Node object into an ElementTree object.
- openquake.baselib.node.node_to_ini(node, output=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]#
Convert a Node object with the right structure into a .ini file.
- Params node:
a Node object
- Params output:
a file-like object opened in write mode
- openquake.baselib.node.node_to_xml(node, output, nsmap=None)[source]#
Convert a Node object into a pretty .xml file without keeping everything in memory. If you just want the string representation use tostring(node).
- Parameters:
node – a Node-compatible object (ElementTree nodes are fine)
output – a binary output file
nsmap – if given, shorten the tags with aliases
- openquake.baselib.node.parse(source, remove_comments=True, **kw)[source]#
Thin wrapper around ElementTree.parse
- openquake.baselib.node.pprint(self, stream=None, indent=1, width=80, depth=None)[source]#
Pretty print the underlying literal Python object
- openquake.baselib.node.read_nodes(fname, filter_elem, nodefactory=<class 'openquake.baselib.node.Node'>, remove_comments=True)[source]#
Convert an XML file into a lazy iterator over Node objects satisfying the given specification, i.e. a function element -> boolean.
- Parameters:
fname – file name or file object
filter_elem – element specification
In case of errors, add the file name to the error message.
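The lazy filtering idea can be sketched with the standard library alone. The `iter_matching` helper below is hypothetical: it yields raw ElementTree elements rather than Node objects and does not add the file name to error messages.

```python
import io
import xml.etree.ElementTree as ET


def iter_matching(source, filter_elem):
    # Yield elements satisfying filter_elem as soon as they are complete
    for _event, elem in ET.iterparse(source, events=('end',)):
        if filter_elem(elem):
            yield elem


xml = b'<root><src id="a"/><note/><src id="b"/></root>'
matches = iter_matching(io.BytesIO(xml), lambda el: el.tag == 'src')
ids = [el.get('id') for el in matches]
```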
- openquake.baselib.node.scientificformat(value, fmt='%13.9E', sep=' ', sep2=':')[source]#
- Parameters:
value – the value to convert into a string
fmt – the formatting string to use for float values
sep – separator to use for vector-like values
sep2 – second separator to use for matrix-like values
Convert a float or an array into a string by using the scientific notation and a fixed precision (by default 10 decimal digits). For instance:
>>> scientificformat(-0E0)
'0.000000000E+00'
>>> scientificformat(-0.004)
'-4.000000000E-03'
>>> scientificformat([0.004])
'4.000000000E-03'
>>> scientificformat([0.01, 0.02], '%10.6E')
'1.000000E-02 2.000000E-02'
>>> scientificformat([[0.1, 0.2], [0.3, 0.4]], '%4.1E')
'1.0E-01:2.0E-01 3.0E-01:4.0E-01'
- openquake.baselib.node.striptag(tag)[source]#
Get the short representation of a fully qualified tag
- Parameters:
tag (str) – a (fully qualified or not) XML tag
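ElementTree writes fully qualified tags as `{namespace}name`, so getting the short form amounts to dropping the braced prefix. A minimal sketch follows; `strip_namespace` is a hypothetical helper and the namespace URL is a placeholder.

```python
def strip_namespace(tag):
    # '{namespace}name' -> 'name'; unqualified tags are returned unchanged
    if tag.startswith('{'):
        return tag.rsplit('}', 1)[1]
    return tag


short = strip_namespace('{http://example.org/ns}sourceModel')
plain = strip_namespace('sourceModel')
```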
- openquake.baselib.node.tostring(node, indent=4, nsmap=None)[source]#
Convert a node into an XML string by using the StreamingXMLWriter. This is useful for testing purposes.
- Parameters:
node – a node object (typically an ElementTree object)
indent – the indentation to use in the XML (default 4 spaces)
parallel#
The Starmap API#
There are several good libraries to manage parallel programming in Python, both
in the standard library and in third party packages. Since we are not
interested in reinventing the wheel, OpenQuake does not provide any new
parallel library; however, it does offer some glue code so that you
can use your library of choice. Currently threading, multiprocessing,
and zmq are supported. Moreover,
openquake.baselib.parallel
offers some additional facilities
that make it easier to parallelize scientific computations,
i.e. embarrassingly parallel problems.
Typically one wants to apply a callable to a list of arguments in parallel, and then combine together the results. This is known as a MapReduce problem. As a simple example, we will consider the problem of counting the letters in a text, by using the following count function:
def count(word):
    return collections.Counter(word)
The collections.Counter class works sequentially; the same problem can be
solved in parallel by using openquake.baselib.parallel.Starmap:
>>> arglist = [('hello',), ('world',)] # list of arguments
>>> smap = Starmap(count, arglist) # Starmap instance, nothing started yet
>>> sorted(smap.reduce().items()) # build the counts per letter
[('d', 1), ('e', 1), ('h', 1), ('l', 3), ('o', 2), ('r', 1), ('w', 1)]
A Starmap object is an iterable: iterating over it produces the task results. It also has a reduce method similar to functools.reduce with sensible defaults:
the default aggregation function is add, so there is no need to specify it
the default accumulator is an empty accumulation dictionary (see openquake.baselib.general.AccumDict) working as a Counter, so there is no need to specify it.
You can of course override the defaults, so if you really want to return a Counter you can do
>>> res = Starmap(count, arglist).reduce(acc=collections.Counter())
In the engine we use nearly always callables that return dictionaries and we aggregate nearly always with the addition operator, so such defaults are very convenient. You are encouraged to do the same, since we found that approach to be very flexible. Typically in a scientific application you will return a dictionary of numpy arrays.
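For readers who want to see the map and reduce steps spelled out, here is a standard-library sketch of the same letter-counting problem. It uses a thread pool only to stay self-contained; it is not how Starmap is implemented.

```python
import collections
import functools
import operator
from concurrent.futures import ThreadPoolExecutor


def count(word):
    return collections.Counter(word)


arglist = [('hello',), ('world',)]  # one tuple of arguments per task

with ThreadPoolExecutor(max_workers=2) as pool:
    # map step: submit one task per tuple of arguments
    futures = [pool.submit(count, *args) for args in arglist]
    partial = [fut.result() for fut in futures]

# reduce step: aggregate with +, starting from an empty accumulator
total = functools.reduce(operator.add, partial, collections.Counter())
```

The reduce step works with any result type supporting addition, which is why returning dictionaries that can be added is convenient.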
The parallelization algorithm used by Starmap will depend on the environment variable OQ_DISTRIBUTE. Here are the possibilities available at the moment:
- OQ_DISTRIBUTE not set or set to “processpool”:
use multiprocessing
- OQ_DISTRIBUTE set to “no”:
disable the parallelization, useful for debugging
- OQ_DISTRIBUTE set to “zmq”:
use the zmq concurrency mechanism (experimental)
There is also an OQ_DISTRIBUTE = “threadpool”; however the performance of using threads instead of processes is normally bad for the kind of applications we are interested in (CPU-dominated, with large tasks such that the time to spawn a new process is negligible with respect to the time to perform the task), so it is not recommended.
If you are using a pool, it is always a good idea to clean up resources at the end with
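A minimal sketch of how a program could read this setting is shown below. The `read_distribute` helper is hypothetical, and the set of accepted values is limited to those named in the text; the engine itself may accept other values and may also fall back to its configuration file.

```python
import os

# Values named in the text above; an assumption, not an exhaustive list
KNOWN = ('processpool', 'no', 'zmq', 'threadpool')


def read_distribute(environ=os.environ):
    # An unset variable means "processpool", as stated above
    value = environ.get('OQ_DISTRIBUTE', 'processpool')
    if value not in KNOWN:
        raise ValueError('Unknown OQ_DISTRIBUTE=%r' % value)
    return value


default = read_distribute({})
debug = read_distribute({'OQ_DISTRIBUTE': 'no'})
```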
>>> Starmap.shutdown()
Starmap.shutdown is always defined. It does nothing if there is no pool, but it is still better to call it: in the future, you may change your mind and use another parallelization strategy requiring cleanup. In this way your code is future-proof.
Monitoring#
A major feature of the Starmap API is the ability to monitor the time spent
in each task and the memory allocated. Such information is written into an
HDF5 file that can be provided by the user or autogenerated. To autogenerate
the file you can use openquake.commonlib.datastore.hdf5new()
which
will create a file named calc_XXX.hdf5
in your $OQ_DATA directory
(if the environment variable is not set, the engine will use $HOME/oqdata).
Here is an example of usage:
>>> from openquake.commonlib.datastore import hdf5new
>>> h5 = hdf5new()
>>> smap = Starmap(count, [['hello'], ['world']], h5=h5)
>>> print(sorted(smap.reduce().items()))
[('d', 1), ('e', 1), ('h', 1), ('l', 3), ('o', 2), ('r', 1), ('w', 1)]
After the calculation, or even while the calculation is running, you can open the calculation file for reading and extract the performance information from it. The engine provides a command to do that, oq show performance, but you can also get it manually, with a call to openquake.baselib.performance.performance_view(h5) which will return the performance information as a numpy array:
>>> from openquake.baselib.performance import performance_view
>>> performance_view(h5).dtype.names
('operation', 'time_sec', 'memory_mb', 'counts')
>>> h5.close()
The four columns are as follows:
- operation:
the name of the function running in parallel (in this case ‘count’)
- time_sec:
the cumulative time in seconds spent running the function
- memory_mb:
the maximum allocated memory per core
- counts:
the number of times the function was called (in this case 2)
The Starmap.apply API#
The Starmap class has a very convenient classmethod Starmap.apply which is used in several places in the engine. Starmap.apply is useful when you have a sequence of objects that you want to split in homogeneous chunks and then apply a callable to each chunk (in parallel). For instance, in the letter counting example discussed before, Starmap.apply could be used as follows:
>>> text = 'helloworld' # sequence of characters
>>> res3 = Starmap.apply(count, (text,)).reduce()
>>> assert res3 == res
The API of Starmap.apply is designed to extend the one of apply,
a builtin of Python 2; the second argument is the tuple of arguments
passed to the first argument. The difference with apply is that
Starmap.apply returns a Starmap
object so that nothing is
actually done until you iterate on it (reduce is doing that).
How many chunks will be produced? That depends on the parameter concurrent_tasks; if it is not passed, it has a default of 5 times the number of cores in your machine - as returned by os.cpu_count() - and Starmap.apply will try to produce a number of chunks close to that number. The nice thing is that it is also possible to pass a weight function. Suppose for instance that instead of a list of letters you have a list of seismic sources: some sources require a long computation time (such as ComplexFaultSources), some require a short computation time (such as PointSources). By giving a heuristic weight to the different sources it is possible to produce chunks with nearly homogeneous weight; in particular PointSource tasks will contain a lot more sources than tasks with ComplexFaultSources.
It is essential in large computations to have a homogeneous task distribution, otherwise you will end up having a big task dominating the computation time (e.g. you may have 1000 cores of which 999 are free, having finished all the short tasks, but you have to wait for days for the single core processing the slow task). The OpenQuake engine does a great deal of work trying to split slow sources into more manageable fast sources.
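The effect of a weight function can be illustrated with a simple greedy splitter. This is only a sketch: `split_by_weight` is a hypothetical helper, not the algorithm used by the engine, and the sources are represented as made-up (name, weight) pairs.

```python
def split_by_weight(items, weight, maxweight):
    # Greedy split: close the current block when adding an item would
    # exceed maxweight (a single heavy item still gets its own block)
    block, total = [], 0
    for item in items:
        w = weight(item)
        if block and total + w > maxweight:
            yield block
            block, total = [], 0
        block.append(item)
        total += w
    if block:
        yield block


# Hypothetical sources: ten light "point" sources and one heavy "complex" one
sources = [('point%d' % i, 1) for i in range(10)] + [('complex', 5)]
blocks = list(split_by_weight(sources, weight=lambda s: s[1], maxweight=5))
sizes = [len(block) for block in blocks]
weights = [sum(w for _name, w in block) for block in blocks]
```

The blocks contain very different numbers of sources but carry the same total weight, which is the property that matters for an even task distribution.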
- class openquake.baselib.parallel.IterResult(iresults, taskname, argnames, sent, h5)[source]#
Bases:
object
- Parameters:
iresults – an iterator over Result objects
taskname – the name of the task
done_total – a function returning the number of done tasks and the total
sent – a nested dictionary name -> {argname: number of bytes sent}
progress – a logging function for the progress report
hdf5path – a path where to store persistently the performance info
- class openquake.baselib.parallel.Pickled(obj)[source]#
Bases:
object
A utility to manually pickle/unpickle objects. Pickled instances have a nice string representation and length giving the size of the pickled bytestring.
- Parameters:
obj – the object to pickle
- compressed = False#
- class openquake.baselib.parallel.Result(val, mon, tb_str='', msg='')[source]#
Bases:
object
- Parameters:
val – value to return or exception instance
mon – Monitor instance
tb_str – traceback string (empty if there was no exception)
msg – message string (default empty)
- func = None#
Bases:
object
Wrapper over a SharedMemory array to be used as a context manager.
- class openquake.baselib.parallel.Starmap(task_func, task_args=(), distribute=None, progress=<function info>, h5=None)[source]#
Bases:
object
- CT = 8#
- classmethod apply(task, allargs, concurrent_tasks=None, maxweight=None, weight=<function Starmap.<lambda>>, key=<function Starmap.<lambda>>, distribute=None, progress=<function info>, h5=None)[source]#
Apply a task to a tuple of the form (sequence, *other_args) by first splitting the sequence in chunks, according to the weight of the elements and possibly to a key (see openquake.baselib.general.split_in_blocks()).
- Parameters:
task – a task to run in parallel
args – the arguments to be passed to the task function
concurrent_tasks – hint about how many tasks to generate
maxweight – if not None, used to split the tasks
weight – function to extract the weight of an item in arg0
key – function to extract the kind of an item in arg0
distribute – if not given, inferred from OQ_DISTRIBUTE
progress – logging function to use (default logging.info)
h5 – an open hdf5.File where to store the performance info
- Returns:
an
IterResult
object
- apply_split(task, allargs, concurrent_tasks=None, maxweight=None, weight=<function Starmap.<lambda>>, key=<function Starmap.<lambda>>, distribute=None, progress=<function info>, h5=None, duration=300, outs_per_task=5)[source]#
Same as Starmap.apply, but possibly produces subtasks
- expected_outputs = 0#
- get_results()[source]#
- Returns:
an
IterResult
instance
- init_slurm()[source]#
Initialize the list host_cores by reading the file with the hostcores generated by the worker nodes
- maxtasksperchild = None#
- num_cores = 4#
- pids = ()#
- running_tasks = []#
Apply SharedArray.new to a dictionary of arrays
- openquake.baselib.parallel.check_mem_usage(soft_percent=None, hard_percent=None)[source]#
Display a warning if we are running out of memory
- openquake.baselib.parallel.get_pickled_sizes(obj)[source]#
Return the pickled sizes of an object and its direct attributes, ordered by decreasing size. Here is an example:
>> total_size, partial_sizes = get_pickled_sizes(Monitor(''))
>> total_size
345
>> partial_sizes
[('_procs', 214), ('exc', 4), ('mem', 4), ('start_time', 4), ('_start_time', 4), ('duration', 4)]
Notice that the sizes depend on the operating system and the machine.
- openquake.baselib.parallel.multispawn(func, allargs, nprocs=4, logfinish=True)[source]#
Spawn processes with the given arguments
- openquake.baselib.parallel.oq_distribute(task=None)[source]#
- Returns:
the value of OQ_DISTRIBUTE or config.distribution.oq_distribute
- openquake.baselib.parallel.pickle_sequence(objects)[source]#
Convert an iterable of objects into a list of pickled objects. If the iterable contains copies, the pickling will be done only once. If the iterable contains objects already pickled, they will not be pickled again.
- Parameters:
objects – a sequence of objects to pickle
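The "pickle only once" behaviour can be sketched with an identity-based cache. In this illustration `pickle_once` is a hypothetical helper and plain `bytes` objects stand in for already pickled values; the library uses its own Pickled wrapper instead.

```python
import pickle


def pickle_once(objects):
    objects = list(objects)  # keep the objects alive so that ids stay unique
    cache = {}  # id(obj) -> pickled bytes
    out = []
    for obj in objects:
        if isinstance(obj, bytes):  # treated here as "already pickled"
            out.append(obj)
            continue
        key = id(obj)
        if key not in cache:
            cache[key] = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
        out.append(cache[key])
    return out


big = list(range(1000))
pickled = pickle_once([big, big, {'a': 1}])
```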
- openquake.baselib.parallel.safely_call(func, args, task_no=0, mon=<Monitor [runner]>)[source]#
Call the given function with the given arguments safely, i.e. by trapping the exceptions. Return a pair (result, exc_type) where exc_type is None if no exceptions occur, otherwise it is the exception class and the result is a string containing error message and traceback.
- Parameters:
func – the function to call
args – the arguments
task_no – the task number
mon – a monitor
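The trapping pattern described above can be sketched as follows. `call_safely` is a hypothetical, simplified helper: it ignores the task number and the monitor, and returns a plain tuple.

```python
import traceback


def call_safely(func, args):
    # Return (result, None) on success, (error text, exception class) on failure
    try:
        return func(*args), None
    except Exception as exc:
        text = '%s\n%s' % (exc, traceback.format_exc())
        return text, type(exc)


ok, ok_type = call_safely(divmod, (7, 2))
err, err_type = call_safely(divmod, (7, 0))  # division by zero is trapped
```

Returning the traceback as a string keeps the result easy to send between processes, since traceback objects themselves cannot be pickled.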
- openquake.baselib.parallel.scratch_dir(job_id)[source]#
- Returns:
scratch directory associated to the given job_id
- openquake.baselib.parallel.sendback(res, zsocket)[source]#
Send back to the master node the result by using the zsocket.
- Returns:
the accumulated number of bytes sent
- openquake.baselib.parallel.sequential_apply(task, args, concurrent_tasks=8, maxweight=None, weight=<function <lambda>>, key=<function <lambda>>, progress=<function info>)[source]#
Apply sequentially task to args by splitting args[0] in blocks
- openquake.baselib.parallel.split_task(elements, func, args, duration, outs_per_task, monitor)[source]#
- Parameters:
func – a task function with a monitor as last argument
args – arguments of the task function, with args[0] being a sequence
duration – split the task if it exceeds the duration
outs_per_task – number of splits to try (ex. 5)
- Yields:
a partial result, 0 or more task objects
performance#
- class openquake.baselib.performance.Monitor(operation='', measuremem=False, inner_loop=False, h5=None, version=None, dbserver_host='127.0.0.1')[source]#
Bases:
object
Measure the resident memory occupied by a list of processes during the execution of a block of code. Should be used as a context manager, as follows:
with Monitor('do_something') as mon:
    do_something()
print(mon.mem)
At the end of the block the Monitor object will have the following public attributes:
.start_time: when the monitor started (a datetime object)
.duration: time elapsed between start and stop (in seconds)
.exc: usually None; otherwise the exception raised in the with block
.mem: the memory delta in bytes
The behaviour of the Monitor can be customized by subclassing it and by overriding the method on_exit(), called at end and used to display or store the results of the analysis.
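The subclassing pattern can be illustrated with a stripped-down stand-in. The `Timer` and `Collecting` classes below are hypothetical: they measure time only, not memory, and are not the openquake Monitor.

```python
import time
from datetime import datetime


class Timer:
    """Hypothetical, stripped-down monitor: measures time only."""

    def __init__(self, operation):
        self.operation = operation
        self.duration = 0.0
        self.exc = None

    def __enter__(self):
        self.start_time = datetime.now()
        self._t0 = time.perf_counter()
        return self

    def __exit__(self, etype, exc, tb):
        self.duration += time.perf_counter() - self._t0
        self.exc = exc
        self.on_exit()
        return False  # do not swallow exceptions

    def on_exit(self):
        """Hook for subclasses, called at the end of the block."""


class Collecting(Timer):
    log = []  # shared list, good enough for a sketch

    def on_exit(self):
        # Store the results instead of displaying them
        self.log.append((self.operation, self.duration))


with Collecting('do_something') as mon:
    sum(range(1000))
```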
NB: if the .address attribute is set, it is possible for the monitor to send commands to that address, assuming there is a multiprocessing.connection.Listener listening.
- address = None#
- authkey = None#
- calc_id = None#
- property dt#
Last time interval measured
- get_data()[source]#
- Returns:
an array of dtype perf_dt, with the information of the monitor (operation, time_sec, memory_mb, counts); the length of the array can be 0 (for counts=0) or 1 (otherwise).
- inject = None#
- iter(genobj, atstop=<function Monitor.<lambda>>)[source]#
- Parameters:
genobj – a generator object
atstop – optional thunk to call at StopIteration
- Yields:
the elements of the generator object
- property mem#
Mean memory allocation
- new(operation='no operation', **kw)[source]#
Return a copy of the monitor usable for a different operation.
- read(key, slc=slice(None, None, None))[source]#
- Parameters:
key – key in the _tmp.hdf5 file
slc – slice to read (default all)
- Returns:
unpickled object
- save(key, obj)[source]#
- Parameters:
key – key in the _tmp.hdf5 file
obj – big object to store in pickle format
- Returns:
True if saved, False if not because the key was taken
- save_task_info(h5, res, name, mem_gb=0)[source]#
Called by parallel.IterResult.
- Parameters:
h5 – where to save the info
res – a Result object
name – name of the task function
mem_gb – memory consumption at the saving time (optional)
- property start_time#
Datetime instance recording when the monitoring started
- class openquake.baselib.performance.PStatData(ncalls, tottime, percall, cumtime, percall2, path)#
Bases:
tuple
- cumtime#
Alias for field number 3
- ncalls#
Alias for field number 0
- path#
Alias for field number 5
- percall#
Alias for field number 2
- percall2#
Alias for field number 4
- tottime#
Alias for field number 1
- openquake.baselib.performance.compile(sigstr)[source]#
Compile a function Ahead-Of-Time using the given signature string
- openquake.baselib.performance.get_pstats(pstatfile, n)[source]#
Return profiling information as a list [(ncalls, cumtime, path), …]
- Parameters:
pstatfile – path to a .pstat file
n – the maximum number of stats to retrieve
- openquake.baselib.performance.get_slices(uint32s)[source]#
- Parameters:
uint32s – a sequence of uint32 integers (with repetitions)
- Returns:
a dict integer -> [(start, stop), …]
>>> from pprint import pprint
>>> pprint(get_slices(numpy.uint32([0, 0, 3, 3, 3, 2, 2, 0])))
{0: [(0, 2), (7, 8)], 2: [(5, 7)], 3: [(2, 5)]}
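The same mapping can be reproduced on a plain list with itertools.groupby, which makes the meaning of the (start, stop) pairs explicit. `slices_by_value` is a hypothetical pure-Python helper, not the library's implementation.

```python
from itertools import groupby


def slices_by_value(ints):
    # Map each value to the list of (start, stop) runs where it occurs
    out = {}
    start = 0
    for value, run in groupby(ints):
        stop = start + len(list(run))
        out.setdefault(value, []).append((start, stop))
        start = stop
    return out


slices = slices_by_value([0, 0, 3, 3, 3, 2, 2, 0])
```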
- openquake.baselib.performance.init_performance(hdf5file, swmr=False)[source]#
- Parameters:
hdf5file – file name or hdf5.File instance
- openquake.baselib.performance.kollapse(array, kfields, kround=<function kround0>, mfields=(), afield='')[source]#
Given a structured array of N elements with a discrete kfield with K <= N unique values, returns a structured array of K elements obtained by averaging the values associated to the kfield.
- openquake.baselib.performance.memory_gb(pids=())[source]#
- Params pids:
a list of PIDs running on the same machine
- Returns:
the total memory allocated by the current process and all the PIDs
- openquake.baselib.performance.memory_rss(pid)[source]#
- Returns:
the RSS memory allocated by a process
- openquake.baselib.performance.perf_stat()[source]#
Profile the current process by using the linux perf command
- openquake.baselib.performance.performance_view(dstore)[source]#
Returns the performance view as a numpy array.
- openquake.baselib.performance.split_array(arr, indices, counts=None)[source]#
- Parameters:
arr – an array with N elements
indices – a set of integers with repetitions
counts – if None the indices MUST be ordered
- Returns:
a list of K arrays, split on the integers
>>> arr = numpy.array([.1, .2, .3, .4, .5])
>>> idx = numpy.array([1, 1, 2, 2, 3])
>>> split_array(arr, idx)
[array([0.1, 0.2]), array([0.3, 0.4]), array([0.5])]
python3compat#
Compatibility layer for Python 2 and 3. Mostly copied from six and future, but reduced to the subset of utilities needed by GEM. This is done to avoid an external dependency.
- openquake.baselib.python3compat.decode(val)[source]#
Decode an object assuming the encoding is UTF-8.
- Param:
a unicode or bytes object
- Returns:
a unicode object
- openquake.baselib.python3compat.encode(val)[source]#
Encode a string assuming the encoding is UTF-8.
- Param:
a unicode or bytes object
- Returns:
bytes
- openquake.baselib.python3compat.raise_(tp, value=None, tb=None)[source]#
A function that matches the Python 2.x
raise
statement. This allows re-raising exceptions with the cls value and traceback on Python 2 and 3.
runtests#
sap#
openquake.baselib.sap
is a Simple Argument Parser based on argparse
which is extremely powerful. Its features are
zero boilerplate (no decorators)
supports arbitrarily nested subcommands with an easy syntax
automatically generates a simple parser from a Python module and a hierarchic parser from a Python package.
Here is a minimal example of usage:
>>> def convert_archive(input_, output=None, inplace=False, *, out='/tmp'):
... "Example"
... print(input_, output, inplace, out)
>>> convert_archive.input_ = 'input file or archive'
>>> convert_archive.inplace = 'convert inplace'
>>> convert_archive.output = 'output archive'
>>> convert_archive.out = 'output directory'
>>> run(convert_archive, argv=['a.zip', 'b.zip'])
a.zip b.zip False /tmp
>>> run(convert_archive, argv=['a.zip', '-i', '-o', '/tmp/x'])
a.zip None True /tmp/x
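For comparison, here is a hand-written argparse parser that accepts the same two command lines. The mapping (optional positional for `output`, `-i` flag for the boolean default, `-o` option for the keyword-only argument) is inferred from the outputs shown above, and the long option names are assumptions; it is not necessarily what sap generates.

```python
import argparse

parser = argparse.ArgumentParser(description='Example')
parser.add_argument('input_', help='input file or archive')
parser.add_argument('output', nargs='?', default=None, help='output archive')
parser.add_argument('-i', '--inplace', action='store_true',
                    help='convert inplace')
parser.add_argument('-o', '--out', default='/tmp', help='output directory')

ns1 = parser.parse_args(['a.zip', 'b.zip'])
ns2 = parser.parse_args(['a.zip', '-i', '-o', '/tmp/x'])
```

The four `add_argument` calls are the boilerplate that sap derives from the function signature and its attributes.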
- openquake.baselib.sap.parser(funcdict, **kw)[source]#
- Parameters:
funcdict – a function or a nested dictionary of functions
kw – keyword arguments passed to the underlying ArgumentParser
- Returns:
the ArgumentParser instance
- openquake.baselib.sap.pkg2dic(pkg)[source]#
- Parameters:
pkg – a python module or package
- Returns:
a dictionary name -> func_or_dic_of_funcs
writers module#
- class openquake.baselib.writers.CsvWriter(sep=',', fmt='%12.8E')[source]#
Bases:
object
Class used in the exporters to save a bunch of CSV files
- save(data, fname, header=(), comment=None, renamedict=None)[source]#
Save data on fname.
- Parameters:
data – numpy array, list of lists or pandas DataFrame
fname – path name
header – header to use
comment – optional dictionary to be converted in a comment
renamedict – a dictionary for renaming the columns
- openquake.baselib.writers.build_header(dtype)[source]#
Convert a numpy nested dtype into a list of strings suitable as header of csv file.
>>> imt_dt = numpy.dtype([('PGA', numpy.float32, 3),
...                       ('PGV', numpy.float32, 4)])
>>> build_header(imt_dt)
['PGA:3', 'PGV:4']
>>> gmf_dt = numpy.dtype([('A', imt_dt), ('B', imt_dt),
...                       ('idx', numpy.uint32)])
>>> build_header(gmf_dt)
['A~PGA:3', 'A~PGV:4', 'B~PGA:3', 'B~PGV:4', 'idx']
- openquake.baselib.writers.castable_to_int(s)[source]#
Return True if the string s can be interpreted as an integer
- openquake.baselib.writers.extract_from(data, fields)[source]#
Extract data from numpy arrays with nested records.
>>> imt_dt = numpy.dtype([('PGA', float, 3), ('PGV', float, 4)])
>>> a = numpy.array([([1, 2, 3], [4, 5, 6, 7])], imt_dt)
>>> extract_from(a, ['PGA'])
array([[1., 2., 3.]])
>>> gmf_dt = numpy.dtype([('A', imt_dt), ('B', imt_dt),
...                       ('idx', numpy.uint32)])
>>> b = numpy.array([(([1, 2, 3], [4, 5, 6, 7]),
...                   ([1, 2, 4], [3, 5, 6, 7]), 8)], gmf_dt)
>>> extract_from(b, ['idx'])
array([8], dtype=uint32)
>>> extract_from(b, ['B', 'PGV'])
array([[3., 5., 6., 7.]])
- openquake.baselib.writers.parse_header(header)[source]#
Convert a list of the form [‘fieldname:fieldtype:fieldsize’,…] into a numpy composite dtype. The parser understands headers generated by openquake.baselib.writers.build_header(). Here is an example:
>>> parse_header(['PGA:float32', 'PGV', 'avg:float32:2'])
(['PGA', 'PGV', 'avg'], dtype([('PGA', '<f4'), ('PGV', '<f4'), ('avg', '<f4', (2,))]))
- Params header:
a list of type descriptions
- Returns:
column names and the corresponding composite dtype
- openquake.baselib.writers.write_csv(dest, data, sep=',', fmt='%.6E', header=(), comment=None, renamedict=None)[source]#
- Parameters:
dest – None, file, filename or io.StringIO instance
data – array to save
sep – separator to use (default comma)
fmt – formatting string (default ‘%.6E’)
header – optional list with the names of the columns to display
comment – optional comment dictionary