# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2010-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>."""Logic tree parser, verifier and processor. See specs athttps://blueprints.launchpad.net/openquake-old/+spec/openquake-logic-tree-moduleA logic tree object must be iterable and yielding realizations, i.e. objectswith attributes `value`, `weight`, `lt_path` and `ordinal`."""importosimportreimportastimportcopyimportjsonimporttimeimportloggingimportfunctoolsimportitertoolsimportcollectionsimportoperatorimportnumpyfromopenquake.baselibimporthdf5,nodefromopenquake.baselib.python3compatimportdecodefromopenquake.baselib.nodeimportnode_from_elem,context,Nodefromopenquake.baselib.generalimport(cached_property,groupby,group_array,AccumDict,BASE183,BASE33489)fromopenquake.hazardlibimportnrml,InvalidFile,pmf,validfromopenquake.hazardlib.sourceconverterimportSourceGroupfromopenquake.hazardlib.gsim_ltimport(GsimLogicTree,bsnodes,fix_bytes,keyno,abs_paths)fromopenquake.hazardlib.ltimport(Branch,BranchSet,count_paths,Realization,CompositeLogicTree,dummy_branchset,LogicTreeError,parse_uncertainty,random)U16=numpy.uint16U32=numpy.uint32I32=numpy.int32F32=numpy.float32TWO24=2**24rlz_dt=numpy.dtype([('ordinal',U32),('branch_path',hdf5.vstr),('weight',F32),])source_dt=numpy.dtype([('branch',hdf5.vstr),('trt',hdf5.vstr),('fname',hdf5.vstr),# useful to reduce 
the XML files('source',hdf5.vstr),])source_model_dt=numpy.dtype([('name',hdf5.vstr),('weight',F32),('path',hdf5.vstr),('samples',U32),])src_group_dt=numpy.dtype([('trt_smr',U32),('name',hdf5.vstr),('trti',U16),('effrup',I32),('totrup',I32),('sm_id',U32),])branch_dt=numpy.dtype([('branchset',hdf5.vstr),('branch',hdf5.vstr),('utype',hdf5.vstr),('uvalue',hdf5.vstr),('weight',float),])TRT_REGEX=re.compile(r'tectonicRegion="([^"]+?)"')ID_REGEX=re.compile(r'Source\s+id="([^"]+?)"')OQ_REDUCE=os.environ.get('OQ_REDUCE')=='smlt'# this is very fast
def get_trt_by_src(source_model_file, source_id=''):
    """
    :returns: a dictionary source ID -> tectonic region type of the source
    """
    xml = source_model_file.read()
    trt_by_src = {}
    if "http://openquake.org/xmlns/nrml/0.5" in xml:
        # fast lane using regex, tectonicRegion is always before Source id
        pieces = TRT_REGEX.split(xml.replace("'", '"'))  # fix single quotes
        # pieces alternates text/TRT: pieces[1::2] are the captured TRTs,
        # pieces[2::2] the text following each TRT up to the next one
        for text, trt in zip(pieces[2::2], pieces[1::2]):
            for src_id in ID_REGEX.findall(text):  # disagg/case_12
                src_id = src_id.split(':')[0]  # colon convention
                if source_id:
                    if source_id == src_id:
                        trt_by_src[src_id] = trt
                else:
                    trt_by_src[src_id] = trt
    else:  # parse the XML with ElementTree
        for src in node.fromstring(xml)[0]:
            src_id = src.attrib['id'].split(':')[0]  # colon convention
            if source_id:
                if source_id == src_id:
                    trt_by_src[src_id] = src.attrib['tectonicRegion']
            else:
                trt_by_src[src_id] = src.attrib['tectonicRegion']
    return trt_by_src
def prod(iterator):
    """
    Replacement of math.prod for Python < 3.8
    """
    # fold with multiplication, starting from the multiplicative identity
    return functools.reduce(operator.mul, iterator, 1)
def unique(objects, key=None):
    """
    Raise a ValueError if there is a duplicated object, otherwise
    returns the objects as they are.

    :param objects: a sequence of comparable objects
    :param key: optional key function, as in `itertools.groupby`
    """
    dupl = []
    # NB: sort with the SAME key used by groupby, otherwise objects with
    # equal keys may not be adjacent and duplicates would be missed
    for obj, group in itertools.groupby(sorted(objects, key=key), key):
        if sum(1 for _ in group) > 1:
            dupl.append(obj)
    if dupl:
        raise ValueError('Found duplicates %s' % dupl)
    return objects
@functools.lru_cache()
def get_effective_rlzs(rlzs):
    """
    Group together realizations with the same path
    and yield the first representative of each group.

    :param rlzs: a list of Realization instances with a .pid property
    """
    effective = []
    ordinal = 0
    for group in groupby(rlzs, operator.attrgetter('pid')).values():
        rlz = group[0]
        # the representative gets the total weight of its group
        effective.append(
            Realization(rlz.value, sum(r.weight for r in group),
                        ordinal, rlz.lt_path, len(group)))
        ordinal += 1
    return effective
def get_eff_rlzs(sm_rlzs, gsim_rlzs):
    """
    Group together realizations with the same path
    and yield the first representative of each group
    """
    triples = []  # pid, sm_rlz, gsim_rlz
    for sm_rlz, gsim_rlz in zip(sm_rlzs, gsim_rlzs):
        triples.append((sm_rlz.pid + '~' + gsim_rlz.pid, sm_rlz, gsim_rlz))
    ordinal = 0
    effective = []
    for rows in groupby(triples, operator.itemgetter(0)).values():
        _pid, sm_rlz, gsim_rlz = rows[0]
        # weight proportional to the multiplicity of the path
        weight = numpy.array([len(rows) / len(triples)])
        effective.append(
            LtRealization(ordinal, sm_rlz.lt_path, gsim_rlz, weight))
        ordinal += 1
    return effective
def collect_info(smltpath, branchID=''):
    """
    Given a path to a source model logic tree, collect all of the
    path names to the source models it contains.

    :param smltpath: source model logic tree file
    :param branchID: if given, consider only that branch
    :returns: an Info namedtuple (smpaths, h5paths, applytosources)
    """
    n = nrml.read(smltpath)
    try:
        blevels = n.logicTree
    except Exception:
        raise InvalidFile('%s is not a valid source_model_logic_tree_file' %
                          smltpath)
    smpaths = set()
    h5paths = set()
    applytosources = collections.defaultdict(list)  # branchID -> source IDs
    for blevel in blevels:
        for bset in bsnodes(smltpath, blevel):
            if 'applyToSources' in bset.attrib:
                applytosources[bset.get('applyToBranches')].extend(
                    bset['applyToSources'].split())
            # NB: an explicit tuple is used here, since a substring test
            # like `in 'sourceModel extendModel'` would also match
            # spurious values such as 'Model'
            if bset['uncertaintyType'] in ('sourceModel', 'extendModel'):
                for br in bset:
                    if branchID and branchID != br['branchID']:
                        continue
                    with context(smltpath, br):
                        fnames = abs_paths(
                            smltpath,
                            unique(br.uncertaintyModel.text.split()))
                        smpaths.update(fnames)
                        for fname in fnames:
                            hdf5file = os.path.splitext(fname)[0] + '.hdf5'
                            if os.path.exists(hdf5file):
                                h5paths.add(hdf5file)
                    if OQ_REDUCE:  # only take first branch
                        break
    return Info(sorted(smpaths), sorted(h5paths), applytosources)
