Source code for openquake.commonlib.sourceconverter

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.

from __future__ import division
# Copyright (C) 2015-2016 GEM Foundation
#
# OpenQuake is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenQuake is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with OpenQuake.  If not, see <http://www.gnu.org/licenses/>.

import math
import copy
import operator

from openquake.baselib.general import block_splitter
from openquake.baselib.performance import Monitor
from openquake.hazardlib import geo, mfd, pmf, source
from openquake.hazardlib.tom import PoissonTOM
from openquake.risklib import valid
from openquake.commonlib.node import context, striptag
from openquake.commonlib import parallel

# Threshold compared against len(sources) * len(sitecol) in filter_sources:
# above it the filtering is parallelized, otherwise it runs sequentially.
# The value itself is arbitrary (MS).
LOTS_OF_SOURCES_SITES = 1E5  # arbitrary, set by Michele Simionato
# Magnitude threshold used by split_fault_source: single-magnitude fault
# sources below it are yielded as they are, the others are split into
# MultiRuptureSources.
MAGNITUDE_FOR_RUPTURE_SPLITTING = 6.5  # given by Marco Pagani
# NB: the parameter MAGNITUDE_FOR_RUPTURE_SPLITTING cannot go in a
# configuration file, otherwise the tests will break by changing it;
# reason: the numbers in the event based calculators depend on the
# splitting: different sources => different seeds => different numbers
# NOTE(review): POINT_SOURCE_WEIGHT is not referenced by the code visible in
# this module; presumably it is the weight assigned to a single point
# source by callers elsewhere -- confirm.
POINT_SOURCE_WEIGHT = 1 / 40.


def get_set_num_ruptures(src):
    """
    Return the number of ruptures of a source, computing it when missing.

    If `src.num_ruptures` is falsy it is computed by calling
    `src.count_ruptures()` and stored back on the source.

    :param src: an object with a `num_ruptures` attribute and a
        `count_ruptures()` method
    :returns: the value of `src.num_ruptures` after the call
    """
    known = src.num_ruptures
    if known:
        # already set, nothing to compute
        return known
    src.num_ruptures = src.count_ruptures()
    return src.num_ruptures
def area_to_point_sources(area_src):
    """
    Split an area source into a generator of point sources.

    MFDs will be rescaled appropriately for the number of points in the
    area mesh: the same rescaled MFD object is shared by all the generated
    point sources.

    :param area_src:
        :class:`openquake.hazardlib.source.AreaSource`
    :returns:
        a generator of :class:`openquake.hazardlib.source.PointSource`
        instances, one for each point of the discretized polygon
    :raises TypeError:
        if the MFD of the area source is not a TruncatedGRMFD, an
        EvenlyDiscretizedMFD or an ArbitraryMFD, i.e. a kind of MFD
        that this function does not know how to rescale
    """
    mesh = area_src.polygon.discretize(area_src.area_discretization)
    num_points = len(mesh)
    area_mfd = area_src.mfd

    if isinstance(area_mfd, mfd.TruncatedGRMFD):
        # dividing the rates by num_points means shifting the a-value
        new_a_val = math.log10(10 ** area_mfd.a_val / float(num_points))
        new_mfd = mfd.TruncatedGRMFD(
            a_val=new_a_val,
            b_val=area_mfd.b_val,
            bin_width=area_mfd.bin_width,
            min_mag=area_mfd.min_mag,
            max_mag=area_mfd.max_mag)
    elif isinstance(area_mfd, mfd.EvenlyDiscretizedMFD):
        new_occur_rates = [float(x) / num_points
                           for x in area_mfd.occurrence_rates]
        new_mfd = mfd.EvenlyDiscretizedMFD(
            min_mag=area_mfd.min_mag,
            bin_width=area_mfd.bin_width,
            occurrence_rates=new_occur_rates)
    elif isinstance(area_mfd, mfd.ArbitraryMFD):
        new_occur_rates = [float(x) / num_points
                           for x in area_mfd.occurrence_rates]
        new_mfd = mfd.ArbitraryMFD(
            magnitudes=area_mfd.magnitudes,
            occurrence_rates=new_occur_rates)
    else:
        # without this branch `new_mfd` would be unbound below and the
        # generator would fail with an obscure UnboundLocalError
        raise TypeError(
            'Cannot split the area source %r into point sources: '
            'unsupported MFD type %s'
            % (area_src.source_id, type(area_mfd).__name__))

    for i, (lon, lat) in enumerate(zip(mesh.lons, mesh.lats)):
        pt = source.PointSource(
            # Generate a new ID and name
            source_id='%s-%s' % (area_src.source_id, i),
            name='%s-%s' % (area_src.name, i),
            tectonic_region_type=area_src.tectonic_region_type,
            mfd=new_mfd,
            rupture_mesh_spacing=area_src.rupture_mesh_spacing,
            magnitude_scaling_relationship=(
                area_src.magnitude_scaling_relationship),
            rupture_aspect_ratio=area_src.rupture_aspect_ratio,
            upper_seismogenic_depth=area_src.upper_seismogenic_depth,
            lower_seismogenic_depth=area_src.lower_seismogenic_depth,
            location=geo.Point(lon, lat),
            nodal_plane_distribution=area_src.nodal_plane_distribution,
            hypocenter_distribution=area_src.hypocenter_distribution,
            temporal_occurrence_model=area_src.temporal_occurrence_model)
        pt.trt_model_id = area_src.trt_model_id
        pt.num_ruptures = pt.count_ruptures()
        yield pt
