# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2025 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
import operator
from xml.etree.ElementTree import iterparse
from openquake.baselib.general import groupby
from openquake.baselib.node import context, striptag, Node
from openquake.hazardlib.nrml import NRML05
from openquake.hazardlib import InvalidFile, nrml, sourceconverter
from openquake.hazardlib.sourcewriter import obj_to_node
from openquake.risklib import scientific, read_nrml
[docs]def get_vulnerability_functions_04(fname):
    """
    Parse the vulnerability model in NRML 0.4 format.
    :param fname:
        path of the vulnerability file
    :returns:
        a dictionary imt, taxonomy -> vulnerability function + vset
    """
    categories = dict(assetCategory=set(), lossCategory=set(),
                      vulnerabilitySetID=set())
    imts = set()
    taxonomies = set()
    vf_dict = {}  # imt, taxonomy -> vulnerability function
    for vset in nrml.read(fname).vulnerabilityModel:
        categories['assetCategory'].add(vset['assetCategory'])
        categories['lossCategory'].add(vset['lossCategory'])
        categories['vulnerabilitySetID'].add(vset['vulnerabilitySetID'])
        IML = vset.IML
        imt_str = IML['IMT']
        imls = ~IML
        imts.add(imt_str)
        for vfun in vset.getnodes('discreteVulnerability'):
            taxonomy = vfun['vulnerabilityFunctionID']
            if taxonomy in taxonomies:
                raise InvalidFile(
                    'Duplicated vulnerabilityFunctionID: %s: %s, line %d' %
                    (taxonomy, fname, vfun.lineno))
            taxonomies.add(taxonomy)
            with context(fname, vfun):
                loss_ratios = ~vfun.lossRatio
                coefficients = ~vfun.coefficientsVariation
            if len(loss_ratios) != len(imls):
                raise InvalidFile(
                    'There are %d loss ratios, but %d imls: %s, line %d' %
                    (len(loss_ratios), len(imls), fname,
                     vfun.lossRatio.lineno))
            if len(coefficients) != len(imls):
                raise InvalidFile(
                    'There are %d coefficients, but %d imls: %s, line %d' %
                    (len(coefficients), len(imls), fname,
                     vfun.coefficientsVariation.lineno))
            with context(fname, vfun):
                vf_dict[imt_str, taxonomy] = scientific.VulnerabilityFunction(
                    taxonomy, imt_str, imls, loss_ratios, coefficients,
                    vfun['probabilisticDistribution'])
    categories['id'] = '_'.join(sorted(categories['vulnerabilitySetID']))
    del categories['vulnerabilitySetID']
    return vf_dict, categories 
[docs]def upgrade_file(path, multipoint):
    """Upgrade to the latest NRML version"""
    node0 = nrml.read(path)[0]
    shutil.copy(path, path + '.bak')  # make a backup of the original file
    tag = striptag(node0.tag)
    gml = True
    if tag == 'vulnerabilityModel':
        vf_dict, cat_dict = get_vulnerability_functions_04(path)
        # below I am converting into a NRML 0.5 vulnerabilityModel
        node0 = Node(
            'vulnerabilityModel', cat_dict,
            nodes=[obj_to_node(val) for val in vf_dict.values()])
        gml = False
    elif tag == 'fragilityModel':
        node0 = read_nrml.convert_fragility_model_04(
            nrml.read(path)[0], path)
        gml = False
    elif tag == 'sourceModel':
        node0 = nrml.read(path)[0]
        dic = groupby(node0.nodes, operator.itemgetter('tectonicRegion'))
        node0.nodes = [Node('sourceGroup',
                            dict(tectonicRegion=trt, name="group %s" % i),
                            nodes=srcs)
                       for i, (trt, srcs) in enumerate(dic.items(), 1)]
        if multipoint:
            sourceconverter.update_source_model(node0, path + '.bak')
    with open(path, 'wb') as f:
        nrml.write([node0], f, gml=gml) 
# NB: this works only for migrations from NRML version 0.4 to 0.5
# we will implement a more general solution when we will need to pass
# to version 0.6
[docs]def main(directory, dry_run=False, multipoint=False):
    """
    Upgrade all the NRML files contained in the given directory to the latest
    NRML version. Works by walking all subdirectories.
    WARNING: there is no downgrade!
    """
    for cwd, dirs, files in os.walk(directory):
        for f in files:
            path = os.path.join(cwd, f)
            if f.endswith('.xml'):
                ip = iterparse(path, events=('start',))
                next(ip)  # read node zero
                try:
                    fulltag = next(ip)[1].tag  # tag of the first node
                    xmlns, tag = fulltag.split('}')
                except Exception:  # not a NRML file
                    xmlns, tag = '', ''
                if xmlns[1:] == NRML05:  # already upgraded
                    if 'sourceModel' in tag and multipoint:
                        print('upgrading to multiPointSources', path)
                        node0 = nrml.read(path)[0]
                        sourceconverter.update_source_model(node0, path)
                        with open(path, 'wb') as f:
                            nrml.write([node0], f, gml=True)
                elif 'nrml/0.4' in xmlns and (
                        'vulnerability' in tag or 'fragility' in tag or
                        'sourceModel' in tag):
                    if not dry_run:
                        print('Upgrading', path)
                        try:
                            upgrade_file(path, multipoint)
                        except Exception as exc:
                            raise
                            print(exc)
                    else:
                        print('Not upgrading', path) 
main.directory = 'directory to consider'
main.dry_run = 'test the upgrade without replacing the files'
main.multipoint = 'replace PointSources with MultiPointSources'