# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2020, GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importcopyimportitertoolsimportcollectionsimportnumpyfromopenquake.baselib.generalimportCallableDict,BASE183,BASE33489fromopenquake.hazardlibimportgeofromopenquake.hazardlib.sourceconverterimport(split_coords_2d,split_coords_3d)fromopenquake.hazardlibimportvalid
class LogicTreeError(Exception):
    """
    Raised when a logic tree file contains a logic error.

    :param node:
        XML node object (or an integer line number) identifying the place
        that caused the failure; used to report the affected line.
    :param filename:
        name of the logic tree file being parsed
    :param message:
        human-readable description of the error
    """
    def __init__(self, node, filename, message):
        self.filename = filename
        self.message = message
        # accept either a plain line number or a node carrying one
        if isinstance(node, int):
            self.lineno = node
        else:
            self.lineno = getattr(node, 'lineno', '?')

    def __str__(self):
        return "filename '%s', line %s: %s" % (
            self.filename, self.lineno, self.message)
# parse_uncertainty #
def unknown(utype, node, filename):
    """
    Default parser for an uncertainty node: interpret the node text
    as a single float.

    :param utype: the uncertainty type (unused here)
    :param node: XML node whose text holds the value
    :param filename: logic tree file name, used for error reporting
    :returns: the parsed float
    :raises LogicTreeError: if the text is missing or not a float
    """
    text = node.text
    try:
        return float(text)
    except (TypeError, ValueError):
        raise LogicTreeError(node, filename, 'expected single float value')
@parse_uncertainty.add('abGRAbsolute')
def abGR(utype, node, filename):
    """
    Parse an abGRAbsolute uncertainty: the node text must contain
    two space-separated floats (the GR a and b values).

    :returns: a pair of floats (a, b)
    :raises LogicTreeError: if the text is not a pair of floats
    """
    try:
        a, b = node.text.split()
        return float(a), float(b)
    except ValueError:
        raise LogicTreeError(
            node, filename, 'expected a pair of floats separated by space')
@parse_uncertainty.add('characteristicFaultGeometryAbsolute')
def charGeom(utype, node, filename):
    """
    Parse a characteristicFaultGeometryAbsolute uncertainty: a sequence
    of surface nodes, each one a simple fault, complex fault or planar
    surface geometry.

    :returns: a list of pairs (tag, extra) where `tag` is the surface
        kind (namespace stripped) and `extra` the parsed geometry data
    :raises LogicTreeError: on an unrecognised surface geometry type
    """
    geom_pairs = []  # (tag, extra)
    for surf_node in node.surface:
        if "simpleFaultGeometry" in surf_node.tag:
            _validate_simple_fault_geometry(utype, surf_node, filename)
            extra = parse_uncertainty(
                'simpleFaultGeometryAbsolute', surf_node, filename)
        elif "complexFaultGeometry" in surf_node.tag:
            _validate_complex_fault_geometry(utype, surf_node, filename)
            extra = parse_uncertainty(
                'complexFaultGeometryAbsolute', surf_node, filename)
        elif "planarSurface" in surf_node.tag:
            _validate_planar_fault_geometry(utype, surf_node, filename)
            # collect the four corners as (lon, lat, depth) triples
            extra = []
            for key in ["topLeft", "topRight", "bottomRight", "bottomLeft"]:
                corner = getattr(surf_node, key)
                extra.append((corner["lon"], corner["lat"], corner["depth"]))
        else:
            raise LogicTreeError(
                surf_node, filename, "Surface geometry type not recognised")
        # drop the XML namespace prefix from the tag
        geom_pairs.append((surf_node.tag.split('}')[1], extra))
    return geom_pairs
# validations

