Source code for openquake.calculators.extract
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2017-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import parse_qs
from functools import lru_cache
import operator
import logging
import json
import gzip
import ast
import io
import requests
from h5py._hl.dataset import Dataset
from h5py._hl.group import Group
import numpy
import pandas
from scipy.cluster.vq import kmeans2
from openquake.baselib import config, hdf5, general, writers
from openquake.baselib.hdf5 import ArrayWrapper
from openquake.baselib.general import group_array, println
from openquake.baselib.python3compat import encode, decode
from openquake.hazardlib import logictree
from openquake.hazardlib.contexts import (
    ContextMaker, read_cmakers, read_ctx_by_grp)
from openquake.hazardlib.calc import disagg, stochastic, filters
from openquake.hazardlib.stats import calc_stats
from openquake.hazardlib.source import rupture
from openquake.risklib.scientific import LOSSTYPE, LOSSID
from openquake.risklib.asset import tagset
from openquake.commonlib import calc, util, oqvalidation, datastore
from openquake.calculators import getters
U16 = numpy.uint16
U32 = numpy.uint32
I64 = numpy.int64
F32 = numpy.float32
F64 = numpy.float64
TWO30 = 2 ** 30
TWO32 = 2 ** 32
ALL = slice(None)
CHUNKSIZE = 4*1024**2  # 4 MB
SOURCE_ID = stochastic.rupture_dt['source_id']
memoized = lru_cache()
[docs]def lit_eval(string):
    """
    `ast.literal_eval` the string if possible, otherwise returns it unchanged
    """
    try:
        return ast.literal_eval(string)
    except (ValueError, SyntaxError):
        return string
[docs]def get_info(dstore):
    """
    :returns: a dict with 'stats', 'loss_types', 'num_rlzs', 'tagnames', etc
    """
    oq = dstore['oqparam']
    stats = {stat: s for s, stat in enumerate(oq.hazard_stats())}
    loss_types = {lt: li for li, lt in enumerate(oq.loss_dt().names)}
    imt = {imt: i for i, imt in enumerate(oq.imtls)}
    num_rlzs = len(dstore['weights'])
    return dict(stats=stats, num_rlzs=num_rlzs, loss_types=loss_types,
                imtls=oq.imtls, investigation_time=oq.investigation_time,
                poes=oq.poes, imt=imt, uhs_dt=oq.uhs_dt(),
                limit_states=oq.limit_states,
                tagnames=tagset(oq.aggregate_by))
def _normalize(kinds, info):
    a = []
    b = []
    stats = info['stats']
    rlzs = False
    for kind in kinds:
        if kind.startswith('rlz-'):
            rlzs = True
            a.append(int(kind[4:]))
            b.append(kind)
        elif kind in stats:
            a.append(stats[kind])
            b.append(kind)
        elif kind == 'stats':
            a.extend(stats.values())
            b.extend(stats)
        elif kind == 'rlzs':
            rlzs = True
            a.extend(range(info['num_rlzs']))
            b.extend(['rlz-%03d' % r for r in range(info['num_rlzs'])])
    return a, b, rlzs
[docs]def parse(query_string, info={}):
    """
    :returns: a normalized query_dict as in the following examples:
    >>> parse('kind=stats', {'stats': {'mean': 0, 'max': 1}})
    {'kind': ['mean', 'max'], 'k': [0, 1], 'rlzs': False}
    >>> parse('kind=rlzs', {'stats': {}, 'num_rlzs': 3})
    {'kind': ['rlz-000', 'rlz-001', 'rlz-002'], 'k': [0, 1, 2], 'rlzs': True}
    >>> parse('kind=mean', {'stats': {'mean': 0, 'max': 1}})
    {'kind': ['mean'], 'k': [0], 'rlzs': False}
    >>> parse('kind=rlz-3&imt=PGA&site_id=0', {'stats': {}})
    {'kind': ['rlz-3'], 'imt': ['PGA'], 'site_id': [0], 'k': [3], 'rlzs': True}
    """
    qdic = parse_qs(query_string)
    for key, val in sorted(qdic.items()):
        # convert site_id to an int, loss_type to an int, etc
        if key == 'loss_type':
            qdic[key] = [LOSSID[k] for k in val]
            qdic['lt'] = val
        else:
            qdic[key] = [lit_eval(v) for v in val]
    if info:
        qdic['k'], qdic['kind'], qdic['rlzs'] = _normalize(qdic['kind'], info)
    return qdic
[docs]def sanitize(query_string):
    """
    Replace `/`, `?`, `&` characters with underscores and '=' with '-'
    """
    return query_string.replace(
        '/', '_').replace('?', '_').replace('&', '_').replace('=', '-')
[docs]def barray(iterlines):
    """
    Array of bytes
    """
    lst = [line.encode('utf-8') for line in iterlines]
    arr = numpy.array(lst)
    return arr
[docs]def avglosses(dstore, loss_types, kind):
    """
    :returns: an array of average losses of shape (A, R, L)
    """
    lst = []
    for loss_type in loss_types:
        lst.append(dstore['avg_losses-%s/%s' % (kind, loss_type)][()])
    # shape L, A, R -> A, R, L
    return numpy.array(lst).transpose(1, 2, 0)
[docs]def extract_(dstore, dspath):
    """
    Extracts an HDF5 path object from the datastore, for instance
    extract(dstore, 'sitecol').
    """
    obj = dstore[dspath]
    if isinstance(obj, Dataset):
        return ArrayWrapper(obj[()], obj.attrs)
    elif isinstance(obj, Group):
        return ArrayWrapper(numpy.array(list(obj)), obj.attrs)
    else:
        return obj
[docs]class Extract(dict):
    """
    A callable dictionary of functions with a single instance called
    `extract`. Then `extract(dstore, fullkey)` dispatches to the function
    determined by the first part of `fullkey` (a slash-separated
    string) by passing as argument the second part of `fullkey`.
    For instance extract(dstore, 'sitecol').
    """
[docs]    def add(self, key, cache=False):
        def decorator(func):
            self[key] = memoized(func) if cache else func
            return func
        return decorator
    def __call__(self, dstore, key):
        if '/' in key:
            k, v = key.split('/', 1)
            data = self[k](dstore, v)
        elif '?' in key:
            k, v = key.split('?', 1)
            data = self[k](dstore, v)
        elif key in self:
            data = self[key](dstore, '')
        else:
            data = extract_(dstore, key)
        if isinstance(data, pandas.DataFrame):
            return data
        return ArrayWrapper.from_(data)
extract = Extract()
[docs]@extract.add('oqparam')
def extract_oqparam(dstore, dummy):
    """
    Extract job parameters as a JSON npz. Use it as /extract/oqparam
    """
    js = hdf5.dumps(vars(dstore['oqparam']))
    return ArrayWrapper((), {'json': js})
# used by the QGIS plugin in scenario
[docs]@extract.add('realizations')
def extract_realizations(dstore, dummy):
    """
    Extract an array of realizations. Use it as /extract/realizations
    """
    dt = [('rlz_id', U32), ('branch_path', '<S100'), ('weight', F32)]
    oq = dstore['oqparam']
    scenario = 'scenario' in oq.calculation_mode
    full_lt = dstore['full_lt']
    rlzs = full_lt.rlzs
    # NB: branch_path cannot be of type hdf5.vstr otherwise the conversion
    # to .npz (needed by the plugin) would fail
    arr = numpy.zeros(len(rlzs), dt)
    arr['rlz_id'] = rlzs['ordinal']
    arr['weight'] = rlzs['weight']
    if scenario and len(full_lt.trts) == 1:  # only one TRT
        gsims = dstore.getitem('full_lt/gsim_lt')['uncertainty']
        if 'shakemap' in oq.inputs:
            gsims = ["[FromShakeMap]"]
        # NOTE: repr(gsim) has a form like "b'[ChiouYoungs2008]'"
        arr['branch_path'] = ['"%s"' % repr(gsim)[2:-1].replace('"', '""')
                              for gsim in gsims]  # quotes Excel-friendly
    else:  # use the compact representation for the branch paths
        arr['branch_path'] = encode(rlzs['branch_path'])
    return arr