def reduce_fnames(fnames, source_id):
    """
    If the source ID is ambiguous (i.e. there is "!") only returns
    the filenames containing the source, otherwise return all the filenames
    """
    pieces = source_id.split('!')
    if len(pieces) != 2:  # no single "!": nothing to reduce
        return fnames
    fname = pieces[1]
    return [f for f in fnames if fname in f]
def read_source_groups(fname):
    """
    :param fname: a path to a source model XML file
    :return: a list of SourceGroup objects containing source nodes
    """
    smodel = nrml.read(fname).sourceModel
    src_groups = []
    if smodel[0].tag.endswith('sourceGroup'):  # NRML 0.5 format
        for sg_node in smodel:
            sg = SourceGroup(sg_node['tectonicRegion'])
            sg.sources = sg_node.nodes
            src_groups.append(sg)
    else:
        # NRML 0.4 format: smodel is a list of source nodes
        src_groups.extend(SourceGroup.collect(smodel))
    return src_groups
def shorten(path_tuple, shortener, kind):
    """
    :param path_tuple: sequence of strings
    :param shortener: dictionary longstring -> shortstring
    :param kind: 'smlt' or 'gslt'
    :returns: shortened version of the path
    """
    # NB: path_tuple can have the form ('EDF_areas',
    # 'Mmax:10-br#0', 'Mmax:11-br#0', ..., 'ab:4014-br#23')
    # with shortener['EDF_areas'] = 'A0',
    # shortener['ab:4014-br#23'] = 'X138'
    if len(shortener) == 1:
        return 'A'
    out = []
    for bsno, key in enumerate(path_tuple):
        if key[0] == '.':  # dummy branch
            out.append('.')
        elif kind == 'smlt' and bsno == 0:
            # shortener[key] has the form 2-letters+number
            out.append(shortener[key][:2])
        else:
            # shortener[key] has the form letter+number
            out.append(shortener[key][0])
    return ''.join(out)
# useful to print reduced logic trees
def collect_paths(paths, b1=ord('['), b2=ord(']'), til=ord('~')):
    """
    Collect branch paths belonging to the same cluster

    >>> collect_paths([b'0~A0', b'0~A1'])
    b'[0]~[A][01]'
    """
    n = len(paths[0])
    for path in paths[1:]:
        assert len(path) == n, (len(path), n)
    out = []
    # walk the paths column by column; each column yields the sorted set
    # of characters seen at that position, wrapped in brackets unless it
    # is just the '~' separator
    for column in zip(*paths):
        chars = sorted(set(column))
        if chars == [til]:
            out.extend(chars)
        else:
            out.append(b1)
            out.extend(chars)
            out.append(b2)
    return bytes(out)
def reducible(lt, cluster_paths):
    """
    :param lt: a logic tree with B branches
    :param cluster_paths: list of paths for a realization cluster
    :returns: a list [filename, (branchSetID, branchIDs), ...]
    """
    # invert the shortener: short string -> long branch ID
    longener = {short: long for long, short in lt.shortener.items()}
    tuplesets = [set() for _ in lt.bsetdict]
    for path in cluster_paths:
        # each '][' delimited chunk corresponds to one branchset
        for b, chars in enumerate(path.strip('][').split('][')):
            tuplesets[b].add(
                tuple(c + str(i) for i, c in enumerate(chars)))
    res = [lt.filename]
    for bs, tupleset in zip(sorted(lt.bsetdict), tuplesets):
        # a branch is reducible if there is the same combinations for all
        # paths
        try:
            [br_ids] = tupleset
        except ValueError:
            continue
        res.append((bs, [longener[brid] for brid in br_ids]))
    return res
# this is not used right now, but tested
def reduce_full(full_lt, rlz_clusters):
    """
    :param full_lt: a FullLogicTree instance
    :param rlz_clusters: list of paths for a realization cluster
    :returns: a dictionary with what can be reduced
    """
    smrlz_clusters = []
    gsrlz_clusters = []
    for path in rlz_clusters:
        # paths have the form smlt_path~gslt_path
        smr, gsr = decode(path).split('~')
        smrlz_clusters.append(smr)
        gsrlz_clusters.append(gsr)
    f1, *p1 = reducible(full_lt.source_model_lt, smrlz_clusters)
    f2, *p2 = reducible(full_lt.gsim_lt, gsrlz_clusters)
    before = (full_lt.source_model_lt.get_num_paths() *
              full_lt.gsim_lt.get_num_paths())
    after = before / prod(len(p[1]) for p in p1 + p2)
    return {f1: dict(p1), f2: dict(p2), 'size_before_after': (before, after)}
