# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2019-2020, GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

import os
import getpass
import logging
import itertools
import numpy
import pandas

from openquake.baselib import general, parallel, python3compat
from openquake.hazardlib.stats import weighted_quantiles
from openquake.risklib import asset, scientific, reinsurance
from openquake.commonlib import datastore, logs
from openquake.calculators import base, views
from openquake.calculators.base import expose_outputs

U8 = numpy.uint8
F32 = numpy.float32
F64 = numpy.float64
U16 = numpy.uint16
U32 = numpy.uint32

def fix_investigation_time(oq, dstore):
    """
    If starting from GMFs, fix oq.investigation_time.

    :returns: the number of hazard realizations
    """
    R = len(dstore['weights'])
    if 'gmfs' in oq.inputs and not oq.investigation_time:
        attrs = dstore['gmf_data'].attrs
        inv_time = attrs['investigation_time']
        eff_time = attrs['effective_time']
        if inv_time:  # is zero in scenarios
            oq.investigation_time = inv_time
            oq.ses_per_logic_tree_path = eff_time / (
                oq.investigation_time * R)
    return R

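# Worked example (hypothetical numbers, not from any real calculation): if
# gmf_data stores investigation_time=50 and effective_time=10000 and there
# are R=2 realizations, fix_investigation_time sets
# oq.ses_per_logic_tree_path = 10000 / (50 * 2) = 100, restoring the
# identity eff_time = investigation_time * ses_per_logic_tree_path * R.
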
def save_curve_stats(dstore):
    """
    Save agg_curves-stats
    """
    oq = dstore['oqparam']
    units = dstore['exposure'].cost_calculator.get_units(oq.loss_types)
    try:
        K = len(dstore['agg_keys'])
    except KeyError:
        K = 0
    stats = oq.hazard_stats()
    S = len(stats)
    weights = dstore['weights'][:]
    aggcurves_df = dstore.read_df('aggcurves')
    periods = aggcurves_df.return_period.unique()
    P = len(periods)
    ep_fields = []
    if 'loss' in aggcurves_df:
        ep_fields = ['loss']
    if 'loss_aep' in aggcurves_df:
        ep_fields.append('loss_aep')
    if 'loss_oep' in aggcurves_df:
        ep_fields.append('loss_oep')
    EP = len(ep_fields)
    for lt in oq.ext_loss_types:
        loss_id = scientific.LOSSID[lt]
        out = numpy.zeros((K + 1, S, P, EP))
        aggdf = aggcurves_df[aggcurves_df.loss_id == loss_id]
        for agg_id, df in aggdf.groupby("agg_id"):
            for s, stat in enumerate(stats.values()):
                for p in range(P):
                    for e, ep_field in enumerate(ep_fields):
                        dfp = df[df.return_period == periods[p]]
                        ws = weights[dfp.rlz_id.to_numpy()]
                        ws /= ws.sum()
                        out[agg_id, s, p, e] = stat(
                            dfp[ep_field].to_numpy(), ws)
        stat = 'agg_curves-stats/' + lt
        dstore.create_dset(stat, F64, (K + 1, S, P, EP))
        dstore.set_shape_descr(stat, agg_id=K + 1, stat=list(stats),
                               return_period=periods, ep_fields=ep_fields)
        dstore.set_attrs(stat, units=units)
        dstore[stat][:] = out

def reagg_idxs(num_tags, tagnames):
    """
    :param num_tags: dictionary tagname -> number of tags with that tagname
    :param tagnames: subset of tagnames of interest
    :returns: T = T1 x ... X TN indices with repetitions

    Reaggregate indices. Consider for instance a case with 3 tagnames,
    taxonomy (4 tags), region (3 tags) and country (2 tags):

    >>> num_tags = dict(taxonomy=4, region=3, country=2)

    There are T = T1 x T2 x T3 = 4 x 3 x 2 = 24 combinations.
    The function will return 24 reaggregated indices with repetitions
    depending on the selected subset of tagnames.

    For instance reaggregating by taxonomy and region would give:

    >>> list(reagg_idxs(num_tags, ['taxonomy', 'region']))  # 4x3
    [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10, 11, 11]

    Reaggregating by taxonomy and country would give:

    >>> list(reagg_idxs(num_tags, ['taxonomy', 'country']))  # 4x2
    [0, 1, 0, 1, 0, 1, 2, 3, 2, 3, 2, 3, 4, 5, 4, 5, 4, 5, 6, 7, 6, 7, 6, 7]

    Reaggregating by region and country would give:

    >>> list(reagg_idxs(num_tags, ['region', 'country']))  # 3x2
    [0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5]

    Here is an example of single tag aggregation:

    >>> list(reagg_idxs(num_tags, ['taxonomy']))  # 4
    [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3]
    """
    shape = list(num_tags.values())
    T = numpy.prod(shape)
    arr = numpy.arange(T).reshape(shape)
    ranges = [numpy.arange(n) if t in tagnames else [slice(None)]
              for t, n in num_tags.items()]
    for i, idx in enumerate(itertools.product(*ranges)):
        arr[idx] = i
    return arr.flatten()

def get_loss_builder(dstore, oq, return_periods=None, loss_dt=None,
                     num_events=None):
    """
    :param dstore: datastore for an event based risk calculation
    :returns: a LossCurvesMapsBuilder instance or a Mock object for scenarios
    """
    assert oq.investigation_time
    weights = dstore['weights'][()]
    haz_time = oq.investigation_time * oq.ses_per_logic_tree_path * (
        len(weights) if oq.collect_rlzs else 1)
    if oq.collect_rlzs:
        try:
            etime = dstore['gmf_data'].attrs['effective_time']
        except KeyError:
            etime = None
        haz_time = (oq.investigation_time * oq.ses_per_logic_tree_path *
                    len(weights))
        if etime and etime != haz_time:
            raise ValueError(
                'The effective time stored in gmf_data is %d, '
                'which is inconsistent with %d' % (etime, haz_time))
        num_events = numpy.array([len(dstore['events'])])
        weights = numpy.ones(1)
    else:
        haz_time = oq.investigation_time * oq.ses_per_logic_tree_path
        if num_events is None:
            num_events = numpy.bincount(
                dstore['events']['rlz_id'], minlength=len(weights))
    max_events = num_events.max()
    periods = return_periods or oq.return_periods or scientific.return_periods(
        haz_time, max_events)  # in case_master [1, 2, 5, 10]
    if 'post_loss_amplification' in oq.inputs:
        pla_factor = scientific.pla_factor(
            dstore.read_df('post_loss_amplification'))
    else:
        pla_factor = None
    return scientific.LossCurvesMapsBuilder(
        oq.conditional_loss_poes, numpy.array(periods),
        loss_dt or oq.loss_dt(), weights, haz_time,
        oq.risk_investigation_time or oq.investigation_time,
        pla_factor=pla_factor)

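# Note on the return periods above (illustrative; the exact values are an
# assumption): unless oq.return_periods is given, the candidate periods come
# from scientific.return_periods(haz_time, max_events), which spans roughly
# from haz_time / max_events up to haz_time; the inline comment above notes
# that in case_master this yields the short ladder [1, 2, 5, 10].
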
def get_src_loss_table(dstore, loss_id):
    """
    :returns: (source_ids, array of losses of shape Ns)
    """
    K = dstore['risk_by_event'].attrs.get('K', 0)
    alt = dstore.read_df('risk_by_event', 'agg_id',
                         dict(agg_id=K, loss_id=loss_id))
    if len(alt) == 0:  # no losses for this loss type
        return [], ()
    ws = dstore['weights'][:]
    events = dstore['events'][:]
    ruptures = dstore['ruptures'][:]
    source_id = dstore['source_info']['source_id']
    eids = alt.event_id.to_numpy()
    evs = events[eids]
    rlz_ids = evs['rlz_id']
    srcidx = dict(ruptures[['id', 'source_id']])
    srcids = [srcidx[rup_id] for rup_id in evs['rup_id']]
    srcs = python3compat.decode(source_id[srcids])
    acc = general.AccumDict(accum=0)
    for src, rlz_id, loss in zip(srcs, rlz_ids, alt.loss.to_numpy()):
        acc[src] += loss * ws[rlz_id]
    return zip(*sorted(acc.items()))

def fix_dtypes(dic):
    """
    Fix the dtypes of the given columns inside a dictionary
    (to be called before conversion to a DataFrame)
    """
    fix_dtype(dic, U32, ['agg_id'])
    fix_dtype(dic, U8, ['loss_id'])
    if 'event_id' in dic:
        fix_dtype(dic, U32, ['event_id'])
    if 'rlz_id' in dic:
        fix_dtype(dic, U16, ['rlz_id'])
    if 'return_period' in dic:
        fix_dtype(dic, U32, ['return_period'])
    floatcolumns = [col for col in dic if col not in
                    {'agg_id', 'loss_id', 'event_id', 'rlz_id',
                     'return_period'}]
    fix_dtype(dic, F32, floatcolumns)

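# Usage sketch (hypothetical columns; fix_dtype, defined elsewhere in this
# module, is assumed to cast each listed column to the given dtype):
#
# >>> dic = {'agg_id': [0, 0, 1], 'loss_id': [1, 1, 1], 'loss': [1.5, 2., .3]}
# >>> fix_dtypes(dic)  # agg_id -> uint32, loss_id -> uint8, loss -> float32
# >>> aggrisk = pandas.DataFrame(dic)
#
# i.e. the id-like columns become unsigned integers and every other column
# becomes float32 before the DataFrame is built and stored.
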
def build_aggcurves(items, builder, num_events, aggregate_loss_curves_types,
                    monitor):
    """
    :param items: a list of pairs ((agg_id, rlz_id, loss_id), losses)
    :param builder: a :class:`LossCurvesMapsBuilder` instance
    """
    dic = general.AccumDict(accum=[])
    for (agg_id, rlz_id, loss_id), data in items:
        year = data.pop('year', ())
        curve = {col: builder.build_curve(
            # col is 'losses' in the case of consequences
            year, 'loss' if col == 'losses' else col, data[col],
            aggregate_loss_curves_types,
            scientific.LOSSTYPE[loss_id], num_events[rlz_id])
            for col in data}
        for p, period in enumerate(builder.return_periods):
            dic['agg_id'].append(agg_id)
            dic['rlz_id'].append(rlz_id)
            dic['loss_id'].append(loss_id)
            dic['return_period'].append(period)
            for col in data:
                # NB: 'fatalities' in EventBasedDamageTestCase.test_case_15
                for k, c in curve[col].items():
                    dic[k].append(c[p])
    return dic

# launch Starmap building the aggcurves and store them
def store_aggcurves(oq, agg_ids, rbe_df, builder, loss_cols,
                    events, num_events, dstore):
    aggtypes = oq.aggregate_loss_curves_types
    logging.info('Building aggcurves')
    units = dstore['exposure'].cost_calculator.get_units(oq.loss_types)
    try:
        year = events['year']
        if len(numpy.unique(year)) == 1:  # there is a single year
            year = ()
    except ValueError:  # missing in case of GMFs from CSV
        year = ()
    items = []
    for agg_id in agg_ids:
        gb = rbe_df[rbe_df.agg_id == agg_id].groupby(['rlz_id', 'loss_id'])
        for (rlz_id, loss_id), df in gb:
            data = {col: df[col].to_numpy() for col in loss_cols}
            if len(year):
                data['year'] = year[df.event_id.to_numpy()]
            items.append([(agg_id, rlz_id, loss_id), data])
    dstore.swmr_on()
    dic = parallel.Starmap.apply(
        build_aggcurves, (items, builder, num_events, aggtypes),
        concurrent_tasks=oq.concurrent_tasks, h5=dstore.hdf5).reduce()
    fix_dtypes(dic)
    suffix = {'ep': '', 'aep': '_aep', 'oep': '_oep'}
    ep_fields = ['loss' + suffix[a] for a in aggtypes.split(', ')]
    dstore.create_df('aggcurves', pandas.DataFrame(dic),
                     limit_states=' '.join(oq.limit_states),
                     units=units, ep_fields=ep_fields)

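# Example of the suffix mapping above: with
# oq.aggregate_loss_curves_types == 'ep, aep, oep' the stored curve columns
# are ep_fields == ['loss', 'loss_aep', 'loss_oep'], i.e. the plain
# exceedance curve plus its annual (AEP) and occurrence (OEP) variants.
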
def compute_aggrisk(dstore, oq, rbe_df, num_events, agg_ids):
    """
    Compute the aggrisk DataFrame with columns agg_id, rlz_id, loss_id, loss
    """
    L = len(oq.loss_types)
    weights = dstore['weights'][:]
    if oq.investigation_time:  # event based
        tr = oq.time_ratio  # risk_invtime / (haz_invtime * num_ses)
        if oq.collect_rlzs:  # reduce the time ratio by the number of rlzs
            tr /= len(weights)
    columns = [col for col in rbe_df.columns if col not in
               {'event_id', 'agg_id', 'rlz_id', 'loss_id', 'variance'}]
    if oq.investigation_time is None or all(
            col.startswith('dmg_') for col in columns):
        builder = FakeBuilder()
    else:
        builder = get_loss_builder(dstore, oq, num_events=num_events)
    dmgs = [col for col in columns if col.startswith('dmg_')]
    if dmgs:
        aggnumber = dstore['agg_values']['number']
    acc = general.AccumDict(accum=[])
    quantiles = general.AccumDict(accum=([], []))
    for agg_id in agg_ids:
        gb = rbe_df[rbe_df.agg_id == agg_id].groupby(['rlz_id', 'loss_id'])
        for (rlz_id, loss_id), df in gb:
            ne = num_events[rlz_id]
            acc['agg_id'].append(agg_id)
            acc['rlz_id'].append(rlz_id)
            acc['loss_id'].append(loss_id)
            if dmgs:  # infer the number of buildings in nodamage state
                ndamaged = sum(df[col].sum() for col in dmgs)
                dmg0 = aggnumber[agg_id] - ndamaged / (ne * L)
                assert dmg0 >= 0, dmg0
                acc['dmg_0'].append(dmg0)
            for col in columns:
                losses = df[col].sort_values().to_numpy()
                sorted_losses, _, eperiods = scientific.fix_losses(
                    losses, ne, builder.eff_time)
                if oq.quantiles and not col.startswith('dmg_'):
                    ls, ws = quantiles[agg_id, loss_id, col]
                    ls.extend(sorted_losses)
                    ws.extend([weights[rlz_id]] * len(sorted_losses))
                agg = sorted_losses.sum()
                acc[col].append(
                    agg * tr if oq.investigation_time else agg / ne)
                if builder.pla_factor:
                    agg = sorted_losses @ builder.pla_factor(eperiods)
                    acc['pla_' + col].append(
                        agg * tr if oq.investigation_time else agg / ne)
    fix_dtypes(acc)
    aggrisk = pandas.DataFrame(acc)
    out = general.AccumDict(accum=[])
    if quantiles:
        for (agg_id, loss_id, col), (losses, ws) in quantiles.items():
            qs = weighted_quantiles(oq.quantiles, losses, ws)
            out['agg_id'].append(agg_id)
            out['loss_id'].append(loss_id)
            for q, qvalue in zip(oq.quantiles, qs):
                qstring = ('%.2f' % q)[2:]  # i.e. '05' or '95'
                out[f'{col}q{qstring}'].append(qvalue)
    aggrisk_quantiles = pandas.DataFrame(out)
    return aggrisk, aggrisk_quantiles, columns, builder

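# Worked example (hypothetical numbers, assuming oq.time_ratio is
# risk_invtime / (haz_invtime * num_ses) as stated in build_reinsurance
# below): with risk_investigation_time=1, investigation_time=10000 and
# ses_per_logic_tree_path=1, tr = 1e-4, so a column summing to 5,000,000
# over all events becomes an average annual value of 500; in the scenario
# case (no investigation_time) the sum is divided by the number of events
# instead.
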
# aggcurves are built in parallel, aggrisk sequentially
def build_store_agg(dstore, oq, rbe_df, num_events):
    """
    Build the aggrisk and aggcurves tables from the risk_by_event table
    """
    size = dstore.getsize('risk_by_event')
    logging.info('Building aggrisk from %s of risk_by_event',
                 general.humansize(size))
    rups = len(dstore['ruptures'])
    events = dstore['events'][:]
    rlz_id = events['rlz_id']
    rup_id = events['rup_id']
    if len(num_events) > 1:
        rbe_df['rlz_id'] = rlz_id[rbe_df.event_id.to_numpy()]
    else:
        rbe_df['rlz_id'] = 0
    agg_ids = rbe_df.agg_id.unique()
    K = agg_ids.max()
    T = scientific.LOSSID[oq.total_losses or 'structural']
    logging.info("Performing %d aggregations", len(agg_ids))
    aggrisk, aggrisk_quantiles, columns, builder = compute_aggrisk(
        dstore, oq, rbe_df, num_events, agg_ids)
    dstore.create_df('aggrisk', aggrisk,
                     limit_states=' '.join(oq.limit_states))
    if len(aggrisk_quantiles):
        dstore.create_df('aggrisk_quantiles', aggrisk_quantiles)
    loss_cols = [col for col in columns if not col.startswith('dmg_')]
    for agg_id in agg_ids:  # build loss_by_event and loss_by_rupture
        if agg_id == K and ('loss' in columns or 'losses' in columns) and rups:
            df = rbe_df[(rbe_df.agg_id == K) & (rbe_df.loss_id == T)].copy()
            if len(df):
                df['rup_id'] = rup_id[df.event_id.to_numpy()]
                if 'losses' in columns:  # for consequences
                    df['loss'] = df['losses']
                lbe_df = df[['event_id', 'loss']].sort_values(
                    'loss', ascending=False)
                gb = df[['rup_id', 'loss']].groupby('rup_id')
                rbr_df = gb.sum().sort_values('loss', ascending=False)
                dstore.create_df('loss_by_rupture', rbr_df.reset_index())
                dstore.create_df('loss_by_event', lbe_df)
    if oq.investigation_time and loss_cols:
        store_aggcurves(oq, agg_ids, rbe_df, builder, loss_cols,
                        events, num_events, dstore)
    return aggrisk

def build_reinsurance(dstore, oq, num_events):
    """
    Build and store the tables `reinsurance-avg_policy` and
    `reinsurance-avg_portfolio`; for event_based, also build the
    `reinsurance-aggcurves` table.
    """
    size = dstore.getsize('reinsurance-risk_by_event')
    logging.info('Building reinsurance-aggcurves from %s of '
                 'reinsurance-risk_by_event', general.humansize(size))
    if oq.investigation_time:
        tr = oq.time_ratio  # risk_invtime / (haz_invtime * num_ses)
        if oq.collect_rlzs:  # reduce the time ratio by the number of rlzs
            tr /= len(dstore['weights'])
    events = dstore['events'][:]
    rlz_id = events['rlz_id']
    try:
        year = events['year']
        if len(numpy.unique(year)) == 1:  # there is a single year
            year = ()
    except ValueError:  # missing in case of GMFs from CSV
        year = ()
    rbe_df = dstore.read_df('reinsurance-risk_by_event', 'event_id')
    columns = rbe_df.columns
    if len(num_events) > 1:
        rbe_df['rlz_id'] = rlz_id[rbe_df.index.to_numpy()]
    else:
        rbe_df['rlz_id'] = 0
    builder = (get_loss_builder(dstore, oq, num_events=num_events)
               if oq.investigation_time else FakeBuilder())
    avg = general.AccumDict(accum=[])
    dic = general.AccumDict(accum=[])
    for rlzid, df in rbe_df.groupby('rlz_id'):
        ne = num_events[rlzid]
        avg['rlz_id'].append(rlzid)
        for col in columns:
            agg = df[col].sum()
            avg[col].append(agg * tr if oq.investigation_time else agg / ne)
        if oq.investigation_time:
            if len(year):
                years = year[df.index.to_numpy()]
            else:
                years = ()
            curve = {col: builder.build_curve(
                years, col, df[col].to_numpy(),
                oq.aggregate_loss_curves_types, 'reinsurance', ne)
                for col in columns}
            for p, period in enumerate(builder.return_periods):
                dic['rlz_id'].append(rlzid)
                dic['return_period'].append(period)
                for col in curve:
                    for k, c in curve[col].items():
                        dic[k].append(c[p])
    cc = dstore['exposure'].cost_calculator
    dstore.create_df('reinsurance-avg_portfolio', pandas.DataFrame(avg),
                     units=cc.get_units(oq.loss_types))
    # aggrisk by policy
    avg = general.AccumDict(accum=[])
    rbp_df = dstore.read_df('reinsurance_by_policy')
    if len(num_events) > 1:
        rbp_df['rlz_id'] = rlz_id[rbp_df.event_id.to_numpy()]
    else:
        rbp_df['rlz_id'] = 0
    columns = [col for col in rbp_df.columns if col not in
               {'event_id', 'policy_id', 'rlz_id'}]
    for (rlz_id, policy_id), df in rbp_df.groupby(['rlz_id', 'policy_id']):
        ne = num_events[rlz_id]
        avg['rlz_id'].append(rlz_id)
        avg['policy_id'].append(policy_id)
        for col in columns:
            agg = df[col].sum()
            avg[col].append(agg * tr if oq.investigation_time else agg / ne)
    dstore.create_df('reinsurance-avg_policy', pandas.DataFrame(avg),
                     units=cc.get_units(oq.loss_types))
    if oq.investigation_time is None:
        return
    dic['return_period'] = F32(dic['return_period'])
    dic['rlz_id'] = U16(dic['rlz_id'])
    dstore.create_df('reinsurance-aggcurves', pandas.DataFrame(dic),
                     units=cc.get_units(oq.loss_types))

@base.calculators.add('post_risk')
class PostRiskCalculator(base.RiskCalculator):
    """
    Compute losses and loss curves starting from an event loss table.
    """

    def pre_execute(self):
        oq = self.oqparam
        ds = self.datastore
        self.reaggreate = False
        if oq.hazard_calculation_id and not ds.parent:
            ds.parent = datastore.read(oq.hazard_calculation_id)
            if not hasattr(self, 'assetcol'):
                self.assetcol = ds.parent['assetcol']
            base.save_agg_values(ds, self.assetcol, oq.loss_types,
                                 oq.aggregate_by, oq.max_aggregations)
            aggby = ds.parent['oqparam'].aggregate_by
            self.reaggreate = (
                aggby and oq.aggregate_by and
                set(oq.aggregate_by[0]) < set(aggby[0]))
            if self.reaggreate:
                [names] = aggby
                self.num_tags = dict(
                    zip(names, self.assetcol.tagcol.agg_shape(names)))
        self.L = len(oq.loss_types)
        if self.R > 1:
            self.num_events = numpy.bincount(
                ds['events']['rlz_id'], minlength=self.R)  # events by rlz
        else:
            self.num_events = numpy.array([len(ds['events'])])

    def execute(self):
        oq = self.oqparam
        R = fix_investigation_time(oq, self.datastore)
        if oq.investigation_time:
            eff_time = oq.investigation_time * oq.ses_per_logic_tree_path * R
        if 'reinsurance' in oq.inputs:
            logging.warning('Reinsurance calculations are still experimental')
            self.policy_df = self.datastore.read_df('policy')
            self.treaty_df = self.datastore.read_df('treaty_df')
            # there must be a single loss type (possibly a total type)
            ideduc = self.datastore['assetcol/array']['ideductible'].any()
            if (oq.total_losses or len(oq.loss_types) == 1) and ideduc:
                # claim already computed and present in risk_by_event
                lt = 'claim'
            else:  # claim to be computed from the policies
                [lt] = oq.inputs['reinsurance']
            loss_id = scientific.LOSSID[lt]
            parent = self.datastore.parent
            if parent and 'risk_by_event' in parent:
                dstore = parent
            else:
                dstore = self.datastore
            ct = oq.concurrent_tasks or 1
            # now aggregate risk_by_event by policy
            allargs = [(dstore, pdf, self.treaty_df, loss_id)
                       for pdf in numpy.array_split(self.policy_df, ct)]
            self.datastore.swmr_on()
            smap = parallel.Starmap(reinsurance.reins_by_policy, allargs,
                                    h5=self.datastore.hdf5)
            rbp = pandas.concat(list(smap))
            if len(rbp) == 0:
                raise ValueError('No data in risk_by_event for %r' % lt)
            rbe = reinsurance.by_event(rbp, self.treaty_df, self._monitor)
            self.datastore.create_df('reinsurance_by_policy', rbp)
            self.datastore.create_df('reinsurance-risk_by_event', rbe)
        if oq.investigation_time and oq.return_periods != [0]:
            # setting return_periods = 0 disables the loss curves
            if eff_time < 2:
                logging.warning(
                    'eff_time=%s is too small to compute loss curves',
                    eff_time)
                return
        logging.info('Aggregating by %s', oq.aggregate_by)
        if 'source_info' in self.datastore and 'risk' in oq.calculation_mode:
            logging.info('Building the src_loss_table')
            with self.monitor('src_loss_table', measuremem=True):
                for loss_type in oq.loss_types:
                    source_ids, losses = get_src_loss_table(
                        self.datastore, scientific.LOSSID[loss_type])
                    self.datastore['src_loss_table/' + loss_type] = losses
                    self.datastore.set_shape_descr(
                        'src_loss_table/' + loss_type, source=source_ids)
        K = len(self.datastore['agg_keys']) if oq.aggregate_by else 0
        rbe_df = self.datastore.read_df('risk_by_event')
        if len(rbe_df) == 0:
            logging.warning('The risk_by_event table is empty, perhaps the '
                            'hazard is too small?')
            return 0
        if self.reaggreate:
            idxs = numpy.concatenate([
                reagg_idxs(self.num_tags, oq.aggregate_by[0]),
                numpy.array([K], int)])
            rbe_df['agg_id'] = idxs[rbe_df['agg_id'].to_numpy()]
            rbe_df = rbe_df.groupby(
                ['event_id', 'loss_id', 'agg_id']).sum().reset_index()
        self.aggrisk = build_store_agg(
            self.datastore, oq, rbe_df, self.num_events)
        if 'reinsurance-risk_by_event' in self.datastore:
            build_reinsurance(self.datastore, oq, self.num_events)
        return 1

    def post_execute(self, ok):
        """
        Sanity checks and save agg_curves-stats
        """
        if os.environ.get('OQ_APPLICATION_MODE') == 'ARISTOTLE':
            try:
                self._plot_assets()
            except Exception:
                logging.error('', exc_info=True)
        if not ok:  # the hazard is too small
            return
        oq = self.oqparam
        if 'risk' in oq.calculation_mode:
            self.datastore['oqparam'] = oq
            for ln in self.oqparam.loss_types:
                li = scientific.LOSSID[ln]
                dloss = views.view('delta_loss:%d' % li, self.datastore)
                if dloss['delta'].mean() > .1:  # more than 10% variation
                    logging.warning(
                        'A big variation in the %s losses is expected: try'
                        '\n$ oq show delta_loss:%d %d', ln, li,
                        self.datastore.calc_id)
        logging.info('Sanity check on avg_losses and aggrisk')
        if 'avg_losses-rlzs' in set(self.datastore):
            url = ('https://docs.openquake.org/oq-engine/advanced/'
                   'addition-is-non-associative.html')
            K = len(self.datastore['agg_keys']) if oq.aggregate_by else 0
            aggrisk = self.aggrisk[self.aggrisk.agg_id == K]
            avg_losses = {
                lt: self.datastore['avg_losses-rlzs/' + lt][:].sum(axis=0)
                for lt in oq.loss_types}  # shape (R, L)
            for _, row in aggrisk.iterrows():
                ri, li = int(row.rlz_id), int(row.loss_id)
                lt = scientific.LOSSTYPE[li]
                if lt not in avg_losses:
                    continue
                # check on the sum of the average losses
                avg = avg_losses[lt][ri]
                agg = row.loss
                if not numpy.allclose(avg, agg, rtol=.1):
                    # a serious discrepancy is an error
                    raise ValueError("agg != sum(avg) [%s]: %s %s" %
                                     (lt, agg, avg))
                if not numpy.allclose(avg, agg, rtol=.001):
                    # a small discrepancy is expected
                    logging.warning(
                        'Due to rounding errors inherent in floating-point '
                        'arithmetic, agg_losses != sum(avg_losses) [%s]: '
                        '%s != %s\nsee %s', lt, agg, avg, url)
        # save agg_curves-stats
        if self.R > 1 and 'aggcurves' in self.datastore:
            save_curve_stats(self.datastore)

def post_aggregate(calc_id: int, aggregate_by):
    """
    Re-run the postprocessing after an event based risk calculation
    """
    parent = datastore.read(calc_id)
    oqp = parent['oqparam']
    aggby = aggregate_by.split(',')
    parent_tags = asset.tagset(oqp.aggregate_by)
    if aggby and not parent_tags:
        raise ValueError('Cannot reaggregate from a parent calculation '
                         'without aggregate_by')
    for tag in aggby:
        if tag not in parent_tags:
            raise ValueError('%r not in %s' % (tag, oqp.aggregate_by[0]))
    dic = dict(calculation_mode='reaggregate',
               description=oqp.description + '[aggregate_by=%s]' %
               aggregate_by,
               user_name=getpass.getuser(), is_running=1, status='executing',
               pid=os.getpid(), hazard_calculation_id=calc_id)
    log = logs.init('job', dic, logging.INFO)
    if os.environ.get('OQ_DISTRIBUTE') not in ('no', 'processpool'):
        os.environ['OQ_DISTRIBUTE'] = 'processpool'
    with log:
        oqp.hazard_calculation_id = parent.calc_id
        parallel.Starmap.init()
        prc = PostRiskCalculator(oqp, log.calc_id)
        prc.run(aggregate_by=[aggby])
        expose_outputs(prc.datastore)