[docs]@extract.add('weights')
def extract_weights(dstore, what):
    """
    Extract the realization weights
    """
    rlzs = dstore['full_lt'].get_realizations()
    return numpy.array([rlz.weight['weight'] for rlz in rlzs])
[docs]@extract.add('gsims_by_trt')
def extract_gsims_by_trt(dstore, what):
    """
    Extract the dictionary gsims_by_trt
    """
    return ArrayWrapper((), dstore['full_lt'].gsim_lt.values)
[docs]@extract.add('exposure_metadata')
def extract_exposure_metadata(dstore, what):
    """
    Extract the loss categories and the tags of the exposure.
    Use it as /extract/exposure_metadata
    """
    dic = {}
    dic1, dic2 = dstore['assetcol/tagcol'].__toh5__()
    dic.update(dic1)
    dic.update(dic2)
    if 'asset_risk' in dstore:
        dic['multi_risk'] = sorted(
            set(dstore['asset_risk'].dtype.names) -
            set(dstore['assetcol/array'].dtype.names))
    dic['names'] = [name for name in dstore['assetcol/array'].dtype.names
                    if name.startswith(('value-', 'occupants'))
                    and name != 'occupants_avg']
    return ArrayWrapper((), dict(json=hdf5.dumps(dic)))
[docs]@extract.add('assets')
def extract_assets(dstore, what):
    """
    Extract an array of assets, optionally filtered by tag.
    Use it as /extract/assets?taxonomy=RC&taxonomy=MSBC&occupancy=RES
    """
    qdict = parse(what)
    dic = {}
    dic1, dic2 = dstore['assetcol/tagcol'].__toh5__()
    dic.update(dic1)
    dic.update(dic2)
    arr = dstore['assetcol/array'][()]
    for tag, vals in qdict.items():
        cond = numpy.zeros(len(arr), bool)
        for val in vals:
            tagidx, = numpy.where(dic[tag] == val)
            cond |= arr[tag] == tagidx
        arr = arr[cond]
    return ArrayWrapper(arr, dict(json=hdf5.dumps(dic)))
[docs]@extract.add('asset_risk')
def extract_asset_risk(dstore, what):
    """
    Extract an array of assets + risk fields, optionally filtered by tag.
    Use it as /extract/asset_risk?taxonomy=RC&taxonomy=MSBC&occupancy=RES
    """
    qdict = parse(what)
    dic = {}
    dic1, dic2 = dstore['assetcol/tagcol'].__toh5__()
    dic.update(dic1)
    dic.update(dic2)
    arr = dstore['asset_risk'][()]
    names = list(arr.dtype.names)
    for i, name in enumerate(names):
        if name == 'id':
            names[i] = 'asset_id'  # for backward compatibility
    arr.dtype.names = names
    for tag, vals in qdict.items():
        cond = numpy.zeros(len(arr), bool)
        for val in vals:
            tagidx, = numpy.where(dic[tag] == val)
            cond |= arr[tag] == tagidx
        arr = arr[cond]
    return ArrayWrapper(arr, dict(json=hdf5.dumps(dic)))
[docs]@extract.add('asset_tags')
def extract_asset_tags(dstore, tagname):
    """
    Extract an array of asset tags for the given tagname. Use it as
    /extract/asset_tags or /extract/asset_tags/taxonomy
    """
    tagcol = dstore['assetcol/tagcol']
    if tagname:
        yield tagname, barray(tagcol.gen_tags(tagname))
    for tagname in tagcol.tagnames:
        yield tagname, barray(tagcol.gen_tags(tagname))
[docs]def get_sites(sitecol, complete=True):
    """
    :returns:
        a lon-lat or lon-lat-depth array depending if the site collection
        is at sea level or not; if there is a custom_site_id, prepend it
    """
    sc = sitecol.complete if complete else sitecol
    if sc.at_sea_level():
        fields = ['lon', 'lat']
    else:
        fields = ['lon', 'lat', 'depth']
    if 'custom_site_id' in sitecol.array.dtype.names:
        fields.insert(0, 'custom_site_id')
    return sitecol[fields]
[docs]def hazard_items(dic, sites, *extras, **kw):
    """
    :param dic: dictionary of arrays of the same shape
    :param sites: a sites array with lon, lat fields of the same length
    :param extras: optional triples (field, dtype, values)
    :param kw: dictionary of parameters (like investigation_time)
    :returns: a list of pairs (key, value) suitable for storage in .npz format
    """
    for item in kw.items():
        yield item
    try:
        field = next(iter(dic))
    except StopIteration:
        return
    arr = dic[field]
    dtlist = [(str(field), arr.dtype) for field in sorted(dic)]
    for field, dtype, values in extras:
        dtlist.append((str(field), dtype))
    array = numpy.zeros(arr.shape, dtlist)
    for field in dic:
        array[field] = dic[field]
    for field, dtype, values in extras:
        array[field] = values
    yield 'all', util.compose_arrays(sites, array)
def _get_dict(dstore, name, imtls, stats):
    dic = {}
    dtlist = []
    for imt, imls in imtls.items():
        dt = numpy.dtype([(str(iml), F32) for iml in imls])
        dtlist.append((imt, dt))
    for s, stat in enumerate(stats):
        dic[stat] = dstore[name][:, s].flatten().view(dtlist)
    return dic
[docs]@extract.add('sitecol')
def extract_sitecol(dstore, what):
    """
    Extracts the site collection array (not the complete object, otherwise it
    would need to be pickled).
    Use it as /extract/sitecol?field=vs30
    """
    qdict = parse(what)
    if 'field' in qdict:
        [f] = qdict['field']
        return dstore['sitecol'][f]
    return dstore['sitecol'].array
def _items(dstore, name, what, info):
    params = parse(what, info)
    filt = {}
    if 'site_id' in params:
        filt['site_id'] = params['site_id'][0]
    if 'imt' in params:
        [imt] = params['imt']
        filt['imt'] = imt
    if params['rlzs']:
        for k in params['k']:
            filt['rlz_id'] = k
            yield 'rlz-%03d' % k, dstore.sel(name + '-rlzs', **filt)[:, 0]
    else:
        stats = list(info['stats'])
        for k in params['k']:
            filt['stat'] = stat = stats[k]
            yield stat, dstore.sel(name + '-stats', **filt)[:, 0]
    yield from params.items()
[docs]@extract.add('hcurves')
def extract_hcurves(dstore, what):
    """
    Extracts hazard curves. Use it as /extract/hcurves?kind=mean&imt=PGA or
    /extract/hcurves?kind=rlz-0&imt=SA(1.0)
    """
    info = get_info(dstore)
    if what == '':  # npz exports for QGIS
        sitecol = dstore['sitecol']
        sites = get_sites(sitecol, complete=False)
        dic = _get_dict(dstore, 'hcurves-stats', info['imtls'], info['stats'])
        yield from hazard_items(
            dic, sites, investigation_time=info['investigation_time'])
        return
    yield from _items(dstore, 'hcurves', what, info)
