# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import csv
import shutil
import json
import string
import pickle
import logging
import os
import tempfile
import subprocess
import traceback
import signal
import zlib
import urllib.parse as urlparse
import re
import psutil
from datetime import datetime, timezone
from urllib.parse import unquote_plus
from xml.parsers.expat import ExpatError
from django.http import (
    HttpResponse, HttpResponseNotFound, HttpResponseBadRequest,
    HttpResponseForbidden)
from django.core.mail import EmailMessage
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.shortcuts import render
import numpy

from openquake.baselib import hdf5, config, parallel
from openquake.baselib.general import groupby, gettemp, zipfiles, mp
from openquake.hazardlib import nrml, gsim, valid
from openquake.hazardlib.shakemap.validate import (
    impact_validate, ARISTOTLE_FORM_LABELS, ARISTOTLE_FORM_PLACEHOLDERS)
from openquake.commonlib import (
    readinput, oqvalidation, logs, datastore, dbapi)
from openquake.calculators import base, views
from openquake.calculators.getters import NotFound
from openquake.calculators.export import export, FIELD_DESCRIPTION
from openquake.calculators.extract import extract as _extract
from openquake.calculators.postproc.plots import plot_shakemap, plot_rupture
from openquake.engine import __version__ as oqversion
from openquake.engine.export import core
from openquake.engine import engine, aelo, impact
from openquake.engine.aelo import (
    get_params_from, PRELIMINARY_MODELS, PRELIMINARY_MODEL_WARNING)
from openquake.engine.export.core import DataStoreExportError
from openquake.server import utils

from django.conf import settings
from django.http import FileResponse
from django.urls import reverse
from wsgiref.util import FileWrapper

# the authentication helpers are only imported when the server runs in
# LOCKDOWN mode
if settings.LOCKDOWN:
    from django.contrib.auth import authenticate, login, logout

UTC = timezone.utc
# directory containing this module
CWD = os.path.dirname(__file__)

# HTTP status codes
METHOD_NOT_ALLOWED = 405
NOT_IMPLEMENTED = 501

# content types used in the responses
XML = 'application/xml'
JSON = 'application/json'
HDF5 = 'application/x-hdf'

#: For exporting calculation outputs, the client can request a specific format
#: (xml, geojson, csv, etc.). If the client does not specify give them (NRML)
#: XML by default.
DEFAULT_EXPORT_TYPE = 'xml'

# content type associated to each export type; the export types which are
# not listed here are served as DEFAULT_CONTENT_TYPE
EXPORT_CONTENT_TYPE_MAP = {'xml': XML, 'geojson': JSON}

DEFAULT_CONTENT_TYPE = 'text/plain'

LOGGER = logging.getLogger('openquake.server')

# headers added to the responses by the cross_domain_ajax decorator
ACCESS_HEADERS = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
    'Access-Control-Max-Age': 1000,
    'Access-Control-Allow-Headers': '*',
}

# argument lists compared with config.distribution.submit_cmd in submit_job
KUBECTL = "kubectl apply -f -".split()
ENGINE = "python -m openquake.engine.engine".split()

# labels of the AELO form fields, also used to report validation errors
AELO_FORM_LABELS = {
    'lon': 'Longitude',
    'lat': 'Latitude',
    'vs30': 'Vs30',
    'siteid': 'Site name',
    'asce_version': 'ASCE version',
}

# placeholders of the AELO form fields
AELO_FORM_PLACEHOLDERS = {
    'lon': 'max. 5 decimals',
    'lat': 'max. 5 decimals',
    'vs30': 'fixed at 760 m/s',
    'siteid': f'max. {settings.MAX_AELO_SITE_NAME_LEN} characters',
    'asce_version': 'ASCE version',
}

# datastore keys of the outputs that calc_result refuses to export
HIDDEN_OUTPUTS = ['assetcol', 'job']

# disable check on the export_dir, since the WebUI exports in a tmpdir
oqvalidation.OqParam.is_valid_export_dir = lambda self: True


# Credit for this decorator to https://gist.github.com/aschem/1308865.
def cross_domain_ajax(func):
    """
    Decorator adding the headers listed in ACCESS_HEADERS to the response
    returned by a view.

    A request with method ``OPTIONS`` is answered with an empty
    `HttpResponse` (plus the headers) without calling the view at all.

    :param func: a view callable taking the request as first argument
    :returns: the wrapped view
    """
    # imported here so that the decorator does not rely on a module-level
    # import of functools
    from functools import wraps

    @wraps(func)  # preserve name and docstring of the decorated view
    def wrap(request, *args, **kwargs):
        # Firefox sends 'OPTIONS' request for cross-domain javascript call.
        if request.method == "OPTIONS":
            response = HttpResponse()
        else:
            response = func(request, *args, **kwargs)
        for header, value in ACCESS_HEADERS.items():
            response[header] = value
        return response
    return wrap
def_get_base_url(request):""" Construct a base URL, given a request object. This comprises the protocol prefix (http:// or https://) and the host, which can include the port number. For example: http://www.openquake.org or https://www.openquake.org:8000. """ifrequest.is_secure():base_url='https://%s'else:base_url='http://%s'base_url%=request.META['HTTP_HOST']returnbase_url
def store(request_files, ini, calc_id):
    """
    Save the uploaded files in the scratch directory of the calculation
    and pick the job file among them by looking at the `ini` suffix.

    :param request_files: the uploaded files, keyed by form field name
    :param ini: suffix identifying the job file (for instance ".ini")
    :param calc_id: ID of the calculation owning the files
    :returns: full path of the (first) ini file found
    :raises NotFound: if no file ending with `ini` was uploaded
    """
    calc_dir = parallel.scratch_dir(calc_id)
    archive = request_files.get('archive')
    if archive is not None:
        # extract the files from the archive into calc_dir
        inifiles = readinput.extract_from_zip(archive, ini, calc_dir)
    else:
        # move each file to calc_dir using the upload file names
        inifiles = []
        # NB: request_files.values() Django objects are not sortable
        for uploaded in request_files.values():
            destination = os.path.join(calc_dir, uploaded.name)
            shutil.move(uploaded.temporary_file_path(), destination)
            if uploaded.name.endswith(ini):
                inifiles.append(destination)
    if not inifiles:
        raise NotFound('There are no %s files in the archive' % ini)
    return inifiles[0]
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def ajax_login(request):
    """
    Accept a POST request to login.

    :param request:
        `django.http.HttpRequest` object, containing the mandatory
        parameters username and password.
    :returns:
        a plain text response with status 200 on success, 403 for invalid
        credentials or a disabled account, 400 if a mandatory parameter
        is missing
    """
    username = request.POST.get('username')
    password = request.POST.get('password')
    if username is None or password is None:
        # a missing parameter is a client error, not a server error
        return HttpResponseBadRequest(
            'Please provide the "username" and "password" parameters')
    user = authenticate(username=username, password=password)
    if user is None:
        return HttpResponse(content='Invalid login',
                            content_type='text/plain', status=403)
    if not user.is_active:
        return HttpResponse(content='Disabled account',
                            content_type='text/plain', status=403)
    login(request, user)
    return HttpResponse(content='Successful login',
                        content_type='text/plain', status=200)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def ajax_logout(request):
    """
    Log out the user associated to the given POST request.

    :param request: a `django.http.HttpRequest` object
    :returns: a plain text response with status 200
    """
    logout(request)
    return HttpResponse(
        content='Successful logout', content_type='text/plain', status=200)
@cross_domain_ajax
@require_http_methods(['GET'])
def get_engine_version(request):
    """
    Return a response whose body is the openquake.engine version string
    """
    response = HttpResponse(oqversion)
    return response
@cross_domain_ajax
@require_http_methods(['GET'])
def get_engine_latest_version(request):
    """
    Return a response whose body is the outcome of
    `engine.check_obsolete_version()`, i.e. a string telling if new
    versions have been released.
    The body is 'None' if the version is not available.
    """
    outcome = engine.check_obsolete_version()
    return HttpResponse(outcome)
