# -*- coding: utf-8 -*-
# Copyright (c) 2012-2014, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
"""
Hazard input management for Risk calculators.
"""
import itertools
import operator
import numpy
from openquake.hazardlib.imt import from_string
from openquake.risklib import scientific
from openquake.engine import logs
from openquake.engine.db import models
from django.db import transaction
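# size in bytes of a numpy float64 (8 on standard platforms)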
BYTES_PER_FLOAT = numpy.zeros(1, dtype=float).nbytes
class AssetSiteAssociationError(Exception):
    pass
class Hazard(object):
"""
Hazard objects have attributes .hazard_output, .data and .imt.
    Moreover, you can extract the .hid (hazard_output.id) and the
    .weight associated with the underlying realization.
The hazard .data is a numpy array of shape (N, R) where N is the
number of assets and R the number of seismic events (ruptures)
or the resolution of the hazard curve, depending on the calculator.
"""
def __init__(self, hazard_output, data, imt):
self.hazard_output = hazard_output
self.data = data
self.imt = imt
@property
def hid(self):
"""Return the id of the given hazard output"""
return self.hazard_output.id
@property
def weight(self):
"""Return the realization weight of the hazard output"""
h = self.hazard_output.output_container
if hasattr(h, 'lt_realization') and h.lt_realization:
return h.lt_realization.weight
def make_epsilons(asset_count, num_samples, seed, correlation):
    """
    :param int asset_count: the number of assets
    :param int num_samples: the number of epsilon samples per asset
    :param int seed: a random seed
    :param float correlation: the correlation coefficient
    :returns: a numpy array of epsilons of shape (asset_count, num_samples)
    """
zeros = numpy.zeros((asset_count, num_samples))
return scientific.make_epsilons(zeros, seed, correlation)
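# Illustrative sketch (hypothetical values): the returned matrix has one
# row per asset and one column per sample, e.g.
#   eps = make_epsilons(asset_count=3, num_samples=5, seed=42, correlation=0)
#   eps.shape  # -> (3, 5)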
class HazardGetter(object):
"""
A HazardGetter instance stores a chunk of assets and their associated
hazard data. In the case of scenario and event based calculators it
also stores the ruptures and the epsilons.
    The HazardGetter must be pickleable, so that it is possible to use
    different strategies (e.g. distributed or not, using postgis or not).
    :attr assets:
        The assets for which we want to extract the hazard
    :attr site_ids:
        The ids of the sites associated with the hazards
"""
def __init__(self, imt, taxonomy, hazard_outputs, assets):
self.imt = imt
self.taxonomy = taxonomy
self.hazard_outputs = hazard_outputs
self.assets = assets
self.site_ids = []
self.asset_site_ids = []
# asset_site associations
for asset in self.assets:
asset_site_id = asset.asset_site_id
assoc = models.AssetSite.objects.get(pk=asset_site_id)
self.site_ids.append(assoc.site.id)
self.asset_site_ids.append(asset_site_id)
    def get_hazards(self):
"""
Return a list of Hazard instances for the given IMT.
"""
return [Hazard(ho, self._get_data(ho), self.imt)
for ho in self.hazard_outputs]
    def get_data(self):
"""
Shortcut returning the hazard data when there is a single realization
"""
[hazard] = self.get_hazards()
return hazard.data
@property
def hid(self):
"""
Return the id of the hazard output, when there is a single realization
"""
[ho] = self.hazard_outputs
return ho.id
def __repr__(self):
eps = getattr(self, 'epsilons', None)
eps = '' if eps is None else ', %s epsilons' % str(eps.shape)
return "<%s %d assets%s, taxonomy=%s>" % (
self.__class__.__name__, len(self.assets), eps,
self.taxonomy)
class HazardCurveGetter(HazardGetter):
"""
Simple HazardCurve Getter that performs a spatial query for each
asset.
""" + HazardGetter.__doc__
def _get_data(self, ho):
# extract the poes for each site from the given hazard output
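        # the result is a list with one sequence of PoEs per site, in the
        # same order as self.site_ids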
imt_type, sa_period, sa_damping = from_string(self.imt)
oc = ho.output_container
if oc.output.output_type == 'hazard_curve_multi':
oc = models.HazardCurve.objects.get(
output__oq_job=oc.output.oq_job,
output__output_type='hazard_curve',
statistics=oc.statistics,
lt_realization=oc.lt_realization,
imt=imt_type,
sa_period=sa_period,
sa_damping=sa_damping)
cursor = models.getcursor('job_init')
query = """\
SELECT hzrdr.hazard_curve_data.poes
FROM hzrdr.hazard_curve_data
WHERE hazard_curve_id = %s AND location = %s
"""
all_curves = []
for site_id in self.site_ids:
location = models.HazardSite.objects.get(pk=site_id).location
cursor.execute(query, (oc.id, 'SRID=4326; ' + location.wkt))
poes = cursor.fetchall()[0][0]
all_curves.append(poes)
return all_curves
def expand(array, N):
"""
Given a non-empty array with n elements, expands it to a larger
array with N elements.
>>> expand([1], 3)
array([1, 1, 1])
>>> expand([1, 2, 3], 10)
array([1, 2, 3, 1, 2, 3, 1, 2, 3, 1])
>>> expand(numpy.zeros((2, 10)), 5).shape
(5, 10)
"""
n = len(array)
assert n > 0, 'Empty array'
    if n >= N:
        raise ValueError('Cannot expand an array of %d elements to %d' %
                         (n, N))
return numpy.array([array[i % n] for i in xrange(N)])
def haz_out_to_ses_coll(ho):
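    """
    Return the SESCollection(s) relevant for the given hazard output:
    a single collection for scenario GMFs, otherwise all the collections
    of the logic tree model underlying the output realization.
    """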
if ho.output_type == 'gmf_scenario':
out = models.Output.objects.get(output_type='ses', oq_job=ho.oq_job)
return [out.ses]
return models.SESCollection.objects.filter(
trt_model__lt_model=ho.output_container.lt_realization.lt_model)
class GroundMotionGetter(HazardGetter):
"""
Hazard getter for loading ground motion values.
""" + HazardGetter.__doc__
def __init__(self, imt, taxonomy, hazard_outputs, assets):
"""
Perform the needed queries on the database to populate
hazards and epsilons.
"""
HazardGetter.__init__(self, imt, taxonomy, hazard_outputs, assets)
self.hazards = {} # dict ho, imt -> {site_id: {rup_id: gmv}}
self.rupture_ids = []
sescolls = set()
for ho in self.hazard_outputs:
self.hazards[ho] = self._get_gmv_dict(ho)
for sc in haz_out_to_ses_coll(ho):
sescolls.add(sc)
sescolls = sorted(sescolls)
for sc in sescolls:
self.rupture_ids.extend(
sc.get_ruptures().values_list('id', flat=True))
epsilon_rows = [] # ordered by asset_site_id
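        # each row concatenates the epsilons of a single asset across all
        # the relevant SES collections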
for asset_site_id in self.asset_site_ids:
row = []
for eps in models.Epsilon.objects.filter(
ses_collection__in=sescolls,
asset_site=asset_site_id):
row.extend(eps.epsilons)
epsilon_rows.append(row)
if epsilon_rows:
self.epsilons = numpy.array(epsilon_rows)
    def get_epsilons(self):
"""
Expand the underlying epsilons
"""
eps = self.epsilons
# expand the inner epsilons to the right number, if needed
_n, m = eps.shape
e = len(self.rupture_ids)
if e > m: # there are more ruptures than epsilons
            # NB: the double transpose below makes the expansion work on the
            # columns (ruptures) rather than on the rows (assets); e.g. an
            # array of shape (1, 3) is expanded to (1, e), whereas without
            # the transposes it would incorrectly become an array of shape
            # (e, 3)
return expand(eps.T, e).T
return eps
def _get_gmv_dict(self, ho):
# return a nested dictionary site_id -> {rupture_id: gmv}
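        # e.g. (hypothetical ids and values):
        # {4: {101: 0.12, 102: 0.34}, 7: {101: 0.05}}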
imt_type, sa_period, sa_damping = from_string(self.imt)
gmf_id = ho.output_container.id
if sa_period:
imt_query = 'imt=%s and sa_period=%s and sa_damping=%s'
else:
imt_query = 'imt=%s and sa_period is %s and sa_damping is %s'
gmv_dict = {} # dict site_id -> {rup_id: gmv}
cursor = models.getcursor('job_init')
cursor.execute('select site_id, rupture_ids, gmvs from '
'hzrdr.gmf_data where gmf_id=%s and site_id in %s '
'and {} order by site_id'.format(imt_query),
(gmf_id, tuple(set(self.site_ids)),
imt_type, sa_period, sa_damping))
for sid, group in itertools.groupby(cursor, operator.itemgetter(0)):
gmvs = []
ruptures = []
for site_id, rupture_ids, gmvs_chunk in group:
gmvs.extend(gmvs_chunk)
ruptures.extend(rupture_ids)
gmv_dict[sid] = dict(itertools.izip(ruptures, gmvs))
return gmv_dict
def _get_data(self, ho):
# return a list of N arrays with R elements each
all_gmvs = []
no_data = 0
gmv_dict = self.hazards[ho]
for site_id in self.site_ids:
gmv = gmv_dict.get(site_id, {})
if not gmv:
no_data += 1
array = numpy.array([gmv.get(r, 0.) for r in self.rupture_ids])
all_gmvs.append(array)
if no_data:
logs.LOG.info('No data for %d assets out of %d, IMT=%s',
no_data, len(self.site_ids), self.imt)
return all_gmvs
class RiskInitializer(object):
"""
    A facility providing the bridge between the hazard (sites and outputs)
    and the risk (assets and risk models). When .init_assocs is called, it
    populates the `asset_site` table with the associations between the
    assets in the current exposure model and the sites of the previous
    hazard calculation.
    :param taxonomy:
        the taxonomy of the assets we are interested in
    :param calc:
        a risk calculator, from which the outputs of the previous hazard
        calculation are extracted
    Warning: the association query run by .init_assocs may be a potentially
    expensive geospatial query.
"""
def __init__(self, taxonomy, calc):
self.exposure_model = calc.exposure_model
self.hazard_outputs = calc.get_hazard_outputs()
self.taxonomy = taxonomy
self.calc = calc
self.oqparam = models.OqJob.objects.get(
pk=calc.oqparam.hazard_calculation_id)
self.calculation_mode = self.calc.oqparam.calculation_mode
self.number_of_ground_motion_fields = self.oqparam.get_param(
'number_of_ground_motion_fields', 0)
max_dist = calc.best_maximum_distance * 1000 # km to meters
self.cursor = models.getcursor('job_init')
hazard_exposure = models.extract_from([self.oqparam], 'exposuremodel')
if self.exposure_model and hazard_exposure and \
self.exposure_model.id == hazard_exposure.id:
# no need of geospatial queries, just join on the location
self.assoc_query = self.cursor.mogrify("""\
WITH assocs AS (
SELECT %s, exp.id, hsite.id
FROM riski.exposure_data AS exp
JOIN hzrdi.hazard_site AS hsite
ON exp.site::text = hsite.location::text
WHERE hsite.hazard_calculation_id = %s
AND exposure_model_id = %s AND taxonomy=%s
AND ST_COVERS(ST_GeographyFromText(%s), exp.site)
)
INSERT INTO riskr.asset_site (job_id, asset_id, site_id)
SELECT * FROM assocs""", (self.calc.job.id, self.oqparam.id,
self.exposure_model.id, taxonomy,
self.calc.oqparam.region_constraint))
else:
# associate each asset to the closest hazard site
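            # NB: the DISTINCT ON (exp.id), combined with the ORDER BY on
            # ST_Distance, keeps for each asset only the nearest site
            # within the maximum distance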
self.assoc_query = self.cursor.mogrify("""\
WITH assocs AS (
SELECT DISTINCT ON (exp.id) %s, exp.id, hsite.id
FROM riski.exposure_data AS exp
JOIN hzrdi.hazard_site AS hsite
ON ST_DWithin(exp.site, hsite.location, %s)
WHERE hsite.hazard_calculation_id = %s
AND exposure_model_id = %s AND taxonomy=%s
AND ST_COVERS(ST_GeographyFromText(%s), exp.site)
ORDER BY exp.id, ST_Distance(exp.site, hsite.location, false)
)
INSERT INTO riskr.asset_site (job_id, asset_id, site_id)
SELECT * FROM assocs""", (self.calc.job.id, max_dist, self.oqparam.id,
self.exposure_model.id, taxonomy,
self.calc.oqparam.region_constraint))
self.num_assets = 0
self._rupture_ids = {}
self.epsilons_shape = {}
    def init_assocs(self):
"""
Stores the associations asset <-> site into the database
"""
with transaction.atomic(using='job_init'):
# insert the associations for the current taxonomy
self.cursor.execute(self.assoc_query)
# now read the associations just inserted
self.num_assets = models.AssetSite.objects.filter(
job=self.calc.job, asset__taxonomy=self.taxonomy).count()
# check if there are no associations
if self.num_assets == 0:
raise AssetSiteAssociationError(
'Could not associate any asset of taxonomy %s to '
'hazard sites within the distance of %s km'
% (self.taxonomy, self.calc.best_maximum_distance))
    def calc_nbytes(self, epsilon_sampling=None):
        """
        :param epsilon_sampling:
            the maximum number of epsilons per asset when the
            epsilon_sampling feature is enabled, or None
        :returns:
            the number of bytes to be allocated (a guess)
        If the hazard_outputs come from an event based or scenario
        computation, also populate the .epsilons_shape dictionary.
        """
if self.calculation_mode.startswith('event_based'):
lt_model_ids = set(ho.output_container.lt_realization.lt_model.id
for ho in self.hazard_outputs)
for trt_model in models.TrtModel.objects.filter(
lt_model__in=lt_model_ids):
ses_coll = models.SESCollection.objects.get(
trt_model=trt_model)
num_ruptures = ses_coll.get_ruptures().count()
samples = min(epsilon_sampling, num_ruptures) \
if epsilon_sampling else num_ruptures
self.epsilons_shape[ses_coll.id] = (self.num_assets, samples)
elif self.calculation_mode.startswith('scenario'):
[out] = self.oqparam.output_set.filter(output_type='ses')
samples = self.number_of_ground_motion_fields
self.epsilons_shape[out.ses.id] = (self.num_assets, samples)
nbytes = 0
for (n, r) in self.epsilons_shape.values():
# the max(n, r) is taken because if n > r then the limiting
# factor is the size of the correlation matrix, i.e. n
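            # e.g. (hypothetical sizes): with n=1000 assets and r=200
            # ruptures, max(1000, 200) * 1000 * 8 = 8,000,000 bytes (~8 MB)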
nbytes += max(n, r) * n * BYTES_PER_FLOAT
return nbytes
    def init_epsilons(self, epsilon_sampling=None):
        """
        :param epsilon_sampling:
            the maximum number of epsilons per asset when the
            epsilon_sampling feature is enabled, or None
        Populate the .epsilons_shape and the ._rupture_ids dictionaries.
        For the calculators `event_based_risk` and `scenario_risk` it also
        stores the epsilons in the database for each asset_site association.
        """
oq = self.calc.oqparam
if not self.epsilons_shape:
self.calc_nbytes(epsilon_sampling)
if self.calculation_mode.startswith('event_based'):
lt_model_ids = set(ho.output_container.lt_realization.lt_model.id
for ho in self.hazard_outputs)
ses_collections = models.SESCollection.objects.filter(
trt_model__lt_model__in=lt_model_ids)
elif self.calculation_mode.startswith('scenario'):
[out] = self.oqparam.output_set.filter(output_type='ses')
ses_collections = [out.ses]
else:
ses_collections = []
for ses_coll in ses_collections:
scid = ses_coll.id # ses collection id
num_assets, num_samples = self.epsilons_shape[scid]
self._rupture_ids[scid] = ses_coll.get_ruptures(
).values_list('id', flat=True)
# build the epsilons, except for scenario_damage
if self.calculation_mode != 'scenario_damage':
logs.LOG.info('Building (%d, %d) epsilons for taxonomy %s',
num_assets, num_samples, self.taxonomy)
asset_sites = models.AssetSite.objects.filter(
job=self.calc.job, asset__taxonomy=self.taxonomy)
eps = make_epsilons(
num_assets, num_samples,
oq.master_seed, oq.asset_correlation)
models.Epsilon.saveall(ses_coll, asset_sites, eps)