[docs]@extract.add('hmaps')
def extract_hmaps(dstore, what):
    """
    Extracts hazard maps. Use it as /extract/hmaps?imt=PGA
    """
    info = get_info(dstore)
    if what == '':  # npz exports for QGIS
        sitecol = dstore['sitecol']
        sites = get_sites(sitecol, complete=False)
        dic = _get_dict(dstore, 'hmaps-stats',
                        {imt: info['poes'] for imt in info['imtls']},
                        info['stats'])
        yield from hazard_items(
            dic, sites, investigation_time=info['investigation_time'])
        return
    yield from _items(dstore, 'hmaps', what, info)
[docs]@extract.add('uhs')
def extract_uhs(dstore, what):
    """
    Extracts uniform hazard spectra. Use it as /extract/uhs?kind=mean or
    /extract/uhs?kind=rlz-0, etc
    """
    info = get_info(dstore)
    if what == '':  # npz exports for QGIS
        sitecol = dstore['sitecol']
        sites = get_sites(sitecol, complete=False)
        dic = {}
        for stat, s in info['stats'].items():
            hmap = dstore['hmaps-stats'][:, s]  # shape (N, M, P)
            dic[stat] = calc.make_uhs(hmap, info)
        yield from hazard_items(
            dic, sites, investigation_time=info['investigation_time'])
        return
    for k, v in _items(dstore, 'hmaps', what, info):  # shape (N, M, P)
        if hasattr(v, 'shape') and len(v.shape) == 3:
            yield k, calc.make_uhs(v, info)
        else:
            yield k, v
[docs]@extract.add('effect')
def extract_effect(dstore, what):
    """
    Extracts the effect of ruptures. Use it as /extract/effect
    """
    grp = dstore['effect_by_mag_dst_trt']
    dist_bins = dict(grp.attrs)
    ndists = len(dist_bins[next(iter(dist_bins))])
    arr = numpy.zeros((len(grp), ndists, len(dist_bins)))
    for i, mag in enumerate(grp):
        arr[i] = dstore['effect_by_mag_dst_trt/' + mag][()]
    return ArrayWrapper(arr, dict(dist_bins=dist_bins, ndists=ndists,
                                  mags=[float(mag) for mag in grp]))
[docs]@extract.add('rups_by_mag_dist')
def extract_rups_by_mag_dist(dstore, what):
    """
    Extracts the number of ruptures by mag, dist.
    Use it as /extract/rups_by_mag_dist
    """
    return extract_effect(dstore, 'rups_by_mag_dist')
# for debugging classical calculations with few sites
[docs]@extract.add('rup_ids')
def extract_rup_ids(dstore, what):
    """
    Extract src_id, rup_id from the stored contexts
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/rup_ids
    """
    n = len(dstore['rup/grp_id'])
    data = numpy.zeros(n, [('src_id', U32), ('rup_id', I64)])
    data['src_id'] = dstore['rup/src_id'][:]
    data['rup_id'] = dstore['rup/rup_id'][:]
    data = numpy.unique(data)
    return data
# for debugging classical calculations with few sites
[docs]@extract.add('mean_by_rup')
def extract_mean_by_rup(dstore, what):
    """
    Extract src_id, rup_id, mean from the stored contexts
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/mean_by_rup
    """
    N = len(dstore['sitecol'])
    assert N == 1
    out = []
    ctx_by_grp = read_ctx_by_grp(dstore)
    cmakers = read_cmakers(dstore)
    for gid, ctx in ctx_by_grp.items():
        # shape (4, G, M, U) => U
        means = cmakers[gid].get_mean_stds([ctx], split_by_mag=True)[0].mean(
            axis=(0, 1))
        out.extend(zip(ctx.src_id, ctx.rup_id, means))
    out.sort(key=operator.itemgetter(0, 1))
    return numpy.array(out, [('src_id', U32), ('rup_id', I64), ('mean', F64)])
[docs]@extract.add('source_data')
def extract_source_data(dstore, what):
    """
    Extract performance information about the sources.
    Use it as /extract/source_data?
    """
    qdict = parse(what)
    if 'taskno' in qdict:
        sel = {'taskno': int(qdict['taskno'][0])}
    else:
        sel = {}
    df = dstore.read_df('source_data', 'src_id', sel=sel).sort_values('ctimes')
    dic = {col: df[col].to_numpy() for col in df.columns}
    return ArrayWrapper(df.index.to_numpy(), dic)
[docs]@extract.add('sources')
def extract_sources(dstore, what):
    """
    Extract information about a source model.
    Use it as /extract/sources?limit=10
    or /extract/sources?source_id=1&source_id=2
    or /extract/sources?code=A&code=B
    """
    qdict = parse(what)
    limit = int(qdict.get('limit', ['100'])[0])
    source_ids = qdict.get('source_id', None)
    if source_ids is not None:
        source_ids = [str(source_id) for source_id in source_ids]
    codes = qdict.get('code', None)
    if codes is not None:
        codes = [code.encode('utf8') for code in codes]
    fields = 'source_id code num_sites num_ruptures'
    info = dstore['source_info'][()][fields.split()]
    wkt = decode(dstore['source_wkt'][()])
    arrays = []
    if source_ids is not None:
        logging.info('Extracting sources with ids: %s', source_ids)
        info = info[numpy.isin(info['source_id'], source_ids)]
        if len(info) == 0:
            raise getters.NotFound(
                'There is no source with id %s' % source_ids)
    if codes is not None:
        logging.info('Extracting sources with codes: %s', codes)
        info = info[numpy.isin(info['code'], codes)]
        if len(info) == 0:
            raise getters.NotFound(
                'There is no source with code in %s' % codes)
    for code, rows in general.group_array(info, 'code').items():
        if limit < len(rows):
            logging.info('Code %s: extracting %d sources out of %s',
                         code, limit, len(rows))
        arrays.append(rows[:limit])
    if not arrays:
        raise ValueError('There  no sources')
    info = numpy.concatenate(arrays)
    wkt_gz = gzip.compress(';'.join(wkt).encode('utf8'))
    src_gz = gzip.compress(';'.join(decode(info['source_id'])).encode('utf8'))
    oknames = [name for name in info.dtype.names  # avoid pickle issues
               if name != 'source_id']
    arr = numpy.zeros(len(info), [(n, info.dtype[n]) for n in oknames])
    for n in oknames:
        arr[n] = info[n]
    return ArrayWrapper(arr, {'wkt_gz': wkt_gz, 'src_gz': src_gz})
[docs]@extract.add('gridded_sources')
def extract_gridded_sources(dstore, what):
    """
    Extract information about the gridded sources (requires ps_grid_spacing)
    Use it as /extract/gridded_sources?task_no=0.
    Returns a json string id -> lonlats
    """
    qdict = parse(what)
    task_no = int(qdict.get('task_no', ['0'])[0])
    dic = {}
    for i, lonlats in enumerate(dstore['ps_grid/%02d' % task_no][()]):
        dic[i] = numpy.round(F64(lonlats), 3)
    return ArrayWrapper((), {'json': hdf5.dumps(dic)})
[docs]@extract.add('task_info')
def extract_task_info(dstore, what):
    """
    Extracts the task distribution. Use it as /extract/task_info?kind=classical
    """
    dic = group_array(dstore['task_info'][()], 'taskname')
    if 'kind' in what:
        name = parse(what)['kind'][0]
        yield name, dic[encode(name)]
        return
    for name in dic:
        yield decode(name), dic[name]
