# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2013-2023 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""Validation library for the engine, the desktop tools, and anything else"""importosimportreimportastimportjsonimporttomlimportsocketimportloggingfromfunctoolsimportpartialimportnumpyfromopenquake.baselib.generalimportdistinct,pprodfromopenquake.baselibimportconfig,hdf5fromopenquake.hazardlibimportimt,scalerel,gsim,pmf,site,tomfromopenquake.hazardlib.gsim.baseimportregistry,gsim_aliasesfromopenquake.hazardlib.calc.filtersimport(# noqaIntegrationDistance,floatdict)PRECISION=pmf.PRECISIONSCALEREL=scalerel.get_available_magnitude_scalerel()GSIM=gsim.get_available_gsims()MAG,DIS,LON,LAT,EPS=0,1,2,3,4mag_pmf=partial(pprod,axis=(DIS,LON,LAT,EPS))dist_pmf=partial(pprod,axis=(MAG,LON,LAT,EPS))mag_dist_pmf=partial(pprod,axis=(LON,LAT,EPS))mag_dist_eps_pmf=partial(pprod,axis=(LON,LAT))lon_lat_pmf=partial(pprod,axis=(DIS,MAG,EPS))mag_lon_lat_pmf=partial(pprod,axis=(DIS,EPS))# applied on matrix MAG DIS LON LAT EPS
def trt_pmf(matrices):
    """
    From T matrices of shape (Ma, D, Lo, La, E, ...) into one matrix of
    shape (T, ...)

    :param matrices: an iterable of T matrices
    :returns: a numpy array with one `pprod`-reduced entry per input matrix
    """
    # reduce over all of the five leading axes at once
    all_axes = (MAG, DIS, LON, LAT, EPS)
    reduced = []
    for matrix in matrices:
        reduced.append(pprod(matrix, axis=all_axes))
    return numpy.array(reduced)
# this dictionary is useful to extract a fixed set of
# submatrices from the full disaggregation matrix
# NB: the TRT keys have extractor None, since the extractor
# without TRT can be used; we still need to populate the pmf_map
# since it is used to validate the keys accepted by the job.ini file
pmf_map = {
    'Mag': mag_pmf,
    'Dist': dist_pmf,
    'Mag_Dist': mag_dist_pmf,
    'Mag_Dist_Eps': mag_dist_eps_pmf,
    'Lon_Lat': lon_lat_pmf,
    'Mag_Lon_Lat': mag_lon_lat_pmf,
    'TRT': trt_pmf,
    'TRT_Mag': None,
    'TRT_Lon_Lat': None,
    'TRT_Mag_Dist': None,
    'TRT_Mag_Dist_Eps': None,
}
class FromFile:
    """
    Fake GSIM to be used when the GMFs are imported from an
    external file and not computed with a GSIM.
    """
    # every requirement is an (independent) empty set
    DEFINED_FOR_INTENSITY_MEASURE_TYPES = set()
    REQUIRES_RUPTURE_PARAMETERS = set()
    REQUIRES_SITES_PARAMETERS = set()
    REQUIRES_DISTANCES = set()
    DEFINED_FOR_REFERENCE_VELOCITY = None
    kwargs = dict()
def to_toml(uncertainty):
    """
    Converts an uncertainty node into a TOML string

    :param uncertainty:
        a node (an object with `.text` and `.attrib`) or a plain string
    :returns:
        a string starting with a `[name]` header, followed by one
        `key = value` line for each attribute of the node
    """
    if hasattr(uncertainty, 'attrib'):  # is a node
        text = uncertainty.text.strip()
        pairs = uncertainty.attrib.items()
    else:  # is a string
        text = uncertainty.strip()
        pairs = []
    text = gsim_aliases.get(text, text)  # use the gsim alias if any
    if not text.startswith('['):  # a bare GSIM name was passed
        text = '[%s]' % text
    for key, raw in pairs:
        try:
            val = ast.literal_eval(raw)
        except (SyntaxError, ValueError):
            # not a Python literal: keep it as a quoted string
            val = repr(raw)
        text += '\n%s = %s' % (key, val)
    return text
def_fix_toml(v):# horrible hack to remove a pickle error with# TomlDecoder.get_empty_inline_table.<locals>.DynamicInlineTableDict# using toml.loads(s, _dict=dict) would be the right way, but it does# not work :-(ifisinstance(v,numpy.ndarray):returnlist(v)elifhasattr(v,'items'):return{k1:_fix_toml(v1)fork1,v1inv.items()}elifisinstance(v,list):return[_fix_toml(x)forxinv]elifisinstance(v,numpy.float64):returnfloat(v)returnv# more tests are in tests/valid_test.py
def gsim(value, basedir=''):
    """
    Convert a string into a GSIM instance

    :param value: a GSIM name, a TOML string or an uncertainty node
    :param basedir: directory used to resolve `*_file`/`*_table` arguments
    :returns: a GSIM instance with a `_toml` attribute
    :raises ValueError: if the GSIM name is not in the registry

    >>> gsim('BooreAtkinson2011')
    [BooreAtkinson2011]
    """
    toml_text = to_toml(value)  # convert to TOML
    [(gsim_name, raw_kwargs)] = toml.loads(toml_text).items()
    kwargs = _fix_toml(raw_kwargs)
    for key in kwargs:
        if key.endswith(('_file', '_table')):
            # paths are taken relative to basedir and then normalized
            joined = os.path.join(basedir, kwargs[key])
            kwargs[key] = os.path.normpath(joined)
    if gsim_name == 'FromFile':
        return FromFile()
    try:
        gsim_class = registry[gsim_name]
    except KeyError:
        raise ValueError('Unknown GSIM: %s' % gsim_name)
    instance = gsim_class(**kwargs)
    stripped = [line.strip() for line in toml_text.splitlines()]
    instance._toml = '\n'.join(stripped)
    return instance
