# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
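"""
This module defines the AssetCollection and CompositeRiskModel containers
used by the risk calculators, together with helpers to build matrices of
epsilons (make_eps) and to collect ground motion fields in memory
(GmfCollector, create).
"""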
import operator
import logging
import collections
import numpy
from openquake.baselib.python3compat import zip
from openquake.baselib.performance import Monitor
from openquake.baselib.general import groupby, split_in_blocks
from openquake.hazardlib import site, calc
from openquake.risklib import scientific, riskmodels

U32 = numpy.uint32
F32 = numpy.float32
FIELDS = ('site_id', 'lon', 'lat', 'idx', 'taxonomy', 'area', 'number',
'occupants', 'deductible-', 'insurance_limit-', 'retrofitted-')
by_taxonomy = operator.attrgetter('taxonomy')


class AssetCollection(object):
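    """
    A collection of assets, stored as a composite numpy array plus an
    array of taxonomy strings; it supports serialization to HDF5 via the
    __toh5__/__fromh5__ protocol.
    """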
D, I, R = len('deductible-'), len('insurance_limit-'), len('retrofitted-')

    def __init__(self, assets_by_site, cost_calculator, time_event,
time_events=''):
self.cc = cost_calculator
self.time_event = time_event
self.time_events = time_events
self.array, self.taxonomies = self.build_asset_collection(
assets_by_site, time_event)
fields = self.array.dtype.names
self.loss_types = sorted(f for f in fields
if not f.startswith(FIELDS))
self.deduc = [n for n in fields if n.startswith('deductible-')]
self.i_lim = [n for n in fields if n.startswith('insurance_limit-')]
self.retro = [n for n in fields if n.startswith('retrofitted-')]

    def assets_by_site(self):
"""
        :returns: a numpy array of lists with the assets on each site
"""
assetcol = self.array
site_ids = sorted(set(assetcol['site_id']))
assets_by_site = [[] for sid in site_ids]
        index = dict(zip(site_ids, range(len(site_ids))))  # site_id -> index
for i, ass in enumerate(assetcol):
assets_by_site[index[ass['site_id']]].append(self[i])
return numpy.array(assets_by_site)

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]

    def __getitem__(self, indices):
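        """
        :param indices: an integer (return a single Asset) or a
                        slice/array (return a sliced copy of the collection)
        """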
if isinstance(indices, int): # single asset
a = self.array[indices]
values = {lt: a[lt] for lt in self.loss_types}
if 'occupants' in self.array.dtype.names:
values['occupants_' + str(self.time_event)] = a['occupants']
return riskmodels.Asset(
a['idx'],
self.taxonomies[a['taxonomy']],
number=a['number'],
location=(a['lon'], a['lat']),
values=values,
area=a['area'],
deductibles={lt[self.D:]: a[lt] for lt in self.deduc},
insurance_limits={lt[self.I:]: a[lt] for lt in self.i_lim},
retrofitteds={lt[self.R:]: a[lt] for lt in self.retro},
calc=self.cc, ordinal=indices)
        # otherwise return a sliced copy of the collection
        new = object.__new__(self.__class__)
new.time_event = self.time_event
new.array = self.array[indices]
new.taxonomies = self.taxonomies
return new

    def __len__(self):
        return len(self.array)

    def __toh5__(self):
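        """
        :returns: a pair (dictionary of datasets, dictionary of attributes)
                  used for the HDF5 serialization
        """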
attrs = {'time_event': self.time_event or 'None',
'time_events': self.time_events,
'loss_types': self.loss_types,
'deduc': self.deduc, 'i_lim': self.i_lim, 'retro': self.retro,
'nbytes': self.array.nbytes}
return dict(array=self.array, taxonomies=self.taxonomies,
cost_calculator=self.cc), attrs

    def __fromh5__(self, dic, attrs):
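        """
        Rebuild the collection from datasets and attributes read from HDF5.
        """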
vars(self).update(attrs)
self.array = dic['array'].value
self.taxonomies = dic['taxonomies'].value
self.cc = dic['cost_calculator']

    @staticmethod
    def build_asset_collection(assets_by_site, time_event=None):
"""
:param assets_by_site: a list of lists of assets
:param time_event: a time event string (or None)
:returns: two arrays `assetcol` and `taxonomies`
"""
for assets in assets_by_site:
if len(assets):
first_asset = assets[0]
break
else: # no break
raise ValueError('There are no assets!')
candidate_loss_types = list(first_asset.values)
loss_types = []
the_occupants = 'occupants_%s' % time_event
for candidate in candidate_loss_types:
if candidate.startswith('occupants'):
if candidate == the_occupants:
loss_types.append('occupants')
# discard occupants for different time periods
else:
loss_types.append(candidate)
deductible_d = first_asset.deductibles or {}
limit_d = first_asset.insurance_limits or {}
retrofitting_d = first_asset.retrofitteds or {}
deductibles = ['deductible-%s' % name for name in deductible_d]
limits = ['insurance_limit-%s' % name for name in limit_d]
retrofittings = ['retrofitted-%s' % n for n in retrofitting_d]
float_fields = loss_types + deductibles + limits + retrofittings
taxonomies = set()
for assets in assets_by_site:
for asset in assets:
taxonomies.add(asset.taxonomy)
sorted_taxonomies = sorted(taxonomies)
asset_dt = numpy.dtype(
[('idx', U32), ('lon', F32), ('lat', F32), ('site_id', U32),
('taxonomy', U32), ('number', F32), ('area', F32)] +
[(name, float) for name in float_fields])
num_assets = sum(len(assets) for assets in assets_by_site)
assetcol = numpy.zeros(num_assets, asset_dt)
asset_ordinal = 0
fields = set(asset_dt.fields)
for sid, assets_ in enumerate(assets_by_site):
for asset in sorted(assets_, key=operator.attrgetter('id')):
asset.ordinal = asset_ordinal
record = assetcol[asset_ordinal]
asset_ordinal += 1
for field in fields:
if field == 'taxonomy':
value = sorted_taxonomies.index(asset.taxonomy)
elif field == 'number':
value = asset.number
elif field == 'area':
value = asset.area
elif field == 'idx':
value = asset.id
elif field == 'site_id':
value = sid
elif field == 'lon':
value = asset.location[0]
elif field == 'lat':
value = asset.location[1]
elif field == 'occupants':
value = asset.values[the_occupants]
else:
try:
name, lt = field.split('-')
except ValueError: # no - in field
name, lt = 'value', field
                        # the line below retrieves one of `deductibles`,
                        # `insurance_limits` or `retrofitteds` (note the
                        # trailing "s")
value = getattr(asset, name + 's')[lt]
record[field] = value
return assetcol, numpy.array(sorted_taxonomies, (bytes, 100))


class CompositeRiskModel(collections.Mapping):
"""
A container (imt, taxonomy) -> riskmodel.
:param riskmodels: a dictionary (imt, taxonomy) -> riskmodel
:param damage_states: None or a list of damage states
"""

    def __init__(self, riskmodels, damage_states=None):
self.damage_states = damage_states # not None for damage calculations
self._riskmodels = riskmodels
self.loss_types = []
self.curve_builders = []
self.lti = {} # loss_type -> idx
self.covs = 0 # number of coefficients of variation
self.taxonomies = [] # populated in get_risk_model

    def get_min_iml(self):
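        """
        :returns: a dictionary IMT -> minimum IML over all the risk functions
        """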
iml = collections.defaultdict(list)
for taxo, rm in self._riskmodels.items():
for lt, rf in rm.risk_functions.items():
iml[rf.imt].append(rf.imls[0])
return {imt: min(iml[imt]) for imt in iml}

    def build_loss_dtypes(self, conditional_loss_poes, insured_losses=False):
"""
:param conditional_loss_poes:
configuration parameter
:param insured_losses:
configuration parameter
:returns:
loss_curve_dt and loss_maps_dt
"""
lst = [('poe-%s' % poe, F32) for poe in conditional_loss_poes]
if insured_losses:
lst += [(name + '_ins', pair) for name, pair in lst]
lm_dt = numpy.dtype(lst)
lc_list = []
lm_list = []
for cb in (b for b in self.curve_builders if b.user_provided):
pairs = [('losses', (F32, cb.curve_resolution)),
('poes', (F32, cb.curve_resolution)),
('avg', F32)]
if insured_losses:
pairs += [(name + '_ins', pair) for name, pair in pairs]
lc_list.append((cb.loss_type, numpy.dtype(pairs)))
lm_list.append((cb.loss_type, lm_dt))
loss_curve_dt = numpy.dtype(lc_list) if lc_list else None
loss_maps_dt = numpy.dtype(lm_list) if lm_list else None
return loss_curve_dt, loss_maps_dt

    # FIXME: scheduled for removal once we change agg_curve to be built from
    # the user-provided loss ratios
    def build_all_loss_dtypes(self, curve_resolution, conditional_loss_poes,
                              insured_losses=False):
"""
:param conditional_loss_poes:
configuration parameter
:param insured_losses:
configuration parameter
:returns:
loss_curve_dt and loss_maps_dt
"""
lst = [('poe-%s' % poe, F32) for poe in conditional_loss_poes]
if insured_losses:
lst += [(name + '_ins', pair) for name, pair in lst]
lm_dt = numpy.dtype(lst)
lc_list = []
lm_list = []
for loss_type in self.loss_types:
pairs = [('losses', (F32, curve_resolution)),
('poes', (F32, curve_resolution)),
('avg', F32)]
if insured_losses:
pairs += [(name + '_ins', pair) for name, pair in pairs]
lc_list.append((loss_type, numpy.dtype(pairs)))
lm_list.append((loss_type, lm_dt))
loss_curve_dt = numpy.dtype(lc_list) if lc_list else None
loss_maps_dt = numpy.dtype(lm_list) if lm_list else None
return loss_curve_dt, loss_maps_dt

    def make_curve_builders(self, oqparam):
"""
Populate the inner lists .loss_types, .curve_builders.
"""
default_loss_ratios = numpy.linspace(
0, 1, oqparam.loss_curve_resolution + 1)[1:]
loss_types = self._get_loss_types()
for l, loss_type in enumerate(loss_types):
if oqparam.calculation_mode in ('classical', 'classical_risk'):
curve_resolutions = set()
lines = []
for key in sorted(self):
rm = self[key]
if loss_type in rm.loss_ratios:
ratios = rm.loss_ratios[loss_type]
curve_resolutions.add(len(ratios))
lines.append('%s %d' % (
rm.risk_functions[loss_type], len(ratios)))
if len(curve_resolutions) > 1:
logging.info(
'Different num_loss_ratios:\n%s', '\n'.join(lines))
                # use the ratios from the last riskmodel in the loop above
                # and the maximum curve resolution
                cb = scientific.CurveBuilder(
                    loss_type, ratios, True,
                    oqparam.conditional_loss_poes, oqparam.insured_losses,
                    curve_resolution=max(curve_resolutions))
elif loss_type in oqparam.loss_ratios: # loss_ratios provided
cb = scientific.CurveBuilder(
loss_type, oqparam.loss_ratios[loss_type], True,
oqparam.conditional_loss_poes, oqparam.insured_losses)
else: # no loss_ratios provided
cb = scientific.CurveBuilder(
loss_type, default_loss_ratios, False,
oqparam.conditional_loss_poes, oqparam.insured_losses)
self.curve_builders.append(cb)
self.loss_types.append(loss_type)
self.lti[loss_type] = l

    def get_loss_ratios(self):
"""
:returns: a 1-dimensional composite array with loss ratios by loss type
"""
lst = [('user_provided', numpy.bool)]
for cb in self.curve_builders:
lst.append((cb.loss_type, F32, len(cb.ratios)))
loss_ratios = numpy.zeros(1, numpy.dtype(lst))
for cb in self.curve_builders:
loss_ratios['user_provided'] = cb.user_provided
loss_ratios[cb.loss_type] = tuple(cb.ratios)
return loss_ratios

    def _get_loss_types(self):
"""
:returns: a sorted list with all the loss_types contained in the model
"""
ltypes = set()
for rm in self.values():
ltypes.update(rm.loss_types)
return sorted(ltypes)

    def __getitem__(self, taxonomy):
        return self._riskmodels[taxonomy]

    def __iter__(self):
        return iter(sorted(self._riskmodels))

    def __len__(self):
        return len(self._riskmodels)

    def get_imt_taxonomies(self, imt=None):
"""
:returns: sorted list of pairs (imt, taxonomies)
"""
imt_taxonomies = collections.defaultdict(set)
for taxonomy, riskmodel in self.items():
for loss_type, rf in sorted(riskmodel.risk_functions.items()):
if imt is None or imt == rf.imt:
imt_taxonomies[rf.imt].add(riskmodel.taxonomy)
return sorted(imt_taxonomies.items())

    def gen_outputs(self, riskinput, rlzs_assoc, monitor,
                    assetcol=None):
"""
Group the assets per taxonomy and compute the outputs by using the
underlying riskmodels. Yield the outputs generated as dictionaries
out_by_lr.
:param riskinput: a RiskInput instance
:param rlzs_assoc: a RlzsAssoc instance
:param monitor: a monitor object used to measure the performance
:param assetcol: not None only for event based risk
"""
mon_hazard = monitor('building hazard')
mon_risk = monitor('computing riskmodel', measuremem=False)
with mon_hazard:
assets_by_site = (riskinput.assets_by_site if assetcol is None
else assetcol.assets_by_site())
hazard_by_site = riskinput.get_hazard(
rlzs_assoc, mon_hazard(measuremem=False))
for sid, assets in enumerate(assets_by_site):
hazard = hazard_by_site[sid]
the_assets = groupby(assets, by_taxonomy)
for taxonomy, assets in the_assets.items():
riskmodel = self[taxonomy]
epsgetter = riskinput.epsilon_getter(
[asset.ordinal for asset in assets])
for imt, taxonomies in riskinput.imt_taxonomies:
if taxonomy in taxonomies:
with mon_risk:
yield riskmodel.out_by_lr(
imt, assets, hazard[imt], epsgetter)
if hasattr(hazard_by_site, 'close'): # for event based risk
monitor.gmfbytes = hazard_by_site.close()

    def __repr__(self):
lines = ['%s: %s' % item for item in sorted(self.items())]
return '<%s(%d, %d)\n%s>' % (
self.__class__.__name__, len(lines), self.covs, '\n'.join(lines))


def make_eps(assets_by_site, num_samples, seed, correlation):
"""
:param assets_by_site: a list of lists of assets
    :param int num_samples: the number of samples (i.e. of ruptures)
:param int seed: a random seed
:param float correlation: the correlation coefficient
:returns: epsilons matrix of shape (num_assets, num_samples)
"""
all_assets = (a for assets in assets_by_site for a in assets)
assets_by_taxo = groupby(all_assets, by_taxonomy)
num_assets = sum(map(len, assets_by_site))
eps = numpy.zeros((num_assets, num_samples), numpy.float32)
for taxonomy, assets in assets_by_taxo.items():
# the association with the epsilons is done in order
assets.sort(key=operator.attrgetter('id'))
shape = (len(assets), num_samples)
logging.info('Building %s epsilons for taxonomy %s', shape, taxonomy)
zeros = numpy.zeros(shape)
epsilons = scientific.make_epsilons(zeros, seed, correlation)
for asset, epsrow in zip(assets, epsilons):
eps[asset.ordinal] = epsrow
return eps


class GmfCollector(object):
"""
An object storing the GMFs in memory.
"""

    def __init__(self, imts, rlzs):
self.imts = imts
self.rlzs = rlzs
self.dic = collections.defaultdict(lambda: ([], []))
self.nbytes = 0

    def close(self):
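        """
        Clear the internal dictionary and return the number of bytes stored.
        """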
self.dic.clear()
return self.nbytes

    def save(self, eid, imti, rlz, gmf, sids):
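        """
        Store the GMVs for the given event ID, IMT index and realization,
        indexed by site ID.
        """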
for gmv, sid in zip(gmf, sids):
key = '%s/%s/%s' % (sid, self.imts[imti], rlz.ordinal)
glist, elist = self.dic[key]
glist.append(gmv)
elist.append(eid)
        self.nbytes += gmf.nbytes * 2  # both the gmvs and the eids are stored

    def __getitem__(self, sid):
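        """
        :returns: a dictionary imt -> rlz -> (gmvs, eids) for the given
                  site ID
        """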
hazard = {}
for imt in self.imts:
hazard[imt] = {}
for rlz in self.rlzs:
key = '%s/%s/%s' % (sid, imt, rlz.ordinal)
data = self.dic[key]
if data[0]:
                    # a pair of arrays (gmvs as F32, eids as U32)
hazard[imt][rlz] = (numpy.array(data[0], F32),
numpy.array(data[1], U32))
return hazard


def create(GmfColl, eb_ruptures, sitecol, imts, rlzs_assoc,
           trunc_level, correl_model, min_iml, monitor=Monitor()):
"""
:param GmfColl: a GmfCollector class to be instantiated
:param eb_ruptures: a list of EBRuptures with the same trt_model_id
:param sitecol: a SiteCollection instance
:param imts: list of IMT strings
:param rlzs_assoc: a RlzsAssoc instance
:param trunc_level: truncation level
:param correl_model: correlation model instance
:param min_iml: a dictionary of minimum intensity measure levels
:param monitor: a monitor instance
:returns: a GmfCollector instance
"""
trt_id = eb_ruptures[0].trt_id
gsims = rlzs_assoc.gsims_by_trt_id[trt_id]
rlzs_by_gsim = rlzs_assoc.get_rlzs_by_gsim(trt_id)
rlzs = rlzs_assoc.realizations
ctx_mon = monitor('make contexts')
gmf_mon = monitor('compute poes')
sites = sitecol.complete
samples = rlzs_assoc.samples[trt_id]
gmfcoll = GmfColl(imts, rlzs)
for ebr in eb_ruptures:
rup = ebr.rupture
with ctx_mon:
r_sites = site.FilteredSiteCollection(ebr.indices, sites)
computer = calc.gmf.GmfComputer(
rup, r_sites, imts, gsims, trunc_level, correl_model, samples)
with gmf_mon:
data = computer.calcgmfs(
rup.seed, ebr.events, rlzs_by_gsim, min_iml)
for eid, imti, rlz, gmf_sids in data:
gmfcoll.save(eid, imti, rlz, *gmf_sids)
return gmfcoll