def _agg(losses, idxs):
    shp = losses.shape[1:]
    if not idxs:
        # no intersection, return a 0-dim matrix
        return numpy.zeros((0,) + shp, losses.dtype)
    # numpy.array wants lists, not sets, hence the sorted below
    return losses[numpy.array(sorted(idxs))].sum(axis=0)
def _filter_agg(assetcol, losses, selected, stats=''):
    # losses is an array of shape (A, ..., R) with A=#assets, R=#realizations
    aids_by_tag = assetcol.get_aids_by_tag()
    idxs = set(range(len(assetcol)))
    tagnames = []
    for tag in selected:
        tagname, tagvalue = tag.split('=', 1)
        if tagvalue == '*':
            tagnames.append(tagname)
        else:
            idxs &= aids_by_tag[tag]
    if len(tagnames) > 1:
        raise ValueError('Too many * as tag values in %s' % tagnames)
    elif not tagnames:  # return an array of shape (..., R)
        return ArrayWrapper(
            _agg(losses, idxs), dict(selected=encode(selected), stats=stats))
    else:  # return an array of shape (T, ..., R)
        [tagname] = tagnames
        _tags = list(assetcol.tagcol.gen_tags(tagname))
        all_idxs = [idxs & aids_by_tag[t] for t in _tags]
        # NB: using a generator expression for all_idxs caused issues (?)
        data, tags = [], []
        for idxs, tag in zip(all_idxs, _tags):
            agglosses = _agg(losses, idxs)
            if len(agglosses):
                data.append(agglosses)
                tags.append(tag)
        return ArrayWrapper(
            numpy.array(data),
            dict(selected=encode(selected), tags=encode(tags), stats=stats))
[docs]def get_loss_type_tags(what):
    try:
        loss_type, query_string = what.rsplit('?', 1)
    except ValueError:  # no question mark
        loss_type, query_string = what, ''
    tags = query_string.split('&') if query_string else []
    return loss_type, tags
# probably not used
[docs]@extract.add('csq_curves')
def extract_csq_curves(dstore, what):
    """
    Aggregate damages curves from the event_based_damage calculator:
    /extract/csq_curves?agg_id=0&loss_type=occupants
    Returns an ArrayWrapper of shape (P, D1) with attribute return_periods
    """
    info = get_info(dstore)
    qdic = parse(what + '&kind=mean', info)
    [li] = qdic['loss_type']  # loss type index
    [agg_id] = qdic.get('agg_id', [0])
    df = dstore.read_df('aggcurves', 'return_period',
                        dict(agg_id=agg_id, loss_id=li))
    cols = [col for col in df.columns if col not in 'agg_id loss_id']
    return ArrayWrapper(df[cols].to_numpy(),
                        dict(return_period=df.index.to_numpy(),
                             consequences=cols))
# NB: used by QGIS but not by the exporters
# tested in test_case_1_ins
[docs]@extract.add('agg_curves')
def extract_agg_curves(dstore, what):
    """
    Aggregate loss curves from the ebrisk calculator:
    /extract/agg_curves?kind=stats&absolute=1&loss_type=occupants&occupancy=RES
    Returns an array of shape (#periods, #stats) or (#periods, #rlzs)
    """
    info = get_info(dstore)
    qdic = parse(what, info)
    try:
        tagnames = dstore['oqparam'].aggregate_by[0]
    except IndexError:
        tagnames = []
    k = qdic['k']  # rlz or stat index
    lts = qdic['lt']
    [li] = qdic['loss_type']  # loss type index
    tagdict = {tag: qdic[tag] for tag in tagnames}
    if set(tagnames) != info['tagnames']:
        raise ValueError('Expected tagnames=%s, got %s' %
                         (info['tagnames'], tagnames))
    tagvalues = [tagdict[t][0] for t in tagnames]
    if tagnames:
        lst = decode(dstore['agg_keys'][:])
        agg_id = lst.index(','.join(tagvalues))
    else:
        agg_id = 0  # total aggregation
    ep_fields = dstore.get_attr('aggcurves', 'ep_fields')
    if qdic['rlzs']:
        [li] = qdic['loss_type']  # loss type index
        units = dstore.get_attr('aggcurves', 'units').split()
        df = dstore.read_df('aggcurves', sel=dict(agg_id=agg_id, loss_id=li))
        rps = list(df.return_period.unique())
        P = len(rps)
        R = len(qdic['kind'])
        EP = len(ep_fields)
        arr = numpy.zeros((R, P, EP))
        for rlz in df.rlz_id.unique():
            for ep_field_idx, ep_field in enumerate(ep_fields):
                # NB: df may contains zeros but there are no missing periods
                # by construction (see build_aggcurves)
                arr[rlz, :, ep_field_idx] = df[df.rlz_id == rlz][ep_field]
    else:
        name = 'agg_curves-stats/' + lts[0]
        shape_descr = hdf5.get_shape_descr(dstore.get_attr(name, 'json'))
        rps = list(shape_descr['return_period'])
        units = dstore.get_attr(name, 'units').split()
        arr = dstore[name][agg_id, k]  # shape (P, S, EP)
    if qdic['absolute'] == [1]:
        pass
    elif qdic['absolute'] == [0]:
        evalue, = dstore['agg_values'][agg_id][lts]
        arr /= evalue
    else:
        raise ValueError('"absolute" must be 0 or 1 in %s' % what)
    attrs = dict(shape_descr=['kind', 'return_period', 'ep_field'] + tagnames)
    attrs['kind'] = qdic['kind']
    attrs['return_period'] = rps
    attrs['units'] = units  # used by the QGIS plugin
    attrs['ep_field'] = ep_fields
    for tagname, tagvalue in zip(tagnames, tagvalues):
        attrs[tagname] = [tagvalue]
    if tagnames:
        arr = arr.reshape(arr.shape + (1,) * len(tagnames))
    return ArrayWrapper(arr, dict(json=hdf5.dumps(attrs)))
[docs]@extract.add('agg_losses')
def extract_agg_losses(dstore, what):
    """
    Aggregate losses of the given loss type and tags. Use it as
    /extract/agg_losses/structural?taxonomy=RC&custom_site_id=20126
    /extract/agg_losses/structural?taxonomy=RC&custom_site_id=*
    :returns:
        an array of shape (T, R) if one of the tag names has a `*` value
        an array of shape (R,), being R the number of realizations
        an array of length 0 if there is no data for the given tags
    """
    loss_type, tags = get_loss_type_tags(what)
    if not loss_type:
        raise ValueError('loss_type not passed in agg_losses/<loss_type>')
    if 'avg_losses-stats/' + loss_type in dstore:
        stats = list(dstore['oqparam'].hazard_stats())
        losses = dstore['avg_losses-stats/' + loss_type][:]
    elif 'avg_losses-rlzs/' + loss_type in dstore:
        stats = ['mean']
        losses = dstore['avg_losses-rlzs/' + loss_type][:]
    else:
        raise KeyError('No losses found in %s' % dstore)
    return _filter_agg(dstore['assetcol'], losses, tags, stats)
[docs]@extract.add('agg_damages')
def extract_agg_damages(dstore, what):
    """
    Aggregate damages of the given loss type and tags. Use it as
    /extract/agg_damages/structural?taxonomy=RC&custom_site_id=20126
    :returns:
        array of shape (R, D), being R the number of realizations and D the
        number of damage states, or an array of length 0 if there is no data
        for the given tags
    """
    loss_type, tags = get_loss_type_tags(what)
    if 'damages-rlzs' in dstore:
        oq = dstore['oqparam']
        lti = oq.lti[loss_type]
        damages = dstore['damages-rlzs'][:, :, lti]
    else:
        raise KeyError('No damages found in %s' % dstore)
    return _filter_agg(dstore['assetcol'], damages, tags)