def modified_gsim(gmpe, **kwargs):
    """
    Builds a ModifiableGMPE from a gmpe. Used for instance in the GEESE
    project as follows:

    mgs = modified_gsim(gsim, add_between_within_stds={'with_betw_ratio':1.5})

    :param gmpe: an object with a `_toml` attribute
    :param kwargs: content of the `ModifiableGMPE` TOML table
    :returns: whatever `gsim` builds from the combined TOML text
    """
    # nest every table of the original TOML under ModifiableGMPE.gmpe
    nested = gmpe._toml.replace('[', '[ModifiableGMPE.gmpe.')
    modifications = toml.dumps({'ModifiableGMPE': kwargs})
    return gsim(nested + '\n' + modifications)
def occurrence_model(value):
    """
    Converts a TOML string into a TOM instance

    :param value: a TOML string with a single table named after the class
    :returns: an instance of the class found in `tom.registry`

    >>> print(occurrence_model('[PoissonTOM]\\ntime_span=50.0'))
    [PoissonTOM]
    time_span = 50.0
    <BLANKLINE>
    """
    parsed = toml.loads(value)
    # exactly one table is expected, otherwise the unpacking fails
    [(clsname, dic)] = parsed.items()
    tom_class = tom.registry[clsname]
    return tom_class(**dic)
def compose(*validators):
    """
    Implement composition of validators. For instance

    >>> utf8_not_empty = compose(utf8, not_empty)

    :param validators: callables with a `__name__`
    :returns:
        a validator applying the given ones from the last to the first
    """
    def composed_validator(value):
        result = value
        # the rightmost validator is applied first
        for validate in validators[::-1]:
            result = validate(result)
        return result

    names = [val.__name__ for val in validators]
    composed_validator.__name__ = 'compose(%s)' % ','.join(names)
    return composed_validator
class NoneOr:
    """
    Accept the empty string (casted to None) or something else validated
    by the underlying `cast` validator.
    """
    def __init__(self, cast):
        self.cast = cast
        # expose the name of the wrapped validator
        self.__name__ = cast.__name__

    def __call__(self, value):
        if not value:  # any falsy value gives None
            return None
        return self.cast(value)
class Choice:
    """
    Check if the choice is valid (case sensitive).
    """
    def __init__(self, *choices):
        self.choices = choices

    @property
    def __name__(self):
        return 'Choice%s' % str(self.choices)

    def __call__(self, value):
        if value in self.choices:
            return value
        expected = '|'.join(self.choices)
        raise ValueError("Got '%s', expected %s" % (value, expected))
class ChoiceCI:
    """
    Check if the choice is valid (case insensitive version).
    """
    def __init__(self, *choices):
        self.choices = choices
        self.__name__ = 'ChoiceCI%s' % str(choices)

    def __call__(self, value):
        # only the value is lowercased, the choices are used as given
        lowered = value.lower()
        if lowered in self.choices:
            return lowered
        raise ValueError("'%s' is not a valid choice in %s" % (
            lowered, self.choices))
# case-insensitive validator: the value is lowercased and must then be
# one of the listed choices
category = ChoiceCI('population', 'buildings')
class Choices(Choice):
    """
    Convert the choices, passed as a comma separated string, into a tuple
    of validated strings. For instance

    >>> Choices('xml', 'csv')('xml,csv')
    ('xml', 'csv')
    """
    def __call__(self, value):
        # the input is lowercased before being split on the commas
        values = tuple(value.lower().split(','))
        for val in values:
            if val not in self.choices:
                raise ValueError("'%s' is not a valid choice in %s" % (
                    val, self.choices))
        return values
class Regex:
    """
    Compare the value with the given regex
    """
    def __init__(self, regex):
        self.rx = re.compile(regex)
        self.__name__ = 'Regex[%s]' % regex

    def __call__(self, value):
        # NB: .match anchors the pattern only at the start of the value
        matched = self.rx.match(value)
        if matched is None:
            raise ValueError("'%s' does not match the regex '%s'" %
                             (value, self.rx.pattern))
        return value
# a letter or underscore followed by any number of word characters
name = Regex(r'^[a-zA-Z_]\w*$')

# as above, but dashes are also accepted after the first character
name_with_dashes = Regex(r'^[a-zA-Z_][\w\-]*$')

# e.g. 2023-02-06 04:17:34+03:00
# +03:00 indicates the time zone offset from Coordinated Universal Time (UTC)
# NB: the offset group is not optional in the pattern
local_timestamp = Regex(
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}([+-]\d{2}:\d{2})$")
class SimpleId:
    """
    Check if the given value is a valid ID.

    :param length: maximum length of the ID
    :param regex: accepted characters
    """
    def __init__(self, length, regex=r'^[\w_\-:]+$'):
        self.length = length
        self.regex = regex
        self.__name__ = 'SimpleId(%d, %s)' % (length, regex)

    def __call__(self, value):
        # the checks are performed in this order: empty, non-ASCII,
        # too long, not matching the regex
        if len(value) == 0:
            raise ValueError(
                'Invalid ID: can not be empty')
        if any(ord(char) > 127 for char in value):
            raise ValueError(
                'Invalid ID %r: the only accepted chars are %s' % (
                    value, self.regex))
        if len(value) > self.length:
            raise ValueError("The ID '%s' is longer than %d character" %
                             (value, self.length))
        if not re.match(self.regex, value):
            raise ValueError(
                "Invalid ID '%s': the only accepted chars are %s" %
                (value, self.regex))
        return value
