# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import zlib
import os.path
import pickle
import operator
import logging
import numpy

from openquake.baselib import parallel, general, hdf5, python3compat, config
from openquake.hazardlib import nrml, sourceconverter, InvalidFile, calc, site
from openquake.hazardlib.source.multi_fault import save_and_split
from openquake.hazardlib.source.point import msr_name
from openquake.hazardlib.valid import basename, fragmentno
from openquake.hazardlib.lt import apply_uncertainties

U16 = numpy.uint16
TWO16 = 2 ** 16  # 65,536
TWO24 = 2 ** 24  # 16,777,216
TWO30 = 2 ** 30  # 1,073,741,824
TWO32 = 2 ** 32  # 4,294,967,296
bybranch = operator.attrgetter('branch')

CALC_TIME, NUM_SITES, NUM_RUPTURES, WEIGHT, MUTEX = 3, 4, 5, 6, 7

source_info_dt = numpy.dtype([
    ('source_id', hdf5.vstr),          # 0
    ('grp_id', numpy.uint16),          # 1
    ('code', (numpy.bytes_, 1)),       # 2
    ('calc_time', numpy.float32),      # 3
    ('num_sites', numpy.uint32),       # 4
    ('num_ruptures', numpy.uint32),    # 5
    ('weight', numpy.float32),         # 6
    ('mutex_weight', numpy.float64),   # 7
    ('trti', numpy.uint8),             # 8
])

checksum = operator.attrgetter('checksum')


def check_unique(ids, msg='', strict=True):
    """
    Raise a DuplicatedID exception if there are duplicated IDs
    """
    if isinstance(ids, dict):  # ids by key
        all_ids = sum(ids.values(), [])
        unique, counts = numpy.unique(all_ids, return_counts=True)
        for dupl in unique[counts > 1]:
            keys = [k for k in ids if dupl in ids[k]]
            if keys:
                errmsg = '%r appears in %s%s' % (dupl, keys, msg)
                if strict:
                    raise nrml.DuplicatedID(errmsg)
                else:
                    logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)
        return
    unique, counts = numpy.unique(ids, return_counts=True)
    for u, c in zip(unique, counts):
        if c > 1:
            errmsg = '%r appears %d times %s' % (u, c, msg)
            if strict:
                raise nrml.DuplicatedID(errmsg)
            else:
                logging.info('*' * 60 + ' DuplicatedID:\n' + errmsg)
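
# Usage sketch (editor's illustration, not part of the original module):
# check_unique accepts either a flat sequence of IDs or a dict of ID lists
# keyed by file name, and with strict=False it logs instead of raising:
#
#   >>> check_unique(['a', 'b'])  # no duplicates, returns silently
#   >>> check_unique(['a', 'b', 'a'])  # raises nrml.DuplicatedID
#   >>> check_unique({'f1.xml': ['a'], 'f2.xml': ['a']}, strict=False)
#   # logs: 'a' appears in ['f1.xml', 'f2.xml']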


def zpik(obj):
    """
    zip and pickle a python object
    """
    gz = zlib.compress(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))
    return numpy.frombuffer(gz, numpy.uint8)
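
# Round-trip sketch (editor's illustration): the inverse of zpik is
# decompress-then-unpickle, exactly as done in __fromh5__ below:
#
#   >>> arr = zpik({'grp': [1, 2, 3]})
#   >>> arr.dtype
#   dtype('uint8')
#   >>> pickle.loads(zlib.decompress(arr.tobytes()))
#   {'grp': [1, 2, 3]}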


def mutex_by_grp(src_groups):
    """
    :returns: a composite array with boolean fields src_mutex, rup_mutex
    """
    lst = []
    for sg in src_groups:
        lst.append((sg.src_interdep == 'mutex', sg.rup_interdep == 'mutex'))
    return numpy.array(lst, [('src_mutex', bool), ('rup_mutex', bool)])
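
# Minimal sketch (editor's illustration): any objects carrying the
# src_interdep and rup_interdep attributes work, e.g. SimpleNamespace:
#
#   >>> from types import SimpleNamespace as NS
#   >>> sgs = [NS(src_interdep='mutex', rup_interdep='indep'),
#   ...        NS(src_interdep='indep', rup_interdep='indep')]
#   >>> mutex_by_grp(sgs)
#   array([( True, False), (False, False)],
#         dtype=[('src_mutex', '?'), ('rup_mutex', '?')])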


def build_rup_mutex(src_groups):
    """
    :returns: a composite array with fields (grp_id, src_id, rup_id, weight)
    """
    lst = []
    dtlist = [('grp_id', numpy.uint16), ('src_id', numpy.uint32),
              ('rup_id', numpy.int64), ('weight', numpy.float64)]
    for sg in src_groups:
        if sg.rup_interdep == 'mutex':
            for src in sg:
                for i, (rup, _) in enumerate(src.data):
                    lst.append((src.grp_id, src.id, i, rup.weight))
    return numpy.array(lst, dtlist)


def create_source_info(csm, h5):
    """
    Creates the source_info, mutex_by_grp and rup_mutex datasets
    """
    data = {}  # src_id -> row
    lens = []
    for srcid, srcs in general.groupby(csm.get_sources(), basename).items():
        src = srcs[0]
        num_ruptures = sum(src.num_ruptures for src in srcs)
        mutex = getattr(src, 'mutex_weight', 0)
        trti = csm.full_lt.trti.get(src.tectonic_region_type, 0)
        lens.append(len(src.trt_smrs))
        row = [srcid, src.grp_id, src.code, 0, 0, num_ruptures,
               src.weight, mutex, trti]
        data[srcid] = row
    logging.info('There are %d groups and %d sources with len(trt_smrs)=%.2f',
                 len(csm.src_groups), len(data), numpy.mean(lens))
    csm.source_info = data  # src_id -> row
    num_srcs = len(csm.source_info)
    # avoid hdf5 damned bug by creating source_info in advance
    h5.create_dataset('source_info', (num_srcs,), source_info_dt)
    h5['mutex_by_grp'] = mutex_by_grp(csm.src_groups)
    h5['rup_mutex'] = build_rup_mutex(csm.src_groups)


def read_source_model(fname, branch, converter, applied, sample, monitor):
    """
    :param fname: path to a source model XML file
    :param branch: source model logic tree branch ID
    :param converter: SourceConverter
    :param applied: list of source IDs within applyToSources
    :param sample: a string with the sampling factor (if any)
    :param monitor: a Monitor instance
    :returns: a dictionary fname -> SourceModel instance
    """
    [sm] = nrml.read_source_models([fname], converter)
    sm.branch = branch
    for sg in sm.src_groups:
        if sample and not sg.atomic:
            srcs = []
            for src in sg:
                if src.source_id in applied:
                    srcs.append(src)
                else:
                    srcs.extend(calc.filters.split_source(src))
            sg.sources = _sample(srcs, float(sample), applied)
    return {fname: sm}


# NB: in classical this is called after reduce_sources, so ";" is not
# added if the same source appears multiple times, len(srcs) == 1;
# in event based instead identical sources can appear multiple times
# but will have different seeds and produce different rupture sets
def _fix_dupl_ids(src_groups):
    sources = general.AccumDict(accum=[])
    for sg in src_groups:
        for src in sg.sources:
            sources[src.source_id].append(src)
    for src_id, srcs in sources.items():
        if len(srcs) > 1:  # happens in logictree/case_01/rup.ini
            for i, src in enumerate(srcs):
                src.source_id = '%s;%d' % (src.source_id, i)
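
# Renaming sketch (editor's illustration, using SimpleNamespace stand-ins
# for real sources): duplicated IDs get a ';<index>' suffix:
#
#   >>> from types import SimpleNamespace as NS
#   >>> sgs = [NS(sources=[NS(source_id='src'), NS(source_id='src')])]
#   >>> _fix_dupl_ids(sgs)
#   >>> [s.source_id for s in sgs[0].sources]
#   ['src;0', 'src;1']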


def check_branchID(branchID):
    """
    Forbid the characters '.', ':' and ';' in branch IDs, since they
    are reserved for fragmentno
    """
    if '.' in branchID:
        raise InvalidFile('branchID %r contains an invalid "."' % branchID)
    elif ':' in branchID:
        raise InvalidFile('branchID %r contains an invalid ":"' % branchID)
    elif ';' in branchID:
        raise InvalidFile('branchID %r contains an invalid ";"' % branchID)
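
# Usage sketch (editor's illustration): valid branch IDs pass silently,
# any reserved character raises InvalidFile:
#
#   >>> check_branchID('b01')  # OK
#   >>> check_branchID('b01;x')  # raises InvalidFile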


def check_duplicates(smdict, strict):
    # check duplicates in the same file
    for sm in smdict.values():
        srcids = []
        for sg in sm.src_groups:
            srcids.extend(src.source_id for src in sg)
            if sg.src_interdep == 'mutex':
                # mutex sources in the same group must have all the same
                # basename, i.e. the colon convention must be used
                basenames = set(map(basename, sg))
                assert len(basenames) == 1, basenames
        check_unique(srcids, 'in ' + sm.fname, strict)

    # check duplicates in different files but in the same branch;
    # the problem was discovered in the DOM model
    for branch, sms in general.groupby(smdict.values(), bybranch).items():
        srcids = general.AccumDict(accum=[])
        fnames = []
        for sm in sms:
            if isinstance(sm, nrml.GeometryModel):
                # the section IDs are not checked since they do not count
                # as real sources
                continue
            for sg in sm.src_groups:
                srcids[sm.fname].extend(src.source_id for src in sg)
            fnames.append(sm.fname)
        check_unique(srcids, 'in branch %s' % branch, strict=strict)

    found = find_false_duplicates(smdict)
    if found:
        logging.warning('Found different sources with same ID %s',
                        general.shortlist(found))


def get_csm(oq, full_lt, dstore=None):
    """
    Build source models from the logic tree and store them inside the
    `source_full_lt` dataset.
    """
    converter = sourceconverter.SourceConverter(
        oq.investigation_time, oq.rupture_mesh_spacing,
        oq.complex_fault_mesh_spacing, oq.width_of_mfd_bin,
        oq.area_source_discretization, oq.minimum_magnitude,
        oq.source_id,
        discard_trts=[s.strip() for s in oq.discard_trts.split(',')],
        floating_x_step=oq.floating_x_step,
        floating_y_step=oq.floating_y_step,
        source_nodes=oq.source_nodes,
        infer_occur_rates=oq.infer_occur_rates)
    full_lt.ses_seed = oq.ses_seed
    logging.info('Reading the source model(s) in parallel')

    # NB: the source models file must be in the shared directory
    # NB: dstore is None in logictree_test.py
    allargs = []
    sdata = full_lt.source_model_lt.source_data
    allpaths = set(full_lt.source_model_lt.info.smpaths)
    dic = general.group_array(sdata, 'fname')
    smpaths = []
    ss = os.environ.get('OQ_SAMPLE_SOURCES')
    applied = set()
    for srcs in full_lt.source_model_lt.info.applytosources.values():
        applied.update(srcs)
    for fname, rows in dic.items():
        path = os.path.abspath(
            os.path.join(full_lt.source_model_lt.basepath, fname))
        smpaths.append(path)
        allargs.append((path, rows[0]['branch'], converter, applied, ss))
    for path in allpaths - set(smpaths):  # geometry models
        allargs.append((path, '', converter, applied, ss))
    smdict = parallel.Starmap(read_source_model, allargs,
                              h5=dstore if dstore else None).reduce()
    parallel.Starmap.shutdown()  # save memory
    smdict = {k: smdict[k] for k in sorted(smdict)}
    check_duplicates(smdict, strict=oq.disagg_by_src)
    logging.info('Applying uncertainties')
    groups = _build_groups(full_lt, smdict)

    # checking the changes
    changes = sum(sg.changes for sg in groups)
    if changes:
        logging.info('Applied {:_d} changes to the composite source model'.
                     format(changes))
    is_event_based = oq.calculation_mode.startswith(('event_based', 'ebrisk'))
    csm = _get_csm(full_lt, groups, is_event_based)

    out = []
    probs = []
    for sg in csm.src_groups:
        if sg.src_interdep == 'mutex' and 'src_mutex' not in dstore:
            segments = []
            for src in sg:
                segments.append(src.source_id.split(':')[1])
                t = (src.source_id, src.grp_id, src.count_ruptures(),
                     src.mutex_weight)
                out.append(t)
            probs.append((src.grp_id, sg.grp_probability))
            assert len(segments) == len(set(segments)), segments
    if out:
        dtlist = [('src_id', hdf5.vstr), ('grp_id', int),
                  ('num_ruptures', int), ('mutex_weight', float)]
        dstore.create_dset('src_mutex', numpy.array(out, dtlist),
                           fillvalue=None)
        lst = [('grp_id', int), ('probability', float)]
        dstore.create_dset('grp_probability', numpy.array(probs, lst),
                           fillvalue=None)

    # must be called *after* _fix_dupl_ids
    fix_geometry_sections(smdict, csm, dstore)
    return csm


def add_checksums(srcs):
    """
    Build and attach a checksum to each source
    """
    for src in srcs:
        dic = {k: v for k, v in vars(src).items()
               if k not in 'source_id trt_smr smweight samples branch'}
        src.checksum = zlib.adler32(pickle.dumps(dic, protocol=4))
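
# Checksum sketch (editor's illustration, with SimpleNamespace stand-ins):
# sources differing only in the excluded attributes (here source_id)
# get the same checksum:
#
#   >>> from types import SimpleNamespace as NS
#   >>> s1 = NS(source_id='a', mfd=[1, 2])
#   >>> s2 = NS(source_id='b', mfd=[1, 2])
#   >>> add_checksums([s1, s2])
#   >>> s1.checksum == s2.checksum
#   True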
# called before _fix_dupl_ids
def find_false_duplicates(smdict):
    """
    Discriminate different sources with the same ID (false duplicates)
    and make their IDs unique by appending '!<branchID>'
    """
    acc = general.AccumDict(accum=[])
    atomic = set()
    for smodel in smdict.values():
        for sgroup in smodel.src_groups:
            for src in sgroup:
                src.branch = smodel.branch
                srcid = (src.source_id if sgroup.atomic
                         else basename(src))
                acc[srcid].append(src)
                if sgroup.atomic:
                    atomic.add(src.source_id)
    found = []
    for srcid, srcs in acc.items():
        if len(srcs) > 1:  # duplicated ID
            if any(src.source_id in atomic for src in srcs):
                raise RuntimeError('Sources in atomic groups cannot be '
                                   'duplicated: %s' % srcid)
            if any(getattr(src, 'mutex_weight', 0) for src in srcs):
                raise RuntimeError('Mutually exclusive sources cannot be '
                                   'duplicated: %s' % srcid)
            add_checksums(srcs)
            gb = general.AccumDict(accum=[])
            for src in srcs:
                gb[checksum(src)].append(src)
            if len(gb) > 1:
                for same_checksum in gb.values():
                    for src in same_checksum:
                        check_branchID(src.branch)
                        src.source_id += '!%s' % src.branch
                found.append(srcid)
    return found


def replace(lst, splitdic, key):
    """
    Replace a list of named elements with the split elements in splitdic
    """
    new = []
    for el in lst:
        tag = getattr(el, key)
        if tag in splitdic:
            new.extend(splitdic[tag])
        else:
            new.append(el)
    lst[:] = new
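
# In-place replacement sketch (editor's illustration, with hypothetical
# SimpleNamespace elements keyed by their 'source_id' attribute):
#
#   >>> from types import SimpleNamespace as NS
#   >>> lst = [NS(source_id='a'), NS(source_id='b')]
#   >>> split = {'a': [NS(source_id='a:0'), NS(source_id='a:1')]}
#   >>> replace(lst, split, 'source_id')
#   >>> [el.source_id for el in lst]
#   ['a:0', 'a:1', 'b']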


def fix_geometry_sections(smdict, csm, dstore):
    """
    If there are MultiFaultSources, fix the sections according to the
    GeometryModels (if any).
    """
    gmodels = []
    gfiles = []
    for fname, mod in smdict.items():
        if isinstance(mod, nrml.GeometryModel):
            gmodels.append(mod)
            gfiles.append(fname)

    # merge and reorder the sections
    sec_ids = []
    sections = {}
    for gmod in gmodels:
        sec_ids.extend(gmod.sections)
        sections.update(gmod.sections)
    check_unique(sec_ids, 'section ID in files ' + ' '.join(gfiles))
    if sections:
        # save in the temporary file
        assert dstore, ('You forgot to pass the dstore to '
                        'get_composite_source_model')
        oq = dstore['oqparam']
        mfsources = []
        for sg in csm.src_groups:
            for src in sg:
                if src.code == b'F':
                    mfsources.append(src)
        if oq.sites and len(oq.sites) == 1 and oq.use_rates:
            lon, lat, _dep = oq.sites[0]
            site1 = site.SiteCollection.from_points([lon], [lat])
        else:
            site1 = None
        split_dic = save_and_split(mfsources, sections,
                                   dstore.tempname, site1)
        for sg in csm.src_groups:
            replace(sg.sources, split_dic, 'source_id')


def _groups_ids(smlt_dir, smdict, fnames):
    # extract the source groups and ids from a sequence of source files
    groups = []
    for fname in fnames:
        fullname = os.path.abspath(os.path.join(smlt_dir, fname))
        groups.extend(smdict[fullname].src_groups)
    return groups, set(src.source_id for grp in groups for src in grp)


def _build_groups(full_lt, smdict):
    # build all the possible source groups from the full logic tree
    smlt_file = full_lt.source_model_lt.filename
    smlt_dir = os.path.dirname(smlt_file)
    groups = []
    frac = 1. / len(full_lt.sm_rlzs)
    for rlz in full_lt.sm_rlzs:
        src_groups, source_ids = _groups_ids(
            smlt_dir, smdict, rlz.value[0].split())
        bset_values = full_lt.source_model_lt.bset_values(rlz.lt_path)
        while (bset_values and
               bset_values[0][0].uncertainty_type == 'extendModel'):
            (_bset, value), *bset_values = bset_values
            extra, extra_ids = _groups_ids(smlt_dir, smdict, value.split())
            common = source_ids & extra_ids
            if common:
                raise InvalidFile(
                    '%s contains source(s) %s already present in %s' %
                    (value, common, rlz.value))
            src_groups.extend(extra)
        for src_group in src_groups:
            sg = apply_uncertainties(bset_values, src_group)
            full_lt.set_trt_smr(sg, smr=rlz.ordinal)
            for src in sg:
                # the smweight is used in event based sampling:
                # see oq-risk-tests etna
                src.smweight = rlz.weight if full_lt.num_samples else frac
                if rlz.samples > 1:
                    src.samples = rlz.samples
            groups.append(sg)

        # check applyToSources
        sm_branch = rlz.lt_path[0]
        src_id = full_lt.source_model_lt.info.applytosources[sm_branch]
        for srcid in src_id:
            if srcid not in source_ids:
                raise ValueError(
                    "The source %s is not in the source model,"
                    " please fix applyToSources in %s or the "
                    "source model(s) %s" %
                    (srcid, smlt_file, rlz.value[0].split()))
    return groups


def reduce_sources(sources_with_same_id, full_lt):
    """
    :param sources_with_same_id: a list of sources with the same source_id
    :returns: a list of truly unique sources, ordered by trt_smr
    """
    out = []
    add_checksums(sources_with_same_id)
    for srcs in general.groupby(sources_with_same_id, checksum).values():
        # duplicate sources: same id, same checksum
        src = srcs[0]
        if len(srcs) > 1:  # happens in logictree/case_07
            src.trt_smr = tuple(s.trt_smr for s in srcs)
        else:
            src.trt_smr = src.trt_smr,
        out.append(src)
    out.sort(key=operator.attrgetter('trt_smr'))
    return out
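
# Reduction sketch (editor's illustration; full_lt is not used in the
# body above, so a placeholder is enough here): two copies of the same
# source coming from different realizations collapse into one source
# carrying both trt_smr values:
#
#   >>> from types import SimpleNamespace as NS
#   >>> srcs = [NS(source_id='a', trt_smr=0), NS(source_id='a', trt_smr=1)]
#   >>> [s.trt_smr for s in reduce_sources(srcs, None)]
#   [(0, 1)]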


def split_by_tom(sources):
    """
    Groups together sources with the same TOM
    """
    def key(src):
        return getattr(src, 'temporal_occurrence_model',
                       None).__class__.__name__
    return general.groupby(sources, key).values()
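
# Grouping sketch (editor's illustration): sources are keyed by the class
# name of their TOM; sources without one fall under 'NoneType':
#
#   >>> from types import SimpleNamespace as NS
#   >>> class PoissonTOM: pass
#   >>> srcs = [NS(temporal_occurrence_model=PoissonTOM()), NS()]
#   >>> [len(grp) for grp in split_by_tom(srcs)]
#   [1, 1]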


def _get_csm(full_lt, groups, event_based):
    # 1. extract a single source from multiple sources with the same ID
    # 2. regroup the sources in non-atomic groups by TRT
    # 3. reorder the sources by source_id
    atomic = []
    acc = general.AccumDict(accum=[])
    for grp in groups:
        if grp and grp.atomic:
            atomic.append(grp)
        elif grp:
            acc[grp.trt].extend(grp)
    key = operator.attrgetter('source_id', 'code')
    src_groups = []
    for trt in acc:
        lst = []
        for srcs in general.groupby(acc[trt], key).values():
            # NB: not reducing the sources in event based
            if len(srcs) > 1 and not event_based:
                srcs = reduce_sources(srcs, full_lt)
            lst.extend(srcs)
        for sources in general.groupby(lst, trt_smrs).values():
            for grp in split_by_tom(sources):
                src_groups.append(sourceconverter.SourceGroup(trt, grp))
    src_groups.extend(atomic)
    _fix_dupl_ids(src_groups)
    # sort by source_id
    for sg in src_groups:
        sg.sources.sort(key=operator.attrgetter('source_id'))
    return CompositeSourceModel(full_lt, src_groups)


class CompositeSourceModel:
    """
    :param full_lt: a :class:`FullLogicTree` instance
    :param src_groups: a list of SourceGroups
    """
    def __init__(self, full_lt, src_groups):
        self.src_groups = src_groups
        self.init(full_lt)

    def init(self, full_lt):
        self.full_lt = full_lt
        self.gsim_lt = full_lt.gsim_lt
        self.source_model_lt = full_lt.source_model_lt
        self.sm_rlzs = full_lt.sm_rlzs

        # initialize the code dictionary
        self.code = {}  # srcid -> code
        for grp_id, sg in enumerate(self.src_groups):
            assert len(sg)  # sanity check
            for src in sg:
                src.grp_id = grp_id
                if src.code != b'P':
                    source_id = basename(src)
                    self.code[source_id] = src.code

    def get_trt_smrs(self):
        """
        :returns: an array of trt_smrs (to be stored as an hdf5.vuint32 array)
        """
        keys = [sg.sources[0].trt_smrs for sg in self.src_groups]
        assert len(keys) < TWO16, len(keys)
        return [numpy.array(trt_smrs, numpy.uint32) for trt_smrs in keys]

    def get_sources(self, atomic=None):
        """
        There are 3 options:

        atomic == None => return all the sources (default)
        atomic == True => return only the sources in atomic groups
        atomic == False => return only the sources not in atomic groups
        """
        srcs = []
        for src_group in self.src_groups:
            if atomic is None:  # get all sources
                srcs.extend(src_group)
            elif atomic == src_group.atomic:
                srcs.extend(src_group)
        return srcs

    def get_basenames(self):
        """
        :returns: a sorted list of source names stripped of the suffixes
        """
        sources = set()
        for src in self.get_sources():
            sources.add(basename(src, ';:.').split('!')[0])
        return sorted(sources)

    def get_mags_by_trt(self, maximum_distance):
        """
        :param maximum_distance: dictionary trt -> magdist interpolator
        :returns: a dictionary trt -> magnitudes in the sources as strings
        """
        mags = general.AccumDict(accum=set())  # trt -> mags
        for sg in self.src_groups:
            for src in sg:
                mags[sg.trt].update(src.get_magstrs())
        out = {}
        for trt in mags:
            minmag = maximum_distance(trt).x[0]
            out[trt] = sorted(m for m in mags[trt] if float(m) >= minmag)
        return out

    def update_source_info(self, source_data):
        """
        Update (calc_time, weight, num_sites) inside the source_info
        """
        assert len(source_data) < TWO24, len(source_data)
        for src_id, nsites, weight, ctimes in python3compat.zip(
                source_data['src_id'], source_data['nsites'],
                source_data['weight'], source_data['ctimes']):
            baseid = basename(src_id)
            row = self.source_info[baseid]
            row[CALC_TIME] += ctimes
            row[WEIGHT] += weight
            row[NUM_SITES] += nsites

    def count_ruptures(self):
        """
        Call src.count_ruptures() on each source. Slow.
        """
        n = 0
        for src in self.get_sources():
            n += src.count_ruptures()
        return n

    def fix_src_offset(self):
        """
        Set the src.offset field for each source
        """
        src_id = 0
        for srcs in general.groupby(self.get_sources(), basename).values():
            offset = 0
            if len(srcs) > 1:  # order by split number
                srcs.sort(key=fragmentno)
            for src in srcs:
                src.id = src_id
                src.offset = offset
                if not src.num_ruptures:
                    src.num_ruptures = src.count_ruptures()
                offset += src.num_ruptures
                if src.num_ruptures >= TWO30:
                    raise ValueError(
                        '%s contains more than 2**30 ruptures' % src)
                # print(src, src.offset, offset)
            src_id += 1

    def get_msr_by_grp(self):
        """
        :returns: a dictionary grp_id -> MSR string
        """
        acc = general.AccumDict(accum=set())
        for grp_id, sg in enumerate(self.src_groups):
            for src in sg:
                acc[grp_id].add(msr_name(src))
        return {grp_id: ' '.join(sorted(acc[grp_id])) for grp_id in acc}

    def get_max_weight(self, oq):  # used in preclassical
        """
        :param oq: an OqParam instance
        :returns: the maximum weight of the sources
        """
        srcs = self.get_sources()
        tot_weight = 0
        nr = 0
        for src in srcs:
            nr += src.num_ruptures
            tot_weight += src.weight
            if src.code == b'C' and src.num_ruptures > 20_000:
                msg = ('{} is suspiciously large, containing {:_d} '
                       'ruptures with complex_fault_mesh_spacing={} km')
                spc = oq.complex_fault_mesh_spacing
                logging.info(msg.format(src, src.num_ruptures, spc))
        assert tot_weight
        max_weight = tot_weight / (oq.concurrent_tasks or 1)
        max_weight *= 1.05  # increased to produce fewer tasks
        logging.info('tot_weight={:_d}, max_weight={:_d}, num_sources={:_d}'.
                     format(int(tot_weight), int(max_weight), len(srcs)))
        return max_weight

    def split(self, cmakers, sitecol, max_weight, num_chunks=1, tiling=False):
        """
        :yields: (cmaker, tilegetters, blocks, splits) for each source group
        """
        N = len(sitecol)
        oq = cmakers[0].oq
        max_mb = float(config.memory.pmap_max_mb)
        mb_per_gsim = oq.imtls.size * N * 4 / 1024 ** 2
        self.splits = []
        # send heavy groups first
        grp_ids = numpy.argsort([sg.weight for sg in self.src_groups])[::-1]
        for cmaker in cmakers[grp_ids]:
            G = len(cmaker.gsims)
            grp_id = cmaker.grp_id
            sg = self.src_groups[grp_id]
            splits = G * mb_per_gsim / max_mb
            hint = sg.weight / max_weight
            if sg.atomic or tiling:
                blocks = [None]
                tiles = max(hint, splits)
            else:
                # if hint > max_blocks generate max_blocks and more tiles
                blocks = list(general.split_in_blocks(
                    sg, min(hint, oq.max_blocks), lambda s: s.weight))
                tiles = max(hint / oq.max_blocks * splits, splits)
            tilegetters = list(sitecol.split(tiles, oq.max_sites_disagg))
            self.splits.append(splits)
            cmaker.tiling = tiling
            cmaker.gsims = list(cmaker.gsims)  # save data transfer
            cmaker.codes = sg.codes
            cmaker.rup_indep = getattr(sg, 'rup_interdep', None) != 'mutex'
            cmaker.num_chunks = num_chunks
            cmaker.blocks = len(blocks)
            cmaker.weight = sg.weight
            cmaker.atomic = sg.atomic
            yield cmaker, tilegetters, blocks, numpy.ceil(splits)

    def __toh5__(self):
        G = len(self.src_groups)
        arr = numpy.zeros(G + 1, hdf5.vuint8)
        for grp_id, grp in enumerate(self.src_groups):
            arr[grp_id] = zpik(grp)
        arr[G] = zpik(self.source_info)
        size = sum(len(val) for val in arr)
        logging.info(f'Storing {general.humansize(size)} '
                     'of CompositeSourceModel')
        return arr, {}

    # tested in case_36
    def __fromh5__(self, arr, attrs):
        objs = [pickle.loads(zlib.decompress(a.tobytes())) for a in arr]
        self.src_groups = objs[:-1]
        self.source_info = objs[-1]

    def __repr__(self):
        """
        Return a string representation of the composite model
        """
        contents = []
        for sg in self.src_groups:
            arr = numpy.array(_strip_colons(sg))
            line = f'grp_id={sg.sources[0].grp_id} {arr}'
            contents.append(line)
        return '<%s\n%s>' % (self.__class__.__name__, '\n'.join(contents))