[docs]@extract.add('aggregate')
def extract_aggregate(dstore, what):
    """
    /extract/aggregate/avg_losses?
    kind=mean&loss_type=structural&tag=taxonomy&tag=occupancy
    """
    name, qstring = what.split('?', 1)
    info = get_info(dstore)
    qdic = parse(qstring, info)
    suffix = '-rlzs' if qdic['rlzs'] else '-stats'
    tagnames = qdic.get('tag', [])
    assetcol = dstore['assetcol']
    loss_types = info['loss_types']
    ridx = qdic['k'][0]
    lis = qdic.get('loss_type', [])  # list of indices
    if lis:
        lt = LOSSTYPE[lis[0]]
        array = dstore['avg_losses%s/%s' % (suffix, lt)][:, ridx]
        aw = ArrayWrapper(assetcol.aggregateby(tagnames, array), {}, [lt])
    else:
        array = avglosses(dstore, loss_types, suffix[1:])[:, ridx]
        aw = ArrayWrapper(assetcol.aggregateby(tagnames, array), {},
                          loss_types)
    for tagname in tagnames:
        setattr(aw, tagname, getattr(assetcol.tagcol, tagname)[1:])
    aw.shape_descr = tagnames
    return aw
[docs]@extract.add('losses_by_asset')
def extract_losses_by_asset(dstore, what):
    oq = dstore['oqparam']
    loss_dt = oq.loss_dt(F32)
    R = dstore['full_lt'].get_num_paths()
    stats = oq.hazard_stats()  # statname -> statfunc
    assets = util.get_assets(dstore)
    if 'losses_by_asset' in dstore:
        losses_by_asset = dstore['losses_by_asset'][()]
        for r in range(R):
            # I am exporting the 'mean' and ignoring the 'stddev'
            losses = cast(losses_by_asset[:, r]['mean'], loss_dt)
            data = util.compose_arrays(assets, losses)
            yield 'rlz-%03d' % r, data
    elif 'avg_losses-stats' in dstore:
        # only QGIS is testing this
        avg_losses = avglosses(dstore, loss_dt.names, 'stats')  # shape ASL
        for s, stat in enumerate(stats):
            losses = cast(avg_losses[:, s], loss_dt)
            data = util.compose_arrays(assets, losses)
            yield stat, data
    elif 'avg_losses-rlzs' in dstore:  # there is only one realization
        avg_losses = avglosses(dstore, loss_dt.names, 'rlzs')
        losses = cast(avg_losses, loss_dt)
        data = util.compose_arrays(assets, losses)
        yield 'rlz-000', data
def _gmf(df, num_sites, imts):
    # convert data into the composite array expected by QGIS
    gmfa = numpy.zeros(num_sites, [(imt, F32) for imt in imts])
    for m, imt in enumerate(imts):
        gmfa[imt][U32(df.sid)] = df[f'gmv_{m}']
    return gmfa
# used by the QGIS plugin for a single eid
[docs]@extract.add('gmf_data')
def extract_gmf_npz(dstore, what):
    oq = dstore['oqparam']
    qdict = parse(what)
    [eid] = qdict.get('event_id', [0])  # there must be a single event
    rlzi = dstore['events'][eid]['rlz_id']
    try:
        complete = dstore['complete']
    except KeyError:
        complete = dstore['sitecol']
    sites = get_sites(complete)
    n = len(sites)
    try:
        df = dstore.read_df('gmf_data', 'eid').loc[eid]
    except KeyError:
        # zero GMF
        yield 'rlz-%03d' % rlzi, []
    else:
        gmfa = _gmf(df, n, oq.imtls)
        yield 'rlz-%03d' % rlzi, util.compose_arrays(sites, gmfa)
# extract the relevant GMFs as an npz file with fields eid, sid, gmv_
[docs]@extract.add('relevant_gmfs')
def extract_relevant_gmfs(dstore, what):
    qdict = parse(what)
    [thr] = qdict.get('threshold', ['1'])
    eids = get_relevant_event_ids(dstore, float(thr))
    try:
        sbe = dstore.read_df('gmf_data/slice_by_event', 'eid')
    except KeyError:
        df = dstore.read_df('gmf_data', 'eid')
        return df.loc[eids].reset_index()
    dfs = []
    for eid in eids:
        ser = sbe.loc[eid]
        slc = slice(ser.start, ser.stop)
        dfs.append(dstore.read_df('gmf_data', slc=slc))
    return pandas.concat(dfs)
[docs]@extract.add('avg_gmf')
def extract_avg_gmf(dstore, what):
    qdict = parse(what)
    info = get_info(dstore)
    [imt] = qdict['imt']
    imti = info['imt'][imt]
    sitecol = dstore['sitecol']
    avg_gmf = dstore['avg_gmf'][0, :, imti]
    yield imt, avg_gmf[sitecol.sids]
    yield 'sids', sitecol.sids
    yield 'lons', sitecol.lons
    yield 'lats', sitecol.lats
[docs]@extract.add('num_events')
def extract_num_events(dstore, what):
    """
    :returns: the number of events (if any)
    """
    yield 'num_events', len(dstore['events'])
[docs]def build_damage_dt(dstore):
    """
    :param dstore: a datastore instance
    :returns:
       a composite dtype loss_type -> (ds1, ds2, ...)
    """
    oq = dstore['oqparam']
    attrs = json.loads(dstore.get_attr('damages-rlzs', 'json'))
    limit_states = list(dstore.get_attr('crm', 'limit_states'))
    csqs = attrs['dmg_state'][len(limit_states) + 1:]  # consequences
    dt_list = [(ds, F32) for ds in ['no_damage'] + limit_states + csqs]
    damage_dt = numpy.dtype(dt_list)
    loss_types = oq.loss_dt().names
    return numpy.dtype([(lt, damage_dt) for lt in loss_types])
[docs]def build_csq_dt(dstore):
    """
    :param dstore: a datastore instance
    :returns:
       a composite dtype loss_type -> (csq1, csq2, ...)
    """
    oq = dstore['oqparam']
    attrs = json.loads(dstore.get_attr('damages-rlzs', 'json'))
    limit_states = list(dstore.get_attr('crm', 'limit_states'))
    csqs = attrs['dmg_state'][len(limit_states) + 1:]  # consequences
    dt = numpy.dtype([(csq, F32) for csq in csqs])
    loss_types = oq.loss_dt().names
    return numpy.dtype([(lt, dt) for lt in loss_types])
[docs]def build_damage_array(data, damage_dt):
    """
    :param data: an array of shape (A, L, D)
    :param damage_dt: a damage composite data type loss_type -> states
    :returns: a composite array of length N and dtype damage_dt
    """
    A, L, D = data.shape
    dmg = numpy.zeros(A, damage_dt)
    for a in range(A):
        for li, lt in enumerate(damage_dt.names):
            dmg[lt][a] = tuple(data[a, li])
    return dmg
[docs]@extract.add('damages-rlzs')
def extract_damages_npz(dstore, what):
    oq = dstore['oqparam']
    damage_dt = build_damage_dt(dstore)
    R = dstore['full_lt'].get_num_paths()
    if oq.collect_rlzs:
        R = 1
    data = dstore['damages-rlzs']
    assets = util.get_assets(dstore)
    for r in range(R):
        damages = build_damage_array(data[:, r], damage_dt)
        yield 'rlz-%03d' % r, util.compose_arrays(assets, damages)
