Source code for openquake.engine.export.hazard

# Copyright (c) 2010-2014, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

"""
Functionality for exporting and serializing hazard curve calculation results.
"""


import os
import csv

from collections import namedtuple

from openquake.hazardlib.calc import disagg
from openquake.commonlib import hazard_writers
from openquake.commonlib.writers import floatformat

from openquake.engine.db import models
from openquake.engine.export import core


def _get_result_export_dest(calc_id, target, result, file_ext='xml'):
    """
    Get the full absolute path (including file name) for a given ``result``.

    As a side effect, intermediate directories are created such that the file
    can be created and written to immediately.

    :param int calc_id:
        ID of the associated
        :class:`openquake.engine.db.models.OqJob`.
    :param target:
        Destination directory location for exported files OR a file-like
        object. If file-like, we just simply return it.
    :param result:
        :mod:`openquake.engine.db.models` result object with a foreign key
        reference to :class:`~openquake.engine.db.models.Output`.
    :param file_ext:
        Desired file extension for the output file.
        Defaults to 'xml'.

    :returns:
        Full path (including filename) to the destination export file.
        If the ``target`` is a file-like, we don't do anything special
        and simply return it.
    """
    if not isinstance(target, (basestring, buffer)):
        # It's not a file path. In this case, we expect a file-like object.
        # Just return it.
        return target

    output = result.output
    output_type = output.output_type

    # Create the names for each subdirectory
    calc_dir = 'calc_%s' % calc_id
    type_dir = output_type

    imt_dir = ''  # if blank, we don't have an IMT dir
    if output_type in ('hazard_curve', 'hazard_map', 'disagg_matrix'):
        imt_dir = result.imt
        if result.imt == 'SA':
            imt_dir = 'SA-%s' % result.sa_period

    # construct the directory which will contain the result XML file:
    directory = os.path.join(target, calc_dir, type_dir, imt_dir)
    core.makedirs(directory)

    if output_type in ('hazard_curve', 'hazard_curve_multi', 'hazard_map',
                       'uh_spectra'):
        # include the poe in hazard map and uhs file names
        if output_type in ('hazard_map', 'uh_spectra'):
            output_type = '%s-poe_%s' % (output_type, result.poe)

        if result.statistics is not None:
            # we could have stats
            if result.statistics == 'quantile':
                # quantile
                filename = '%s-%s.%s' % (output_type,
                                         'quantile_%s' % result.quantile,
                                         file_ext)
            else:
                # mean
                filename = '%s-%s.%s' % (output_type, result.statistics,
                                         file_ext)
        else:
            # otherwise, we need to include logic tree branch info
            ltr = result.lt_realization
            sm_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.sm_lt_path)
            gsim_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.gsim_lt_path)
            if ltr.weight is None:
                # Monte-Carlo logic tree sampling
                filename = '%s-smltp_%s-gsimltp_%s-ltr_%s.%s' % (
                    output_type, sm_ltp, gsim_ltp, ltr.ordinal, file_ext
                )
            else:
                # End Branch Enumeration
                filename = '%s-smltp_%s-gsimltp_%s.%s' % (
                    output_type, sm_ltp, gsim_ltp, file_ext
                )
    elif output_type == 'gmf':
        # only logic trees, no stats
        ltr = result.lt_realization
        sm_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.sm_lt_path)
        gsim_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.gsim_lt_path)
        if ltr.weight is None:
            # Monte-Carlo logic tree sampling
            filename = '%s-smltp_%s-gsimltp_%s-ltr_%s.%s' % (
                output_type, sm_ltp, gsim_ltp, ltr.ordinal, file_ext
            )
        else:
            # End Branch Enumeration
            filename = '%s-smltp_%s-gsimltp_%s.%s' % (
                output_type, sm_ltp, gsim_ltp, file_ext
            )
    elif output_type == 'ses':
        sm_ltp = core.LT_PATH_JOIN_TOKEN.join(result.sm_lt_path)
        filename = '%s-%s-smltp_%s.%s' % (
            output_type, result.ordinal, sm_ltp, file_ext
        )
    elif output_type == 'disagg_matrix':
        # only logic trees, no stats

        out = '%s(%s)' % (output_type, result.poe)
        location = 'lon_%s-lat_%s' % (result.location.x, result.location.y)

        ltr = result.lt_realization
        sm_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.sm_lt_path)
        gsim_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.gsim_lt_path)
        if ltr.weight is None:
            # Monte-Carlo logic tree sampling
            filename = '%s-%s-smltp_%s-gsimltp_%s-ltr_%s.%s' % (
                out, location, sm_ltp, gsim_ltp, ltr.ordinal, file_ext
            )
        else:
            # End Branch Enumeration
            filename = '%s-%s-smltp_%s-gsimltp_%s.%s' % (
                out, location, sm_ltp, gsim_ltp, file_ext
            )
    else:
        filename = '%s.%s' % (output_type, file_ext)

    return os.path.abspath(os.path.join(directory, filename))


