# Source code for openquake.engine.export.hazard

# Copyright (c) 2010-2014, GEM Foundation.
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

"""
Functionality for exporting and serializing hazard curve calculation results.
"""


import os
import csv
import operator
from collections import namedtuple, defaultdict

from openquake.baselib.general import groupby
from openquake.hazardlib.calc import disagg
from openquake.commonlib import hazard_writers
from openquake.commonlib.writers import (
    floatformat, scientificformat, write_csv)

from openquake.engine.db import models
from openquake.engine.export import core


def _lt_branch_filename(prefix, ltr, samples, file_ext):
    """
    Build an export file name embedding the logic tree branch information
    of the realization ``ltr``.

    :param str prefix:
        Leading part of the file name (typically the output type, possibly
        already decorated with extra information such as the poe).
    :param ltr:
        A :class:`openquake.engine.db.models.LtRealization` instance.
    :param int samples:
        Number of logic tree samples (0 means end branch enumeration).
    :param str file_ext:
        Desired file extension, without the leading dot.
    :returns:
        The file name (no directory part) as a string.
    """
    sm_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.sm_lt_path)
    gsim_ltp = core.LT_PATH_JOIN_TOKEN.join(ltr.gsim_lt_path)
    if samples:
        # Monte-Carlo logic tree sampling: the same branch path can occur
        # several times, so the realization ordinal disambiguates the name
        return '%s-smltp_%s-gsimltp_%s-ltr_%s.%s' % (
            prefix, sm_ltp, gsim_ltp, ltr.ordinal, file_ext)
    # End Branch Enumeration: the branch paths identify the realization
    return '%s-smltp_%s-gsimltp_%s.%s' % (prefix, sm_ltp, gsim_ltp, file_ext)


def _get_result_export_dest(calc_id, target, result, file_ext='xml'):
    """
    Get the full absolute path (including file name) for a given ``result``.

    As a side effect, intermediate directories are created such that the file
    can be created and written to immediately.

    :param int calc_id:
        ID of the associated
        :class:`openquake.engine.db.models.OqJob`.
    :param target:
        Destination directory location for exported files OR a file-like
        object. If file-like, we just simply return it.
    :param result:
        :mod:`openquake.engine.db.models` result object with a foreign key
        reference to :class:`~openquake.engine.db.models.Output`.
    :param file_ext:
        Desired file extension for the output file.
        Defaults to 'xml'.

    :returns:
        Full path (including filename) to the destination export file.
        If the ``target`` is a file-like, we don't do anything special
        and simply return it.
    """
    if not isinstance(target, (basestring, buffer)):
        # It's not a file path. In this case, we expect a file-like object.
        # Just return it.
        return target

    output = result.output
    output_type = output.output_type
    samples = output.oq_job.get_param('number_of_logic_tree_samples', 0)

    # Create the names for each subdirectory
    calc_dir = 'calc_%s' % calc_id
    type_dir = output_type

    imt_dir = ''  # if blank, we don't have an IMT dir
    if output_type in ('hazard_curve', 'hazard_map', 'disagg_matrix'):
        imt_dir = result.imt
        if result.imt == 'SA':
            imt_dir = 'SA-%s' % result.sa_period

    # construct the directory which will contain the result file:
    directory = os.path.join(target, calc_dir, type_dir, imt_dir)
    core.makedirs(directory)

    if output_type in ('hazard_curve', 'hazard_curve_multi', 'hazard_map',
                       'uh_spectra'):
        # include the poe in hazard map and uhs file names
        if output_type in ('hazard_map', 'uh_spectra'):
            output_type = '%s-poe_%s' % (output_type, result.poe)

        if result.statistics is not None:
            # statistical aggregates: quantile or mean
            if result.statistics == 'quantile':
                filename = '%s-quantile_%s.%s' % (
                    output_type, result.quantile, file_ext)
            else:
                # mean
                filename = '%s-%s.%s' % (
                    output_type, result.statistics, file_ext)
        else:
            # otherwise, we need to include logic tree branch info
            filename = _lt_branch_filename(
                output_type, result.lt_realization, samples, file_ext)
    elif output_type == 'gmf':
        # only logic trees, no stats
        filename = _lt_branch_filename(
            output_type, result.lt_realization, samples, file_ext)
    elif output_type == 'ses':
        # SES collections carry the source model path directly
        sm_ltp = core.LT_PATH_JOIN_TOKEN.join(result.sm_lt_path)
        filename = '%s-%s-smltp_%s.%s' % (
            output_type, result.ordinal, sm_ltp, file_ext)
    elif output_type == 'disagg_matrix':
        # only logic trees, no stats; the poe and the site location are
        # part of the file name prefix
        prefix = '%s(%s)-lon_%s-lat_%s' % (
            output_type, result.poe, result.location.x, result.location.y)
        filename = _lt_branch_filename(
            prefix, result.lt_realization, samples, file_ext)
    else:
        filename = '%s.%s' % (output_type, file_ext)

    return os.path.abspath(os.path.join(directory, filename))