def split_fault_source_by_magnitude(src):
    """
    Split a fault source into shallow copies having a single magnitude bin.

    Magnitude bins with a zero occurrence rate are skipped. Each copy gets
    the ID `<original id>-<n>`, where `n` counts only the bins actually
    kept, and an EvenlyDiscretizedMFD with a single occurrence rate.

    :param src:
        an instance of :class:`openquake.hazardlib.source.base.SeismicSource`
    :returns:
        a list of sources, one for each magnitude with nonzero rate
    """
    pieces = []
    for mag, rate in src.mfd.get_annual_occurrence_rates():
        if rate:
            piece = copy.copy(src)
            # len(pieces) is the number of nonzero bins seen so far
            piece.source_id = '%s-%s' % (src.source_id, len(pieces))
            piece.mfd = mfd.EvenlyDiscretizedMFD(
                min_mag=mag,
                bin_width=src.mfd.bin_width,
                occurrence_rates=[rate])
            pieces.append(piece)
    return pieces
def split_fault_source(src, block_size):
    """
    Generator splitting a fault source into several smaller sources.

    The source is first split by magnitude; the single-magnitude sources
    below MAGNITUDE_FOR_RUPTURE_SPLITTING are yielded as they are (after
    setting their `num_ruptures`), the others are further split in
    MultiRuptureSources containing at most `block_size` ruptures each.

    :param src:
        an instance of :class:`openquake.hazardlib.source.base.SeismicSource`
    :param block_size:
        the block size passed to MultiRuptureSource.split
    """
    # NB: the splitting is tricky; if you don't split, you will not
    # take advantage of the multiple cores; if you split too much,
    # the data transfer will kill you, i.e. multiprocessing/celery
    # will fail to transmit to the workers the generated sources.
    for single in split_fault_source_by_magnitude(src):
        if single.mfd.min_mag < MAGNITUDE_FOR_RUPTURE_SPLITTING:
            # don't split, there would be too many ruptures
            single.num_ruptures = single.count_ruptures()
            yield single
            continue
        # large magnitude: split in MultiRuptureSources
        for multi in MultiRuptureSource.split(single, block_size):
            yield multi
