Source code for openquake.commonlib.risk_parsers

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2010-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

"""
Module containing parsers for risk input artifacts.
"""
from openquake.commonlib.node import iterparse
from collections import namedtuple

from openquake.commonlib import nrml

# Namespace prefixes in ``{uri}`` form, built from the namespace constants
# exposed by the nrml module. NRML is prepended to tag names when elements
# are matched against the parsed document (e.g. '%sasset' % NRML).
NRML = "{%s}" % nrml.NAMESPACE
GML = "{%s}" % nrml.GML_NAMESPACE

# Record yielded for every asset found in an exposure model.
AssetData = namedtuple(
    "AssetData",
    ["exposure_metadata", "site", "asset_ref", "taxonomy",
     "area", "number", "costs", "occupancy"])

# Geographical position of an asset.
Site = namedtuple("Site", ["longitude", "latitude"])

# Model-wide information shared by all the assets of an exposure model.
ExposureMetadata = namedtuple(
    "ExposureMetadata",
    ["exposure_id", "taxonomy_source", "asset_category",
     "description", "conversions"])


class Conversions(object):
    """
    Mutable container for the conversion settings of an exposure model.

    :param cost_types: list of cost type descriptors
    :param area_type: type associated with the area
    :param area_unit: unit associated with the area
    :param deductible_is_absolute:
        boolean flag telling if deductibles are absolute
    :param insurance_limit_is_absolute:
        boolean flag telling if insurance limits are absolute

    Two instances compare equal when all of their five attributes are equal.
    """

    def __init__(self, cost_types, area_type, area_unit,
                 deductible_is_absolute, insurance_limit_is_absolute):
        self.cost_types = cost_types
        self.area_type = area_type
        self.area_unit = area_unit
        self.deductible_is_absolute = deductible_is_absolute
        self.insurance_limit_is_absolute = insurance_limit_is_absolute

    def __repr__(self):
        return str(self.__dict__)

    def __eq__(self, cs):
        # Comparing with an unrelated object must not blow up with an
        # AttributeError: let Python fall back to its default handling.
        if not isinstance(cs, Conversions):
            return NotImplemented
        return (
            self.cost_types == cs.cost_types and
            self.area_type == cs.area_type and
            self.area_unit == cs.area_unit and
            self.deductible_is_absolute == cs.deductible_is_absolute and
            self.insurance_limit_is_absolute ==
            cs.insurance_limit_is_absolute)

    def __ne__(self, cs):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it
        # explicitly so that both operators always agree.
        outcome = self.__eq__(cs)
        if outcome is NotImplemented:
            return outcome
        return not outcome
# Descriptor of a cost type declared among the exposure model conversions.
CostType = namedtuple(
    "CostType",
    ["name", "conversion_type", "unit",
     "retrofitted_type", "retrofitted_unit"])

# A single cost attached to an asset.
Cost = namedtuple(
    "Cost", ["cost_type", "value", "retrofitted", "deductible", "limit"])

# Number of occupants of an asset in a given period.
Occupancy = namedtuple("Occupancy", ["occupants", "period"])


# TODO: this class will disappear soon, see
# https://bugs.launchpad.net/oq-engine/+bug/1380465
class ExposureModelParser(object):
    """
    Iterable parser for exposure model documents.

    Iterating over an instance walks the XML document incrementally and
    produces one `AssetData` instance for each `asset` element found.

    :param source:
        Filename or file-like object containing the XML data.
    """

    def __init__(self, source):
        self._source = source
        # exposure metadata; populated by __iter__ when the start tag of
        # the `exposureModel` element is met
        self._meta = None

    def __iter__(self):
        """
        Walk the document event by event, yielding an `AssetData`
        instance each time an `asset` element has been completed.
        """
        model_tag = '%sexposureModel' % NRML
        area_tag = "%sarea" % NRML
        deductible_tag = "%sdeductible" % NRML
        limit_tag = "%sinsuranceLimit" % NRML
        cost_type_tag = "%scostType" % NRML
        asset_tag = '%sasset' % NRML

        for event, element in iterparse(
                self._source, events=('start', 'end')):
            tag = element.tag

            if event == 'start':
                if tag == model_tag:
                    # exposure metadata
                    desc_node = element.find('%sdescription' % NRML)
                    description = "" if desc_node is None else desc_node.text
                    self._meta = ExposureMetadata(
                        exposure_id=element.get('id'),
                        description=description,
                        taxonomy_source=element.get('taxonomySource'),
                        asset_category=str(element.get('category')),
                        conversions=Conversions(
                            cost_types=[],
                            area_type=None,
                            area_unit=None,
                            deductible_is_absolute=True,
                            insurance_limit_is_absolute=True))

                # conversions
                elif tag == area_tag:
                    conversions = self._meta.conversions
                    conversions.area_type = element.get('type')
                    conversions.area_unit = element.get('unit')
                elif tag == deductible_tag:
                    # a missing attribute is treated like "false"
                    self._meta.conversions.deductible_is_absolute = (
                        element.get('isAbsolute', "false") != "false")
                elif tag == limit_tag:
                    self._meta.conversions.insurance_limit_is_absolute = (
                        element.get('isAbsolute', "false") != "false")
                elif tag == cost_type_tag:
                    cost_type = CostType(
                        name=element.get('name'),
                        conversion_type=element.get('type'),
                        unit=element.get('unit'),
                        retrofitted_type=element.get('retrofittedType'),
                        retrofitted_unit=element.get('retrofittedUnit'))
                    self._meta.conversions.cost_types.append(cost_type)

            # asset data
            elif event == 'end' and tag == asset_tag:
                raw_area = element.get('area')
                area = None if raw_area is None else float(raw_area)

                raw_number = element.get('number')
                number = None if raw_number is None else float(raw_number)

                location = element.find('%slocation' % NRML)
                asset = AssetData(
                    exposure_metadata=self._meta,
                    site=Site(float(location.get("lon")),
                              float(location.get("lat"))),
                    asset_ref=element.get('id'),
                    taxonomy=element.get('taxonomy'),
                    area=area,
                    number=number,
                    costs=_to_costs(element),
                    occupancy=_to_occupancy(element))
                del element

                yield asset
def _to_occupancy(element):
    """
    Build the list of `Occupancy` tuples for the given element.

    Every `occupancy` descendant of `element` is converted; the
    descendants are expected to look like this:

    <occupancy occupants="100" period="day"/>
    <occupancy occupants="50" period="night"/>
    ...
    """
    return [
        Occupancy(float(node.attrib['occupants']), node.attrib["period"])
        for node in element.findall('.//%soccupancy' % NRML)]


def _to_costs(element):
    """
    Build the list of `Cost` tuples for the `cost` descendants of the
    given element.

    The `retrofitted`, `deductible` and `insuranceLimit` attributes are
    optional: each one is converted to float when present and left as
    None otherwise.
    """
    costs = []
    for node in element.findall('.//%scost' % NRML):
        # read the three optional attributes first, then convert them
        optional = (node.get('retrofitted'),
                    node.get('deductible'),
                    node.get('insuranceLimit'))
        retrofitted, deductible, limit = [
            None if raw is None else float(raw) for raw in optional]
        costs.append(Cost(
            cost_type=node.attrib['type'],
            value=float(node.attrib['value']),
            retrofitted=retrofitted,
            deductible=deductible,
            limit=limit))
    return costs