class SourceModelLogicTree(object):
    """
    Source model logic tree parser.

    :param filename:
        Full pathname of logic tree file
    :raises LogicTreeError:
        If logic tree file has a logic error, which can not be prevented
        by xml schema rules (like referencing sources with missing id).
    """
    _xmlschema = None

    # branchset attributes acting as filters
    FILTERS = ('applyToTectonicRegionType',
               'applyToSources',
               'applyToBranches')

    # uncertainty types that must apply to a single source
    ABSOLUTE_UNCERTAINTIES = ('abGRAbsolute', 'bGRAbsolute',
                              'maxMagGRAbsolute',
                              'simpleFaultGeometryAbsolute',
                              'truncatedGRFromSlipAbsolute',
                              'complexFaultGeometryAbsolute',
                              'setMSRAbsolute')
    @classmethod
    def trivial(cls, source_model_file, sampling_method='early_weights',
                source_id=''):
        """
        :returns: a trivial SourceModelLogicTree with a single branch
        """
        # bypass __init__, which would parse a real logic tree file
        self = object.__new__(cls)
        self.basepath = os.path.dirname(source_model_file)
        self.source_id = source_id
        self.source_data = []
        if source_model_file == '_fake.xml':
            self.tectonic_region_types = {'*'}
        else:
            self.tectonic_region_types = set()
            self.collect_source_model_data('br0', source_model_file)
            self.source_data = numpy.array(self.source_data, source_dt)
        self.info = Info([source_model_file], [],
                         collections.defaultdict(list))
        arr = numpy.array(
            [('bs0', 'br0', 'sourceModel', source_model_file, 1)], branch_dt)
        dic = dict(filename=source_model_file, seed=0, num_samples=0,
                   sampling_method=sampling_method, num_paths=1,
                   is_source_specific=0,
                   source_data=self.source_data,
                   tectonic_region_types=self.tectonic_region_types,
                   source_id=source_id, branchID='',
                   bsetdict='{"bs0": {"uncertaintyType": "sourceModel"}}')
        # reuse the deserialization logic to populate the instance
        self.__fromh5__(arr, dic)
        return self
    @classmethod
    def fake(cls):
        """
        :returns: a fake SourceModelLogicTree with a single branch
        """
        return cls.trivial('_fake.xml')
    def __init__(self, filename, seed=0, num_samples=0,
                 sampling_method='early_weights', test_mode=False,
                 branchID='', source_id=''):
        self.filename = filename
        self.basepath = os.path.dirname(filename)
        # NB: converting the random_seed into an integer is needed on Windows
        self.seed = int(seed)
        self.num_samples = num_samples
        self.sampling_method = sampling_method
        self.test_mode = test_mode
        self.branchID = branchID  # used to read only one sourceModel branch
        self.source_id = source_id
        self.branches = {}  # branch_id -> branch
        self.bsetdict = {}
        self.previous_branches = []
        self.tectonic_region_types = set()
        self.root_branchset = None
        root = nrml.read(filename)
        try:
            tree = root.logicTree
        except AttributeError:
            raise LogicTreeError(
                root, self.filename, "missing logicTree node")
        self.shortener = {}
        self.branchsets = []
        self.parse_tree(tree)
        self.set_num_paths()
    def set_num_paths(self):
        """
        Count the total number of paths in a smart way
        """
        # determine if the logic tree is source specific: every branchset
        # after the first must apply to exactly one source
        dicts = list(self.bsetdict.values())[1:]
        if not dicts:
            self.is_source_specific = False
            self.num_paths = count_paths(self.root_branchset.branches)
            return
        src_ids = set()
        for dic in dicts:
            ats = dic.get('applyToSources')
            if not ats:
                self.is_source_specific = False
                self.num_paths = count_paths(self.root_branchset.branches)
                return
            elif len(ats.split()) != 1:
                self.is_source_specific = False
                self.num_paths = count_paths(self.root_branchset.branches)
                return
            src_ids.add(ats)
        # to be source-specific applyToBranches must be trivial
        self.is_source_specific = all(
            bset.applied is None for bset in self.branchsets)
        if self.is_source_specific:
            # fast algorithm, otherwise models like ZAF would hang
            self.num_paths = prod(
                sslt.num_paths for sslt in self.decompose().values())
        else:  # slow algorithm
            self.num_paths = count_paths(self.root_branchset.branches)
    def reduce(self, source_id, num_samples=None):
        """
        :returns: a new logic tree reduced to a single source
        """
        # NB: source_id contains "@" in the case of a split multi fault
        # source
        num_samples = (self.num_samples if num_samples is None
                       else num_samples)
        # re-parse the same file, filtering on source_id
        new = self.__class__(self.filename, self.seed, num_samples,
                             self.sampling_method, self.test_mode,
                             self.branchID, source_id)
        return new
[docs]defparse_tree(self,tree_node):""" Parse the whole tree and point ``root_branchset`` attribute to the tree's root. """t0=time.time()self.info=collect_info(self.filename,self.branchID)# the list is populated in collect_source_model_dataself.source_data=[]forbsno,bnodeinenumerate(tree_node.nodes):[bsnode]=bsnodes(self.filename,bnode)self.parse_branchset(bsnode,bsno)self.source_data=numpy.array(self.source_data,source_dt)unique=numpy.unique(self.source_data['fname'])dt=time.time()-t0logging.debug('Validated source model logic tree with %d underlying ''files in %.2f seconds',len(unique),dt)
    def parse_branchset(self, branchset_node, bsno):
        """
        :param branchset_node:
            ``etree.Element`` object with tag "logicTreeBranchSet".
        :param bsno:
            The sequential number of the branchset, starting from 0.

        Enumerates children branchsets and call :meth:`parse_branchset`,
        :meth:`validate_branchset`, :meth:`parse_branches` and finally
        :meth:`apply_branchset` for each.

        Keeps track of "open ends" -- the set of branches that don't have
        any child branchset on this step of execution. After processing
        of every branchset only those branches that are listed in it
        can have child branchsets (if there is one on the next level).
        """
        attrs = branchset_node.attrib.copy()
        uncertainty_type = branchset_node.attrib.get('uncertaintyType')
        dic = dict((filtername, branchset_node.attrib.get(filtername))
                   for filtername in self.FILTERS
                   if filtername in branchset_node.attrib)
        self.validate_filters(branchset_node, uncertainty_type, dic)
        filters = self.parse_filters(branchset_node, uncertainty_type, dic)
        if 'applyToSources' in filters and not filters['applyToSources']:
            return  # ignore the branchset
        ordinal = len(self.bsetdict)
        branchset = BranchSet(uncertainty_type, ordinal, filters)
        branchset.id = bsid = attrs.pop('branchSetID')
        if bsid in self.bsetdict:
            raise nrml.DuplicatedID('%s in %s' % (bsid, self.filename))
        self.bsetdict[bsid] = attrs
        self.validate_branchset(branchset_node, bsno, branchset)
        self.parse_branches(branchset_node, branchset)
        dummies = []  # dummy branches in case of applyToBranches
        if self.root_branchset is None:  # not set yet
            self.root_branchset = branchset
        if not branchset.branches:
            # the branchset was fully reduced away
            del self.bsetdict[bsid]
            return
        prev_ids = ' '.join(pb.branch_id for pb in self.previous_branches)
        app2brs = branchset_node.attrib.get('applyToBranches') or prev_ids
        missing = set(prev_ids.split()) - set(app2brs.split())
        if missing:  # apply only to some branches
            branchset.applied = app2brs
            self.apply_branchset(
                app2brs, branchset_node.lineno, branchset)
            not_applied = set(prev_ids.split()) - set(app2brs.split())
            # the branches not mentioned in applyToBranches get a dummy
            # child branchset, so that all paths have the same length
            for brid in not_applied:
                if brid in self.branches:
                    self.branches[brid].bset = dummy = dummy_branchset()
                    [dummybranch] = dummy.branches
                    self.branches[dummybranch.branch_id] = dummybranch
                    dummies.append(dummybranch)
        else:  # apply to all previous branches
            for branch in self.previous_branches:
                branch.bset = branchset
        self.previous_branches = branchset.branches + dummies
        self.branchsets.append(branchset)
    def parse_branches(self, branchset_node, branchset):
        """
        Create and attach branches at ``branchset_node`` to ``branchset``.

        :param branchset_node:
            Same as for :meth:`parse_branchset`.
        :param branchset:
            An instance of :class:`BranchSet`.

        Checks that each branch has :meth:`valid
        <validate_uncertainty_value>` value, unique id and that all branches
        have total weight of 1.0.

        :return: ``None``, all branches are attached to provided branchset.
        """
        bs_id = branchset_node['branchSetID']
        weight_sum = 0
        branches = branchset_node.nodes
        if OQ_REDUCE:  # only take first branch
            branches = [branches[0]]
            branches[0].uncertaintyWeight.text = 1.
        values = []
        bsno = len(self.branchsets)
        zeros = []
        # NB: because the engine lacks the ability to apply correlated
        # uncertainties to all the sources in a source model, people build
        # spurious source models in preprocessing; for instance EDF/CEA have
        # 4 real source models which are extended to 400 source models; this
        # is bad, since the required disk space is 100x larger, the read time
        # is 100x larger, copying the files is an issue, etc.
        # To stop people to commit such abuses there is a limit of 183
        # branches; however, you can actually raise the limit to 33489
        # branches by commenting/uncommenting the two lines below, if you
        # really need
        maxlen = 183
        # maxlen = 183 if bsno else 33489  # sourceModel bset can be longer
        if self.branchID == '' and len(branches) > maxlen:
            msg = ('%s: the branchset %s has too many branches (%d > %d)\n'
                   'you should split it, see https://docs.openquake.org/'
                   'oq-engine/advanced/latest/logic_trees.html')
            raise InvalidFile(msg % (self.filename, bs_id, len(branches),
                                     maxlen))
        for brno, branchnode in enumerate(branches):
            weight = ~branchnode.uncertaintyWeight
            value_node = node_from_elem(branchnode.uncertaintyModel)
            if value_node.text is not None:
                values.append(value_node.text.strip())
            value = parse_uncertainty(branchset.uncertainty_type, value_node,
                                      self.filename)
            if branchset.uncertainty_type in ('sourceModel', 'extendModel'):
                vals = []  # filenames with sources in it
                try:
                    for fname in value_node.text.split():
                        if (fname.endswith(('.xml', '.nrml'))
                                and not self.test_mode):
                            ok = self.collect_source_model_data(
                                branchnode['branchID'], fname)
                            if ok:
                                vals.append(fname)
                except Exception as exc:
                    raise LogicTreeError(
                        value_node, self.filename, str(exc)) from exc
                if self.branchID and \
                        self.branchID not in branchnode['branchID']:
                    value = ''  # reduce all branches except branchID
                elif self.source_id:  # only the files containing source_id
                    srcid = self.source_id.split('@')[0]
                    value = ' '.join(reduce_fnames(vals, srcid))
            branch_id = branchnode.attrib.get('branchID')
            if branch_id in self.branches:
                raise LogicTreeError(
                    branchnode, self.filename,
                    "branchID '%s' is not unique" % branch_id)
            if value == '':
                # with logic tree reduction a branch can be empty
                # see case_68_bis
                zero_id = branch_id
                zeros.append(weight)
            else:
                branch = Branch(bs_id, branch_id, weight, value)
                self.branches[branch_id] = branch
                branchset.branches.append(branch)
                # use two-letter abbrev for the first branchset (sourceModel)
                base = BASE183 if bsno else BASE33489
                self.shortener[branch_id] = keyno(
                    branch_id, bsno, brno, base)
            weight_sum += weight
        if zeros:
            # collapse all empty branches into a single zero-branch
            # carrying the sum of their weights
            branch = Branch(bs_id, zero_id, sum(zeros), '')
            # NOTE(review): the zero-branch is registered under `branch_id`
            # (the last branch in the loop), not under `zero_id` -- looks
            # suspicious, confirm this is intended
            self.branches[branch_id] = branch
            branchset.branches.append(branch)
        if abs(weight_sum - 1.0) > pmf.PRECISION:
            raise LogicTreeError(
                branchset_node, self.filename,
                f"branchset weights sum up to {weight_sum}, not 1")
        if ''.join(values) and len(set(values)) < len(values):
            raise LogicTreeError(
                branchset_node, self.filename,
                "there are duplicate values in uncertaintyModel: " +
                ' '.join(values))