class MultiRuptureSource(object):
    """
    Fake source class used to encapsulate a set of ruptures.

    :param ruptures:
        a sized collection of ruptures (the original documentation
        mentions instances of :class:`openquake.hazardlib.source.rupture.
        ParametricProbabilisticRupture`)
    :param source_id:
        an ID for the MultiRuptureSource
    :param tectonic_region_type:
        the tectonic region type
    :param trt_model_id:
        ID of the tectonic region model the source belongs to
    """
    def __init__(self, ruptures, source_id, tectonic_region_type,
                 trt_model_id):
        self.ruptures = ruptures
        self.source_id = source_id
        self.tectonic_region_type = tectonic_region_type
        self.trt_model_id = trt_model_id
        # both the weight and the number of ruptures are the block length
        size = len(ruptures)
        self.weight = size
        self.num_ruptures = size

    @classmethod
    def split(cls, src, block_size):
        """
        Split the given fault source into MultiRuptureSources, each one
        wrapping a block of ruptures produced by `block_splitter` with
        the given block size. The i-th block gets the ID `<source id>-<i>`.
        """
        blocks = block_splitter(src.iter_ruptures(), block_size)
        for idx, block in enumerate(blocks):
            block_id = '%s-%s' % (src.source_id, idx)
            yield cls(block, block_id, src.tectonic_region_type,
                      src.trt_model_id)

    def iter_ruptures(self):
        """Yield the stored ruptures, in order"""
        for rup in self.ruptures:
            yield rup

    def count_ruptures(self):
        """Return the number of stored ruptures (the block size)"""
        return len(self.ruptures)

    def filter_sites_by_distance_to_source(self, maxdist, sitecol):
        """The source has been already filtered, return the sitecol"""
        return sitecol
def split_source(src, block_size=1):
    """
    Generator splitting a source into smaller sources.

    Area sources are split into point sources and simple/complex fault
    sources into smaller fault sources; every generated source receives
    the `id` of the original one. Any other source is yielded unchanged.

    :param src:
        an instance of :class:`openquake.hazardlib.source.base.SeismicSource`
    :param block_size:
        the block size passed to split_fault_source (default 1)
    """
    if isinstance(src, source.AreaSource):
        pieces = area_to_point_sources(src)
    elif isinstance(
            src, (source.SimpleFaultSource, source.ComplexFaultSource)):
        pieces = split_fault_source(src, block_size)
    else:
        # characteristic and nonparametric sources are not split
        # since they are small anyway
        yield src
        return
    for piece in pieces:
        piece.id = src.id
        yield piece
def split_coords_2d(seq):
    """
    Turn a flat sequence of coordinates into validated (lon, lat) pairs.

    The elements are validated in the order in which they appear; an
    incomplete trailing pair is dropped by the final `zip`.

    :param seq: a flat list with lons and lats
    :returns: a validated list of pairs (lon, lat)

    >>> split_coords_2d([1.1, 2.1, 2.2, 2.3])
    [(1.1, 2.1), (2.2, 2.3)]
    """
    validators = (valid.longitude, valid.latitude)
    columns = ([], [])  # lons, lats
    for pos, value in enumerate(seq):
        slot = pos % 2
        columns[slot].append(validators[slot](value))
    return list(zip(*columns))
def split_coords_3d(seq):
    """
    Turn a flat sequence of coordinates into validated (lon, lat, depth)
    triplets.

    The elements are validated in the order in which they appear; an
    incomplete trailing triplet is dropped by the final `zip`.

    :param seq: a flat list with lons, lats and depths
    :returns: a validated list of (lon, lat, depths) triplets

    >>> split_coords_3d([1.1, 2.1, 0.1, 2.3, 2.4, 0.1])
    [(1.1, 2.1, 0.1), (2.3, 2.4, 0.1)]
    """
    validators = (valid.longitude, valid.latitude, valid.depth)
    columns = ([], [], [])  # lons, lats, depths
    for pos, value in enumerate(seq):
        slot = pos % 3
        columns[slot].append(validators[slot](value))
    return list(zip(*columns))
