openquake.baselib package

openquake.baselib.datastore module

class openquake.baselib.datastore.DataStore(calc_id=None, datadir=None, params=(), mode=None)[source]

Bases: collections.abc.MutableMapping

DataStore class to store the inputs/outputs of a calculation on the filesystem.

Here is a minimal example of usage:

>>> ds = DataStore()
>>> ds['example'] = 42
>>> print(ds['example'][()])
42
>>> ds.clear()

When reading the items, the DataStore will return a generator. The items will be ordered lexicographically according to their name.

There is a serialization protocol to store objects in the datastore. An object is serializable if it has a method __toh5__ returning an array and a dictionary, and a method __fromh5__ taking an array and a dictionary and populating the object. For an example of use see openquake.hazardlib.site.SiteCollection.

exception EmptyDataset[source]

Bases: ValueError

Raised when reading an empty dataset

build_fname(prefix, postfix, fmt, export_dir=None)[source]

Build a file name from a realization, by using prefix and extension.

Parameters:
  • prefix – the prefix to use
  • postfix – the postfix to use (can be a realization object)
  • fmt – the extension (‘csv’, ‘xml’, etc)
  • export_dir – export directory (if None use .export_dir)
Returns:

relative pathname including the extension

clear()[source]

Remove the datastore from the file system

close()[source]

Close the underlying hdf5 file

create_dset(key, dtype, shape=(None, ), compression=None, fillvalue=0, attrs=None)[source]

Create a one-dimensional HDF5 dataset.

Parameters:
  • key – name of the dataset
  • dtype – dtype of the dataset (usually composite)
  • shape – shape of the dataset, possibly extendable
  • compression – the kind of HDF5 compression to use
  • attrs – dictionary of attributes of the dataset
Returns:

a HDF5 dataset

export_dir

Return the underlying export directory

export_path(relname, export_dir=None)[source]

Return the path of the exported file by adding the export_dir in front, the calculation ID at the end.

Parameters:
  • relname – relative file name
  • export_dir – export directory (if None use .export_dir)
flush()[source]

Flush the underlying hdf5 file

get(key, default)[source]
Returns:the value associated to the datastore key, or the default
get_attr(key, name, default=None)[source]
Parameters:
  • key – dataset path
  • name – name of the attribute
  • default – value to return if the attribute is missing
get_attrs(key)[source]
Parameters:key – dataset path
Returns:dictionary of attributes for that path
get_file(key)[source]
Returns:a BytesIO object
getitem(name)[source]

Return a dataset by using h5py.File.__getitem__

getsize(key=None)[source]

Return the size in byte of the output associated to the given key. If no key is given, returns the total size of all files.

metadata
Returns:datastore metadata version, date, checksum as a dictionary
open(mode)[source]

Open the underlying .hdf5 file and the parent, if any

read_df(key, index=None)[source]
Parameters:
  • key – name of the structured dataset
  • index – if given, name of the “primary key” field
Returns:

pandas DataFrame associated to the dataset

retrieve_files(prefix='input')[source]
Yields:pairs (relative path, data)
save(key, kw)[source]

Update the object associated to key with the kw dictionary; works for LiteralAttrs objects and automatically flushes.

set_attrs(key, **kw)[source]

Set the HDF5 attributes of the given key

store_files(fnames, where='input/')[source]
Parameters:fnames – a set of full pathnames
swmr_on()[source]

Enable the SWMR mode on the underlying HDF5 file

openquake.baselib.datastore.dset2df(dset, index)[source]

Converts an HDF5 dataset with an attribute shape_descr into a Pandas dataframe. NB: this is very slow for large datasets.

openquake.baselib.datastore.extract_calc_id_datadir(filename, datadir=None)[source]

Extract the calculation ID from the given filename or integer:

>>> extract_calc_id_datadir('/mnt/ssd/oqdata/calc_25.hdf5')
(25, '/mnt/ssd/oqdata')
>>> extract_calc_id_datadir('/mnt/ssd/oqdata/wrong_name.hdf5')
Traceback (most recent call last):
   ...
ValueError: Cannot extract calc_id from /mnt/ssd/oqdata/wrong_name.hdf5
openquake.baselib.datastore.get_calc_ids(datadir=None)[source]

Extract the available calculation IDs from the datadir, in order.

openquake.baselib.datastore.get_datadir()[source]

Extracts the path of the directory where the openquake data are stored from the environment ($OQ_DATADIR) or from the shared_dir in the configuration file.

openquake.baselib.datastore.get_last_calc_id(datadir=None)[source]

Extract the latest calculation ID from the given directory. If none is found, return 0.

openquake.baselib.datastore.hdf5new(datadir=None)[source]

Return a new hdf5.File by instance with name determined by the last calculation in the datadir (plus one). Set the .path attribute to the generated filename.

openquake.baselib.datastore.read(calc_id, mode='r', datadir=None)[source]
Parameters:
  • calc_id – calculation ID or filename
  • mode – ‘r’ or ‘w’
  • datadir – the directory where to look
Returns:

the corresponding DataStore instance

Read the datastore, if it exists and it is accessible.

general

Utility functions of general interest.

class openquake.baselib.general.AccumDict(dic=None, accum=None, keys=())[source]

Bases: dict

An accumulating dictionary, useful to accumulate variables:

>>> acc = AccumDict()
>>> acc += {'a': 1}
>>> acc += {'a': 1, 'b': 1}
>>> acc
{'a': 2, 'b': 1}
>>> {'a': 1} + acc
{'a': 3, 'b': 1}
>>> acc + 1
{'a': 3, 'b': 2}
>>> 1 - acc
{'a': -1, 'b': 0}
>>> acc - 1
{'a': 1, 'b': 0}

The multiplication has been defined:

>>> prob1 = AccumDict(dict(a=0.4, b=0.5))
>>> prob2 = AccumDict(dict(b=0.5))
>>> prob1 * prob2
{'a': 0.4, 'b': 0.25}
>>> prob1 * 1.2
{'a': 0.48, 'b': 0.6}
>>> 1.2 * prob1
{'a': 0.48, 'b': 0.6}

And even the power:

>>> prob2 ** 2

{‘b’: 0.25}

It is very common to use an AccumDict of accumulators; here is an example using the empty list as accumulator:

>>> acc = AccumDict(accum=[])
>>> acc['a'] += [1]
>>> acc['b'] += [2]
>>> sorted(acc.items())
[('a', [1]), ('b', [2])]

The implementation is smart enough to make (deep) copies of the accumulator, therefore each key has a different accumulator, which initially is the empty list (in this case).

apply(func, *extras)[source]

>> a = AccumDict({‘a’: 1, ‘b’: 2}) >> a.apply(lambda x, y: 2 * x + y, 1) {‘a’: 3, ‘b’: 5}

class openquake.baselib.general.CallableDict(keyfunc=<function CallableDict.<lambda>>, keymissing=None)[source]

Bases: dict

A callable object built on top of a dictionary of functions, used as a smart registry or as a poor man generic function dispatching on the first argument. It is typically used to implement converters. Here is an example:

>>> format_attrs = CallableDict()  # dict of functions (fmt, obj) -> str
>>> @format_attrs.add('csv')  # implementation for csv
... def format_attrs_csv(fmt, obj):
...     items = sorted(vars(obj).items())
...     return '\n'.join('%s,%s' % item for item in items)
>>> @format_attrs.add('json')  # implementation for json
... def format_attrs_json(fmt, obj):
...     return json.dumps(vars(obj))

