Source code for openquake.calculators.extract
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2017-2019 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from urllib.parse import parse_qs
import collections
import operator
import logging
import ast
import io
import requests
from h5py._hl.dataset import Dataset
from h5py._hl.group import Group
import numpy
try:
from functools import lru_cache
except ImportError:
from openquake.risklib.utils import memoized
else:
memoized = lru_cache(100)
from openquake.baselib import config, hdf5
from openquake.baselib.hdf5 import ArrayWrapper, vstr
from openquake.baselib.general import group_array, deprecated, println
from openquake.baselib.python3compat import encode, decode
from openquake.calculators import getters
from openquake.calculators.export.loss_curves import get_loss_builder
from openquake.commonlib import calc, util, oqvalidation
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64
TWO32 = 2 ** 32
ALL = slice(None)
CHUNKSIZE = 4*1024**2 # 4 MB
[docs]def lit_eval(string):
"""
`ast.literal_eval` the string if possible, otherwise returns it unchanged
"""
try:
return ast.literal_eval(string)
except (ValueError, SyntaxError):
return string
def _normalize(kinds, stats, num_rlzs):
statindex = dict(zip(stats, range(len(stats))))
dic = {}
for kind in kinds:
if kind == 'stats':
for s, stat in enumerate(stats):
dic[stat] = s
elif kind == 'rlzs':
for r in range(num_rlzs):
dic['rlz-%03d' % r] = r
elif kind.startswith('rlz-'):
dic[kind] = int(kind[4:])
elif kind in stats:
dic[kind] = statindex[kind]
else:
raise ValueError('Invalid kind %r' % kind)
return dic
[docs]def parse(query_string, stats, num_rlzs=0):
"""
:returns: a normalized query_dict as in the following examples:
>>> parse('kind=stats', ['mean'])
{'kind': {'mean': 0}}
>>> parse('kind=rlzs', [], 3)
{'kind': {'rlz-000': 0, 'rlz-001': 1, 'rlz-002': 2}}
>>> parse('kind=mean', ['max', 'mean'])
{'kind': {'mean': 1}}
>>> parse('kind=rlz-3&imt=PGA&site_id=0', [])
{'kind': {'rlz-3': 3}, 'imt': ['PGA'], 'site_id': [0]}
"""
qdic = parse_qs(query_string)
for key, val in qdic.items(): # for instance, convert site_id to an int
qdic[key] = [lit_eval(v) for v in val]
qdic['kind'] = _normalize(qdic['kind'], stats, num_rlzs)
return qdic
[docs]def barray(iterlines):
"""
Array of bytes
"""
lst = [line.encode('utf-8') for line in iterlines]
arr = numpy.array(lst)
return arr
[docs]def extract_(dstore, dspath):
"""
Extracts an HDF5 path object from the datastore, for instance
extract(dstore, 'sitecol').
"""
obj = dstore[dspath]
if isinstance(obj, Dataset):
return ArrayWrapper(obj.value, obj.attrs)
elif isinstance(obj, Group):
return ArrayWrapper(numpy.array(list(obj)), obj.attrs)
else:
return obj
[docs]class Extract(dict):
"""
A callable dictionary of functions with a single instance called
`extract`. Then `extract(dstore, fullkey)` dispatches to the function
determined by the first part of `fullkey` (a slash-separated
string) by passing as argument the second part of `fullkey`.
For instance extract(dstore, 'sitecol'), extract(dstore, 'asset_values/0')
etc.
"""
[docs] def add(self, key, cache=False):
def decorator(func):
self[key] = memoized(func) if cache else func
return func
return decorator
def __call__(self, dstore, key):
if '/' in key:
k, v = key.split('/', 1)
data = self[k](dstore, v)
elif '?' in key:
k, v = key.split('?', 1)
data = self[k](dstore, v)
elif key in self:
data = self[key](dstore, '')
else:
data = extract_(dstore, key)
return ArrayWrapper.from_(data)
extract = Extract()
# used by the QGIS plugin
[docs]@extract.add('realizations')
def extract_realizations(dstore, dummy):
"""
Extract an array of realizations. Use it as /extract/realizations
"""
rlzs = dstore['csm_info'].rlzs
dt = [('ordinal', U32), ('weight', F32), ('gsims', '<S64')]
arr = numpy.zeros(len(rlzs), dt)
arr['ordinal'] = rlzs['ordinal']
arr['weight'] = rlzs['weight']
arr['gsims'] = rlzs['gsims']
return arr
[docs]@extract.add('asset_values', cache=True)
def extract_asset_values(dstore, sid):
"""
Extract an array of asset values for the given sid. Use it as
/extract/asset_values/0
:returns:
(aid, loss_type1, ..., loss_typeN) composite array
"""
if sid:
return extract(dstore, 'asset_values')[int(sid)]
assetcol = extract(dstore, 'assetcol')
asset_refs = assetcol.asset_refs
assets_by_site = assetcol.assets_by_site()
lts = assetcol.loss_types
time_event = assetcol.time_event
dt = numpy.dtype([('aref', asset_refs.dtype), ('aid', numpy.uint32)] +
[(str(lt), numpy.float32) for lt in lts])
data = []
for assets in assets_by_site:
vals = numpy.zeros(len(assets), dt)
for a, asset in enumerate(assets):
vals[a]['aref'] = asset_refs[a]
vals[a]['aid'] = asset.ordinal
for lt in lts:
vals[a][lt] = asset.value(lt, time_event)
data.append(vals)
return data
[docs]@extract.add('asset_tags')
def extract_asset_tags(dstore, tagname):
"""
Extract an array of asset tags for the given tagname. Use it as
/extract/asset_tags or /extract/asset_tags/taxonomy
"""
tagcol = dstore['assetcol/tagcol']
if tagname:
yield tagname, barray(tagcol.gen_tags(tagname))
for tagname in tagcol.tagnames:
yield tagname, barray(tagcol.gen_tags(tagname))
[docs]def get_mesh(sitecol, complete=True):
"""
:returns:
a lon-lat or lon-lat-depth array depending if the site collection
is at sea level or not
"""
sc = sitecol.complete if complete else sitecol
if sc.at_sea_level():
mesh = numpy.zeros(len(sc), [('lon', F64), ('lat', F64)])
mesh['lon'] = sc.lons
mesh['lat'] = sc.lats
else:
mesh = numpy.zeros(len(sc), [('lon', F64), ('lat', F64),
('depth', F64)])
mesh['lon'] = sc.lons
mesh['lat'] = sc.lats
mesh['depth'] = sc.depths
return mesh
[docs]def hazard_items(dic, mesh, *extras, **kw):
"""
:param dic: dictionary of arrays of the same shape
:param mesh: a mesh array with lon, lat fields of the same length
:param extras: optional triples (field, dtype, values)
:param kw: dictionary of parameters (like investigation_time)
:returns: a list of pairs (key, value) suitable for storage in .npz format
"""
for item in kw.items():
yield item
arr = dic[next(iter(dic))]
dtlist = [(str(field), arr.dtype) for field in sorted(dic)]
for field, dtype, values in extras:
dtlist.append((str(field), dtype))
array = numpy.zeros(arr.shape, dtlist)
for field in dic:
array[field] = dic[field]
for field, dtype, values in extras:
array[field] = values
yield 'all', util.compose_arrays(mesh, array)
def _get_dict(dstore, name, imtls, stats):
dic = {}
dtlist = []
for imt, imls in imtls.items():
dt = numpy.dtype([(str(iml), F32) for iml in imls])
dtlist.append((imt, dt))
for s, stat in enumerate(stats):
dic[stat] = dstore[name][:, s].flatten().view(dtlist)
return dic
[docs]@extract.add('hcurves')
def extract_hcurves(dstore, what):
"""
Extracts hazard curves. Use it as /extract/hcurves?kind=mean or
/extract/hcurves?kind=rlz-0, /extract/hcurves?kind=stats,
/extract/hcurves?kind=rlzs etc
"""
oq = dstore['oqparam']
num_rlzs = len(dstore['weights'])
stats = oq.hazard_stats()
if what == '': # npz exports for QGIS
sitecol = dstore['sitecol']
mesh = get_mesh(sitecol, complete=False)
dic = _get_dict(dstore, 'hcurves-stats', oq.imtls, stats)
yield from hazard_items(
dic, mesh, investigation_time=oq.investigation_time)
return
params = parse(what, stats, num_rlzs)
if 'imt' in params:
[imt] = params['imt']
slc = oq.imtls(imt)
else:
slc = ALL
sids = params.get('site_id', ALL)
for k, i in params['kind'].items():
if k.startswith('rlz-'):
yield k, hdf5.extract(dstore['hcurves-rlzs'], sids, i, slc)[:, 0]
else:
yield k, hdf5.extract(dstore['hcurves-stats'], sids, i, slc)[:, 0]
yield from params.items()
[docs]@extract.add('hmaps')
def extract_hmaps(dstore, what):
"""
Extracts hazard maps. Use it as /extract/hmaps?imt=PGA
"""
oq = dstore['oqparam']
stats = oq.hazard_stats()
num_rlzs = len(dstore['weights'])
if what == '': # npz exports for QGIS
sitecol = dstore['sitecol']
mesh = get_mesh(sitecol, complete=False)
dic = _get_dict(dstore, 'hmaps-stats',
{imt: oq.poes for imt in oq.imtls}, stats)
yield from hazard_items(
dic, mesh, investigation_time=oq.investigation_time)
return
params = parse(what, stats, num_rlzs)
if 'imt' in params:
[imt] = params['imt']
m = list(oq.imtls).index(imt)
s = slice(m, m + 1)
else:
s = ALL
for k, i in params['kind'].items():
if k.startswith('rlz-'):
yield k, hdf5.extract(dstore['hmaps-rlzs'], ALL, i, s, ALL)[:, 0]
else:
yield k, hdf5.extract(dstore['hmaps-stats'], ALL, i, s, ALL)[:, 0]
yield from params.items()
[docs]@extract.add('uhs')
def extract_uhs(dstore, what):
"""
Extracts uniform hazard spectra. Use it as /extract/uhs?kind=mean or
/extract/uhs?kind=rlz-0, etc
"""
oq = dstore['oqparam']
num_rlzs = len(dstore['weights'])
stats = oq.hazard_stats()
if what == '': # npz exports for QGIS
sitecol = dstore['sitecol']
mesh = get_mesh(sitecol, complete=False)
dic = {}
for s, stat in enumerate(stats):
hmap = dstore['hmaps-stats'][:, s]
dic[stat] = calc.make_uhs(hmap, oq)
yield from hazard_items(
dic, mesh, investigation_time=oq.investigation_time)
return
params = parse(what, stats, num_rlzs)
periods = []
for m, imt in enumerate(oq.imtls):
if imt == 'PGA' or imt.startswith('SA'):
periods.append(m)
if 'site_id' in params:
sids = params['site_id']
else:
sids = ALL
for k, i in params['kind'].items():
if k.startswith('rlz-'):
yield k, hdf5.extract(
dstore['hmaps-rlzs'], sids, i, periods, ALL)[:, 0]
else:
yield k, hdf5.extract(
dstore['hmaps-stats'], sids, i, periods, ALL)[:, 0]
yield from params.items()
def _agg(losses, idxs):
shp = losses.shape[1:]
if not idxs:
# no intersection, return a 0-dim matrix
return numpy.zeros((0,) + shp, losses.dtype)
# numpy.array wants lists, not sets, hence the sorted below
return losses[numpy.array(sorted(idxs))].sum(axis=0)
def _filter_agg(assetcol, losses, selected, stats=''):
# losses is an array of shape (A, ..., R) with A=#assets, R=#realizations
aids_by_tag = assetcol.get_aids_by_tag()
idxs = set(range(len(assetcol)))
tagnames = []
for tag in selected:
tagname, tagvalue = tag.split('=', 1)
if tagvalue == '*':
tagnames.append(tagname)
else:
idxs &= aids_by_tag[tag]
if len(tagnames) > 1:
raise ValueError('Too many * as tag values in %s' % tagnames)
elif not tagnames: # return an array of shape (..., R)
return ArrayWrapper(
_agg(losses, idxs), dict(selected=encode(selected), stats=stats))
else: # return an array of shape (T, ..., R)
[tagname] = tagnames
_tags = list(assetcol.tagcol.gen_tags(tagname))
all_idxs = [idxs & aids_by_tag[t] for t in _tags]
# NB: using a generator expression for all_idxs caused issues (?)
data, tags = [], []
for idxs, tag in zip(all_idxs, _tags):
agglosses = _agg(losses, idxs)
if len(agglosses):
data.append(agglosses)
tags.append(tag)
return ArrayWrapper(
numpy.array(data),
dict(selected=encode(selected), tags=encode(tags), stats=stats))
[docs]def get_loss_type_tags(what):
try:
loss_type, query_string = what.rsplit('?', 1)
except ValueError: # no question mark
loss_type, query_string = what, ''
tags = query_string.split('&') if query_string else []
return loss_type, tags
[docs]@extract.add('agg_losses')
def extract_agg_losses(dstore, what):
"""
Aggregate losses of the given loss type and tags. Use it as
/extract/agg_losses/structural?taxonomy=RC&zipcode=20126
/extract/agg_losses/structural?taxonomy=RC&zipcode=*
:returns:
an array of shape (T, R) if one of the tag names has a `*` value
an array of shape (R,), being R the number of realizations
an array of length 0 if there is no data for the given tags
"""
loss_type, tags = get_loss_type_tags(what)
if not loss_type:
raise ValueError('loss_type not passed in agg_losses/<loss_type>')
l = dstore['oqparam'].lti[loss_type]
if 'losses_by_asset' in dstore: # scenario_risk
stats = None
losses = dstore['losses_by_asset'][:, :, l]['mean']
elif 'avg_losses-stats' in dstore: # event_based_risk, classical_risk
stats = dstore['avg_losses-stats'].attrs['stats']
losses = dstore['avg_losses-stats'][:, :, l]
elif 'avg_losses-rlzs' in dstore: # event_based_risk, classical_risk
stats = [b'mean']
losses = dstore['avg_losses-rlzs'][:, :, l]
else:
raise KeyError('No losses found in %s' % dstore)
return _filter_agg(dstore['assetcol'], losses, tags, stats)
[docs]@extract.add('agg_damages')
def extract_agg_damages(dstore, what):
"""
Aggregate damages of the given loss type and tags. Use it as
/extract/agg_damages/structural?taxonomy=RC&zipcode=20126
:returns:
array of shape (R, D), being R the number of realizations and D
the number of damage states or array of length 0 if there is no
data for the given tags
"""
loss_type, tags = get_loss_type_tags(what)
if 'dmg_by_asset' in dstore: # scenario_damage
losses = dstore['dmg_by_asset'][loss_type]['mean']
else:
raise KeyError('No damages found in %s' % dstore)
return _filter_agg(dstore['assetcol'], losses, tags)
def _get_curves(curves, li):
shp = curves.shape + curves.dtype.shape
return curves.value.view(F32).reshape(shp)[:, :, :, li]
[docs]@extract.add('agg_curves')
def extract_agg_curves(dstore, what):
"""
Aggregate loss curves of the given loss type and tags for
event based risk calculations. Use it as
/extract/agg_curves/structural?taxonomy=RC&zipcode=20126
:returns:
array of shape (S, P), being P the number of return periods
and S the number of statistics
"""
oq = dstore['oqparam']
loss_type, tags = get_loss_type_tags(what)
if 'curves-stats' in dstore: # event_based_risk
losses = _get_curves(dstore['curves-stats'], oq.lti[loss_type])
stats = dstore['curves-stats'].attrs['stats']
elif 'curves-rlzs' in dstore: # event_based_risk, 1 rlz
losses = _get_curves(dstore['curves-rlzs'], oq.lti[loss_type])
assert losses.shape[1] == 1, 'There must be a single realization'
stats = [b'mean'] # suitable to be stored as hdf5 attribute
else:
raise KeyError('No curves found in %s' % dstore)
res = _filter_agg(dstore['assetcol'], losses, tags, stats)
cc = dstore['assetcol/cost_calculator']
res.units = cc.get_units(loss_types=[loss_type])
res.return_periods = get_loss_builder(dstore).return_periods
return res
[docs]@extract.add('aggregate_by')
def extract_aggregate_by(dstore, what):
"""
/extract/aggregate_by/taxonomy,occupancy/curves/structural
yield pairs (<stat>, <array of shape (T, O, S, P)>)
/extract/aggregate_by/taxonomy,occupancy/avg_losses/structural
yield pairs (<stat>, <array of shape (T, O, S)>)
"""
try:
tagnames, name, loss_type = what.split('/')
except ValueError: # missing '/' at the end
tagnames, name = what.split('/')
loss_type = ''
assert name == 'avg_losses', name
tagnames = tagnames.split(',')
assetcol = dstore['assetcol']
oq = dstore['oqparam']
dset, stats = _get(dstore, name)
for s, stat in enumerate(stats):
if loss_type:
array = dset[:, s, oq.lti[loss_type]]
else:
array = dset[:, s]
aw = ArrayWrapper(assetcol.aggregate_by(tagnames, array), {})
for tagname in tagnames:
setattr(aw, tagname, getattr(assetcol.tagcol, tagname))
if not loss_type:
aw.extra = ('loss_type',) + oq.loss_dt().names
aw.tagnames = encode(tagnames)
yield decode(stat), aw
[docs]@extract.add('losses_by_asset')
def extract_losses_by_asset(dstore, what):
loss_dt = dstore['oqparam'].loss_dt()
rlzs = dstore['csm_info'].get_rlzs_assoc().realizations
assets = util.get_assets(dstore)
if 'losses_by_asset' in dstore:
losses_by_asset = dstore['losses_by_asset'].value
for rlz in rlzs:
# I am exporting the 'mean' and ignoring the 'stddev'
losses = cast(losses_by_asset[:, rlz.ordinal]['mean'], loss_dt)
data = util.compose_arrays(assets, losses)
yield 'rlz-%03d' % rlz.ordinal, data
elif 'avg_losses-stats' in dstore:
avg_losses = dstore['avg_losses-stats'].value
stats = dstore['avg_losses-stats'].attrs['stats']
for s, stat in enumerate(stats):
losses = cast(avg_losses[:, s], loss_dt)
data = util.compose_arrays(assets, losses)
yield stat, data
elif 'avg_losses-rlzs' in dstore: # there is only one realization
avg_losses = dstore['avg_losses-rlzs'].value
losses = cast(avg_losses, loss_dt)
data = util.compose_arrays(assets, losses)
yield 'rlz-000', data
[docs]@extract.add('losses_by_event')
def extract_losses_by_event(dstore, what):
dic = group_array(dstore['losses_by_event'].value, 'rlzi')
for rlzi in dic:
yield 'rlz-%03d' % rlzi, dic[rlzi]
def _gmf_scenario(data, num_sites, imts):
# convert data into the composite array expected by QGIS
eids = sorted(numpy.unique(data['eid']))
eid2idx = {eid: idx for idx, eid in enumerate(eids)}
E = len(eid2idx)
gmf_dt = numpy.dtype([(imt, (F32, (E,))) for imt in imts])
gmfa = numpy.zeros(num_sites, gmf_dt)
for rec in data:
arr = gmfa[rec['sid']]
for imt, gmv in zip(imts, rec['gmv']):
arr[imt][eid2idx[rec['eid']]] = gmv
return gmfa, E
[docs]@extract.add('gmf_data')
def extract_gmf_scenario_npz(dstore, what):
oq = dstore['oqparam']
mesh = get_mesh(dstore['sitecol'])
n = len(mesh)
data_by_rlzi = group_array(dstore['gmf_data/data'].value, 'rlzi')
for rlzi in data_by_rlzi:
gmfa, e = _gmf_scenario(data_by_rlzi[rlzi], n, oq.imtls)
logging.info('Exporting array of shape %s for rlz %d',
(n, e), rlzi)
yield 'rlz-%03d' % rlzi, util.compose_arrays(mesh, gmfa)
[docs]def build_damage_dt(dstore, mean_std=True):
"""
:param dstore: a datastore instance
:param mean_std: a flag (default True)
:returns:
a composite dtype loss_type -> (mean_ds1, stdv_ds1, ...) or
loss_type -> (ds1, ds2, ...) depending on the flag mean_std
"""
oq = dstore['oqparam']
damage_states = ['no_damage'] + list(
dstore.get_attr(oq.risk_model, 'limit_states'))
dt_list = []
for ds in damage_states:
ds = str(ds)
if mean_std:
dt_list.append(('%s_mean' % ds, F32))
dt_list.append(('%s_stdv' % ds, F32))
else:
dt_list.append((ds, F32))
damage_dt = numpy.dtype(dt_list)
loss_types = dstore.get_attr(oq.risk_model, 'loss_types')
return numpy.dtype([(str(lt), damage_dt) for lt in loss_types])
[docs]def build_damage_array(data, damage_dt):
"""
:param data: an array of length N with fields 'mean' and 'stddev'
:param damage_dt: a damage composite data type loss_type -> states
:returns: a composite array of length N and dtype damage_dt
"""
L = len(data) if data.shape else 1
dmg = numpy.zeros(L, damage_dt)
for lt in damage_dt.names:
for i, ms in numpy.ndenumerate(data[lt]):
if damage_dt[lt].names[0].endswith('_mean'):
lst = []
for m, s in zip(ms['mean'], ms['stddev']):
lst.append(m)
lst.append(s)
dmg[lt][i] = tuple(lst)
else:
dmg[lt][i] = ms['mean']
return dmg
[docs]@extract.add('dmg_by_asset')
def extract_dmg_by_asset_npz(dstore, what):
damage_dt = build_damage_dt(dstore)
rlzs = dstore['csm_info'].get_rlzs_assoc().realizations
data = dstore['dmg_by_asset']
assets = util.get_assets(dstore)
for rlz in rlzs:
dmg_by_asset = build_damage_array(data[:, rlz.ordinal], damage_dt)
yield 'rlz-%03d' % rlz.ordinal, util.compose_arrays(
assets, dmg_by_asset)
[docs]@extract.add('event_based_mfd')
def extract_mfd(dstore, what):
"""
Display num_ruptures by magnitude for event based calculations.
Example: http://127.0.0.1:8800/v1/calc/30/extract/event_based_mfd
"""
dd = collections.defaultdict(int)
for rup in dstore['ruptures'].value:
dd[rup['mag']] += 1
dt = numpy.dtype([('mag', float), ('freq', int)])
magfreq = numpy.array(sorted(dd.items(), key=operator.itemgetter(0)), dt)
return magfreq
[docs]@extract.add('src_loss_table')
def extract_src_loss_table(dstore, loss_type):
"""
Extract the source loss table for a give loss type, ordered in decreasing
order. Example:
http://127.0.0.1:8800/v1/calc/30/extract/src_loss_table/structural
"""
oq = dstore['oqparam']
li = oq.lti[loss_type]
source_ids = dstore['source_info']['source_id']
idxs = dstore['ruptures'].value[['srcidx', 'grp_id']]
losses = dstore['rup_loss_table'][:, li]
slt = numpy.zeros(len(source_ids), [('grp_id', U32), (loss_type, F32)])
for loss, (srcidx, grp_id) in zip(losses, idxs):
slt[srcidx][loss_type] += loss
slt[srcidx]['grp_id'] = grp_id
slt = util.compose_arrays(source_ids, slt, 'source_id')
slt.sort(order=loss_type)
return slt[::-1]
[docs]@extract.add('mean_std_curves')
def extract_mean_std_curves(dstore, what):
"""
Yield imls/IMT and poes/IMT containg mean and stddev for all sites
"""
getter = getters.PmapGetter(dstore)
arr = getter.get_mean().array
for imt in getter.imtls:
yield 'imls/' + imt, getter.imtls[imt]
yield 'poes/' + imt, arr[:, getter.imtls(imt)]
[docs]@extract.add('composite_risk_model.attrs')
def crm_attrs(dstore, what):
"""
:returns:
the attributes of the risk model, i.e. limit_states, loss_types,
min_iml and covs, needed by the risk exporters.
"""
name = dstore['oqparam'].risk_model
return ArrayWrapper((), dstore.get_attrs(name))
def _get(dstore, name):
try:
dset = dstore[name + '-stats']
return dset, [b.decode('utf8') for b in dset.attrs['stats']]
except KeyError: # single realization
return dstore[name + '-rlzs'], ['mean']
[docs]@extract.add('losses_by_tag')
@deprecated(msg='This feature will be removed soon')
def losses_by_tag(dstore, tag):
"""
Statistical average losses by tag. For instance call
$ oq extract losses_by_tag/occupancy
"""
dt = [(tag, vstr)] + dstore['oqparam'].loss_dt_list()
aids = dstore['assetcol/array'][tag]
dset, stats = _get(dstore, 'avg_losses')
arr = dset.value
tagvalues = dstore['assetcol/tagcol/' + tag][1:] # except tagvalue="?"
for s, stat in enumerate(stats):
out = numpy.zeros(len(tagvalues), dt)
for li, (lt, lt_dt) in enumerate(dt[1:]):
for i, tagvalue in enumerate(tagvalues):
out[i][tag] = tagvalue
counts = arr[aids == i + 1, s, li].sum()
if counts:
out[i][lt] = counts
yield stat, out
[docs]@extract.add('curves_by_tag')
@deprecated(msg='This feature will be removed soon')
def curves_by_tag(dstore, tag):
"""
Statistical loss curves by tag. For instance call
$ oq extract curves_by_tag/occupancy
"""
dt = ([(tag, vstr), ('return_period', U32)] +
dstore['oqparam'].loss_dt_list())
aids = dstore['assetcol/array'][tag]
dset, stats = _get(dstore, 'curves')
periods = dset.attrs['return_periods']
arr = dset.value
P = arr.shape[2] # shape (A, S, P, LI)
tagvalues = dstore['assetcol/tagcol/' + tag][1:] # except tagvalue="?"
for s, stat in enumerate(stats):
out = numpy.zeros(len(tagvalues) * P, dt)
for li, (lt, lt_dt) in enumerate(dt[2:]):
n = 0
for i, tagvalue in enumerate(tagvalues):
for p, period in enumerate(periods):
out[n][tag] = tagvalue
out[n]['return_period'] = period
counts = arr[aids == i + 1, s, p, li].sum()
if counts:
out[n][lt] = counts
n += 1
yield stat, out
[docs]@extract.add('rupture')
def extract_rupture(dstore, serial):
"""
Extract information about the given event index.
Example:
http://127.0.0.1:8800/v1/calc/30/extract/rupture/1066
"""
ridx = list(dstore['ruptures']['serial']).index(int(serial))
[getter] = getters.gen_rupture_getters(dstore, slice(ridx, ridx + 1))
yield from getter.get_rupdict().items()
[docs]@extract.add('event_info')
def extract_event_info(dstore, eidx):
"""
Extract information about the given event index.
Example:
http://127.0.0.1:8800/v1/calc/30/extract/event_info/0
"""
event = dstore['events'][int(eidx)]
serial = int(event['eid'] // TWO32)
ridx = list(dstore['ruptures']['serial']).index(serial)
[getter] = getters.gen_rupture_getters(dstore, slice(ridx, ridx + 1))
rupdict = getter.get_rupdict()
rlzi = event['rlz']
rlzs_assoc = dstore['csm_info'].get_rlzs_assoc()
gsim = rlzs_assoc.gsim_by_trt[rlzi][rupdict['trt']]
for key, val in rupdict.items():
yield key, val
yield 'rlzi', rlzi
yield 'gsim', repr(gsim)
[docs]@extract.add('ruptures_within')
def get_ruptures_within(dstore, bbox):
"""
Extract the ruptures within the given bounding box, a string
minlon,minlat,maxlon,maxlat.
Example:
http://127.0.0.1:8800/v1/calc/30/extract/ruptures_with/8,44,10,46
"""
minlon, minlat, maxlon, maxlat = map(float, bbox.split(','))
hypo = dstore['ruptures']['hypo'].T # shape (3, N)
mask = ((minlon <= hypo[0]) * (minlat <= hypo[1]) *
(maxlon >= hypo[0]) * (maxlat >= hypo[1]))
return dstore['ruptures'][mask]
[docs]@extract.add('source_geom')
def extract_source_geom(dstore, srcidxs):
"""
Extract the geometry of a given sources
Example:
http://127.0.0.1:8800/v1/calc/30/extract/source_geom/1,2,3
"""
for i in srcidxs.split(','):
rec = dstore['source_info'][int(i)]
geom = dstore['source_geom'][rec['gidx1']:rec['gidx2']]
yield rec['source_id'], geom
# ##################### extraction from the WebAPI ###################### #
[docs]class Extractor(object):
"""
A class to extract data from a calculation.
:param calc_id: a calculation ID
NB: instantiating the Extractor opens the datastore.
"""
def __init__(self, calc_id):
self.calc_id = calc_id
self.dstore = util.read(calc_id)
self.oqparam = self.dstore['oqparam']
[docs] def get(self, what):
"""
:param what: what to extract
:returns: an ArrayWrapper instance
"""
return extract(self.dstore, what)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
[docs]class WebExtractor(Extractor):
"""
A class to extract data from the WebAPI.
:param calc_id: a calculation ID
:param server: hostname of the webapi server (can be '')
:param username: login username (can be '')
:param password: login password (can be '')
NB: instantiating the WebExtractor opens a session.
"""
def __init__(self, calc_id, server=None, username=None, password=None):
self.calc_id = calc_id
self.server = config.webapi.server if server is None else server
if username is None:
username = config.webapi.username
if password is None:
password = config.webapi.password
self.sess = requests.Session()
if username:
login_url = '%s/accounts/ajax_login/' % self.server
logging.info('POST %s', login_url)
resp = self.sess.post(
login_url, data=dict(username=username, password=password))
if resp.status_code != 200:
raise WebAPIError(resp.text)
url = '%s/v1/calc/%d/oqparam' % (self.server, calc_id)
logging.info('GET %s', url)
resp = self.sess.get(url)
if resp.status_code == 404:
raise WebAPIError('Not Found: %s' % url)
elif resp.status_code != 200:
raise WebAPIError(resp.text)
self.status = self.sess.get(
'%s/v1/calc/%d/status' % (self.server, calc_id)).json()
self.oqparam = object.__new__(oqvalidation.OqParam)
vars(self.oqparam).update(resp.json())
[docs] def get(self, what):
"""
:param what: what to extract
:returns: an ArrayWrapper instance
"""
url = '%s/v1/calc/%d/extract/%s' % (self.server, self.calc_id, what)
logging.info('GET %s', url)
resp = self.sess.get(url)
if resp.status_code != 200:
raise WebAPIError(resp.text)
npz = numpy.load(io.BytesIO(resp.content))
attrs = {k: npz[k] for k in npz if k != 'array'}
try:
arr = npz['array']
except KeyError:
arr = ()
return ArrayWrapper(arr, attrs)
[docs] def dump(self, fname):
"""
Dump the remote datastore on a local path.
"""
url = '%s/v1/calc/%d/datastore' % (self.server, self.calc_id)
resp = self.sess.get(url, stream=True)
down = 0
with open(fname, 'wb') as f:
logging.info('Saving %s', fname)
for chunk in resp.iter_content(CHUNKSIZE):
f.write(chunk)
down += len(chunk)
println('Downloaded {:,} bytes'.format(down))
print()