class RuptureConverter(object):
    """
    Convert ruptures from nodes into Hazardlib ruptures.

    :param rupture_mesh_spacing:
        mesh spacing used for simple fault and planar surfaces
    :param complex_fault_mesh_spacing:
        mesh spacing used for complex fault surfaces; when missing (or
        falsy) `rupture_mesh_spacing` is used instead
    """
    fname = None  # should be set externally

    def __init__(self, rupture_mesh_spacing, complex_fault_mesh_spacing=None):
        self.rupture_mesh_spacing = rupture_mesh_spacing
        self.complex_fault_mesh_spacing = (
            complex_fault_mesh_spacing or rupture_mesh_spacing)

    @staticmethod
    def _point(node):
        """Build a geo.Point from the lon, lat, depth items of a node"""
        return geo.Point(node['lon'], node['lat'], node['depth'])

    def convert_node(self, node):
        """
        Convert the given rupture node into a hazardlib rupture, depending
        on the node tag: the conversion is delegated to the method
        `convert_<tag>`, where the tag is processed with `striptag`.

        :param node: a node representing a rupture
        """
        with context(self.fname, node):
            convert_rupture = getattr(self, 'convert_' + striptag(node.tag))
            mag = ~node.magnitude
            rake = ~node.rake
            hypocenter = self._point(node.hypocenter)
        return convert_rupture(node, mag, rake, hypocenter)

    def geo_line(self, edge):
        """
        Utility function to convert a node of kind edge
        into a :class:`openquake.hazardlib.geo.Line` instance.

        :param edge: a node describing an edge, with a LineString.posList
            subnode containing a flat list of (lon, lat) coordinates
        """
        with context(self.fname, edge.LineString.posList) as plist:
            coords = split_coords_2d(~plist)
        return geo.Line([geo.Point(*p) for p in coords])

    def geo_lines(self, edges):
        """
        Utility function to convert a list of edges into a list of
        :class:`openquake.hazardlib.geo.Line` instances.

        :param edges: an iterable of nodes describing edges, each one with
            a LineString.posList subnode containing a flat list of
            (lon, lat, depth) coordinates
        """
        lines = []
        for edge in edges:
            with context(self.fname, edge):
                coords = split_coords_3d(~edge.LineString.posList)
            lines.append(geo.Line([geo.Point(*p) for p in coords]))
        return lines

    def geo_planar(self, surface):
        """
        Utility to convert a PlanarSurface node with subnodes
        topLeft, topRight, bottomLeft, bottomRight into a
        :class:`openquake.hazardlib.geo.PlanarSurface` instance.

        :param surface: PlanarSurface node
        """
        with context(self.fname, surface):
            top_left = self._point(surface.topLeft)
            top_right = self._point(surface.topRight)
            bottom_left = self._point(surface.bottomLeft)
            bottom_right = self._point(surface.bottomRight)
        # NB: the corners are passed in the order
        # top left, top right, bottom right, bottom left
        return geo.PlanarSurface.from_corner_points(
            self.rupture_mesh_spacing,
            top_left, top_right, bottom_right, bottom_left)

    def convert_surfaces(self, surface_nodes):
        """
        Utility to convert a list of surface nodes into a single hazardlib
        surface. The kind of surface is decided by looking at the tag of
        the first node only. There are three possibilities:

        1. there is a single simpleFaultGeometry node; returns a
           :class:`openquake.hazardlib.geo.SimpleFaultSurface` instance
        2. there is a single complexFaultGeometry node; returns a
           :class:`openquake.hazardlib.geo.ComplexFaultSurface` instance
        3. there is a list of PlanarSurface nodes; returns a
           :class:`openquake.hazardlib.geo.MultiSurface` instance

        :param surface_nodes: surface nodes as just described
        """
        surface_node = surface_nodes[0]
        if surface_node.tag.endswith('simpleFaultGeometry'):
            surface = geo.SimpleFaultSurface.from_fault_data(
                self.geo_line(surface_node),
                ~surface_node.upperSeismoDepth,
                ~surface_node.lowerSeismoDepth,
                ~surface_node.dip,
                self.rupture_mesh_spacing)
        elif surface_node.tag.endswith('complexFaultGeometry'):
            # iterating on the geometry node gives its edges
            surface = geo.ComplexFaultSurface.from_fault_data(
                self.geo_lines(surface_node),
                self.complex_fault_mesh_spacing)
        else:  # a collection of planar surfaces
            planar_surfaces = list(map(self.geo_planar, surface_nodes))
            surface = geo.MultiSurface(planar_surfaces)
        return surface

    def _make_rupture(self, mag, rake, hypocenter, surfaces,
                      source_typology):
        """
        Build a hazardlib Rupture with no tectonic region type from the
        given surface nodes; shared by all the convert_*Rupture methods.

        :param mag: the rupture magnitude
        :param rake: the rupture rake angle
        :param hypocenter: the rupture hypocenter
        :param surfaces: the list of surface nodes of the rupture
        :param source_typology: the source class associated to the rupture
        """
        return source.rupture.Rupture(
            mag=mag, rake=rake, tectonic_region_type=None,
            hypocenter=hypocenter,
            surface=self.convert_surfaces(surfaces),
            source_typology=source_typology,
            surface_nodes=surfaces)

    def convert_simpleFaultRupture(self, node, mag, rake, hypocenter):
        """
        Convert a simpleFaultRupture node.

        :param node: the rupture node
        :param mag: the rupture magnitude
        :param rake: the rupture rake angle
        :param hypocenter: the rupture hypocenter
        """
        with context(self.fname, node):
            surfaces = [node.simpleFaultGeometry]
        return self._make_rupture(
            mag, rake, hypocenter, surfaces, source.SimpleFaultSource)

    def convert_complexFaultRupture(self, node, mag, rake, hypocenter):
        """
        Convert a complexFaultRupture node.

        :param node: the rupture node
        :param mag: the rupture magnitude
        :param rake: the rupture rake angle
        :param hypocenter: the rupture hypocenter
        """
        with context(self.fname, node):
            surfaces = [node.complexFaultGeometry]
        return self._make_rupture(
            mag, rake, hypocenter, surfaces, source.ComplexFaultSource)

    def convert_singlePlaneRupture(self, node, mag, rake, hypocenter):
        """
        Convert a singlePlaneRupture node.

        :param node: the rupture node
        :param mag: the rupture magnitude
        :param rake: the rupture rake angle
        :param hypocenter: the rupture hypocenter
        """
        with context(self.fname, node):
            surfaces = [node.planarSurface]
        return self._make_rupture(
            mag, rake, hypocenter, surfaces,
            source.NonParametricSeismicSource)

    def convert_multiPlanesRupture(self, node, mag, rake, hypocenter):
        """
        Convert a multiPlanesRupture node.

        :param node: the rupture node
        :param mag: the rupture magnitude
        :param rake: the rupture rake angle
        :param hypocenter: the rupture hypocenter
        """
        with context(self.fname, node):
            surfaces = list(node.getnodes('planarSurface'))
        return self._make_rupture(
            mag, rake, hypocenter, surfaces,
            source.NonParametricSeismicSource)
