Source code for openquake.commonlib.reportwriter

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

# 2015.06.26 13:21:50 CEST
"""
Utilities to build a report writer generating a .rst report for a calculation
"""
from __future__ import print_function
import os
import sys
import mock
import time
import logging


from openquake.baselib.general import humansize
from openquake.commonlib import readinput, datastore, source, parallel


[docs]def indent(text): return ' ' + '\n '.join(text.splitlines())
[docs]class ReportWriter(object): """ A particularly smart view over the datastore """ title = dict( params='Parameters', inputs='Input files', csm_info='Composite source model', required_params_per_trt='Required parameters per tectonic region type', ruptures_per_trt='Number of ruptures per tectonic region type', ruptures_events='Specific information for event based', rlzs_assoc='Realizations per (TRT, GSIM)', job_info='Informational data', biggest_ebr_gmf='Maximum memory allocated for the GMFs', avglosses_data_transfer='Estimated data transfer for the avglosses', exposure_info='Exposure model', short_source_info='Slowest sources', task_info='Information about the tasks', times_by_source_class='Computation times by source typology', performance='Slowest operations', ) def __init__(self, dstore): self.dstore = dstore self.oq = oq = dstore['oqparam'] self.text = (oq.description.encode('utf8') + '\n' + '=' * len(oq.description)) info = dstore['job_info'] dpath = dstore.hdf5path mtime = os.path.getmtime(dpath) self.text += '\n\n%s:%s updated %s' % ( info.hostname, dpath.encode('utf-8'), time.ctime(mtime)) # NB: in the future, the sitecol could be transferred as # an array by leveraging the HDF5 serialization protocol in # litetask decorator; for the moment however the size of the # data to transfer is given by the usual pickle sitecol_size = humansize(len(parallel.Pickled(dstore['sitecol']))) self.text += '\n\nnum_sites = %d, sitecol = %s' % ( len(dstore['sitecol']), sitecol_size)
[docs] def add(self, name, obj=None): """Add the view named `name` to the report text""" title = self.title[name] line = '-' * len(title) if obj: text = '\n::\n\n' + indent(str(obj)) else: text = datastore.view(name, self.dstore) self.text += '\n'.join(['\n\n' + title, line, text])
[docs] def make_report(self): """Build the report and return a restructed text string""" oq, ds = self.oq, self.dstore for name in ('params', 'inputs'): self.add(name) if 'composite_source_model' in ds: self.add('csm_info') self.add('required_params_per_trt') self.add('rlzs_assoc', ds['csm_info'].get_rlzs_assoc()) if 'composite_source_model' in ds: self.add('ruptures_per_trt') if 'scenario' not in oq.calculation_mode: self.add('job_info') if oq.calculation_mode in ('event_based_rupture', 'event_based', 'event_based_risk'): self.add('ruptures_events') if oq.calculation_mode in ('event_based_risk',): self.add('biggest_ebr_gmf') self.add('avglosses_data_transfer') if 'exposure' in oq.inputs: self.add('exposure_info') if 'source_info' in ds: self.add('short_source_info') self.add('times_by_source_class') if 'performance_data' in ds: self.add('task_info') self.add('performance') return self.text
[docs] def save(self, fname): """Save the report""" with open(fname, 'w') as f: f.write(self.text)
[docs]def build_report(job_ini, output_dir=None): """ Write a `report.csv` file with information about the calculation without running it :param job_ini: full pathname of the job.ini file :param output_dir: the directory where the report is written (default the input directory) """ oq = readinput.get_oqparam(job_ini) output_dir = output_dir or os.path.dirname(job_ini) from openquake.calculators import base # ugly calc = base.calculators(oq) # some taken is care so that the real calculation is not run: # the goal is to extract information about the source management only with mock.patch.object( calc.__class__, 'core_task', source.count_eff_ruptures): calc.pre_execute() with mock.patch.object(logging.root, 'info'): # reduce logging calc.execute() calc.save_params() rw = ReportWriter(calc.datastore) rw.make_report() report = (os.path.join(output_dir, 'report.rst') if output_dir else calc.datastore.export_path('report.rst')) try: rw.save(report) except IOError as exc: # permission error sys.stderr.write(str(exc) + '\n') return report