def _validate_simple_fault_geometry(utype, node, filename):
    """
    Check that the node describes a valid simpleFaultGeometry;
    raise a LogicTreeError pointing at the node otherwise.
    """
    try:
        coords = split_coords_2d(~node.LineString.posList)
        trace = geo.Line([geo.Point(*p) for p in coords])
    except ValueError:
        # if the geometry cannot be built, fall through with an empty
        # trace so that the LogicTreeError below points the user to
        # the incorrect node
        trace = []
    if len(trace):
        return
    raise LogicTreeError(
        node, filename, "'simpleFaultGeometry' node is not valid")


def _validate_complex_fault_geometry(utype, node, filename):
    """
    Check that every edge of the node compiles into a geo.Line and that
    a spacing is given; raise a LogicTreeError otherwise.

    NB: conformance to the Aki & Richards convention is NOT verified
    here; that is checked later, when the surface is actually created.
    """
    valid_edges = []
    for edge_node in node.nodes:
        try:
            coords = split_coords_3d(edge_node.LineString.posList.text)
            edge = geo.Line([geo.Point(*p) for p in coords])
        except ValueError:
            # same trick as in the simple geometry case: an edge that
            # does not compile is recorded as invalid
            edge = []
        valid_edges.append(bool(len(edge)))
    if node["spacing"] and all(valid_edges):
        return
    raise LogicTreeError(
        node, filename, "'complexFaultGeometry' node is not valid")


def _validate_planar_fault_geometry(utype, node, filename):
    """
    Check that the four corners of the planar surface have valid
    longitude, latitude and depth, and that a spacing is given;
    raise a LogicTreeError otherwise.
    """
    valid_spacing = node["spacing"]
    for key in ["topLeft", "topRight", "bottomLeft", "bottomRight"]:
        corner = getattr(node, key)
        lon = corner["lon"]
        lat = corner["lat"]
        depth = corner["depth"]
        valid_lon = -180.0 <= lon <= 180.0
        valid_lat = -90.0 <= lat <= 90.0
        valid_depth = depth >= 0.0
        is_valid = valid_lon and valid_lat and valid_depth
        if not is_valid or not valid_spacing:
            raise LogicTreeError(
                node, filename, "'planarFaultGeometry' node is not valid")
# ######################### apply_uncertainty ############################ #

apply_uncertainty = CallableDict()


@apply_uncertainty.add('simpleFaultDipRelative')
def _simple_fault_dip_relative(utype, source, value):
    # increment the fault dip angle by `value` degrees
    source.modify('adjust_dip', dict(increment=value))


@apply_uncertainty.add('simpleFaultDipAbsolute')
def _simple_fault_dip_absolute(utype, source, value):
    # NOTE: the first parameter was named `bset` here, inconsistently
    # with every other modifier in this registry; renamed to `utype`
    # (the argument is positional, so callers are unaffected)
    source.modify('set_dip', dict(dip=value))


@apply_uncertainty.add('simpleFaultGeometryAbsolute')
def _simple_fault_geom_absolute(utype, source, value):
    # replace trace, seismogenic depths, dip and spacing wholesale
    coords, usd, lsd, dip, spacing = value
    trace = geo.Line([geo.Point(*p) for p in coords])
    source.modify(
        'set_geometry',
        dict(fault_trace=trace, upper_seismogenic_depth=usd,
             lower_seismogenic_depth=lsd, dip=dip, spacing=spacing))


@apply_uncertainty.add('complexFaultGeometryAbsolute')
def _complex_fault_geom_absolute(utype, source, value):
    all_coords, spacing = value
    edges = [geo.Line([geo.Point(*p) for p in coords])
             for coords in all_coords]
    source.modify('set_geometry', dict(edges=edges, spacing=spacing))


@apply_uncertainty.add('characteristicFaultGeometryAbsolute')
def _char_fault_geom_absolute(utype, source, value):
    source.modify('set_geometry', dict(surface=to_surface(value)))


@apply_uncertainty.add('abGRAbsolute')
def _abGR_absolute(utype, source, value):
    a, b = value
    source.mfd.modify('set_ab', dict(a_val=a, b_val=b))