MAX_ID_LENGTH=75# length required for some sources in US14 collapsed modelASSET_ID_LENGTH=50# length that makes Murray happysimple_id=SimpleId(MAX_ID_LENGTH)branch_id=SimpleId(MAX_ID_LENGTH,r'^[\w\:\#_\-\.]+$')asset_id=SimpleId(ASSET_ID_LENGTH)source_id=SimpleId(MAX_ID_LENGTH,r'^[\w\-_:]+$')three_letters=SimpleId(3,r'^[A-Z]+$')nice_string=SimpleId(# nice for Windows, Linux, HDF5 and XMLASSET_ID_LENGTH,r'[a-zA-Z0-9\.`!#$%\(\)\+/,;@\[\]\^_{|}~-]+')mod_func=SimpleId(MAX_ID_LENGTH,r'[\w_]+\.[\w_]+')
def risk_id(value):
    """
    A valid risk ID cannot contain the characters #'"

    :param value: input string
    :returns: the value unchanged
    :raises ValueError: if a forbidden character is found
    """
    forbidden = ('#', '"', "'")
    if any(char in value for char in forbidden):
        raise ValueError('Invalid ID "%s" contains forbidden chars' % value)
    return value
class FloatRange(object):
    """
    Validator converting the value to a float and checking that it is
    inside the closed interval [minrange, maxrange].

    :param minrange: minimum accepted value (included)
    :param maxrange: maximum accepted value (included)
    :param name: optional label prepended to the error messages
    :param accept: optional non-float value returned unchanged
    """
    def __init__(self, minrange, maxrange, name='', accept=None):
        self.minrange = minrange
        self.maxrange = maxrange
        self.name = name
        self.accept = accept
        self.__name__ = 'FloatRange[%s:%s]' % (minrange, maxrange)

    def __call__(self, value):
        try:
            f = float_(value)
        except ValueError:  # passed a string
            if value == self.accept:
                return value
            raise
        # separate the label from the number ("magnitude 12.0 ..." and
        # not "magnitude12.0 ..."); nothing is prepended without a label
        prefix = '%s ' % self.name if self.name else ''
        if f > self.maxrange:
            raise ValueError("%s%s is bigger than the maximum (%s)" %
                             (prefix, f, self.maxrange))
        if f < self.minrange:
            raise ValueError("%s%s is smaller than the minimum (%s)" %
                             (prefix, f, self.minrange))
        return f
# validator for floats between 0 and 11 (both bounds are accepted)
magnitude = FloatRange(0, 11, 'magnitude')
def not_empty(value):
    """
    Check that the string is not all blanks

    :param value: a string or None
    :returns: the value unchanged
    :raises ValueError: if the value is None or contains only whitespace
    """
    is_blank = value is None or value.strip() == ''
    if is_blank:
        raise ValueError('Got an empty string')
    return value
def utf8(value):
    r"""
    Check that the string is UTF-8. Bytes are decoded and returned as
    a string, any other value is returned unchanged.

    >>> utf8(b'\xe0')  # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    ValueError: Not UTF-8: ...
    """
    if not isinstance(value, bytes):
        return value
    try:
        return value.decode('utf-8')
    except Exception:
        raise ValueError('Not UTF-8: %r' % value)
def utf8_not_empty(value):
    """
    Check that the string is UTF-8 and not empty

    :param value: a string or a bytestring
    :returns: the result of `utf8` applied after `not_empty`
    """
    checked = not_empty(value)
    return utf8(checked)
def namelist(value):
    """
    :param value: input string
    :returns: list of names separated by whitespace or commas, each one
              validated with `source_id`

    >>> namelist('a,b')
    ['a', 'b']
    >>> namelist('a1  b_2\t_c')
    ['a1', 'b_2', '_c']

    >>> namelist('a1 b_2 1c')
    ['a1', 'b_2', '1c']
    """
    # commas count as whitespace
    names = value.replace(',', ' ').split()
    for candidate in names:
        try:
            source_id(candidate)
        except ValueError:
            raise ValueError(
                'List of names containing an invalid name: %s' % candidate)
    return names
def float_(value):
    """
    :param value: input string
    :returns: a floating point number
    :raises ValueError: whatever the reason of the failed conversion is
    """
    try:
        number = float(value)
    except Exception:
        raise ValueError("'%s' is not a float" % value)
    return number
def nonzero(value):
    """
    :param value: input string
    :returns: the value unchanged (it is NOT converted to a float)

    >>> nonzero('1')
    '1'
    >>> nonzero('0')
    Traceback (most recent call last):
      ...
    ValueError: '0' is zero
    """
    number = float_(value)
    if number == 0:
        raise ValueError("'%s' is zero" % value)
    return value
# NB: numpy.round != round; for instance numpy.round(123.300795, 5)
# is 123.30080, different from round(123.300795, 5) = 123.30079
def longitude(value):
    """
    :param value: input string
    :returns: longitude float, rounded to 5 digits, i.e. 1 meter maximum
    :raises ValueError: if the rounded value is outside [-180, 180]

    >>> longitude('0.123456')
    0.12346
    """
    # the range is checked after the rounding
    lon = numpy.round(float_(value), 5)
    if lon > 180.:
        raise ValueError('longitude %s > 180' % lon)
    if lon < -180.:
        raise ValueError('longitude %s < -180' % lon)
    return lon
# NB: numpy.round != round; for instance numpy.round(123.300795, 5)
# is 123.30080, different from round(123.300795, 5) = 123.30079
def latitude(value):
    """
    :param value: input string
    :returns: latitude float, rounded to 5 digits, i.e. 1 meter maximum
    :raises ValueError: if the rounded value is outside [-90, 90]

    >>> latitude('-0.123456')
    -0.12346
    """
    # the range is checked after the rounding
    lat = numpy.round(float_(value), 5)
    if lat > 90.:
        raise ValueError('latitude %s > 90' % lat)
    if lat < -90.:
        raise ValueError('latitude %s < -90' % lat)
    return lat