format_attrs(fmt, obj) calls the correct underlying function depending on the fmt key. If the format is unknown a KeyError is raised. It is also possible to set a keymissing function to specify what to return if the key is missing.

For a more practical example see the implementation of the exporters in openquake.calculators.export

add(*keys)[source]

Return a decorator registering a new implementation for the CallableDict for the given keys.

exception openquake.baselib.general.CodeDependencyError[source]

Bases: Exception

exception openquake.baselib.general.DeprecationWarning[source]

Bases: UserWarning

Raised the first time a deprecated function is called

class openquake.baselib.general.DictArray(imtls)[source]

Bases: collections.abc.Mapping

A small wrapper over a dictionary of arrays serializable to HDF5:

>>> d = DictArray({'PGA': [0.01, 0.02, 0.04], 'PGV': [0.1, 0.2]})
>>> from openquake.baselib import hdf5
>>> with hdf5.File('/tmp/x.h5', 'w') as f:
...      f['d'] = d
...      f['d']
<DictArray
PGA: [0.01 0.02 0.04]
PGV: [0.1 0.2]>

The DictArray maintains the lexicographic order of the keys.

new(array)[source]

Convert an array of compatible length into a DictArray:

>>> d = DictArray({'PGA': [0.01, 0.02, 0.04], 'PGV': [0.1, 0.2]})
>>> d.new(numpy.arange(0, 5, 1))  # array of lenght 5 = 3 + 2
<DictArray
PGA: [0 1 2]
PGV: [3 4]>
class openquake.baselib.general.WeightedSequence(seq=())[source]

Bases: collections.abc.MutableSequence

A wrapper over a sequence of weighted items with a total weight attribute. Adding items automatically increases the weight.

insert(i, item_weight)[source]

Insert an item with the given weight in the sequence

classmethod merge(ws_list)[source]

Merge a set of WeightedSequence objects.

Parameters:ws_list – a sequence of :class: openquake.baselib.general.WeightedSequence instances
Returns:a openquake.baselib.general.WeightedSequence instance
openquake.baselib.general.add_columns(a, b, on, cols=None)[source]
>>> a_dt = [('aid', int), ('eid', int), ('loss', float)]
>>> b_dt = [('ordinal', int), ('zipcode', int)]
>>> a = numpy.array([(1, 0, 2.4), (2, 0, 2.2),
...                  (1, 1, 2.1), (2, 1, 2.3)], a_dt)
>>> b = numpy.array([(0, 20126), (1, 20127), (2, 20128)], b_dt)
>>> add_columns(a, b, 'aid', ['zipcode'])
array([(1, 0, 2.4, 20127), (2, 0, 2.2, 20128), (1, 1, 2.1, 20127),
       (2, 1, 2.3, 20128)],
      dtype=[('aid', '<i8'), ('eid', '<i8'), ('loss', '<f8'), ('zipcode', '<i8')])
openquake.baselib.general.add_defaults(array, **kw)[source]
Parameters:
  • array – a structured array
  • kw – a dictionary field name -> default value
Returns:

a new array with additional fields with default values

openquake.baselib.general.assert_close(a, b, rtol=1e-07, atol=0, context=None)[source]

Compare for equality up to a given precision two composite objects which may contain floats. NB: if the objects are or contain generators, they are exhausted.

Parameters:
  • a – an object
  • b – another object
  • rtol – relative tolerance
  • atol – absolute tolerance
openquake.baselib.general.assert_independent(package, *packages)[source]
Parameters:
  • package – Python name of a module/package
  • packages – Python names of modules/packages

Make sure the package does not depend from the packages.

openquake.baselib.general.bin_idxs(values, nbins, key=None, minval=None, maxval=None)[source]
Parameters:values – an array of N floats (or arrays)
Returns:an array of N indices
openquake.baselib.general.block_splitter(items, max_weight, weight=<function <lambda>>, key=<function nokey>)[source]
Parameters:
  • items – an iterator over items
  • max_weight – the max weight to split on
  • weight – a function returning the weigth of a given item
  • key – a function returning the kind of a given item

Group together items of the same kind until the total weight exceeds the max_weight and yield WeightedSequence instances. Items with weight zero are ignored.

For instance

>>> items = 'ABCDE'
>>> list(block_splitter(items, 3))
[<WeightedSequence ['A', 'B', 'C'], weight=3>, <WeightedSequence ['D', 'E'], weight=2>]

The default weight is 1 for all items. Here is an example leveraning on the key to group together results:

>>> items = ['A1', 'C2', 'D2', 'E2']
>>> list(block_splitter(items, 2, key=operator.itemgetter(1)))
[<WeightedSequence ['A1'], weight=1>, <WeightedSequence ['C2', 'D2'], weight=2>, <WeightedSequence ['E2'], weight=1>]
openquake.baselib.general.cached_property(method)[source]
Parameters:method – a method without arguments except self
Returns:a cached property
openquake.baselib.general.categorize(values, nchars=2)[source]

Takes an array with duplicate values and categorize it, i.e. replace the values with codes of length nchars in base64. With nchars=2 4096 unique values can be encoded, if there are more nchars must be increased otherwise a ValueError will be raised.

Parameters:
  • values – an array of V non-unique values
  • nchars – number of characters in base64 for each code
Returns:

an array of V non-unique codes

>>> categorize([1,2,2,3,4,1,1,2]) # 8 values, 4 unique ones
array([b'AA', b'AB', b'AB', b'AC', b'AD', b'AA', b'AA', b'AB'],
      dtype='|S2')
openquake.baselib.general.ceil(a, b)[source]

Divide a / b and return the biggest integer close to the quotient.

Parameters:
  • a – a number
  • b – a positive number
Returns:

the biggest integer close to the quotient

openquake.baselib.general.count(groupiter)[source]
openquake.baselib.general.countby(array, *kfields)[source]
Returns:a dict kfields -> number of records with that key
openquake.baselib.general.debug(templ, *args)[source]

Append a debug line to the file /tmp/debug.txt

openquake.baselib.general.deprecated(func, msg='', *args, **kw)[source]

A family of decorators to mark deprecated functions.

Parameters:msg – the message to print the first time the deprecated function is used.

Here is an example of usage:

>>> @deprecated(msg='Use new_function instead')
... def old_function():
...     'Do something'

Notice that if the function is called several time, the deprecation warning will be displayed only the first time.

openquake.baselib.general.detach_process()[source]

Detach the current process from the controlling terminal by using a double fork. Can be used only on platforms with fork (no Windows).

openquake.baselib.general.distinct(keys)[source]

Return the distinct keys in order.

openquake.baselib.general.duplicated(items)[source]
Returns:True if the items are duplicated, False otherwise
openquake.baselib.general.fast_agg(indices, values=None, axis=0, factor=None)[source]
Parameters:
  • indices – N indices in the range 0 … M - 1 with M < N
  • values – N values (can be arrays)
Returns:

M aggregated values (can be arrays)

>>> values = numpy.array([[.1, .11], [.2, .22], [.3, .33], [.4, .44]])
>>> fast_agg([0, 1, 1, 0], values)
array([[0.5 , 0.55],
       [0.5 , 0.55]])
openquake.baselib.general.fast_agg2(tags, values=None, axis=0)[source]
Parameters:
  • tags – N non-unique tags out of M
  • values – N values (can be arrays)
Returns:

(M unique tags, M aggregated values)