[docs]defget_num_paths(self):""" :returns: the number of paths in the logic tree """returnself.num_samplesifself.num_sampleselseself.num_paths
    def __iter__(self):
        """
        Yield Realization tuples. Notice that the weight is homogeneous when
        sampling is enabled, since it is accounted for in the sampling
        procedure.
        """
        if self.num_samples:
            # random sampling of the logic tree
            probs = random((self.num_samples, len(self.bsetdict)),
                           self.seed, self.sampling_method)
            ordinal = 0
            for branches in self.root_branchset.sample(
                    probs, self.sampling_method):
                value = [br.value for br in branches]
                smlt_path_ids = [br.branch_id for br in branches]
                if self.sampling_method.startswith('early_'):
                    weight = 1. / self.num_samples  # already accounted
                elif self.sampling_method.startswith('late_'):
                    weight = numpy.prod([br.weight for br in branches])
                else:
                    raise NotImplementedError(self.sampling_method)
                yield Realization(value, weight, ordinal,
                                  tuple(smlt_path_ids))
                ordinal += 1
        else:  # full enumeration
            rlzs = []
            for weight, branches in self.root_branchset.enumerate_paths():
                value = [br.value for br in branches]
                branch_ids = [branch.branch_id for branch in branches]
                rlz = Realization(value, weight, 0, tuple(branch_ids))
                rlzs.append(rlz)
            # sort by path and renumber the ordinals accordingly
            rlzs.sort(key=operator.attrgetter('pid'))
            for r, rlz in enumerate(rlzs):
                rlz.ordinal = r
                yield rlz