@apply_uncertainty.add('bGRAbsolute')
def _bGR_absolute(utype, source, value):
    b_val = float(value)
    source.mfd.modify('set_bGR', dict(b_val=b_val))


@apply_uncertainty.add('bGRRelative')
def _abGR_relative(utype, source, value):
    source.mfd.modify('increment_b', dict(value=value))


@apply_uncertainty.add('maxMagGRRelative')
def _maxmagGR_relative(utype, source, value):
    source.mfd.modify('increment_max_mag', dict(value=value))


@apply_uncertainty.add('maxMagGRRelativeNoMoBalance')
def _maxmagGRnoMoBalance_relative(utype, source, value):
    source.mfd.modify('increment_max_mag_no_mo_balance', dict(value=value))


@apply_uncertainty.add('maxMagGRAbsolute')
def _maxmagGR_absolute(utype, source, value):
    source.mfd.modify('set_max_mag', dict(value=value))


@apply_uncertainty.add('incrementalMFDAbsolute')
def _incMFD_absolute(utype, source, value):
    min_mag, bin_width, occur_rates = value
    source.mfd.modify(
        'set_mfd', dict(min_mag=min_mag, bin_width=bin_width,
                        occurrence_rates=occur_rates))


@apply_uncertainty.add('truncatedGRFromSlipAbsolute')
def _trucMFDFromSlip_absolute(utype, source, value):
    slip_rate, rigidity, const_term = value
    source.modify('adjust_mfd_from_slip',
                  dict(slip_rate=slip_rate, rigidity=rigidity,
                       constant_term=const_term))


@apply_uncertainty.add('setMSRAbsolute')
def _setMSR(utype, source, value):
    msr = value
    source.modify('set_msr', dict(new_msr=msr))


@apply_uncertainty.add('recomputeMmax')
def _recompute_mmax_absolute(utype, source, value):
    epsilon = value
    source.modify('recompute_mmax', dict(epsilon=epsilon))


@apply_uncertainty.add('setLowerSeismDepthAbsolute')
def _setLSD(utype, source, value):
    source.modify('set_lower_seismogenic_depth', dict(lsd=float(value)))


@apply_uncertainty.add('dummy')  # do nothing
def _dummy(utype, source, value):
    pass

# ######################### apply_uncertainties ########################### #
def apply_uncertainties(bset_values, src_group):
    """
    :param bset_values: a list of pairs (branchset, value)
    :param src_group: SourceGroup instance
    :returns:
        a copy of the original group with possibly modified sources
    """
    sg = copy.copy(src_group)
    sg.sources = []
    sg.changes = 0
    for source in src_group:
        oks = [bset.filter_source(source) for bset, value in bset_values]
        if sum(oks):  # source not filtered out
            src = copy.deepcopy(source)
            srcs = []
            for (bset, value), ok in zip(bset_values, oks):
                if ok and bset.collapsed:
                    if src.code == b'N':
                        raise NotImplementedError(
                            'Collapsing of the logic tree is not implemented '
                            'for %s' % src)
                    # one modified copy of the source per branch
                    for br in bset.branches:
                        newsrc = copy.deepcopy(src)
                        newsrc.scaling_rate = br.weight  # used in lt_test.py
                        apply_uncertainty(
                            bset.uncertainty_type, newsrc, br.value)
                        srcs.append(newsrc)
                    sg.changes += len(srcs)
                elif ok:
                    if not srcs:  # only the first time
                        srcs.append(src)
                    # modify the deep copy in place
                    apply_uncertainty(bset.uncertainty_type, src, value)
                    sg.changes += 1
        else:
            srcs = [copy.copy(source)]  # this is ultra-fast
        sg.sources.extend(srcs)
    return sg