def longitudes(value):
    """
    :param value: a comma separated string of longitudes
    :returns: a list of longitudes
    """
    tokens = value.split(',')
    return list(map(longitude, tokens))
def latitudes(value):
    """
    :param value: a comma separated string of latitudes
    :returns: a list of latitudes
    """
    tokens = value.split(',')
    return list(map(latitude, tokens))
# a depth is validated as a plain float, without any range check
depth = float_
def lon_lat(value):
    """
    :param value: a pair of coordinates
    :returns: a tuple (longitude, latitude)

    >>> lon_lat('12 14')
    (12.0, 14.0)
    >>> lon_lat('12,14')
    (12.0, 14.0)
    """
    # a comma is accepted as separator too; exactly two tokens are needed
    tokens = value.replace(',', ' ').split()
    lon, lat = tokens
    return longitude(lon), latitude(lat)
def point(value):
    """
    :param value: a string with 2 or 3 whitespace-separated coordinates
    :returns: a triple (longitude, latitude, depth); the depth is 0.
              when it is not given
    :raises ValueError: if there are neither 2 nor 3 coordinates
    """
    coords = value.split()
    if len(coords) == 2:
        lon, lat = coords
        return longitude(lon), latitude(lat), 0.
    if len(coords) == 3:
        lon, lat, dep = coords
        return longitude(lon), latitude(lat), depth(dep)
    raise ValueError('Invalid point format: %s' % value)
def coordinates(value):
    """
    Convert a non-empty string into a list of lon-lat coordinates.

    >>> coordinates('')
    Traceback (most recent call last):
    ...
    ValueError: Empty list of coordinates: ''
    >>> coordinates('1.1 1.2')
    [(1.1, 1.2, 0.0)]
    >>> coordinates('1.1 1.2, 2.2 2.3')
    [(1.1, 1.2, 0.0), (2.2, 2.3, 0.0)]
    >>> coordinates('1.1 1.2 -0.4, 2.2 2.3 -0.5')
    [(1.1, 1.2, -0.4), (2.2, 2.3, -0.5)]

    >>> coordinates('0 0 0, 0 0 -1')
    Traceback (most recent call last):
    ...
    ValueError: Found overlapping site #2,  0 0 -1
    """
    if isinstance(value, list):  # assume list of lists/tuples
        # NB: no check on overlapping sites is performed in this case
        return [point(' '.join(str(coord) for coord in row))
                for row in value]
    if not value.strip():
        raise ValueError('Empty list of coordinates: %r' % value)
    seen = set()  # (lon, lat) pairs found so far, the depth is ignored
    points = []
    for num, chunk in enumerate(value.split(','), 1):
        pnt = point(chunk)
        lonlat = pnt[:2]
        if lonlat in seen:
            raise ValueError("Found overlapping site #%d, %s" % (num, chunk))
        seen.add(lonlat)
        points.append(pnt)
    return points
def wkt_polygon(value):
    """
    Convert a string with a comma separated list of coordinates into
    a WKT polygon, by closing the ring.

    :param value: a string accepted by `coordinates`
    :returns: a string of the form `POLYGON((lon1 lat1, ..., lon1 lat1))`
    """
    # in WKT the two coordinates of a point are separated by a space;
    # the depth is discarded
    points = ['%s %s' % (lon, lat) for lon, lat, dep in coordinates(value)]
    # close the linear polygon ring by appending the first coord to the end
    points.append(points[0])
    return 'POLYGON((%s))' % ', '.join(points)
def positivefloats(value):
    """
    :param value:
        string of whitespace separated floats, possibly enclosed in
        square brackets
    :returns: a list of positive floats
    """
    tokens = value.strip('[]').split()
    return [positivefloat(token) for token in tokens]
def floats(value):
    """
    :param value: string of whitespace separated floats
    :returns: a list of floats
    """
    return [float(token) for token in value.split()]
def boolean(value):
    """
    :param value: input string such as '0', '1', 'true', 'false'
    :returns: boolean

    >>> boolean('')
    False
    >>> boolean('True')
    True
    >>> boolean('false')
    False
    >>> boolean('t')
    Traceback (most recent call last):
        ...
    ValueError: Not a boolean: t
    """
    # the lookup is done on the stripped, lowercased string
    key = str(value).strip().lower()
    try:
        flag = _BOOL_DICT[key]
    except KeyError:
        raise ValueError('Not a boolean: %s' % key)
    return flag
def range01(value):
    """
    :param value:
        a string convertible to a float in the range 0..1; the strings
        'true' and 'false' (in any case) are converted to 1. and 0.
    :returns: a float
    """
    val = value.lower()
    literals = {'true': 1., 'false': 0.}
    if val in literals:
        return literals[val]
    return FloatRange(0, 1)(val)
# validator for a float between 0 and 1 (both bounds are accepted)
probability = FloatRange(0, 1)
def probabilities(value, rows=0, cols=0):
    """
    :param value: input string, comma separated or space separated
    :param rows: the number of rows if the floats are in a matrix (0
                 otherwise); a sized object is accepted too and in that
                 case its length is used
    :param cols: the number of columns if the floats are in a matrix (or
                 0); a sized object is accepted too
    :returns: a list of probabilities, or a numpy array of shape
              (rows, cols) if both rows and cols are given

    >>> probabilities('')
    []
    >>> probabilities('1')
    [1.0]
    >>> probabilities('0.1 0.2')
    [0.1, 0.2]
    >>> probabilities('0.1, 0.2')  # commas are ignored
    [0.1, 0.2]
    """
    probs = list(map(probability, value.replace(',', ' ').split()))
    if rows and cols:
        # accept both plain counts (as documented) and sized objects
        # (as required by the previous implementation, calling len)
        shape = tuple(len(dim) if hasattr(dim, '__len__') else int(dim)
                      for dim in (rows, cols))
        probs = numpy.array(probs).reshape(shape)
    return probs
