# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
from __future__ import division
import sys
import abc
import pdb
import math
import socket
import logging
import operator
import traceback
import collections
import numpy
from openquake.hazardlib.geo import geodetic
from openquake.baselib import general, hdf5
from openquake.baselib.performance import Monitor
from openquake.risklib import riskinput, __version__
from openquake.commonlib import readinput, riskmodels, datastore, source
from openquake.commonlib.oqvalidation import OqParam
from openquake.commonlib.parallel import starmap, executor
from openquake.commonlib.views import view, rst_table, stats
from openquake.baselib.python3compat import with_metaclass
get_taxonomy = operator.attrgetter('taxonomy')
get_weight = operator.attrgetter('weight')
get_trt = operator.attrgetter('trt_model_id')
get_imt = operator.attrgetter('imt')
calculators = general.CallableDict(operator.attrgetter('calculation_mode'))
Site = collections.namedtuple('Site', 'sid lon lat')
F32 = numpy.float32


class AssetSiteAssociationError(Exception):
    """Raised when there are no hazard sites close enough to any asset"""

rlz_dt = numpy.dtype([('uid', (bytes, 200)), ('gsims', (bytes, 200)),
                      ('weight', F32)])

logversion = {True}  # one-shot set: emptied after the version is logged once


def set_array(longarray, shortarray):
    """
    :param longarray: a numpy array of floats of length L >= l
    :param shortarray: a numpy array of floats of length l

    Fill `longarray` with the values of `shortarray`, starting from the left.
    If `shortarray` is shorter than `longarray`, the remaining elements on
    the right are filled with `numpy.nan` values.
    """
    longarray[:len(shortarray)] = shortarray
    longarray[len(shortarray):] = numpy.nan
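
# Usage sketch with hypothetical arrays:
#   long = numpy.zeros(5, F32)
#   set_array(long, numpy.array([1., 2., 3.]))
#   # long is now [1., 2., 3., nan, nan]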


def gsim_names(rlz):
    """
    Names of the underlying GSIMs separated by spaces
    """
    return ' '.join(str(v) for v in rlz.gsim_rlz.value)
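
# Hedged example with hypothetical values: if rlz.gsim_rlz.value contains
# two GSIM objects whose str() forms are 'BooreAtkinson2008' and
# 'ChiouYoungs2008', this returns 'BooreAtkinson2008 ChiouYoungs2008'.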


class BaseCalculator(with_metaclass(abc.ABCMeta)):
    """
    Abstract base class for all calculators.

    :param oqparam: OqParam object
    :param monitor: monitor object
    :param calc_id: numeric calculation ID
    """
sitecol = datastore.persistent_attribute('sitecol')
etags = datastore.persistent_attribute('etags')
assetcol = datastore.persistent_attribute('assetcol')
cost_types = datastore.persistent_attribute('cost_types')
job_info = datastore.persistent_attribute('job_info')
performance = datastore.persistent_attribute('performance')
csm = datastore.persistent_attribute('composite_source_model')
pre_calculator = None # to be overridden
is_stochastic = False # True for scenario and event based calculators
@property
def taxonomies(self):
return self.datastore['assetcol/taxonomies'].value
def __init__(self, oqparam, monitor=Monitor(), calc_id=None):
self.monitor = monitor
self.datastore = datastore.DataStore(calc_id)
self.monitor.calc_id = self.datastore.calc_id
self.monitor.hdf5path = self.datastore.hdf5path
self.datastore.export_dir = oqparam.export_dir
self.oqparam = oqparam

    def save_params(self, **kw):
"""
Update the current calculation parameters and save engine_version
"""
vars(self.oqparam).update(engine_version=__version__, **kw)
self.datastore['oqparam'] = self.oqparam # save the updated oqparam
self.datastore.flush()

    def run(self, pre_execute=True, concurrent_tasks=None, close=True, **kw):
"""
Run the calculation and return the exported outputs.
"""
self.close = close
self.set_log_format()
if logversion: # make sure this is logged only once
logging.info('Using engine version %s', __version__)
logversion.pop()
if (concurrent_tasks is not None and concurrent_tasks !=
OqParam.concurrent_tasks.default):
self.oqparam.concurrent_tasks = concurrent_tasks
self.save_params(**kw)
exported = {}
try:
if pre_execute:
self.pre_execute()
result = self.execute()
self.post_execute(result)
exported = self.export(kw.get('exports', ''))
except KeyboardInterrupt:
pids = ' '.join(str(p.pid) for p in executor._processes)
sys.stderr.write(
'You can manually kill the workers with kill %s\n' % pids)
raise
        except:
            if kw.get('pdb'):  # post-mortem debug
                tb = sys.exc_info()[2]
                traceback.print_exc()  # print_exc does not take a traceback
                pdb.post_mortem(tb)
else:
logging.critical('', exc_info=True)
raise
self.clean_up()
return exported
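
    # Hedged usage sketch, assuming a parsed OqParam object `oqparam`:
    #   calc = calculators(oqparam)  # dispatches on oqparam.calculation_mode
    #   exported = calc.run(exports='csv')
    # where `calculators` is the module-level CallableDict defined above.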

    def core_task(*args):
"""
Core routine running on the workers.
"""
raise NotImplementedError

    @abc.abstractmethod
    def pre_execute(self):
"""
Initialization phase.
"""

    @abc.abstractmethod
    def execute(self):
"""
Execution phase. Usually will run in parallel the core
function and return a dictionary with the results.
"""

    @abc.abstractmethod
    def post_execute(self, result):
"""
Post-processing phase of the aggregated output. It must be
overridden with the export code. It will return a dictionary
of output files.
"""

    def export(self, exports=None):
        """
        Export all the outputs in the datastore in the given export formats.

        :returns: dictionary output_key -> sorted list of exported paths
        """
# avoid circular imports
from openquake.commonlib.export import export as exp
exported = {}
individual_curves = self.oqparam.individual_curves
if isinstance(exports, tuple):
fmts = exports
elif exports: # is a string
fmts = exports.split(',')
elif isinstance(self.oqparam.exports, tuple):
fmts = self.oqparam.exports
else: # is a string
fmts = self.oqparam.exports.split(',')
for fmt in fmts:
if not fmt:
continue
keys = set(self.datastore)
if (self.oqparam.uniform_hazard_spectra and not
self.oqparam.hazard_maps):
# do not export the hazard maps, even if they are there
keys.remove('hmaps')
for key in sorted(keys): # top level keys
if 'rlzs' in key and not individual_curves:
continue # skip individual curves
ekey = (key, fmt)
if ekey not in exp: # non-exportable output
continue
with self.monitor('export'):
exported[ekey] = exp(ekey, self.datastore)
logging.info('exported %s: %s', key, exported[ekey])
            # special case for uhs, which is a view
            if (self.oqparam.uniform_hazard_spectra and
                    'hmaps' in self.datastore):
                ekey = ('uhs', fmt)
                exported[ekey] = exp(ekey, self.datastore)
                logging.info('exported %s: %s', ekey[0], exported[ekey])
return exported
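
    # Hedged usage sketch: after a successful run,
    #   exported = calc.export(exports='csv')
    # should map export keys like ('hcurves', 'csv') to lists of file paths.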

    def clean_up(self):
"""
Collect the realizations and the monitoring information,
then close the datastore.
"""
if 'hcurves' in self.datastore:
self.datastore.set_nbytes('hcurves')
if 'hmaps' in self.datastore:
self.datastore.set_nbytes('hmaps')
self.datastore.flush()
if self.close: # in the engine we close later
try:
self.datastore.close()
except (RuntimeError, ValueError):
# there could be a mysterious HDF5 error
# the ValueError: Unrecognized type code -1
# happens with the command
# $ oq run event_based_risk/case_master/job.ini --exports csv
                logging.warning('', exc_info=True)


def check_time_event(oqparam, time_events):
    """
    Check the `time_event` parameter in `oqparam` against the periods
    (time events) found in the exposure.
    """
time_event = oqparam.time_event
if time_event and time_event not in time_events:
raise ValueError(
'time_event is %s in %s, but the exposure contains %s' %
(time_event, oqparam.inputs['job_ini'], ', '.join(time_events)))


class HazardCalculator(BaseCalculator):
    """
    Base class for hazard calculators based on source models
    """
SourceManager = source.SourceManager
mean_curves = None # to be overridden

    def assoc_assets_sites(self, sitecol):
        """
        :param sitecol: a sequence of sites
        :returns: a pair (filtered_sites, assets_by_site)

        The new site collection is different from the original one
        if some assets were discarded or if there were missing assets
        for some sites.
        """
maximum_distance = self.oqparam.asset_hazard_distance
siteobjects = geodetic.GeographicObjects(
Site(sid, lon, lat) for sid, lon, lat in
zip(sitecol.sids, sitecol.lons, sitecol.lats))
assets_by_sid = general.AccumDict()
for assets in self.assets_by_site:
if len(assets):
lon, lat = assets[0].location
site, _ = siteobjects.get_closest(lon, lat, maximum_distance)
if site:
assets_by_sid += {site.sid: list(assets)}
if not assets_by_sid:
            raise AssetSiteAssociationError(
                'Could not associate any asset to any site within the '
                'maximum distance of %s km' % maximum_distance)
mask = numpy.array([sid in assets_by_sid for sid in sitecol.sids])
assets_by_site = [assets_by_sid.get(sid, []) for sid in sitecol.sids]
return sitecol.filter(mask), numpy.array(assets_by_site)
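
    # Hedged mini-example of the association API used above, with
    # hypothetical coordinates:
    #   objs = geodetic.GeographicObjects([Site(0, 10.0, 45.0)])
    #   site, _ = objs.get_closest(10.001, 45.001, maximum_distance)
    #   # `site` is the closest Site within the distance, else a falsy value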

    def count_assets(self):
"""
Count how many assets are taken into consideration by the calculator
"""
return sum(len(assets) for assets in self.assets_by_site)

    def compute_previous(self):
precalc = calculators[self.pre_calculator](
self.oqparam, self.monitor('precalculator'),
self.datastore.calc_id)
precalc.run()
if 'scenario' not in self.oqparam.calculation_mode:
self.csm = precalc.csm
pre_attrs = vars(precalc)
for name in ('riskmodel', 'assets_by_site'):
if name in pre_attrs:
setattr(self, name, getattr(precalc, name))

    def read_previous(self, precalc_id):
parent = datastore.read(precalc_id)
self.datastore.set_parent(parent)
# copy missing parameters from the parent
params = {name: value for name, value in
vars(parent['oqparam']).items()
if name not in vars(self.oqparam)}
self.save_params(**params)
self.read_risk_data()

    def basic_pre_execute(self):
self.read_risk_data()
if 'source' in self.oqparam.inputs:
with self.monitor(
'reading composite source model', autoflush=True):
self.csm = readinput.get_composite_source_model(self.oqparam)
self.datastore['csm_info'] = self.csm.info
self.rup_data = {}
self.init()
if 'source' in self.oqparam.inputs:
with self.monitor('managing sources', autoflush=True):
self.taskman = self.send_sources()
attrs = self.datastore.hdf5['composite_source_model'].attrs
attrs['weight'] = self.csm.weight
attrs['filtered_weight'] = self.csm.filtered_weight
attrs['maxweight'] = self.csm.maxweight

    def pre_execute(self):
"""
Check if there is a pre_calculator or a previous calculation ID.
If yes, read the inputs by invoking the precalculator or by retrieving
the previous calculation; if not, read the inputs directly.
"""
job_info = hdf5.LiteralAttrs()
if self.pre_calculator is not None:
# the parameter hazard_calculation_id is only meaningful if
# there is a precalculator
precalc_id = self.oqparam.hazard_calculation_id
if precalc_id is None: # recompute everything
self.compute_previous()
else: # read previously computed data
self.read_previous(precalc_id)
self.init()
else: # we are in a basic calculator
self.basic_pre_execute()
if 'source' in self.oqparam.inputs:
vars(job_info).update(readinput.get_job_info(
self.oqparam, self.csm, self.sitecol))
job_info.hostname = socket.gethostname()
if hasattr(self, 'riskmodel'):
job_info.require_epsilons = bool(self.riskmodel.covs)
self.job_info = job_info
self.datastore.flush()
try:
csm_info = self.datastore['csm_info']
except KeyError:
pass
else:
csm_info.gsim_lt.check_imts(self.oqparam.imtls)

    def init(self):
"""
To be overridden to initialize the datasets needed by the calculation
"""
self.random_seed = None
if 'csm_info' in self.datastore or 'csm_info' in self.datastore.parent:
self.rlzs_assoc = self.datastore['csm_info'].get_rlzs_assoc()
else: # build a fake; used by risk-from-file calculators
self.datastore['csm_info'] = fake = source.CompositionInfo.fake()
self.rlzs_assoc = fake.get_rlzs_assoc()
self.datastore['realizations'] = numpy.array(
[(r.uid, gsim_names(r), r.weight)
for r in self.rlzs_assoc.realizations], rlz_dt)

    def read_exposure(self):
        """
        Read the exposure and update the attributes .exposure, .sitecol,
        .assets_by_site, .cost_calculator and .cost_types.
        """
logging.info('Reading the exposure')
with self.monitor('reading exposure', autoflush=True):
self.exposure = readinput.get_exposure(self.oqparam)
arefs = numpy.array(self.exposure.asset_refs)
self.datastore['asset_refs'] = arefs
self.datastore.set_attrs('asset_refs', nbytes=arefs.nbytes)
all_cost_types = set(self.oqparam.all_cost_types)
fname = self.oqparam.inputs['exposure']
self.cost_calculator = readinput.get_exposure_lazy(
fname, all_cost_types)[-1]
self.sitecol, self.assets_by_site = (
readinput.get_sitecol_assets(self.oqparam, self.exposure))
if len(self.exposure.cost_types):
self.cost_types = self.exposure.cost_types

    def load_riskmodel(self):
"""
Read the risk model and set the attribute .riskmodel.
The riskmodel can be empty for hazard calculations.
Save the loss ratios (if any) in the datastore.
"""
rmdict = riskmodels.get_risk_models(self.oqparam)
if not rmdict: # can happen only in a hazard calculation
return
self.oqparam.set_risk_imtls(rmdict)
self.save_params() # re-save oqparam
self.riskmodel = rm = readinput.get_risk_model(self.oqparam, rmdict)
if 'taxonomies' in self.datastore:
# check that we are covering all the taxonomies in the exposure
missing = set(self.taxonomies) - set(rm.taxonomies)
if rm and missing:
raise RuntimeError('The exposure contains the taxonomies %s '
'which are not in the risk model' % missing)
# save the risk models and loss_ratios in the datastore
for taxonomy, rmodel in rm.items():
self.datastore['composite_risk_model/' + taxonomy] = (
rmodel.risk_functions)
if hasattr(rmodel, 'retro_functions'):
self.datastore[
'composite_risk_model/%s-retrofitted' % taxonomy] = (
rmodel.retro_functions)
attrs = self.datastore['composite_risk_model'].attrs
attrs['loss_types'] = rm.loss_types
attrs['min_iml'] = sorted(rm.get_min_iml().items())
if rm.damage_states:
attrs['damage_states'] = rm.damage_states
self.datastore['loss_ratios'] = rm.get_loss_ratios()
self.datastore.set_nbytes('composite_risk_model')
self.datastore.set_nbytes('loss_ratios')
self.datastore.hdf5.flush()

    def read_risk_data(self):
"""
Read the exposure (if any), the risk model (if any) and then the
site collection, possibly extracted from the exposure.
"""
oq = self.oqparam
with self.monitor('reading site collection', autoflush=True):
haz_sitecol = readinput.get_site_collection(oq)
if haz_sitecol is not None:
logging.info('Read %d hazard site(s)', len(haz_sitecol))
oq_hazard = (self.datastore.parent['oqparam']
if self.datastore.parent else None)
if 'exposure' in oq.inputs:
self.read_exposure()
self.load_riskmodel() # must be called *after* read_exposure
num_assets = self.count_assets()
if self.datastore.parent:
haz_sitecol = self.datastore.parent['sitecol']
if haz_sitecol is not None and haz_sitecol != self.sitecol:
with self.monitor('assoc_assets_sites'):
self.sitecol, self.assets_by_site = \
self.assoc_assets_sites(haz_sitecol.complete)
ok_assets = self.count_assets()
num_sites = len(self.sitecol)
                logging.warning(
                    'Associated %d assets to %d sites, %d discarded',
                    ok_assets, num_sites, num_assets - ok_assets)
elif oq_hazard and 'exposure' in oq_hazard.inputs:
logging.info('Re-using the already imported exposure')
self.load_riskmodel()
else: # no exposure
self.load_riskmodel()
self.sitecol = haz_sitecol
if oq_hazard:
parent = self.datastore.parent
if 'assetcol' in parent:
check_time_event(oq, parent['assetcol'].time_events)
if oq_hazard.time_event and oq_hazard.time_event != oq.time_event:
raise ValueError(
'The risk configuration file has time_event=%s but the '
'hazard was computed with time_event=%s' % (
oq.time_event, oq_hazard.time_event))
# asset collection
if hasattr(self, 'assets_by_site'):
self.assetcol = riskinput.AssetCollection(
self.assets_by_site, self.cost_calculator, oq.time_event,
time_events=sorted(self.exposure.time_events) or '')
elif hasattr(self, 'assetcol'):
self.assets_by_site = self.assetcol.assets_by_site()

    def is_tiling(self):
"""
:returns:
True if the calculator produces more than one tile, False otherwise
"""
return (self.oqparam.calculation_mode == 'classical' and
len(self.sitecol) > self.oqparam.sites_per_tile)

    def send_sources(self):
        """
        Filter/split and send the sources to the workers.

        :returns: a :class:`openquake.commonlib.parallel.TaskManager`
        """
oq = self.oqparam
tiles = [self.sitecol]
self.num_tiles = 1
if self.is_tiling():
hint = math.ceil(len(self.sitecol) / oq.sites_per_tile)
tiles = self.sitecol.split_in_tiles(hint)
self.num_tiles = len(tiles)
logging.info('Generating %d tiles of %d sites each',
self.num_tiles, len(tiles[0]))
manager = source.SourceManager(
self.csm, oq.maximum_distance, self.datastore,
self.monitor.new(oqparam=oq), self.random_seed,
oq.filter_sources, num_tiles=self.num_tiles)
tm = starmap(self.core_task.__func__, manager.gen_args(tiles))
manager.store_source_info(self.datastore)
return tm

    def save_data_transfer(self, taskmanager):
"""
Save information about the data transfer in the risk calculation
as attributes of agg_loss_table
"""
if taskmanager.received: # nothing is received when OQ_DISTRIBUTE=no
tname = taskmanager.name
self.datastore.save('job_info', {
tname + '_sent': taskmanager.sent,
tname + '_max_received_per_task': max(taskmanager.received),
tname + '_tot_received': sum(taskmanager.received),
tname + '_num_tasks': len(taskmanager.received)})

    def post_process(self):
"""For compatibility with the engine"""


class RiskCalculator(HazardCalculator):
    """
    Base class for all risk calculators. A risk calculator must set the
    attributes .riskmodel, .sitecol, .assets_by_site, .exposure and
    .riskinputs in the pre_execute phase.
    """
extra_args = () # to be overridden in subclasses

    def check_poes(self, curves_by_trt_gsim):
"""Overridden in ClassicalDamage"""

    def make_eps(self, num_ruptures):
"""
:param num_ruptures: the size of the epsilon array for each asset
"""
oq = self.oqparam
with self.monitor('building epsilons', autoflush=True):
return riskinput.make_eps(
self.assets_by_site, num_ruptures,
oq.master_seed, oq.asset_correlation)
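
    # Hedged note: based on its usage here, riskinput.make_eps should return
    # one row of num_ruptures epsilons per asset, sampled reproducibly from
    # oq.master_seed and correlated according to oq.asset_correlation.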

    def execute(self):
        """
        Parallelize over the riskinputs and return a dictionary of results.
        Requires a `.core_task` to be defined with signature
        (riskinputs, riskmodel, rlzs_assoc, monitor).
"""
self.monitor.oqparam = self.oqparam
rlz_ids = getattr(self.oqparam, 'rlz_ids', ())
if rlz_ids:
self.rlzs_assoc = self.rlzs_assoc.extract(rlz_ids)
all_args = ((riskinput, self.riskmodel, self.rlzs_assoc) +
self.extra_args + (self.monitor,)
for riskinput in self.riskinputs)
res = starmap(self.core_task.__func__, all_args).reduce()
return res
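
    # Hedged note: starmap(...).reduce() is expected to aggregate the
    # dictionaries returned by the tasks into a single one (summing values
    # for equal keys), which post_execute then consumes.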


@view.add('task_info')
def view_task_info(token, dstore):
    """
    Display statistical information about the performance of the tasks
    """
pdata = dstore['performance_data'].value
tasks = [calc.core_task.__name__ for calc in calculators.values()]
data = ['measurement mean stddev min max num_tasks'.split()]
for task in set(tasks): # strip duplicates
records = pdata[pdata['operation'] == 'total ' + task]
if len(records):
for stat in ('time_sec', 'memory_mb'):
val = records[stat]
data.append(stats(task + '.' + stat, val))
if len(data) == 1:
return 'Not available'
return rst_table(data)