@core.export_output.add(('hazard_curve', 'xml'),
                        ('hazard_curve', 'geojson'))
def export_hazard_curve(key, output, target):
    """
    Export the specified hazard curve ``output`` to the ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `hazard_curve`.
    :param target:
        The same ``target`` as :func:`export`.

    :returns:
        The same return value as defined by :func:`export`.
    """
    fmt = key[1]
    curve_data = []
    for curve in models.HazardCurve.objects.filter(output=output.id):
        curve_data.extend(_curve_data(curve))
    # metadata and destination are derived from the last curve seen
    metadata = _curve_metadata(curve, target)
    dest = _get_result_export_dest(
        output.oq_job.id, target, curve, file_ext=fmt)
    if fmt == 'geojson':
        writer = hazard_writers.HazardCurveGeoJSONWriter(dest, **metadata)
    else:
        writer = hazard_writers.HazardCurveXMLWriter(dest, **metadata)
    writer.serialize(curve_data)
    return dest
@core.export_output.add(('hazard_curve', 'csv'))
def export_hazard_curve_csv(key, output, target):
    """
    Save a hazard curve (of a given IMT) as a .csv file in the format
    (lon lat poe1 ... poeN), where the fields are space separated.
    """
    rows = []
    for curve in models.HazardCurve.objects.filter(output=output.id):
        rows.extend(models.HazardCurveData.objects.all_curves_simple(
            filter_args=dict(hazard_curve=curve.id)))
    # the destination is derived from the last curve seen
    dest = _get_result_export_dest(
        output.oq_job.id, target, curve, file_ext='csv')
    with open(dest, 'wb') as f:
        writer = csv.writer(f, delimiter=' ')
        with floatformat('%11.7E'):
            for lon, lat, poes in sorted(rows):
                writer.writerow(map(scientificformat, [lon, lat] + poes))
    return dest
@core.export_output.add(('hazard_curve_multi', 'xml'))
def export_hazard_curve_multi_xml(key, output, target):
    """
    Export a `hazard_curve_multi` ``output`` (curves for several IMTs)
    as a single XML file and return the export destination path.
    """
    hcs = output.hazard_curve
    data = [_curve_data(hc) for hc in hcs]
    metadata_set = [_curve_metadata(hc, target) for hc in hcs]
    dest = _get_result_export_dest(output.oq_job.id, target, hcs)
    writer = hazard_writers.MultiHazardCurveXMLWriter(dest, metadata_set)
    writer.serialize(data)
    return dest
def _curve_metadata(hc, target):
    """
    Build the metadata dictionary expected by the hazard curve writers
    for the given :class:`~openquake.engine.db.models.HazardCurve`.
    """
    smlt_path = gsimlt_path = None
    rlz = hc.lt_realization
    if rlz is not None:
        # curves for a specific logic tree realization: record the paths
        smlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.sm_lt_path)
        gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.gsim_lt_path)
    # otherwise the curves are statistical aggregates and the paths stay None
    return {
        'quantile_value': hc.quantile,
        'statistics': hc.statistics,
        'smlt_path': smlt_path,
        'gsimlt_path': gsimlt_path,
        'sa_period': hc.sa_period,
        'sa_damping': hc.sa_damping,
        'investigation_time': hc.investigation_time,
        'imt': hc.imt,
        'imls': hc.imls,
    }