def random(size, seed, sampling_method='early_weights'):
    """
    :param size: size of the returned array (integer or pair of integers)
    :param seed: random seed
    :param sampling_method: 'early_weights', 'early_latin', ...
    :returns: an array of floats in the range 0..1

    With a '*_latin' sampling method the points are rearranged so that
    they lie on a latin (hyper)square; otherwise plain montecarlo
    sampling is used. For instance::

        x, y = random((samples, 2), seed, 'early_latin').T
        # (x, y) lie on a latin square

        x, y = random((samples, 2), seed, 'early_weights').T
        # (x, y) generally do NOT lie on a latin square
    """
    numpy.random.seed(seed)
    xs = numpy.random.uniform(size=size)
    if not sampling_method.endswith('latin'):
        return xs
    # https://zmurchok.github.io/2019/03/15/Latin-Hypercube-Sampling.html
    try:
        rows, cols = size
    except TypeError:  # cannot unpack non-iterable int object
        return (numpy.argsort(xs) + xs) / size
    for col in range(cols):
        xs[:, col] = (numpy.argsort(xs[:, col]) + xs[:, col]) / rows
    return xs
def sample(weighted_objects, probabilities, sampling_method='early_weights'):
    """
    Take random samples of a sequence of weighted objects.

    :param weighted_objects:
        A finite sequence of N objects with a ``.weight`` attribute.
        The weights must sum up to 1.
    :param probabilities:
        An array of S random numbers in the range 0..1
    :param sampling_method:
        Default early_weights, i.e. use the CDF of the weights
    :return:
        A list of S objects extracted randomly
    """
    if sampling_method.startswith('early'):
        # consider the weights different
        indices = numpy.searchsorted(_cdf(weighted_objects), probabilities)
    elif sampling_method.startswith('late'):
        # consider all weights equal
        num = len(weighted_objects)
        bins = numpy.arange(1/num, 1, 1/num)
        indices = numpy.searchsorted(bins, probabilities)
    # NB: returning an array would break things
    return [weighted_objects[i] for i in indices]
# used in notebooks for teaching, not in the engine
Weighted = collections.namedtuple('Weighted', 'object weight')
# ######################### branches and branchsets ######################## #
class Branch(object):
    """
    Branch object, represents a ``<logicTreeBranch />`` element.

    :param bs_id:
        BranchSetID of the branchset to which the branch belongs
    :param branch_id:
        String identifier of the branch
    :param weight:
        float value of weight assigned to the branch. A text node
        contents of ``<uncertaintyWeight />`` child node.
    :param value:
        The actual uncertainty parameter value. A text node contents
        of ``<uncertaintyModel />`` child node. Type depends on the
        branchset's uncertainty type.
    """
    def __init__(self, bs_id, branch_id, weight, value):
        self.bs_id = bs_id
        self.branch_id = branch_id
        self.weight = weight
        self.value = value
        self.bset = None  # child branchset, attached later

    @property
    def id(self):
        """
        The branch_id itself when it is a single character, otherwise
        the externally assigned short_id.
        """
        if len(self.branch_id) == 1:
            return self.branch_id
        return self.short_id

    def is_leaf(self):
        """
        :returns: True if the branch has no branchset or has a dummy branchset
        """
        return self.bset is None or self.bset.uncertainty_type == 'dummy'