@cross_domain_ajax
@require_http_methods(['GET'])
def get_available_gsims(request):
    """
    Return the available GSIMs as a JSON list of strings
    """
    names = list(gsim.get_available_gsims())
    body = json.dumps(names)
    return HttpResponse(content=body, content_type=JSON)
@cross_domain_ajax
@require_http_methods(['GET'])
def get_ini_defaults(request):
    """
    Return a JSON dictionary mapping the names of the ini attributes
    having a default value to that default. Both the attribute names of
    `OqParam` and the old names listed in `OqParam.ALIASES` are reported.
    """
    oqparam_cls = oqvalidation.OqParam
    aliases = oqparam_cls.ALIASES
    ini_defs = {}
    for name in dir(oqparam_cls) + list(aliases):
        # an old name is resolved to the current name of the parameter
        current_name = aliases[name] if name in aliases else name
        param = getattr(oqparam_cls, current_name)
        if not isinstance(param, valid.Param):
            continue
        if param.default is valid.Param.NODEFAULT:
            continue
        if isinstance(param.default, float) and numpy.isnan(param.default):
            # NaN defaults are not reported
            continue
        ini_defs[name] = param.default
    return HttpResponse(content=json.dumps(ini_defs), content_type=JSON)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def validate_nrml(request):
    """
    Leverage oq-risklib to check if a given XML text is a valid NRML

    :param request:
        a `django.http.HttpRequest` object containing the mandatory
        parameter 'xml_text': the text of the XML to be validated as NRML

    :returns: a JSON object, containing:
        * 'valid': a boolean indicating if the provided text is a valid NRML
        * 'error_msg': the error message, if any error was found
                       (None otherwise)
        * 'error_line': line of the given XML where the error was found
                        (None if no error was found or if it was not a
                        validation error)
    """
    xml_text = request.POST.get('xml_text')
    if not xml_text:
        return HttpResponseBadRequest(
            'Please provide the "xml_text" parameter')
    xml_file = gettemp(xml_text, suffix='.xml')
    try:
        nrml.to_python(xml_file)
    except ExpatError as exc:
        return _make_response(error_msg=str(exc),
                              error_line=exc.lineno,
                              valid=False)
    except Exception as exc:
        # get the exception message; an exception raised without arguments
        # has an empty `args` tuple, so fall back to its string form
        exc_msg = exc.args[0] if exc.args else str(exc)
        if isinstance(exc_msg, bytes):
            # make it a unicode object; undecodable bytes are replaced
            # instead of raising a new error inside the handler
            exc_msg = exc_msg.decode('utf-8', errors='replace')
        elif not isinstance(exc_msg, str):
            # if it is another kind of object, it is not obvious a priori
            # how to extract the error line from it
            return _make_response(
                error_msg=str(exc_msg), error_line=None, valid=False)
        # if the line is not mentioned, the whole message is taken
        error_msg = exc_msg.split(', line')[0]
        # check if the exc_msg contains a line number indication
        search_match = re.search(r'line (\d+)', exc_msg)
        error_line = int(search_match.group(1)) if search_match else None
        return _make_response(
            error_msg=error_msg, error_line=error_line, valid=False)
    return _make_response(error_msg=None, error_line=None, valid=True)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def validate_zip(request):
    """
    Leverage the engine libraries to check if a given zip archive is a
    valid calculation input

    :param request:
        a `django.http.HttpRequest` object containing a zip archive

    :returns: a JSON object, containing:
        * 'valid': a boolean indicating if the provided archive is valid
        * 'error_msg': the error message, if any error was found
                       (None otherwise)
    """
    archive = request.FILES.get('archive')
    if not archive:
        return HttpResponseBadRequest('Missing archive file')
    job_zip = archive.temporary_file_path()
    try:
        # the archive is valid if the parameters can be parsed and the
        # inputs can be read by the calculator
        oqparam = readinput.get_oqparam(job_zip)
        calculator = base.calculators(oqparam, calc_id=None)
        calculator.read_inputs()
    except Exception as exc:
        return _make_response(str(exc), None, valid=False)
    return _make_response(None, None, valid=True)
@require_http_methods(['GET'])
@cross_domain_ajax
def download_png(request, calc_id, what):
    """
    Return the PNG image stored in the datastore of the calculation
    under the key ``png/<what>``, if available.

    :param request: a `django.http.HttpRequest` object
    :param calc_id: the ID of the calculation
    :param what: name of the image inside the ``png`` group
    :returns:
        the image, or 404 for an unknown job, 403 for a job not accessible
        by the user, 500 (with the traceback as text) on any error
    """
    job = logs.dbcmd('get_job', int(calc_id))
    if job is None:
        return HttpResponseNotFound()
    allowed = utils.user_has_permission(request, job.user_name, job.status)
    if not allowed:
        return HttpResponseForbidden()
    try:
        from PIL import Image
        response = HttpResponse(content_type="image/png")
        with datastore.read(job.ds_calc_dir + '.hdf5') as ds:
            pixels = ds['png/%s' % what][:]
            Image.fromarray(pixels).save(response, format='png')
    except Exception as exc:
        tb = ''.join(traceback.format_tb(exc.__traceback__))
        error_text = '%s: %s\n%s' % (exc.__class__.__name__, exc, tb)
        return HttpResponse(
            content=error_text, content_type='text/plain', status=500)
    return response
@require_http_methods(['GET'])
@cross_domain_ajax
def calc(request, calc_id):
    """
    Return a JSON blob with the information stored in the database for
    the calculation specified by ``calc_id`` (as returned by the
    ``calc_info`` database command), which includes the current job
    status (executing, complete, etc.).

    :returns: the JSON blob, or 403/404 if the calculation is not
              accessible by the user or does not exist
    """
    try:
        info = logs.dbcmd('calc_info', calc_id)
        permitted = utils.user_has_permission(
            request, info['user_name'], info['status'])
        if not permitted:
            return HttpResponseForbidden()
    except dbapi.NotFound:
        return HttpResponseNotFound()
    body = json.dumps(info)
    return HttpResponse(content=body, content_type=JSON)