[docs]defparse_filters(self,branchset_node,uncertainty_type,filters):""" Converts "applyToSources" and "applyToBranches" filters by splitting into lists. """if'applyToSources'infilters:srcs=filters['applyToSources'].split()ifself.source_id:srcs=[srcforsrcinsrcsifsrc==self.source_id]filters['applyToSources']=srcsif'applyToBranches'infilters:filters['applyToBranches']=filters['applyToBranches'].split()returnfilters
    def validate_filters(self, branchset_node, uncertainty_type, filters):
        """
        See superclass' method for description and signature specification.

        Checks that the following conditions are met:

        * "sourceModel" uncertainties can not have filters.
        * Absolute uncertainties must have only one filter --
          "applyToSources", with only one source id.
        * All other uncertainty types can have either no or one filter.
        * Filter "applyToSources" must mention only source ids that
          exist in source models.
        * Filter "applyToTectonicRegionType" must mention only tectonic
          region types that exist in source models.
        """
        f = filters.copy()
        # applyToBranches is not a real filter for these checks
        if 'applyToBranches' in f:
            del f['applyToBranches']
        if uncertainty_type == 'sourceModel' and f:
            raise LogicTreeError(
                branchset_node, self.filename,
                'filters are not allowed on source model uncertainty')
        if len(f) > 1:
            raise LogicTreeError(
                branchset_node, self.filename,
                "only one filter is allowed per branchset")
        if 'applyToTectonicRegionType' in f:
            if f['applyToTectonicRegionType'] \
                    not in self.tectonic_region_types:
                raise LogicTreeError(
                    branchset_node, self.filename,
                    "source models don't define sources of tectonic region "
                    "type '%s'" % f['applyToTectonicRegionType'])
        if uncertainty_type in self.ABSOLUTE_UNCERTAINTIES:
            if not f or not list(f) == ['applyToSources'] \
                    or not len(f['applyToSources'].split()) == 1:
                raise LogicTreeError(
                    branchset_node, self.filename,
                    "uncertainty of type '%s' must define 'applyToSources' "
                    "with only one source id" % uncertainty_type)
        if uncertainty_type in ('simpleFaultDipRelative',
                                'simpleFaultDipAbsolute'):
            if not f or 'applyToSources' not in f:
                raise LogicTreeError(
                    branchset_node, self.filename,
                    "uncertainty of type '%s' must define 'applyToSources'"
                    % uncertainty_type)
        if 'applyToSources' in f:
            if self.source_id:
                # with logic tree reduction, only check the kept source
                srcids = [s for s in f['applyToSources'].split()
                          if s == self.source_id]
            else:
                srcids = f['applyToSources'].split()
            for source_id in srcids:
                # branches of the source models containing this source
                branchIDs = {
                    brid for (brid, trt, fname, srcid) in self.source_data
                    if srcid == source_id}
                if not branchIDs:
                    raise LogicTreeError(
                        branchset_node, self.filename,
                        "source with id '%s' is not defined in source "
                        "models" % source_id)
                elif (len(branchIDs) > 1 and 'applyToBranches' not in
                        branchset_node.attrib):
                    raise LogicTreeError(
                        branchset_node, self.filename,
                        f"{source_id} belongs to multiple branches "
                        f"{branchIDs}"
                        ": applyToBranches"
                        " must be specified together with"
                        " applyToSources")
    def validate_branchset(self, branchset_node, bsno, branchset):
        """
        See superclass' method for description and signature specification.

        Checks that the following conditions are met:

        * First branching level must contain exactly one branchset, which
          must be of type "sourceModel".
        * All other branchsets must not be of type "sourceModel"
          or "gmpeModel".
        """
        if bsno == 0:
            if branchset.uncertainty_type != 'sourceModel':
                raise LogicTreeError(
                    branchset_node, self.filename,
                    'first branchset must define an uncertainty '
                    'of type "sourceModel"')
        else:
            if branchset.uncertainty_type == 'sourceModel':
                raise LogicTreeError(
                    branchset_node, self.filename,
                    'uncertainty of type "sourceModel" can be defined '
                    'on first branchset only')
            elif branchset.uncertainty_type == 'gmpeModel':
                raise LogicTreeError(
                    branchset_node, self.filename,
                    'uncertainty of type "gmpeModel" is not allowed '
                    'in source model logic tree')
    def apply_branchset(self, apply_to_branches, lineno, branchset):
        """
        See superclass' method for description and signature specification.

        Parses branchset node's attribute ``@applyToBranches`` to apply
        following branchests to preceding branches selectively. Branching
        level can have more than one branchset exactly for this: different
        branchsets can apply to different open ends.

        Checks that branchset tries to be applied only to branches on
        previous branching level which do not have a child branchset yet.
        """
        for branch_id in apply_to_branches.split():
            if branch_id not in self.branches:
                raise LogicTreeError(
                    lineno, self.filename,
                    "branch '%s' is not yet defined" % branch_id)
            branch = self.branches[branch_id]
            if not branch.is_leaf():
                # the branch already has a child branchset
                raise LogicTreeError(
                    lineno, self.filename,
                    "branch '%s' already has child branchset" % branch_id)
            branch.bset = branchset
    def _get_source_model(self, source_model_file):
        # Return an open file object for the given source model path,
        # relative to the logic tree directory.
        # NB: do not remove this, it is meant to be overridden in the tests
        return open(os.path.join(self.basepath, source_model_file),
                    encoding='utf-8')
    def collect_source_model_data(self, branch_id, fname):
        """
        Parse source model file and collect information about source ids,
        source types and tectonic region types available in it. That
        information is used then for :meth:`validate_filters` and
        :meth:`validate_uncertainty_value`.

        :param branch_id: source model logic tree branch ID
        :param fname: relative filename for the current source model portion
        :returns: the number of sources in the source model portion
        """
        with self._get_source_model(fname) as sm:
            src = self.source_id.split('!')[0].split('@')[0]
            trt_by_src = get_trt_by_src(sm, src)
            if self.basepath:
                # store the path relative to the logic tree directory
                path = sm.name[len(self.basepath) + 1:]
            else:
                path = sm.name
            for src_id, trt in trt_by_src.items():
                try:
                    valid.source_id(src_id)
                except ValueError:
                    raise InvalidFile(
                        '%s: contain invalid ID %s' % (sm.name, src_id))
                self.source_data.append((branch_id, trt, path, src_id))
                self.tectonic_region_types.add(trt)
        return len(trt_by_src)
[docs]defbset_values(self,lt_path):""" :param sm_rlz: an effective realization :returns: a list of B - 1 pairs (branchset, value) """returnself.root_branchset.get_bset_values(lt_path)[1:]
# used in the sslt page of the advanced manual
    # used in the sslt page of the advanced manual
    def decompose(self):
        """
        If the logic tree is source specific, returns a dictionary
        source ID -> SourceLogicTree instance
        """
        assert self.is_source_specific
        bsets = collections.defaultdict(list)
        bsetdict = collections.defaultdict(dict)
        for bset in self.branchsets[1:]:
            if bset.filters['applyToSources']:
                # source-specific: exactly one source per branchset
                [src_id] = bset.filters['applyToSources']
                bsets[src_id].append(bset)
                bsetdict[src_id][bset.id] = self.bsetdict[bset.id]
        root = self.branchsets[0]
        if len(root) > 1:
            out = {None: SourceLogicTree(
                None, [root], self.bsetdict[root.id])}
        else:
            out = {}
        # src_id -> SourceLogicTree
        for src_id in bsets:
            out[src_id] = SourceLogicTree(
                src_id, bsets[src_id], bsetdict[src_id])
        return out
    def to_node(self):
        """
        :returns: a logicTree Node convertible into NRML format
        """
        bsnodes = []
        for bset in self.branchsets:
            dic = dict(branchSetID='bs%02d' % bset.ordinal,
                       uncertaintyType=bset.uncertainty_type)
            brnodes = []
            for br in bset.branches:
                um = Node('uncertaintyModel', {}, br.value)
                uw = Node('uncertaintyWeight', {}, br.weight)
                brnode = Node('logicTreeBranch', {'branchID': br.branch_id},
                              nodes=[um, uw])
                brnodes.append(brnode)
            bsnodes.append(Node('logicTreeBranchSet', dic, nodes=brnodes))
        return Node('logicTree', {'logicTreeID': 'lt'}, nodes=bsnodes)
    def get_duplicated_sources(self):
        """
        :returns: {src_id: affected branches}
        """
        sd = group_array(self.source_data, 'source')
        u, c = numpy.unique(self.source_data['source'], return_counts=1)
        # AUS event based was hanging with a slower implementation
        return {src: sd[src]['branch'] for src in u[c > 1]}
    # SourceModelLogicTree
    def __toh5__(self):
        # serialize the branches as a structured array plus HDF5 attributes
        tbl = []
        for brid, br in self.branches.items():
            if br.bs_id.startswith('dummy'):
                continue  # don't store dummy branches
            dic = self.bsetdict[br.bs_id].copy()
            utype = dic['uncertaintyType']
            tbl.append((br.bs_id, brid, utype, repr(br.value), br.weight))
        attrs = dict(bsetdict=json.dumps(self.bsetdict))
        attrs['seed'] = self.seed
        attrs['num_samples'] = self.num_samples
        attrs['sampling_method'] = self.sampling_method
        attrs['filename'] = self.filename
        attrs['num_paths'] = self.num_paths
        attrs['is_source_specific'] = self.is_source_specific
        attrs['source_id'] = self.source_id
        attrs['branchID'] = self.branchID
        return numpy.array(tbl, branch_dt), attrs

    # SourceModelLogicTree
    def __fromh5__(self, array, attrs):
        # this is rather tricky; to understand it, run the test
        # SerializeSmltTestCase which has a logic tree with 3 branchsets
        # with the form b11[b21[b31, b32], b22[b31, b32]] and 1 x 2 x 2 rlzs
        vars(self).update(attrs)
        self.test_mode = False
        bsets = []
        self.branches = {}
        self.bsetdict = json.loads(attrs['bsetdict'])
        self.shortener = {}
        acc = AccumDict(accum=[])  # bsid -> rows
        for rec in array:
            rec = fix_bytes(rec)
            # NB: it is important to keep the order of the branchsets
            acc[rec['branchset']].append(rec)
        for ordinal, (bsid, rows) in enumerate(acc.items()):
            utype = rows[0]['utype']
            filters = {}
            ats = self.bsetdict[bsid].get('applyToSources')
            atb = self.bsetdict[bsid].get('applyToBranches')
            if ats:
                filters['applyToSources'] = ats.split()
            if atb:
                filters['applyToBranches'] = atb.split()
            bset = BranchSet(utype, ordinal, filters)
            bset.id = bsid
            for no, row in enumerate(rows):
                try:
                    uvalue = ast.literal_eval(row['uvalue'])
                except (SyntaxError, ValueError):
                    uvalue = row['uvalue']  # not really deserializable :-(
                br = Branch(bsid, row['branch'], row['weight'], uvalue)
                self.branches[br.branch_id] = br
                base = BASE33489 if utype == 'sourceModel' else BASE183
                self.shortener[br.branch_id] = keyno(
                    br.branch_id, ordinal, no, base)
                bset.branches.append(br)
            bsets.append(bset)
        CompositeLogicTree(bsets)  # perform attach_to_branches
        self.branchsets = bsets  # bsets [<b11>, <b21 b22>, <b31 b32>]
        self.root_branchset = bsets[0]

    def __str__(self):
        return '<%s%s>' % (self.__class__.__name__,
                           repr(self.root_branchset))
