# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2014-2023 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.importosimportsysimportstringimportinspectfrompprintimportpprintimportunittest.mockasmockimportloggingimportoperatorimportcollectionsimportnumpyimportpandasimportfionafromshapely.geometryimportshapefromdecoratorimportFunctionMakerfromopenquake.baselibimportconfig,hdf5fromopenquake.baselib.generalimportgroupby,gen_subclasses,humansizefromopenquake.baselib.performanceimportMonitorfromopenquake.hazardlibimportnrml,imt,logictree,site,geofromopenquake.hazardlib.gsim.baseimportregistryfromopenquake.hazardlib.mfd.baseimportBaseMFDfromopenquake.hazardlib.scalerel.baseimportBaseMSRfromopenquake.hazardlib.source.baseimportBaseSeismicSourcefromopenquake.hazardlib.validimportpmf_map,lon_latfromopenquake.commonlib.oqvalidationimportOqParamfromopenquake.commonlibimportreadinput,logsfromopenquake.risklibimportasset,scientificfromopenquake.calculators.exportimportexportfromopenquake.calculators.extractimportextractfromopenquake.calculatorsimportbase,reportwriterfromopenquake.calculators.viewsimportview,text_tablefromopenquake.calculators.exportimportDISPLAY_NAMEF32=numpy.float32U8=numpy.uint8
def print_subclass(what, cls):
    """
    List the subclasses of a base class, or show the docstring of one.

    :param what: a keyword, optionally followed by ':' and a class name;
        without the ':' part the names of all subclasses are printed
    :param cls: the base class whose subclasses are inspected
    """
    _keyword, *rest = what.split(':')
    if not rest:
        # no subclass specified, print all the names
        for subclass in gen_subclasses(cls):
            print(subclass.__name__)
        return

    wanted = rest[0]
    for subclass in gen_subclasses(cls):
        if subclass.__name__ == wanted:
            print(subclass.__doc__)
            return
    # the loop finished without finding the requested name
    print('Unknown class %s' % wanted)
def print_imt(what):
    """
    List the available IMTs, or show the docstring of one of them.

    The IMTs are the functions of the `imt` module whose name starts with
    an uppercase character.

    :param what: a keyword, optionally followed by ':' and an IMT name;
        without the ':' part the names of all IMTs are printed
    """
    parts = what.split(':')
    # lazy on purpose: the module is scanned while printing/searching
    imt_funcs = (obj for obj in vars(imt).values()
                 if inspect.isfunction(obj) and is_upper(obj))
    if len(parts) == 1:
        # no IMT specified, print all the names
        for func in imt_funcs:
            print(func.__name__)
        return

    wanted = parts[1]
    for func in imt_funcs:
        if func.__name__ == wanted:
            print(func.__doc__)
            return
    # the loop finished without finding the requested name
    print('Unknown IMT %s' % wanted)
def print_gsim(what):
    """
    List the available GSIMs, or show the docstring of one of them.

    :param what: a keyword, optionally followed by ':' and a GSIM class
        name; without the ':' part all the registry keys are printed,
        in sorted order
    """
    parts = what.split(':')
    if len(parts) == 1:
        # no GSIM specified, print all the registered names
        for name in sorted(registry):
            print(name)
        return

    wanted = parts[1]
    # the lookup is on the class name, not on the registry key
    for _key, gsim_cls in registry.items():
        if gsim_cls.__name__ == wanted:
            print(gsim_cls.__doc__)
            return
    # the loop finished without finding the requested name
    print('Unknown GSIM %s' % wanted)
def source_model_info(sm_nodes):
    """
    Count the sources contained in the given source models.

    :param sm_nodes: a sequence of source model nodes
    :returns: a text table with one row per tectonic region type plus a
        final "Total" row, and one column per source class
    """
    counter = collections.Counter()
    for model in sm_nodes:
        if model['xmlns'].endswith('nrml/0.4'):
            # the first child is taken as the one and only group
            src_groups = [model[0]]
        else:
            # the groups are the children of the first child
            src_groups = model[0]
        for grp in src_groups:
            fallback_trt = grp.get('tectonicRegion')
            for source in grp:
                # a source without its own TRT uses the one of the group
                region = source.get('tectonicRegion', fallback_trt)
                # keep the part of the tag following the '}'
                kind = source.tag.split('}')[1]
                counter[region, kind] += 1

    # the keys are (trt, class) pairs: split them in two sequences
    all_regions, all_kinds = zip(*counter)
    regions = sorted(set(all_regions))
    kinds = sorted(set(all_kinds))

    fields = [('TRT', (bytes, 30))]
    fields.extend((kind, int) for kind in kinds)
    table = numpy.zeros(len(regions) + 1, fields)  # +1 for the totals
    for idx, region in enumerate(regions):
        table[idx]['TRT'] = region
        for kind in kinds:
            table[idx][kind] = counter[region, kind]

    # last row: sum of each class column over all the other rows
    totals = table[-1]
    totals['TRT'] = 'Total'
    for kind in kinds:
        totals[kind] = table[kind][:-1].sum()
    return text_table(table)