def _curve_data(hc):
    """
    Return a generator of HazardCurveData namedtuples (location, poes)
    for the curves belonging to ``hc``, matching the interface expected
    by the XML writer.
    """
    # Simple object wrappers around the raw values
    Location = namedtuple('Location', 'x y')
    HazardCurveData = namedtuple('HazardCurveData', 'location poes')
    rows = models.HazardCurveData.objects.all_curves_simple(
        filter_args=dict(hazard_curve=hc.id))
    return (HazardCurveData(Location(lon, lat), poes)
            for lon, lat, poes in rows)


@core.export_output.add(('gmf_scenario', 'xml'), ('gmf', 'xml'))
def export_gmf_xml(key, output, target):
    """
    Export the GMF Collection specified by ``output`` to the ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `gmf`.
    :param target:
        The same ``target`` as :func:`export`.

    :returns:
        The same return value as defined by :func:`export`.
    """
    gmf = models.Gmf.objects.get(output=output.id)
    if output.output_type == 'gmf':
        rlz = gmf.lt_realization
        sm_lt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.sm_lt_path)
        gsim_lt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.gsim_lt_path)
    else:
        # gmf_scenario: no logic tree realization
        sm_lt_path, gsim_lt_path = '', '*'
    dest = _get_result_export_dest(output.oq_job.id, target, output.gmf)
    writer = hazard_writers.EventBasedGMFXMLWriter(
        dest, sm_lt_path, gsim_lt_path)
    writer.serialize(gmf)
    return dest
@core.export_output.add(('gmf_scenario', 'csv'), ('gmf', 'csv'))
def export_gmf_csv(key, output, target):
    """
    Export the GMF Collection specified by ``output`` to the ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `gmf`.
    :param target:
        The same ``target`` as :func:`export`.

    :returns:
        The same return value as defined by :func:`export`.
    """
    # reuse the XML destination, swapping the extension
    xml_dest = _get_result_export_dest(output.oq_job.id, target, output.gmf)
    dest = xml_dest[:-3] + 'csv'
    # export the GMFs ordered by tag
    rows = sorted(_gen_gmf_rows(output), key=operator.itemgetter(0))
    write_csv(dest, rows)
    return dest
def to_imt_str(gmf):
    """
    Extract the intensity measure type string from a gmf record:
    'SA(<period>)' when a spectral acceleration period is set,
    otherwise the plain IMT name.
    """
    return 'SA(%s)' % gmf.sa_period if gmf.sa_period else gmf.imt
def regroup(idx_gmv_imt_triples):
    """
    Regroup the GMF data in a form suitable for export.

    >>> data = [(1, 0.1, 'PGA'), (2, 0.2, 'PGA'),
    ...         (1, 0.11, 'SA(0.1)'), (2, 0.21, 'SA(0.1)'),
    ...         (1, 0.12, 'SA(0.2)'), (2, 0.22, 'SA(0.2)')]
    >>> regroup(data)
    [(1, 2), [0.1, 0.2], [0.11, 0.21], [0.12, 0.22]]
    """
    def collapse(group):
        # split a group of (site index, gmv, imt) triples into
        # (tuple of indices, list of gmvs)
        sids = []
        values = []
        for sid, gmv, _imt in group:
            sids.append(sid)
            values.append(gmv)
        return tuple(sids), values

    # group by IMT first, reducing each group to (indices, gmvs)
    per_imt = groupby(idx_gmv_imt_triples, operator.itemgetter(2), collapse)
    # then group by the indices tuple; all IMTs must share the same indices
    per_indices = groupby(
        per_imt.values(), operator.itemgetter(0),
        lambda pairs: [gmvs for _indices, gmvs in pairs])
    [(indices, gmvs_by_imt)] = per_indices.items()
    return [indices] + gmvs_by_imt
# this is so complicated because the db structure is so wrong
def _gen_gmf_rows(output):
    """
    Yield one row per rupture, in the form
    [tag, indices, gmf_imt_1, ..., gmf_imt_N].
    """
    gmf = models.Gmf.objects.get(output=output)
    haz_calc = output.oq_job
    sids = sorted(models.HazardSite.objects.filter(
        hazard_calculation=haz_calc).values_list('id', flat=True))
    idx = {s: i for i, s in enumerate(sids)}  # sid -> idx
    # NOTE: if some sites are filtered out, the exported indices are wrong
    # and different from the ones exported by oq-lite; since it is too
    # difficult to fix, for the moment we live with that. The indices in
    # the database are right, it is only the export which is wrong.
    gmf_by_rupture = defaultdict(list)
    for data in models.GmfData.objects.filter(site_id__in=sids, gmf=gmf):
        for rupid, gmv in zip(data.rupture_ids, data.gmvs):
            gmf_by_rupture[rupid].append(
                (idx[data.site_id], gmv, to_imt_str(data)))
    tag = dict(models.SESRupture.objects.filter(
        pk__in=gmf_by_rupture).values_list('id', 'tag'))
    for rupid in gmf_by_rupture:
        yield [tag[rupid]] + regroup(gmf_by_rupture[rupid])