# tested on oq-risk-tests event_based/etna
[docs]@extract.add('event_based_mfd')
def extract_mfd(dstore, what):
    """
    Compare n_occ/eff_time with occurrence_rate.
    Example: http://127.0.0.1:8800/v1/calc/30/extract/event_based_mfd?
    """
    oq = dstore['oqparam']
    R = len(dstore['weights'])
    eff_time = oq.investigation_time * oq.ses_per_logic_tree_path * R
    rup_df = dstore.read_df('ruptures', 'id')[
        ['mag', 'n_occ', 'occurrence_rate']]
    rup_df.mag = numpy.round(rup_df.mag, 1)
    dic = dict(mag=[], freq=[], occ_rate=[])
    for mag, df in rup_df.groupby('mag'):
        dic['mag'].append(mag)
        dic['freq'].append(df.n_occ.sum() / eff_time)
        dic['occ_rate'].append(df.occurrence_rate.sum())
    return ArrayWrapper((), {k: numpy.array(v) for k, v in dic.items()})
[docs]@extract.add('composite_risk_model.attrs')
def crm_attrs(dstore, what):
    """
    :returns:
        the attributes of the risk model, i.e. limit_states, loss_types,
        min_iml and covs, needed by the risk exporters.
    """
    attrs = dstore.get_attrs('crm')
    return ArrayWrapper((), dict(json=hdf5.dumps(attrs)))
def _get(dstore, name):
    try:
        dset = dstore[name + '-stats']
        return dset, list(dstore['oqparam'].hazard_stats())
    except KeyError:  # single realization
        return dstore[name + '-rlzs'], ['mean']
[docs]@extract.add('events')
def extract_relevant_events(dstore, dummy=None):
    """
    Extract the relevant events
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/events
    """
    all_events = dstore['events'][:]
    if 'relevant_events' not in dstore:
        all_events.sort(order='id')
        return all_events
    rel_events = dstore['relevant_events'][:]
    events = all_events[rel_events]
    events.sort(order='id')
    return events
[docs]@extract.add('ruptures_within')
def get_ruptures_within(dstore, bbox):
    """
    Extract the ruptures within the given bounding box, a string
    minlon,minlat,maxlon,maxlat.
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/ruptures_with/8,44,10,46
    """
    minlon, minlat, maxlon, maxlat = map(float, bbox.split(','))
    hypo = dstore['ruptures']['hypo'].T  # shape (3, N)
    mask = ((minlon <= hypo[0]) * (minlat <= hypo[1]) *
            (maxlon >= hypo[0]) * (maxlat >= hypo[1]))
    return dstore['ruptures'][mask]
[docs]@extract.add('disagg')
def extract_disagg(dstore, what):
    """
    Extract a disaggregation output as an ArrayWrapper.
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/
    disagg?kind=Mag_Dist&imt=PGA&site_id=1&poe_id=0&spec=stats
    """
    qdict = parse(what)
    spec = qdict['spec'][0]
    label = qdict['kind'][0]
    sid = int(qdict['site_id'][0])
    oq = dstore['oqparam']
    imts = list(oq.imtls)
    if 'imt' in qdict:
        imti = [imts.index(imt) for imt in qdict['imt']]
    else:
        imti = slice(None)
    if 'poe_id' in qdict:
        poei = [int(x) for x in qdict['poe_id']]
    else:
        poei = slice(None)
    if 'traditional' in spec:
        spec = spec[:4]  # rlzs or stats
        traditional = True
    else:
        traditional = False
    def bin_edges(dset, sid):
        if len(dset.shape) == 2:  # (lon, lat) bins
            return dset[sid]
        return dset[:]  # regular bin edges
    bins = {k: bin_edges(v, sid) for k, v in dstore['disagg-bins'].items()}
    fullmatrix = dstore['disagg-%s/%s' % (spec, label)][sid]
    # matrix has shape (..., M, P, Z)
    matrix = fullmatrix[..., imti, poei, :]
    if traditional:
        poe_agg = dstore['poe4'][sid, imti, poei]  # shape (M, P, Z)
        matrix[:] = numpy.log(1. - matrix) / numpy.log(1. - poe_agg)
    disag_tup = tuple(label.split('_'))
    axis = [bins[k] for k in disag_tup]
    # compute axis mid points, except for string axis (i.e. TRT)
    axis = [(ax[: -1] + ax[1:]) / 2. if ax.dtype.char != 'S'
            else ax for ax in axis]
    attrs = qdict.copy()
    for k, ax in zip(disag_tup, axis):
        attrs[k.lower()] = ax
    attrs['imt'] = qdict['imt'] if 'imt' in qdict else imts
    imt = attrs['imt'][0]
    if len(oq.poes) == 0:
        mean_curve = dstore.sel(
            'hcurves-stats', imt=imt, stat='mean')[sid, 0, 0]
        # using loglog interpolation like in compute_hazard_maps
        attrs['poe'] = numpy.exp(
            numpy.interp(numpy.log(oq.iml_disagg[imt]),
                         numpy.log(oq.imtls[imt]),
                         numpy.log(mean_curve.reshape(-1))))
    elif 'poe_id' in qdict:
        attrs['poe'] = [oq.poes[p] for p in poei]
    else:
        attrs['poe'] = oq.poes
    attrs['traditional'] = traditional
    attrs['shape_descr'] = [k.lower() for k in disag_tup] + ['imt', 'poe']
    rlzs = dstore['best_rlzs'][sid]
    if spec == 'rlzs':
        weight = dstore['full_lt'].init().rlzs['weight']
        weights = weight[rlzs]
        weights /= weights.sum()  # normalize to 1
        attrs['weights'] = weights.tolist()
    extra = ['rlz%d' % rlz for rlz in rlzs] if spec == 'rlzs' else ['mean']
    return ArrayWrapper(matrix, attrs, extra)
def _disagg_output_dt(shapedic, disagg_outputs, imts, poes_disagg):
    dt = [('site_id', U32), ('lon', F32), ('lat', F32),
          ('lon_bins', (F32, shapedic['lon'] + 1)),
          ('lat_bins', (F32, shapedic['lat'] + 1))]
    Z = shapedic['Z']
    for out in disagg_outputs:
        shp = tuple(shapedic[key] for key in out.lower().split('_'))
        for imt in imts:
            for poe in poes_disagg:
                dt.append(('%s-%s-%s' % (out, imt, poe), (F32, shp)))
    for imt in imts:
        for poe in poes_disagg:
            dt.append(('iml-%s-%s' % (imt, poe), (F32, (Z,))))
    return dt
[docs]def norm(qdict, params):
    dic = {}
    for par in params:
        dic[par] = int(qdict[par][0]) if par in qdict else 0
    return dic
[docs]@extract.add('mean_rates_by_src')
def extract_mean_rates_by_src(dstore, what):
    """
    Extract the mean_rates_by_src information.
    Example: http://127.0.0.1:8800/v1/calc/30/extract/mean_rates_by_src?site_id=0&imt=PGA&iml=.001
    """
    qdict = parse(what)
    dset = dstore['mean_rates_by_src/array']
    oq = dstore['oqparam']
    src_id = dstore['mean_rates_by_src/src_id'][:]
    [imt] = qdict['imt']
    [iml] = qdict['iml']
    [site_id] = qdict.get('site_id', ['0'])
    site_id = int(site_id)
    imt_id = list(oq.imtls).index(imt)
    rates = dset[site_id, imt_id]
    L1, Ns = rates.shape
    arr = numpy.zeros(len(src_id), [('src_id', hdf5.vstr), ('rate', '<f8')])
    arr['src_id'] = src_id
    arr['rate'] = [numpy.interp(iml, oq.imtls[imt], rates[:, i])
                   for i in range(Ns)]
    arr.sort(order='rate')
    return ArrayWrapper(arr[::-1], dict(site_id=site_id, imt=imt, iml=iml))