@require_http_methods(['GET'])
@cross_domain_ajax
def calc_list(request, id=None):
    """
    Get a list of calculations and report their id, status, calculation_mode,
    is_running, description, and a url where more detailed information
    can be accessed. This is called several times by the Javascript.

    Responses are in JSON.

    :param request: a `django.http.HttpRequest` object
    :param id:
        if given, only the dictionary of the calculation with that id is
        returned (404 if there is none) instead of the list
    """
    # view associated to the endpoints /v1/calc/list and /v1/calc/:id/status
    base_url = _get_base_url(request)
    # always filter calculation list unless user is a superuser
    calc_data = logs.dbcmd('get_calcs', request.GET,
                           utils.get_valid_users(request),
                           not utils.is_superuser(request), id)

    response_data = []
    # user running the server process: a job is abortable only if its
    # process belongs to the same user
    username = psutil.Process(os.getpid()).username()
    for (hc_id, owner, status, calculation_mode, is_running, desc, pid,
         parent_id, size_mb, host, start_time) in calc_data:
        if host:
            owner += '@' + host.split('.')[0]
        url = urlparse.urljoin(base_url, 'v1/calc/%d' % hc_id)
        abortable = False
        # NB: psutil.Process(None) would refer to the current process, so
        # a running job without a pid must not be inspected
        if is_running and pid:
            try:
                abortable = psutil.Process(pid).username() == username
            except psutil.Error:
                # the process is gone or cannot be inspected
                pass
        start_time_str = (
            start_time.strftime("%Y-%m-%d, %H:%M:%S") + " "
            + settings.TIME_ZONE)
        response_data.append(
            dict(id=hc_id, owner=owner,
                 calculation_mode=calculation_mode, status=status,
                 is_running=bool(is_running), description=desc, url=url,
                 parent_id=parent_id, abortable=abortable, size_mb=size_mb,
                 start_time=start_time_str))

    # if id is specified the related dictionary is returned instead the list
    if id is not None:
        response_data = [job for job in response_data
                         if str(job['id']) == id]
        if not response_data:
            return HttpResponseNotFound()
        [response_data] = response_data

    return HttpResponse(content=json.dumps(response_data),
                        content_type=JSON)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_abort(request, calc_id):
    """
    Abort the given calculation, if it is running

    :param request: a `django.http.HttpRequest` object
    :param calc_id: the ID of the calculation to abort
    :returns:
        a JSON object with a single key, 'success' or 'error', associated
        to a message; the status is 403 if the user has no permission to
        abort the job
    """
    job = logs.dbcmd('get_job', calc_id)
    if job is None:
        message = {'error': 'Unknown job %s' % calc_id}
        return HttpResponse(content=json.dumps(message), content_type=JSON)

    if job.status not in ('submitted', 'executing'):
        message = {'error': 'Job %s is not running' % job.id}
        return HttpResponse(content=json.dumps(message), content_type=JSON)

    # only the owner or superusers can abort a calculation
    if (job.user_name not in utils.get_valid_users(request) and
            not utils.is_superuser(request)):
        message = {'error': ('User %s has no permission to abort job %s' %
                             (request.user, job.id))}
        return HttpResponse(content=json.dumps(message), content_type=JSON,
                            status=403)

    if not job.pid:
        message = {'error': 'PID for job %s not found' % job.id}
        return HttpResponse(content=json.dumps(message), content_type=JSON)

    # is a spawned job
    try:
        os.kill(job.pid, signal.SIGINT)
    except Exception as exc:
        logging.error(exc)
        # the job was not killed: tell the client what went wrong instead
        # of a misleading message
        message = {'error': 'Could not kill job %d: %s' % (job.id, exc)}
    else:
        logging.warning('Aborting job %d, pid=%d', job.id, job.pid)
        logs.dbcmd('set_status', job.id, 'aborted')
        message = {'success': 'Killing job %d' % job.id}
    return HttpResponse(content=json.dumps(message), content_type=JSON)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_remove(request, calc_id):
    """
    Remove the calculation with the given id

    :returns:
        the JSON message returned by the ``del_calc`` database command,
        with status 200 if it contains 'success' and 403 if it contains
        'error'; 404 for an unknown calculation; 500 otherwise
    """
    # Only the owner can remove a job
    user = utils.get_user(request)
    try:
        message = logs.dbcmd('del_calc', calc_id, user)
    except dbapi.NotFound:
        return HttpResponseNotFound()

    if 'success' in message:
        status = 200
    elif 'error' in message:
        logging.error(message['error'])
        status = 403
    else:
        # This is an untrapped server error
        logging.error(message)
        return HttpResponse(
            content=message, content_type='text/plain', status=500)
    return HttpResponse(
        content=json.dumps(message), content_type=JSON, status=status)
def share_job(user_level, calc_id, share):
    """
    Share or unshare a job through the ``share_job`` database command.

    :param user_level: level of the user; levels below 2 are forbidden
    :param calc_id: the ID of the calculation
    :param share: True to share the job, False to unshare it
    :returns:
        the JSON message returned by the database, with status 200 if it
        contains 'success' and 403 if it contains 'error'; 403 if the
        user level is too low; 404 for an unknown calculation
    :raises AssertionError: if the message has neither of the two keys
    """
    if user_level < 2:
        return HttpResponseForbidden()
    try:
        message = logs.dbcmd('share_job', calc_id, share)
    except dbapi.NotFound:
        return HttpResponseNotFound()

    if 'success' in message:
        status = 200
    elif 'error' in message:
        logging.error(message['error'])
        status = 403
    else:
        raise AssertionError(
            f"share_job must return 'success' or 'error'!? Returned: {message}")
    return HttpResponse(
        content=json.dumps(message), content_type=JSON, status=status)
def get_user_level(request):
    """
    Return the level of the user performing the request.

    :returns:
        2 if `settings.LOCKDOWN` is false; otherwise `request.user.level`,
        or 0 if that attribute is missing
    """
    if not settings.LOCKDOWN:
        # NOTE: when authentication is not required, the user interface
        # can assume the user to have the maximum level
        return 2
    try:
        level = request.user.level
    except AttributeError:  # e.g. AnonymousUser (not authenticated)
        level = 0
    return level
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_unshare(request, calc_id):
    """
    Unshare the calculation with the given id; the response is the one
    built by `share_job`
    """
    return share_job(get_user_level(request), calc_id, share=False)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_share(request, calc_id):
    """
    Share the calculation with the given id; the response is the one
    built by `share_job`
    """
    return share_job(get_user_level(request), calc_id, share=True)
def log_to_json(log):
    """
    Convert a log record into the list
    [timestamp, level, process, message], where the timestamp is the
    ISO representation of `log.timestamp` cut to 22 characters.
    """
    timestamp = log.timestamp.isoformat()[:22]
    row = [timestamp, log.level, log.process, log.message]
    return row
@require_http_methods(['GET'])
@cross_domain_ajax
def calc_log(request, calc_id, start, stop):
    """
    Return a slice of the calculation log as a JSON list of rows

    :param calc_id: the ID of the calculation
    :param start: first index of the slice; a false value means 0
    :param stop: last index of the slice; a false value means 0
    :returns: the JSON list, or 404 for an unknown calculation
    """
    if not start:
        start = 0
    if not stop:
        stop = 0
    try:
        rows = logs.dbcmd('get_log_slice', calc_id, start, stop)
    except dbapi.NotFound:
        return HttpResponseNotFound()
    return HttpResponse(content=json.dumps(rows), content_type=JSON)
@require_http_methods(['GET'])
@cross_domain_ajax
def calc_log_size(request, calc_id):
    """
    Return the current number of lines in the log of the calculation,
    as JSON, or 404 for an unknown calculation
    """
    try:
        size = logs.dbcmd('get_log_size', calc_id)
    except dbapi.NotFound:
        return HttpResponseNotFound()
    return HttpResponse(content=json.dumps(size), content_type=JSON)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def calc_run(request):
    """
    Run a calculation.

    :param request:
        a `django.http.HttpRequest` object.
        If the request has the attribute `hazard_job_id`, the results of the
        specified hazard calculations will be re-used as input by the risk
        calculation.
        The request also needs to contain the files needed to perform the
        calculation. They can be uploaded as separate files, or zipped
        together.
    :returns:
        a JSON object with the keys 'status' and 'job_id' (status 200) or,
        if the job could not be submitted, with the keys 'traceback' and
        'job_id' (status 500)
    """
    job_ini = request.POST.get('job_ini')
    hazard_job_id = request.POST.get('hazard_job_id')
    if job_ini:
        ini = job_ini
    elif hazard_job_id:  # "continue" button, tested in the QGIS plugin
        ini = "risk.ini"
    else:
        ini = ".ini"
    user = utils.get_user(request)
    try:
        job_id = submit_job(request.FILES, ini, user, hazard_job_id)
    except Exception as exc:  # job failed, for instance missing .xml file
        # get the exception message
        exc_msg = traceback.format_exc() + str(exc)
        logging.error(exc_msg)
        # the job_id attribute is attached by submit_job only when the
        # failure happens after the creation of the job, so it can be
        # missing: do not raise a new error while reporting the first one
        response_data = dict(traceback=exc_msg.splitlines(),
                             job_id=getattr(exc, 'job_id', None))
        status = 500
    else:
        response_data = dict(status='created', job_id=job_id)
        status = 200
    return HttpResponse(content=json.dumps(response_data),
                        content_type=JSON, status=status)
