Source code for openquake.risklib.riskmodels

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2013-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import re
import json
import copy
import functools
import collections
import numpy
import pandas

from openquake.baselib import hdf5
from openquake.baselib.node import Node
from openquake.baselib.general import AccumDict, cached_property
from openquake.hazardlib import nrml, InvalidFile
from openquake.hazardlib.sourcewriter import obj_to_node
from openquake.risklib import scientific

U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
F32 = numpy.float32
F64 = numpy.float64

lts = numpy.concatenate([scientific.LOSSTYPE, scientific.PERILTYPE])
LTYPE_REGEX = '|'.join(lt for lt in lts if '+' not in lt and '_ins' not in lt)
RISK_TYPE_REGEX = re.compile(r'(%s)_([\w_]+)' % LTYPE_REGEX)


def _assert_equal(d1, d2):
    d1.pop('loss_type', None)
    d2.pop('loss_type', None)
    assert sorted(d1) == sorted(d2), (sorted(d1), sorted(d2))
    for k, v in d1.items():
        if isinstance(v, dict):
            _assert_equal(v, d2[k])
        else:
            assert v == d2[k], (v, d2[k])


[docs]def get_risk_files(inputs): """ :param inputs: a dictionary key -> path name :returns: a dictionary "peril/kind/cost_type" -> fname """ rfs = {} job_ini = inputs['job_ini'] for key in sorted(inputs): if key == 'fragility': # backward compatibily for .ini files with key fragility_file # instead of structural_fragility_file rfs['earthquake/fragility/structural'] = inputs[ 'structural_fragility'] = inputs[key] del inputs['fragility'] elif key.endswith(('_fragility', '_vulnerability', '_vulnerability_retrofitted')): match = RISK_TYPE_REGEX.match(key) if match: kind = match.group(2) # fragility or vulnerability value = inputs[key] if isinstance(value, dict): # cost_type -> fname peril = match.group(1) for cost_type, fname in value.items(): rfs[f'{peril}/{kind}/{cost_type}'] = fname else: cost_type = match.group(1) rfs[f'earthquake/{kind}/{cost_type}'] = value else: raise ValueError('Invalid key in %s: %s_file' % (job_ini, key)) return rfs
# ########################### vulnerability ############################## #
[docs]def filter_vset(elem): return elem.tag.endswith('discreteVulnerabilitySet')
[docs]@obj_to_node.add('VulnerabilityFunction') def build_vf_node(vf): """ Convert a VulnerabilityFunction object into a Node suitable for XML conversion. """ nodes = [Node('imls', {'imt': vf.imt}, vf.imls), Node('meanLRs', {}, vf.mean_loss_ratios), Node('covLRs', {}, vf.covs)] return Node( 'vulnerabilityFunction', {'id': vf.id, 'dist': vf.distribution_name}, nodes=nodes)
[docs]def group_by_peril(funclist): """ Converts a list of objects with attribute .loss_type into a dictionary peril -> loss_type -> risk_function """ ddic = AccumDict(accum=AccumDict(accum=[])) # peril -> lt -> rf for rf in funclist: ddic[rf.peril][rf.loss_type].append(rf) for peril, dic in ddic.items(): for lt, lst in dic.items(): if len(lst) == 1: dic[lt] = lst[0] elif lst[1].kind == 'fragility': # EventBasedDamageTestCase.test_case_11 cf, ffl = lst ffl.cf = cf dic[lt] = ffl elif lst[1].kind == 'vulnerability_retrofitted': vf, retro = lst vf.retro = retro dic[lt] = vf else: raise RuntimeError(lst) return ddic
[docs]class RiskFuncList(list): """ A list of risk functions with attributes .id, .loss_type, .kind """
[docs] def groupby_id(self): """ :returns: dictionary id -> loss_type -> risk_function """ ddic = AccumDict(accum=[]) for rf in self: ddic[rf.id].append(rf) return {riskid: group_by_peril(rfs) for riskid, rfs in ddic.items()}
[docs]def get_risk_functions(oqparam): """ :param oqparam: an OqParam instance :returns: a list of risk functions """ job_ini = oqparam.inputs['job_ini'] rmodels = AccumDict() # (peril, loss_type, kind) -> rmodel for key, fname in get_risk_files(oqparam.inputs).items(): peril, kind, loss_type = key.split('/') # ex. earthquake/vulnerability/structural rmodel = nrml.to_python(fname) if len(rmodel) == 0: raise InvalidFile(f'{job_ini}: {fname} is empty!') rmodels[peril, loss_type, kind] = rmodel if rmodel.lossCategory is None: # NRML 0.4 continue cost_type = str(rmodel.lossCategory) rmodel_kind = rmodel.__class__.__name__ kind_ = kind.replace('_retrofitted', '') # strip retrofitted if not rmodel_kind.lower().startswith(kind_): raise ValueError( f'Error in the file "{key}_file={fname}": is ' f'of kind {rmodel_kind}, expected {kind.capitalize() + "Model"}') if cost_type != loss_type: raise ValueError( f'Error in the file "{key}_file={fname}": lossCategory is of ' f'type "{rmodel.lossCategory}", expected "{loss_type}"') cl_risk = oqparam.calculation_mode in ('classical', 'classical_risk') rlist = RiskFuncList() rlist.limit_states = [] for (peril, loss_type, kind), rm in sorted(rmodels.items()): if kind == 'fragility': for (imt, riskid), ffl in sorted(rm.items()): if not rlist.limit_states: rlist.limit_states.extend(rm.limitStates) # we are rejecting the case of loss types with different # limit states; this may change in the future assert rlist.limit_states == rm.limitStates, ( rlist.limit_states, rm.limitStates) ffl.peril = peril ffl.loss_type = loss_type ffl.kind = kind rlist.append(ffl) else: # vulnerability, vulnerability_retrofitted # only for classical_risk reduce the loss_ratios # to make sure they are strictly increasing for (imt, riskid), rf in sorted(rm.items()): rf = rf.strictly_increasing() if cl_risk else rf rf.peril = peril rf.loss_type = loss_type rf.kind = kind rlist.append(rf) return rlist
loss_poe_dt = numpy.dtype([('loss', F64), ('poe', F64)])
[docs]def rescale(curves, values): """ Multiply the losses in each curve of kind (losses, poes) by the corresponding value. :param curves: an array of shape (A, 2, C) :param values: an array of shape (A,) """ A, _, C = curves.shape assert A == len(values), (A, len(values)) array = numpy.zeros((A, C), loss_poe_dt) array['loss'] = [c * v for c, v in zip(curves[:, 0], values)] array['poe'] = curves[:, 1] return array
[docs]class PerilDict(dict): """ >>> pd = PerilDict({('earthquake', 'structural'): .23}) >>> pd['structural'] 0.23 >>> pd['structurl'] Traceback (most recent call last): ... KeyError: ('earthquake', 'structurl') """ def __getitem__(self, lt): if isinstance(lt, tuple): return dict.__getitem__(self, lt) else: # assume lt is a loss_type string return dict.__getitem__(self, ('earthquake', lt))
[docs]class RiskModel(object): """ Base class. Can be used in the tests as a mock. :param taxonomy: a taxonomy string :param risk_functions: a dict peril -> (loss_type, kind) -> risk_function """ time_event = None # used in scenario_risk compositemodel = None # set by get_crmodel alias = None # set in save_crmodel def __init__(self, calcmode, taxonomy, risk_functions, **kw): self.calcmode = calcmode self.taxonomy = taxonomy self.risk_functions = risk_functions vars(self).update(kw) # updates risk_investigation_time too steps = kw.get('lrem_steps_per_interval') if calcmode in ('classical', 'classical_risk'): self.loss_ratios = { lt: tuple(vf.mean_loss_ratios_with_steps(steps)) for lt, vf in risk_functions['earthquake'].items()} if calcmode == 'classical_bcr': self.loss_ratios_orig = {} self.loss_ratios_retro = {} for lt, vf in risk_functions['earthquake'].items(): self.loss_ratios_orig[lt] = tuple( vf.mean_loss_ratios_with_steps(steps)) self.loss_ratios_retro[lt] = tuple( vf.retro.mean_loss_ratios_with_steps(steps)) # set imt_by_lt self.imt_by_lt = {} # dictionary loss_type -> imt for lt, rf in risk_functions['earthquake'].items(): if rf.kind in ('vulnerability', 'fragility'): self.imt_by_lt[lt] = rf.imt @property def loss_types(self): """ The list of loss types in the underlying vulnerability functions, in lexicographic order """ return sorted(self.risk_functions['earthquake']) def __call__(self, assets, gmf_df, rndgen=None): meth = getattr(self, self.calcmode) res = {(peril, lt): meth(peril, lt, assets, gmf_df, rndgen) for peril in self.risk_functions for lt in self.loss_types} # for event_based_risk `res` is a map loss_type -> DataFrame(eid, aid, loss) return PerilDict(res) def __toh5__(self): return self.risk_functions, {'taxonomy': self.taxonomy} def __fromh5__(self, dic, attrs): vars(self).update(attrs) assert 'earthquake' in dic, list(dic) self.risk_functions = dic def __repr__(self): return '<%s %s>' % (self.__class__.__name__, self.taxonomy) # ######################## calculation methods ######################### #
[docs] def classical_risk(self, peril, loss_type, assets, hazard_curve, rng=None): """ :param str loss_type: the loss type considered :param assets: assets is an iterator over A :class:`openquake.risklib.scientific.Asset` instances :param hazard_curve: an array of poes :param eps: ignored, here only for API compatibility with other calculators :returns: a composite array (loss, poe) of shape (A, C) """ n = len(assets) vf = self.risk_functions[peril][loss_type] lratios = self.loss_ratios[loss_type] imls = self.hazard_imtls[vf.imt] poes = hazard_curve[self.hazard_imtls(vf.imt)] if loss_type == 'occupants': values = assets['occupants_avg'].to_numpy() else: values = assets['value-' + loss_type].to_numpy() rtime = self.risk_investigation_time or self.investigation_time lrcurves = numpy.array( [scientific.classical( vf, imls, poes, lratios, self.investigation_time, rtime)] * n) return rescale(lrcurves, values)
[docs] def classical_bcr(self, peril, loss_type, assets, hazard, rng=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard: a dictionary col -> hazard curve :param _eps: dummy parameter, unused :returns: a list of triples (eal_orig, eal_retro, bcr_result) """ if loss_type != 'structural': raise NotImplementedError( 'retrofitted is not defined for ' + loss_type) n = len(assets) self.assets = assets vf = self.risk_functions[peril][loss_type] imls = self.hazard_imtls[vf.imt] poes = hazard[self.hazard_imtls(vf.imt)] rtime = self.risk_investigation_time or self.investigation_time curves_orig = functools.partial( scientific.classical, vf, imls, loss_ratios=self.loss_ratios_orig[loss_type], investigation_time=self.investigation_time, risk_investigation_time=rtime) curves_retro = functools.partial( scientific.classical, vf.retro, imls, loss_ratios=self.loss_ratios_retro[loss_type], investigation_time=self.investigation_time, risk_investigation_time=rtime) original_loss_curves = numpy.array([curves_orig(poes)] * n) retrofitted_loss_curves = numpy.array([curves_retro(poes)] * n) eal_original = numpy.array([scientific.average_loss(lc) for lc in original_loss_curves]) eal_retrofitted = numpy.array([scientific.average_loss(lc) for lc in retrofitted_loss_curves]) bcr_results = [ scientific.bcr( eal_original[i], eal_retrofitted[i], self.interest_rate, self.asset_life_expectancy, asset['value-' + loss_type], asset['retrofitted']) for i, asset in enumerate(assets.to_records())] return list(zip(eal_original, eal_retrofitted, bcr_results))
[docs] def classical_damage(self, peril, loss_type, assets, hazard_curve, rng=None): """ :param loss_type: the loss type :param assets: a list of N assets of the same taxonomy :param hazard_curve: a dictionary col -> hazard curve :returns: an array of N x D elements where N is the number of points and D the number of damage states. """ ffl = self.risk_functions[peril][loss_type] imls = self.hazard_imtls[ffl.imt] poes = hazard_curve[self.hazard_imtls(ffl.imt)] rtime = self.risk_investigation_time or self.investigation_time damage = scientific.classical_damage( ffl, imls, poes, investigation_time=self.investigation_time, risk_investigation_time=rtime, steps_per_interval=self.steps_per_interval) damages = numpy.array([a['value-number'] * damage for a in assets.to_records()]) return damages
[docs] def event_based_risk(self, peril, loss_type, assets, gmf_df, rndgen): """ :returns: a DataFrame with columns eid, eid, loss """ imt = self.imt_by_lt[loss_type] col = self.alias.get(imt, imt) sid = assets['site_id'] if loss_type == 'occupants': val = assets['occupants_%s' % self.time_event].to_numpy() else: val = assets['value-' + loss_type].to_numpy() asset_df = pandas.DataFrame(dict(aid=assets.index, val=val), sid) vf = self.risk_functions[peril][loss_type] return vf(asset_df, gmf_df, col, rndgen, self.minimum_asset_loss.get(loss_type, 0.))
scenario = ebrisk = scenario_risk = event_based_risk
[docs] def scenario_damage(self, peril, loss_type, assets, gmf_df, rng=None): """ :param loss_type: the loss type :param assets: a list of A assets of the same taxonomy :param gmf_df: a DataFrame of GMFs :param epsilons: dummy parameter, unused :returns: an array of shape (A, E, D) elements where N is the number of points, E the number of events and D the number of damage states. """ imt = self.imt_by_lt[loss_type] col = self.alias.get(imt, imt) gmvs = gmf_df[col].to_numpy() ffs = self.risk_functions[peril][loss_type] damages = scientific.scenario_damage(ffs, gmvs).T return numpy.array([damages] * len(assets))
event_based_damage = scenario_damage
# NB: the approach used here relies on the convention of having the # names of the arguments of the RiskModel class to be equal to the # names of the parameter in the oqparam object. This is seen as a # feature, since it forces people to be consistent with the names, # in the spirit of the 'convention over configuration' philosophy
[docs]def get_riskmodel(taxonomy, oqparam, risk_functions): """ Return an instance of the correct risk model class, depending on the attribute `calculation_mode` of the object `oqparam`. :param taxonomy: a taxonomy string :param oqparam: an object containing the parameters needed by the RiskModel class :param extra: extra parameters to pass to the RiskModel class """ extra = {'hazard_imtls': oqparam.imtls} extra['investigation_time'] = oqparam.investigation_time extra['risk_investigation_time'] = oqparam.risk_investigation_time extra['lrem_steps_per_interval'] = oqparam.lrem_steps_per_interval extra['steps_per_interval'] = oqparam.steps_per_interval extra['time_event'] = oqparam.time_event extra['minimum_asset_loss'] = oqparam.minimum_asset_loss if oqparam.calculation_mode == 'classical_bcr': extra['interest_rate'] = oqparam.interest_rate extra['asset_life_expectancy'] = oqparam.asset_life_expectancy return RiskModel(oqparam.calculation_mode, taxonomy, risk_functions, **extra)
# used only in riskmodels_test
[docs]def get_riskcomputer(dic, alias, limit_states=()): # builds a RiskComputer instance from a suitable dictionary rc = scientific.RiskComputer.__new__(scientific.RiskComputer) rc.D = len(limit_states) + 1 rc.wdic = {} rfs = AccumDict(accum=[]) steps = dic.get('lrem_steps_per_interval', 1) lts = set() riskid_perils = set() perils = set() for rlk, func in dic['risk_functions'].items(): peril, lt, riskid = rlk.split('#') perils.add(peril) riskid_perils.add((riskid, peril)) lts.add(lt) rf = hdf5.json_to_obj(json.dumps(func)) if hasattr(rf, 'init'): rf.init() rf.loss_type = lt rf.peril = peril if getattr(rf, 'retro', False): rf.retro = hdf5.json_to_obj(json.dumps(rf.retro)) rf.retro.init() rf.retro.loss_type = lt if hasattr(rf, 'array'): # fragility rf = rf.build(limit_states) rfs[riskid].append(rf) lts = sorted(lts) mal = dic.setdefault('minimum_asset_loss', {lt: 0. for lt in lts}) for riskid in rfs: rm = RiskModel(dic['calculation_mode'], 'taxonomy', group_by_peril(rfs[riskid]), lrem_steps_per_interval=steps, minimum_asset_loss=mal) rm.alias = alias rc[riskid] = rm if 'wdic' in dic: for rlt, weight in dic['wdic'].items(): riskid, peril = rlt.split('#') rc.wdic[riskid, peril] = weight else: rc.wdic = {(riskid, peril): 1. for riskid, peril in sorted(riskid_perils)} rc.P = len(perils) rc.loss_types = lts rc.minimum_asset_loss = mal rc.calculation_mode = dic['calculation_mode'] return rc
# ######################## CompositeRiskModel #########################
[docs]class ValidationError(Exception): pass
[docs]class CompositeRiskModel(collections.abc.Mapping): """ A container (riskid, kind) -> riskmodel :param oqparam: an :class:`openquake.commonlib.oqvalidation.OqParam` instance :param fragdict: a dictionary riskid -> loss_type -> fragility functions :param vulndict: a dictionary riskid -> loss_type -> vulnerability function :param consdict: a dictionary riskid -> loss_type -> consequence functions """ tmap_df = () # to be set
[docs] @classmethod # TODO: reading new-style consequences is missing def read(cls, dstore, oqparam): """ :param dstore: a DataStore instance :returns: a :class:`CompositeRiskModel` instance """ risklist = RiskFuncList() if hasattr(dstore, 'get_attr'): # missing only in Aristotle mode, where dstore is an hdf5.File risklist.limit_states = dstore.get_attr('crm', 'limit_states') df = dstore.read_df('crm') for i, rf_json in enumerate(df.riskfunc): rf = hdf5.json_to_obj(rf_json) try: rf.peril = df.loc[i].peril except AttributeError: # in engine < 3.22 the peril was not stored rf.peril = 'earthquake' lt = rf.loss_type if rf.kind == 'fragility': # rf is a FragilityFunctionList risklist.append(rf) else: # rf is a vulnerability function rf.init() if lt.endswith('_retrofitted'): # strip _retrofitted, since len('_retrofitted') = 12 rf.loss_type = lt[:-12] rf.kind = 'vulnerability_retrofitted' else: rf.loss_type = lt rf.kind = 'vulnerability' risklist.append(rf) crm = CompositeRiskModel(oqparam, risklist) if 'taxmap' in dstore: crm.tmap_df = dstore.read_df('taxmap') return crm
def __init__(self, oqparam, risklist, consdict=()): self.oqparam = oqparam self.risklist = risklist # by taxonomy self.consdict = consdict or {} # new style consequences, by anything self.init()
[docs] def set_tmap(self, tmap_df): """ Set the attribute .tmap_df if the risk IDs in the taxonomy mapping are consistent with the fragility functions. """ self.tmap_df = tmap_df if 'consequence' not in self.oqparam.inputs: return csq_files = [] for fnames in self.oqparam.inputs['consequence'].values(): if isinstance(fnames, list): csq_files.extend(fnames) else: csq_files.append(fnames) cfs = '\n'.join(csq_files) df = self.tmap_df for peril in self.perils: for byname, coeffs in self.consdict.items(): # ex. byname = "losses_by_taxonomy" if len(coeffs): for per, risk_id, weight in zip(df.peril, df.risk_id, df.weight): if (per == '*' or per == peril) and risk_id != '?': try: coeffs[risk_id][peril] except KeyError as err: raise InvalidFile( 'Missing %s in\n%s' % (err, cfs))
[docs] def check_risk_ids(self, inputs): """ Check that there are no missing risk IDs for some risk functions """ ids_by_kind = AccumDict(accum=set()) for riskfunc in self.risklist: ids_by_kind[riskfunc.kind].add(riskfunc.id) kinds = tuple(ids_by_kind) # vulnerability, fragility, ... fnames = [fname for kind, fname in inputs.items() if kind.endswith(kinds)] if len(ids_by_kind) > 1: k = next(iter(ids_by_kind)) base_ids = set(ids_by_kind.pop(k)) for kind, ids in ids_by_kind.items(): if ids != base_ids: raise NameError( 'Check in the files %s the IDs %s' % (fnames, sorted(base_ids.symmetric_difference(ids)))) if self._riskmodels: for peril in self.perils: # check imt_by_lt has consistent loss types for all taxonomies missing = AccumDict(accum=[]) rms = [] if len(self.tmap_df): if len(self.tmap_df.peril.unique()) == 1: risk_ids = self.tmap_df.risk_id else: risk_ids = self.tmap_df[self.tmap_df.peril==peril].risk_id for risk_id in risk_ids.unique(): rms.append(self._riskmodels[risk_id]) else: rms.extend(self._riskmodels.values()) for rm in rms: # NB: in event_based_risk/case_8 the loss types are # area, number, occupants, residents for lt in self.loss_types: try: rm.imt_by_lt[lt] except KeyError: key = '%s/%s/%s' % (peril, kinds[0], lt) fname = self.oqparam._risk_files[key] missing[fname].append(rm.taxonomy) if missing: for fname, ids in missing.items(): raise InvalidFile( '%s: missing %s %s' % (fname, peril, ' '.join(ids)))
[docs] def compute_csq(self, assets, dd5, tmap_df, oq): """ :param assets: asset array :param dd5: distribution functions of shape (P, A, E, L, D) :param tmap_df: DataFrame corresponding to the given taxonomy :param oq: OqParam instance with .loss_types and .time_event :returns: a dict consequence_name, loss_type -> array[P, A, E] """ P, A, E, _L, _D = dd5.shape csq = AccumDict(accum=numpy.zeros((P, A, E))) for byname, coeffs in self.consdict.items(): # ex. byname = "losses_by_taxonomy" if len(coeffs): consequence, _tagname = byname.split('_by_') # by construction all assets have the same taxonomy for risk_id, df in tmap_df.groupby('risk_id'): for li, lt in enumerate(oq.loss_types): # dict loss_type -> coeffs for the given loss type for pi, peril in enumerate(self.perils): if len(df) == 1: [w] = df.weight else: # assume one weigth per peril [w] = df[df.peril == peril].weight coeff = (dd5[pi, :, :, li, 1:] @ coeffs[risk_id][peril][lt] * w) cAE = scientific.consequence( consequence, assets, coeff, lt, oq.time_event) csq[consequence, li][pi] += cAE return csq
[docs] def init(self): oq = self.oqparam if self.risklist: self.perils = oq.set_risk_imts(self.risklist) self.damage_states = [] self._riskmodels = {} # riskid -> crmodel if oq.calculation_mode.endswith('_bcr'): # classical_bcr calculator for riskid, risk_functions in self.risklist.groupby_id().items(): self._riskmodels[riskid] = get_riskmodel(riskid, oq, risk_functions) elif (any(rf.kind == 'fragility' for rf in self.risklist) or 'damage' in oq.calculation_mode): # classical_damage/scenario_damage calculator if oq.calculation_mode in ('classical', 'scenario'): # case when the risk files are in the job_hazard.ini file oq.calculation_mode += '_damage' if 'exposure' not in oq.inputs: raise RuntimeError( 'There are risk files in %r but not ' 'an exposure' % oq.inputs['job_ini']) self.damage_states = ['no_damage'] + list( self.risklist.limit_states) for riskid, ffs_by_lt in self.risklist.groupby_id().items(): self._riskmodels[riskid] = get_riskmodel(riskid, oq, ffs_by_lt) else: # classical, event based and scenario calculators for riskid, vfs in self.risklist.groupby_id().items(): self._riskmodels[riskid] = get_riskmodel(riskid, oq, vfs) self.imtls = oq.imtls self.lti = {} # loss_type -> idx self.covs = 0 # number of coefficients of variation # build a sorted list with all the loss_types contained in the model ltypes = set() for rm in self.values(): ltypes.update(rm.loss_types) self.loss_types = sorted(ltypes) self.taxonomies = set() self.distributions = set() for riskid, rm in self._riskmodels.items(): self.taxonomies.add(riskid) rm.compositemodel = self for lt, rf in rm.risk_functions.items(): if hasattr(rf, 'distribution_name'): self.distributions.add(rf.distribution_name) if hasattr(rf, 'init'): # vulnerability function if oq.ignore_covs: rf.covs = numpy.zeros_like(rf.covs) rf.init() # save the number of nonzero coefficients of variation if hasattr(rf, 'covs') and rf.covs.any(): self.covs += 1 self.curve_params = self.make_curve_params() # possibly set oq.minimum_intensity iml = collections.defaultdict(list) # ._riskmodels is empty if read from the hazard calculation for riskid, rm in self._riskmodels.items(): for lt, rf in rm.risk_functions['earthquake'].items(): iml[rf.imt].append(rf.imls[0]) if oq.aristotle: pass # don't set minimum_intensity elif sum(oq.minimum_intensity.values()) == 0 and iml: oq.minimum_intensity = {imt: min(ls) for imt, ls in iml.items()}
[docs] def eid_dmg_dt(self): """ :returns: a dtype (eid, dmg) """ L = len(self.lti) D = len(self.damage_states) return numpy.dtype([('eid', U32), ('dmg', (F32, (L, D)))])
[docs] def asset_damage_dt(self, float_dmg_dist): """ :returns: a composite dtype with damages and consequences """ dt = F32 if float_dmg_dist else U32 descr = ([('agg_id', U32), ('event_id', U32), ('loss_id', U8)] + [(dc, dt) for dc in self.get_dmg_csq()]) return numpy.dtype(descr)
@cached_property def taxonomy_dict(self): """ :returns: a dict taxonomy string -> taxonomy index """ # .taxonomy must be set by the engine tdict = {taxo: idx for idx, taxo in enumerate(self.taxonomy)} return tdict
[docs] def get_consequences(self): """ :returns: the list of available consequences """ csq = [] for consequence_by_tagname, arr in self.consdict.items(): if len(arr): csq.append(consequence_by_tagname.split('_by_')[0]) return csq
[docs] def get_dmg_csq(self): """ :returns: damage states (except no_damage) plus consequences """ D = len(self.damage_states) dmgs = ['dmg_%d' % d for d in range(1, D)] return dmgs + self.get_consequences()
[docs] def multi_damage_dt(self): """ :returns: composite datatype with fields peril-loss_type-damage_state """ dcs = self.damage_states + self.get_consequences() lst = [] for peril in self.perils: for ltype in self.oqparam.loss_types: for dc in dcs: if peril == 'earthquake': field = f'{ltype}-{dc}' else: field = f'{peril}-{ltype}-{dc}' lst.append((field, F32)) return numpy.dtype(lst)
[docs] def to_multi_damage(self, array5d): """ :param array5d: array of shape (P, A, R, L, Dc) :returns: array of shape (A, R) of dtype multi_damage_dt """ P, A, R, L, Dc = array5d.shape arr = numpy.zeros((A, R), self.multi_damage_dt()) for a in range(A): for r in range(R): lst = [] for pi in range(P): for li in range(L): for di in range(Dc): lst.append(array5d[pi, a, r, li, di]) arr[a, r] = tuple(lst) return arr
[docs] def make_curve_params(self): # the CurveParams are used only in classical_risk, classical_bcr # NB: populate the inner lists .loss_types too cps = [] for lti, loss_type in enumerate(self.loss_types): if self.oqparam.calculation_mode in ( 'classical', 'classical_risk'): curve_resolutions = set() lines = [] allratios = [] for taxo in sorted(self): rm = self[taxo] rf = rm.risk_functions['earthquake'][loss_type] if loss_type in rm.loss_ratios: ratios = rm.loss_ratios[loss_type] allratios.append(ratios) curve_resolutions.add(len(ratios)) lines.append('%s %d' % (rf, len(ratios))) if len(curve_resolutions) > 1: # number of loss ratios is not the same for all taxonomies: # then use the longest array; see classical_risk case_5 allratios.sort(key=len) for rm in self.values(): if rm.loss_ratios[loss_type] != allratios[-1]: rm.loss_ratios[loss_type] = allratios[-1] # logging.debug(f'Redefining loss ratios for {rm}') cp = scientific.CurveParams( lti, loss_type, max(curve_resolutions), allratios[-1], True ) if curve_resolutions else scientific.CurveParams( lti, loss_type, 0, [], False) else: # used only to store the association l -> loss_type cp = scientific.CurveParams(lti, loss_type, 0, [], False) cps.append(cp) self.lti[loss_type] = lti return cps
[docs] def get_loss_ratios(self): """ :returns: a 1-dimensional composite array with loss ratios by loss type """ lst = [('user_provided', bool)] for cp in self.curve_params: lst.append((cp.loss_type, F32, len(cp.ratios))) loss_ratios = numpy.zeros(1, numpy.dtype(lst)) for cp in self.curve_params: loss_ratios['user_provided'] = cp.user_provided loss_ratios[cp.loss_type] = tuple(cp.ratios) return loss_ratios
def __getitem__(self, taxo): return self._riskmodels[taxo]
[docs] def get_outputs(self, asset_df, haz, sec_losses=(), rndgen=None, country='?'): """ :param asset_df: a DataFrame of assets with the same taxonomy and country :param haz: a DataFrame of GMVs on the sites of the assets :param sec_losses: a list of functions :param rndgen: a MultiEventRNG instance :returns: a list of dictionaries loss_type-> output """ # rc.pprint() # dic = rc.todict() # rc2 = get_riskcomputer(dic) # dic2 = rc2.todict() # _assert_equal(dic, dic2) [taxidx] = asset_df.taxonomy.unique() rc = scientific.RiskComputer(self, taxidx, country) out = rc.output(asset_df, haz, sec_losses, rndgen) return list(out)
def __iter__(self): return iter(sorted(self._riskmodels)) def __len__(self): return len(self._riskmodels)
[docs] def reduce(self, taxonomies): """ :param taxonomies: a set of taxonomies :returns: a new CompositeRiskModel reduced to the given taxonomies """ new = copy.copy(self) new._riskmodels = {} for riskid, rm in self._riskmodels.items(): if riskid in taxonomies: new._riskmodels[riskid] = rm rm.compositemodel = new return new
[docs] def get_attrs(self): loss_types = hdf5.array_of_vstr(self.loss_types) limit_states = hdf5.array_of_vstr(self.damage_states[1:] if self.damage_states else []) attrs = dict(covs=self.covs, loss_types=loss_types, limit_states=limit_states, consequences=self.get_consequences()) rf = next(iter(self.values())) if hasattr(rf, 'loss_ratios'): for lt in self.loss_types: attrs['loss_ratios_' + lt] = rf.loss_ratios[lt] return attrs
[docs] def to_dframe(self): """ :returns: a DataFrame containing all risk functions """ dic = {'peril': [], 'riskid': [], 'loss_type': [], 'riskfunc': []} for riskid, rm in self._riskmodels.items(): for peril, rfdict in rm.risk_functions.items(): for lt, rf in rfdict.items(): dic['peril'].append(peril) dic['riskid'].append(riskid) dic['loss_type'].append(lt) dic['riskfunc'].append(hdf5.obj_to_json(rf)) return pandas.DataFrame(dic)
def __repr__(self): lines = ['%s: %s' % item for item in sorted(self.items())] return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(lines))