def decreasing_probabilities(value):
    """
    :param value: input string, comma separated or space separated
    :returns: a list of decreasing probabilities

    >>> decreasing_probabilities('1')
    Traceback (most recent call last):
    ...
    ValueError: Not enough probabilities, found '1'
    >>> decreasing_probabilities('0.2 0.1')
    [0.2, 0.1]
    >>> decreasing_probabilities('0.1 0.2')
    Traceback (most recent call last):
    ...
    ValueError: The probabilities 0.1 0.2 are not in decreasing order
    """
    probs = probabilities(value)
    if len(probs) < 2:
        raise ValueError('Not enough probabilities, found %r' % value)
    # NB: equal consecutive probabilities are accepted
    if probs != sorted(probs, reverse=True):
        raise ValueError('The probabilities %s are not in decreasing order'
                         % value)
    return probs
def intensity_measure_type(value):
    """
    Make sure `value` is a valid intensity measure type and return it
    in a normalized form

    >>> intensity_measure_type('SA(0.10)')  # NB: strips the trailing 0
    'SA(0.1)'
    >>> intensity_measure_type('SA')  # this is invalid
    Traceback (most recent call last):
      ...
    ValueError: Invalid IMT: 'SA'
    """
    try:
        normalized = str(imt.from_string(value))
    except Exception:
        raise ValueError("Invalid IMT: '%s'" % value)
    return normalized
def check_levels(imls, imt, min_iml=1E-10):
    """
    Raise a ValueError if the given levels are invalid.

    :param imls: a list of intensity measure and levels
    :param imt: the intensity measure type
    :param min_iml: minimum intensity measure level (default 1E-10)
    :returns: None; if the first level is zero it is replaced in place
              with `min_iml`

    >>> check_levels([0.1, 0.2], 'PGA')  # ok
    >>> check_levels([], 'PGA')
    Traceback (most recent call last):
       ...
    ValueError: No imls for PGA: []
    >>> check_levels([0.2, 0.1], 'PGA')
    Traceback (most recent call last):
       ...
    ValueError: The imls for PGA are not sorted: [0.2, 0.1]
    >>> check_levels([0.2, 0.2], 'PGA')
    Traceback (most recent call last):
       ...
    ValueError: Found duplicated levels for PGA: [0.2, 0.2]
    """
    if len(imls) < 1:
        raise ValueError('No imls for %s: %s' % (imt, imls))
    if imls != sorted(imls):
        raise ValueError('The imls for %s are not sorted: %s' % (imt, imls))
    # the levels are sorted at this point, so duplicates are adjacent
    if any(this == nxt for this, nxt in zip(imls, imls[1:])):
        raise ValueError("Found duplicated levels for %s: %s" % (imt, imls))
    if imls[0] != 0:
        return
    # a single zero level has no second level to compare with: looking
    # at imls[1] unconditionally would raise an IndexError
    if len(imls) > 1 and imls[1] <= min_iml:
        raise ValueError("The min_iml %s=%s is larger than the second level "
                         "for %s" % (imt, min_iml, imls))
    imls[0] = min_iml  # apply the cutoff
def intensity_measure_types_and_levels(value):
    """
    :param value: input string
    :returns: Intensity Measure Type and Levels dictionary

    >>> intensity_measure_types_and_levels('{"SA(0.10)": [0.1, 0.2]}')
    {'SA(0.1)': [0.1, 0.2]}
    """
    dic = dictionary(value)
    # loop on a copy of the items, since the keys may be replaced
    for imt_str, imls in list(dic.items()):
        norm_imt = str(imt.from_string(imt_str))
        if norm_imt != imt_str:
            # store the levels under the normalized name
            dic[norm_imt] = dic.pop(imt_str)
        check_levels(imls, imt_str)  # ValueError if the levels are invalid
    return dic
def loss_ratios(value):
    """
    :param value: input string
    :returns: dictionary loss_type -> loss ratios

    >>> loss_ratios('{"structural": [0.1, 0.2]}')
    {'structural': [0.1, 0.2]}
    """
    dic = dictionary(value)
    for loss_type, ratios in dic.items():
        # report the first ratio outside [0, 1], if any
        outside = [ratio for ratio in ratios if not 0 <= ratio <= 1]
        if outside:
            raise ValueError('Loss ratio %f for loss_type %s is not in '
                             'the range [0, 1]' % (outside[0], loss_type))
        check_levels(ratios, loss_type)  # ValueError if invalid levels
    return dic
def logscale(x_min, x_max, n):
    """
    :param x_min: minimum value
    :param x_max: maximum value
    :param n: number of steps
    :returns: an array of n values from x_min to x_max, evenly spaced on
              a logarithmic scale; for n=1 the array contains only x_min
    :raises ValueError:
        if n is not a positive integer, if x_min is not positive or if
        x_max is not bigger than x_min
    """
    if not (isinstance(n, int) and n > 0):
        raise ValueError('n must be a positive integer, got %s' % n)
    if x_min <= 0:
        raise ValueError('x_min must be positive, got %s' % x_min)
    if x_max <= x_min:
        raise ValueError('x_max (%s) must be bigger than x_min (%s)' %
                         (x_max, x_min))
    if n == 1:
        # a single step cannot span the interval and the general formula
        # would divide by n - 1 = 0, producing a NaN
        return numpy.array([x_min], float)
    delta = numpy.log(x_max / x_min)
    return numpy.exp(delta * numpy.arange(n) / (n - 1)) * x_min
