# -*- coding: utf-8 -*-# vim: tabstop=4 shiftwidth=4 softtabstop=4## Copyright (C) 2012-2025 GEM Foundation## OpenQuake is free software: you can redistribute it and/or modify it# under the terms of the GNU Affero General Public License as published# by the Free Software Foundation, either version 3 of the License, or# (at your option) any later version.## OpenQuake is distributed in the hope that it will be useful,# but WITHOUT ANY WARRANTY; without even the implied warranty of# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the# GNU Affero General Public License for more details.## You should have received a copy of the GNU Affero General Public License# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.""" Module :mod:`openquake.hazardlib.geo.line` defines :class:`Line`."""importnumpyasnpfromopenquake.baselib.generalimportcached_propertyfromopenquake.baselib.performanceimportcompilefromopenquake.hazardlib.geoimportgeodetic,utils,PointTOLERANCE=0.1SMALL=1e-2def_update(rtra,rtra_prj,proj,pnt):xg,yg=proj(np.array([pnt[0]]),np.array([pnt[1]]),reverse=True)rtra.append(np.array([xg[0],yg[0],pnt[2]]))rtra_prj.append(pnt)def_resample(line,sect_len,orig_extremes):# Returns array of resampled trace coordinates## :param coo:# A :class:`numpy.ndarray` instance with three columns and n-lines# containing the coordinates of the polyline to be resampled.# :param sect_len:# The resampling distance [km]# :param orig_extremes:# A boolean. When true the last point in coo is also added.# Project the coordinates of the trace and save them in `txy`txy=line.proj(line.coo[:,0],line.coo[:,1],line.coo[:,2]).T# Compute the total length of the original trace# tot_len = sum(utils.get_dist(txy[i], txy[i - 1]) for i in range(1, N))inc_len=0.# Initialize the lists with the coordinates of the resampled tracertra_prj=[txy[0]]rtra=[line.coo[0]]# Resamplingidx_vtx=-1whileTrue:# Computing distances from the reference pointdis=utils.get_dist(txy,rtra_prj[-1])ifidx_vtx>0:# Fixing distances for points before the indexdis[0:idx_vtx]=100000# Index of the point on the trace with a distance just below the# sampling distanceidx=np.where(dis<=sect_len,dis,-np.inf).argmax()# If the pick a point that is not the last one on the trace we# compute the new sample by interpolationifidx<len(dis)-1:pnt=find_t(txy[idx+1,:],txy[idx,:],rtra_prj[-1],sect_len)ifnp.isnan(pnt).any():raiseValueError('Did not find the intersection')_update(rtra,rtra_prj,line.proj,pnt)inc_len+=sect_len# Adding more points still on the same segmentdelta=txy[idx+1]-rtra_prj[-1]chk_dst=utils.get_dist(txy[idx+1],rtra_prj[-1])rat=delta/chk_dstwhilechk_dst>sect_len*0.9999:_update(rtra,rtra_prj,line.proj,rtra_prj[-1]+sect_len*rat)inc_len+=sect_len# This is the distance between the last resampled point# and the second vertex of the segmentchk_dst=utils.get_dist(txy[idx+1],rtra_prj[-1])else:same_dir=Trueiflen(rtra)>1:same_dir=_get_same_dir(rtra,line.coo)# This is the distance between the last sampled point and the last# point on the original edgedist_from_last=utils.get_dist(rtra_prj[-1],txy[-1])# We are processing the last point# if tot_len - inc_len > 0.5 * sect_len and not orig_extremes:if((dist_from_last>sect_len/2andnotorig_extremesandsame_dir)or(dist_from_last<sect_len/2andnotorig_extremesandnotsame_dir)):# Adding more points still on the same segmentdelta=txy[-1]-txy[-2]chk_dst=utils.get_dist(txy[-1],txy[-2])_update(rtra,rtra_prj,line.proj,rtra_prj[-1]+sect_len*delta/chk_dst)inc_len+=sect_leneliforig_extremes:# Adding last pointrtra.append(line.coo[-1])break# Updating indexidx_vtx=idx+1returnnp.array(utils.clean_points(rtra))def_get_same_dir(rtra,coo):# If the line is verticalif(np.abs(rtra[-2][0]-rtra[-1][0])<SMALLandnp.abs(rtra[-2][1]-rtra[-1][1])<SMALL):same_dir=Trueifcoo[-1,2]<rtra[-1][2]:same_dir=Falsereturnsame_dir# Azimuth of the resampled edgeazim_rsmp_edge=geodetic.azimuth(rtra[-2][0],rtra[-2][1],rtra[-1][0],rtra[-1][1])# Azimuth from the last resampled edge and the last point on# the original edgeazim_orig_edge=geodetic.azimuth(rtra[-1][0],rtra[-1][1],coo[-1,0],coo[-1,1])# Checksame_dir=np.abs(azim_rsmp_edge-azim_orig_edge)<30returnsame_dir
[docs]@compile('(f8[:,:],f8[:,:],f8[:],f8[:,:])')defline_get_tu(ui,ti,sl,weights):""" Compute the T and U quantitities. :param ui: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) :param ti: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) :param sl: A :class:`numpy.ndarray` instance with the segments' length :param weights: A :class:`numpy.ndarray` instance of cardinality (num segments x num sites) """# Sum of weights - This has shape equal to the number of sitesweight_sum=weights.sum(axis=0)# Compute Tt_upp=(ti*weights/weight_sum.T).sum(axis=0)# Compute Uu_upp=ui[0]*weights[0]foriinrange(1,len(sl)):delta=np.sum(sl[:i])u_upp+=(ui[i]+delta)*weights[i]u_upp=(u_upp/weight_sum.T).Treturnt_upp,u_upp
[docs]@compile('(f8,f8,f8[:,:],f8[:],f8[:],f8[:,:],f8[:,:])')defget_ui_ti(lam0,phi0,coo,lons,lats,uhat,that):""" Compute the t and u coordinates. ti and ui have shape (num_segments x num_sites). """N=len(lons)L=len(coo)# Sites projected coordinatessx,sy=utils.project_direct(lam0,phi0,lons,lats)# Polyline projected coordinatestx,ty=utils.project_direct(lam0,phi0,coo[:,0],coo[:,1])# Initializing ti and ui coordinatesui=np.zeros((L-1,N))ti=np.zeros((L-1,N))# For each segmentforiinrange(L-1):dx=sx-tx[i]dy=sy-ty[i]ui[i]=dx*uhat[i,0]+dy*uhat[i,1]ti[i]=dx*that[i,0]+dy*that[i,1]returnui,ti
# has effect on case_65 with multifaultsources and rx0 distance# affects the performance of ~/oq-risk-tests/test/disaggregation/NZ
[docs]@compile('(f8[:,:],f8[:,:],f8[:])')defget_ti_weights(ui,ti,segments_len):""" :returns: (weights, idx_on_trace) """S1,S2=ui.shapeweights=np.zeros_like(ui)terma=np.zeros_like(ui)term1=np.zeros_like(ui)term2=np.zeros_like(ui)idx_on_trace=np.zeros(S2,dtype=np.bool_)foriinrange(S1):ti_=ti[i]ui_=ui[i]terma_=terma[i]term1_=term1[i]term2_=term2[i]ws=weights[i]seglen=segments_len[i]# More general casecond0=np.abs(ti_)>=TOLERANCEifcond0.any():terma_[cond0]=seglen-ui_[cond0]term1_[cond0]=np.arctan(terma_[cond0]/ti_[cond0])term2_[cond0]=np.arctan(-ui_[cond0]/ti_[cond0])ws[cond0]=(term1_[cond0]-term2_[cond0])/ti_[cond0]# Case for sites on the extension of one segmentcond1=np.abs(ti_)<TOLERANCEcond2=np.logical_or(ui_<0.-TOLERANCE,ui_>seglen+TOLERANCE)iii=np.logical_and(cond1,cond2)iflen(iii):ws[iii]=1./(ui_[iii]-seglen)-1./ui_[iii]# Case for sites on one segmentcond3=np.logical_and(ui_>=-TOLERANCE,ui_<=seglen+TOLERANCE)jjj=np.logical_and(cond1,cond3)ws[jjj]=1/(-0.01-seglen)+1/0.01idx_on_trace[jjj]=1.0returnweights,idx_on_trace
[docs]@compile('(f8,f8,f8[:,:],f8[:],f8[:,:],f8[:,:],f8[:],f8[:])')defget_tuw(lam0,phi0,coo,slen,uhat,that,lons,lats):""" :returns: array of float32 of shape (N, 3) """N=len(lons)out=np.empty((N,3),np.float32)ui,ti=get_ui_ti(lam0,phi0,coo,lons,lats,uhat,that)weights,iot=get_ti_weights(ui,ti,slen)t,u=line_get_tu(ui,ti,slen,weights)t[iot]=0.0out[:,0]=tout[:,1]=uout[:,2]=weights.sum(axis=0)returnout
[docs]classLine(object):""" This class represents a geographical line, which is basically a sequence of geographical points. A line is defined by at least two points. :param points: The sequence of points defining this line. :type points: list of :class:`~openquake.hazardlib.geo.point.Point` instances """
[docs]@classmethoddeffrom_coo(cls,coo):""" Build a Line object for an array of coordinates, assuming they have e been cleaned already, i.e. there are no adjacent duplicate points """self=cls.__new__(cls)self.init(coo)returnself
def__init__(self,points):points=utils.clean_points(points)# can remove points!self.init(np.array([[p.x,p.y,p.z]forpinpoints]))
[docs]defflip(self):""" Returns a new line with the points flipped. Here is an example, taking advantage of the string representation of Lines in terms of geohashes of 5 letters (~2 km of precision): >>> line = Line([Point(1, 2), Point(1, 3)]) >>> print(line) s02eq_s089n >>> print(line.flip()) s089n_s02eq >>> line.get_azimuths() [0.0] >>> line.flip().get_azimuths() [180.0] >>> line = Line([Point(1, 0), Point(2, 0)]) >>> line.get_azimuths() [90.0] >>> line.flip().get_azimuths() [270.0] """returnself.from_coo(np.flip(self.coo,axis=0))
[docs]@classmethoddeffrom_vectors(cls,lons,lats,deps=None):""" Creates a line from three numpy.ndarray instances containing longitude, latitude and depths values """arrs=lons,latsifdepsisnotNone:arrs=lons,lats,depsreturncls([Point(*coo)forcooinzip(*arrs)])
[docs]defon_surface(self):""" Check if this line is defined on the surface (i.e. all points are on the surfance, depth=0.0). :returns bool: True if this line is on the surface, false otherwise. """returnall(point.on_surface()forpointinself)
[docs]defhorizontal(self):""" Check if this line is horizontal (i.e. all depths of points are equal). :returns bool: True if this line is horizontal, false otherwise. """returnall(p.depth==self.coo[0,2]forpinself)
[docs]defget_azimuths(self):""" Return the azimuths of all the segments composing the polyline """iflen(self.coo)==2:return[self[0].azimuth(self[1])]lons=self.coo[:,0]lats=self.coo[:,1]returngeodetic.azimuth(lons[:-1],lats[:-1],lons[1:],lats[1:])
[docs]defaverage_azimuth(self):""" Calculate and return weighted average azimuth of all line's segments in decimal degrees. Uses formula from http://en.wikipedia.org/wiki/Mean_of_circular_quantities >>> from openquake.hazardlib.geo.point import Point as P >>> '%.1f' % Line([P(0, 0), P(1e-5, 1e-5)]).average_azimuth() '45.0' >>> '%.1f' % Line([P(0, 0), P(0, 1e-5), P(1e-5, 1e-5)] ... ).average_azimuth() '45.0' >>> line = Line([P(0, 0), P(-2e-5, 0), P(-2e-5, 1.154e-5)]) >>> '%.1f' % line.average_azimuth() '300.0' """returnself.azimuth
[docs]defresample(self,sect_len:float,orig_extremes=False):""" Resample this line into sections. The first point in the resampled line corresponds to the first point in the original line. Starting from the first point in the original line, a line segment is defined as the line connecting the last point in the resampled line and the next point in the original line. :param float sect_len: The length of the section, in km. :param bool original_extremes: A boolean controlling the way in which the last point is added. When true the first and last point match the original extremes. When false the last point is at a `sect_len` distance from the previous one, before or after the last point. :returns: A new line resampled into sections based on the given length. """returnLine.from_coo(_resample(self,sect_len,orig_extremes))
[docs]defget_lengths(self)->np.ndarray:""" Calculate a numpy.ndarray instance with the length of the segments composing the polyline. :returns: Segments length in km. """lengths=[]fori,pointinenumerate(self):ifi!=0:lengths.append(point.distance(self[i-1]))returnnp.array(lengths)
[docs]defget_length(self)->float:""" Calculate the length of the line as a sum of lengths of all its segments. :returns: Total length in km. """returnself.length
[docs]defkeep_corners(self,delta):""" Removes the points where the change in direction is lower than a tolerance value and returns a new line. :param delta: An angle in decimal degrees """coo=self.coo# Compute the azimuth of all the segmentsazim=geodetic.azimuth(coo[:-1,0],coo[:-1,1],coo[1:,0],coo[1:,1])pidx={0,coo.shape[0]-1}idx,=np.nonzero(np.abs(np.diff(azim))>delta)pidx=sorted(pidx|set(idx+1))returnself.from_coo(coo[pidx])
[docs]defresample_to_num_points(self,num_points):""" Resample the line to a specified number of points. :param num_points: Integer number of points the resulting line should have. :returns: A new line with that many points as requested. """assertlen(self)>1,"can not resample the line of one point"section_length=self.get_length()/(num_points-1)resampled_points=[self[0]]segment=0acc_length=0last_segment_length=0points=self.pointsforiinrange(num_points-1):tot_length=(i+1)*section_lengthwhiletot_length>acc_lengthandsegment<len(points)-1:last_segment_length=points[segment].distance(points[segment+1])acc_length+=last_segment_lengthsegment+=1p1,p2=points[segment-1:segment+1]offset=tot_length-(acc_length-last_segment_length)ifoffset<1e-5:# forward geodetic transformations for very small distances# are very inefficient (and also unneeded). if target point# is just 1 cm away from original (non-resampled) line vertex,# don't even bother doing geodetic calculations.resampled=p1else:resampled=p1.equally_spaced_points(p2,offset)[1]resampled_points.append(resampled)returnLine(resampled_points)
[docs]defget_tuw(self,mesh):""" Computes the U and T coordinates of the GC2 method for a mesh of points. :param mesh: An instance of :class:`openquake.hazardlib.geo.mesh.Mesh` """slen,uhat,that=self.sut_hattuw=get_tuw(self.proj.lam0,self.proj.phi0,self.coo,slen,uhat,that,mesh.lons,mesh.lats)returntuw[:,0],tuw[:,1],tuw[:,2]
[docs]defget_ui_ti(self,mesh,uhat,that):""" Compute the t and u coordinates. ti and ui have shape (num_segments x num_sites). """returnget_ui_ti(self.proj.lam0,self.proj.phi0,self.coo,mesh.lons,mesh.lats,uhat,that)
@cached_propertydefsut_hat(self):""" Return the unit vectors defining the local origin for each segment of the trace. :param sx: The vector with the x coordinates of the trace :param sy: The vector with the y coordinates of the trace :returns: Two arrays of size n x 3 (when n is the number of segments composing the trace """# Projected coordinatessx,sy=self.proj(self.coo[:,0],self.coo[:,1])slen=((sx[1:]-sx[:-1])**2+(sy[1:]-sy[:-1])**2)**0.5sg=np.zeros((len(sx)-1,3))sg[:,0]=sx[1:]-sx[:-1]sg[:,1]=sy[1:]-sy[:-1]uhat=get_versor(sg)that=get_versor(np.cross(sg,np.array([0,0,1])))returnslen,uhat,thatdef__str__(self):returnutils.geohash5(self.coo)
[docs]defget_versor(arr):""" Returns the versor (i.e. a unit vector) of a vector """norm=np.linalg.norm(arr,axis=1)assert(norm>0).all(),normreturn(arr.T/norm).T
[docs]@compile("(f8[:],f8[:],f8[:],f8)")deffind_t(pnt0,pnt1,ref_pnt,distance):""" Find the point on the segment within `pnt0` and `pnt1` at `distance` from `ref_pnt`. See https://tinyurl.com/meyt4ft3 :param pnt0: A 1D :class:`numpy.ndarray` instance of length 3 :param pnt1: A 1D :class:`numpy.ndarray` instance of length 3 :param ref_pnt: A 1D :class:`numpy.ndarray` instance of length 3 :param distance: A float with the distance in km from `ref_pnt` to the point on the segment. :returns: A 1D :class:`numpy.ndarray` instance of length 3 """x1=pnt0[0]y1=pnt0[1]z1=pnt0[2]x2=pnt1[0]y2=pnt1[1]z2=pnt1[2]x3=ref_pnt[0]y3=ref_pnt[1]z3=ref_pnt[2]r=distancepa=(x2-x1)**2+(y2-y1)**2+(z2-z1)**2pb=2*((x2-x1)*(x1-x3)+(y2-y1)*(y1-y3)+(z2-z1)*(z1-z3))pc=(x3**2+y3**2+z3**2+x1**2+y1**2+z1**2-2*(x3*x1+y3*y1+z3*z1)-r**2)chk=pb*pb-4*pa*pc# In this case the line is not intersecting the sphereifchk<0:returnnp.array([np.nan,np.nan,np.nan])# Computing the points of intersectionpu=(-pb+(pb**2-4*pa*pc)**0.5)/(2*pa)x=x1+pu*(x2-x1)y=y1+pu*(y2-y1)z=z1+pu*(z2-z1)if(x>=min(x1,x2)andx<=max(x1,x2)andy>=min(y1,y2)andy<=max(y1,y2)andz>=min(z1,z2)andz<=max(z1,z2)):out=[x,y,z]else:pu=(-pb-(pb**2-4*pa*pc)**0.5)/(2*pa)x=x1+pu*(x2-x1)y=y1+pu*(y2-y1)z=z1+pu*(z2-z1)out=[x,y,z]returnnp.array(out)