# TODO: extract from disagg-stats, avoid computing means on the fly
[docs]@extract.add('disagg_layer')
def extract_disagg_layer(dstore, what):
    """
    Extract a disaggregation layer containing all sites and outputs
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/disagg_layer?
    """
    qdict = parse(what)
    oq = dstore['oqparam']
    oq.maximum_distance = filters.IntegrationDistance(oq.maximum_distance)
    if 'kind' in qdict:
        kinds = qdict['kind']
    else:
        kinds = oq.disagg_outputs
    sitecol = dstore['sitecol']
    poes_disagg = oq.poes_disagg or (None,)
    realizations = numpy.array(dstore['full_lt'].get_realizations())
    oq.mags_by_trt = dstore['source_mags']
    edges, shapedic = disagg.get_edges_shapedic(oq, sitecol, len(realizations))
    dt = _disagg_output_dt(shapedic, kinds, oq.imtls, poes_disagg)
    out = numpy.zeros(len(sitecol), dt)
    hmap3 = dstore['hmap3'][:]  # shape (N, M, P)
    best_rlzs = dstore['best_rlzs'][:]
    arr = {kind: dstore['disagg-rlzs/' + kind][:] for kind in kinds}
    for sid, lon, lat, rec in zip(
            sitecol.sids, sitecol.lons, sitecol.lats, out):
        rlzs = realizations[best_rlzs[sid]]
        rec['site_id'] = sid
        rec['lon'] = lon
        rec['lat'] = lat
        rec['lon_bins'] = edges[2][sid]
        rec['lat_bins'] = edges[3][sid]
        for m, imt in enumerate(oq.imtls):
            ws = numpy.array([rlz.weight[imt] for rlz in rlzs])
            ws /= ws.sum()  # normalize to 1
            for p, poe in enumerate(poes_disagg):
                for kind in kinds:
                    key = '%s-%s-%s' % (kind, imt, poe)
                    rec[key] = arr[kind][sid, ..., m, p, :] @ ws
                rec['iml-%s-%s' % (imt, poe)] = hmap3[sid, m, p]
    return ArrayWrapper(out, dict(mag=edges[0], dist=edges[1], eps=edges[-2],
                                  trt=numpy.array(encode(edges[-1]))))
# ######################### extracting ruptures ##############################
[docs]class RuptureData(object):
    """
    Container for information about the ruptures of a given
    tectonic region type.
    """
    def __init__(self, trt, gsims, mags):
        self.trt = trt
        self.cmaker = ContextMaker(trt, gsims, {'imtls': {}, 'mags': mags})
        self.params = sorted(self.cmaker.REQUIRES_RUPTURE_PARAMETERS -
                             set('mag strike dip rake hypo_depth'.split()))
        self.dt = numpy.dtype([
            ('rup_id', I64), ('source_id', SOURCE_ID), ('multiplicity', U32),
            ('occurrence_rate', F64),
            ('mag', F32), ('lon', F32), ('lat', F32), ('depth', F32),
            ('strike', F32), ('dip', F32), ('rake', F32),
            ('boundaries', hdf5.vfloat32)] +
            [(param, F32) for param in self.params])
[docs]    def to_array(self, proxies):
        """
        Convert a list of rupture proxies into an array of dtype RuptureData.dt
        """
        data = []
        for proxy in proxies:
            ebr = proxy.to_ebr(self.trt)
            rup = ebr.rupture
            dic = self.cmaker.get_rparams(rup)
            ruptparams = tuple(dic[param] for param in self.params)
            point = rup.surface.get_middle_point()
            boundaries = rup.surface.get_surface_boundaries_3d()
            try:
                rate = ebr.rupture.occurrence_rate
            except AttributeError:  # for nonparametric sources
                rate = numpy.nan
            data.append(
                (ebr.id, ebr.source_id, ebr.n_occ, rate,
                 rup.mag, point.x, point.y, point.z, rup.surface.get_strike(),
                 rup.surface.get_dip(), rup.rake, boundaries) + ruptparams)
        return numpy.array(data, self.dt)
# used in the rupture exporter and in the plugin
[docs]@extract.add('rupture_info')
def extract_rupture_info(dstore, what):
    """
    Extract some information about the ruptures, including the boundary.
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/rupture_info?min_mag=6
    """
    qdict = parse(what)
    if 'min_mag' in qdict:
        [min_mag] = qdict['min_mag']
    else:
        min_mag = 0
    oq = dstore['oqparam']
    dtlist = [('rup_id', I64), ('multiplicity', U32), ('mag', F32),
              ('centroid_lon', F32), ('centroid_lat', F32),
              ('centroid_depth', F32), ('trt', '<S50'),
              ('strike', F32), ('dip', F32), ('rake', F32)]
    rows = []
    boundaries = []
    for rgetter in getters.get_rupture_getters(dstore):
        proxies = rgetter.get_proxies(min_mag)
        mags = dstore[f'source_mags/{rgetter.trt}'][:]
        rdata = RuptureData(rgetter.trt, rgetter.rlzs_by_gsim, mags)
        arr = rdata.to_array(proxies)
        for r in arr:
            coords = ['%.5f %.5f' % xyz[:2] for xyz in zip(*r['boundaries'])]
            coordset = sorted(set(coords))
            if len(coordset) < 4:   # degenerate to line
                boundaries.append('LINESTRING(%s)' % ', '.join(coordset))
            else:  # good polygon
                boundaries.append('POLYGON((%s))' % ', '.join(coords))
            rows.append(
                (r['rup_id'], r['multiplicity'], r['mag'],
                 r['lon'], r['lat'], r['depth'],
                 rgetter.trt, r['strike'], r['dip'], r['rake']))
    arr = numpy.array(rows, dtlist)
    geoms = gzip.compress('\n'.join(boundaries).encode('utf-8'))
    return ArrayWrapper(arr, dict(investigation_time=oq.investigation_time,
                                  boundaries=geoms))
[docs]def get_relevant_rup_ids(dstore, threshold):
    """
    :param dstore:
        a DataStore instance with a `risk_by_rupture` dataframe
    :param threshold:
        fraction of the total losses
    :returns:
        array with the rupture IDs cumulating the highest losses
        up to the threshold (usually 95% of the total loss)
    """
    assert 0 <= threshold <= 1, threshold
    if 'loss_by_rupture' not in dstore:
        return
    rupids = dstore['loss_by_rupture/rup_id'][:]
    cumsum = dstore['loss_by_rupture/loss'][:].cumsum()
    thr = threshold * cumsum[-1]
    for i, csum in enumerate(cumsum, 1):
        if csum > thr:
            break
    return rupids[:i]