def aelo_callback(
        job_id, job_owner_email, outputs_uri, inputs, exc=None,
        warnings=None):
    """
    Send an email to the owner of an AELO job, reporting the input
    values, the warnings (if any) and either the error or the link to
    the outputs. Nothing is sent if `job_owner_email` is empty.

    :param job_id: the ID of the job
    :param job_owner_email: email address of the recipient
    :param outputs_uri: link to the outputs, reported on success
    :param inputs:
        dictionary with the keys 'sites' (longitude and latitude separated
        by whitespace), 'vs30', 'siteid' and 'asce_version'
    :param exc: the error, if the job failed
    :param warnings: an iterable of warning strings, or None
    """
    if not job_owner_email:
        return

    from_email = settings.EMAIL_HOST_USER
    recipients = [job_owner_email]
    reply_to = settings.EMAIL_SUPPORT

    lon, lat = inputs['sites'].split()
    body = (f"Input values: lon = {lon}, lat = {lat},"
            f" vs30 = {inputs['vs30']}, siteid = {inputs['siteid']},"
            f" asce_version = {inputs['asce_version']}\n\n")
    if warnings is not None:
        # one warning per line
        for warning in warnings:
            body = body + (warning + '\n')
    if exc:
        subject = f'Job {job_id} failed'
        body = body + f'There was an error running job {job_id}:\n{exc}'
    else:
        subject = f'Job {job_id} finished correctly'
        body = body + f'Please find the results here:\n{outputs_uri}'
    message = EmailMessage(
        subject, body, from_email, recipients, reply_to=[reply_to])
    message.send()
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def impact_get_rupture_data(request):
    """
    Retrieve rupture parameters corresponding to a given usgs id

    :param request:
        a `django.http.HttpRequest` object containing usgs_id
    :returns:
        the rupture dictionary as JSON, enriched with base64 images of
        the ShakeMap (keys 'pga_map_png' and 'mmi_map_png') or of the
        rupture (key 'rupture_png') when available; in case of validation
        problems the error dictionary, with status 400 if it contains
        'invalid_inputs' and 500 otherwise
    """
    rupture_path = get_uploaded_file_path(request, 'rupture_file')
    station_data_file = get_uploaded_file_path(request, 'station_data_file')
    user = request.user
    user.testdir = None
    rup, rupdic, _oqparams, err = impact_validate(
        request.POST, user, rupture_path, station_data_file)
    if err:
        status = 400 if 'invalid_inputs' in err else 500
        return HttpResponse(content=json.dumps(err), content_type=JSON,
                            status=status)
    shakemap_array = rupdic.get('shakemap_array', None)
    if shakemap_array is not None:
        # fitting in a single row in the template without resizing
        figsize = (6.3, 6.3)
        for png_key, imt in (('pga_map_png', 'PGA'), ('mmi_map_png', 'MMI')):
            rupdic[png_key] = plot_shakemap(
                shakemap_array, imt, backend='Agg', figsize=figsize,
                with_cities=False, return_base64=True, rupture=rup)
        # the array itself is not part of the response
        del rupdic['shakemap_array']
    elif rup is not None:
        rupdic['rupture_png'] = plot_rupture(
            rup, figsize=(8, 8), return_base64=True)
    return HttpResponse(content=json.dumps(rupdic), content_type=JSON,
                        status=200)
def get_uploaded_file_path(request, filename):
    """
    Store a copy of an uploaded file in a temporary file.

    :param request: a `django.http.HttpRequest` object
    :param filename: name of the form field containing the file
    :returns:
        the path of the copy (with suffix '.xml'), or None if the request
        does not contain the file
    """
    uploaded = request.FILES.get(filename)
    if not uploaded:
        return None
    # NOTE: we could not find a reliable way to avoid the deletion of the
    # uploaded file right after the request is consumed, therefore we need
    # to store a copy of it
    with open(uploaded.temporary_file_path()) as src:  # closed right away
        content = src.read()
    return gettemp(content, suffix='.xml')
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def impact_run(request):
    """
    Run an ARISTOTLE calculation.

    :param request:
        a `django.http.HttpRequest` object containing
        usgs_id, rupture_file,
        lon, lat, dep, mag, rake, dip, strike,
        local_timestamp, time_event,
        maximum_distance, trt, truncation_level,
        number_of_ground_motion_fields,
        asset_hazard_distance, ses_seed,
        maximum_distance_stations, station_data_file
    :returns:
        a JSON object keyed by the job ID with the status of the job and
        the links to its outputs, log and traceback (plus a 'WARNING' if
        the user has no email address); 403 for users of level 0; the
        error dictionary (status 400 or 500) if the validation fails
    """
    # NOTE: this is called via AJAX so the context processor isn't
    # automatically applied, since AJAX calls often do not render templates
    if request.user.level == 0:
        return HttpResponseForbidden()

    rupture_path = get_uploaded_file_path(request, 'rupture_file')
    station_data_file = get_uploaded_file_path(request, 'station_data_file')
    user = request.user
    user.testdir = None
    _rup, rupdic, params, err = impact_validate(
        request.POST, user, rupture_path, station_data_file)
    if err:
        status = 400 if 'invalid_inputs' in err else 500
        return HttpResponse(content=json.dumps(err), content_type=JSON,
                            status=status)
    # drop the undefined dip and strike from the rupture dictionary
    for key in ('dip', 'strike'):
        if key in rupdic and rupdic[key] is None:
            del rupdic[key]

    [jobctx] = engine.create_jobs(
        [params], config.distribution.log_level,
        user_name=utils.get_user(request))
    job_owner_email = request.user.email
    job_id = jobctx.calc_id

    def absolute_uri(view_name, *view_args):
        # absolute URI of the given named view
        return request.build_absolute_uri(
            reverse(view_name, args=list(view_args)))

    outputs_uri_web = absolute_uri('outputs_impact', job_id)
    outputs_uri_api = absolute_uri('results', job_id)
    log_uri = absolute_uri('log', job_id, '0', '')
    traceback_uri = absolute_uri('traceback', job_id)

    job_info = dict(
        status='created', job_id=job_id, outputs_uri=outputs_uri_api,
        log_uri=log_uri, traceback_uri=traceback_uri)
    if not job_owner_email:
        job_info['WARNING'] = (
            'No email address is speficied for your user account,'
            ' therefore email notifications will be disabled. As soon as'
            ' the job completes, you can access its outputs at the'
            ' following link: %s. If the job fails, the error traceback'
            ' will be accessible at the following link: %s'
            % (outputs_uri_api, traceback_uri))
    response_data = {job_id: job_info}

    # spawn the Aristotle main process
    proc = mp.Process(
        target=impact.main_web,
        args=([params], [jobctx], job_owner_email, outputs_uri_web,
              impact_callback))
    proc.start()

    return HttpResponse(content=json.dumps(response_data),
                        content_type=JSON, status=200)