@core.export_output.add(('hazard_curve', 'xml'),
                        ('hazard_curve', 'geojson'))
[docs]def export_hazard_curve(key, output, target): """ Export the specified hazard curve ``output`` to the ``target``. :param output: :class:`openquake.engine.db.models.Output` with an `output_type` of `hazard_curve`. :param target: The same ``target`` as :func:`export`. :returns: The same return value as defined by :func:`export`. """ export_type = key[1] hc = models.HazardCurve.objects.get(output=output.id) hcd = _curve_data(hc) metadata = _curve_metadata(output, target) haz_calc = output.oq_job dest = _get_result_export_dest( haz_calc.id, target, hc, file_ext=export_type) writercls = hazard_writers.HazardCurveGeoJSONWriter \ if export_type == 'geojson' else hazard_writers.HazardCurveXMLWriter writercls(dest, **metadata).serialize(hcd) return dest
@core.export_output.add(('hazard_curve', 'csv'))
[docs]def export_hazard_curve_csv(key, output, target): """ Save a hazard curve (of a given IMT) as a .csv file in the format (lon lat poe1 ... poeN), where the fields are space separated. """ hc = models.HazardCurve.objects.get(output=output.id) haz_calc_id = output.oq_job.id dest = _get_result_export_dest(haz_calc_id, target, hc, file_ext='csv') x_y_poes = models.HazardCurveData.objects.all_curves_simple( filter_args=dict(hazard_curve=hc.id)) with open(dest, 'wb') as f: writer = csv.writer(f, delimiter=' ') for x, y, poes in sorted(x_y_poes): writer.writerow([x, y] + poes) return dest
@core.export_output.add(('hazard_curve_multi', 'xml'))
[docs]def export_hazard_curve_multi_xml(key, output, target): hcs = output.hazard_curve data = [_curve_data(hc) for hc in hcs] metadata_set = [] for hc in hcs: metadata = _curve_metadata(hc.output, target) metadata_set.append(metadata) haz_calc = output.oq_job dest = _get_result_export_dest(haz_calc.id, target, hcs) writer = hazard_writers.MultiHazardCurveXMLWriter(dest, metadata_set) writer.serialize(data) return dest
def _curve_metadata(output, target): hc = models.HazardCurve.objects.get(output=output.id) if hc.lt_realization is not None: # If the curves are for a specified logic tree realization, # get the tree paths lt_rlz = hc.lt_realization smlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.sm_lt_path) gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.gsim_lt_path) else: # These curves must be statistical aggregates smlt_path = None gsimlt_path = None return { 'quantile_value': hc.quantile, 'statistics': hc.statistics, 'smlt_path': smlt_path, 'gsimlt_path': gsimlt_path, 'sa_period': hc.sa_period, 'sa_damping': hc.sa_damping, 'investigation_time': hc.investigation_time, 'imt': hc.imt, 'imls': hc.imls, } def _curve_data(hc): curves = models.HazardCurveData.objects.all_curves_simple( filter_args=dict(hazard_curve=hc.id) ) # Simple object wrapper around the values, to match the interface of the # XML writer: Location = namedtuple('Location', 'x y') HazardCurveData = namedtuple('HazardCurveData', 'location poes') return (HazardCurveData(Location(x, y), poes) for x, y, poes in curves) @core.export_output.add(('gmf_scenario', 'xml'), ('gmf', 'xml'))
[docs]def export_gmf_xml(key, output, target): """ Export the GMF Collection specified by ``output`` to the ``target``. :param output: :class:`openquake.engine.db.models.Output` with an `output_type` of `gmf`. :param target: The same ``target`` as :func:`export`. :returns: The same return value as defined by :func:`export`. """ gmf = models.Gmf.objects.get(output=output.id) haz_calc = output.oq_job if output.output_type == 'gmf': lt_rlz = gmf.lt_realization sm_lt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.sm_lt_path) gsim_lt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.gsim_lt_path) else: # gmf_scenario sm_lt_path = '' gsim_lt_path = '' dest = _get_result_export_dest(haz_calc.id, target, output.gmf) writer = hazard_writers.EventBasedGMFXMLWriter( dest, sm_lt_path, gsim_lt_path) with floatformat('%12.8E'): writer.serialize(gmf) return dest
@core.export_output.add(('ses', 'xml'))
[docs]def export_ses_xml(key, output, target): """ Export the Stochastic Event Set Collection specified by ``output`` to the ``target``. :param output: :class:`openquake.engine.db.models.Output` with an `output_type` of `ses`. :param str target: Destination directory location for exported files. :returns: A list of exported file names (including the absolute path to each file). """ ses_coll = models.SESCollection.objects.get(output=output.id) haz_calc = output.oq_job sm_lt_path = core.LT_PATH_JOIN_TOKEN.join(ses_coll.sm_lt_path) dest = _get_result_export_dest(haz_calc.id, target, output.ses) writer = hazard_writers.SESXMLWriter(dest, sm_lt_path) writer.serialize(ses_coll) return dest
@core.export_output.add(('hazard_map', 'xml'), ('hazard_map', 'geojson'))
[docs]def export_hazard_map(key, output, target): """ General hazard map export code. """ file_ext = key[1] hazard_map = models.HazardMap.objects.get(output=output) haz_calc = output.oq_job if hazard_map.lt_realization is not None: # If the maps are for a specified logic tree realization, # get the tree paths lt_rlz = hazard_map.lt_realization smlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.sm_lt_path) gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.gsim_lt_path) else: # These maps must be constructed from mean or quantile curves smlt_path = None gsimlt_path = None dest = _get_result_export_dest(haz_calc.id, target, output.hazard_map, file_ext=file_ext) metadata = { 'quantile_value': hazard_map.quantile, 'statistics': hazard_map.statistics, 'smlt_path': smlt_path, 'gsimlt_path': gsimlt_path, 'sa_period': hazard_map.sa_period, 'sa_damping': hazard_map.sa_damping, 'investigation_time': hazard_map.investigation_time, 'imt': hazard_map.imt, 'poe': hazard_map.poe, } writer_class = (hazard_writers.HazardMapXMLWriter if file_ext == 'xml' else hazard_writers.HazardMapGeoJSONWriter) writer = writer_class(dest, **metadata) writer.serialize(zip(hazard_map.lons, hazard_map.lats, hazard_map.imls)) return dest
class _DisaggMatrix(object): """ A simple data model into which disaggregation matrix information can be packed. The :class:`openquake.commonlib.hazard_writers.DisaggXMLWriter` expects a sequence of objects which match this interface. :param matrix: A n-dimensional numpy array representing a probability mass function produced by the disaggregation calculator. The calculator produces a 6d matrix, but the final results which are saved to XML are "subsets" of matrix showing contributions to hazard from different combinations of magnitude, distance, longitude, latitude, epsilon, and tectonic region type. :param dim_labels: Expected values are (as tuples, lists, or similar) one of the following, depending on the result `matrix` type: * ['Mag'] * ['Dist'] * ['TRT'] * ['Mag', 'Dist'] * ['Mag', 'Dist', 'Eps'] * ['Lon', 'Lat'] * ['Mag', 'Lon', 'Lat'] * ['Lon', 'Lat', 'TRT'] :param float poe: Probability of exceedence (specified in the calculation config file). :param float iml: Interpolated intensity value, corresponding to the ``poe``, extracted from the aggregated hazard curve (which was used as input to compute the ``matrix``). """ def __init__(self, matrix, dim_labels, poe, iml): self.matrix = matrix self.dim_labels = dim_labels self.poe = poe self.iml = iml @core.export_output.add(('disagg_matrix', 'xml'))
[docs]def export_disagg_matrix_xml(key, output, target): """ Export disaggregation histograms to the ``target``. :param output: :class:`openquake.engine.db.models.Output` with an `output_type` of `disagg_matrix`. :param str target: Destination directory location for exported files. :returns: A list of exported file name (including the absolute path to each file). """ # We expect 1 result per `Output` [disagg_result] = models.DisaggResult.objects.filter(output=output) lt_rlz = disagg_result.lt_realization haz_calc = output.oq_job dest = _get_result_export_dest(haz_calc.id, target, output.disagg_matrix) writer_kwargs = dict( investigation_time=disagg_result.investigation_time, imt=disagg_result.imt, lon=disagg_result.location.x, lat=disagg_result.location.y, sa_period=disagg_result.sa_period, sa_damping=disagg_result.sa_damping, mag_bin_edges=disagg_result.mag_bin_edges, dist_bin_edges=disagg_result.dist_bin_edges, lon_bin_edges=disagg_result.lon_bin_edges, lat_bin_edges=disagg_result.lat_bin_edges, eps_bin_edges=disagg_result.eps_bin_edges, tectonic_region_types=disagg_result.trts, smlt_path=core.LT_PATH_JOIN_TOKEN.join(lt_rlz.sm_lt_path), gsimlt_path=core.LT_PATH_JOIN_TOKEN.join(lt_rlz.gsim_lt_path), ) writer = hazard_writers.DisaggXMLWriter(dest, **writer_kwargs) data = (_DisaggMatrix(disagg_result.matrix[i], dim_labels, disagg_result.poe, disagg_result.iml) for i, dim_labels in enumerate(disagg.pmf_map)) writer.serialize(data) return dest
@core.export_output.add(('uh_spectra', 'xml'))
[docs]def export_uh_spectra_xml(key, output, target): """ Export the specified UHS ``output`` to the ``target``. :param output: :class:`openquake.engine.db.models.Output` with an `output_type` of `uh_spectra`. :param str target: Destination directory location for exported files. :returns: A list containing the exported file name. """ uhs = models.UHS.objects.get(output=output) haz_calc = output.oq_job dest = _get_result_export_dest(haz_calc.id, target, output.uh_spectra) if uhs.lt_realization is not None: lt_rlz = uhs.lt_realization smlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.sm_lt_path) gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(lt_rlz.gsim_lt_path) else: smlt_path = None gsimlt_path = None metadata = { 'quantile_value': uhs.quantile, 'statistics': uhs.statistics, 'smlt_path': smlt_path, 'gsimlt_path': gsimlt_path, 'poe': uhs.poe, 'periods': uhs.periods, 'investigation_time': uhs.investigation_time, } writer = hazard_writers.UHSXMLWriter(dest, **metadata) writer.serialize(uhs) return dest