>>> values = numpy.array([[.1, .11], [.2, .22], [.3, .33], [.4, .44]])
>>> fast_agg2(['A', 'B', 'B', 'A'], values)
(array(['A', 'B'], dtype='<U1'), array([[0.5 , 0.55],
       [0.5 , 0.55]]))

It can also be used to count the number of tags:

>>> fast_agg2(['A', 'B', 'B', 'A', 'A'])
(array(['A', 'B'], dtype='<U1'), array([3., 2.]))
openquake.baselib.general.fast_agg3(structured_array, kfield, vfields, factor=None)[source]

Aggregate a structured array with a key field (the kfield) and some value fields (the vfields).

>>> data = numpy.array([(1, 2.4), (1, 1.6), (2, 2.5)],
...                    [('aid', U16), ('val', F32)])
>>> fast_agg3(data, 'aid', ['val'])
array([(1, 4. ), (2, 2.5)], dtype=[('aid', '<u2'), ('val', '<f4')])
openquake.baselib.general.gen_slices(start, stop, blocksize)[source]

Yields slices of lenght at most block_size.

>>> list(gen_slices(1, 6, 2))
[slice(1, 3, None), slice(3, 5, None), slice(5, 6, None)]
openquake.baselib.general.gen_subclasses(cls)[source]
Returns:the subclasses of cls, ordered by name
openquake.baselib.general.get_array(array, **kw)[source]

Extract a subarray by filtering on the given keyword arguments

openquake.baselib.general.get_array_nbytes(sizedict)[source]
Parameters:sizedict – mapping name -> num_dimensions
Returns:(size of the array in bytes, descriptive message)
>>> get_array_nbytes(dict(nsites=2, nbins=5))
(80, '(nsites=2) * (nbins=5) * 8 bytes = 80 B')
openquake.baselib.general.get_duplicates(array, *fields)[source]
Returns:a dictionary {key: num_dupl} for duplicate records
openquake.baselib.general.get_indices(integers)[source]
Parameters:integers – a sequence of integers (with repetitions)
Returns:a dict integer -> [(start, stop), …]
>>> get_indices([0, 0, 3, 3, 3, 2, 2, 0])
{0: [(0, 2), (7, 8)], 3: [(2, 5)], 2: [(5, 7)]}
openquake.baselib.general.getsizeof(o, ids=None)[source]

Find the memory footprint of a Python object recursively, see https://code.tutsplus.com/tutorials/understand-how-much-memory-your-python-objects-use–cms-25609 :param o: the object :returns: the size in bytes

openquake.baselib.general.gettemp(content=None, dir=None, prefix='tmp', suffix='tmp')[source]

Create temporary file with the given content.

Please note: the temporary file can be deleted by the caller or not.

Parameters:
  • content (string) – the content to write to the temporary file.
  • dir (string) – directory where the file should be created
  • prefix (string) – file name prefix
  • suffix (string) – file name suffix
Returns:

a string with the path to the temporary file

openquake.baselib.general.git_suffix(fname)[source]
Returns:<short git hash> if Git repository found
openquake.baselib.general.group_array(array, *kfields)[source]

Convert an array into a dict kfields -> array

openquake.baselib.general.groupby(objects, key, reducegroup=<class 'list'>)[source]
Parameters:
  • objects – a sequence of objects with a key value
  • key – the key function to extract the key value
  • reducegroup – the function to apply to each group
Returns:

a dict {key value: map(reducegroup, group)}

>>> groupby(['A1', 'A2', 'B1', 'B2', 'B3'], lambda x: x[0],
...         lambda group: ''.join(x[1] for x in group))
{'A': '12', 'B': '123'}
openquake.baselib.general.groupby2(records, kfield, vfield)[source]
Parameters:
  • records – a sequence of records with positional or named fields
  • kfield – the index/name/tuple specifying the field to use as a key
  • vfield – the index/name/tuple specifying the field to use as a value
Returns:

an list of pairs of the form (key, [value, …]).

>>> groupby2(['A1', 'A2', 'B1', 'B2', 'B3'], 0, 1)
[('A', ['1', '2']), ('B', ['1', '2', '3'])]

Here is an example where the keyfield is a tuple of integers:

>>> groupby2(['A11', 'A12', 'B11', 'B21'], (0, 1), 2)
[(('A', '1'), ['1', '2']), (('B', '1'), ['1']), (('B', '2'), ['1'])]
openquake.baselib.general.groupby_bin(values, nbins, key=None, minval=None, maxval=None)[source]
>>> values = numpy.arange(10)
>>> for group in groupby_bin(values, 3):
...     print(group)
[0, 1, 2]
[3, 4, 5]
[6, 7, 8, 9]
openquake.baselib.general.humansize(nbytes, suffixes=('B', 'KB', 'MB', 'GB', 'TB', 'PB'))[source]

Return file size in a human-friendly format

openquake.baselib.general.import_all(module_or_package)[source]

If module_or_package is a module, just import it; if it is a package, recursively imports all the modules it contains. Returns the names of the modules that were imported as a set. The set can be empty if the modules were already in sys.modules.

openquake.baselib.general.multi_index(shape, axis=None)[source]
Parameters:
  • shape – a shape of lenght L with P = S1 * S2 * … * SL
  • axis – None or an integer in the range 0 .. L -1
Yields:

P tuples of indices with a slice(None) at the axis position (if any)

openquake.baselib.general.nokey(item)[source]

Dummy function to apply to items without a key

openquake.baselib.general.not_equal(array_or_none1, array_or_none2)[source]

Compare two arrays that can also be None or have diffent shapes and returns a boolean.

>>> a1 = numpy.array([1])
>>> a2 = numpy.array([2])
>>> a3 = numpy.array([2, 3])
>>> not_equal(a1, a2)
True
>>> not_equal(a1, a3)
True
>>> not_equal(a1, None)
True
class openquake.baselib.general.pack(dic, attrs=())[source]

Bases: dict

Compact a dictionary of lists into a dictionary of arrays. If attrs are given, consider those keys as attributes. For instance,

>>> p = pack(dict(x=[1], a=[0]), ['a'])
>>> p
{'x': array([1])}
>>> p.a
array([0])
openquake.baselib.general.println(msg)[source]

Convenience function to print messages on a single line in the terminal

openquake.baselib.general.random_filter(objects, reduction_factor, seed=42)[source]

Given a list of objects, returns a sublist by extracting randomly some elements. The reduction factor (< 1) tells how small is the extracted list compared to the original list.

openquake.baselib.general.random_histogram(counts, nbins, seed)[source]

Distribute a total number of counts on a set of bins homogenously.

>>> random_histogram(1, 2, 42)
array([1, 0])
>>> random_histogram(100, 5, 42)
array([28, 18, 17, 19, 18])
>>> random_histogram(10000, 5, 42)
array([2043, 2015, 2050, 1930, 1962])
openquake.baselib.general.removetmp()[source]

Remove the temporary files created by gettemp

openquake.baselib.general.run_in_process(code, *args)[source]

Run in an external process the given Python code and return the output as a Python object. If there are arguments, then code is taken as a template and traditional string interpolation is performed.

Parameters:
  • code – string or template describing Python code
  • args – arguments to be used for interpolation
Returns:

the output of the process, as a Python object

openquake.baselib.general.safeprint(*args, **kwargs)[source]

Convert and print characters using the proper encoding