def aelo_validate(request):
    """
    Validate the parameters of an AELO calculation found in
    `request.POST` (lon, lat, vs30, siteid, asce_version).

    :param request: a `django.http.HttpRequest` object
    :returns:
        the tuple (lon, lat, vs30, siteid, asce_version) if all the
        parameters are valid; otherwise a JSON `HttpResponse` with status
        400 and the keys 'status', 'error_msg' and 'invalid_inputs'
    """
    validation_errs = {}  # form label -> error message
    invalid_inputs = []  # names of the invalid parameters

    def record(field, exc):
        # register the failure of the validation of the given field
        validation_errs[AELO_FORM_LABELS[field]] = str(exc)
        invalid_inputs.append(field)

    try:
        lon = valid.longitude(request.POST.get('lon'))
    except Exception as exc:
        record('lon', exc)
    try:
        lat = valid.latitude(request.POST.get('lat'))
    except Exception as exc:
        record('lat', exc)
    try:
        vs30 = valid.positivefloat(request.POST.get('vs30'))
    except Exception as exc:
        record('vs30', exc)
    try:
        siteid = request.POST.get('siteid')
        if len(siteid) > settings.MAX_AELO_SITE_NAME_LEN:
            raise ValueError(
                "site name can not be longer than %s characters" %
                settings.MAX_AELO_SITE_NAME_LEN)
    except Exception as exc:
        record('siteid', exc)
    try:
        asce_version = request.POST.get(
            'asce_version', oqvalidation.OqParam.asce_version.default)
        oqvalidation.OqParam.asce_version.validator(asce_version)
    except Exception as exc:
        record('asce_version', exc)

    if not validation_errs:
        return lon, lat, vs30, siteid, asce_version

    plural = 's' if len(validation_errs) > 1 else ''
    details = '\n'.join(
        f'{label.split(" (")[0]}: "{msg}"'
        for label, msg in validation_errs.items())
    err_msg = 'Invalid input value' + plural + '\n' + details
    logging.error(err_msg)
    response_data = {"status": "failed", "error_msg": err_msg,
                     "invalid_inputs": invalid_inputs}
    return HttpResponse(content=json.dumps(response_data),
                        content_type=JSON, status=400)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def aelo_run(request):
    """
    Run an AELO calculation.

    :param request:
        a `django.http.HttpRequest` object containing lon, lat, vs30,
        siteid, asce_version
    :returns:
        a JSON object with the status of the job and the links to its
        outputs, log and traceback (plus a 'WARNING' if the user has no
        email address); a JSON error with status 400 if the parameters
        are invalid or the job parameters cannot be built
    """
    res = aelo_validate(request)
    if isinstance(res, HttpResponse):  # error
        return res
    lon, lat, vs30, siteid, asce_version = res

    # build a LogContext object associated to a database job
    try:
        # NB: longitude and latitude must be separated by a space, since
        # aelo_callback splits the 'sites' string on whitespace
        params = get_params_from(
            dict(sites='%s %s' % (lon, lat), vs30=vs30, siteid=siteid,
                 asce_version=asce_version),
            config.directory.mosaic_dir, exclude=['USA'])
        logging.root.handlers = []  # avoid breaking the logs
    except Exception as exc:
        response_data = {'status': 'failed', 'error_cls': type(exc).__name__,
                         'error_msg': str(exc)}
        logging.error('', exc_info=True)
        return HttpResponse(
            content=json.dumps(response_data), content_type=JSON, status=400)
    [jobctx] = engine.create_jobs(
        [params],
        config.distribution.log_level, None, utils.get_user(request), None)

    job_id = jobctx.calc_id

    outputs_uri_web = request.build_absolute_uri(
        reverse('outputs_aelo', args=[job_id]))

    outputs_uri_api = request.build_absolute_uri(
        reverse('results', args=[job_id]))

    log_uri = request.build_absolute_uri(
        reverse('log', args=[job_id, '0', '']))

    traceback_uri = request.build_absolute_uri(
        reverse('traceback', args=[job_id]))

    response_data = dict(
        status='created', job_id=job_id, outputs_uri=outputs_uri_api,
        log_uri=log_uri, traceback_uri=traceback_uri)

    job_owner_email = request.user.email
    if not job_owner_email:
        response_data['WARNING'] = (
            'No email address is speficied for your user account,'
            ' therefore email notifications will be disabled. As soon as'
            ' the job completes, you can access its outputs at the following'
            ' link: %s. If the job fails, the error traceback will be'
            ' accessible at the following link: %s'
            % (outputs_uri_api, traceback_uri))

    # spawn the AELO main process
    mp.Process(target=aelo.main, args=(
        lon, lat, vs30, siteid, asce_version, job_owner_email,
        outputs_uri_web, jobctx, aelo_callback)).start()
    return HttpResponse(content=json.dumps(response_data), content_type=JSON,
                        status=200)
def submit_job(request_files, ini, username, hc_id):
    """
    Create a job object from the given files and run it in a new process.

    :param request_files: the uploaded files, as accepted by `store`
    :param ini: suffix identifying the job file
    :param username: the name of the user owning the job
    :param hc_id: the ID of the hazard calculation to reuse, if any
    :returns: a job ID
    :raises:
        any exception occurring while storing the files or reading the
        parameters; the exception carries the attribute `job_id`
    """
    # build a LogContext object associated to a database job
    placeholder = dict(calculation_mode='custom',
                       description='Calculation waiting to start')
    [job] = engine.create_jobs(
        [placeholder], config.distribution.log_level, None, username, hc_id)

    # store the request files and perform some validation
    try:
        job_ini = store(request_files, ini, job.calc_id)
        job.oqparam = oq = readinput.get_oqparam(
            job_ini, kw={'hazard_calculation_id': hc_id})
        job_update = dict(calculation_mode=oq.calculation_mode,
                          description=oq.description,
                          hazard_calculation_id=hc_id)
        logs.dbcmd('update_job', job.calc_id, job_update)
    except Exception as exc:
        # mark the job as failed and let the caller know its ID
        tb = traceback.format_exc()
        logs.dbcmd('log', job.calc_id, datetime.now(UTC),
                   'CRITICAL', 'before starting', tb)
        logs.dbcmd('finish', job.calc_id, 'failed')
        exc.job_id = job.calc_id
        raise exc

    custom_tmp = os.path.dirname(job_ini)
    submit_cmd = config.distribution.submit_cmd.split()
    big_job = oq.get_input_size() > int(config.distribution.min_input_size)
    if submit_cmd == ENGINE:  # used for debugging
        subprocess.Popen(submit_cmd + [save_pik(job, custom_tmp)])
    elif submit_cmd == KUBECTL and big_job:
        # fill the job.yaml template and pipe it to the submit command
        with open(os.path.join(CWD, 'job.yaml')) as f:
            template = string.Template(f.read())
        yaml = template.substitute(
            DATABASE='%(host)s:%(port)d' % config.dbserver,
            CALC_PIK=save_pik(job, custom_tmp),
            CALC_NAME='calc%d' % job.calc_id)
        subprocess.run(submit_cmd, input=yaml.encode('ascii'))
    else:
        proc = mp.Process(target=engine.run_jobs, args=([job],))
        proc.start()
        if config.webapi.calc_timeout:
            watchdog = mp.Process(
                target=engine.watchdog,
                args=(job.calc_id, proc.pid,
                      int(config.webapi.calc_timeout)))
            watchdog.start()
    return job.calc_id
def save_pik(job, dirname):
    """
    Pickle the list ``[job]`` into the file ``calc<calc_id>.pik`` inside
    the given directory.

    :param job: a LogContext object (anything with a `calc_id` attribute)
    :param dirname: directory where to save the file
    :returns: the path to the pickled file
    """
    basename = 'calc%d.pik' % job.calc_id
    pathpik = os.path.join(dirname, basename)
    with open(pathpik, 'wb') as out:
        pickle.dump([job], out)
    return pathpik
@require_http_methods(['GET'])
@cross_domain_ajax
def calc_results(request, calc_id):
    """
    Get a summarized list of calculation results for a given ``calc_id``.
    Result is a JSON array of objects containing the following attributes:

        * id
        * name
        * type (hazard_curve, hazard_map, etc.)
        * outtypes (the available export types)
        * url (the exact url where the full result can be accessed)
        * size_mb

    :returns:
        the JSON array; 404 if the calculation does not exist or has no
        outputs; 403 if it is not accessible by the user
    """
    # If the specified calculation doesn't exist throw back a 404.
    try:
        info = logs.dbcmd('calc_info', calc_id)
        permitted = utils.user_has_permission(
            request, info['user_name'], info['status'])
        if not permitted:
            return HttpResponseForbidden()
    except dbapi.NotFound:
        return HttpResponseNotFound()
    base_url = _get_base_url(request)

    # NB: export_output has as keys the list (output_type, extension)
    # so this returns an ordered map output_type -> extensions such as
    # {'agg_loss_curve': ['xml', 'csv'], ...}
    output_types = groupby(export, lambda oe: oe[0], get_public_outputs)
    results = logs.dbcmd('get_outputs', calc_id)
    if not results:
        return HttpResponseNotFound()

    response_data = []
    for result in results:
        try:  # output from the datastore
            rtype = result.ds_key
            extensions = output_types[rtype]
        except KeyError:
            continue  # non-exportable outputs should not be shown
        # Catalina asked to remove the .txt outputs (used for the GMFs)
        outtypes = [ext for ext in extensions if ext != 'txt']
        url = urlparse.urljoin(base_url, 'v1/calc/result/%d' % result.id)
        response_data.append(
            dict(id=result.id, name=result.display_name, type=rtype,
                 outtypes=outtypes, url=url, size_mb=result.size_mb))

    # NOTE(review): unlike the other JSON views this response does not set
    # content_type=JSON; confirm with the clients before changing it
    return HttpResponse(content=json.dumps(response_data))