def capitalize(words):
    """
    Capitalize words separated by spaces.
    """
    # decode handles both str and bytes inputs
    capitalized = [w.capitalize() for w in decode(words).split(' ')]
    return ' '.join(capitalized)
def get_field(data, field, default):
    """
    :param data: a record with a field `field`, possibily missing
    """
    try:
        value = data[field]
    except ValueError:  # field missing in old engines
        value = default
    return value
class LtRealization(object):
    """
    Composite realization build on top of a source model realization and
    a GSIM realization.
    """
    # NB: for EUR, with 302_990_625 realizations, the usage of __slots__
    # save little memory, from 95.3 GB down to 81.0 GB
    __slots__ = ['ordinal', 'sm_lt_path', 'gsim_rlz', 'weight']

    def __init__(self, ordinal, sm_lt_path, gsim_rlz, weight):
        self.ordinal = ordinal
        self.sm_lt_path = sm_lt_path
        self.gsim_rlz = gsim_rlz
        self.weight = weight

    def __repr__(self):
        return '<%d,w=%s>' % (self.ordinal, self.weight)

    @property
    def gsim_lt_path(self):
        # delegate to the underlying GSIM realization
        return self.gsim_rlz.lt_path

    def __lt__(self, other):
        return self.ordinal < other.ordinal

    # equality/hash are based on the repr, i.e. on (ordinal, weight)
    def __eq__(self, other):
        return repr(self) == repr(other)

    def __ne__(self, other):
        return repr(self) != repr(other)

    def __hash__(self):
        return hash(repr(self))