openquake.baselib.general.socket_ready(hostport)[source]
Parameters:hostport – a pair (host, port) or a string (tcp://)host:port
Returns:True if the socket is ready and False otherwise
openquake.baselib.general.split_in_blocks(sequence, hint, weight=<function <lambda>>, key=<function nokey>)[source]

Split the sequence in a number of WeightedSequences close to hint.

Parameters:
  • sequence – a finite sequence of items
  • hint – an integer suggesting the number of subsequences to generate
  • weight – a function returning the weigth of a given item
  • key – a function returning the key of a given item

The WeightedSequences are of homogeneous key and they try to be balanced in weight. For instance

>>> items = 'ABCDE'
>>> list(split_in_blocks(items, 3))
[<WeightedSequence ['A', 'B'], weight=2>, <WeightedSequence ['C', 'D'], weight=2>, <WeightedSequence ['E'], weight=1>]
openquake.baselib.general.split_in_slices(number, num_slices)[source]
Parameters:
  • number – a positive number to split in slices
  • num_slices – the number of slices to return (at most)
Returns:

a list of slices

>>> split_in_slices(4, 2)
[slice(0, 2, None), slice(2, 4, None)]
>>> split_in_slices(5, 1)
[slice(0, 5, None)]
>>> split_in_slices(5, 2)
[slice(0, 3, None), slice(3, 5, None)]
>>> split_in_slices(2, 4)
[slice(0, 1, None), slice(1, 2, None)]
openquake.baselib.general.warn(msg, *args)[source]

Print a warning on stderr

openquake.baselib.general.zipfiles(fnames, archive, mode='w', log=<function <lambda>>, cleanup=False)[source]

Build a zip archive from the given file names.

Parameters:
  • fnames – list of path names
  • archive – path of the archive or BytesIO object

hdf5

class openquake.baselib.hdf5.ArrayWrapper(array, attrs, extra=('value', ))[source]

Bases: object

A pickleable and serializable wrapper over an array, HDF5 dataset or group

Parameters:
  • array – an array (or the empty tuple)
  • attrs – metadata of the array (or dictionary of arrays)
dtype

dtype of the underlying array

classmethod from_(obj, extra='value')[source]
save(path, **extra)[source]
Parameters:
  • path – an .hdf5 pathname
  • extra – extra attributes to be saved in the file
shape

shape of the underlying array

to_dict()[source]

Convert the public attributes into a dictionary

to_table()[source]

Convert an ArrayWrapper with shape (D1, …, DN) and attributes T1, …, TN which are list of tags of lenghts D1, … DN into a table with rows (tag1, … tagN, extra1, … extraM) of maximum length D1 * … * DN. Zero values are discarded.

>>> from pprint import pprint
>>> dic = dict(shape_descr=['taxonomy', 'occupancy'],
...            taxonomy=['RC', 'WOOD'],
...            occupancy=['RES', 'IND', 'COM'])
>>> arr = numpy.zeros((2, 3))
>>> arr[0, 0] = 2000
>>> arr[0, 1] = 5000
>>> arr[1, 0] = 500
>>> aw = ArrayWrapper(arr, dic)
>>> pprint(aw.to_table())
[('taxonomy', 'occupancy', 'value'),
 ('RC', 'RES', 2000.0),
 ('RC', 'IND', 5000.0),
 ('WOOD', 'RES', 500.0)]
toml()[source]
Returns:a TOML string representation of the ArrayWrapper
class openquake.baselib.hdf5.ByteCounter(nbytes=0)[source]

Bases: object

A visitor used to measure the dimensions of a HDF5 dataset or group. Use it as ByteCounter.get_nbytes(dset_or_group).

classmethod get_nbytes(dset)[source]
class openquake.baselib.hdf5.File(name, mode=None, driver=None, libver='latest', userblock_size=None, swmr=True, rdcc_nslots=None, rdcc_nbytes=None, rdcc_w0=None, track_order=None, **kwds)[source]

Bases: h5py._hl.files.File

Subclass of h5py.File able to store and retrieve objects conforming to the HDF5 protocol used by the OpenQuake software. It works recursively also for dictionaries of the form name->obj.

>>> f = File('/tmp/x.h5', 'w')
>>> f['dic'] = dict(a=dict(x=1, y=2), b=3)
>>> dic = f['dic']
>>> dic['a']['x'][()]
1
>>> dic['b'][()]
3
>>> f.close()
getitem(name)[source]

Return a dataset by using h5py.File.__getitem__

save_attrs(path, attrs, **kw)[source]
save_vlen(key, data)[source]

Save a sequence of variable-length arrays

Parameters:
  • key – name of the dataset
  • data – data to store as a list of arrays
classmethod temporary()[source]

Returns a temporary hdf5 file, open for writing. The temporary name is stored in the .path attribute. It is the user responsability to remove the file when closed.

class openquake.baselib.hdf5.Group(items, attrs)[source]

Bases: collections.abc.Mapping

A mock for a h5py group object

class openquake.baselib.hdf5.LiteralAttrs[source]

Bases: object

A class to serialize a set of parameters in HDF5 format. The goal is to store simple parameters as an HDF5 table in a readable way. Each parameter can be retrieved as an attribute, given its name. The implementation treats specially dictionary attributes, by storing them as attrname.keyname strings, see the example below:

>>> class Ser(LiteralAttrs):
...     def __init__(self, a, b):
...         self.a = a
...         self.b = b
>>> ser = Ser(1, dict(x='xxx', y='yyy'))
>>> arr, attrs = ser.__toh5__()
>>> for k, v in arr:
...     print('%s=%s' % (k, v))
a=1
b.x='xxx'
b.y='yyy'
>>> s = object.__new__(Ser)
>>> s.__fromh5__(arr, attrs)
>>> s.a
1
>>> s.b['x']
'xxx'

The implementation is not recursive, i.e. there will be at most one dot in the serialized names (in the example here a, b.x, b.y).

openquake.baselib.hdf5.array_of_vstr(lst)[source]
Parameters:lst – a list of strings or bytes
Returns:an array of variable length ASCII strings
openquake.baselib.hdf5.build_dt(dtypedict, names)[source]

Build a composite dtype for a list of names and dictionary name -> dtype with a None entry corresponding to the default dtype.

openquake.baselib.hdf5.cls2dotname(cls)[source]

The full Python name (i.e. pkg.subpkg.mod.cls) of a class

openquake.baselib.hdf5.create(hdf5, name, dtype, shape=(None, ), compression=None, fillvalue=0, attrs=None)[source]
Parameters:
  • hdf5 – a h5py.File object
  • name – an hdf5 key string
  • dtype – dtype of the dataset (usually composite)
  • shape – shape of the dataset (can be extendable)
  • compression – None or ‘gzip’ are recommended
  • attrs – dictionary of attributes of the dataset
Returns:

a HDF5 dataset

openquake.baselib.hdf5.decode_array(values)[source]

Decode the values which are bytestrings.

openquake.baselib.hdf5.dotname2cls(dotname)[source]

The class associated to the given dotname (i.e. pkg.subpkg.mod.cls)

openquake.baselib.hdf5.extend(dset, array, **attrs)[source]

Extend an extensible dataset with an array of a compatible dtype.

Parameters:
  • dset – an h5py dataset
  • array – an array of length L
Returns:

the total length of the dataset (i.e. initial length + L)

openquake.baselib.hdf5.extract(dset, *d_slices)[source]
Parameters:
  • dset – a D-dimensional dataset or array
  • d_slices – D slice objects (or similar)
Returns:

a reduced D-dimensional array

>>> a = numpy.array([[1, 2, 3], [4, 5, 6]])  # shape (2, 3)
>>> extract(a, slice(None), 1)
array([[2],
       [5]])
>>> extract(a, [0, 1], slice(1, 3))
array([[2, 3],
       [5, 6]])
openquake.baselib.hdf5.get_nbytes(dset)[source]

If the dataset has an attribute ‘nbytes’, return it. Otherwise get the size of the underlying array. Returns None if the dataset is actually a group.

openquake.baselib.hdf5.maybe_encode(value)[source]

If value is a sequence of strings, encode it

openquake.baselib.hdf5.parse_comment(comment)[source]

Parse a comment of the form investigation_time=50.0, imt=”PGA”, … and returns it as pairs of strings:

>>> parse_comment('''path=['b1'], time=50.0, imt="PGA"''')
[('path', ['b1']), ('time', 50.0), ('imt', 'PGA')]
openquake.baselib.hdf5.preshape(obj)[source]
Returns:the shape of obj, except the last dimension
openquake.baselib.hdf5.read_csv(fname, dtypedict={None: <class 'float'>}, renamedict={}, sep=', ')[source]
Parameters:
  • fname – a CSV file with an header and float fields
  • dtypedict – a dictionary fieldname -> dtype, None -> default
  • renamedict – aliases for the fields to rename
  • sep – separator (default comma)
Returns:

a structured array of floats

node

This module defines a Node class, together with a few conversion functions which are able to convert NRML files into hierarchical objects (DOM). That makes it easier to read and write XML from Python and viceversa. Such features are used in the command-line conversion tools. The Node class is kept intentionally similar to an Element class, however it overcomes the limitation of ElementTree: in particular a node can manage a lazy iterable of subnodes, whereas ElementTree wants to keep everything in memory. Moreover the Node class provides a convenient dot notation to access subnodes.

The Node class is instantiated with four arguments:

  1. the node tag (a mandatory string)
  2. the node attributes (a dictionary)
  3. the node value (a string or None)
  4. the subnodes (an iterable over nodes)

If a node has subnodes, its value should be None.

For instance, here is an example of instantiating a root node with two subnodes a and b:

>>> from openquake.baselib.node import Node
>>> a = Node('a', {}, 'A1')
>>> b = Node('b', {'attrb': 'B'}, 'B1')
>>> root = Node('root', nodes=[a, b])
>>> root
<root {} None ...>

Node objects can be converted into nicely indented strings:

>>> print(root.to_str())
root
  a 'A1'
  b{attrb='B'} 'B1'

The subnodes can be retrieved with the dot notation:

>>> root.a
<a {} A1 >

The value of a node can be extracted with the ~ operator:

>>> ~root.a
'A1'

If there are multiple subnodes with the same name

>>> root.append(Node('a', {}, 'A2'))  # add another 'a' node

the dot notation will retrieve the first node.

It is possible to retrieve the other nodes from the ordinal index:

>>> root[0], root[1], root[2]
(<a {} A1 >, <b {'attrb': 'B'} B1 >, <a {} A2 >)

The list of all subnodes with a given name can be retrieved as follows:

>>> list(root.getnodes('a'))
[<a {} A1 >, <a {} A2 >]

It is also possible to delete a node given its index:

>>> del root[2]

A node is an iterable object yielding its subnodes:

>>> list(root)
[<a {} A1 >, <b {'attrb': 'B'} B1 >]

The attributes of a node can be retrieved with the square bracket notation:

>>> root.b['attrb']
'B'

It is possible to add and remove attributes freely:

>>> root.b['attr'] = 'new attr'
>>> del root.b['attr']

Node objects can be easily converted into ElementTree objects:

>>> node_to_elem(root)  
<Element 'root' at ...>

Then is trivial to generate the XML representation of a node:

>>> from xml.etree import ElementTree
>>> print(ElementTree.tostring(node_to_elem(root)).decode('utf-8'))
<root><a>A1</a><b attrb="B">B1</b></root>

Generating XML files larger than the available memory requires some care. The trick is to use a node generator, such that it is not necessary to keep the entire tree in memory. Here is an example:

>>> def gen_many_nodes(N):
...     for i in xrange(N):
...         yield Node('a', {}, 'Text for node %d' % i)
>>> lazytree = Node('lazytree', {}, nodes=gen_many_nodes(10))

The lazytree object defined here consumes no memory, because the nodes are not created a instantiation time. They are created as soon as you start iterating on the lazytree. In particular list(lazytree) will generated all of them. If your goal is to store the tree on the filesystem in XML format you should use a writing routine converting a subnode at the time, without requiring the full list of them. The routines provided by ElementTree are no good, however commonlib.writers provide an StreamingXMLWriter just for that purpose.

Lazy trees should not be used unless it is absolutely necessary in order to save memory; the problem is that if you use a lazy tree the slice notation will not work (the underlying generator will not accept it); moreover it will not be possible to iterate twice on the subnodes, since the generator will be exhausted. Notice that even accessing a subnode with the dot notation will avance the generator. Finally, nodes containing lazy nodes will not be pickleable.

class openquake.baselib.node.Node(fulltag, attrib=None, text=None, nodes=None, lineno=None)[source]

Bases: object

A class to make it easy to edit hierarchical structures with attributes, such as XML files. Node objects must be pickleable and must consume as little memory as possible. Moreover they must be easily converted from and to ElementTree objects. The advantage over ElementTree objects is that subnodes can be lazily generated and that they can be accessed with the dot notation.

append(node)[source]

Append a new subnode

attrib
get(attr, value=None)[source]

Get the given attr; if missing, returns value or None.

getnodes(name)[source]

Return the direct subnodes with name ‘name’

lineno
nodes
tag
text
to_str(expandattrs=True, expandvals=True)[source]

Convert the node into a string, intended for testing/debugging purposes

Parameters:
  • expandattrs – print the values of the attributes if True, else print only the names
  • expandvals – print the values if True, else print only the tag names
class openquake.baselib.node.SourceLineParser[source]

Bases: xml.etree.ElementTree.XMLParser

A custom parser managing line numbers: works for Python <= 3.3

class openquake.baselib.node.StreamingXMLWriter(bytestream, indent=4, encoding='utf-8', nsmap=None)[source]

Bases: object

A bynary stream XML writer. The typical usage is something like this:

with StreamingXMLWriter(output_file) as writer:
    writer.start_tag('root')
    for node in nodegenerator():
        writer.serialize(node)
    writer.end_tag('root')
emptyElement(name, attrs)[source]

Add an empty element (may have attributes)

end_tag(name)[source]

Close an XML tag

serialize(node)[source]

Serialize a node object (typically an ElementTree object)

shorten(tag)[source]

Get the short representation of a fully qualified tag

Parameters:tag (str) – a (fully qualified or not) XML tag
start_tag(name, attrs=None)[source]

Open an XML tag

class openquake.baselib.node.ValidatingXmlParser(validators, stop=None)[source]

Bases: object

Validating XML Parser based on Expat. It has two methods .parse_file and .parse_bytes returning a validated Node object.

Parameters:
  • validators – a dictionary of validation functions
  • stop – the tag where to stop the parsing (if any)
exception Exit[source]

Bases: Exception

Raised when the parsing is stopped before the end on purpose

parse_bytes(bytestr, isfinal=True)[source]

Parse a byte string. If the string is very large, split it in chuncks and parse each chunk with isfinal=False, then parse an empty chunk with isfinal=True.

parse_file(file_or_fname)[source]

Parse a file or a filename

openquake.baselib.node.context(fname, node)[source]

Context manager managing exceptions and adding line number of the current node and name of the current file to the error message.

Parameters:
  • fname – the current file being processed
  • node – the current node being processed
openquake.baselib.node.floatformat(fmt_string)[source]

Context manager to change the default format string for the function openquake.commonlib.writers.scientificformat().

Parameters:fmt_string – the format to use; for instance ‘%13.9E’
openquake.baselib.node.fromstring(text)[source]

Parse an XML string and return a tree

openquake.baselib.node.iterparse(source, events=('end', ), remove_comments=True, **kw)[source]

Thin wrapper around ElementTree.iterparse

openquake.baselib.node.node_copy(node, nodefactory=<class 'openquake.baselib.node.Node'>)[source]

Make a deep copy of the node

openquake.baselib.node.node_display(root, expandattrs=False, expandvals=False, output=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Write an indented representation of the Node object on the output; this is intended for testing/debugging purposes.

Parameters:
  • root – a Node object
  • expandattrs (bool) – if True, the values of the attributes are also printed, not only the names
  • expandvals (bool) – if True, the values of the tags are also printed, not only the names.
  • output – stream where to write the string representation of the node
openquake.baselib.node.node_from_dict(dic, nodefactory=<class 'openquake.baselib.node.Node'>)[source]

Convert a (nested) dictionary into a Node object.

openquake.baselib.node.node_from_elem(elem, nodefactory=<class 'openquake.baselib.node.Node'>, lazy=())[source]

Convert (recursively) an ElementTree object into a Node object.

openquake.baselib.node.node_from_ini(ini_file, nodefactory=<class 'openquake.baselib.node.Node'>, root_name='ini')[source]

Convert a .ini file into a Node object.

Parameters:ini_file – a filename or a file like object in read mode
openquake.baselib.node.node_from_xml(xmlfile, nodefactory=<class 'openquake.baselib.node.Node'>)[source]

Convert a .xml file into a Node object.

Parameters:xmlfile – a file name or file object open for reading
openquake.baselib.node.node_to_dict(node)[source]

Convert a Node object into a (nested) dictionary with attributes tag, attrib, text, nodes.

Parameters:node – a Node-compatible object
openquake.baselib.node.node_to_elem(root)[source]

Convert (recursively) a Node object into an ElementTree object.

openquake.baselib.node.node_to_ini(node, output=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Convert a Node object with the right structure into a .ini file.

Params node:a Node object
Params output:a file-like object opened in write mode
openquake.baselib.node.node_to_xml(node, output=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, nsmap=None)[source]

Convert a Node object into a pretty .xml file without keeping everything in memory. If you just want the string representation use tostring(node).

Parameters:
  • node – a Node-compatible object (ElementTree nodes are fine)
  • nsmap – if given, shorten the tags with aliases
openquake.baselib.node.parse(source, remove_comments=True, **kw)[source]

Thin wrapper around ElementTree.parse

openquake.baselib.node.pprint(self, stream=None, indent=1, width=80, depth=None)[source]

Pretty print the underlying literal Python object

openquake.baselib.node.read_nodes(fname, filter_elem, nodefactory=<class 'openquake.baselib.node.Node'>, remove_comments=True)[source]

Convert an XML file into a lazy iterator over Node objects satifying the given specification, i.e. a function element -> boolean.

Parameters:
  • fname – file name of file object
  • filter_elem – element specification

In case of errors, add the file name to the error message.

openquake.baselib.node.scientificformat(value, fmt='%13.9E', sep=' ', sep2=':')[source]
Parameters:
  • value – the value to convert into a string
  • fmt – the formatting string to use for float values
  • sep – separator to use for vector-like values
  • sep2 – second separator to use for matrix-like values

Convert a float or an array into a string by using the scientific notation and a fixed precision (by default 10 decimal digits). For instance:

>>> scientificformat(-0E0)
'0.000000000E+00'
>>> scientificformat(-0.004)
'-4.000000000E-03'
>>> scientificformat([0.004])
'4.000000000E-03'
>>> scientificformat([0.01, 0.02], '%10.6E')
'1.000000E-02 2.000000E-02'
>>> scientificformat([[0.1, 0.2], [0.3, 0.4]], '%4.1E')
'1.0E-01:2.0E-01 3.0E-01:4.0E-01'
openquake.baselib.node.striptag(tag)[source]

Get the short representation of a fully qualified tag

Parameters:tag (str) – a (fully qualified or not) XML tag
openquake.baselib.node.to_literal(self)[source]

Convert the node into a literal Python object

openquake.baselib.node.tostring(node, indent=4, nsmap=None)[source]

Convert a node into an XML string by using the StreamingXMLWriter. This is useful for testing purposes.

Parameters:
  • node – a node object (typically an ElementTree object)
  • indent – the indentation to use in the XML (default 4 spaces)

parallel

The Starmap API

There are several good libraries to manage parallel programming in Python, both in the standard library and in third party packages. Since we are not interested in reinventing the wheel, OpenQuake does not provide any new parallel library; however, it does offer some glue code so that you can use over your library of choice. Currently threading, multiprocessing, zmq and celery are supported. Moreover, openquake.baselib.parallel offers some additional facilities that make it easier to parallelize scientific computations, i.e. embarrassingly parallel problems.

Typically one wants to apply a callable to a list of arguments in parallel, and then combine together the results. This is known as a MapReduce problem. As a simple example, we will consider the problem of counting the letters in a text, by using the following count function:

The collections.Counter class works sequentially, and can solve the problem in parallel by using openquake.baselib.parallel.Starmap:

>>> arglist = [('hello',), ('world',)]  # list of arguments
>>> smap = Starmap(count, arglist)  # Starmap instance, nothing started yet
>>> sorted(smap.reduce().items())  # build the counts per letter
[('d', 1), ('e', 1), ('h', 1), ('l', 3), ('o', 2), ('r', 1), ('w', 1)]

A Starmap object is an iterable: when iterating over it produces task results. It also has a reduce method similar to functools.reduce with sensible defaults:

  1. the default aggregation function is add, so there is no need to specify it
  2. the default accumulator is an empty accumulation dictionary (see openquake.baselib.AccumDict) working as a Counter, so there is no need to specify it.

You can of course override the defaults, so if you really want to return a Counter you can do

>>> res = Starmap(count, arglist).reduce(acc=collections.Counter())

In the engine we use nearly always callables that return dictionaries and we aggregate nearly always with the addition operator, so such defaults are very convenient. You are encouraged to do the same, since we found that approach to be very flexible. Typically in a scientific application you will return a dictionary of numpy arrays.

The parallelization algorithm used by Starmap will depend on the environment variable OQ_DISTRIBUTE. Here are the possibilities available at the moment:

OQ_DISTRIBUTE not set or set to “processpool”:
use multiprocessing
OQ_DISTRIBUTE set to “no”:
disable the parallelization, useful for debugging
OQ_DISTRIBUTE set to “celery”:
use celery, useful if you have multiple machines in a cluster
OQ_DISTRIBUTE set tp “zmq”
use the zmq concurrency mechanism (experimental)

There is also an OQ_DISTRIBUTE = “threadpool”; however the performance of using threads instead of processes is normally bad for the kind of applications we are interested in (CPU-dominated, which large tasks such that the time to spawn a new process is negligible with respect to the time to perform the task), so it is not recommended.

If you are using a pool, is always a good idea to cleanup resources at the end with

>>> Starmap.shutdown()

Starmap.shutdown is always defined. It does nothing if there is no pool, but it is still better to call it: in the future, you may change idea and use another parallelization strategy requiring cleanup. In this way your code is future-proof.

Monitoring

A major feature of the Starmap API is the ability to monitor the time spent in each task and the memory allocated. Such information is written into an HDF5 file that can be provided by the user or autogenerated. To autogenerate the file you can use openquake.baselib.datastore.hdf5new() which will create a file named calc_XXX.hdf5 in your $OQ_DATA directory (if the environment variable is not set, the engine will use $HOME/oqdata). Here is an example of usage:

>>> from openquake.baselib.datastore import hdf5new
>>> h5 = hdf5new()
>>> smap = Starmap(count, [['hello'], ['world']], h5=h5)
>>> print(sorted(smap.reduce().items()))
[('d', 1), ('e', 1), ('h', 1), ('l', 3), ('o', 2), ('r', 1), ('w', 1)]

After the calculation, or even while the calculation is running, you can open the calculation file for reading and extract the performance information for it. The engine provides a command to do that, oq show performance, but you can also get it manually, with a call to openquake.baselib.performance.performance_view(h5) which will return the performance information as a numpy array:

>>> from openquake.baselib.performance import performance_view
>>> performance_view(h5).dtype.names
('operation', 'time_sec', 'memory_mb', 'counts')
>>> h5.close()

The four columns are as follows:

operation:
the name of the function running in parallel (in this case ‘count’)
time_sec:
the cumulative time in second spent running the function
memory_mb:
the maximum allocated memory per core
counts:
the number of times the function was called (in this case 2)

The Starmap.apply API

The Starmap class has a very convenient classmethod Starmap.apply which is used in several places in the engine. Starmap.apply is useful when you have a sequence of objects that you want to split in homogenous chunks and then apply a callable to each chunk (in parallel). For instance, in the letter counting example discussed before, Starmap.apply could be used as follows:

>>> text = 'helloworld'  # sequence of characters
>>> res3 = Starmap.apply(count, (text,)).reduce()
>>> assert res3 == res

The API of Starmap.apply is designed to extend the one of apply, a builtin of Python 2; the second argument is the tuple of arguments passed to the first argument. The difference with apply is that Starmap.apply returns a Starmap object so that nothing is actually done until you iterate on it (reduce is doing that).

How many chunks will be produced? That depends on the parameter concurrent_tasks; it it is not passed, it has a default of 5 times the number of cores in your machine - as returned by os.cpu_count() - and Starmap.apply will try to produce a number of chunks close to that number. The nice thing is that it is also possible to pass a weight function. Suppose for instance that instead of a list of letters you have a list of seismic sources: some sources requires a long computation time (such as ComplexFaultSources), some requires a short computation time (such as PointSources). By giving an heuristic weight to the different sources it is possible to produce chunks with nearly homogeneous weight; in particular PointSource tasks will contain a lot more sources than tasks with ComplexFaultSources.

It is essential in large computations to have a homogeneous task distribution, otherwise you will end up having a big task dominating the computation time (i.e. you may have 1000 cores of which 999 are free, having finished all the short tasks, but you have to wait for days for the single core processing the slow task). The OpenQuake engine does a great deal of work trying to split slow sources in more manageable fast sources.

class openquake.baselib.parallel.FakePickle(sentbytes)[source]

Bases: object

unpickle()[source]
class openquake.baselib.parallel.IterResult(iresults, taskname, argnames, sent, h5)[source]

Bases: object

Parameters:
  • iresults – an iterator over Result objects
  • taskname – the name of the task
  • done_total – a function returning the number of done tasks and the total
  • sent – a nested dictionary name -> {argname: number of bytes sent}
  • progress – a logging function for the progress report
  • hdf5path – a path where to store persistently the performance info
reduce(agg=<built-in function add>, acc=None)[source]
classmethod sum(iresults)[source]

Sum the data transfer information of a set of results

class openquake.baselib.parallel.Pickled(obj)[source]

Bases: object

An utility to manually pickling/unpickling objects. The reason is that celery does not use the HIGHEST_PROTOCOL, so relying on celery is slower. Moreover Pickled instances have a nice string representation and length giving the size of the pickled bytestring.

Parameters:obj – the object to pickle
unpickle()[source]

Unpickle the underlying object

class openquake.baselib.parallel.Result(val, mon, tb_str='', msg='')[source]

Bases: object

Parameters:
  • val – value to return or exception instance
  • mon – Monitor instance
  • tb_str – traceback string (empty if there was no exception)
  • msg – message string (default empty)
func = None
get()[source]

Returns the underlying value or raise the underlying exception

classmethod new(func, args, mon, sentbytes=0)[source]
Returns:a new Result instance
class openquake.baselib.parallel.Starmap(task_func, task_args=(), distribute=None, progress=<function info>, h5=None, num_cores=None)[source]

Bases: object

classmethod apply(task, args, concurrent_tasks=None, maxweight=None, weight=<function Starmap.<lambda>>, key=<function Starmap.<lambda>>, distribute=None, progress=<function info>, h5=None, num_cores=None)[source]

Apply a task to a tuple of the form (sequence, *other_args) by first splitting the sequence in chunks, according to the weight of the elements and possibly to a key (see :func: openquake.baselib.general.split_in_blocks).

Parameters:
  • task – a task to run in parallel
  • args – the arguments to be passed to the task function
  • concurrent_tasks – hint about how many tasks to generate
  • maxweight – if not None, used to split the tasks
  • weight – function to extract the weight of an item in arg0
  • key – function to extract the kind of an item in arg0
  • distribute – if not given, inferred from OQ_DISTRIBUTE
  • progress – logging function to use (default logging.info)
  • h5 – an open hdf5.File where to store the performance info
  • num_cores – the number of available cores
Returns:

an IterResult object

get_results()[source]
Returns:an IterResult instance
classmethod init(poolsize=None, distribute=None)[source]
log_percent()[source]

Log the progress of the computation in percentage

num_cores = None
pids = ()
reduce(agg=<built-in function add>, acc=None)[source]

Submit all tasks and reduce the results

running_tasks = []
classmethod shutdown()[source]
submit(args, func=None, monitor=None)[source]

Submit the given arguments to the underlying task

submit_all()[source]
Returns:an IterResult object
openquake.baselib.parallel.celery_submit(self, func, args, monitor)[source]
openquake.baselib.parallel.check_mem_usage(soft_percent=None, hard_percent=None)[source]

Display a warning if we are running out of memory

openquake.baselib.parallel.count(word)[source]

Used as example in the documentation

openquake.baselib.parallel.dask_submit(self, func, args, monitor)[source]
openquake.baselib.parallel.get_pickled_sizes(obj)[source]

Return the pickled sizes of an object and its direct attributes, ordered by decreasing size. Here is an example:

>> total_size, partial_sizes = get_pickled_sizes(Monitor(‘’)) >> total_size 345 >> partial_sizes [(‘_procs’, 214), (‘exc’, 4), (‘mem’, 4), (‘start_time’, 4), (‘_start_time’, 4), (‘duration’, 4)]

Notice that the sizes depend on the operating system and the machine.

openquake.baselib.parallel.getargnames(task_func)[source]
openquake.baselib.parallel.init_workers()[source]

Waiting function, used to wake up the process pool

openquake.baselib.parallel.no_submit(self, func, args, monitor)[source]
openquake.baselib.parallel.oq_distribute(task=None)[source]
Returns:the value of OQ_DISTRIBUTE or ‘processpool’
openquake.baselib.parallel.pickle_sequence(objects)[source]

Convert an iterable of objects into a list of pickled objects. If the iterable contains copies, the pickling will be done only once. If the iterable contains objects already pickled, they will not be pickled again.

Parameters:objects – a sequence of objects to pickle
openquake.baselib.parallel.processpool_submit(self, func, args, monitor)[source]
openquake.baselib.parallel.safely_call(func, args, task_no=0, mon=<Monitor [jenkins]>)[source]

Call the given function with the given arguments safely, i.e. by trapping the exceptions. Return a pair (result, exc_type) where exc_type is None if no exceptions occur, otherwise it is the exception class and the result is a string containing error message and traceback.

Parameters:
  • func – the function to call
  • args – the arguments
  • task_no – the task number
  • mon – a monitor
openquake.baselib.parallel.sequential_apply(task, args, concurrent_tasks=32, maxweight=None, weight=<function <lambda>>, key=<function <lambda>>, progress=<function info>)[source]

Apply sequentially task to args by splitting args[0] in blocks

openquake.baselib.parallel.split_task(func, *args, duration=1000, weight=operator.attrgetter('weight'))[source]
Parameters:
  • func – a task function with a monitor as last argument
  • args – arguments of the task function
  • duration – split the task if it exceeds the duration
  • weight – weight function for the elements in args[0]
Yields:

a partial result, 0 or more task objects, 0 or 1 partial result

openquake.baselib.parallel.threadpool_submit(self, func, args, monitor)[source]
openquake.baselib.parallel.zmq_submit(self, func, args, monitor)[source]

performance

class openquake.baselib.performance.Monitor(operation='', measuremem=False, inner_loop=False, h5=None)[source]

Bases: object

Measure the resident memory occupied by a list of processes during the execution of a block of code. Should be used as a context manager, as follows:

with Monitor('do_something') as mon:
    do_something()
print mon.mem

At the end of the block the Monitor object will have the following 5 public attributes:

.start_time: when the monitor started (a datetime object) .duration: time elapsed between start and stop (in seconds) .exc: usually None; otherwise the exception happened in the with block .mem: the memory delta in bytes

The behaviour of the Monitor can be customized by subclassing it and by overriding the method on_exit(), called at end and used to display or store the results of the analysis.

NB: if the .address attribute is set, it is possible for the monitor to send commands to that address, assuming there is a multiprocessing.connection.Listener listening.

address = None
authkey = None
calc_id = None
dt

Last time interval measured

flush(h5)[source]

Save the measurements on the performance file

get_data()[source]
Returns:an array of dtype perf_dt, with the information of the monitor (operation, time_sec, memory_mb, counts); the lenght of the array can be 0 (for counts=0) or 1 (otherwise).
measure_mem()[source]

A memory measurement (in bytes)

new(operation='no operation', **kw)[source]

Return a copy of the monitor usable for a different operation.

reset()[source]

Reset duration, mem, counts

save_task_info(h5, res, name, mem_gb=0)[source]

Called by parallel.IterResult.

Parameters:
  • h5 – where to save the info
  • res – a Result object
  • name – name of the task function
  • mem_gb – memory consumption at the saving time (optional)
start_time

Datetime instance recording when the monitoring started

openquake.baselib.performance.init_performance(hdf5file, swmr=False)[source]
Parameters:hdf5file – file name of hdf5.File instance
openquake.baselib.performance.memory_rss(pid)[source]
Returns:the RSS memory allocated by a process
openquake.baselib.performance.performance_view(dstore)[source]

Returns the performance view as a numpy array.

python3compat

Compatibility layer for Python 2 and 3. Mostly copied from six and future, but reduced to the subset of utilities needed by GEM. This is done to avoid an external dependency.

openquake.baselib.python3compat.decode(val)[source]

Decode an object assuming the encoding is UTF-8.

Param:a unicode or bytes object
Returns:a unicode object
openquake.baselib.python3compat.encode(val)[source]

Encode a string assuming the encoding is UTF-8.

Param:a unicode or bytes object
Returns:bytes
openquake.baselib.python3compat.raise_(tp, value=None, tb=None)[source]

A function that matches the Python 2.x raise statement. This allows re-raising exceptions with the cls value and traceback on Python 2 and 3.

openquake.baselib.python3compat.round(x, d=0)[source]
openquake.baselib.python3compat.with_metaclass(meta, *bases)[source]

Returns an instance of meta inheriting from the given bases. To be used to replace the __metaclass__ syntax.

openquake.baselib.python3compat.zip(arg, *args)[source]

runtests

class openquake.baselib.runtests.TestLoader[source]

Bases: object

loadTestsFromNames(suitename, module=None)[source]
class openquake.baselib.runtests.TestResult(stream, descriptions, verbosity)[source]

Bases: unittest.runner.TextTestResult

save_times(fname)[source]
startTest(test)[source]
stopTest(test)[source]
timedict = {}
openquake.baselib.runtests.addTest(self, test)[source]

sap

Here is a minimal example of usage:

>>> from openquake.baselib import sap
>>> def fun(input, inplace, output=None, out='/tmp'):
...     'Example'
...     for item in sorted(locals().items()):
...         print('%s = %s' % item)

>>> p = sap.script(fun)
>>> p.arg('input', 'input file or archive')
>>> p.flg('inplace', 'convert inplace')
>>> p.arg('output', 'output archive')
>>> p.opt('out', 'optional output file')

>>> p.callfunc(['a'])
inplace = False
input = a
out = /tmp
output = None

>>> p.callfunc(['a', 'b', '-i', '-o', 'OUT'])
inplace = True
input = a
out = OUT
output = b

Parsers can be composed too.

class openquake.baselib.sap.Script(func, name=None, parentparser=None, help=True)[source]

Bases: object

A simple way to define command processors based on argparse. Each parser is associated to a function and parsers can be composed together, by dispatching on a given name (if not given, the function name is used).

arg(name, help, type=None, choices=None, metavar=None, nargs=None)[source]

Describe a positional argument

callfunc(argv=None)[source]

Parse the argv list and extract a dictionary of arguments which is then passed to the function underlying the script.

check_arguments()[source]

Make sure all arguments have a specification

flg(name, help, abbrev=None)[source]

Describe a flag

group(descr)[source]

Added a new group of arguments with the given description

help()[source]

Return the help message as a string

opt(name, help, abbrev=None, type=None, choices=None, metavar=None, nargs=None)[source]

Describe an option

openquake.baselib.sap.compose(scripts, name='main', description=None, prog=None, version=None)[source]

Collects together different scripts and builds a single script dispatching to the subparsers depending on the first argument, i.e. the name of the subparser to invoke.

Parameters:
  • scripts – a list of script instances
  • name – the name of the composed parser
  • description – description of the composed parser
  • prog – name of the script printed in the usage message
  • version – version of the script printed with –version
openquake.baselib.sap.get_parentparser(parser, description=None, help=True)[source]
Parameters:
  • parserargparse.ArgumentParser instance or None
  • description – string used to build a new parser if parser is None
  • help – flag used to build a new parser if parser is None
Returns:

if parser is None the new parser; otherwise the .parentparser attribute (if set) or the parser itself (if not set)

openquake.baselib.sap.script(func)[source]
openquake.baselib.sap.str_choices(choices)[source]

Returns {choice1, …, choiceN} or the empty string

slots