@require_http_methods(['GET'])
@cross_domain_ajax
def calc_traceback(request, calc_id):
    """
    Return the traceback of the calculation with the given ``calc_id``
    as a JSON list of lines, or 404 if the calculation does not exist.
    """
    try:
        lines = logs.dbcmd('get_traceback', calc_id)
    except dbapi.NotFound:
        # the specified calculation doesn't exist
        return HttpResponseNotFound()
    return HttpResponse(content=json.dumps(lines), content_type=JSON)
@cross_domain_ajax
@require_http_methods(['GET', 'HEAD'])
def calc_result(request, result_id):
    """
    Download a specific result, by ``result_id``.

    The common abstracted functionality for getting hazard or risk results.

    :param request:
        `django.http.HttpRequest` object. Can contain a `export_type` GET
        param (the default is 'xml' if no param is specified).
    :param result_id:
        The id of the requested artifact.
    :returns:
        404 if the requested ``result_id`` does not exist or is not
        available in the format designated by the `export_type`; 403 if
        the output is hidden or not accessible by the user; 500 in case
        of export errors. Otherwise, a response containing the content of
        the requested artifact (a zip archive if the export produces
        several files).

    Parameters for the GET request can include an `export_type`, such as
    'xml', 'geojson', 'csv', etc.
    """
    # If the result for the requested ID doesn't exist throw back a 404.
    try:
        job_id, job_status, job_user, datadir, ds_key = logs.dbcmd(
            'get_result', result_id)
        if ds_key in HIDDEN_OUTPUTS:
            return HttpResponseForbidden()
        if not utils.user_has_permission(request, job_user, job_status):
            return HttpResponseForbidden()
    except dbapi.NotFound:
        return HttpResponseNotFound()

    etype = request.GET.get('export_type')
    export_type = etype or DEFAULT_EXPORT_TYPE

    # NOTE: for some reason, in some cases the environment variable TMPDIR is
    # ignored, so we need to use config.directory.custom_tmp if defined
    temp_dir = config.directory.custom_tmp or tempfile.gettempdir()

    tmpdir = tempfile.mkdtemp(dir=temp_dir)
    try:
        exported = core.export_from_db(
            (ds_key, export_type), job_id, datadir, tmpdir)
    except DataStoreExportError as exc:
        # nothing will be streamed: do not leave the directory behind
        shutil.rmtree(tmpdir, ignore_errors=True)
        # TODO: there should be a better error page
        return HttpResponse(content='%s: %s' % (exc.__class__.__name__, exc),
                            content_type='text/plain', status=500)
    if not exported:
        shutil.rmtree(tmpdir, ignore_errors=True)
        # Throw back a 404 if the exact export parameters are not supported
        return HttpResponseNotFound(
            'Nothing to export for export_type=%s, %s' % (export_type, ds_key))
    elif len(exported) > 1:
        # Building an archive so that there can be a single file download
        archname = ds_key + '-' + export_type + '.zip'
        zipfiles(exported, os.path.join(tmpdir, archname))
        exported = os.path.join(tmpdir, archname)
    else:  # single file
        exported = exported[0]

    content_type = EXPORT_CONTENT_TYPE_MAP.get(
        export_type, DEFAULT_CONTENT_TYPE)

    fname = 'output-%s-%s' % (result_id, os.path.basename(exported))
    stream = FileWrapper(open(exported, 'rb'))  # 'b' is needed on Windows
    # the temporary directory is removed when the stream is closed
    stream.close = lambda: (
        FileWrapper.close(stream), shutil.rmtree(tmpdir))
    response = FileResponse(stream, content_type=content_type)
    response['Content-Disposition'] = (
        'attachment; filename=%s' % os.path.basename(fname))
    response['Content-Length'] = str(os.path.getsize(exported))
    return response
@cross_domain_ajax
@require_http_methods(['GET', 'HEAD'])
def aggrisk_tags(request, calc_id):
    """
    Return aggrisk_tags, by ``calc_id``, as JSON.

    :param request:
        `django.http.HttpRequest` object.
    :param calc_id:
        The id of the requested calculation.
    :returns:
        a JSON object as documented in rest-api.rst; 404 for an unknown
        job; 403 for a job not accessible by the user; 400 (with the
        traceback as text) if the extraction fails
    """
    job = logs.dbcmd('get_job', int(calc_id))
    if job is None:
        return HttpResponseNotFound()
    permitted = utils.user_has_permission(
        request, job.user_name, job.status)
    if not permitted:
        return HttpResponseForbidden()
    try:
        with datastore.read(job.ds_calc_dir + '.hdf5') as ds:
            df = _extract(ds, 'aggrisk_tags')
    except Exception as exc:
        tb = ''.join(traceback.format_tb(exc.__traceback__))
        error_text = '%s: %s in %s\n%s' % (
            exc.__class__.__name__, exc, 'aggrisk_tags', tb)
        return HttpResponse(
            content=error_text, content_type='text/plain', status=400)
    return HttpResponse(content=df.to_json(), content_type=JSON, status=200)
@cross_domain_ajax
@require_http_methods(['GET', 'HEAD'])
def extract(request, calc_id, what):
    """
    Wrapper over the `oq extract` command. If `setting.LOCKDOWN` is true
    only calculations owned by the current user can be retrieved.

    :param request:
        `django.http.HttpRequest` object; whatever follows ``path_info`` in
        the full path (i.e. the query string) is unquoted and appended to
        ``what``.
    :param calc_id:
        The id of the requested calculation.
    :param what:
        The key passed to the extract machinery.
    :returns:
        A file response streaming a temporary ``.npz`` file (removed when
        the stream is closed); a 404 response if the job is unknown, a 403
        response if the user has no permission on it, or a plain-text 500
        response carrying the traceback if the extraction fails.
    """
    job = logs.dbcmd('get_job', int(calc_id))
    if job is None:
        return HttpResponseNotFound()
    if not utils.user_has_permission(request, job.user_name, job.status):
        return HttpResponseForbidden()
    path = request.get_full_path()
    n = len(request.path_info)
    query_string = unquote_plus(path[n:])
    fname = None  # set only once the temporary file has been created
    try:
        # read the data and save them on a temporary .npz file
        with datastore.read(job.ds_calc_dir + '.hdf5') as ds:
            # NOTE: for some reason, in some cases the environment
            # variable TMPDIR is ignored, so we need to use
            # config.directory.custom_tmp if defined
            temp_dir = config.directory.custom_tmp or tempfile.gettempdir()
            fd, fname = tempfile.mkstemp(
                prefix=what.replace('/', '-'), suffix='.npz', dir=temp_dir)
            os.close(fd)
            obj = _extract(ds, what + query_string)
            hdf5.save_npz(obj, fname)
    except Exception as exc:
        if fname is not None:
            # the file will never be streamed, so nothing else would
            # remove it; the cleanup is best-effort on purpose, to avoid
            # masking the original error
            try:
                os.remove(fname)
            except OSError:
                pass
        tb = ''.join(traceback.format_tb(exc.__traceback__))
        return HttpResponse(
            content='%s: %s in %s\n%s' %
            (exc.__class__.__name__, exc, path, tb),
            content_type='text/plain', status=500)

    # stream the data back
    stream = FileWrapper(open(fname, 'rb'))
    # closing the stream also removes the temporary file
    stream.close = lambda: (FileWrapper.close(stream), os.remove(fname))
    response = FileResponse(stream, content_type='application/octet-stream')
    response['Content-Disposition'] = (
        'attachment; filename=%s' % os.path.basename(fname))
    response['Content-Length'] = str(os.path.getsize(fname))
    return response