class FullLogicTree(object):
    """
    The full logic tree as composition of

    :param source_model_lt: :class:`SourceModelLogicTree` object
    :param gsim_lt: :class:`GsimLogicTree` object
    """
    # class-level default, overridden in __init__
    oversampling = 'tolerate'
    @classmethod
    def fake(cls, gsimlt=None):
        """
        :returns:
            a fake `FullLogicTree` instance with the given gsim logic tree
            object; if None, builds automatically a fake gsim logic tree
        """
        gsim_lt = gsimlt or GsimLogicTree.from_('[FromFile]')
        fakeSM = Realization('scenario', weight=1, ordinal=0,
                             lt_path='b1', samples=1)
        # bypass __init__, which would call .init()
        self = object.__new__(cls)
        self.source_model_lt = SourceModelLogicTree.fake()
        self.gsim_lt = gsim_lt
        self.sm_rlzs = [fakeSM]
        return self
    def __init__(self, source_model_lt, gsim_lt, oversampling='tolerate'):
        self.source_model_lt = source_model_lt
        self.gsim_lt = gsim_lt
        self.oversampling = oversampling
        self.init()  # set .sm_rlzs and .trts

    def __getstate__(self):
        # .sd will not be available in the workers
        return {'source_model_lt': self.source_model_lt,
                'gsim_lt': self.gsim_lt,
                'oversampling': self.oversampling}
    def init(self):
        if self.source_model_lt.num_samples:
            # NB: the number of effective rlzs can be less than the number
            # of realizations in case of sampling
            self.sm_rlzs = get_effective_rlzs(self.source_model_lt)
        else:  # full enumeration
            samples = self.gsim_lt.get_num_paths()
            self.sm_rlzs = []
            for sm_rlz in self.source_model_lt:
                sm_rlz.samples = samples
                self.sm_rlzs.append(sm_rlz)
        self.Re = len(self.sm_rlzs)
        # the ordinal is packed in the low 24 bits of trt_smr
        assert self.Re <= TWO24, len(self.sm_rlzs)
        self.trti = {trt: i for i, trt in enumerate(self.gsim_lt.values)}
        self.trts = list(self.gsim_lt.values)
        R = self.get_num_paths()
        logging.info('Building {:_d} realizations'.format(R))
        self.weights = numpy.array(  # shape (R, 1) or (R, M+1)
            [rlz.weight for rlz in self.get_realizations()])
        return self
    def wget(self, weights, imt):
        """
        Dispatch to the underlying gsim_lt.wget except for sampling
        """
        if self.num_samples:
            # with sampling the last column carries the weight
            return weights[:, -1]
        return self.gsim_lt.wget(weights, imt)
    def gfull(self, all_trt_smrs):
        """
        :returns: the total Gt = Σ_i G_i
        """
        Gt = 0
        for trt_smrs in all_trt_smrs:
            # the TRT index is packed in the high bits of trt_smr
            trt = self.trts[trt_smrs[0] // TWO24]
            Gt += len(self.gsim_lt.values[trt])
        return Gt
[docs]defget_gids(self,all_trt_smrs):""" :returns: list of of arrays of gids, one for each source group """gids=[]g=0fortrt_smrsinall_trt_smrs:rbg=self.get_rlzs_by_gsim(trt_smrs)gids.append(numpy.arange(g,g+len(rbg)))g+=len(rbg)returngids
    def get_trt_rlzs(self, all_trt_smrs):
        """
        :returns: a list with Gt arrays of dtype uint32
        """
        data = []
        for trt_smrs in all_trt_smrs:
            for rlzs in self.get_rlzs_by_gsim(trt_smrs).values():
                # pack the TRT index in the high bits of each rlz ID
                data.append(U32(rlzs) + TWO24 * (trt_smrs[0] // TWO24))
        return data
    def g_weights(self, all_trt_smrs):
        """
        :returns: an array of weights of shape (Gt, 1) or (Gt, M+1)
        """
        data = []
        for trt_smrs in all_trt_smrs:
            for rlzs in self.get_rlzs_by_gsim(trt_smrs).values():
                # total weight of the realizations sharing the same gsim
                data.append(self.weights[rlzs].sum(axis=0))
        return numpy.array(data)
[docs]deftrt_by(self,trt_smr):""" :returns: the TRT associated to trt_smr """iflen(self.trts)==1:returnself.trts[0]returnself.trts[trt_smr//TWO24]
@property
def seed(self):
    """
    :returns: the source_model_lt seed
    """
    return self.source_model_lt.seed

@property
def num_samples(self):
    """
    :returns: the source_model_lt ``num_samples`` parameter
    """
    return self.source_model_lt.num_samples

@property
def sampling_method(self):
    """
    :returns: the source_model_lt ``sampling_method`` parameter
    """
    return self.source_model_lt.sampling_method

@cached_property
def sd(self):
    # source_data records grouped by source ID; computed lazily and,
    # being a cached property, excluded from pickling (see __getstate__)
    return group_array(self.source_model_lt.source_data, 'source')
def get_trt_smrs(self, src_id=None):
    """
    :param src_id: a source ID or None
    :returns: a tuple of indices trt_smr for the given source
    """
    try:
        sd = self.source_model_lt.source_data
    except AttributeError:  # fake logic tree
        return 0,
    if src_id is None:
        # all combinations (trti, smr); NB: iterate on the *values* of
        # the dict .trti (trt -> index): iterating on the dict itself
        # would yield the TRT strings and `trti * TWO24` would become a
        # huge string repetition instead of an integer index
        return tuple(trti * TWO24 + sm_rlz.ordinal
                     for sm_rlz in self.sm_rlzs
                     for trti in self.trti.values())
    sd = self.sd[src_id]
    trt = sd['trt'][0]  # all same trt
    trti = 0 if trt == '*' else self.trti[trt]
    brids = set(sd['branch'])
    # keep only the source model realizations containing the source,
    # i.e. having a branch ID of the source in their logic tree path
    return tuple(trti * TWO24 + sm_rlz.ordinal
                 for sm_rlz in self.sm_rlzs
                 if set(sm_rlz.lt_path) & brids)
# NB: called by the source_reader with smr and by# .reduce_groups with source_id
def set_trt_smr(self, srcs, source_id=None, smr=None):
    """
    :param srcs: source objects
    :param source_id: base source ID
    :param smr: source model realization index
    :returns: list of sources with the same base source ID
    """
    if not self.trti:  # empty gsim_lt
        return srcs
    sd = self.sd
    out = []
    for src in srcs:
        srcid = valid.corename(src)
        if source_id and srcid != source_id:
            continue  # filter
        if self.trti == {'*': 0}:  # passed gsim=XXX in the job.ini
            trti = 0
        else:
            trti = self.trti[src.tectonic_region_type]
        if smr is None and ';' in src.source_id:
            # assume <base_id>;<smr>
            # NOTE(review): once extracted, smr persists for the
            # remaining sources in the loop — presumably all sources
            # here share the same realization index; verify with callers
            smr = _get_smr(src.source_id)
        if smr is None:  # called by .reduce_groups
            srcid = srcid.split('@')[0]
            try:  # check if ambiguous source ID
                srcid, fname = srcid.rsplit('!')
            except ValueError:  # non-ambiguous source ID
                fname = ''
                ok = slice(None)  # take all source_data records
            else:
                # keep only the records coming from the right file
                ok = [fname in string for string in sd[srcid]['fname']]
            brids = set(sd[srcid]['branch'][ok])
            # tuple of trt_smr indices for the realizations containing
            # a branch ID of the source in their logic tree path
            tup = tuple(trti * TWO24 + sm_rlz.ordinal
                        for sm_rlz in self.sm_rlzs
                        if set(sm_rlz.lt_path) & brids)
        else:
            tup = trti * TWO24 + smr
        # print('Setting %s on %s' % (tup, src))
        src.trt_smr = tup  # realizations impacted by the source
        out.append(src)
    return out
def reduce_groups(self, src_groups):
    """
    Filter the sources and set the tuple .trt_smr
    """
    reduced = []
    src_id = self.source_model_lt.source_id
    for group in src_groups:
        kept = self.set_trt_smr(group, src_id)
        if not kept:  # no source survived the filtering
            continue
        # shallow-copy the group so the original is left untouched
        newgrp = copy.copy(group)
        newgrp.sources = kept
        reduced.append(newgrp)
    return reduced
def gsim_by_trt(self, rlz):
    """
    :returns: a dictionary trt->gsim for the given realization
    """
    # pair each TRT (in gsim_lt order) with the gsim chosen by rlz
    return {trt: gsim
            for trt, gsim in zip(self.gsim_lt.values, rlz.gsim_rlz.value)}
def get_num_paths(self):
    """
    :returns: number of the paths in the full logic tree
    """
    ns = self.num_samples
    # with sampling the number of paths is fixed a priori; otherwise
    # it is the product of the two logic tree sizes
    return ns if ns else len(self.sm_rlzs) * self.gsim_lt.get_num_paths()
def get_realizations(self):
    """
    :returns: the complete list of LtRealizations
    """
    if hasattr(self, '_rlzs'):  # already computed, cached
        return self._rlzs
    num_samples = self.source_model_lt.num_samples
    if num_samples:  # sampling
        rlzs = numpy.empty(num_samples, object)
        # expand each source model realization by its number of samples
        sm_rlzs = []
        for sm_rlz in self.sm_rlzs:
            sm_rlzs.extend([sm_rlz] * sm_rlz.samples)
        gsim_rlzs = self.gsim_lt.sample(
            num_samples, self.seed + 1, self.sampling_method)
        for i, gsim_rlz in enumerate(gsim_rlzs):
            rlzs[i] = LtRealization(i, sm_rlzs[i].lt_path, gsim_rlz,
                                    sm_rlzs[i].weight * gsim_rlz.weight)
        if self.sampling_method.startswith('early_'):
            # with early sampling all realizations get uniform weight
            for rlz in rlzs:
                rlz.weight[:] = 1. / num_samples
    else:  # full enumeration
        gsim_rlzs = list(self.gsim_lt)
        ws = numpy.array([gsim_rlz.weight for gsim_rlz in gsim_rlzs])
        rlzs = numpy.empty(len(ws) * len(self.sm_rlzs), object)
        i = 0
        for sm_rlz in self.sm_rlzs:
            smpath = sm_rlz.lt_path
            for gsim_rlz, weight in zip(gsim_rlzs, sm_rlz.weight * ws):
                rlzs[i] = LtRealization(i, smpath, gsim_rlz, weight)
                i += 1
    # rescale the weights if not one, see case_52
    # (a no-op when the weights already sum up to 1, e.g. early sampling)
    tot_weight = sum(rlz.weight for rlz in rlzs)[-1]
    if tot_weight != 1.:
        for rlz in rlzs:
            rlz.weight = rlz.weight / tot_weight
    self._rlzs = rlzs
    return self._rlzs
def get_rlzs_by_gsim_dic(self):
    """
    :returns: a dictionary trt_smr -> gsim -> rlz ordinals
    """
    if hasattr(self, '_rlzs_by'):  # already computed, cached
        return self._rlzs_by
    rlzs = self.get_realizations()
    trtis = range(len(self.gsim_lt.values))
    smrs = numpy.array([sm.ordinal for sm in self.sm_rlzs])
    if self.source_model_lt.filename == '_fake.xml':  # scenario
        # map logic tree path -> source model realization index
        smr_by_ltp = {'~'.join(sm_rlz.lt_path): i
                      for i, sm_rlz in enumerate(self.sm_rlzs)}
        # for each realization, the index of its source model
        smidx = numpy.zeros(self.get_num_paths(), int)
        for rlz in rlzs:
            smidx[rlz.ordinal] = smr_by_ltp['~'.join(rlz.sm_lt_path)]
        self._rlzs_by = _ddic(trtis, smrs,
                              lambda smr: rlzs[smidx == smr])
    else:  # classical and event based
        # consecutive realizations belong to the same source model,
        # so a slice per source model is enough
        start = 0
        slices = []
        for sm in self.sm_rlzs:
            slices.append(slice(start, start + sm.samples))
            start += sm.samples
        self._rlzs_by = _ddic(trtis, smrs, lambda smr: rlzs[slices[smr]])
    return self._rlzs_by
def get_rlzs_by_gsim(self, trt_smr):
    """
    :param trt_smr: index or array of indices
    :returns: a dictionary gsim -> array of rlz indices
    """
    if not isinstance(trt_smr, (numpy.ndarray, list, tuple)):
        # scalar index: event based
        return self._rlzs_by_gsim(trt_smr)
    # multiple indices: classical
    acc = AccumDict(accum=[])
    for t in trt_smr:
        for gsim, rlzs in self._rlzs_by_gsim(t).items():
            acc[gsim].append(rlzs)
    return {gsim: numpy.concatenate(chunks, dtype=U32)
            for gsim, chunks in acc.items()}
# FullLogicTreedef__toh5__(self):sm_data=[]forsminself.sm_rlzs:sm_data.append((str(sm.value),sm.weight,'~'.join(sm.lt_path),sm.samples))return(dict(source_model_lt=self.source_model_lt,gsim_lt=self.gsim_lt,source_data=self.source_model_lt.source_data,sm_data=numpy.array(sm_data,source_model_dt)),dict(seed=self.seed,num_samples=self.num_samples,trts=hdf5.array_of_vstr(self.gsim_lt.values),oversampling=self.oversampling))# FullLogicTreedef__fromh5__(self,dic,attrs):# TODO: this is called more times than needed, maybe we should cache itsm_data=dic['sm_data']sd=dic.pop('source_data',numpy.zeros(0))# empty for engine <= 3.16vars(self).update(attrs)self.source_model_lt=dic['source_model_lt']self.source_model_lt.source_data=sd[:]self.gsim_lt=dic['gsim_lt']self.sm_rlzs=[]forsm_id,recinenumerate(sm_data):path=tuple(str(decode(rec['path'])).split('~'))sm=Realization(rec['name'],rec['weight'],sm_id,path,rec['samples'])self.sm_rlzs.append(sm)
def get_num_potential_paths(self):
    """
    :returns: the number of potential realizations
    """
    # product of the sizes of the two underlying logic trees
    gsim_paths = self.gsim_lt.get_num_paths()
    return gsim_paths * self.source_model_lt.num_paths
@property
def rlzs(self):
    """
    :returns: an array of realizations
    """
    sh1 = self.source_model_lt.shortener
    sh2 = self.gsim_lt.shortener
    tups = []
    for r in self.get_realizations():
        # shortened path "smlt~gslt" for display purposes
        path = '%s~%s' % (shorten(r.sm_lt_path, sh1, 'smlt'),
                          shorten(r.gsim_rlz.lt_path, sh2, 'gslt'))
        tups.append((r.ordinal, path, r.weight[-1]))
    return numpy.array(tups, rlz_dt)

def __repr__(self):
    # summarize the source model realizations, one line per distinct
    # logic tree path
    info_by_model = {}
    for sm in self.sm_rlzs:
        info_by_model[sm.lt_path] = (
            '~'.join(map(decode, sm.lt_path)), decode(sm.value), sm.weight)
    summary = ['%s, %s, weight=%s' % ibm
               for ibm in info_by_model.values()]
    return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(summary))
class SourceLogicTree(object):
    """
    Source specific logic tree (full enumeration)
    """
    def __init__(self, source_id, branchsets, bsetdict):
        self.source_id = source_id
        self.bsetdict = bsetdict
        # work on shallow copies so that the original branchsets are
        # not mutated when the branches are relinked below
        branchsets = [copy.copy(bset) for bset in branchsets]
        self.root_branchset = branchsets[0]
        self.num_paths = 1
        # chain the branchsets: every branch of a branchset points to
        # the next branchset (None for the last one)
        for child, parent in zip(branchsets[1:] + [None], branchsets):
            branches = [copy.copy(br) for br in parent.branches]
            for br in branches:
                br.bset = child
            parent.branches = branches
            self.num_paths *= len(branches)
        self.branchsets = branchsets
        self.num_samples = 0  # full enumeration, no sampling

    # reuse the realization generator of SourceModelLogicTree
    __iter__ = SourceModelLogicTree.__iter__
def compose(source_model_lt, gsim_lt):
    """
    :returns: a CompositeLogicTree instance
    """
    gsim_bsets = []
    by_trt = groupby(gsim_lt.branches, operator.attrgetter('trt'))
    bsno = len(source_model_lt.branchsets)
    for trt, btuples in by_trt.items():
        bsid = gsim_lt.bsetdict[trt]
        bset = BranchSet('gmpeModel', bsno)
        # branch ID fixed later
        bset.branches = [Branch(bsid, bt.id, bt.weight['weight'], bt.gsim)
                         for bt in btuples]
        gsim_bsets.append(bset)
        bsno += 1
    # append one gmpeModel branchset per TRT after the source model ones
    return CompositeLogicTree(source_model_lt.branchsets + gsim_bsets)