class BranchSet(object):
    """
    Branchset object, represents a ``<logicTreeBranchSet />`` element.

    :param uncertainty_type:
        String value. According to the spec one of:

        gmpeModel
            Branches contain references to different GMPEs.
        sourceModel
            Branches contain references to different PSHA source models;
            values are file names relative to the base path.
        maxMagGRRelative
            Values to add to the Gutenberg-Richter ("GR") maximum
            magnitude; parsed as float.
        bGRRelative
            Values to add to the GR "b" value; parsed as float.
        maxMagGRAbsolute
            Values replacing the GR maximum magnitude; lists of floats
            separated by space, one per GR MFD in a target source.
        abGRAbsolute
            Values replacing the "a" and "b" values of a GR MFD; lists
            of pairs of floats, one pair per GR MFD.
        incrementalMFDAbsolute
            Replaces an evenly discretized MFD with the values provided.
        simpleFaultDipRelative
            Increases or decreases the fault dip angle.
        simpleFaultDipAbsolute
            Replaces the fault dip in the specified source(s).
        simpleFaultGeometryAbsolute
            Replaces the simple fault geometry (trace, upper and lower
            seismogenic depth and dip) of a given source.
        complexFaultGeometryAbsolute
            Replaces the complex fault geometry edges of a given source.
        characteristicFaultGeometryAbsolute
            Replaces the characteristic fault geometry surface.
        truncatedGRFromSlipAbsolute
            Updates a TruncatedGR using a slip rate and a rigidity.

    :param filters:
        Dictionary of filters specifying which sources the uncertainty
        applies to, as branchset attributes in XML:

        applyToSources
            Apply only to specific sources (required for absolute
            uncertainties, which accept a single source). Value is the
            list of source ids; source model logic tree only.
        applyToTectonicRegionType
            Usable in both source model and GMPE logic trees; restricts
            the uncertainty to a tectonic region type (required for all
            branchsets in the GMPE logic tree).
    """
    applied = None  # to be replaced by a string in hazardlib.logictree

    def __init__(self, uncertainty_type, ordinal=0, filters=None,
                 collapsed=False):
        self.uncertainty_type = uncertainty_type
        self.ordinal = ordinal
        self.filters = filters or {}
        self.collapsed = collapsed
        self.branches = []

    def sample(self, probabilities, sampling_method):
        """
        :param probabilities: (Ns, Nb) random numbers in the range 0..1
        :param sampling_method: the sampling method used
        :returns: a list of Ns lists of branches
        """
        out = []
        for probs in probabilities:
            # probs has a value for each branchset
            bset = self
            path = []
            while bset is not None:
                if bset.collapsed:
                    chosen = bset.branches[0]
                else:
                    x = probs[bset.ordinal]
                    [chosen] = sample(bset.branches, [x], sampling_method)
                path.append(chosen)
                bset = chosen.bset
            out.append(path)
        return out

    def enumerate_paths(self):
        """
        Generate all possible paths starting from this branch set.

        :returns:
            Generator of two-item tuples. Each tuple contains the weight
            of the path (product of the weights of its branches) and the
            list of the path's :class:`Branch` objects. The weights of
            all paths sum up to 1.0.
        """
        for path_branch in self._enumerate_paths([]):
            weight = 1.0
            reversed_path = []
            while path_branch:
                path_branch, branch = path_branch
                weight *= branch.weight
                reversed_path.append(branch)
            yield weight, reversed_path[::-1]

    def _enumerate_paths(self, prefix_path):
        """
        Recursive (private) part of :func:`enumerate_paths`. Returns a
        generator of nested two-item lists [prefix, branch].
        """
        if self.collapsed:
            # a single branch carrying the full weight
            b0 = copy.copy(self.branches[0])
            b0.weight = 1.0
            branches = [b0]
        else:
            branches = self.branches
        for branch in branches:
            path_branch = [prefix_path, branch]
            if branch.bset is not None:  # dummies can be branchpoints
                yield from branch.bset._enumerate_paths(path_branch)
            else:
                yield path_branch

    def __getitem__(self, branch_id):
        """
        Return the :class:`Branch` of this branchset with the given id.
        """
        for branch in self.branches:
            if branch.branch_id == branch_id:
                return branch
        raise KeyError(branch_id)

    def filter_source(self, source):
        """
        Apply filters to ``source`` and return True if the uncertainty
        should be applied to it.
        """
        for key, value in self.filters.items():
            if key == 'applyToTectonicRegionType':
                if value != source.tectonic_region_type:
                    return False
            elif key == 'applyToSources':
                if source and source.source_id not in value:
                    return False
            elif key == 'applyToBranches':
                pass
            else:
                raise AssertionError("unknown filter '%s'" % key)
        # all filters pass
        return True

    def get_bset_values(self, ltpath):
        """
        :param ltpath: list of branch IDs
        :returns: a list of pairs [(bset, value), ...]
        """
        pairs = []
        bset = self
        while ltpath:
            brid, ltpath = ltpath[0], ltpath[1:]
            branch = bset[brid]
            pairs.append((bset, branch.value))
            if branch.is_leaf():
                break
            bset = branch.bset
        return pairs

    def collapse(self):
        """
        Collapse to the first branch (with side effects).
        """
        self.collapsed = True
        b0 = self.branches[0]
        b0.branch_id = '.'
        b0.weight = 1.
        self.branches = [b0]

    def to_list(self):
        """
        :returns: a literal list describing the branchset
        """
        atb = self.filters.get("applyToBranches", [])
        return [self.uncertainty_type, atb] + [
            [br.branch_id, '...', br.weight] for br in self.branches]
