"""
Module: openquake.hmtk.parsers.strain.strain_csv_parser contains the classes
ReadStrainCsv and WriteStrainCsv, which read strain data from, and write
strain data to, csv format.
"""
import csv
import numpy as np
from openquake.hmtk.strain.geodetic_strain import GeodeticStrain
# Column names treated as strain quantities when the caller supplies no
# explicit header list; values read from these columns are multiplied by the
# reader's scaling factor.
STRAIN_VARIABLES = [
    'exx', 'eyy', 'exy',
    'var_exx', 'var_eyy', 'var_exy',
    'cc_xx_xx', 'cc_xx_yy', 'cc_xx_xy',
]
[docs]class ReadStrainCsv(object):
:class:`openquake.hmtk.parsers.strain_csv_parser.ReadStrainCsv` reads a
strain model (defined by :class:
`openquake.hmtk.strain.geodetic_strain.GeodeticStrain`) from
a headed csv file
:param str filename:
Name of strain file in csv format
:param strain:
Container for the strain data as instance of :class:
def __init__(self, strain_file):
self.filename = strain_file
self.strain = GeodeticStrain()
[docs] def read_data(self, scaling_factor=1E-9, strain_headers=None):
Reads the data from the csv file
:param float scaling_factor:
Scaling factor used for all strain values (default 1E-9 for
:param list strain_headers:
List of the variables in the file that correspond to strain
strain - Strain model as an instance of the :class:
if strain_headers:
self.strain.data_variables = strain_headers
self.strain.data_variables = STRAIN_VARIABLES
datafile = open(self.filename, 'r')
reader = csv.DictReader(datafile)
self.strain.data = dict([(name, []) for name in reader.fieldnames])
for row in reader:
for name in row.keys():
if 'region' in name.lower():
elif name in self.strain.data_variables:
scaling_factor * float(row[name]))
for key in self.strain.data.keys():
if 'region' in key:
self.strain.data[key] = np.array(self.strain.data[key],
self.strain.data[key] = np.array(self.strain.data[key])
if 'region' not in self.strain.data:
print('No tectonic regionalisation found in input file!')
self.strain.data_variables = self.strain.data.keys()
# Update data with secondary data (i.e. 2nd invariant, e1h, e2h etc.
return self.strain
def _check_invalid_longitudes(self):
Checks to ensure that all longitudes are in the range -180. to 180
idlon = self.strain.data['longitude'] > 180.
if np.any(idlon):
self.strain.data['longitude'][idlon] = \
self.strain.data['longitude'][idlon] - 360.
class WriteStrainCsv(object):
    """
    :class:`openquake.hmtk.parsers.strain_csv_parser.WriteStrainCsv` writes a
    strain model (defined by :class:
    `openquake.hmtk.strain.geodetic_strain.GeodeticStrain`)
    to a headed csv file

    :param str filename:
        Name of output file for writing
    """

    def __init__(self, filename):
        """
        Stores the name of the output file

        :param str filename:
            Name of output file for writing
        """
        self.filename = filename

    def write_file(self, strain, scaling_factor=1E-9):
        """
        Main writer function for the csv file

        Note that ``strain.data`` is modified in place: scaled variables are
        divided by ``scaling_factor`` and seismicity rate columns are added.

        :param strain:
            Instance of :class:
            openquake.hmtk.strain.geodetic_strain.GeodeticStrain
        :param float scaling_factor:
            Scaling factor used for all strain values (default 1E-9)
        :raises ValueError:
            If ``strain`` is not a GeodeticStrain instance
        """
        if not isinstance(strain, GeodeticStrain):
            raise ValueError('Strain data must be instance of GeodeticStrain')

        for key in strain.data:
            # Coordinates are never scaled, so they must not be un-scaled
            if key in ('longitude', 'latitude'):
                continue
            if key in strain.data_variables:
                # Return strain value back to original scaling
                strain.data[key] = strain.data[key] / scaling_factor

        # Slice seismicity rates into separate dictionary vectors
        strain, output_variables = self.slice_rates_to_data(strain)

        # The context manager ensures the file is flushed and closed even if
        # a row fails to serialise
        with open(self.filename, 'wt', newline='') as outfile:
            print('Writing strain data to file %s' % self.filename)
            writer = csv.DictWriter(outfile, fieldnames=output_variables)
            writer.writeheader()
            for iloc in range(strain.get_number_observations()):
                row_dict = {}
                for key in output_variables:
                    # Ignores empty dictionary attributes
                    if len(strain.data[key]) > 0:
                        row_dict[key] = strain.data[key][iloc]
                writer.writerow(row_dict)

    def slice_rates_to_data(self, strain):
        """
        For the strain data, checks to see if seismicity rates have been
        calculated. If so, each column in the array is sliced and stored as a
        single vector in the strain.data dictionary with the corresponding
        magnitude as a key.

        :param strain:
            Instance of :class:
            openquake.hmtk.strain.geodetic_strain.GeodeticStrain

        :returns:
            strain - Instance of strain class with updated data dictionary
            output_variables - Updated list of headers
        """
        magnitudes = strain.target_magnitudes
        if not isinstance(magnitudes, (np.ndarray, list)):
            # No rates calculated: the headers are just the existing keys
            return strain, list(strain.data)

        magnitude_list = ['%.3f' % mag for mag in magnitudes]

        # Ensure that the number of rows in the rate array corresponds to the
        # number of observations
        assert np.shape(strain.seismicity_rate)[0] == \
            strain.get_number_observations()

        for iloc, magnitude in enumerate(magnitude_list):
            strain.data[magnitude] = strain.seismicity_rate[:, iloc]

        # Build the header list after the rate columns are stored so that a
        # repeated call cannot list the same magnitude column twice
        output_variables = list(strain.data)
        return strain, output_variables