class SourceConverter(RuptureConverter):
    """
    Convert sources from valid nodes into Hazardlib objects.

    :param investigation_time:
        the investigation time used to build the PoissonTOM temporal
        occurrence model shared by the converted sources
    :param rupture_mesh_spacing:
        mesh spacing used for the ruptures
    :param complex_fault_mesh_spacing:
        mesh spacing used for complex faults; when missing (or falsy)
        `rupture_mesh_spacing` is used instead
    :param width_of_mfd_bin:
        bin width used for truncated Gutenberg-Richter MFDs
    :param area_source_discretization:
        default discretization for the area sources not specifying
        a `discretization` attribute in their geometry
    """
    def __init__(self, investigation_time, rupture_mesh_spacing,
                 complex_fault_mesh_spacing=None, width_of_mfd_bin=1.0,
                 area_source_discretization=None):
        # the mesh spacings are managed by the base class
        RuptureConverter.__init__(
            self, rupture_mesh_spacing, complex_fault_mesh_spacing)
        self.area_source_discretization = area_source_discretization
        self.width_of_mfd_bin = width_of_mfd_bin
        self.tom = PoissonTOM(investigation_time)

    def convert_node(self, node):
        """
        Convert the given node into a hazardlib source, depending
        on the node tag: the conversion is delegated to the method
        `convert_<tag>`, where the tag is processed with `striptag`.

        :param node: a node representing a source
        """
        with context(self.fname, node):
            convert_source = getattr(self, 'convert_' + striptag(node.tag))
        return convert_source(node)

    def convert_mfdist(self, node):
        """
        Convert the given node into a Magnitude-Frequency Distribution
        object.

        :param node:
            a node with exactly one subnode of kind incrementalMFD,
            truncGutenbergRichterMFD, arbitraryMFD or YoungsCoppersmithMFD
        :returns:
            a :class:`openquake.hazardlib.mfd.EvenlyDiscretizedMFD`,
            :class:`openquake.hazardlib.mfd.TruncatedGRMFD`,
            :class:`openquake.hazardlib.mfd.ArbitraryMFD` or
            :class:`openquake.hazardlib.mfd.YoungsCoppersmith1985MFD`
            instance
        :raises ValueError:
            if a YoungsCoppersmithMFD node has neither a `totalMomentRate`
            nor a `characteristicRate` attribute (the unpacking below also
            fails with a ValueError if there is not exactly one MFD subnode)
        """
        with context(self.fname, node):
            [mfd_node] = [subnode for subnode in node
                          if subnode.tag.endswith(
                              ('incrementalMFD', 'truncGutenbergRichterMFD',
                               'arbitraryMFD', 'YoungsCoppersmithMFD'))]
            if mfd_node.tag.endswith('incrementalMFD'):
                return mfd.EvenlyDiscretizedMFD(
                    min_mag=mfd_node['minMag'],
                    bin_width=mfd_node['binWidth'],
                    occurrence_rates=~mfd_node.occurRates)
            elif mfd_node.tag.endswith('truncGutenbergRichterMFD'):
                return mfd.TruncatedGRMFD(
                    a_val=mfd_node['aValue'], b_val=mfd_node['bValue'],
                    min_mag=mfd_node['minMag'], max_mag=mfd_node['maxMag'],
                    bin_width=self.width_of_mfd_bin)
            elif mfd_node.tag.endswith('arbitraryMFD'):
                return mfd.ArbitraryMFD(
                    magnitudes=~mfd_node.magnitudes,
                    occurrence_rates=~mfd_node.occurRates)
            elif mfd_node.tag.endswith('YoungsCoppersmithMFD'):
                if "totalMomentRate" in mfd_node.attrib:
                    # Return Youngs & Coppersmith from the total moment rate
                    return mfd.YoungsCoppersmith1985MFD.from_total_moment_rate(
                        min_mag=mfd_node["minMag"],
                        b_val=mfd_node["bValue"],
                        char_mag=mfd_node["characteristicMag"],
                        total_moment_rate=mfd_node["totalMomentRate"],
                        bin_width=mfd_node["binWidth"])
                elif "characteristicRate" in mfd_node.attrib:
                    # Return Youngs & Coppersmith from the characteristic rate
                    return mfd.YoungsCoppersmith1985MFD.\
                        from_characteristic_rate(
                            min_mag=mfd_node["minMag"],
                            b_val=mfd_node["bValue"],
                            char_mag=mfd_node["characteristicMag"],
                            char_rate=mfd_node["characteristicRate"],
                            bin_width=mfd_node["binWidth"])
                # do not silently return None, it would only cause an
                # obscure failure later on, when the MFD is used
                raise ValueError(
                    'A YoungsCoppersmithMFD node requires either a '
                    '`totalMomentRate` or a `characteristicRate` attribute')

    def convert_npdist(self, node):
        """
        Convert the given node into a Nodal Plane Distribution.

        :param node: a node with a nodalPlaneDist subnode
        :returns: a :class:`openquake.hazardlib.pmf.PMF` instance whose
            data are pairs (probability,
            :class:`openquake.hazardlib.geo.NodalPlane` instance)
        """
        with context(self.fname, node):
            npdist = []
            for np in node.nodalPlaneDist:
                prob, strike, dip, rake = (
                    np['probability'], np['strike'], np['dip'], np['rake'])
                npdist.append((prob, geo.NodalPlane(strike, dip, rake)))
            return pmf.PMF(npdist)

    def convert_hpdist(self, node):
        """
        Convert the given node into a probability mass function for the
        hypo depth distribution.

        :param node: a node with a hypoDepthDist subnode
        :returns: a :class:`openquake.hazardlib.pmf.PMF` instance
        """
        with context(self.fname, node):
            return pmf.PMF([~hd for hd in node.hypoDepthDist])

    def convert_areaSource(self, node):
        """
        Convert the given node into an area source object.

        :param node: an areaSource node, with an areaGeometry subnode
        :returns: a :class:`openquake.hazardlib.source.AreaSource` instance
        :raises ValueError: if the discretization is specified neither in
            the geometry node nor in `area_source_discretization`
        """
        geom = node.areaGeometry
        coords = split_coords_2d(~geom.Polygon.exterior.LinearRing.posList)
        polygon = geo.Polygon([geo.Point(*xy) for xy in coords])
        msr = valid.SCALEREL[~node.magScaleRel]()
        # the discretization in the source has the precedence over the
        # one given to the converter
        area_discretization = geom.attrib.get(
            'discretization', self.area_source_discretization)
        if area_discretization is None:
            raise ValueError(
                'The source %r has no `discretization` parameter and the job.'
                'ini file has no `area_source_discretization` parameter either'
                % node['id'])
        return source.AreaSource(
            source_id=node['id'],
            name=node['name'],
            tectonic_region_type=node['tectonicRegion'],
            mfd=self.convert_mfdist(node),
            rupture_mesh_spacing=self.rupture_mesh_spacing,
            magnitude_scaling_relationship=msr,
            rupture_aspect_ratio=~node.ruptAspectRatio,
            upper_seismogenic_depth=~geom.upperSeismoDepth,
            lower_seismogenic_depth=~geom.lowerSeismoDepth,
            nodal_plane_distribution=self.convert_npdist(node),
            hypocenter_distribution=self.convert_hpdist(node),
            polygon=polygon,
            area_discretization=area_discretization,
            temporal_occurrence_model=self.tom)

    def convert_pointSource(self, node):
        """
        Convert the given node into a point source object.

        :param node: a pointSource node, with a pointGeometry subnode
        :returns: a :class:`openquake.hazardlib.source.PointSource` instance
        """
        geom = node.pointGeometry
        lon_lat = ~geom.Point.pos
        msr = valid.SCALEREL[~node.magScaleRel]()
        return source.PointSource(
            source_id=node['id'],
            name=node['name'],
            tectonic_region_type=node['tectonicRegion'],
            mfd=self.convert_mfdist(node),
            rupture_mesh_spacing=self.rupture_mesh_spacing,
            magnitude_scaling_relationship=msr,
            rupture_aspect_ratio=~node.ruptAspectRatio,
            upper_seismogenic_depth=~geom.upperSeismoDepth,
            lower_seismogenic_depth=~geom.lowerSeismoDepth,
            location=geo.Point(*lon_lat),
            nodal_plane_distribution=self.convert_npdist(node),
            hypocenter_distribution=self.convert_hpdist(node),
            temporal_occurrence_model=self.tom)

    def convert_simpleFaultSource(self, node):
        """
        Convert the given node into a simple fault object.

        The subnodes hypoList and slipList are optional: when looking them
        up raises a NameError an empty tuple is used instead.

        :param node: a simpleFaultSource node, with a simpleFaultGeometry
            subnode
        :returns: a :class:`openquake.hazardlib.source.SimpleFaultSource`
                  instance
        """
        geom = node.simpleFaultGeometry
        msr = valid.SCALEREL[~node.magScaleRel]()
        fault_trace = self.geo_line(geom)
        # NB: not called `mfd`, to avoid shadowing the hazardlib module
        mfdist = self.convert_mfdist(node)
        with context(self.fname, node):
            try:
                hypo_list = valid.hypo_list(node.hypoList)
            except NameError:
                hypo_list = ()
            try:
                slip_list = valid.slip_list(node.slipList)
            except NameError:
                slip_list = ()
            simple = source.SimpleFaultSource(
                source_id=node['id'],
                name=node['name'],
                tectonic_region_type=node['tectonicRegion'],
                mfd=mfdist,
                rupture_mesh_spacing=self.rupture_mesh_spacing,
                magnitude_scaling_relationship=msr,
                rupture_aspect_ratio=~node.ruptAspectRatio,
                upper_seismogenic_depth=~geom.upperSeismoDepth,
                lower_seismogenic_depth=~geom.lowerSeismoDepth,
                fault_trace=fault_trace,
                dip=~geom.dip,
                rake=~node.rake,
                temporal_occurrence_model=self.tom,
                hypo_list=hypo_list,
                slip_list=slip_list)
        return simple

    def convert_complexFaultSource(self, node):
        """
        Convert the given node into a complex fault object.

        :param node: a complexFaultSource node, with a complexFaultGeometry
            subnode
        :returns: a :class:`openquake.hazardlib.source.ComplexFaultSource`
                  instance
        """
        geom = node.complexFaultGeometry
        edges = self.geo_lines(geom)
        # NB: not called `mfd`, to avoid shadowing the hazardlib module
        mfdist = self.convert_mfdist(node)
        msr = valid.SCALEREL[~node.magScaleRel]()
        with context(self.fname, node):
            cmplx = source.ComplexFaultSource(
                source_id=node['id'],
                name=node['name'],
                tectonic_region_type=node['tectonicRegion'],
                mfd=mfdist,
                rupture_mesh_spacing=self.complex_fault_mesh_spacing,
                magnitude_scaling_relationship=msr,
                rupture_aspect_ratio=~node.ruptAspectRatio,
                edges=edges,
                rake=~node.rake,
                temporal_occurrence_model=self.tom)
        return cmplx

    def convert_characteristicFaultSource(self, node):
        """
        Convert the given node into a characteristic fault object.

        :param node: a characteristicFaultSource node, with a surface
            subnode
        :returns:
            a :class:`openquake.hazardlib.source.CharacteristicFaultSource`
            instance
        """
        char = source.CharacteristicFaultSource(
            source_id=node['id'],
            name=node['name'],
            tectonic_region_type=node['tectonicRegion'],
            mfd=self.convert_mfdist(node),
            surface=self.convert_surfaces(node.surface),
            rake=~node.rake,
            temporal_occurrence_model=self.tom,
            surface_node=node.surface)
        return char

    def convert_nonParametricSeismicSource(self, node):
        """
        Convert the given node into a non parametric source object.

        Each subnode is converted into a rupture with the RuptureConverter
        machinery and associated to the PMF built from its `probs_occur`.

        :param node: a nonParametricSeismicSource node, whose subnodes
            are rupture nodes
        :returns:
            a :class:`openquake.hazardlib.source.NonParametricSeismicSource`
            instance
        """
        trt = node['tectonicRegion']
        rup_pmf_data = []
        for rupnode in node:
            probs = pmf.PMF(rupnode['probs_occur'])
            # call the base class explicitly, since convert_node is
            # overridden in this class to convert sources
            rup = RuptureConverter.convert_node(self, rupnode)
            rup.tectonic_region_type = trt
            rup_pmf_data.append((rup, probs))
        nps = source.NonParametricSeismicSource(
            node['id'], node['name'], trt, rup_pmf_data)
        return nps