@cross_domain_ajax
@require_http_methods(['GET'])
def calc_datastore(request, job_id):
    """
    Download a full datastore file.

    :param request:
        `django.http.HttpRequest` object.
    :param job_id:
        The id of the requested datastore
    :returns:
        A file response with the content of the ``.hdf5`` datastore of the
        job; a 404 response if the job or the file does not exist, a 403
        response if the user has no permission on the job.
    """
    job = logs.dbcmd('get_job', int(job_id))
    if job is None:
        return HttpResponseNotFound()
    hdf5_path = job.ds_calc_dir + '.hdf5'
    if not os.path.exists(hdf5_path):
        return HttpResponseNotFound()
    if not utils.user_has_permission(request, job.user_name, job.status):
        return HttpResponseForbidden()

    payload = FileWrapper(open(hdf5_path, 'rb'))
    response = FileResponse(payload, content_type=HDF5)
    attachment_name = os.path.basename(hdf5_path)
    response['Content-Disposition'] = (
        'attachment; filename=%s' % attachment_name)
    response['Content-Length'] = str(os.path.getsize(hdf5_path))
    return response
def web_engine(request, **kwargs):
    """
    Render the main page of the web engine ("engine/index.html").

    The template context is empty unless ``settings.APPLICATION_MODE`` is
    'AELO' or 'ARISTOTLE', in which case the labels, placeholders and
    defaults of the corresponding input form are added.
    """
    # NOTE: application_mode is already added by the context processor
    mode = settings.APPLICATION_MODE
    if mode == 'AELO':
        context = {
            'aelo_form_labels': AELO_FORM_LABELS,
            'aelo_form_placeholders': AELO_FORM_PLACEHOLDERS,
            'asce_versions':
                oqvalidation.OqParam.asce_version.validator.choices,
            'default_asce_version':
                oqvalidation.OqParam.asce_version.default,
        }
    elif mode == 'ARISTOTLE':
        context = {
            'impact_form_labels': ARISTOTLE_FORM_LABELS,
            'impact_form_placeholders': ARISTOTLE_FORM_PLACEHOLDERS,
            'impact_default_usgs_id': settings.ARISTOTLE_DEFAULT_USGS_ID,
        }
    else:
        context = {}
    return render(request, "engine/index.html", context)
@cross_domain_ajax
@require_http_methods(['GET'])
def web_engine_get_outputs(request, calc_id, **kwargs):
    """
    Render the outputs page ("engine/get_outputs.html") of a calculation.

    The context tells the template which PNG plots are stored in the
    datastore; in 'AELO' mode it also carries the coordinates, the vs30
    and the name of the site, read from the ``oqparam`` of the calculation.
    A 404 response is returned if the job is unknown.
    """
    mode = settings.APPLICATION_MODE
    job = logs.dbcmd('get_job', calc_id)
    if job is None:
        return HttpResponseNotFound()

    # defaults used when the datastore contains no 'png' group
    hmaps = assets = hcurves = governing_mce = False
    avg_gmf = []
    disagg_by_src = []
    lon = lat = vs30 = site_name = None
    with datastore.read(job.ds_calc_dir + '.hdf5') as dstore:
        if 'png' in dstore:
            png = dstore['png']
            # NOTE: only one hmap can be visualized currently
            hmaps = any(name.startswith('hmap') for name in png)
            avg_gmf = [name for name in png if name.startswith('avg_gmf-')]
            assets = 'assets.png' in png
            hcurves = 'hcurves.png' in png
            # NOTE: remove "and 'All' in name" to show the individual plots
            disagg_by_src = [
                name for name in png
                if name.startswith('disagg_by_src-') and 'All' in name]
            governing_mce = 'governing_mce.png' in png
        if mode == 'AELO':
            oqparam = dstore['oqparam']
            lon, lat = oqparam.sites[0][:2]  # e.g. [[-61.071, 14.686, 0.0]]
            vs30 = oqparam.override_vs30  # e.g. 760.0
            site_name = oqparam.description[9:]  # 'AELO for CCA'->'CCA'

    if job.size_mb is None:
        size_mb = '?'
    else:
        size_mb = '%.2f' % job.size_mb
    context = dict(
        calc_id=calc_id, size_mb=size_mb, hmaps=hmaps, avg_gmf=avg_gmf,
        assets=assets, hcurves=hcurves, disagg_by_src=disagg_by_src,
        governing_mce=governing_mce, lon=lon, lat=lat, vs30=vs30,
        site_name=site_name)
    return render(request, "engine/get_outputs.html", context)
def is_model_preliminary(ds):
    """
    Tell whether the model of the calculation is a preliminary one.

    The model name is taken from the second-to-last component of
    ``base_path`` in the ``oqparam`` of the datastore and looked up in
    ``PRELIMINARY_MODELS``.
    """
    # TODO: it would be better having the model written explicitly into the
    # datastore
    path_parts = ds['oqparam'].base_path.split(os.path.sep)
    model_name = path_parts[-2]
    return True if model_name in PRELIMINARY_MODELS else False
def get_disp_val(val):
    """
    Return the string displayed in the webui for `val`, according to the
    rounding rules: one decimal below 0.0001, four decimals below 0.01,
    three decimals below 0.1 and two decimals otherwise.
    """
    if val < 0.0001:
        # too small to be shown with any meaningful digit
        return f'{val:.1f}'
    if val < 0.01:
        digits = 4
    elif val < 0.1:
        digits = 3
    else:
        # covers both the [0.1, 1) range and the values >= 1
        digits = 2
    rounded = numpy.round(val, digits)
    return f'{rounded:.{digits}f}'
# this is extracting only the first site and it is okay
@cross_domain_ajax
@require_http_methods(['GET'])
def web_engine_get_outputs_aelo(request, calc_id, **kwargs):
    """
    Render the AELO outputs page ("engine/get_outputs_aelo.html").

    The context contains the 'PGA', 'Ss' and 'S1' entries of the ``asce07``
    output and the 'BSE*' entries of the ``asce41`` output (float values
    are formatted with ``get_disp_val`` and their key gets a ' (g)'
    suffix), the site parameters read from ``oqparam``, the ASCE and AELO
    versions and the warnings, if any. A 404 response is returned if the
    job is unknown.
    """
    job = logs.dbcmd('get_job', calc_id)
    if job is None:
        # consistent with the other views: unknown job -> 404 instead of
        # an AttributeError on job.size_mb
        return HttpResponseNotFound()
    size_mb = '?' if job.size_mb is None else '%.2f' % job.size_mb
    asce07_with_units = {}
    asce41_with_units = {}
    warnings = None
    with datastore.read(job.ds_calc_dir + '.hdf5') as ds:
        if is_model_preliminary(ds):
            warnings = PRELIMINARY_MODEL_WARNING
        if 'asce07' in ds:
            try:
                asce07_js = ds['asce07'][0].decode('utf8')
            except ValueError:
                # NOTE: for backwards compatibility, read scalar
                asce07_js = ds['asce07'][()].decode('utf8')
            asce07 = json.loads(asce07_js)
            for key, value in asce07.items():
                if key not in ('PGA', 'Ss', 'S1'):
                    continue
                if not isinstance(value, float):
                    # non-float values are passed through unchanged
                    asce07_with_units[key] = value
                else:
                    asce07_with_units[key + ' (g)'] = get_disp_val(value)
        if 'asce41' in ds:
            try:
                asce41_js = ds['asce41'][0].decode('utf8')
            except ValueError:
                # NOTE: for backwards compatibility, read scalar
                asce41_js = ds['asce41'][()].decode('utf8')
            asce41 = json.loads(asce41_js)
            for key, value in asce41.items():
                if not key.startswith('BSE'):
                    continue
                if not isinstance(value, float):
                    asce41_with_units[key] = value
                else:
                    asce41_with_units[key + ' (g)'] = get_disp_val(value)
        oqparam = ds['oqparam']
        lon, lat = oqparam.sites[0][:2]  # e.g. [[-61.071, 14.686, 0.0]]
        vs30 = oqparam.override_vs30  # e.g. 760.0
        site_name = oqparam.description[9:]  # e.g. 'AELO for CCA'->'CCA'
        try:
            asce_version = oqparam.asce_version
        except AttributeError:
            # for backwards compatibility on old calculations
            asce_version = oqvalidation.OqParam.asce_version.default
        try:
            calc_aelo_version = ds.get_attr('/', 'aelo_version')
        except KeyError:
            calc_aelo_version = '1.0.0'
        if 'warnings' in ds:
            ds_warnings = '\n'.join(
                s.decode('utf8') for s in ds['warnings'])
            if warnings is None:
                warnings = ds_warnings
            else:
                # keep the preliminary-model warning on top
                warnings += '\n' + ds_warnings
    return render(request, "engine/get_outputs_aelo.html",
                  dict(calc_id=calc_id, size_mb=size_mb,
                       asce07=asce07_with_units, asce41=asce41_with_units,
                       lon=lon, lat=lat, vs30=vs30, site_name=site_name,
                       calc_aelo_version=calc_aelo_version,
                       asce_version=asce_version,
                       warnings=warnings))
