# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2023 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import operator
import os
import shutil
from xml.etree.ElementTree import iterparse

from openquake.baselib.general import groupby
from openquake.baselib.node import context, striptag, Node
from openquake.hazardlib.nrml import NRML05
from openquake.hazardlib import InvalidFile, nrml, sourceconverter
from openquake.hazardlib.sourcewriter import obj_to_node
from openquake.risklib import scientific, read_nrml
def get_vulnerability_functions_04(fname):
    """
    Parse the vulnerability model in NRML 0.4 format.

    :param fname:
        path of the vulnerability file
    :returns:
        a pair (vf_dict, categories) where vf_dict maps
        (imt, taxonomy) -> vulnerability function and categories is a
        dictionary with the keys 'assetCategory' and 'lossCategory' (sets
        of the values found in the file) and 'id' (the sorted
        vulnerabilitySetIDs joined by underscores)
    :raises InvalidFile:
        if a vulnerabilityFunctionID is duplicated or if the number of
        loss ratios or coefficients differs from the number of IMLs
    """
    categories = dict(assetCategory=set(), lossCategory=set(),
                      vulnerabilitySetID=set())
    taxonomies = set()  # vulnerabilityFunctionIDs seen so far
    vf_dict = {}  # imt, taxonomy -> vulnerability function
    for vset in nrml.read(fname).vulnerabilityModel:
        categories['assetCategory'].add(vset['assetCategory'])
        categories['lossCategory'].add(vset['lossCategory'])
        categories['vulnerabilitySetID'].add(vset['vulnerabilitySetID'])
        IML = vset.IML
        imt_str = IML['IMT']
        # NOTE(review): `~node` presumably extracts the value of the node,
        # as implemented in openquake.baselib.node - confirm
        imls = ~IML
        for vfun in vset.getnodes('discreteVulnerability'):
            taxonomy = vfun['vulnerabilityFunctionID']
            # the IDs must be unique across all the vulnerability sets
            if taxonomy in taxonomies:
                raise InvalidFile(
                    'Duplicated vulnerabilityFunctionID: %s: %s, line %d' %
                    (taxonomy, fname, vfun.lineno))
            taxonomies.add(taxonomy)
            with context(fname, vfun):
                loss_ratios = ~vfun.lossRatio
                coefficients = ~vfun.coefficientsVariation
            if len(loss_ratios) != len(imls):
                raise InvalidFile(
                    'There are %d loss ratios, but %d imls: %s, line %d' %
                    (len(loss_ratios), len(imls), fname,
                     vfun.lossRatio.lineno))
            if len(coefficients) != len(imls):
                raise InvalidFile(
                    'There are %d coefficients, but %d imls: %s, line %d' %
                    (len(coefficients), len(imls), fname,
                     vfun.coefficientsVariation.lineno))
            with context(fname, vfun):
                vf_dict[imt_str, taxonomy] = scientific.VulnerabilityFunction(
                    taxonomy, imt_str, imls, loss_ratios, coefficients,
                    vfun['probabilisticDistribution'])
    # collapse the set of vulnerabilitySetIDs into a single 'id' string
    categories['id'] = '_'.join(sorted(categories['vulnerabilitySetID']))
    del categories['vulnerabilitySetID']
    return vf_dict, categories
def upgrade_file(path, multipoint):
    """
    Upgrade the NRML file at `path` in place to the latest NRML version.

    The original file is first copied to ``path + '.bak'``, then `path`
    is overwritten with the converted model.

    :param path:
        path of the NRML file to upgrade
    :param multipoint:
        if true, source models are additionally passed through
        sourceconverter.update_source_model
    """
    node0 = nrml.read(path)[0]
    shutil.copy(path, path + '.bak')  # make a backup of the original file
    tag = striptag(node0.tag)
    gml = True
    if tag == 'vulnerabilityModel':
        vf_dict, cat_dict = get_vulnerability_functions_04(path)
        # below I am converting into a NRML 0.5 vulnerabilityModel
        node0 = Node(
            'vulnerabilityModel', cat_dict,
            nodes=[obj_to_node(val) for val in vf_dict.values()])
        gml = False
    elif tag == 'fragilityModel':
        # node0 was parsed from this very file just above, so it is reused
        # instead of parsing the file a second time
        node0 = read_nrml.convert_fragility_model_04(node0, path)
        gml = False
    elif tag == 'sourceModel':
        # wrap the sources into one sourceGroup per tectonic region,
        # numbering the groups from 1
        by_trt = groupby(node0.nodes, operator.itemgetter('tectonicRegion'))
        node0.nodes = [
            Node('sourceGroup',
                 dict(tectonicRegion=trt, name="group %s" % i),
                 nodes=srcs)
            for i, (trt, srcs) in enumerate(by_trt.items(), 1)]
        if multipoint:
            sourceconverter.update_source_model(node0, path + '.bak')
    # any other tag is written back unchanged, with gml=True
    with open(path, 'wb') as out:
        nrml.write([node0], out, gml=gml)
# NB: this works only for migrations from NRML version 0.4 to 0.5;
# we will implement a more general solution when we need to move
# to version 0.6
def _first_node_tag(path):
    """
    Return the pair (xmlns, tag) of the first node after the root of the
    XML file at `path`, or ('', '') if it cannot be determined.

    The returned xmlns still carries its leading '{'.
    """
    # the file is opened explicitly so that the handle is closed before
    # the caller possibly rewrites the same path
    with open(path, 'rb') as xml:
        try:
            events = iterparse(xml, events=('start',))
            next(events)  # read node zero
            fulltag = next(events)[1].tag  # tag of the first node
            xmlns, tag = fulltag.split('}')
        except Exception:  # deliberate best effort: not a NRML file
            return '', ''
    return xmlns, tag


def main(directory, dry_run=False, multipoint=False):
    """
    Upgrade all the NRML files contained in the given directory to the latest
    NRML version. Works by walking all subdirectories.
    WARNING: there is no downgrade!

    :param directory:
        directory to walk, looking for files ending in '.xml'
    :param dry_run:
        if true, only print the files that would be touched, without
        replacing them
    :param multipoint:
        if true, source models are also passed through
        sourceconverter.update_source_model, including the ones which
        are already in the NRML 0.5 format
    """
    for cwd, _dirs, files in os.walk(directory):
        for fname in files:
            if not fname.endswith('.xml'):
                continue
            path = os.path.join(cwd, fname)
            xmlns, tag = _first_node_tag(path)
            if not xmlns:  # empty, malformed or namespace-less XML
                continue
            if xmlns[1:] == NRML05:  # already upgraded
                if 'sourceModel' in tag and multipoint:
                    if dry_run:
                        # a dry run must never replace a file
                        print('Not upgrading', path)
                        continue
                    print('upgrading to multiPointSources', path)
                    node0 = nrml.read(path)[0]
                    sourceconverter.update_source_model(node0, path)
                    with open(path, 'wb') as out:
                        nrml.write([node0], out, gml=True)
            elif 'nrml/0.4' in xmlns and (
                    'vulnerability' in tag or 'fragility' in tag or
                    'sourceModel' in tag):
                if dry_run:
                    print('Not upgrading', path)
                else:
                    print('Upgrading', path)
                    # errors are propagated to the caller unchanged
                    upgrade_file(path, multipoint)
# descriptions of the parameters of main, attached as function attributes
# NOTE(review): presumably read by the command-line machinery - confirm
main.directory = 'directory to consider'
main.dry_run = 'test the upgrade without replacing the files'
main.multipoint = 'replace PointSources with MultiPointSources'