def parse_ses_ruptures(fname):
    """
    Placeholder for the conversion of a stochasticEventSetCollection file
    into a set of SES, each one containing ruptures with an etag and a seed.

    :param fname: the path of the file (currently ignored)
    :raises NotImplementedError: always, the parsing is not implemented
    """
    not_implemented = NotImplementedError('parse_ses_ruptures')
    raise not_implemented
@parallel.litetask
def _filter_sources(sources, sitecol, maxdist, monitor):
    """
    Task called by filter_sources: return the list of the sources for
    which `filter_sites_by_distance_to_source(maxdist, sitecol)` does
    not return None, in the original order.

    The `monitor` argument is not used in the body of the function.
    """
    return [src for src in sources
            if src.filter_sites_by_distance_to_source(maxdist, sitecol)
            is not None]
def filter_sources(sources, sitecol, maxdist):
    """
    Filter a list of hazardlib sources according to the maximum distance.

    The filtering is parallelized only when the product of the number of
    sources and the number of sites exceeds LOTS_OF_SOURCES_SITES.

    :param sources: the original sources
    :param sitecol: a SiteCollection instance
    :param maxdist: maximum distance
    :returns: the filtered sources ordered by source_id
    """
    monitor = Monitor('filter sources')
    workload = len(sources) * len(sitecol)
    if workload <= LOTS_OF_SOURCES_SITES:
        # few sources and sites, filter sequentially on a single core
        kept = _filter_sources.task_func(sources, sitecol, maxdist, monitor)
    else:
        # filter in parallel on all available cores
        kept = parallel.TaskManager.apply_reduce(
            _filter_sources, (sources, sitecol, maxdist, monitor),
            operator.add, [])
    by_source_id = operator.attrgetter('source_id')
    return sorted(kept, key=by_source_id)