def format_time_delta(td):
    """
    Format a `datetime.timedelta` as '<days> days, HH:MM:SS', discarding
    the microseconds.
    """
    hours, remainder = divmod(td.seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f'{td.days} days, {hours:02}:{minutes:02}:{secs:02}'
def determine_precision(weights):
    """
    Determine the minimum decimal places needed to represent the weights
    accurately (each weight is considered up to 10 decimal places).

    :param weights: an iterable of numbers
    :returns: the largest number of significant decimals, 0 if there are
        no weights
    """
    def count_decimals(weight):
        # fixed notation with the trailing zeros stripped, e.g. '0.25'
        text = f"{weight:.10f}".rstrip("0")
        _, dot, fraction = text.partition('.')
        # -1 mirrors str.find when there is no decimal point at all
        return len(fraction) if dot else -1

    longest = max(map(count_decimals, weights), default=0)
    return max(0, longest)
@cross_domain_ajax
@require_http_methods(['GET'])
def web_engine_get_outputs_impact(request, calc_id):
    """
    Render the impact outputs page ("engine/get_outputs_impact.html").

    The context contains the 'aggrisk' view of the calculation (or an
    explanatory message if that view raises a KeyError), the available
    PNG plots, the warnings and the time elapsed between the local
    timestamp stored in ``oqparam`` and the start of the job. A 404
    response is returned if the job is unknown.
    """
    job = logs.dbcmd('get_job', calc_id)
    if job is None:
        return HttpResponseNotFound()
    description = job.description
    job_start_time = job.start_time
    job_start_time_str = job.start_time.strftime('%Y-%m-%d %H:%M:%S') + ' UTC'
    local_timestamp_str = None
    time_job_after_event_str = None
    with datastore.read(job.ds_calc_dir + '.hdf5') as ds:
        try:
            losses = views.view('aggrisk', ds)
        except KeyError:
            max_avg_gmf = ds['avg_gmf'][0].max()
            losses = (
                f'The risk can not be computed since the hazard is too low:'
                f' the maximum value of the average GMF is {max_avg_gmf:.5f}')
            losses_header = None
            # must be bound on this path too, since it is always passed
            # to the template below
            weights_precision = None
        else:
            losses_header = [
                f'{field}<br><i>{FIELD_DESCRIPTION[field]}</i>'
                if field in FIELD_DESCRIPTION
                else field.capitalize()
                for field in losses.dtype.names]
            weights_precision = determine_precision(losses['weight'])
        if 'png' in ds:
            avg_gmf = [k for k in ds['png'] if k.startswith('avg_gmf-')]
            assets = 'assets.png' in ds['png']
        else:
            assets = False
            avg_gmf = []
        oqparam = ds['oqparam']
        if hasattr(oqparam, 'local_timestamp'):
            # the timestamp may be stored as the string 'None'
            local_timestamp_str = (
                oqparam.local_timestamp
                if oqparam.local_timestamp != 'None'
                else None)
        if 'warnings' in ds:
            warnings = '\n'.join(s.decode('utf8') for s in ds['warnings'])
        else:
            warnings = None
    size_mb = '?' if job.size_mb is None else '%.2f' % job.size_mb
    if local_timestamp_str is not None:
        local_timestamp = datetime.strptime(
            local_timestamp_str, '%Y-%m-%d %H:%M:%S%z')
        # the start time is treated as UTC, consistently with the ' UTC'
        # suffix of job_start_time_str
        time_job_after_event = (
            job_start_time.replace(tzinfo=timezone.utc) - local_timestamp)
        time_job_after_event_str = format_time_delta(time_job_after_event)
    return render(request, "engine/get_outputs_impact.html",
                  dict(calc_id=calc_id, description=description,
                       local_timestamp=local_timestamp_str,
                       job_start_time=job_start_time_str,
                       time_job_after_event=time_job_after_event_str,
                       size_mb=size_mb, losses=losses,
                       losses_header=losses_header,
                       weights_precision=weights_precision,
                       avg_gmf=avg_gmf, assets=assets,
                       warnings=warnings))
@cross_domain_ajax
@require_http_methods(['GET'])
def download_aggrisk(request, calc_id):
    """
    Download the 'aggrisk' view of a calculation as a CSV attachment
    named ``aggrisk_<calc_id>.csv``.

    A 404 response is returned if the job is unknown, a 403 response if
    the user has no permission on it.
    """
    job = logs.dbcmd('get_job', int(calc_id))
    if job is None:
        return HttpResponseNotFound()
    if not utils.user_has_permission(request, job.user_name, job.status):
        return HttpResponseForbidden()
    with datastore.read(job.ds_calc_dir + '.hdf5') as dstore:
        table = views.view('aggrisk', dstore)

    # Create the HttpResponse object with the appropriate CSV header.
    disposition = 'attachment; filename="aggrisk_%s.csv"' % calc_id
    response = HttpResponse(
        content_type="text/csv",
        headers={"Content-Disposition": disposition},
    )
    # the response is file-like, so the rows are written straight into it
    out = csv.writer(response)
    out.writerow(table.dtype.names)
    out.writerows(table)
    return response
@cross_domain_ajax
@require_http_methods(['GET'])
def extract_html_table(request, calc_id, name):
    """
    Extract the table `name` from a calculation and render it as HTML
    inside the "engine/show_table.html" template.

    A 404 response is returned if the job is unknown, a 403 response if
    the user has no permission on it and a plain-text 400 response
    carrying the traceback if the extraction fails.
    """
    job = logs.dbcmd('get_job', int(calc_id))
    if job is None:
        return HttpResponseNotFound()
    allowed = utils.user_has_permission(request, job.user_name, job.status)
    if not allowed:
        return HttpResponseForbidden()
    try:
        with datastore.read(job.ds_calc_dir + '.hdf5') as dstore:
            frame = _extract(dstore, name)
    except Exception as exc:
        trace = ''.join(traceback.format_tb(exc.__traceback__))
        message = '%s: %s in %s\n%s' % (
            exc.__class__.__name__, exc, name, trace)
        return HttpResponse(
            content=message, content_type='text/plain', status=400)
    context = {
        'table_name': name,
        'table_html': frame.to_html(
            classes="table table-striped", index=False),
    }
    return render(request, 'engine/show_table.html', context)
@csrf_exempt
@cross_domain_ajax
@require_http_methods(['POST'])
def on_same_fs(request):
    """
    Accept a POST request to check access to a FS available by a client.

    :param request:
        `django.http.HttpRequest` object, containing mandatory
        parameters filename and checksum.
    :returns:
        a JSON response ``{"success": true}`` if the adler32 checksum of
        the first 32 bytes of the file matches the given checksum,
        ``{"success": false}`` otherwise (including when the file cannot
        be read or the checksum is not an integer).
    """
    filename = request.POST['filename']
    checksum_in = request.POST['checksum']

    checksum = 0
    try:
        # the context manager closes the file handle deterministically
        with open(filename, 'rb') as fileobj:
            data = fileobj.read(32)
        checksum = zlib.adler32(data, checksum) & 0xffffffff
        if checksum == int(checksum_in):
            return HttpResponse(content=json.dumps({'success': True}),
                                content_type=JSON, status=200)
    except (IOError, ValueError):
        # unreadable file or malformed checksum: reported as a mismatch
        pass

    return HttpResponse(content=json.dumps({'success': False}),
                        content_type=JSON, status=200)