@core.export_output.add(('ses', 'xml'))
def export_ses_xml(key, output, target):
    """
    Export the Stochastic Event Set Collection specified by ``output`` to the
    ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `ses`.
    :param str target:
        Destination directory location for exported files.

    :returns:
        The exported file path
    """
    ses_coll = models.SESCollection.objects.get(output=output.id)
    sm_lt_path = core.LT_PATH_JOIN_TOKEN.join(ses_coll.sm_lt_path)
    dest = _get_result_export_dest(output.oq_job.id, target, output.ses)
    hazard_writers.SESXMLWriter(dest, sm_lt_path).serialize(ses_coll)
    return dest
@core.export_output.add(('ses', 'csv'))
def export_ses_csv(key, output, target):
    """
    Export the Stochastic Event Set Collection specified by ``output`` to the
    ``target`` in csv format.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `ses`.
    :param str target:
        Destination directory location for exported files.

    :returns:
        The exported file path
    """
    ses_coll = models.SESCollection.objects.get(output=output.id)
    # reuse the XML destination, swapping the extension
    dest = _get_result_export_dest(
        output.oq_job.id, target, output.ses)[:-3] + 'csv'
    rows = [[rup.tag, rup.seed] for ses in ses_coll for rup in ses]
    write_csv(dest, sorted(rows, key=operator.itemgetter(0)))
    return dest
@core.export_output.add(('hazard_map', 'xml'), ('hazard_map', 'geojson'))
def export_hazard_map(key, output, target):
    """
    General hazard map export code.
    """
    file_ext = key[1]
    data = []
    for hazard_map in models.HazardMap.objects.filter(output=output):
        data.extend(zip(hazard_map.lons, hazard_map.lats, hazard_map.imls))
    # metadata comes from the last map seen
    rlz = hazard_map.lt_realization
    if rlz is not None:
        # map for a specific logic tree realization: record the paths
        smlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.sm_lt_path)
        gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.gsim_lt_path)
    else:
        # map built from mean or quantile curves
        smlt_path = gsimlt_path = None
    dest = _get_result_export_dest(
        output.oq_job.id, target, hazard_map, file_ext=file_ext)
    metadata = {
        'quantile_value': hazard_map.quantile,
        'statistics': hazard_map.statistics,
        'smlt_path': smlt_path,
        'gsimlt_path': gsimlt_path,
        'sa_period': hazard_map.sa_period,
        'sa_damping': hazard_map.sa_damping,
        'investigation_time': hazard_map.investigation_time,
        'imt': hazard_map.imt,
        'poe': hazard_map.poe,
    }
    if file_ext == 'xml':
        writer = hazard_writers.HazardMapXMLWriter(dest, **metadata)
    else:
        writer = hazard_writers.HazardMapGeoJSONWriter(dest, **metadata)
    writer.serialize(data)
    return dest
@core.export_output.add(('hazard_map', 'csv'))
def export_hazard_map_csv(key, output, target):
    """
    General hazard map export code.
    """
    file_ext = key[1]
    rows = []
    for hazard_map in models.HazardMap.objects.filter(output=output):
        rows.extend(zip(hazard_map.lons, hazard_map.lats, hazard_map.imls))
    # the destination is derived from the last map seen
    dest = _get_result_export_dest(
        output.oq_job.id, target, hazard_map, file_ext=file_ext)
    with open(dest, 'w') as f:
        writer = csv.writer(f, delimiter=' ')
        with floatformat('%12.8E'):
            for row in sorted(rows):
                writer.writerow(map(scientificformat, row))
    return dest