def dictionary(value):
    """
    :param value:
        input string corresponding to a literal Python object
    :returns:
        the Python object

    >>> dictionary('')
    {}
    >>> dictionary('{}')
    {}
    >>> dictionary('{"a": 1}')
    {'a': 1}
    >>> dictionary('"vs30_clustering: true"')  # an error really done by a user
    Traceback (most recent call last):
       ...
    ValueError: '"vs30_clustering: true"' is not a valid Python dictionary
    >>> dictionary('{"ls": logscale(0.01, 2, 5)}')
    {'ls': [0.01, 0.03760603093086393, 0.14142135623730948, 0.5318295896944986, 1.9999999999999991]}
    """
    if not value:
        return {}
    # dirty but quick: logscale(a, b, n) becomes ("logscale", a, b, n)
    text = value.replace('logscale(', '("logscale", ')
    try:
        dic = dict(ast.literal_eval(text))
    except Exception:
        # NB: the message contains the text after the replacement
        raise ValueError('%r is not a valid Python dictionary' % text)

    def is_logscale(val):
        try:
            return val[0] == 'logscale'
        except Exception:  # no val[0]
            return False

    tagged = [key for key, val in dic.items() if is_logscale(val)]
    for key in tagged:
        dic[key] = list(logscale(*dic[key][1:]))
    return dic
def mag_scale_rel(value):
    """
    :param value:
        a Magnitude-Scale relationship in hazardlib
    :returns:
        the corresponding hazardlib object

    Parametric MSR classes are supported with TOML syntax; for instance

    >>> mag_scale_rel("CScalingMSR.C=4.7")
    <CScalingMSR>
    """
    name = value.strip()
    kwargs = {}
    if '.' in name or '[' in name:
        # TOML syntax: a single table with the arguments of the class
        [(name, kwargs)] = toml.loads(name).items()
    if name not in SCALEREL:
        raise ValueError(
            "'%s' is not a recognized magnitude-scale relationship" % name)
    msr_class = SCALEREL[name]
    return msr_class(**kwargs)
def pmf(value):
    """
    Convert a string into a Probability Mass Function.

    :param value:
        a sequence of probabilities summing up to 1 (no commas)
    :returns:
        a list of pairs [(probability, index), ...] with index starting from 0

    >>> pmf("0.157 0.843")
    [(0.157, 0), (0.843, 1)]
    """
    probs = probabilities(value)
    # NB: the comparison is exact, no tolerance is applied
    if sum(probs) != 1:
        # avoid https://github.com/gem/oq-engine/issues/5901
        raise ValueError('The probabilities %s do not sum up to 1!' % value)
    return list(zip(probs, range(len(probs))))
def check_weights(nodes_with_a_weight):
    """
    Ensure that the sum of the values is 1

    :param nodes_with_a_weight: a list of Node objects with a weight attribute
    :returns: the nodes unchanged
    :raises ValueError: if the sum differs from 1 by more than PRECISION
    """
    weights = [node['weight'] for node in nodes_with_a_weight]
    deviation = abs(sum(weights) - 1.)
    if deviation > PRECISION:
        raise ValueError('The weights do not sum up to 1: %s' % weights)
    return nodes_with_a_weight
def weights(value):
    """
    Space-separated list of weights:

    >>> weights('0.1 0.2 0.7')
    [0.1, 0.2, 0.7]

    >>> weights('0.1 0.2 0.8')
    Traceback (most recent call last):
      ...
    ValueError: The weights do not sum up to 1: [0.1, 0.2, 0.8]
    """
    probs = probabilities(value)
    # the sum may differ from 1 by at most PRECISION
    deviation = abs(sum(probs) - 1.)
    if deviation > PRECISION:
        raise ValueError('The weights do not sum up to 1: %s' % probs)
    return probs
def hypo_list(nodes):
    """
    :param nodes: a hypoList node with N hypocenter nodes
    :returns: a numpy array of shape (N, 3) with strike, dip and weight
    """
    check_weights(nodes)  # the weights must sum up to 1
    rows = [[node['alongStrike'], node['downDip'], node['weight']]
            for node in nodes]
    return numpy.array(rows, float)
def slip_list(nodes):
    """
    :param nodes: a slipList node with N slip nodes
    :returns: a numpy array of shape (N, 2) with slip angle and weight
    """
    check_weights(nodes)  # the weights must sum up to 1
    # ~node is the content of the node, validated with slip_range
    rows = [[slip_range(~node), node['weight']] for node in nodes]
    return numpy.array(rows, float)
def posList(value):
    """
    :param value:
        a string with the form `lon1 lat1 [depth1] ... lonN latN [depthN]`
        without commas, where the depths are optional.
    :returns:
        a list of floats without other validations
    :raises ValueError:
        if the number of values is a multiple of neither 2 nor 3, or if
        one of the values is not a float
    """
    tokens = value.split()
    count = len(tokens)
    if count % 3 != 0 and count % 2 != 0:
        raise ValueError('Wrong number: nor pairs not triplets: %s' % tokens)
    try:
        return [float_(token) for token in tokens]
    except Exception as exc:
        raise ValueError('Found a non-float in %s: %s' % (value, exc))
def point3d(value, lon, lat, depth):
    """
    This is used to convert nodes of the form
    <hypocenter lon="LON" lat="LAT" depth="DEPTH"/>

    :param value: None (it is not used)
    :param lon: longitude string
    :param lat: latitude string
    :param depth: depth string, validated with `positivefloat`
    :returns: a validated triple (lon, lat, depth)
    """
    valid_lon = longitude(lon)
    valid_lat = latitude(lat)
    valid_depth = positivefloat(depth)
    return valid_lon, valid_lat, valid_depth
