# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2014-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.
"""
Utilities to read the input files recognized by the OpenQuake engine.
"""
import os
import re
import ast
import copy
import zlib
import shutil
import zipfile
import pathlib
import logging
import tempfile
import functools
import configparser
import collections
import itertools
import json

import numpy
import pandas
from scipy.spatial import cKDTree
from scipy.spatial.distance import cdist
import requests
from shapely import wkt, geometry

from openquake.baselib import config, hdf5, parallel, InvalidFile
from openquake.baselib.performance import Monitor
from openquake.baselib.general import (
    random_filter, countby, get_duplicates, check_extension, gettemp,
    AccumDict)
from openquake.baselib.python3compat import zip, decode
from openquake.baselib.node import Node
from openquake.hazardlib.const import StdDev
from openquake.hazardlib.geo.packager import fiona
from openquake.hazardlib.calc.filters import getdefault
from openquake.hazardlib.calc.gmf import CorrelationButNoInterIntraStdDevs
from openquake.hazardlib import (
    source, geo, site, imt, valid, sourceconverter, source_reader, nrml,
    pmf, logictree, gsim_lt, get_smlt)
from openquake.hazardlib.source.rupture import build_planar_rupture_from_dict
from openquake.hazardlib.map_array import MapArray
from openquake.hazardlib.geo.utils import (
    spherical_to_cartesian, geohash3, get_dist)
from openquake.hazardlib.shakemap.parsers import convert_to_oq_rupture
from openquake.risklib import asset, riskmodels, scientific, reinsurance
from openquake.risklib.riskmodels import get_risk_functions
from openquake.commonlib.oqvalidation import OqParam
from openquake.qa_tests_data import mosaic, global_risk

F32 = numpy.float32
F64 = numpy.float64
U8 = numpy.uint8
U16 = numpy.uint16
U32 = numpy.uint32
U64 = numpy.uint64
Site = collections.namedtuple('Site', 'sid lon lat')


class DuplicatedPoint(Exception):
    """
    Raised when reading a CSV file with duplicated (lon, lat) pairs
    """


# used in extract_from_zip
def collect_files(dirpath, cond=lambda fullname: True):
    """
    Recursively collect the files contained inside dirpath.

    :param dirpath: path to a readable directory
    :param cond: condition on the path to collect the file
    """
    files = set()
    for fname in os.listdir(dirpath):
        fullname = os.path.join(dirpath, fname)
        if os.path.isdir(fullname):  # navigate inside
            files.update(collect_files(fullname))
        else:  # collect files
            if cond(fullname):
                files.add(fullname)
    return sorted(files)  # job_haz before job_risk
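

# Usage sketch (illustrative; the directory and its content are hypothetical):
#   inis = collect_files('/tmp/myjob', cond=lambda f: f.endswith('.ini'))
#   # e.g. ['/tmp/myjob/job_haz.ini', '/tmp/myjob/job_risk.ini']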


def extract_from_zip(path, ext='.ini', targetdir=None):
    """
    Given a zip archive and an extension (by default .ini), unzip the
    archive into the target directory and return the files with the
    given extension.

    :param path: pathname of the archive
    :param ext: file extension to search for
    :returns: filenames
    """
    targetdir = targetdir or tempfile.mkdtemp(
        dir=config.directory.custom_tmp or None)
    with zipfile.ZipFile(path) as archive:
        archive.extractall(targetdir)
    return [f for f in collect_files(targetdir)
            if os.path.basename(f).endswith(ext) and '__MACOSX' not in f]


def unzip_rename(zpath, name):
    """
    :param zpath: full path to a .zip archive
    :param name: exposure.xml or ssmLT.xml
    :returns: path to an .xml file with the same name of the archive
    """
    xpath = zpath[:-4] + '.xml'
    if os.path.exists(xpath):
        # already unzipped
        return xpath
    dpath = os.path.dirname(zpath)
    with zipfile.ZipFile(zpath) as archive:
        for nam in archive.namelist():
            fname = os.path.join(dpath, nam)
            if os.path.exists(fname):  # already unzipped
                os.rename(fname, fname + '.bak')
                logging.warning('Overriding %s with the file in %s',
                                fname, zpath)
        logging.info('Unzipping %s', zpath)
        archive.extractall(dpath)
    xname = os.path.join(dpath, name)
    if os.path.exists(xname):
        os.rename(xname, xpath)
    return xpath


def normpath(fnames, base_path):
    vals = []
    for fname in fnames:
        val = os.path.normpath(os.path.join(base_path, fname))
        if not os.path.exists(val):
            raise OSError('No such file: %s' % val)
        vals.append(val)
    return vals


def _normalize(key, fnames, base_path):
    # returns (input_type, filenames)
    # check that all the fnames have the same extension
    # NB: for consequences fnames is a list of lists
    flatten = []
    for fname in fnames:
        if isinstance(fname, list):
            flatten.extend(fname)
        else:
            flatten.append(fname)
    check_extension(flatten)
    input_type, _ext = key.rsplit('_', 1)
    filenames = []
    for val in fnames:
        if isinstance(val, list):
            val = normpath(val, base_path)
        elif os.path.isabs(val):
            raise ValueError('%s=%s is an absolute path' % (key, val))
        elif val.endswith('.zip'):
            zpath = os.path.normpath(os.path.join(base_path, val))
            if key == 'exposure_file':
                name = 'exposure.xml'
            elif key == 'source_model_logic_tree_file':
                name = 'ssmLT.xml'
            elif key == 'mmi_file':
                filenames.append(os.path.join(base_path, val))
                continue
            else:
                raise KeyError('Unknown key %s' % key)
            val = unzip_rename(zpath, name)
        elif val.startswith('${mosaic}/'):
            if 'mosaic' in config.directory:
                # support ${mosaic}/XXX/in/ssmLT.xml
                val = val.format(**config.directory)[1:]  # strip initial "$"
            else:
                continue
        else:
            val = os.path.normpath(os.path.join(base_path, val))
        if isinstance(val, str) and not os.path.exists(val):
            # tested in archive_err_2
            raise OSError('No such file: %s' % val)
        filenames.append(val)
    return input_type, filenames


def update(params, items, base_path):
    """
    Update a dictionary of string parameters with new parameters.
    Manages correctly file parameters.
    """
    for key, value in items:
        if key in ('hazard_curves_csv', 'hazard_curves_file', 'gmfs_csv',
                   'gmfs_file', 'site_model_csv', 'site_model_file',
                   'source_model_file', 'exposure_csv', 'exposure_file'):
            input_type, fnames = _normalize(key, value.split(), base_path)
            params['inputs'][input_type] = fnames
        elif key.endswith(('_file', '_csv', '_hdf5')):
            if value.startswith('{'):
                dic = ast.literal_eval(value)  # name -> relpath
                input_type, fnames = _normalize(key, dic.values(), base_path)
                params['inputs'][input_type] = dict(zip(dic, fnames))
                params[input_type] = ' '.join(dic)
            elif value:
                input_type, fnames = _normalize(key, [value], base_path)
                assert len(fnames) in (0, 1)
                for fname in fnames:
                    params['inputs'][input_type] = fname
            else:  # remove the key if the value is empty
                basekey, _file = key.rsplit('_', 1)
                params['inputs'].pop(basekey, None)
        elif (isinstance(value, str) and value.endswith('.hdf5')
                and key != 'description'):
            logging.warning('The [reqv] syntax has been deprecated, see '
                            'https://github.com/gem/oq-engine/blob/master/'
                            'doc/adv-manual/equivalent-distance-app for the '
                            'new syntax')
            fname = os.path.normpath(os.path.join(base_path, value))
            try:
                reqv = params['inputs']['reqv']
            except KeyError:
                params['inputs']['reqv'] = {key: fname}
            else:
                reqv.update({key: fname})
        else:
            params[key] = value


def check_params(cp, fname):
    params_sets = [set(cp.options(section)) for section in cp.sections()]
    for pair in itertools.combinations(params_sets, 2):
        params_intersection = sorted(set.intersection(*pair))
        if params_intersection:
            raise InvalidFile(
                f'{fname}: parameter(s) {params_intersection} is(are) '
                'defined in multiple sections')


# NB: this function must NOT log, since it is called when the logging
# is not configured yet
def get_params(job_ini, kw={}):
    """
    Parse a .ini file or a .zip archive

    :param job_ini: Configuration file | zip archive | URL
    :param kw: Optionally override some parameters
    :returns: A dictionary of parameters
    """
    if isinstance(job_ini, pathlib.Path):
        job_ini = str(job_ini)
    if job_ini.startswith(('http://', 'https://')):
        resp = requests.get(job_ini)
        job_ini = gettemp(suffix='.zip')
        with open(job_ini, 'wb') as f:
            f.write(resp.content)

    # directory containing the config files we're parsing
    job_ini = os.path.abspath(job_ini)
    base_path = os.path.dirname(job_ini)
    params = dict(base_path=base_path, inputs={'job_ini': job_ini})
    input_zip = None
    if job_ini.endswith('.zip'):
        input_zip = job_ini
        job_inis = extract_from_zip(job_ini)
        if not job_inis:
            raise NameError('Could not find job.ini inside %s' % input_zip)
        job_ini = job_inis[0]

    if not os.path.exists(job_ini):
        raise IOError('File not found: %s' % job_ini)

    base_path = os.path.dirname(job_ini)
    params = dict(base_path=base_path, inputs={'job_ini': job_ini})
    cp = configparser.ConfigParser(interpolation=None)
    cp.read([job_ini], encoding='utf-8-sig')  # skip BOM on Windows
    check_params(cp, job_ini)
    dic = {}
    for sect in cp.sections():
        dic.update(cp.items(sect))

    # put source_model_logic_tree_file on top of the items so that
    # oq-risk-tests alaska, which has a ssmLT.zip file, works, since
    # it is unzipped before and therefore the files can be read later
    if 'source_model_logic_tree_file' in dic:
        fname = dic.pop('source_model_logic_tree_file')
        items = [('source_model_logic_tree_file', fname)] + list(dic.items())
    else:
        items = dic.items()
    update(params, items, base_path)
    if input_zip:
        params['inputs']['input_zip'] = os.path.abspath(input_zip)
    update(params, kw.items(), base_path)  # override on demand
    return params
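

# Usage sketch (illustrative; the job file path and its content are
# hypothetical):
#   params = get_params('/path/to/job.ini')
#   params['calculation_mode']    # e.g. 'classical'
#   params['inputs']['job_ini']   # absolute path of the parsed file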


def is_fraction(string):
    """
    :returns: True if the string can be converted to a probability
    """
    try:
        f = float(string)
    except (ValueError, TypeError):
        return
    return 0 < f < 1
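

# For instance (values follow directly from the definition above):
#   is_fraction('0.25') -> True
#   is_fraction('1')    -> False (not strictly below 1)
#   is_fraction('foo')  -> None (not convertible to float)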


def get_oqparam(job_ini, pkg=None, kw={}, validate=True):
    """
    Parse a dictionary of parameters from an INI-style config file.

    :param job_ini:
        Path to configuration file/archive or dictionary of parameters
        with a key "calculation_mode"
    :param pkg:
        Python package where to find the configuration file (optional)
    :param kw:
        Dictionary of strings to override the job parameters
    :returns:
        An :class:`openquake.commonlib.oqvalidation.OqParam` instance
        containing the validated and casted parameters/values parsed from
        the job.ini file as well as a subdictionary 'inputs' containing
        absolute paths to all of the files referenced in the job.ini,
        keyed by the parameter name.
    """
    if not isinstance(job_ini, dict):
        basedir = os.path.dirname(pkg.__file__) if pkg else ''
        job_ini = get_params(os.path.join(basedir, job_ini), kw)
    re = os.environ.get('OQ_REDUCE')  # debugging facility
    if is_fraction(re):
        # reduce the imtls to the first imt
        # reduce the logic tree to one random realization
        # reduce the sites by a factor of `re`
        # reduce the ses by a factor of `re`
        os.environ['OQ_SAMPLE_SITES'] = re
        ses = job_ini.get('ses_per_logic_tree_path')
        if ses:
            ses = int(numpy.ceil(int(ses) * float(re)))
            job_ini['ses_per_logic_tree_path'] = str(ses)
        imtls = job_ini.get('intensity_measure_types_and_levels')
        if imtls:
            imtls = valid.intensity_measure_types_and_levels(imtls)
            imt = next(iter(imtls))
            job_ini['intensity_measure_types_and_levels'] = repr(
                {imt: imtls[imt]})
    oqparam = OqParam(**job_ini)
    oqparam._input_files = get_input_files(oqparam)
    if validate:  # always true except from oqzip
        oqparam.validate()
    return oqparam
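

# Usage sketch (illustrative; the job file path is hypothetical):
#   oq = get_oqparam('job.ini')
#   oq.calculation_mode    # validated calculation mode
#   oq.inputs['job_ini']   # absolute path of the parsed file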


def get_csv_header(fname, sep=','):
    """
    :param fname: a CSV file
    :param sep: the separator (default comma)
    :returns: the first non-commented line of fname and the file object
    """
    f = open(fname, encoding='utf-8-sig')
    first = next(f).strip().split(sep)
    while first[0].startswith('#'):
        first = next(f).strip().split(sep)
    return first, f
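

# Usage sketch (illustrative; sites.csv is a hypothetical file whose first
# non-comment line is "lon,lat,vs30"):
#   header, f = get_csv_header('sites.csv')
#   # header == ['lon', 'lat', 'vs30']; f is left open at the first data line
#   f.close()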


def get_mesh_exp(oqparam, h5=None):
    """
    Extract the mesh of points to compute from the sites, the sites_csv,
    the region, the site model, the exposure, in this order.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns: a pair (mesh, exposure) both of which can be None
    """
    exposure = get_exposure(oqparam, h5)
    if oqparam.impact:
        sm = get_site_model(oqparam, h5)
        mesh = geo.Mesh(sm['lon'], sm['lat'])
        return mesh, exposure
    if oqparam.sites:
        mesh = geo.Mesh.from_coords(oqparam.sites)
        return mesh, exposure
    elif 'hazard_curves' in oqparam.inputs:
        fname = oqparam.inputs['hazard_curves']
        if isinstance(fname, list):  # for csv
            mesh, _pmap = get_pmap_from_csv(oqparam, fname)
            return mesh, exposure
        raise NotImplementedError('Reading from %s' % fname)
    elif oqparam.region_grid_spacing:
        if oqparam.region:
            poly = geo.Polygon.from_wkt(oqparam.region)
        elif exposure:
            # in case of implicit grid the exposure takes precedence over
            # the site model
            poly = exposure.mesh.get_convex_hull()
        elif 'site_model' in oqparam.inputs:
            # this happens in event_based/case_19, where there is an
            # implicit grid over the site model
            sm = get_site_model(oqparam)  # do not store in h5!
            poly = geo.Mesh(sm['lon'], sm['lat']).get_convex_hull()
        else:
            raise InvalidFile('There is a grid spacing but not a region, '
                              'nor a site model, nor an exposure in %s' %
                              oqparam.inputs['job_ini'])
        try:
            logging.info('Inferring the hazard grid')
            mesh = poly.dilate(oqparam.region_grid_spacing).discretize(
                oqparam.region_grid_spacing)
            return geo.Mesh.from_coords(
                zip(mesh.lons, mesh.lats)), exposure
        except Exception:
            raise ValueError(
                'Could not discretize region with grid spacing '
                '%(region_grid_spacing)s' % vars(oqparam))
    # the site model has the precedence over the exposure, see the
    # discussion in https://github.com/gem/oq-engine/pull/5217
    elif 'site_model' in oqparam.inputs:
        logging.info('Extracting the hazard sites from the site model')
        sm = get_site_model(oqparam, h5)
        mesh = geo.Mesh(sm['lon'], sm['lat'])
    elif 'exposure' in oqparam.inputs:
        mesh = exposure.mesh
    else:
        mesh = None
    return mesh, exposure


def get_poor_site_model(fname):
    """
    :returns: a poor site model with only lon, lat fields
    """
    with open(fname, encoding='utf-8-sig') as f:
        data = [ln.replace(',', ' ') for ln in f]
    coords = sorted(valid.coordinates(','.join(data)))
    # sorting the coordinates so that event_based does not depend
    # on the order
    dt = [('lon', float), ('lat', float), ('depth', float)]
    return numpy.array(coords, dt)


def rup_radius(rup):
    """
    Maximum distance from the rupture mesh to the hypocenter
    """
    hypo = rup.hypocenter
    xyz = spherical_to_cartesian(hypo.x, hypo.y, hypo.z).reshape(1, 3)
    radius = cdist(rup.surface.mesh.xyz, xyz).min(axis=0)
    return radius


def filter_site_array_around(array, rup, dist):
    """
    :param array: array with fields 'lon', 'lat'
    :param rup: a rupture object
    :param dist: integration distance in km
    :returns: the subset of the array close to the rupture
    """
    hypo = rup.hypocenter
    x, y, z = hypo.x, hypo.y, hypo.z
    xyz_all = spherical_to_cartesian(array['lon'], array['lat'], 0)
    xyz = spherical_to_cartesian(x, y, z)
    # first raw filtering
    tree = cKDTree(xyz_all)
    # NB: on macOS query_ball returns the indices in a different order
    # than on linux and windows, hence the need to sort
    idxs = tree.query_ball_point(xyz, dist + rup_radius(rup), eps=.001)
    idxs.sort()
    # then fine filtering
    array = array[idxs]
    idxs, = numpy.where(get_dist(xyz_all[idxs], xyz) < dist)
    if len(idxs) < len(array):
        logging.info('Filtered %d/%d sites', len(idxs), len(array))
    return array[idxs]
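

# Usage sketch (illustrative; `rup` is assumed to be a hazardlib rupture,
# e.g. the one returned by get_rupture(oqparam)):
#   sites = numpy.zeros(3, [('lon', float), ('lat', float)])
#   sites['lon'] = [13.0, 13.1, 15.0]
#   sites['lat'] = [42.0, 42.1, 44.0]
#   close = filter_site_array_around(sites, rup, dist=100)  # within ~100 km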


def get_site_model_around(site_model_hdf5, rup, dist):
    """
    :param site_model_hdf5: path to an HDF5 file containing a 'site_model'
    :param rup: a rupture object
    :param dist: integration distance in km
    :returns: site model close to the rupture
    """
    with hdf5.File(site_model_hdf5) as f:
        sm = f['site_model'][:]
    return filter_site_array_around(sm, rup, dist)


def _smparse(fname, oqparam, arrays, sm_fieldsets):
    # check if the file is a list of lon,lat without header
    with open(fname, encoding='utf-8-sig') as f:
        lon, _rest = next(f).split(',', 1)
    try:
        valid.longitude(lon)
    except ValueError:  # has a header
        sm = hdf5.read_csv(fname, site.site_param_dt).array
    else:
        sm = get_poor_site_model(fname)
    sm_fieldsets[fname] = set(sm.dtype.names)

    # make sure site_id starts from 0, if given
    if 'site_id' in sm.dtype.names:
        if (sm['site_id'] != numpy.arange(len(sm))).any():
            raise InvalidFile(
                '%s: site_id not sequential from zero' % fname)

    # round coordinates and check for duplicate points
    sm['lon'] = numpy.round(sm['lon'], 5)
    sm['lat'] = numpy.round(sm['lat'], 5)
    dupl = get_duplicates(sm, 'lon', 'lat')
    if dupl:
        raise InvalidFile('Found duplicate sites %s in %s' % (dupl, fname))

    # use global parameters if local ones are missing
    params = sorted(set(sm.dtype.names) | set(oqparam.req_site_params))
    z = numpy.zeros(len(sm), [(p, site.site_param_dt[p]) for p in params])
    for name in z.dtype.names:
        try:
            z[name] = sm[name]
        except ValueError:  # missing, use the global parameter
            if name != 'backarc':  # backarc has default zero
                # exercised in the test classical/case_28_bis
                z[name] = check_site_param(oqparam, name)
    arrays.append(z)


def check_site_param(oqparam, name):
    """
    Extract the value of the given parameter
    """
    longname = site.param[name]  # vs30 -> reference_vs30_value
    value = getattr(oqparam, longname, None)
    if value is None:
        raise InvalidFile(
            'Missing site_model_file specifying the parameter %s' % name)
    if isinstance(value, float) and numpy.isnan(value):
        raise InvalidFile(f"{oqparam.inputs['job_ini']}: "
                          f"{site.param[name]} not specified")
    elif name == 'vs30measured':  # special case
        value = value == 'measured'
    return value


def get_site_model(oqparam, h5=None):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns: an array with fields lon, lat, vs30, ...
    """
    if h5 and 'site_model' in h5:
        return h5['site_model'][:]
    if oqparam.impact:
        # read the site model close to the rupture
        rup = get_rupture(oqparam)
        dist = oqparam.maximum_distance('*')(rup.mag)
        sm = get_site_model_around(oqparam.inputs['exposure'][0], rup, dist)
        if h5:
            h5['site_model'] = sm
        return sm

    arrays = []
    sm_fieldsets = {}
    for fname in oqparam.inputs['site_model']:
        if isinstance(fname, str) and not fname.endswith('.xml'):
            # parsing site_model.csv and populating arrays
            _smparse(fname, oqparam, arrays, sm_fieldsets)
            continue

        # parsing site_model.xml
        nodes = nrml.read(fname).siteModel
        params = [valid.site_param(node.attrib) for node in nodes]
        missing = set(oqparam.req_site_params) - set(params[0])
        if 'vs30measured' in missing:  # use a default of False
            missing -= {'vs30measured'}
            for param in params:
                param['vs30measured'] = False
        if 'backarc' in missing:  # use a default of False
            missing -= {'backarc'}
            for param in params:
                param['backarc'] = False
        if 'ampcode' in missing:  # use a default of b''
            missing -= {'ampcode'}
            for param in params:
                param['ampcode'] = b''
        if missing:
            raise InvalidFile(
                '%s: missing parameter %s' %
                (oqparam.inputs['site_model'], ', '.join(missing)))
        # NB: the sorted in sorted(params[0]) is essential, otherwise there
        # is a heisenbug in scenario/test_case_4
        site_model_dt = numpy.dtype([(p, site.site_param_dt[p])
                                     for p in sorted(params[0])])
        sm = numpy.array(
            [tuple(param[name] for name in site_model_dt.names)
             for param in params], site_model_dt)
        dupl = "\n".join('%s %s' % loc for loc, n in countby(
            sm, 'lon', 'lat').items() if n > 1)
        if dupl:
            raise InvalidFile('There are duplicated sites in %s:\n%s' %
                              (fname, dupl))
        arrays.append(sm)

    # all site model input files must have the same fields
    for this_sm_fname in sm_fieldsets:
        for other_sm_fname in sm_fieldsets:
            if other_sm_fname == this_sm_fname:
                continue
            this_fieldset = sm_fieldsets[this_sm_fname]
            other_fieldset = sm_fieldsets[other_sm_fname]
            fieldsets_diff = this_fieldset - other_fieldset
            if fieldsets_diff:
                raise InvalidFile(
                    f'Fields {fieldsets_diff} present in'
                    f' {this_sm_fname} were not found in {other_sm_fname}')

    sm = numpy.concatenate(arrays, dtype=arrays[0].dtype)
    if h5:
        h5['site_model'] = sm
    return sm


def debug_site(oqparam, haz_sitecol):
    """
    Reduce the site collection to the custom_site_id specified in
    OQ_DEBUG_SITE. For conditioned GMFs, keep the stations.
    """
    siteid = os.environ.get('OQ_DEBUG_SITE')
    if siteid:
        complete = copy.copy(haz_sitecol.complete)
        ok = haz_sitecol['custom_site_id'] == siteid.encode('ascii')
        if not ok.any():
            raise ValueError('There is no custom_site_id=%s' % siteid)
        if 'station_data' in oqparam.inputs:
            # keep the stations while restricting to the specified sites
            sdata, _imts = get_station_data(oqparam, haz_sitecol)
            ok |= numpy.isin(haz_sitecol.sids, sdata.site_id.to_numpy())
        haz_sitecol.array = haz_sitecol[ok]
        haz_sitecol.complete = complete
        oqparam.concurrent_tasks = 0


def _vs30(dic):
    # dic is a dictionary key -> pathnames
    if 'site_model' in dic:
        files = hdf5.sniff(dic['site_model'])
        return any('vs30' in csv.fields for csv in files)


def get_site_collection(oqparam, h5=None):
    """
    Returns a SiteCollection instance by looking at the points and the
    site model defined by the configuration parameters.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    """
    if h5 and 'sitecol' in h5:
        return h5['sitecol']
    if oqparam.ruptures_hdf5:
        with hdf5.File(oqparam.ruptures_hdf5) as r:
            rup_sitecol = r['sitecol']
    mesh, exp = get_mesh_exp(oqparam, h5)
    if mesh is None and oqparam.ground_motion_fields:
        if oqparam.calculation_mode != 'preclassical':
            raise InvalidFile(
                'You are missing sites.csv or site_model.csv in %s' %
                oqparam.inputs['job_ini'])
        return None
    elif mesh is None:
        # a None sitecol is okay when computing the ruptures only
        return None
    else:  # use the default site params
        if ('gmfs' in oqparam.inputs or 'hazard_curves' in oqparam.inputs
                or 'shakemap' in oqparam.inputs):
            req_site_params = set()  # no parameters are required
        else:
            req_site_params = oqparam.req_site_params
        if oqparam.ruptures_hdf5 and not _vs30(oqparam.inputs):
            assoc_dist = (oqparam.region_grid_spacing * 1.414
                          if oqparam.region_grid_spacing else 10)
            # 10 km is around the grid spacing used in the mosaic
            sc = site.SiteCollection.from_points(
                mesh.lons, mesh.lats, mesh.depths, oqparam, req_site_params)
            logging.info('Associating the mesh to the site parameters')
            sitecol, _array, _discarded = geo.utils.assoc(
                sc, rup_sitecol, assoc_dist, 'filter')
            sitecol.make_complete()
            return _get_sitecol(sitecol, exp, oqparam, h5)
        elif h5 and 'site_model' in h5:
            sm = h5['site_model'][:]
        elif oqparam.impact and (
                not oqparam.infrastructure_connectivity_analysis):
            # filter the far away sites
            rup = get_rupture(oqparam)
            dist = oqparam.maximum_distance('*')(rup.mag)
            [expo_hdf5] = oqparam.inputs['exposure']
            sm = get_site_model_around(expo_hdf5, rup, dist)
        elif (not h5 and 'site_model' in oqparam.inputs
                and 'exposure' not in oqparam.inputs):
            # tested in test_with_site_model
            sm = get_site_model(oqparam, h5)
            if len(sm) > len(mesh):
                # the association will happen in base.py
                sm = oqparam
        elif 'site_model' not in oqparam.inputs:
            # check the required site parameters are not NaN
            sm = oqparam
            for req_site_param in req_site_params:
                if req_site_param in site.param:
                    check_site_param(oqparam, req_site_param)
        else:
            sm = oqparam
        sitecol = site.SiteCollection.from_points(
            mesh.lons, mesh.lats, mesh.depths, sm, req_site_params)
        return _get_sitecol(sitecol, exp, oqparam, h5)


def _get_sitecol(sitecol, exp, oqparam, h5):
    if ('vs30' in sitecol.array.dtype.names
            and not numpy.isnan(sitecol.vs30).any()):
        assert sitecol.vs30.max() < 32767, sitecol.vs30.max()
    if oqparam.tile_spec:
        if 'custom_site_id' not in sitecol.array.dtype.names:
            gh = sitecol.geohash(6)
            assert len(numpy.unique(gh)) == len(gh), \
                'geohashes are not unique'
            sitecol.add_col('custom_site_id', 'S6', gh)
        tileno, ntiles = oqparam.tile_spec
        assert len(sitecol) > ntiles, (len(sitecol), ntiles)
        mask = sitecol.sids % ntiles == tileno - 1
        oqparam.max_sites_disagg = 1
        sitecol = sitecol.filter(mask)
        sitecol.make_complete()
    ss = os.environ.get('OQ_SAMPLE_SITES')
    if ss:
        # debugging tip to reduce the size of a calculation
        # OQ_SAMPLE_SITES=.1 oq engine --run job.ini
        # will run a computation with 10 times fewer sites
        sitecol.array = numpy.array(random_filter(sitecol.array, float(ss)))
        sitecol.make_complete()
    sitecol.array['lon'] = numpy.round(sitecol.lons, 5)
    sitecol.array['lat'] = numpy.round(sitecol.lats, 5)
    sitecol.exposure = exp

    # add custom_site_id in risk calculations (or GMF calculations)
    custom_site_id = any(x in oqparam.calculation_mode for x in
                         ('scenario', 'event_based', 'risk', 'damage'))
    if custom_site_id and 'custom_site_id' not in sitecol.array.dtype.names:
        gh = sitecol.geohash(8)
        if len(numpy.unique(gh)) < len(gh):
            logging.error('geohashes are not unique')
        sitecol.add_col('custom_site_id', 'S8', gh)
        if sitecol is not sitecol.complete:
            # tested in scenario_risk/test_case_8
            gh = sitecol.complete.geohash(8)
            sitecol.complete.add_col('custom_site_id', 'S8', gh)
    debug_site(oqparam, sitecol)
    if h5:
        h5['sitecol'] = sitecol
    return sitecol


def get_gsim_lt(oqparam, trts=('*',)):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param trts:
        a sequence of tectonic region types as strings; trts=['*'] means
        that there is no filtering
    :returns:
        a GsimLogicTree instance obtained by filtering on the provided
        tectonic region types.
    """
    if 'gsim_logic_tree' not in oqparam.inputs:
        return logictree.GsimLogicTree.from_(
            oqparam.gsim, oqparam.inputs['job_ini'])
    gsim_file = os.path.join(
        oqparam.base_path, oqparam.inputs['gsim_logic_tree'])
    gsim_lt = logictree.GsimLogicTree(gsim_file, trts)
    gmfcorr = oqparam.correl_model
    for trt, gsims in gsim_lt.values.items():
        for gsim in gsims:
            # NB: gsim.DEFINED_FOR_TECTONIC_REGION_TYPE can be != trt,
            # but it is not an error, it is actually the most common case!
            if gmfcorr and (gsim.DEFINED_FOR_STANDARD_DEVIATION_TYPES ==
                            {StdDev.TOTAL}) and not oqparam.with_betw_ratio:
                raise CorrelationButNoInterIntraStdDevs(gmfcorr, gsim)
    imt_dep_w = any(len(branch.weight.dic) > 1
                    for branch in gsim_lt.branches)
    if oqparam.number_of_logic_tree_samples and imt_dep_w:
        logging.error('IMT-dependent weights in the logic tree cannot work '
                      'with sampling, because they would produce different '
                      'GMPE paths for each IMT that cannot be combined, so '
                      'I am using the default weights')
        for branch in gsim_lt.branches:
            for k, w in sorted(branch.weight.dic.items()):
                if k != 'weight':
                    logging.debug(
                        'Using weight=%s instead of %s for %s %s',
                        branch.weight.dic['weight'], w, branch.gsim, k)
                    del branch.weight.dic[k]
    if oqparam.collapse_gsim_logic_tree:
        logging.info('Collapsing the gsim logic tree')
        gsim_lt = gsim_lt.collapse(oqparam.collapse_gsim_logic_tree)
    return gsim_lt


def get_rupture(oqparam):
    """
    Read the `rupture_model` XML file or the `rupture_dict` dictionary

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns: a hazardlib rupture
    """
    rupture_model = oqparam.inputs.get('rupture_model')
    rup = None
    if rupture_model and rupture_model.endswith('.xml'):
        [rup_node] = nrml.read(rupture_model)
        conv = sourceconverter.RuptureConverter(
            oqparam.rupture_mesh_spacing)
        rup = conv.convert_node(rup_node)
        # there is no TRT for scenario ruptures
        rup.tectonic_region_type = '*'
    elif rupture_model and rupture_model.endswith('.json'):
        with open(rupture_model) as f:
            rup_data = json.load(f)
        rup, err_msg = convert_to_oq_rupture(rup_data)
        if err_msg:
            logging.warning(err_msg)
    if rup is None:
        # assume rupture_dict
        rup = build_planar_rupture_from_dict(oqparam.rupture_dict)
    return rup


def get_source_model_lt(oqparam):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        a :class:`openquake.hazardlib.logictree.SourceModelLogicTree`
        instance
    """
    smlt = get_smlt(vars(oqparam))
    srcids = set(smlt.source_data['source'])
    for src in oqparam.reqv_ignore_sources:
        if src not in srcids:
            raise NameError('The source %r in reqv_ignore_sources does '
                            'not exist in the source model(s)' % src)
    if len(oqparam.source_id) == 1:
        # reduce to a single source
        return smlt.reduce(oqparam.source_id[0])
    return smlt


def get_full_lt(oqparam):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        a :class:`openquake.hazardlib.logictree.FullLogicTree` instance
    """
    source_model_lt = get_source_model_lt(oqparam)
    trts = source_model_lt.tectonic_region_types
    trts_lower = {trt.lower() for trt in trts}
    reqv = oqparam.inputs.get('reqv', {})
    for trt in reqv:
        if trt in oqparam.discard_trts.split(','):
            continue
        elif trt.lower() not in trts_lower:
            logging.warning('Unknown TRT=%s in [reqv] section' % trt)
    gsim_lt = get_gsim_lt(oqparam, trts or ['*'])
    oversampling = oqparam.oversampling
    full_lt = logictree.FullLogicTree(source_model_lt, gsim_lt, oversampling)
    p = full_lt.source_model_lt.num_paths * gsim_lt.get_num_paths()
    if full_lt.gsim_lt.has_imt_weights() and oqparam.use_rates:
        raise ValueError('use_rates=true cannot be used with imtWeight')
    if oqparam.number_of_logic_tree_samples:
        if (oqparam.oversampling == 'forbid'
                and oqparam.number_of_logic_tree_samples >= p
                and 'event' not in oqparam.calculation_mode):
            raise ValueError('Use full enumeration since there are only '
                             '{:_d} realizations'.format(p))
        unique = numpy.unique(full_lt.rlzs['branch_path'])
        logging.info(
            'Considering {:_d} logic tree paths out of {:_d}, unique '
            '{:_d}'.format(oqparam.number_of_logic_tree_samples, p,
                           len(unique)))
    else:  # full enumeration
        if not oqparam.fastmean and p > oqparam.max_potential_paths:
            raise ValueError(
                'There are too many potential logic tree paths (%d): '
                'raise `max_potential_paths`, use sampling instead of '
                'full enumeration, or set use_rates=true' % p)
        elif (oqparam.is_event_based()
                and (oqparam.ground_motion_fields
                     or oqparam.hazard_curves_from_gmfs)
                and p > oqparam.max_potential_paths / 100):
            logging.warning(
                'There are many potential logic tree paths (%d): '
                'try to use sampling or reduce the source model' % p)
    if source_model_lt.is_source_specific:
        logging.info('There is a source specific logic tree')
    return full_lt


def get_logic_tree(oqparam):
    """
    :returns: a CompositeLogicTree instance
    """
    flt = get_full_lt(oqparam)
    return logictree.compose(flt.source_model_lt, flt.gsim_lt)


def check_min_mag(sources, minimum_magnitude):
    """
    Raise an error if all sources are below the minimum_magnitude
    """
    ok = 0
    for src in sources:
        min_mag = getdefault(minimum_magnitude, src.tectonic_region_type)
        maxmag = src.get_min_max_mag()[1]
        if min_mag <= maxmag:
            ok += 1
    if not ok:
        raise RuntimeError('All sources were discarded by minimum_magnitude')


def _check_csm(csm, oqparam, h5):
    # checks
    csm.gsim_lt.check_imts(oqparam.imtls)
    srcs = csm.get_sources()
    check_min_mag(srcs, oqparam.minimum_magnitude)
    if h5 and 'sitecol' in h5:
        csm.sitecol = h5['sitecol']
    else:
        csm.sitecol = get_site_collection(oqparam, h5)
    if csm.sitecol is None:
        # missing sites.csv (test_case_1_ruptures)
        return
    if os.environ.get('OQ_CHECK_INPUT'):
        # slow checks
        source.check_complex_faults(srcs)  # tested in test_mosaic


def get_cache_path(oqparam, h5=None):
    """
    :returns: cache path of the form OQ_DATA/csm_<checksum>.hdf5
    """
    if oqparam.cachedir:
        checksum = get_checksum32(oqparam, h5)
        return os.path.join(oqparam.cachedir, 'csm_%d.hdf5' % checksum)
    return ''


def get_composite_source_model(oqparam, dstore=None):
    """
    Parse the XML and build a complete composite source model in memory.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param dstore:
        an open datastore where to save the source info
    """
    if 'source_model_logic_tree' in oqparam.inputs:
        logging.info('Reading %s',
                     oqparam.inputs['source_model_logic_tree'])
    elif 'source_model' in oqparam.inputs:
        logging.info('Reading %s', oqparam.inputs['source_model'])
    h5 = dstore.hdf5 if dstore else None
    with Monitor('building full_lt', measuremem=True, h5=h5):
        full_lt = get_full_lt(oqparam)  # builds the weights
    path = get_cache_path(oqparam, h5)
    if os.path.exists(path):
        from openquake.commonlib import datastore  # avoid circular import
        with datastore.read(os.path.realpath(path)) as ds:
            csm = ds['_csm']
            csm.init(full_lt)
    else:
        csm = source_reader.get_csm(oqparam, full_lt, dstore)
    _check_csm(csm, oqparam, dstore)
    return csm


def get_imts(oqparam):
    """
    Return a sorted list of IMTs as hazardlib objects
    """
    return list(map(imt.from_string, sorted(oqparam.imtls)))


def get_crmodel(oqparam):
    """
    Return a :class:`openquake.risklib.riskinput.CompositeRiskModel`
    instance

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    """
    if oqparam.impact:
        with hdf5.File(oqparam.inputs['exposure'][0], 'r') as exp:
            try:
                crm = riskmodels.CompositeRiskModel.read(exp, oqparam)
            except KeyError:
                pass  # missing crm in exposure.hdf5 in mosaic/case_01
            else:
                return crm
    risklist = get_risk_functions(oqparam)
    limit_states = risklist.limit_states
    perils = numpy.array(sorted(set(rf.peril for rf in risklist)))
    if not oqparam.limit_states and limit_states:
        oqparam.limit_states = limit_states
    elif 'damage' in oqparam.calculation_mode and limit_states:
        assert oqparam.limit_states == limit_states
    consdict = {}
    if 'consequence' in oqparam.inputs:
        if not limit_states:
            raise InvalidFile('Missing fragility functions in %s' %
                              oqparam.inputs['job_ini'])
        # build consdict of the form consequence_by_tagname -> tag -> array
        loss_dt = oqparam.loss_dt()
        for by, fnames in oqparam.inputs['consequence'].items():
            if by == 'taxonomy':  # obsolete name
                by = 'risk_id'
            if isinstance(fnames, str):  # single file
                fnames = [fnames]
            # i.e. files collapsed.csv, fatalities.csv, ... with headers
            # like taxonomy,consequence,slight,moderate,extensive
            df = pandas.concat(
                [pandas.read_csv(fname) for fname in fnames])
            # NB: consequence files depend on loss_type,
            # unlike fragility files
            if 'taxonomy' in df.columns:  # obsolete name
                df['risk_id'] = df['taxonomy']
                del df['taxonomy']
            if 'loss_type' not in df.columns:
                df['loss_type'] = 'structural'
            if 'peril' not in df.columns:
                df['peril'] = 'groundshaking'
            for consequence, group in df.groupby('consequence'):
                if consequence not in scientific.KNOWN_CONSEQUENCES:
                    raise InvalidFile('Unknown consequence %s in %s' %
                                      (consequence, fnames))
                bytag = {
                    tag: _cons_coeffs(grp, perils, loss_dt, limit_states)
                    for tag, grp in group.groupby(by)}
                consdict['%s_by_%s' % (consequence, by)] = bytag
    # for instance consdict['collapsed_by_taxonomy']['W_LFM-DUM_H3']
    # is [(0.05,), (0.2,), (0.6,), (1.,)] for damage state and structural
    crm = riskmodels.CompositeRiskModel(oqparam, risklist, consdict)
    return crm


def get_exposure(oqparam, h5=None):
    """
    Read the full exposure in memory and build a list of
    :class:`openquake.risklib.asset.Asset` instances.

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :returns:
        an :class:`Exposure` instance or a compatible AssetCollection
    """
    oq = oqparam
    if 'exposure' not in oq.inputs:
        return
    fnames = oq.inputs['exposure']
    with Monitor('reading exposure', measuremem=True, h5=h5):
        if oqparam.impact:
            sm = get_site_model(oq, h5)  # the site model around the rupture
            gh3 = numpy.array(sorted(set(geohash3(sm['lon'], sm['lat']))))
            exposure = asset.Exposure.read_around(fnames[0], gh3)
            with hdf5.File(fnames[0]) as f:
                if 'crm' in f:
                    loss_types = f['crm'].attrs['loss_types']
                    oq.all_cost_types = loss_types
                    oq.minimum_asset_loss = {lt: 0 for lt in loss_types}
        else:
            exposure = asset.Exposure.read_all(
                oq.inputs['exposure'], oq.calculation_mode,
                oq.ignore_missing_costs,
                errors='ignore' if oq.ignore_encoding_errors else None,
                infr_conn_analysis=oq.infrastructure_connectivity_analysis,
                aggregate_by=oq.aggregate_by)
    return exposure


def concat_if_different(values):
    unique_values = values.dropna().unique().astype(str)
    # If all values are identical, return the single unique value,
    # otherwise join with "|"
    return '|'.join(unique_values)
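

# For instance (values follow directly from the definition above):
#   concat_if_different(pandas.Series(['A', 'A']))        -> 'A'
#   concat_if_different(pandas.Series(['A', 'B', None]))  -> 'A|B'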


def read_df(fname, lon, lat, id, duplicates_strategy='error'):
    """
    Read a DataFrame containing lon-lat-id fields. In case of rows having
    the same coordinates, duplicates_strategy determines how to manage
    duplicates:

    - 'error': raise an error (default)
    - 'keep_first': keep the first occurrence
    - 'keep_last': keep the last occurrence
    - 'avg': calculate the average numeric values
    """
    assert duplicates_strategy in (
        'error', 'keep_first', 'keep_last', 'avg'), duplicates_strategy
    # NOTE: the id field has to be treated as a string even if it
    # contains numbers
    dframe = pandas.read_csv(fname, dtype={id: str})
    dframe[lon] = numpy.round(dframe[lon].to_numpy(), 5)
    dframe[lat] = numpy.round(dframe[lat].to_numpy(), 5)
    duplicates = dframe[dframe.duplicated(subset=[lon, lat], keep=False)]
    if not duplicates.empty:
        msg = '%s: has duplicate sites %s' % (fname, list(duplicates[id]))
        if duplicates_strategy == 'error':
            raise InvalidFile(msg)
        msg += f' (duplicates_strategy: {duplicates_strategy})'
        logging.warning(msg)
        if duplicates_strategy == 'keep_first':
            dframe = dframe.drop_duplicates(subset=[lon, lat], keep='first')
        elif duplicates_strategy == 'keep_last':
            dframe = dframe.drop_duplicates(subset=[lon, lat], keep='last')
        elif duplicates_strategy == 'avg':
            string_columns = dframe.select_dtypes(include='object').columns
            numeric_columns = dframe.select_dtypes(include='number').columns
            # group by lon-lat, averaging numeric columns and concatenating
            # by "|" the different contents of string columns
            dframe = dframe.groupby([lon, lat], as_index=False).agg(
                {**{col: concat_if_different for col in string_columns},
                 **{col: 'mean' for col in numeric_columns}})
    return dframe
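

# Usage sketch (illustrative; stations.csv is a hypothetical file with
# columns LONGITUDE, LATITUDE, STATION_ID plus numeric fields):
#   df = read_df('stations.csv', 'LONGITUDE', 'LATITUDE', 'STATION_ID',
#                duplicates_strategy='avg')
#   # rows sharing the same rounded (lon, lat) are merged: numeric columns
#   # are averaged, string columns joined with "|"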


def get_station_data(oqparam, sitecol, duplicates_strategy='error'):
    """
    Read the station data input file and build a list of ground motion
    stations and recorded ground motion values along with their
    uncertainty estimates

    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param sitecol: the hazard site collection
    :param duplicates_strategy:
        either 'error', 'keep_first', 'keep_last', 'avg'
    :returns: station_data, observed_imts
    """
    if parallel.oq_distribute() == 'zmq':
        logging.error('Conditioned scenarios are not meant to be run '
                      'on a cluster')
    # read the station data and associate the site ID from lon, lat
    df = read_df(oqparam.inputs['station_data'],
                 'LONGITUDE', 'LATITUDE', 'STATION_ID',
                 duplicates_strategy=duplicates_strategy)
    lons = df['LONGITUDE'].to_numpy()
    lats = df['LATITUDE'].to_numpy()
    nsites = len(sitecol.complete)
    sitecol.extend(lons, lats)
    logging.info('Extended complete site collection from %d to %d sites',
                 nsites, len(sitecol.complete))
    dic = {(lo, la): sid
           for lo, la, sid in sitecol.complete[['lon', 'lat', 'sids']]}
    sids = U32([dic[lon, lat] for lon, lat in zip(lons, lats)])

    # identify the columns with IM values
    # NB: str.replace could become str.removesuffix for pandas >= 1.4
    imt_candidates = df.filter(regex="_VALUE$").columns.str.replace(
        "_VALUE", "")
    imts = [valid.intensity_measure_type(imt) for imt in imt_candidates]
    im_cols = [imt + '_' + stat for imt in imts
               for stat in ["mean", "std"]]
    cols = []
    for im in imts:
        stddev_str = "STDDEV" if im == "MMI" else "LN_SIGMA"
        cols.append(im + '_VALUE')
        cols.append(im + '_' + stddev_str)
    for im_value_col in [im + '_VALUE' for im in imts]:
        if (df[im_value_col] == 0).any():
            file_basename = os.path.basename(
                oqparam.inputs['station_data'])
            wrong_rows = df[['STATION_ID', im_value_col]].loc[
                df.index[df[im_value_col] == 0]]
            raise InvalidFile(
                f"Please remove station data with zero intensity value"
                f" from {file_basename}:\n"
                f" {wrong_rows}")
    station_data = pandas.DataFrame(df[cols].values, columns=im_cols)
    station_data['site_id'] = sids
    return station_data, imts


def get_sitecol_assetcol(oqparam, haz_sitecol=None, inp_types=(), h5=None):
    """
    :param oqparam: calculation parameters
    :param haz_sitecol: the hazard site collection
    :param inp_types: the input loss types
    :returns: (site collection, asset collection, discarded, exposure)
    """
    asset_hazard_distance = max(oqparam.asset_hazard_distance.values())
    if haz_sitecol is None:
        haz_sitecol = get_site_collection(oqparam, h5)
    try:
        exp = haz_sitecol.exposure
    except AttributeError:
        exp = get_exposure(oqparam)
    if oqparam.region_grid_spacing:
        haz_distance = oqparam.region_grid_spacing * 1.414
        if haz_distance != asset_hazard_distance:
            logging.debug('Using asset_hazard_distance=%d km instead of '
                          '%d km', haz_distance, asset_hazard_distance)
    else:
        haz_distance = asset_hazard_distance

    # associate the assets to the hazard sites
    # this is absurdly fast: 10 million assets can be associated in <10s
    A = len(exp.assets)
    N = len(haz_sitecol)
    with Monitor('associating exposure', measuremem=True, h5=h5):
        region = wkt.loads(oqparam.region) if oqparam.region else None
        sitecol, discarded = exp.associate(haz_sitecol, haz_distance, region)
    logging.info(
        'Associated {:_d} assets (of {:_d}) to {:_d} sites '
        '(of {:_d})'.format(len(exp.assets), A, len(sitecol), N))
    assetcol = asset.AssetCollection(
        exp, sitecol, oqparam.time_event, oqparam.aggregate_by)
    u, c = numpy.unique(assetcol['taxonomy'], return_counts=True)
    idx = c.argmax()  # index of the most common taxonomy
    tax = assetcol.tagcol.taxonomy[u[idx]]
    logging.info('Found %d taxonomies with ~%.1f assets each',
                 len(u), len(assetcol) / len(u))
    logging.info('The most common taxonomy is %s with %d assets',
                 tax, c[idx])

    # check on missing fields in the exposure
    if 'risk' in oqparam.calculation_mode:
        for exp_type in inp_types:
            if not any(exp_type in name
                       for name in assetcol.array.dtype.names):
                raise InvalidFile('The exposure %s is missing %s' %
                                  (oqparam.inputs['exposure'], exp_type))

    if (not oqparam.hazard_calculation_id
            and 'gmfs' not in oqparam.inputs
            and 'hazard_curves' not in oqparam.inputs
            and 'station_data' not in oqparam.inputs
            and sitecol is not sitecol.complete):
        # for predefined hazard you cannot reduce the site collection;
        # instead you can in other cases, typically with a grid which is
        # mostly empty (i.e. there are many hazard sites with no assets)
        assetcol.reduce_also(sitecol)
    return sitecol, assetcol, discarded, exp


def impact_tmap(oqparam, taxidx):
    """
    :returns: a taxonomy mapping dframe
    """
    acc = AccumDict(accum=[])  # country, peril, taxi, risk_id, weight
    with hdf5.File(oqparam.inputs['exposure'][0], 'r') as exp:
        for key in exp['tmap']:
            # tmap has fields conversion, taxonomy, weight
            df = exp.read_df('tmap/' + key)
            for taxo, risk_id, weight in zip(
                    df.taxonomy, df.conversion, df.weight):
                if taxo in taxidx:
                    acc['country'].append(key)
                    acc['peril'].append('groundshaking')
                    acc['taxi'].append(taxidx[taxo])
                    acc['risk_id'].append(risk_id)
                    acc['weight'].append(weight)
    return pandas.DataFrame(acc)


# tested in TaxonomyMappingTestCase
def taxonomy_mapping(oqparam, taxidx):
    """
    :param oqparam: OqParam instance
    :param taxidx: dictionary taxo:str -> taxi:int
    :returns: a taxonomy mapping DataFrame with fields
        country, peril, taxi, risk_id, weight
    """
    if oqparam.impact:
        return impact_tmap(oqparam, taxidx)
    elif 'taxonomy_mapping' not in oqparam.inputs:  # trivial mapping
        nt = len(taxidx)  # number of taxonomies
        df = pandas.DataFrame(dict(weight=numpy.ones(nt),
                                   taxi=taxidx.values(),
                                   risk_id=list(taxidx),
                                   peril=['*'] * nt,
                                   country=['?'] * nt))
        return df
    fname = oqparam.inputs['taxonomy_mapping']
    return _taxonomy_mapping(fname, taxidx)


def _taxonomy_mapping(filename, taxidx):
    try:
        tmap_df = pandas.read_csv(filename, converters=dict(weight=float))
    except Exception as e:
        raise e.__class__('%s while reading %s' % (e, filename))
    if 'weight' not in tmap_df:
        tmap_df['weight'] = 1.
    if 'peril' not in tmap_df:
        tmap_df['peril'] = '*'
    if 'country' not in tmap_df:
        tmap_df['country'] = '?'
    if 'conversion' in tmap_df.columns:
        # conversion was the old name in the header for engine <= 3.12
        tmap_df = tmap_df.rename(columns={'conversion': 'risk_id'})
    assert set(tmap_df) == {'country', 'peril', 'taxonomy', 'risk_id',
                            'weight'}, set(tmap_df)
    taxos = set()
    for (taxo, per), df in tmap_df.groupby(['taxonomy', 'peril']):
        taxos.add(taxo)
        if abs(df.weight.sum() - 1.) > pmf.PRECISION:
            raise InvalidFile(
                '%s: the weights do not sum up to 1 for %s' %
                (filename, taxo))
    missing = set(taxidx) - taxos
    if missing:
        raise InvalidFile(
            'The taxonomy strings %s are in the exposure but not in '
            'the taxonomy mapping file %s' % (missing, filename))
    tmap_df['taxi'] = [taxidx.get(taxo, -1) for taxo in tmap_df.taxonomy]
    del tmap_df['taxonomy']
    # NB: we are ignoring the taxonomies in the mapping but not in the
    # exposure, for instance in EventBasedRiskTestCase::test_case_5
    return tmap_df[tmap_df.taxi != -1]


def assert_probabilities(array, fname):
    """
    Check that the array contains valid probabilities
    """
    for poe_field in (f for f in array.dtype.names
                      if f.startswith('poe-')):
        arr = array[poe_field]
        if (arr > 1).any():
            raise InvalidFile('%s: contains probabilities > 1: %s' %
                              (fname, arr[arr > 1]))
        if (arr < 0).any():
            raise InvalidFile('%s: contains probabilities < 0: %s' %
                              (fname, arr[arr < 0]))
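

# For instance (a minimal self-contained check of the function above):
#   arr = numpy.zeros(2, [('lon', float), ('lat', float),
#                         ('poe-0.1', float)])
#   arr['poe-0.1'] = [0.9, 1.1]
#   assert_probabilities(arr, 'curves.csv')  # raises InvalidFile (1.1 > 1)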


def get_pmap_from_csv(oqparam, fnames):
    """
    :param oqparam:
        an :class:`openquake.commonlib.oqvalidation.OqParam` instance
    :param fnames:
        a space-separated list of .csv relative filenames
    :returns:
        the site mesh and the hazard curves read by the .csv files
    """
    read = functools.partial(hdf5.read_csv, dtypedict={None: float})
    imtls = {}
    dic = {}
    for fname in fnames:
        wrapper = read(fname)
        assert_probabilities(wrapper.array, fname)
        dic[wrapper.imt] = wrapper.array
        imtls[wrapper.imt] = levels_from(wrapper.dtype.names)
    oqparam.hazard_imtls = imtls
    oqparam.investigation_time = wrapper.investigation_time
    array = wrapper.array
    mesh = geo.Mesh(array['lon'], array['lat'])
    N = len(mesh)
    L = sum(len(imls) for imls in oqparam.imtls.values())
    data = numpy.zeros((N, L))
    level = 0
    for im in oqparam.imtls:
        arr = dic[im]
        for poe in arr.dtype.names[3:]:
            data[:, level] = arr[poe]
            level += 1
        for field in ('lon', 'lat', 'depth'):  # sanity check
            numpy.testing.assert_equal(arr[field], array[field])
    pmap = MapArray(numpy.arange(N, dtype=U32), len(data), 1)
    pmap.array = data.reshape(N, L, 1)
    return mesh, pmap


tag2code = {'multiFaultSource': b'F',
            'areaSource': b'A',
            'multiPointSource': b'M',
            'pointSource': b'P',
            'simpleFaultSource': b'S',
            'complexFaultSource': b'C',
            'characteristicFaultSource': b'X',
            'nonParametricSeismicSource': b'N'}


# tested in commands_test
def reduce_sm(paths, source_ids):
    """
    :param paths: list of source_model.xml files
    :param source_ids: dictionary src_id -> array[src_id, code]
    :returns: dictionary with keys good, total, model, path, xmlns

    NB: duplicate sources are not removed from the XML
    """
    if isinstance(source_ids, dict):  # in oq reduce_sm
        def ok(src_node):
            if src_node.tag.endswith('Surface'):  # in geometrySections
                return True
            code = tag2code[re.search(r'\}(\w+)', src_node.tag).group(1)]
            arr = source_ids.get(src_node['id'])
            if arr is None:
                return False
            return (arr['code'] == code).any()
    else:  # list of source IDs, in extract_source
        def ok(src_node):
            return src_node['id'] in source_ids
    for path in paths:
        good = 0
        total = 0
        logging.info('Reading %s', path)
        root = nrml.read(path)
        model = Node('sourceModel', root[0].attrib)
        origmodel = root[0]
        if root['xmlns'] == 'http://openquake.org/xmlns/nrml/0.4':
            for src_node in origmodel:
                total += 1
                if ok(src_node):
                    good += 1
                    model.nodes.append(src_node)
        else:  # nrml/0.5
            for src_group in origmodel:
                sg = copy.copy(src_group)
                sg.nodes = []
                weights = src_group.get('srcs_weights')
                if weights:
                    assert len(weights) == len(src_group.nodes)
                else:
                    weights = [1] * len(src_group.nodes)
                reduced_weights = []
                for src_node, weight in zip(src_group, weights):
                    total += 1
                    if ok(src_node):
                        good += 1
                        sg.nodes.append(src_node)
                        reduced_weights.append(weight)
                        src_node.attrib.pop('tectonicRegion', None)
                src_group['srcs_weights'] = reduced_weights
                if sg.nodes:
                    model.nodes.append(sg)
        yield dict(good=good, total=total, model=model, path=path,
                   xmlns=root['xmlns'])


# used in oq reduce_sm and utils/extract_source
def reduce_source_model(smlt_file, source_ids, remove=True):
    """
    Extract sources from the composite source model.

    :param smlt_file: path to a source model logic tree file
    :param source_ids: dictionary source_id -> records (src_id, code)
    :param remove: if True, remove sm.xml files containing no sources
    :returns: the number of sources satisfying the filter vs the total
    """
    total = good = 0
    to_remove = set()
    paths = logictree.collect_info(smlt_file).smpaths
    if isinstance(source_ids, dict):
        source_ids = {decode(k): v for k, v in source_ids.items()}
    for dic in parallel.Starmap.apply(reduce_sm, (paths, source_ids)):
        path = dic['path']
        model = dic['model']
        good += dic['good']
        total += dic['total']
        shutil.copy(path, path + '.bak')
        if model:
            with open(path, 'wb') as f:
                nrml.write([model], f, xmlns=dic['xmlns'])
        elif remove:  # remove the files completely reduced
            to_remove.add(path)
    if good:
        for path in to_remove:
            os.remove(path)
    parallel.Starmap.shutdown()
    return good, total


def read_delta_rates(fname, idx_nr):
    """
    :param fname:
        path to a CSV file with fields (source_id, rup_id, delta)
    :param idx_nr:
        dictionary source_id -> (src_id, num_ruptures) with Ns sources
    :returns:
        list of Ns floating point arrays of different lengths
    """
    delta_df = pandas.read_csv(fname, converters=dict(
        source_id=str, rup_id=int, delta=float), index_col=0)
    assert list(delta_df.columns) == ['rup_id', 'delta']
    delta = [numpy.zeros(0) for _ in idx_nr]
    for src, df in delta_df.groupby(delta_df.index):
        idx, nr = idx_nr[src]
        rupids = df.rup_id.to_numpy()
        if len(numpy.unique(rupids)) < len(rupids):
            raise InvalidFile('%s: duplicated rup_id for %s' %
                              (fname, src))
        drates = numpy.zeros(nr)
        drates[rupids] = df.delta.to_numpy()
        delta[idx] = drates
    return delta
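

# Usage sketch (illustrative; delta.csv is a hypothetical file):
#   # delta.csv:
#   #   source_id,rup_id,delta
#   #   src1,0,0.1
#   #   src1,2,0.2
#   deltas = read_delta_rates('delta.csv', {'src1': (0, 4)})
#   # deltas[0] == array([0.1, 0. , 0.2, 0. ])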


def get_shapefiles(dirname):
    """
    :param dirname: directory containing the shapefiles
    :returns: list of shapefiles
    """
    out = []
    extensions = ('.shp', '.dbf', '.prj', '.shx')
    for fname in os.listdir(dirname):
        if fname.endswith(extensions):
            out.append(os.path.join(dirname, fname))
    return out


def get_reinsurance(oqparam, assetcol=None):
    """
    :returns: (policy_df, treaty_df, field_map)
    """
    if assetcol is None:
        _sitecol, assetcol, _discarded, _exp = get_sitecol_assetcol(oqparam)
    [(_loss_type, fname)] = oqparam.inputs['reinsurance'].items()
    # make sure the first aggregate_by is policy
    if oqparam.aggregate_by[0] != ['policy']:
        raise InvalidFile('%s: aggregate_by=%s' %
                          (fname, oqparam.aggregate_by))
    p, t, f = reinsurance.parse(fname, assetcol.tagcol.policy_idx)
    # check ideductible
    arr = assetcol.array
    for pol_no, deduc in zip(p.policy, p.deductible):
        if deduc:
            ideduc = arr[arr['policy'] == pol_no]['ideductible']
            if ideduc.any():
                pol = assetcol.tagcol.policy[pol_no]
                raise InvalidFile('%s: for policy %s there is a deductible '
                                  'also in the exposure!' % (fname, pol))
    return p, t, f


def get_input_files(oqparam):
    """
    :param oqparam: an OqParam instance
    :returns: input path names in a specific order
    """
    fnames = set()  # files entering in the checksum
    uri = oqparam.shakemap_uri
    if isinstance(uri, dict) and uri:
        # local files, like .npy arrays
        for key, val in uri.items():
            if key == 'fname' or key.endswith('_url'):
                val = val.replace('file://', '')
                fname = os.path.join(oqparam.base_path, val)
                if os.path.exists(fname):
                    uri[key] = fname
                    fnames.add(fname)
        # additional separate shapefiles
        if uri['kind'] == 'shapefile' and not uri['fname'].endswith('.zip'):
            fnames.update(get_shapefiles(os.path.dirname(fname)))
    for key in oqparam.inputs:
        fname = oqparam.inputs[key]
        # collect .hdf5 tables for the GSIMs, if any
        if key == 'gsim_logic_tree':
            fnames.update(gsim_lt.collect_files(fname))
            fnames.add(fname)
        elif key == 'source_model':
            fnames.add(oqparam.inputs['source_model'])
        elif key == 'exposure':  # fname is a list
            fnames.update(fname)
            if any(f.endswith(('.xml', '.nrml')) for f in fname):
                for exp in asset.Exposure.read_headers(fname):
                    fnames.update(exp.datafiles)
        elif key == 'reinsurance':
            [xml] = fname.values()
            node = nrml.read(xml)
            csv = ~node.reinsuranceModel.policies
            fnames.add(xml)
            fnames.add(os.path.join(os.path.dirname(xml), csv))
        elif key == 'geometry':
            fnames.add(fname)
        elif isinstance(fname, dict):
            for key, val in fname.items():
                if isinstance(val, list):  # list of files
                    fnames.update(val)
                else:
                    fnames.add(val)
        elif isinstance(fname, list):
            for f in fname:
                if f == oqparam.input_dir:
                    raise InvalidFile('%s there is an empty path in %s' %
                                      (oqparam.inputs['job_ini'], key))
            fnames.update(fname)
        elif key == 'source_model_logic_tree':
            info = logictree.collect_info(fname)
            fnames.update(info.smpaths)
            fnames.update(info.h5paths)
            fnames.add(fname)
        else:
            fnames.add(fname)
    return sorted(fnames)


def _checksum(fnames, checksum=0):
    """
    :returns: the 32 bit checksum of a list of files
    """
    for fname in (f for f in fnames if f != '<in-memory>'):
        if not os.path.exists(fname):
            zpath = os.path.splitext(fname)[0] + '.zip'
            if not os.path.exists(zpath):
                raise OSError('No such file: %s or %s' % (fname, zpath))
            with open(zpath, 'rb') as f:
                data = f.read()
        else:
            with open(fname, 'rb') as f:
                data = f.read()
        checksum = zlib.adler32(data, checksum)
    return checksum
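

# For instance (an in-memory illustration of the adler32 chaining used
# above, with no real files involved):
#   c1 = zlib.adler32(b'first file')
#   c2 = zlib.adler32(b'second file', c1)  # same chaining as in the loop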


def get_checksum32(oqparam, h5=None):
    """
    Build an unsigned 32 bit integer from the hazard input files

    :param oqparam: an OqParam instance
    """
    checksum = _checksum(oqparam._input_files)
    hazard_params = []
    for key, val in sorted(vars(oqparam).items()):
        if key in ('rupture_mesh_spacing', 'complex_fault_mesh_spacing',
                   'width_of_mfd_bin', 'area_source_discretization',
                   'random_seed', 'number_of_logic_tree_samples',
                   'minimum_magnitude', 'source_id', 'sites',
                   'floating_x_step', 'floating_y_step'):
            hazard_params.append('%s = %s' % (key, val))
    data = '\n'.join(hazard_params).encode('utf8')
    checksum = zlib.adler32(data, checksum)
    if h5:
        h5.attrs['checksum32'] = checksum
    return checksum


# NOTE: we expect to call this for mosaic or global_risk,
# with buffer 0 or 0.1
@functools.lru_cache(maxsize=4)
def read_geometries(fname, name, buffer=0):
    """
    :param fname: path of the file containing the geometries
    :param name: name of the primary key field
    :param buffer: shapely buffer in degrees
    :returns: data frame with codes and geometries
    """
    with fiona.open(fname) as f:
        codes = []
        geoms = []
        for feature in f:
            props = feature['properties']
            geom = feature['geometry']
            code = props[name]
            if code and geom:
                codes.append(code)
                geoms.append(geometry.shape(geom).buffer(buffer))
            else:
                logging.error(f'{code=}, {geom=} in {fname}')
    return pandas.DataFrame(dict(code=codes, geom=geoms))
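

# Usage sketch (illustrative; 'countries.geojson' and the 'ISO' field are
# hypothetical):
#   df = read_geometries('countries.geojson', 'ISO', buffer=0.1)
#   ita = df[df.code == 'ITA'].geom.iloc[0]  # a shapely geometry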


@functools.lru_cache()
def read_cities(fname, lon_name, lat_name, label_name):
    """
    Read coordinates and names of populated places from a CSV file

    :returns: a pandas DataFrame
    """
    data = pandas.read_csv(fname)
    expected_colnames_set = {lon_name, lat_name, label_name}
    if not expected_colnames_set.issubset(data.columns):
        raise ValueError(
            f"CSV file must contain {expected_colnames_set} columns.")
    return data


def read_mosaic_df(buffer):
    """
    :returns: a DataFrame of geometries for the mosaic models
    """
    fname = os.path.join(os.path.dirname(mosaic.__file__),
                         'mosaic.geojson')
    return read_geometries(fname, 'name', buffer)


def read_countries_df(buffer=0.1):
    """
    :returns: a DataFrame of geometries for the world countries
    """
    logging.info('Reading geoBoundariesCGAZ_ADM0.shp')  # slow
    fname = os.path.join(os.path.dirname(global_risk.__file__),
                         'geoBoundariesCGAZ_ADM0.shp')
    return read_geometries(fname, 'shapeGroup', buffer)


def read_cities_df(lon_field='longitude', lat_field='latitude',
                   label_field='name'):
    """
    Read from a 'worldcities.csv' file in the mosaic_dir, if present,
    or return None otherwise

    :returns: a DataFrame of coordinates and names of populated places
    """
    fname = os.path.join(os.path.dirname(mosaic.__file__),
                         'worldcities.csv')
    return read_cities(fname, lon_field, lat_field, label_field)


def read_source_models(fnames, hdf5path='', **converterparams):
    """
    :param fnames: a list of source model files
    :param hdf5path: auxiliary .hdf5 file used to store the multifault
        sources
    :param converterparams: a dictionary of parameters like
        rupture_mesh_spacing
    :returns: a list of SourceModel instances
    """
    converter = sourceconverter.SourceConverter()
    vars(converter).update(converterparams)
    smodels = list(nrml.read_source_models(fnames, converter))
    smdict = dict(zip(fnames, smodels))
    src_groups = [sg for sm in smdict.values() for sg in sm.src_groups]
    secparams = source_reader.fix_geometry_sections(
        smdict, src_groups, hdf5path)
    for smodel in smodels:
        for sg in smodel.src_groups:
            for src in sg:
                if src.code == b'F':  # multifault
                    src.set_msparams(secparams)
    return smodels