Source code for openquake.calculators.export.risk

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import json
import shutil
import tempfile
import itertools
import collections
import numpy
import pandas

from openquake.baselib import hdf5, writers, general, node, config
from openquake.baselib.python3compat import decode
from openquake.hazardlib import nrml
from openquake.hazardlib.stats import compute_stats2
from openquake.risklib import scientific
from openquake.calculators.extract import extract, sanitize, avglosses
from openquake.calculators import post_risk
from openquake.calculators.export import export, loss_curves
from openquake.calculators.export.hazard import savez
from openquake.commonlib.util import get_assets, compose_arrays

Output = collections.namedtuple('Output', 'ltype path array')
F32 = numpy.float32
F64 = numpy.float64
U16 = numpy.uint16
U32 = numpy.uint32
stat_dt = numpy.dtype([('mean', F32), ('stddev', F32)])


def get_rup_data(ebruptures):
    dic = {}
    for ebr in ebruptures:
        point = ebr.rupture.surface.get_middle_point()
        dic[ebr.id] = (ebr.rupture.mag, point.x, point.y, point.z)
    return dic
# ############################### exporters ############################## #
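# NB: a quick usage sketch (not executed here). Exporters registered with
# @export.add are invoked through the `export` callable imported above, given
# a datastore from a completed calculation; `read` below is assumed to be
# openquake.commonlib.datastore.read:
#
#     from openquake.commonlib.datastore import read
#     dstore = read(-1)  # most recent calculation in the local datastore
#     fnames = export(('aggrisk', 'csv'), dstore)  # paths of the written CSVs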
def tag2idx(tags):
    return {tag: i for i, tag in enumerate(tags)}
def _loss_type(ln):
    if ln[-4:] == '_ins':
        return ln[:-4]
    return ln
def get_aggtags(dstore):
    # returns a list of tag tuples
    if 'agg_keys' in dstore:  # there was an aggregate_by
        aggtags = [ln.decode('utf8').split(',')
                   for ln in dstore['agg_keys'][:]]
        aggtags += [('*total*',) * len(aggtags[0])]
    else:  # no aggregate_by
        aggtags = [()]
    return aggtags
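# Illustrative only (tag values are hypothetical): with aggregate_by=[['NAME_1']]
# and agg_keys containing b'Lima' and b'Cusco', get_aggtags returns
# [['Lima'], ['Cusco'], ('*total*',)]; without aggregate_by it returns [()].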
def _aggrisk(oq, aggids, aggtags, agg_values, aggrisk, md, dest):
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    cols = [col for col in aggrisk.columns
            if col not in {'agg_id', 'rlz_id', 'loss_id'}]
    csqs = [col for col in cols if not col.startswith('dmg_')]
    manyrlzs = hasattr(aggrisk, 'rlz_id') and len(aggrisk.rlz_id.unique()) > 1
    fnames = []
    K = len(agg_values) - 1
    pairs = [([], aggrisk.agg_id == K)]  # full aggregation
    for tagnames, agg_ids in zip(oq.aggregate_by, aggids):
        pairs.append((tagnames, numpy.isin(aggrisk.agg_id, agg_ids)))
    for tagnames, ok in pairs:
        out = general.AccumDict(accum=[])
        for (agg_id, lid), df in aggrisk[ok].groupby(['agg_id', 'loss_id']):
            n = len(df)
            loss_type = scientific.LOSSTYPE[lid]
            if loss_type == 'occupants':
                loss_type += '_' + oq.time_event
            if loss_type == 'claim':  # temporary hack
                continue
            out['loss_type'].extend([loss_type] * n)
            if tagnames:
                for tagname, tag in zip(tagnames, aggtags[agg_id]):
                    out[tagname].extend([tag] * n)
            if manyrlzs:
                out['rlz_id'].extend(df.rlz_id)
            for col in cols:
                if col in csqs:  # normally csqs = ['loss']
                    aval = scientific.get_agg_value(
                        col, agg_values, agg_id, loss_type, oq.time_event)
                    out[col + '_value'].extend(df[col])
                    out[col + '_ratio'].extend(df[col] / aval)
                else:  # in ScenarioDamageTestCase:test_case_12
                    out[col].extend(df[col])
        dsdic = {'dmg_0': 'no_damage'}
        for s, ls in enumerate(oq.limit_states, 1):
            dsdic['dmg_%d' % s] = ls
        df = pandas.DataFrame(out).rename(columns=dsdic)
        fname = dest.format('-'.join(tagnames))
        writer.save(df, fname, comment=md)
        fnames.append(fname)
    return fnames
@export.add(('aggrisk', 'csv'))
def export_aggrisk(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    assetcol = dstore['assetcol']
    md = dstore.metadata
    md.update(dict(investigation_time=oq.investigation_time,
                   risk_investigation_time=oq.risk_investigation_time
                   or oq.investigation_time))
    aggrisk = dstore.read_df('aggrisk')
    dest = dstore.build_fname('aggrisk-{}', '', 'csv')
    agg_values = assetcol.get_agg_values(
        oq.aggregate_by, oq.max_aggregations)
    aggids, aggtags = assetcol.build_aggids(
        oq.aggregate_by, oq.max_aggregations)
    return _aggrisk(oq, aggids, aggtags, agg_values, aggrisk, md, dest)
@export.add(('aggrisk-stats', 'csv'), ('aggcurves-stats', 'csv'))
def export_aggrisk_stats(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    key = ekey[0].split('-')[0]  # aggrisk or aggcurves
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    dest = dstore.build_fname(key + '-stats-{}', '', 'csv')
    dataf = extract(dstore, 'risk_stats/' + key)
    assetcol = dstore['assetcol']
    agg_values = assetcol.get_agg_values(
        oq.aggregate_by, oq.max_aggregations)
    K = len(agg_values) - 1
    aggids, aggtags = assetcol.build_aggids(
        oq.aggregate_by, oq.max_aggregations)
    pairs = [([], dataf.agg_id == K)]  # full aggregation
    for tagnames, agg_ids in zip(oq.aggregate_by, aggids):
        pairs.append((tagnames, numpy.isin(dataf.agg_id, agg_ids)))
    fnames = []
    for tagnames, ok in pairs:
        df = dataf[ok].copy()
        if tagnames:
            tagvalues = numpy.array([aggtags[agg_id] for agg_id in df.agg_id])
            for n, name in enumerate(tagnames):
                df[name] = tagvalues[:, n]
        del df['agg_id']
        fname = dest.format('-'.join(tagnames))
        writer.save(df, fname, df.columns, comment=dstore.metadata)
        fnames.append(fname)
    return fnames
def _get_data(dstore, dskey, loss_types, stats):
    name, kind = dskey.split('-')  # i.e. ('avg_losses', 'stats')
    if kind == 'stats':
        try:
            weights = dstore['weights'][()]
        except KeyError:  # a single realization, as in classical_risk/case_2
            weights = [1.]
        if dskey in set(dstore):  # precomputed
            rlzs_or_stats = list(stats)
            statfuncs = [stats[ros] for ros in stats]
            value = avglosses(dstore, loss_types, 'stats')  # shape (A, S, L)
        elif dstore['oqparam'].collect_rlzs:
            rlzs_or_stats = list(stats)
            value = avglosses(dstore, loss_types, 'rlzs')
        else:  # compute on the fly
            rlzs_or_stats, statfuncs = zip(*stats.items())
            value = compute_stats2(
                avglosses(dstore, loss_types, 'rlzs'), statfuncs, weights)
    else:  # rlzs
        value = avglosses(dstore, loss_types, kind)  # shape (A, R, L)
        R = value.shape[1]
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    return name, value, rlzs_or_stats


# this is used by event_based_risk, classical_risk and scenario_risk
@export.add(('avg_losses-rlzs', 'csv'), ('avg_losses-stats', 'csv'))
def export_avg_losses(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    dskey = ekey[0]
    oq = dstore['oqparam']
    dt = [(ln, F32) for ln in oq.ext_loss_types]
    name, value, rlzs_or_stats = _get_data(
        dstore, dskey, oq.ext_loss_types, oq.hazard_stats())
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    assets = get_assets(dstore)
    md = dstore.metadata
    md.update(dict(investigation_time=oq.investigation_time,
                   risk_investigation_time=oq.risk_investigation_time
                   or oq.investigation_time))
    for ros, values in zip(rlzs_or_stats, value.transpose(1, 0, 2)):
        dest = dstore.build_fname(name, ros, 'csv')
        array = numpy.zeros(len(values), dt)
        for li, ln in enumerate(oq.ext_loss_types):
            array[ln] = values[:, li]
        writer.save(compose_arrays(assets, array), dest, comment=md,
                    renamedict=dict(id='asset_id'))
    return writer.getsaved()
@export.add(('src_loss_table', 'csv'))
def export_src_loss_table(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    md = dstore.metadata
    md.update(dict(investigation_time=oq.investigation_time,
                   risk_investigation_time=oq.risk_investigation_time
                   or oq.investigation_time))
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    for lt in dstore['src_loss_table']:
        aw = hdf5.ArrayWrapper.from_(dstore['src_loss_table/' + lt])
        dest = dstore.build_fname('src_loss_' + lt, '', 'csv')
        writer.save(aw.to_dframe(), dest, comment=md)
    return writer.getsaved()
# this is used by all GMF-based risk calculators
# NB: it exports only the event loss table, i.e. the totals
@export.add(('risk_by_event', 'csv'))
def export_event_loss_table(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    dest = dstore.build_fname('risk_by_event', '', 'csv')
    md = dstore.metadata
    if 'scenario' not in oq.calculation_mode:
        md.update(dict(investigation_time=oq.investigation_time,
                       risk_investigation_time=oq.risk_investigation_time
                       or oq.investigation_time))
    events = dstore.read_df('events', 'id')
    R = post_risk.fix_investigation_time(oq, dstore)
    if oq.investigation_time:
        eff_time = oq.investigation_time * oq.ses_per_logic_tree_path * R
    K = dstore.get_attr('risk_by_event', 'K', 0)
    try:
        lstates = dstore.get_attr('risk_by_event', 'limit_states').split()
    except KeyError:  # ebrisk, no limit states
        lstates = []
    df = dstore.read_df('risk_by_event', 'agg_id', dict(agg_id=K))
    df['loss_type'] = scientific.LOSSTYPE[df.loss_id.to_numpy()]
    if 'variance' in df.columns:
        del df['variance']
    ren = {'dmg_%d' % i: lstate for i, lstate in enumerate(lstates, 1)}
    df.rename(columns=ren, inplace=True)
    df = df.join(events, on='event_id')
    if 'ses_id' in df.columns:
        del df['ses_id']
    if oq.collect_rlzs:
        df['rlz_id'] = 0
    try:
        pla_factor = scientific.pla_factor(
            dstore.read_df('post_loss_amplification'))
    except KeyError:
        pla_factor = None
    if 'loss' in df.columns:  # missing for damage
        dfs = []
        for (loss_id, rlz), d in df.groupby(['loss_id', 'rlz_id']):
            d = d.sort_values('loss')
            if pla_factor:
                eperiods = eff_time / numpy.arange(len(d), 0., -1)
                d['pla_loss'] = pla_factor(eperiods) * d.loss
            dfs.append(d)
        df = pandas.concat(dfs)
    else:
        df = df.sort_values(['loss_id', 'event_id'])
    del df['rlz_id']
    del df['loss_id']
    if 'scenario' in oq.calculation_mode:
        del df['rup_id']
    if 'year' in df.columns:
        del df['year']
    writer.save(df, dest, comment=md)
    return writer.getsaved()
def _compact(array):
    # convert an array of shape (a, e) into an array of shape (a,)
    dt = array.dtype
    a, e = array.shape
    lst = []
    for name in dt.names:
        lst.append((name, (dt[name], e)))
    return array.view(numpy.dtype(lst)).reshape(a)


# this is used by classical_risk
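# A small sketch of what _compact does, using stat_dt (defined above) as a
# hypothetical input dtype:
#
#     arr = numpy.zeros((3, 2), stat_dt)   # shape (assets, events)
#     flat = _compact(arr)                 # shape (3,)
#     flat.dtype  # [('mean', F32, (2,)), ('stddev', F32, (2,))]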
@export.add(('loss_curves-rlzs', 'csv'), ('loss_curves-stats', 'csv'),
            ('loss_curves', 'csv'))
def export_loss_curves(ekey, dstore):
    if '/' in ekey[0]:
        kind = ekey[0].split('/', 1)[1]
    else:
        kind = ekey[0].split('-', 1)[1]  # rlzs or stats
    return loss_curves.LossCurveExporter(dstore).export('csv', kind)
# used by classical_risk
@export.add(('loss_maps-rlzs', 'csv'), ('loss_maps-stats', 'csv'))
def export_loss_maps_csv(ekey, dstore):
    kind = ekey[0].split('-')[1]  # rlzs or stats
    assets = get_assets(dstore)
    value = get_loss_maps(dstore, kind)
    oq = dstore['oqparam']
    if kind == 'rlzs':
        rlzs_or_stats = dstore['full_lt'].get_realizations()
    else:
        rlzs_or_stats = oq.hazard_stats()
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    md = dstore.metadata
    for i, ros in enumerate(rlzs_or_stats):
        if hasattr(ros, 'ordinal'):  # is a realization
            ros = 'rlz-%d' % ros.ordinal
        fname = dstore.build_fname('loss_maps', ros, ekey[1])
        md.update(
            dict(kind=ros,
                 risk_investigation_time=oq.risk_investigation_time
                 or oq.investigation_time))
        writer.save(compose_arrays(assets, value[:, i]), fname, comment=md,
                    renamedict=dict(id='asset_id'))
    return writer.getsaved()
# used by classical_risk
@export.add(('loss_maps-rlzs', 'npz'), ('loss_maps-stats', 'npz'))
def export_loss_maps_npz(ekey, dstore):
    kind = ekey[0].split('-')[1]  # rlzs or stats
    assets = get_assets(dstore)
    value = get_loss_maps(dstore, kind)
    R = dstore['full_lt'].get_num_paths()
    if kind == 'rlzs':
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    else:
        oq = dstore['oqparam']
        rlzs_or_stats = oq.hazard_stats()
    fname = dstore.export_path('%s.%s' % ekey)
    dic = {}
    for i, ros in enumerate(rlzs_or_stats):
        dic[ros] = compose_arrays(assets, value[:, i])
    savez(fname, **dic)
    return [fname]
# used by event_based_damage, scenario_damage, classical_damage
@export.add(('damages-rlzs', 'csv'), ('damages-stats', 'csv'))
def export_damages_csv(ekey, dstore):
    oq = dstore['oqparam']
    dmgstates = numpy.concatenate(
        [['no_damage'], dstore.getitem('crm').attrs['limit_states']])
    ebd = oq.calculation_mode == 'event_based_damage'
    rlzs = dstore['full_lt'].get_realizations()
    orig = dstore[ekey[0]][:]  # shape (A, R, L, D, P)
    writer = writers.CsvWriter(fmt='%.6E')
    assets = get_assets(dstore)
    md = dstore.metadata
    if oq.investigation_time:
        rit = oq.risk_investigation_time or oq.investigation_time
        md.update(dict(investigation_time=oq.investigation_time,
                       risk_investigation_time=rit))
    R = 1 if oq.collect_rlzs else len(rlzs)
    if ekey[0].endswith('stats'):
        rlzs_or_stats = oq.hazard_stats()
    else:
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    name = ekey[0].split('-')[0]
    if oq.calculation_mode != 'classical_damage':
        name = 'avg_' + name
    csqs = tuple(dstore.getitem('crm').attrs['consequences'])
    for i, ros in enumerate(rlzs_or_stats):
        if ebd:  # export only the consequences from damages-rlzs, i == 0
            if len(csqs) == 0:  # no consequences, export nothing
                return []
            rate = len(dstore['events']) * oq.time_ratio / len(rlzs)
            data = orig[:, i]
            dtlist = [(col, F32) for col in data.dtype.names
                      if col.endswith(csqs)]
            damages = numpy.zeros(len(data), dtlist)
            for csq, _ in dtlist:
                damages[csq] = data[csq] * rate
            fname = dstore.build_fname('avg_risk', ros, ekey[1])
        else:  # scenario_damage, classical_damage
            if oq.modal_damage_state:
                damages = modal_damage_array(orig[:, i], dmgstates)
            else:
                damages = orig[:, i]
            fname = dstore.build_fname(name, ros, ekey[1])
        arr = compose_arrays(assets, damages)
        writer.save(arr, fname, comment=md, renamedict=dict(id='asset_id'))
    return writer.getsaved()
# emulate a Django point
class Location(object):
    def __init__(self, x, y):
        self.x, self.y = x, y
        self.wkt = 'POINT(%s %s)' % (x, y)
def indices(*sizes):
    return itertools.product(*map(range, sizes))
def _to_loss_maps(array, loss_maps_dt):
    # convert a 4D array into a 2D array of dtype loss_maps_dt
    A, R, _C, _LI = array.shape
    lm = numpy.zeros((A, R), loss_maps_dt)
    for li, name in enumerate(loss_maps_dt.names):
        for p, poe in enumerate(loss_maps_dt[name].names):
            lm[name][poe] = array[:, :, p, li]
    return lm
def get_loss_maps(dstore, kind):
    """
    :param dstore: a DataStore instance
    :param kind: 'rlzs' or 'stats'
    """
    oq = dstore['oqparam']
    name = 'loss_maps-%s' % kind
    if name in dstore:  # event_based risk
        return _to_loss_maps(dstore[name][()], oq.loss_maps_dt())
    name = 'loss_curves-%s' % kind
    if name in dstore:  # classical_risk
        # the loss maps are built on the fly from the loss curves
        loss_curves = dstore[name]
        loss_maps = scientific.broadcast(
            scientific.loss_maps, loss_curves, oq.conditional_loss_poes)
        return loss_maps
    raise KeyError('loss_maps/loss_curves missing in %s' % dstore)
def get_paths(rlz):
    """
    :param rlz:
        a logic tree realization (composite or simple)
    :returns:
        a dict {'source_model_tree_path': string, 'gsim_tree_path': string}
    """
    dic = {}
    if hasattr(rlz, 'sm_lt_path'):  # composite realization
        dic['source_model_tree_path'] = '_'.join(rlz.sm_lt_path)
        dic['gsim_tree_path'] = '_'.join(rlz.gsim_lt_path)
    else:  # simple GSIM realization
        dic['source_model_tree_path'] = ''
        dic['gsim_tree_path'] = '_'.join(rlz.lt_path)
    return dic
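# Example of the returned dict for a composite realization with hypothetical
# branch ids sm_lt_path=('b1',) and gsim_lt_path=('b11', 'b21'):
# {'source_model_tree_path': 'b1', 'gsim_tree_path': 'b11_b21'}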
@export.add(('bcr-rlzs', 'csv'), ('bcr-stats', 'csv'))
def export_bcr_map(ekey, dstore):
    oq = dstore['oqparam']
    assets = get_assets(dstore)
    bcr_data = dstore[ekey[0]]
    _N, R = bcr_data.shape
    if ekey[0].endswith('stats'):
        rlzs_or_stats = oq.hazard_stats()
    else:
        rlzs_or_stats = ['rlz-%03d' % r for r in range(R)]
    fnames = []
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    for t, ros in enumerate(rlzs_or_stats):
        path = dstore.build_fname('bcr', ros, 'csv')
        writer.save(compose_arrays(assets, bcr_data[:, t]), path,
                    renamedict=dict(id='asset_id'))
        fnames.append(path)
    return writer.getsaved()
@export.add(('aggregate_by', 'csv'))
def export_aggregate_by_csv(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    _token, what = ekey[0].split('/', 1)
    aw = extract(dstore, 'aggregate/' + what)
    fnames = []
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    path = '%s.%s' % (sanitize(ekey[0]), ekey[1])
    fname = dstore.export_path(path)
    writer.save(aw.to_dframe(), fname)
    fnames.append(fname)
    return fnames
# used in multi_risk
@export.add(('asset_risk', 'csv'))
def export_asset_risk_csv(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    path = '%s.%s' % (sanitize(ekey[0]), ekey[1])
    fname = dstore.export_path(path)
    md = json.loads(extract(dstore, 'exposure_metadata').json)
    tostr = {'taxonomy': md['taxonomy']}
    for tagname in md['tagnames']:
        tostr[tagname] = md[tagname]
    tagnames = sorted(set(md['tagnames']) - {'id'})
    arr = extract(dstore, 'asset_risk').array
    rows = []
    lossnames = sorted(name for name in arr.dtype.names if 'loss' in name)
    expnames = [name for name in arr.dtype.names
                if name not in md['tagnames'] and 'loss' not in name
                and name not in 'lon lat']
    colnames = tagnames + ['lon', 'lat'] + expnames + lossnames
    # sanity check
    assert len(colnames) == len(arr.dtype.names)
    for rec in arr:
        row = []
        for name in colnames:
            value = rec[name]
            try:
                row.append(tostr[name][value])
            except KeyError:
                row.append(value)
        rows.append(row)
    writer.save(rows, fname, colnames)
    return [fname]
# used in multi_risk
@export.add(('agg_risk', 'csv'))
def export_agg_risk_csv(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    path = '%s.%s' % (sanitize(ekey[0]), ekey[1])
    fname = dstore.export_path(path)
    dset = dstore['agg_risk']
    writer.save(dset[()], fname, dset.dtype.names)
    return [fname]
# used in export_aggcurves_csv
def _fix(col):
    if col.endswith(('_aep', '_oep')):
        return col[:-4]  # strip suffix
    return col
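# e.g. _fix('loss_aep') -> 'loss' and _fix('fatalities_oep') -> 'fatalities',
# while plain consequence columns such as 'loss' are returned unchanged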
@export.add(('aggcurves', 'csv'))
def export_aggcurves_csv(ekey, dstore):
    """
    :param ekey: export key, i.e. a pair (datastore key, fmt)
    :param dstore: datastore object
    """
    oq = dstore['oqparam']
    assetcol = dstore['assetcol']
    agg_values = assetcol.get_agg_values(
        oq.aggregate_by, oq.max_aggregations)
    aggids, aggtags = assetcol.build_aggids(
        oq.aggregate_by, oq.max_aggregations)
    E = len(dstore['events'])
    R = len(dstore['weights'])
    K = len(dstore['agg_values']) - 1
    dataf = dstore.read_df('aggcurves')
    consequences = [col for col in dataf.columns
                    if _fix(col) in scientific.KNOWN_CONSEQUENCES]
    dest = dstore.export_path('%s-{}.%s' % ekey)
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    md = dstore.metadata
    md['risk_investigation_time'] = (oq.risk_investigation_time
                                     or oq.investigation_time)
    md['num_events'] = E
    md['effective_time'] = (
        oq.investigation_time * oq.ses_per_logic_tree_path * R)
    md['limit_states'] = dstore.get_attr('aggcurves', 'limit_states')

    # aggcurves
    cols = [col for col in dataf.columns if _fix(col) not in consequences
            and col not in ('agg_id', 'rlz_id', 'loss_id')]
    edic = general.AccumDict(accum=[])
    manyrlzs = not oq.collect_rlzs and R > 1
    fnames = []
    pairs = [([], dataf.agg_id == K)]  # full aggregation
    for tagnames, agg_ids in zip(oq.aggregate_by, aggids):
        pairs.append((tagnames, numpy.isin(dataf.agg_id, agg_ids)))
    LT = scientific.LOSSTYPE
    for tagnames, ok in pairs:
        edic = general.AccumDict(accum=[])
        for (agg_id, rlz_id, loss_id), d in dataf[ok].groupby(
                ['agg_id', 'rlz_id', 'loss_id']):
            if loss_id == scientific.LOSSID['claim']:  # temporary hack
                continue
            if loss_id == scientific.LOSSID['occupants']:
                lt = LT[loss_id] + '_' + oq.time_event
            else:
                lt = LT[loss_id]
            if tagnames:
                for tagname, tag in zip(tagnames, aggtags[agg_id]):
                    edic[tagname].extend([tag] * len(d))
            for col in cols:
                if not col.endswith(('_aep', '_oep')):
                    edic[col].extend(d[col])
            edic['loss_type'].extend([LT[loss_id]] * len(d))
            if manyrlzs:
                edic['rlz_id'].extend([rlz_id] * len(d))
            for cons in consequences:
                edic[cons + '_value'].extend(d[cons])
                aval = scientific.get_agg_value(
                    _fix(cons), agg_values, agg_id, lt, oq.time_event)
                edic[cons + '_ratio'].extend(d[cons] / aval)
        fname = dest.format('-'.join(tagnames))
        writer.save(pandas.DataFrame(edic), fname, comment=md)
        fnames.append(fname)
    return fnames
@export.add(('reinsurance-risk_by_event', 'csv'),
            ('reinsurance-aggcurves', 'csv'),
            ('reinsurance-avg_portfolio', 'csv'),
            ('reinsurance-avg_policy', 'csv'))
def export_reinsurance(ekey, dstore):
    dest = dstore.export_path('%s.%s' % ekey)
    df = dstore.read_df(ekey[0])
    if 'event_id' in df.columns:
        events = dstore['events'][()]
        if 'year' not in events.dtype.names:  # gmfs.hdf5 missing events
            df['year'] = 1
        else:
            df['year'] = events[df.event_id.to_numpy()]['year']
    if 'policy_id' in df.columns:  # convert policy_id -> policy name
        policy_names = dstore['agg_keys'][:]
        df['policy_id'] = decode(policy_names[df['policy_id'].to_numpy() - 1])
    fmap = json.loads(dstore.get_attr('treaty_df', 'field_map'))
    treaty_df = dstore.read_df('treaty_df')
    for code, col in zip(treaty_df.code, treaty_df.id):
        fmap['over_' + code] = 'overspill_' + col
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    writer.save(df.rename(columns=fmap), dest, comment=dstore.metadata)
    return [dest]
@export.add(('infra-avg_loss', 'csv'), ('infra-node_el', 'csv'),
            ('infra-taz_cl', 'csv'), ('infra-dem_cl', 'csv'),
            ('infra-event_ccl', 'csv'), ('infra-event_pcl', 'csv'),
            ('infra-event_wcl', 'csv'), ('infra-event_efl', 'csv'))
def export_node_el(ekey, dstore):
    dest = dstore.export_path('%s.%s' % ekey)
    df = dstore.read_df(ekey[0])
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    writer.save(df, dest, comment=dstore.metadata)
    return writer.getsaved()
def convert_df_to_vulnerability(loss_type, df):
    N = node.Node
    root = N('vulnerabilityModel', {'id': "vulnerability_model",
                                    'assetCategory': "buildings",
                                    "lossCategory": loss_type})
    descr = N('description', {}, f"{loss_type} vulnerability model")
    root.append(descr)
    for riskfunc in df.riskfunc:
        rfunc = json.loads(riskfunc)[
            'openquake.risklib.scientific.VulnerabilityFunction']
        vfunc = N('vulnerabilityFunction',
                  {'id': rfunc['id'], 'dist': rfunc['distribution_name']})
        imls = N('imls', {'imt': rfunc['imt']}, rfunc['imls'])
        vfunc.append(imls)
        vfunc.append(N('meanLRs', {}, rfunc['mean_loss_ratios']))
        vfunc.append(N('covLRs', {}, rfunc['covs']))
        root.append(vfunc)
    return root
def export_vulnerability_xml(dstore, edir):
    fnames = []
    for loss_type, df in dstore.read_df('crm').groupby('loss_type'):
        nodeobj = convert_df_to_vulnerability(loss_type, df)
        dest = os.path.join(edir, '%s_vulnerability.xml' % loss_type)
        with open(dest, 'wb') as out:
            nrml.write([nodeobj], out)
        fnames.append(dest)
    return fnames
@export.add(('assetcol', 'csv'))
def export_assetcol_csv(ekey, dstore):
    assetcol = dstore['assetcol'].array
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    df = pandas.DataFrame(assetcol)
    tagcol = dstore['assetcol'].tagcol
    tagnames = tagcol.tagnames
    sorted_cols = sorted([col for col in tagnames if col in df.columns])
    unsorted_cols = [col for col in df.columns if col not in tagnames]
    df = df[unsorted_cols + sorted_cols]
    for asset_idx in range(len(assetcol)):
        for tagname in tagnames:
            tag_id = df[tagname][asset_idx]
            tag_str = tagcol.get_tag(tagname, tag_id).split('=')[1]
            df.loc[asset_idx, tagname] = tag_str
    df.drop(columns=['ordinal', 'site_id'], inplace=True)
    df['id'] = df['id'].apply(lambda x: x.decode('utf8'))
    dest_csv = dstore.export_path('%s.%s' % ekey)
    writer.save(df, dest_csv)
    return [dest_csv]
def export_exposure(dstore, edir):
    """
    :param dstore: datastore object
    """
    [dest] = export(('assetcol', 'csv'), dstore)
    assetcol_csv = os.path.join(edir, 'assetcol.csv')
    shutil.move(dest, assetcol_csv)
    tagnames = dstore['assetcol/tagcol'].tagnames
    cost_types = dstore.getitem('exposure')  # cost_type, area_type, unit
    N = node.Node
    root = N('exposureModel', {'id': 'exposure', 'category': 'buildings'})
    root.append(N('description', {}, 'Generated exposure'))
    conversions = N('conversions', {})
    costtypes = N('costTypes', {})
    for ct in cost_types:
        costtypes.append(N('costType', {
            'name': ct['loss_type'], 'type': ct['cost_type'],
            'unit': ct['unit']}))
    conversions.append(costtypes)
    root.append(conversions)
    root.append(N('occupancyPeriods', {}, 'night'))
    root.append(N('tagNames', {}, tagnames))
    root.append(N('assets', {}, 'assetcol.csv'))
    exposure_xml = os.path.join(edir, 'exposure.xml')
    with open(exposure_xml, 'wb') as out:
        nrml.write([root], out)
    return [exposure_xml, assetcol_csv]
@export.add(('job', 'zip'))
def export_job_zip(ekey, dstore):
    """
    Exports:

    - job.ini
    - rupture.csv
    - gsim_lt.xml
    - site_model.csv
    - exposure.xml and assetcol.csv
    - vulnerability functions.xml
    - taxonomy_mapping.csv
    """
    oq = dstore['oqparam']
    edir = tempfile.mkdtemp(dir=config.directory.custom_tmp
                            or tempfile.gettempdir())
    fnames = export_exposure(dstore, edir)
    job_ini = os.path.join(edir, 'job.ini')
    with open(job_ini, 'w') as out:
        out.write(oq.to_ini(exposure='exposure.xml'))
    fnames.append(job_ini)
    csv = extract(dstore, 'ruptures?slice=0&slice=1').array
    dest = os.path.join(edir, 'rupture.csv')
    with open(dest, 'w') as out:
        out.write(csv)
    fnames.append(dest)
    gsim_lt = dstore['full_lt'].gsim_lt
    dest = os.path.join(edir, 'gsim_logic_tree.xml')
    with open(dest, 'wb') as out:
        nrml.write([gsim_lt.to_node()], out)
    fnames.append(dest)
    fnames.extend(export_vulnerability_xml(dstore, edir))
    dest = os.path.join(edir, 'taxonomy_mapping.csv')
    taxmap = dstore.read_df('taxmap')
    writer = writers.CsvWriter(fmt=writers.FIVEDIGITS)
    del taxmap['taxi']
    writer.save(taxmap, dest)
    fnames.append(dest)
    return fnames