def ab_values(value):
    """
    a and b values of the GR magnitude-scaling relation.
    a is a positive float, b is just a float.

    :param value: a string with two whitespace-separated values
    :returns: a pair (a, b)
    """
    a_str, b_str = value.split()
    a_val = positivefloat(a_str)
    b_val = float_(b_str)
    return a_val, b_val
def integers(value):
    """
    :param value: input string
    :returns: non-empty list of integers

    >>> integers('1, 2')
    [1, 2]
    >>> integers(' ')
    Traceback (most recent call last):
       ...
    ValueError: Not a list of integers: ' '
    """
    if '.' in value:
        raise ValueError('There are decimal points in %s' % value)
    # surrounding brackets are dropped, commas count as whitespace
    tokens = value.strip('[]').replace(',', ' ').split()
    if not tokens:
        raise ValueError('Not a list of integers: %r' % value)
    try:
        # going through float accepts the exponential notation too
        return [int(float(token)) for token in tokens]
    except Exception:
        raise ValueError('Not a list of integers: %r' % value)
def positiveints(value):
    """
    :param value: input string
    :returns: a list of integers, none of which is negative

    >>> positiveints('1, -1')
    Traceback (most recent call last):
       ...
    ValueError: -1 is negative in '1, -1'
    """
    ints = integers(value)
    # NB: zero is accepted
    negatives = [val for val in ints if val < 0]
    if negatives:
        raise ValueError('%d is negative in %r' % (negatives[0], value))
    return ints
def tile_spec(value):
    """
    Specify a tile with a string of format "[no, nt]"
    where `no` is an integer in the range `1..nt` and `nt`
    is the total number of tiles.

    :param value: a string containing a Python literal pair
    :returns: the list [no, nt]
    :raises ValueError: if `nt` is not positive or `no` is not in `1..nt`

    >>> tile_spec('[1,2]')
    [1, 2]
    >>> tile_spec('[2,2]')
    [2, 2]
    """
    no, ntiles = ast.literal_eval(value)
    # explicit checks instead of assert statements, which are removed
    # when Python runs with the -O flag
    if not ntiles > 0:
        raise ValueError('The number of tiles must be positive, got %r'
                         % (ntiles,))
    if not (no > 0 and no <= ntiles):
        raise ValueError('The tile number %r is not in the range 1..%r'
                         % (no, ntiles))
    return [no, ntiles]
def uncertainty_model(value):
    """
    Format whitespace in XML nodes of kind uncertaintyModel

    :param value: the text of the node
    :returns:
        the stripped text if it is TOML (i.e. it starts with a square
        bracket), otherwise the text with each run of whitespace
        replaced by a single space
    """
    is_toml = value.lstrip().startswith('[')
    if is_toml:  # do not mess with newlines
        return value.strip()
    words = value.split()
    return ' '.join(words)  # remove newlines too
def host_port(value=None):
    """
    Returns a pair (host_IP, port_number).

    >>> host_port('localhost:1908')
    ('127.0.0.1', 1908)

    If value is missing returns the parameters in openquake.cfg
    """
    if value:
        host, port = value.split(':')
        # the host name is resolved, the port is converted to an integer
        return socket.gethostbyname(host), int(port)
    # the environment variable OQ_DATABASE overrides the configured host
    host = os.environ.get('OQ_DATABASE', config.dbserver.host)
    return (host, config.dbserver.port)
# used for the exposure validationcost_type=Choice('structural','nonstructural','contents','business_interruption')cost_type_type=Choice('aggregated','per_area','per_asset')
def site_param(dic):
    """
    Convert a dictionary site_model_param -> string into a dictionary
    of valid casted site parameters.

    :param dic: a dictionary parameter name -> value
    :returns: a new dictionary; the input is not modified
    :raises ValueError: for names not in `site.site_param_dt`
    """
    new = {}
    for name, val in dic.items():
        if name == 'vs30Type':
            # avoid "Unrecognized parameter vs30Type"
            new['vs30measured'] = val == 'measured'
            continue
        if name not in site.site_param_dt:
            raise ValueError('Unrecognized parameter %s' % name)
        new[name] = val
    return new
class Param:
    """
    A descriptor for validated parameters with a default, to be
    used as attributes in ParamSet objects.

    :param validator: the validator
    :param default: the default value
    """
    NODEFAULT = object()  # marker for "no default given"

    def __init__(self, validator, default=NODEFAULT, name=None):
        if not callable(validator):
            raise ValueError(
                '%r for %s is not a validator: it is not callable'
                % (validator, name))
        if not hasattr(validator, '__name__'):
            raise ValueError(
                '%r for %s is not a validator: it has no __name__'
                % (validator, name))
        self.validator = validator
        self.default = default
        self.name = name  # set by ParamSet.__metaclass__

    def __get__(self, obj, objclass):
        if obj is None:  # access from the class
            return self
        # access from an instance without a value of its own
        if self.default is self.NODEFAULT:
            raise AttributeError(self.name)
        return self.default
class MetaParamSet(type):
    """
    Set the `.name` attribute of every Param instance defined inside
    any subclass of ParamSet.
    """
    def __init__(cls, name, bases, dic):
        # only the Params defined directly in the class body are touched
        for attr, val in dic.items():
            if isinstance(val, Param):
                val.name = attr