[docs]def get_relevant_event_ids(dstore, threshold):
    """
    :param dstore:
        a DataStore instance with a `risk_by_rupture` dataframe
    :param threshold:
        fraction of the total losses
    :returns:
        array with the event IDs cumulating the highest losses
        up to the threshold (usually 95% of the total loss)
    """
    assert 0 <= threshold <= 1, threshold
    if 'loss_by_event' not in dstore:
        return
    eids = dstore['loss_by_event/event_id'][:]
    try:
        cumsum = dstore['loss_by_event/loss'][:].cumsum()
    except KeyError:  # no losses
        return eids
    thr = threshold * cumsum[-1]
    for i, csum in enumerate(cumsum, 1):
        if csum > thr:
            break
    return eids[:i]
[docs]@extract.add('ruptures')
def extract_ruptures(dstore, what):
    """
    Extract the ruptures with their geometry as a big CSV string
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/ruptures?rup_id=6
    """
    oq = dstore['oqparam']
    trts = list(dstore.getitem('full_lt').attrs['trts'])
    comment = dict(trts=trts, ses_seed=oq.ses_seed)
    qdict = parse(what)
    if 'min_mag' in qdict:
        [min_mag] = qdict['min_mag']
    else:
        min_mag = 0
    if 'rup_id' in qdict:
        rup_id = int(qdict['rup_id'][0])
        ebrups = [getters.get_ebrupture(dstore, rup_id)]
        info = dstore['source_info'][rup_id // TWO30]
        comment['source_id'] = info['source_id'].decode('utf8')
    else:
        if 'threshold' in qdict:
            [threshold] = qdict['threshold']
            rup_ids = get_relevant_rup_ids(dstore, threshold)
        else:
            rup_ids = None
        ebrups = []
        for rgetter in getters.get_rupture_getters(dstore, rupids=rup_ids):
            ebrups.extend(rupture.get_ebr(proxy.rec, proxy.geom, rgetter.trt)
                          for proxy in rgetter.get_proxies(min_mag))
    bio = io.StringIO()
    arr = rupture.to_csv_array(ebrups)
    writers.write_csv(bio, arr, comment=comment)
    return bio.getvalue()
[docs]@extract.add('eids_by_gsim')
def extract_eids_by_gsim(dstore, what):
    """
    Returns a dictionary gsim -> event_ids for the first TRT
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/eids_by_gsim
    """
    rlzs = dstore['full_lt'].get_realizations()
    gsims = [str(rlz.gsim_rlz.value[0]) for rlz in rlzs]
    evs = extract_relevant_events(dstore)
    df = pandas.DataFrame({'id': evs['id'], 'rlz_id': evs['rlz_id']})
    for r, evs in df.groupby('rlz_id'):
        yield gsims[r], numpy.array(evs['id'])
[docs]@extract.add('risk_stats')
def extract_risk_stats(dstore, what):
    """
    Compute the risk statistics from a DataFrame with individual realizations
    Example:
    http://127.0.0.1:8800/v1/calc/30/extract/risk_stats/aggrisk
    """
    oq = dstore['oqparam']
    stats = oq.hazard_stats()
    df = dstore.read_df(what)
    df['loss_type'] = [LOSSTYPE[lid] for lid in df.loss_id]
    del df['loss_id']
    kfields = [f for f in df.columns if f in {
        'agg_id', 'loss_type', 'return_period'}]
    weights = dstore['weights'][:]
    return calc_stats(df, kfields, stats, weights)
[docs]@extract.add('med_gmv')
def extract_med_gmv(dstore, what):
    """
    Extract med_gmv array for the given source
    """
    return extract_(dstore, 'med_gmv/' + what)
# #####################  extraction from the WebAPI ###################### #
[docs]class Extractor(object):
    """
    A class to extract data from a calculation.
    :param calc_id: a calculation ID
    NB: instantiating the Extractor opens the datastore.
    """
    def __init__(self, calc_id):
        self.dstore = datastore.read(calc_id)
        self.calc_id = self.dstore.calc_id
        self.oqparam = self.dstore['oqparam']
[docs]    def get(self, what, asdict=False):
        """
        :param what: what to extract
        :returns: an ArrayWrapper instance or a dictionary if asdict is True
        """
        aw = extract(self.dstore, what)
        if asdict:
            return {k: v for k, v in vars(aw).items() if not k.startswith('_')}
        return aw
    def __enter__(self):
        return self
    def __exit__(self, *args):
        self.close()
[docs]class WebExtractor(Extractor):
    """
    A class to extract data from the WebAPI.
    :param calc_id: a calculation ID
    :param server: hostname of the webapi server (can be '')
    :param username: login username (can be '')
    :param password: login password (can be '')
    NB: instantiating the WebExtractor opens a session.
    """
    def __init__(self, calc_id, server=None, username=None, password=None):
        self.calc_id = calc_id
        self.server = config.webapi.server if server is None else server
        if username is None:
            username = config.webapi.username
        if password is None:
            password = config.webapi.password
        self.sess = requests.Session()
        if username:
            login_url = '%s/accounts/ajax_login/' % self.server
            logging.info('POST %s', login_url)
            resp = self.sess.post(
                login_url, data=dict(username=username, password=password))
            if resp.status_code != 200:
                raise WebAPIError(resp.text)
        url = '%s/v1/calc/%d/extract/oqparam' % (self.server, calc_id)
        logging.info('GET %s', url)
        resp = self.sess.get(url)
        if resp.status_code == 404:
            raise WebAPIError('Not Found: %s' % url)
        elif resp.status_code != 200:
            raise WebAPIError(resp.text)
        self.oqparam = object.__new__(oqvalidation.OqParam)
        js = bytes(numpy.load(io.BytesIO(resp.content))['json'])
        vars(self.oqparam).update(json.loads(js))
[docs]    def get(self, what):
        """
        :param what: what to extract
        :returns: an ArrayWrapper instance
        """
        url = '%s/v1/calc/%d/extract/%s' % (self.server, self.calc_id, what)
        logging.info('GET %s', url)
        resp = self.sess.get(url)
        if resp.status_code != 200:
            raise WebAPIError(resp.text)
        logging.info('Read %s of data' % general.humansize(len(resp.content)))
        npz = numpy.load(io.BytesIO(resp.content))
        attrs = {k: npz[k] for k in npz if k != 'array'}
        try:
            arr = npz['array']
        except KeyError:
            arr = ()
        return ArrayWrapper(arr, attrs)
[docs]    def dump(self, fname):
        """
        Dump the remote datastore on a local path.
        """
        url = '%s/v1/calc/%d/datastore' % (self.server, self.calc_id)
        resp = self.sess.get(url, stream=True)
        down = 0
        with open(fname, 'wb') as f:
            logging.info('Saving %s', fname)
            for chunk in resp.iter_content(CHUNKSIZE):
                f.write(chunk)
                down += len(chunk)
                println('Downloaded {:,} bytes'.format(down))
        print()
[docs]def clusterize(hmaps, rlzs, k):
    """
    :param hmaps: array of shape (R, M, P)
    :param rlzs: composite array of shape R
    :param k: number of clusters to build
    :returns: array of K elements with dtype (rlzs, branch_paths, centroid)
    """
    R, M, P = hmaps.shape
    hmaps = hmaps.transpose(0, 2, 1).reshape(R, M * P)
    dt = [('rlzs', hdf5.vuint32), ('branch_paths', object),
          ('centroid', (F32, M*P))]
    centroid, labels = kmeans2(hmaps, k, minit='++')
    df = pandas.DataFrame(dict(path=rlzs['branch_path'], label=labels))
    tbl = []
    for label, grp in df.groupby('label'):
        paths = logictree.collect_paths(encode(list(grp['path'])))
        tbl.append((grp.index, paths, centroid[label]))
    return numpy.array(tbl, dt)