class _DisaggMatrix(object):
    """
    A simple data model into which disaggregation matrix information can be
    packed. The :class:`openquake.commonlib.hazard_writers.DisaggXMLWriter`
    expects a sequence of objects which match this interface.

    :param matrix:
        An n-dimensional numpy array representing a probability mass function
        produced by the disaggregation calculator. The calculator produces a
        6d matrix, but the final results saved to XML are "subsets" of it,
        showing contributions to hazard from different combinations of
        magnitude, distance, longitude, latitude, epsilon, and tectonic
        region type.
    :param dim_labels:
        A tuple/list (or similar) of dimension labels; one of:

        * ['Mag']
        * ['Dist']
        * ['TRT']
        * ['Mag', 'Dist']
        * ['Mag', 'Dist', 'Eps']
        * ['Lon', 'Lat']
        * ['Mag', 'Lon', 'Lat']
        * ['Lon', 'Lat', 'TRT']

    :param float poe:
        Probability of exceedence (specified in the calculation config file).
    :param float iml:
        Interpolated intensity value, corresponding to the ``poe``, extracted
        from the aggregated hazard curve (which was used as input to compute
        the ``matrix``).
    """
    def __init__(self, matrix, dim_labels, poe, iml):
        self.matrix = matrix
        self.dim_labels = dim_labels
        self.poe = poe
        self.iml = iml


@core.export_output.add(('disagg_matrix', 'xml'))
def export_disagg_matrix_xml(key, output, target):
    """
    Export disaggregation histograms to the ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `disagg_matrix`.
    :param str target:
        Destination directory location for exported files.

    :returns:
        A list of exported file name (including the absolute path to each
        file).
    """
    # We expect 1 result per `Output`
    [disagg_result] = models.DisaggResult.objects.filter(output=output)
    rlz = disagg_result.lt_realization
    dest = _get_result_export_dest(
        output.oq_job.id, target, output.disagg_matrix)
    writer = hazard_writers.DisaggXMLWriter(
        dest,
        investigation_time=disagg_result.investigation_time,
        imt=disagg_result.imt,
        lon=disagg_result.location.x,
        lat=disagg_result.location.y,
        sa_period=disagg_result.sa_period,
        sa_damping=disagg_result.sa_damping,
        mag_bin_edges=disagg_result.mag_bin_edges,
        dist_bin_edges=disagg_result.dist_bin_edges,
        lon_bin_edges=disagg_result.lon_bin_edges,
        lat_bin_edges=disagg_result.lat_bin_edges,
        eps_bin_edges=disagg_result.eps_bin_edges,
        tectonic_region_types=disagg_result.trts,
        smlt_path=core.LT_PATH_JOIN_TOKEN.join(rlz.sm_lt_path),
        gsimlt_path=core.LT_PATH_JOIN_TOKEN.join(rlz.gsim_lt_path),
    )
    # one _DisaggMatrix per subset of the full 6d matrix
    matrices = (
        _DisaggMatrix(disagg_result.matrix[i], dim_labels,
                      disagg_result.poe, disagg_result.iml)
        for i, dim_labels in enumerate(disagg.pmf_map))
    writer.serialize(matrices)
    return dest
@core.export_output.add(('uh_spectra', 'xml'))
def export_uh_spectra_xml(key, output, target):
    """
    Export the specified UHS ``output`` to the ``target``.

    :param output:
        :class:`openquake.engine.db.models.Output` with an `output_type` of
        `uh_spectra`.
    :param str target:
        Destination directory location for exported files.

    :returns:
        A list containing the exported file name.
    """
    uhs = models.UHS.objects.get(output=output)
    dest = _get_result_export_dest(output.oq_job.id, target, output.uh_spectra)
    smlt_path = gsimlt_path = None
    rlz = uhs.lt_realization
    if rlz is not None:
        # UHS for a specific logic tree realization: record the paths
        smlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.sm_lt_path)
        gsimlt_path = core.LT_PATH_JOIN_TOKEN.join(rlz.gsim_lt_path)
    metadata = {
        'quantile_value': uhs.quantile,
        'statistics': uhs.statistics,
        'smlt_path': smlt_path,
        'gsimlt_path': gsimlt_path,
        'poe': uhs.poe,
        'periods': uhs.periods,
        'investigation_time': uhs.investigation_time,
    }
    hazard_writers.UHSXMLWriter(dest, **metadata).serialize(uhs)
    return dest