# used in commonlib.oqvalidation
class ParamSet(metaclass=MetaParamSet):
    """
    A set of valid interrelated parameters. Here is an example
    of usage:

    >>> class MyParams(ParamSet):
    ...     a = Param(positiveint)
    ...     b = Param(positivefloat)
    ...
    ...     def is_valid_not_too_big(self):
    ...         "The sum of a and b must be under 10: a={a} and b={b}"
    ...         return self.a + self.b < 10

    >>> mp = MyParams(a='1', b='7.2')
    >>> mp
    <MyParams a=1, b=7.2>

    >>> MyParams(a='1', b='9.2').validate()
    Traceback (most recent call last):
    ...
    ValueError: The sum of a and b must be under 10: a=1 and b=9.2

    The constraints are applied in lexicographic order. The attribute
    corresponding to a Param descriptor can be set as usual:

    >>> mp.a = '2'
    >>> mp.a
    '2'

    A list with the literal strings can be extracted as follows:

    >>> mp.to_params()
    [('a', "'2'"), ('b', '7.2')]

    It is possible to build a new object from a dictionary of parameters
    which are assumed to be already validated:

    >>> MyParams.from_(dict(a="'2'", b='7.2'))
    <MyParams a='2', b=7.2>
    """
    params = {}
    KNOWN_INPUTS = {}

    @classmethod
    def check(cls, dic):
        """
        Check if a dictionary name->string can be converted into a dictionary
        name->value. If the name does not correspond to a known parameter,
        print a warning.

        :returns: a dictionary of converted parameters
        """
        converted = {}
        for name, text in dic.items():
            try:
                param = getattr(cls, name)
            except AttributeError:
                logging.warning('Ignored unknown parameter %s', name)
                continue
            converted[name] = param.validator(text)
        return converted

    @classmethod
    def from_(cls, dic):
        """
        Build a new ParamSet from a dictionary of string-valued parameters
        which are assumed to be already valid.
        """
        # __init__ is bypassed on purpose: no validator is called
        self = cls.__new__(cls)
        for name, literal in dic.items():
            setattr(self, name, ast.literal_eval(literal))
        return self

    def to_params(self):
        """
        Convert the instance dictionary into a sorted list of pairs
        (name, valrepr) where valrepr is the string representation of
        the underlying value.
        """
        dic = self.__dict__
        public = (name for name in sorted(dic) if not name.startswith('_'))
        return [(name, repr(dic[name])) for name in public]

    def __init__(self, **names_vals):
        for name, val in names_vals.items():
            if name.startswith(('_', 'is_valid_')):
                raise NameError('The parameter name %s is not acceptable'
                                % name)
            try:
                convert = getattr(self.__class__, name).validator
            except AttributeError:
                # names without a Param are skipped; a warning is logged
                # unless the name is a known input
                if name not in self.KNOWN_INPUTS:
                    logging.warning(
                        "The parameter '%s' is unknown, ignoring" % name)
                continue
            try:
                value = convert(val)
            except Exception as exc:
                raise ValueError('%s: could not convert to %s: %s=%s'
                                 % (exc, convert.__name__, name, val))
            setattr(self, name, value)

    def validate(self):
        """
        Apply the `is_valid` methods to self and possibly raise a ValueError.
        """
        # it is important to have the validator applied in a fixed order
        names = [attr for attr in sorted(dir(self.__class__))
                 if attr.startswith('is_valid_')]
        checks = [getattr(self, attr) for attr in names]
        for check in checks:
            if check():
                continue
            # the error message is the docstring of the failed check,
            # filled in with the attributes of the instance
            lines = [line.strip() for line in check.__doc__.splitlines()]
            template = '\n'.join(lines)
            raise ValueError(template.format(**vars(self)))

    def json(self):
        """
        :returns: the parameters as a JSON string
        """
        public = {}
        for name, val in self.__dict__.items():
            if not name.startswith('_'):
                public[name] = _fix_toml(val)
        return json.dumps(public)
class RjbEquivalent:
    """
    A class to compute the equivalent Rjb distance. Usage:

    >> reqv = RjbEquivalent('lookup.hdf5')
    >> reqv.get(repi_distances, mag)
    """
    def __init__(self, filename):
        # the three datasets are fully read in memory
        with hdf5.File(filename, 'r') as f:
            self.repi = f['default/repi'][()]  # shape D
            self.mags = f['default/mags'][()]  # shape M
            self.reqv = f['default/reqv'][()]  # shape D x M

    def get(self, repi, mag):
        """
        :param repi: an array of epicentral distances in the range self.repi
        :param mag: a magnitude in the range self.mags
        :returns: an array of equivalent distances
        """
        # index of the tabulated magnitude closest to the given one
        mag_idx = numpy.abs(mag - self.mags).argmin()
        # for each distance take the closest tabulated distance
        nearest = [numpy.abs(dist - self.repi).argmin() for dist in repi]
        return numpy.array([self.reqv[idx, mag_idx] for idx in nearest])
def basename(src, splitchars='.:'):
    """
    :param src: source object or source name
    :param splitchars: the characters marking a split source
    :returns: the base name of a split source

    >>> basename('SC:10;0')
    'SC;0'
    """
    if isinstance(src, str):
        src_id = src
    else:
        src_id = src.source_id
    # remove every split char followed by digits, one char at a time
    for char in splitchars:
        pattern = r'\%s\d+' % char
        src_id = re.sub(pattern, '', src_id)
    return src_id
def corename(src):
    """
    :param src: source object or source name
    :returns: the core name of a source, i.e. the part of the ID
              before the first of the characters `!:;.`
    """
    source_id = src if isinstance(src, str) else src.source_id
    # @ section of multifault source (NB: not a split character below)
    # ! source model logic tree branch
    # : component of mutex source
    # ; alternate logictree version of a source
    # . component of split source
    pieces = re.split('[!:;.]', source_id)
    return pieces[0]
def fragmentno(src):
    """
    Postfix after :.; as an integer

    :param src: an object with a `source_id` attribute
    :returns: the fragment number, or -1 if the ID has no fragment
    """
    # in disagg/case-12 one has source IDs like 'SL_kerton:665!b16'
    pieces = re.split('[:.;]', src.source_id)
    if len(pieces) == 1:  # no fragment number, like in AELO for NZL
        return -1
    number, _sep, _branch = pieces[1].partition('!')  # strip !b16
    return int(number)