# NB: this function cannot be used with monster logic trees like the one for# South Africa (ZAF), since it is too slow; the engine uses a trick instead
# NB: this function cannot be used with monster logic trees like the one
# for South Africa (ZAF), since it is too slow; the engine uses a trick
def count_paths(branches):
    """
    :param branches: a list of branches (endpoints or nodes)
    :returns: the number of paths in the branchset (slow)
    """
    total = 0
    for br in branches:
        if br.bset is None:  # endpoint
            total += 1
        else:  # recurse into the sub-branchset
            total += count_paths(br.bset.branches)
    return total
# global counter used to generate unique ids for dummy branches
dummy_counter = itertools.count(1)


def dummy_branchset():
    """
    :returns: a dummy BranchSet with a single branch
    """
    bs = BranchSet('dummy')
    branch = Branch('dummy%d' % next(dummy_counter), '.', 1, None)
    branch.short_id = '.'
    bs.branches = [branch]
    return bs
class CompositeLogicTree(object):
    """
    Build a logic tree from a set of branchsets by automatically
    setting the branch IDs.
    """
    def __init__(self, branchsets):
        self.branchsets = branchsets
        self.basepaths = self._attach_to_branches()

    def _attach_to_branches(self):
        # attach branchsets to branches depending on the applyToBranches
        # attribute; also attaches dummy branchsets to dummy branches.
        paths = []
        nb = len(self.branchsets)
        brno = add_path(self.branchsets[0], 0, 0, 0, nb, paths)
        previous_branches = self.branchsets[0].branches
        branchdic = {br.branch_id: br for br in previous_branches}
        for i, bset in enumerate(self.branchsets[1:], 1):
            for br in bset.branches:
                if br.branch_id != '.' and br.branch_id in branchdic:
                    raise NameError(
                        'The branch ID %s is duplicated' % br.branch_id)
                branchdic[br.branch_id] = br
            dummies = []
            prev_ids = [pb.branch_id for pb in previous_branches]
            app2brs = list(bset.filters.get('applyToBranches', '')) or prev_ids
            if app2brs != prev_ids:
                # attach only to the listed branches; the others get a
                # pass-through dummy branchset
                for branch_id in app2brs:
                    # NB: if branch_id has already a branchset it is overridden
                    branchdic[branch_id].bset = bset
                for brid in prev_ids:
                    br = branchdic[brid]
                    if brid not in app2brs:
                        br.bset = dummy = dummy_branchset()
                        [dummybranch] = dummy.branches
                        branchdic[dummybranch.branch_id] = dummybranch
                        dummies.append(dummybranch)
            else:
                # apply to all previous branches
                for branch in previous_branches:
                    branch.bset = bset
            brno = add_path(bset, i, brno, len(previous_branches), nb, paths)
            previous_branches = bset.branches + dummies
        return paths

    def __iter__(self):
        nb = len(self.branchsets)
        paths = self.branchsets[0].enumerate_paths()
        for ordinal, (weight, branches) in enumerate(paths):
            value = [br.value for br in branches]
            lt_path = ''.join(br.id for br in branches)
            yield Realization(value, weight, ordinal, lt_path.ljust(nb, '.'))