def do_build_reports(directory):
    """
    Build a pre-calculation report for each job file found by walking
    the given directory recursively.

    Only the files named job.ini, job_h.ini, job_haz.ini and
    job_hazard.ini are considered; a failure in building a report is
    logged as an error and does not stop the walk.

    :param directory: the root of the directory tree to walk
    """
    job_names = ('job.ini', 'job_h.ini', 'job_haz.ini', 'job_hazard.ini')
    for cwd, _dirs, files in os.walk(directory):
        for fname in sorted(files):
            if fname not in job_names:
                continue
            job_ini = os.path.join(cwd, fname)
            logging.warning(job_ini)
            try:
                reportwriter.build_report(job_ini, cwd)
            except Exception as exc:
                # best effort: log the problem and go on with the next job
                logging.error(str(exc))
def is_upper(func):
    """
    Tell whether the name of the given function begins with an ASCII
    uppercase character (A-Z).

    :param func: any object with a non-empty __name__
    :returns: True or False
    """
    initial = func.__name__[0]
    # for a single character this is the same as membership in
    # string.ascii_uppercase
    return 'A' <= initial <= 'Z'
def main(what, report=False):
    """
    Give information about the passed keyword or filename

    :param what: a keyword (for instance 'calculators', 'views',
        'exports', 'gsim' or 'gsim:<ClassName>'), a directory, or the
        path of a .csv, .xml, .shp, .ini or .zip file
    :param report: if true, build reports: for all the job files found
        in a directory, or for a single .ini/.zip file
    """
    # set OQ_DISTRIBUTE to 'processpool' unless it is already 'no' or
    # 'processpool'
    if os.environ.get('OQ_DISTRIBUTE') not in ('no', 'processpool'):
        os.environ['OQ_DISTRIBUTE'] = 'processpool'
    if what == 'calculators':
        for calc in sorted(base.calculators):
            print(calc)
    elif what == 'executing':
        # tab-separated listing of the jobs that are executing or
        # submitted and flagged as running
        fields = 'id,user_name,calculation_mode,description'
        rows = logs.dbcmd(f"SELECT {fields} FROM job WHERE status IN "
                          "('executing', 'submitted') AND is_running=1")
        print(fields.replace(',', '\t'))
        for row in rows:
            print('\t'.join(map(str, row)))
    # NOTE(review): the startswith tests below ('gsim', 'imt', and later
    # 'mfd', 'msr', 'geohash') come before the file extension tests, so a
    # filename such as gsim_logic_tree.xml is handled here and not by the
    # .xml branch -- confirm this is intended
    elif what.startswith('gsim'):
        print_gsim(what)
    elif what.startswith('imt'):
        print_imt(what)
    elif what == 'views':
        for name in sorted(view):
            print(name)
    elif what == 'exports':
        # dic is used below as a mapping exporter -> formats; presumably
        # groupby collects the second elements of the export keys sharing
        # the same first element -- verify against baselib.general.groupby
        dic = groupby(export, operator.itemgetter(0),
                      lambda group: [r[1] for r in group])
        items = [(DISPLAY_NAME.get(exporter, '?'), exporter, formats)
                 for exporter, formats in dic.items()]
        n = 0  # total number of (exporter, format) combinations
        for dispname, exporter, formats in sorted(items):
            print(dispname, '"%s"' % exporter, formats)
            n += len(formats)
        print('There are %d exporters defined.' % n)
    elif what == 'extracts':
        for key in extract:
            func = extract[key]
            # look through decorators and partial objects, so that the
            # name, signature and doc printed are the ones of the
            # underlying function
            if hasattr(func, '__wrapped__'):
                fm = FunctionMaker(func.__wrapped__)
            elif hasattr(func, 'func'):  # for partial objects
                fm = FunctionMaker(func.func)
            else:
                fm = FunctionMaker(func)
            print('%s(%s)%s' % (fm.name, fm.signature, fm.doc))
    elif what == 'parameters':
        docs = OqParam.docs()
        # collect the .name of each OqParam class attribute having one
        names = set()
        for val in vars(OqParam).values():
            if hasattr(val, 'name'):
                names.add(val.name)
        params = sorted(names)
        for param in params:
            print(param)
            print(docs[param])
    elif what.startswith('mfd'):
        print_subclass(what, BaseMFD)
    elif what.startswith('msr'):
        print_subclass(what, BaseMSR)
    elif what.startswith('geohash'):
        print_geohash(what)
    elif what == 'venv':
        print(sys.prefix)
    elif what == 'cfg':
        print('Looking at the following paths (the last wins)')
        for path in config.paths:
            print(path)
    elif what == 'site_params':
        # print the sorted names in rows of 4 columns, each one
        # right-justified to the length of the longest name
        lst = sorted(site.site_param_dt)
        maxlen = max(len(x) for x in lst)
        ncols = 4
        nrows = int(numpy.ceil(len(lst) / ncols))
        for r in range(nrows):
            col = []
            for c in range(ncols):
                try:
                    col.append(lst[r * ncols + c].rjust(maxlen))
                except IndexError:
                    # past the end of the list: pad the last row
                    col.append(' ' * maxlen)
            print(''.join(col))
    elif what == 'sources':
        for cls in gen_subclasses(BaseSeismicSource):
            print(cls.__name__)
    elif what == 'disagg':
        for out in pmf_map:
            print(out)
    elif what == 'consequences':
        known = scientific.KNOWN_CONSEQUENCES
        print('The following %d consequences are implemented:' % len(known))
        for cons in known:
            print(cons)
    elif os.path.isdir(what) and report:
        # build the reports for all the job files in the directory and
        # then print the monitor
        with Monitor('info', measuremem=True) as mon:
            with mock.patch.object(logging.root, 'info'):  # reduce logging
                do_build_reports(what)
        print(mon)
    elif what.endswith('.csv'):
        # hdf5.sniff must return exactly one record; its .skip attribute
        # is passed to pandas as skiprows
        [rec] = hdf5.sniff([what])
        df = pandas.read_csv(what, skiprows=rec.skip)
        if len(df) > 25:
            # long file: keep the first and last 10 rows, separated by a
            # row of dots
            dots = pandas.DataFrame({col: ['...'] for col in df.columns})
            df = pandas.concat([df[:10], dots, df[-10:]])
        print(text_table(df, ext='org'))
    elif what.endswith('.xml'):
        # dispatch on the tag of the first child of the root node
        node = nrml.read(what)
        tag = node[0].tag
        if tag.endswith('sourceModel'):
            print(source_model_info([node]))
        elif tag.endswith('exposureModel'):
            _exp, df = asset.read_exp_df(what)
            print(node.to_str())
            print(df.set_index('id')[
                ['lon', 'lat', 'taxonomy', 'value-structural']])
        elif tag.endswith('logicTree'):
            bset = node[0][0]
            if bset.tag.endswith("logicTreeBranchingLevel"):
                # go down one level to get at the branchset
                bset = bset[0]
            if bset.attrib['uncertaintyType'] in ('sourceModel',
                                                  'extendModel'):
                # source model logic tree: summarize all the source models
                # it refers to
                sm_nodes = []
                for smpath in logictree.collect_info(what).smpaths:
                    sm_nodes.append(nrml.read(smpath))
                print(source_model_info(sm_nodes))
            elif bset.attrib['uncertaintyType'] == 'gmpeModel':
                print(logictree.GsimLogicTree(what))
            # any other uncertaintyType prints nothing
        else:
            # any other kind of file is printed as it is
            print(node.to_str())
    elif what.endswith('.shp'):
        with fiona.open(what) as f:
            print_features(f)
    elif what.endswith(('.ini', '.zip')):
        with Monitor('info', measuremem=True) as mon:
            if report:
                print('Generated', reportwriter.build_report(what))
            else:
                # short summary of the job: mode, description, input size
                # and the branchsets of its logic tree
                oq = readinput.get_oqparam(what)
                lt = readinput.get_logic_tree(oq)
                size = humansize(oq.get_input_size())
                print('calculation_mode: %s' % oq.calculation_mode)
                print('description: %s' % oq.description)
                print('input size: %s' % size)
                for bset in lt.branchsets:
                    pprint(bset.to_list())
        # show the monitor only if the duration is over 1 (presumably
        # seconds -- confirm against baselib.performance.Monitor)
        if mon.duration > 1:
            print(mon)
    elif what:
        # non-empty argument not recognized by any of the branches above
        print("No info for '%s'" % what)
# Descriptions of the arguments of main, stored as function attributes
# (presumably used as help strings by the command-line machinery --
# confirm against the caller).
# NOTE(review): `choices` is not defined in the visible part of this file;
# it must exist at module level before this line runs.
main.what = 'filename or one of %s' % ', '.join(choices)
main.report = 'build rst report from job.ini file or zip archive'