Source code for mtpy.core.mt

# -*- coding: utf-8 -*-
"""
.. module:: MT
   :synopsis: The main container for MT response functions.

.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov>
"""

# ==============================================================================
import numpy as np
import os
import time
import warnings
from dateutil import parser as dt_parser
from pathlib import Path

import mtpy.core.edi as MTedi
import mtpy.core.z as MTz
import mtpy.utils.gis_tools as gis_tools
import mtpy.analysis.pt as MTpt
import mtpy.analysis.distortion as MTdistortion
import mtpy.core.jfile as MTj
import mtpy.core.mt_xml as MTxml
import mtpy.core.zmm as MTzmm

from mtpy.utils.mtpylog import MtPyLog

_logger = MtPyLog.get_mtpy_logger(__name__)
# _logger.setLevel(logging.DEBUG)

try:
    import scipy

    scipy_version = [int(ss) for ss in scipy.__version__.split('.')]
    if scipy_version[0] == 0:
        if scipy_version[1] < 14:
            warnings.warn('Note: need scipy version 0.14.0 or higher or interpolation '
                          'might not work.', ImportWarning)
            _logger.warning('Note: need scipy version 0.14.0 or higher or interpolation '
                            'might not work.')
    import scipy.interpolate as spi

    interp_import = True

except ImportError:  # pragma: no cover
    warnings.warn('Could not find scipy.interpolate, cannot use method interpolate'
                  'check installation you can get scipy from scipy.org.')
    _logger.warning('Could not find scipy.interpolate, cannot use method interpolate'
                    'check installation you can get scipy from scipy.org.')
    interp_import = False


# =============================================================================
[docs]class MT(object): """ Basic MT container to hold all information necessary for a MT station including the following parameters. * Site --> information on site details (lat, lon, name, etc) * FieldNotes --> information on instruments, setup, etc. * Copyright --> information on how the data can be used and citations * Provenance --> where the data come from and how they are stored * Processing --> how the data were processed. The most used attributes are made available from MT, namely the following. ===================== ===================================================== Attribute Description ===================== ===================================================== station station name lat station latitude in decimal degrees lon station longitude in decimal degrees elev station elevation in meters Z mtpy.core.z.Z object for impedance tensor Tipper mtpy.core.z.Tipper object for tipper pt mtpy.analysis.pt.PhaseTensor for phase tensor east station location in UTM coordinates assuming WGS-84 north station location in UTM coordinates assuming WGS-84 utm_zone zone of UTM coordinates assuming WGS-84 rotation_angle rotation angle of the data fn absolute path to the data file ===================== ===================================================== Other information is contained with in the different class attributes. For instance survey name is in MT.Site.survey .. note:: * The best way to see what all the information is and where it is contained would be to write out a configuration file :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT() >>> mt_obj.write_cfg_file(r"/home/mt/generic.cfg") * Currently EDI, XML, and j file are supported to read in information, and can write out EDI and XML formats. Will be extending to j and Egberts Z format. ===================== ===================================================== Methods Description ===================== ===================================================== read_mt_file read in a MT file [ EDI | XML | j ] write_mt_file write a MT file [ EDI | XML ] read_cfg_file read a configuration file write_cfg_file write a configuration file remove_distortion remove distortion following Bibby et al. [2005] remove_static_shift Shifts apparent resistivity curves up or down interpolate interpolates Z and T onto specified frequency array. ===================== ===================================================== Examples ------------------- :Read from an .edi File: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT(r"/home/edi_files/s01.edi") :Remove Distortion: :: >>> import mtpy.core.mt as mt >>> mt1 = mt.MT(fn=r"/home/mt/edi_files/mt01.edi") >>> D, new_z = mt1.remove_distortion() >>> mt1.write_mt_file(new_fn=r"/home/mt/edi_files/mt01_dr.edi",\ >>> new_Z=new_z) :Remove Static Shift: :: >>> new_z_obj = mt_obj.remove_static_shift(ss_x=.78, ss_y=1.1) >>> # write a new edi file >>> mt_obj.write_mt_file(new_fn=r"/home/edi_files/s01_ss.edi", >>> new_Z=new_z) >>> wrote file to: /home/edi_files/s01_ss.edi :Interpolate: :: >>> new_freq = np.logspace(-3, 3, num=24) >>> new_z_obj, new_tipper_obj = mt_obj.interpolate(new_freq) >>> mt_obj.write_mt_file(new_Z=new_z_obj, new_Tipper=new_tipper_obj) >>> wrote file to: /home/edi_files/s01_RW.edi """ def __init__(self, fn=None, **kwargs): self._logging = MtPyLog.get_mtpy_logger(self.__class__.__name__) # important information held in objects self.Site = Site() self.FieldNotes = FieldNotes() self.Provenance = Provenance() self.Notes = MTedi.Information() self.Processing = Processing() self.Copyright = Copyright() self._Z = MTz.Z() self._Tipper = MTz.Tipper() self._rotation_angle = 0 self.save_dir = os.getcwd() self.original_file_type = None if fn is not None: self.read_mt_file(fn) self._fn = os.path.normpath(os.path.abspath(fn)) # store file reference # provide key words to fill values if an edi file does not exist for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) # ========================================================================== # get functions # ========================================================================== @property def fn(self): """ reference to original data file""" return self._fn @property def lat(self): """Latitude""" return self.Site.Location.latitude @property def lon(self): """Longitude""" return self.Site.Location.longitude @property def elev(self): """Elevation""" return self.Site.Location.elevation @property def east(self): """easting (m)""" return self.Site.Location.easting @property def north(self): """northing (m)""" return self.Site.Location.northing @property def utm_zone(self): """utm zone""" return self.Site.Location.utm_zone @property def rotation_angle(self): """rotation angle in degrees from north""" return self._rotation_angle @property def Z(self): """mtpy.core.z.Z object to hole impedance tensor""" return self._Z @property def Tipper(self): """mtpy.core.z.Tipper object to hold tipper information""" return self._Tipper @property def station(self): """station name""" return self.Site.id @property def pt(self): """mtpy.analysis.pt.PhaseTensor object to hold phase tensor""" return MTpt.PhaseTensor(z_object=self.Z) # ========================================================================== # set functions # ========================================================================== @lat.setter def lat(self, latitude): """ set latitude making sure the input is in decimal degrees upon setting utm coordinates are recalculated """ self.Site.Location.latitude = latitude self.Site.Location.project_location2utm() @lon.setter def lon(self, longitude): """ set longitude making sure the input is in decimal degrees upon setting utm coordinates are recalculated """ self.Site.Location.longitude = longitude self.Site.Location.project_location2utm() @elev.setter def elev(self, elevation): """ set elevation, should be input as meters """ self.Site.Location.elevation = elevation @east.setter def east(self, easting): """ set easting in meters upon setting lat and lon are recalculated """ self.Site.Location.easting = easting self.Site.Location.project_location2ll() @north.setter def north(self, northing): """ set northing in meters upon setting lat and lon are recalculated """ self.Site.Location.northing = northing self.Site.Location.project_location2ll() @utm_zone.setter def utm_zone(self, utm_zone): """ set UTM zone upon setting lat and lon are recalculated """ self.Site.Location.utm_zone = utm_zone self.Site.Location.project_location2ll() @rotation_angle.setter def rotation_angle(self, theta_r): """ set rotation angle in degrees assuming North is 0 measuring clockwise positive to East as 90. upon setting rotates Z and Tipper """ self._rotation_angle = theta_r self._Z.rotate(theta_r) self._Tipper.rotate(theta_r) self.pt.rotate(theta_r) print(("Rotated Z, Tipper, Phase Tensor and Zinvariants by" "{0:.3f} degrees".format(self._rotation_angle))) @Z.setter def Z(self, z_object): """ set z_object recalculate phase tensor and invariants, which shouldn't change except for strike angle """ self._Z = z_object self._Z.compute_resistivity_phase() @Tipper.setter def Tipper(self, t_object): """ set tipper object recalculate tipper angle and magnitude """ self._Tipper = t_object if self._Tipper is not None: self._Tipper.compute_amp_phase() self._Tipper.compute_mag_direction() @station.setter def station(self, station_name): """ set station name """ self.Site.id = station_name # ========================================================================== # read in files # ==========================================================================
[docs] def read_mt_file(self, fn, file_type=None): """ Read an MT response file. .. note:: Currently only .edi, .xml, and .j files are supported :param fn: full path to input file :type fn: string :param file_type: ['edi' | 'j' | 'xml' | ... ] if None, automatically detects file type by the extension. :type file_type: string :Example: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT() >>> mt_obj.read_mt_file(r"/home/mt/mt01.xml") """ if file_type is None: file_type = os.path.splitext(fn)[1][1:].lower() if file_type.lower() == 'edi': self._read_edi_file(fn) elif file_type.lower() == 'j': self._read_j_file(fn) elif file_type.lower() == 'xml': self._read_xml_file(fn) elif file_type.lower() == 'zmm': self._read_zmm_file(fn) else: raise MTError('File type not supported yet')
[docs] def write_mt_file(self, save_dir=None, fn_basename=None, file_type='edi', new_Z_obj=None, new_Tipper_obj=None, longitude_format='LON', latlon_format='dms' ): """ Write an mt file, the supported file types are EDI and XML. .. todo:: jtype and Gary Egberts z format :param save_dir: full path save directory :type save_dir: string :param fn_basename: name of file with or without extension :type fn_basename: string :param file_type: [ 'edi' | 'xml' ] :type file_type: string :param new_Z_obj: new Z object :type new_Z_obj: mtpy.core.z.Z :param new_Tipper_obj: new Tipper object :type new_Tipper_obj: mtpy.core.z.Tipper :param longitude_format: whether to write longitude as LON or LONG. options are 'LON' or 'LONG', default 'LON' :type longitude_format: string :param latlon_format: format of latitude and longitude in output edi, degrees minutes seconds ('dms') or decimal degrees ('dd') :type latlon_format: string :returns: full path to file :rtype: string :Example: :: >>> mt_obj.write_mt_file(file_type='xml') """ if save_dir is not None: self.save_dir = save_dir if fn_basename is not None: ext = os.path.splitext(fn_basename)[1][1:].lower() fn_basename = os.path.splitext(fn_basename)[0] if ext == '': fn_basename = '{0}.{1}'.format(fn_basename, file_type.lower()) elif ext in ['xml', 'edi']: fn_basename = '{0}.{1}'.format(fn_basename, ext) file_type = ext else: raise MTError('File type {0} not supported yet.'.format(ext)) else: fn_basename = '{0}.{1}'.format(self.station, file_type) fn = os.path.join(self.save_dir, fn_basename) if file_type == 'edi': fn = self._write_edi_file(fn, new_Z=new_Z_obj, new_Tipper=new_Tipper_obj, longitude_format=longitude_format, latlon_format=latlon_format) elif file_type == 'xml': fn = self._write_xml_file(fn, new_Z=new_Z_obj, new_Tipper=new_Tipper_obj) return fn
# --> read in edi file def _read_edi_file(self, edi_fn): """ read in edi file and set attributes accordingly """ if not os.path.isfile(edi_fn): raise MTError('Could not find {0}, check path.'.format(edi_fn)) self.save_dir = os.path.dirname(edi_fn) edi_obj = MTedi.Edi(edi_fn=edi_fn) self._edi_get_site(edi_obj) # get info self.Notes = edi_obj.Info self._parse_notes() # get field notes self._edi_get_field_notes(edi_obj) self.Z = edi_obj.Z self.Tipper = edi_obj.Tipper self.station = edi_obj.station # --> make sure things are ordered from high frequency to low self._check_freq_order() def _edi_get_site(self, edi_obj): """ Get Site information """ self.lat = edi_obj.lat self.lon = edi_obj.lon self.elev = edi_obj.elev self.Site.acquired_by = edi_obj.Header.acqby self.Site.survey = edi_obj.Header.loc self.Site.start_date = edi_obj.Header.acqdate self.Site.project = edi_obj.Header.project self.Site.Location.datum = edi_obj.Header.datum self.Site.Location.elev_units = edi_obj.Define_measurement.units self.Site.Location.coordinate_system = edi_obj.Header.coordinate_system if hasattr(edi_obj.Header,'enddate'): self.Site.end_date = edi_obj.Header.enddate self.Site.Location.declination = edi_obj.Header.declination def _edi_get_field_notes(self, edi_obj): """ get FieldNotes attributes from edi """ # get information about different sensors try: for key in list(edi_obj.Define_measurement.meas_hx.__dict__.keys()): setattr(self.FieldNotes.Magnetometer_hx, key, edi_obj.Define_measurement.meas_hx.__dict__[key]) except AttributeError: pass try: for key in list(edi_obj.Define_measurement.meas_hy.__dict__.keys()): setattr(self.FieldNotes.Magnetometer_hy, key, edi_obj.Define_measurement.meas_hy.__dict__[key]) except AttributeError: pass try: for key in list(edi_obj.Define_measurement.meas_hz.__dict__.keys()): setattr(self.FieldNotes.Magnetometer_hz, key, edi_obj.Define_measurement.meas_hz.__dict__[key]) except AttributeError: pass try: for key in list(edi_obj.Define_measurement.meas_ex.__dict__.keys()): setattr(self.FieldNotes.Electrode_ex, key, edi_obj.Define_measurement.meas_ex.__dict__[key]) except AttributeError: pass try: for key in list(edi_obj.Define_measurement.meas_ey.__dict__.keys()): setattr(self.FieldNotes.Electrode_ey, key, edi_obj.Define_measurement.meas_ey.__dict__[key]) except AttributeError: pass try: self.FieldNotes.Magnetometer_hx.id = self.Notes.hx except AttributeError: pass try: self.FieldNotes.Magnetometer_hy.id = self.Notes.hy except AttributeError: pass try: self.FieldNotes.Magnetometer_hz.id = self.Notes.hz except AttributeError: pass try: self.FieldNotes.Magnetometer_hx.type = self.Notes.b_instrument_type self.FieldNotes.Magnetometer_hy.type = self.Notes.b_instrument_type self.FieldNotes.Magnetometer_hz.type = self.Notes.b_instrument_type except AttributeError: pass try: self.FieldNotes.Electrode_ex.type = self.Notes.e_instrument_type self.FieldNotes.Electrode_ey.type = self.Notes.e_instrument_type except AttributeError: pass try: self.FieldNotes.DataLogger = self.Notes.b_logger_type except AttributeError: pass # keep the edi object around, should be able to deprecate this later self._edi_obj = edi_obj def _parse_notes(self): """ parse the notes section if there is any information that is useful """ for a_key in list(self.Notes.info_dict.keys()): a_value = self.Notes.info_dict[a_key] try: a_value = float(a_value) except (ValueError, TypeError): pass a_list = a_key.strip().lower().split('.') if a_key.find('mtft') == 0 or a_key.find('birrp') == 0: a_list = ['processing'] + a_list a1 = a_list[0] obj_attr = a_list[-1] if a1 in ['processing', 'fieldnotes', 'copyright', 'provenance']: cl = a_list[0].capitalize() if a1 == 'fieldnotes': cl = 'FieldNotes' obj = getattr(self, cl) count = 1 while count < len(a_list) - 1: cl_attr = a_list[count] if cl_attr == 'dataquality': cl_attr = 'DataQuality' elif cl_attr == 'datalogger': cl_attr = 'DataLogger' try: obj = getattr(obj, cl_attr) except AttributeError: try: obj = getattr(obj, cl_attr.capitalize()) except AttributeError: # print 'Could not get {0}'.format(cl_attr) pass count += 1 setattr(obj, obj_attr, a_value) self.Notes.info_dict.pop(a_key) # --> write edi file def _write_edi_file(self, new_edi_fn, new_Z=None, new_Tipper=None, longitude_format='LON', latlon_format='dms'): """ write a new edi file if things have changed. Note if new_Z or new_Tipper are not None, they are not changed in MT object, you need to change them manually if you want them to be changed. Similarly, the new function name does not change the MT objecte fn attribute but does change MT.edi_object.fn attribute. :param new_edi_fn: full path to new edi file :type new_edi_fn: string :param new_Z: new Z object :type new_Z: mtpy.core.z.Z :param new_Tipper: new Tipper object :type new_Tipper: mtpy.core.z.Tipper :returns edi_fn: full path to edi file written :rtype edi_fn: string """ # get header information, mostly from site edi_obj = MTedi.Edi() edi_obj.Header = self._edi_set_header() # get information edi_obj.Info = MTedi.Information() edi_obj.Info.info_list = self._edi_set_info_list() # get define measurement edi_obj.Define_measurement = self._edi_set_define_measurement() # get mtsec edi_obj.Data_sect = self._edi_set_data_sect() # set Z and T data if new_Z is not None: edi_obj.Z = new_Z else: edi_obj.Z = self._Z if new_Tipper is not None: edi_obj.Tipper = new_Tipper else: edi_obj.Tipper = self._Tipper # set rotation angle # edi_obj.zrot = self.rotation_angle # --> write edi file edi_fn = edi_obj.write_edi_file(new_edi_fn=new_edi_fn, longitude_format=longitude_format, latlon_format=latlon_format) return edi_fn def _edi_set_header(self): """ make an edi header class """ header = MTedi.Header() header.acqby = self.Site.acquired_by header.acqdate = self.Site.start_date header.coordinate_system = self.Site.Location.coordinate_system header.dataid = self.Site.id header.datum = self.Site.Location.datum header.elev = self.elev header.fileby = self.Site.acquired_by header.lat = self.lat header.loc = self.Site.project header.lon = self.lon header.project = self.Site.project if type(self.Site.survey) is list: header.survey = ','.join(self.Site.survey) else: header.survey = self.Site.survey header.units = '[mV/km]/[nT]' header.declination = self.Site.Location.declination header.progvers = 'MTpy' header.progdate = time.strftime('%Y-%m-%d', time.gmtime()) return header # --> get information list for edi def _edi_set_info_list(self): """ get the information for an edi file """ info_list = [] # write previous information first for key in sorted(self.Notes.info_dict.keys()): l_key = key.lower() l_value = self.Notes.info_dict[key] info_list.append( '{0} = {1}'.format( l_key.strip(), str(l_value).strip())) # get field notes information includes data quality for f_key in sorted(self.FieldNotes.__dict__.keys()): obj = getattr(self.FieldNotes, f_key) for t_key in sorted(obj.__dict__.keys()): if t_key in ['_kw_list', '_fmt_list', '_logger']: continue l_key = 'fieldnotes.{0}.{1}'.format(f_key.lower(), t_key.lower()) l_value = getattr(obj, t_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) # get processing information for p_key in sorted(self.Processing.__dict__.keys()): if p_key.lower() == 'software': for s_key in sorted(self.Processing.Software.__dict__.keys()): if s_key.lower() == 'author': for a_key in sorted( self.Processing.Software.Author.__dict__.keys()): l_key = 'processing.software.author.{0}'.format( a_key) l_value = getattr(self.Processing.Software.Author, a_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) else: l_key = 'processing.software.{0}'.format(s_key) l_value = getattr(self.Processing.Software, s_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) elif p_key.lower() == 'remotesite': if self.Processing.RemoteSite.id is None: continue else: for s_key in sorted( self.Processing.RemoteSite.__dict__.keys()): if s_key == 'Location': for a_key in sorted( self.Processing.RemoteSite.Location.__dict__.keys()): l_key = 'processing.remote_site.location.{0}'.format( a_key) l_value = getattr(self.Processing.RemoteSite.Location, a_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) else: l_key = 'processing.remote_site.{0}'.format(s_key) l_value = getattr(self.Processing.RemoteSite, s_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) elif p_key.lower() in ['datum', 'coordinate_system', 'sign_convention', 'remote_reference', 'processed_by']: l_key = 'processing.{0}'.format(p_key) l_value = getattr(self.Processing, p_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) # get copyright information for c_key in sorted(self.Copyright.__dict__.keys()): if c_key.lower() == 'citation': if self.Copyright.Citation.author is not None: for p_key in sorted(self.Copyright.Citation.__dict__.keys()): l_key = 'copyright.citation.{0}'.format(p_key.lower()) l_value = getattr(self.Copyright.Citation, p_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) else: continue else: l_key = 'copyright.{0}'.format(c_key.lower()) l_value = getattr(self.Copyright, c_key) if type(l_value) is list: l_value = ''.join(l_value) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) # get provenance for p_key in sorted(self.Provenance.__dict__.keys()): if p_key.lower() == 'creator': for s_key in list(self.Provenance.Creator.__dict__.keys()): l_key = 'provenance.creator.{0}'.format(s_key) l_value = getattr(self.Provenance.Creator, s_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) elif p_key.lower() == 'submitter': for s_key in list(self.Provenance.Submitter.__dict__.keys()): l_key = 'provenance.submitter.{0}'.format(s_key) l_value = getattr(self.Provenance.Submitter, s_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) else: l_key = 'provenance.{0}'.format(p_key) l_value = getattr(self.Provenance, p_key) if l_value in [None, 'None', 'none']: continue info_list.append('{0} = {1}'.format(l_key, l_value)) return info_list # get edi define measurement def _edi_set_define_measurement(self): """ get define measurement block for an edi file """ define_meas = MTedi.DefineMeasurement() define_meas.maxchan = 7 define_meas.refelev = self.elev define_meas.reflat = self.lat define_meas.reflon = self.lon define_meas.reftype = self.Site.Location.coordinate_system define_meas.units = self.Site.Location.elev_units define_meas.meas_ex = MTedi.EMeasurement() for key in define_meas.meas_ex._kw_list: setattr(define_meas.meas_ex, key, getattr(self.FieldNotes.Electrode_ex, key)) define_meas.meas_ey = MTedi.EMeasurement() for key in define_meas.meas_ey._kw_list: setattr(define_meas.meas_ey, key, getattr(self.FieldNotes.Electrode_ey, key)) define_meas.meas_hx = MTedi.HMeasurement() for key in define_meas.meas_hx._kw_list: setattr(define_meas.meas_hx, key, getattr(self.FieldNotes.Magnetometer_hx, key)) define_meas.meas_hy = MTedi.HMeasurement() for key in define_meas.meas_hy._kw_list: setattr(define_meas.meas_hy, key, getattr(self.FieldNotes.Magnetometer_hy, key)) if self.Tipper is not None: if np.all(self.Tipper.tipper == 0) == False: define_meas.meas_hz = MTedi.HMeasurement() for key in define_meas.meas_hz._kw_list: setattr(define_meas.meas_hz, key, getattr(self.FieldNotes.Magnetometer_hz, key)) return define_meas def _edi_set_data_sect(self): """ get mt data section for edi file """ sect = MTedi.DataSection() sect.data_type = 'MT' sect.nfreq = self.Z.z.shape[0] sect.sectid = self.station nchan = 5 if np.all(self.Tipper.tipper == 0) == True: nchan = 4 sect.nchan = nchan sect.maxblks = 999 sect.ex = self.FieldNotes.Electrode_ex.acqchan sect.ey = self.FieldNotes.Electrode_ey.acqchan sect.hx = self.FieldNotes.Magnetometer_hx.acqchan sect.hy = self.FieldNotes.Magnetometer_hy.acqchan if np.all(self.Tipper.tipper == 0) == False: sect.hz = self.FieldNotes.Magnetometer_hz.acqchan return sect # --> check the order of frequencies def _check_freq_order(self): """ check to make sure the Z and Tipper arrays are ordered such that the first index corresponds to the highest frequency and the last index corresponds to the lowest freqeuncy. """ if self.Z.freq[0] < self.Z.freq[1]: print('Flipping arrays to be ordered from short period to long') self.Z.z = self.Z.z.copy()[::-1] self.Z.z_err = self.Z.z_err.copy()[::-1] self.Z.freq = self.Z.freq.copy()[::-1] if self.Tipper.tipper is not None: if self.Tipper.freq[0] < self.Tipper.freq[1]: self.Tipper.tipper = self.Tipper.tipper.copy()[::-1] self.Tipper.tipper_err = self.Tipper.tipper_err.copy()[::-1] self.Tipper.freq = self.Tipper.freq.copy()[::-1] def _read_j_file(self, j_fn): """ read j file """ j_obj = MTj.JFile(j_fn) self.save_dir = os.path.dirname(j_fn) self.station = os.path.splitext(os.path.basename(j_fn))[0] self.Z = j_obj.Z self.Tipper = j_obj.Tipper self._check_freq_order() self.Site.Location.latitude = j_obj.metadata_dict['latitude'] self.Site.Location.longitude = j_obj.metadata_dict['longitude'] self.Site.Location.elevation = j_obj.metadata_dict['elevation'] self.Notes.info_dict = j_obj.header_dict def _read_zmm_file(self, zmm_fn): """ read zmm file """ if not isinstance(zmm_fn, Path): zmm_fn = Path(zmm_fn) zmm_obj = MTzmm.ZMM(zmm_fn) zmm_obj.read_zmm_file() self.save_dir = zmm_fn.parent self.Z = zmm_obj.Z self.Tipper = zmm_obj.Tipper # set location self.Site.Location.latitude = zmm_obj.lat self.Site.Location.longitude = zmm_obj.lon self.Site.Location.declination = zmm_obj.declination # set station name self.Site.id = zmm_obj.station def _read_xml_file(self, xml_fn): """ read xml file """ if not os.path.isfile(xml_fn): raise MTError('Could not find {0}, check path.'.format(xml_fn)) self.save_dir = os.path.dirname(xml_fn) xml_obj = MTxml.MT_XML() xml_obj.read_xml_file(xml_fn) self.Z = xml_obj.Z self.Tipper = xml_obj.Tipper # check order self._check_freq_order() # get information and fill attributes self._xml_get_site(xml_obj) self._xml_get_notes(xml_obj) self._xml_get_field_notes(xml_obj) self._xml_get_copyright(xml_obj) self._xml_get_provenance(xml_obj) self._xml_get_processing(xml_obj) def _xml_get_site(self, xml_obj): """ get Site information from xml Site """ # get information for s_attr in list(xml_obj.Site.__dict__.keys()): if s_attr in ['_name', '_attr', '_value']: continue x_obj = getattr(xml_obj.Site, s_attr) name = x_obj.name.lower() if name == 'acquiredby': name = 'acquired_by' elif name == 'end': name = 'end_date' elif name == 'start': name = 'start_date' elif name == 'runlist': name = 'run_list' elif name == 'yearcollected': name = 'year_collected' elif name == 'datecollected': name = 'date_collected' value = x_obj.value if name == 'location': for l_attr in list(xml_obj.Site.Location.__dict__.keys()): if l_attr in ['_name', '_attr', '_value']: continue l_obj = getattr(xml_obj.Site.Location, l_attr) name = l_obj.name.lower() value = l_obj.value if name == 'elevation': units = l_obj.attr['units'] self.Site.Location.elev_units = units elif name == 'declination': units = l_obj.attr['epoch'] self.Site.Location.declination_epoch = units setattr(self.Site.Location, name, value) else: setattr(self.Site, name, value) def _xml_get_notes(self, xml_obj): """ get notes and set in a dictionary that is similar to edi notes """ self.Notes = MTedi.Information() self.Notes.info_dict = {} self.Notes.info_list = [] self.Notes.info_dict['notes'] = str(xml_obj.Notes.value) self.Notes.info_list = ['notes = {0}'.format(str(xml_obj.Notes.value))] def _xml_get_field_notes(self, xml_obj): """ get field notes information """ for f_attr in list(xml_obj.FieldNotes.__dict__.keys()): if f_attr.lower() == 'instrument': for i_attr in list(xml_obj.FieldNotes.Instrument.__dict__.keys()): if i_attr in ['_name', '_attr', '_value']: continue i_obj = getattr(xml_obj.FieldNotes.Instrument, i_attr) name = i_obj.name.lower() value = i_obj.value setattr(self.FieldNotes.DataLogger, name, value) elif 'dipole' in f_attr.lower(): xml_d_obj = getattr(xml_obj.FieldNotes, f_attr) azm = 0.0 length = 0.0 try: comp = xml_d_obj.attr['name'].lower() except KeyError: comp = 'ex' try: t = xml_d_obj.attr['type'].lower() if comp == 'ex': setattr(self.FieldNotes.Electrode_ex, 'type', t) elif comp == 'ey': setattr(self.FieldNotes.Electrode_ey, 'type', t) except KeyError: pass for e_attr in list(xml_d_obj.__dict__.keys()): if e_attr in ['_name', '_attr', '_value']: continue e_obj = getattr(xml_d_obj, e_attr) name = e_obj.name.lower() value = e_obj.value if name == 'azimuth': azm = float(value) if name == 'length': length = float(value) if comp == 'ex': setattr(self.FieldNotes.Electrode_ex, name, value) elif comp == 'ey': setattr(self.FieldNotes.Electrode_ey, name, value) # need to set x, y, x2, y2 x = 0 y = 0 x2 = length * np.cos(np.deg2rad(azm)) y2 = length * np.sin(np.deg2rad(azm)) for name, value in zip(['x', 'y', 'x2', 'y2'], [x, y, x2, y2]): if comp == 'ex': setattr(self.FieldNotes.Electrode_ex, name, value) elif comp == 'ey': setattr(self.FieldNotes.Electrode_ey, name, value) elif 'magnetometer' in f_attr.lower(): xml_d_obj = getattr(xml_obj.FieldNotes, f_attr) try: comp = xml_d_obj.attr['name'].lower() except KeyError: try: comp = xml_d_obj.attr['type'].lower() except KeyError: comp = 'hx' try: t = xml_d_obj.attr['type'].lower() if comp == 'hx': setattr(self.FieldNotes.Magnetometer_hx, 'type', t) elif comp == 'hy': setattr(self.FieldNotes.Magnetometer_hy, 'type', t) elif comp == 'hz': setattr(self.FieldNotes.Magnetometer_hz, 'type', t) elif comp == 'fluxgate': setattr(self.FieldNotes.Magnetometer_hx, 'type', t) setattr(self.FieldNotes.Magnetometer_hy, 'type', t) setattr(self.FieldNotes.Magnetometer_hz, 'type', t) else: pass except KeyError: pass for m_attr in list(xml_d_obj.__dict__.keys()): if m_attr in ['_name', '_attr', '_value']: continue m_obj = getattr(xml_obj.FieldNotes.Magnetometer, m_attr) name = m_obj.name.lower() value = m_obj.value if comp == 'hx': setattr(self.FieldNotes.Magnetometer_hx, name, value) elif comp == 'hy': setattr(self.FieldNotes.Magnetometer_hy, name, value) elif comp == 'hz': setattr(self.FieldNotes.Magnetometer_hz, name, value) elif comp == 'fluxgate': setattr(self.FieldNotes.Magnetometer_hx, name, value) setattr(self.FieldNotes.Magnetometer_hy, name, value) setattr(self.FieldNotes.Magnetometer_hz, name, value) elif 'dataquality' in f_attr.lower(): obj = getattr(xml_obj.FieldNotes, f_attr) for d_attr in list(obj.__dict__.keys()): if d_attr in ['_name', '_attr', '_value']: continue d_obj = getattr(obj, d_attr) name = d_obj.name.lower() if name == 'goodfromperiod': name = 'good_from_period' elif name == 'goodtoperiod': name = 'good_to_period' elif name == 'comments': setattr(self.FieldNotes.DataQuality, 'author', d_obj.attr['author']) if name == 'comments' and \ f_attr.lower() == 'dataqualitywarnings': name = 'warnings_' + name value = d_obj.value setattr(self.FieldNotes.DataQuality, name, value) def _xml_get_copyright(self, xml_obj): """ get copyright information """ for f_attr in list(xml_obj.Copyright.__dict__.keys()): if f_attr in ['_name', '_attr', '_value']: continue if f_attr.lower() == 'citation': for i_attr in list(xml_obj.Copyright.Citation.__dict__.keys()): if i_attr in ['_name', '_attr', '_value']: continue i_obj = getattr(xml_obj.Copyright.Citation, i_attr) name = i_obj.name.lower() value = i_obj.value setattr(self.Copyright.Citation, name, value) else: obj = getattr(xml_obj.Copyright, f_attr) name = obj.name.lower() if name == 'releasestatus': name = 'release_status' elif name == 'conditionsofuse': name = 'conditions_of_use' elif name == 'additionalinfo': name = 'additional_info' value = obj.value.replace('\n', '') setattr(self.Copyright, name, value) def _xml_get_provenance(self, xml_obj): """ get provenance infor """ for f_attr in list(xml_obj.Provenance.__dict__.keys()): if f_attr in ['_name', '_attr', '_value']: continue if f_attr.lower() in ['creator', 'submitter']: obj = getattr(xml_obj.Provenance, f_attr) s_obj = getattr(self.Provenance, f_attr) for i_attr in list(obj.__dict__.keys()): if i_attr in ['_name', '_attr', '_value']: continue i_obj = getattr(obj, i_attr) name = i_obj.name.lower() value = i_obj.value setattr(s_obj, name, value) else: obj = getattr(xml_obj.Provenance, f_attr) name = obj.name.lower() if name == 'creationtime': name = 'creation_time' elif name == 'creatingapplication': name = 'creating_application' value = obj.value setattr(self.Provenance, name, value) def _xml_get_processing(self, xml_obj): """ get processing info """ for f_attr in list(xml_obj.ProcessingInfo.__dict__.keys()): if f_attr in ['_name', '_attr', '_value']: continue if 'software' in f_attr.lower(): obj = getattr(xml_obj.ProcessingInfo, f_attr) for i_attr in list(obj.__dict__.keys()): if i_attr in ['_name', '_attr', '_value']: continue i_obj = getattr(obj, i_attr) name = i_obj.name.lower() if name == 'lastmod': name = 'last_modified' value = i_obj.value if name == 'author': value = Person() value.name = i_obj.value setattr(self.Processing.Software, name, value) elif 'remoteinfo' in f_attr.lower(): obj = getattr(xml_obj.ProcessingInfo, f_attr) for i_attr in list(obj.__dict__.keys()): if i_attr in ['_name', '_attr', '_value']: continue if i_attr.lower() == 'location': loc_obj = getattr(obj, i_attr) for l_attr in list(loc_obj.__dict__.keys()): if l_attr in ['_name', '_attr', '_value']: continue l_obj = getattr(loc_obj, l_attr) name = l_obj.name.lower() value = l_obj.value setattr(self.Processing.RemoteSite.Location, name, value) else: i_obj = getattr(obj, i_attr) name = i_obj.name.lower() if name == 'yearcollected': name = 'year_collected' value = i_obj.value setattr(self.Processing.RemoteSite, name, value) else: obj = getattr(xml_obj.ProcessingInfo, f_attr) name = obj.name.lower() if name == 'signconvention': name = 'sign_convention' elif name == 'remoteref': name = 'remote_ref' elif name == 'processedby': name = 'processed_by' elif name == 'processingtag': name = 'processing_tag' value = obj.value setattr(self.Processing, name, value) def _write_xml_file(self, xml_fn, new_Z=None, new_Tipper=None): """ Write a xml file. """ if new_Z is not None: self.Z = new_Z if new_Tipper is not None: self.Tipper = new_Tipper xml_obj = MTxml.MT_XML() xml_obj.Attachment.Filename.value = os.path.basename(self.fn) xml_obj.PrimaryData.Filename.value = os.path.basename(self.fn)[:-4]+'.png' xml_obj.ProductId.value = '{0}.{1}'.format(self.station.upper(), self.Site.year_collected) xml_obj.Z = self.Z xml_obj.Tipper = self.Tipper xml_obj = self._xml_set_provenance(xml_obj) xml_obj = self._xml_set_copyright(xml_obj) xml_obj = self._xml_set_site(xml_obj) xml_obj = self._xml_set_field_notes(xml_obj) xml_obj = self._xml_set_processing(xml_obj) xml_obj = self._xml_set_site_layout(xml_obj) xml_obj.write_xml_file(xml_fn) def _xml_set_site(self, xml_obj): """ set the Site attributes in the xml object """ xml_obj.Site.Project.value = self.Site.project if type(self.Site.survey) is list: xml_obj.Site.Survey.value = ','.join(self.Site.survey) else: xml_obj.Site.Survey.value = self.Site.survey xml_obj.Site.Id.value = self.Site.id xml_obj.Site.AcquiredBy.value = self.Site.acquired_by xml_obj.Site.Start.value = self.Site.start_date xml_obj.Site.End.value = self.Site.end_date xml_obj.Site.RunList.value = self.Site.run_list xml_obj.Site.Orientation.value = 'geomagnetic' try: xml_obj.Site.Orientation.attr = {'angle_to_geographic_north':\ '{0:.2f}'.format(self.Site.Location.declination)} except ValueError: xml_obj.Site.Orientation.attr = {'angle_to_geographic_north': '0.00'} xml_obj.Site.Location.Latitude.value = self.lat xml_obj.Site.Location.Longitude.value = self.lon xml_obj.Site.Location.Elevation.value = self.elev xml_obj.Site.Location.Elevation.attr = { 'units': self.Site.Location.elev_units} xml_obj.Site.Location.Declination.value = self.Site.Location.declination xml_obj.Site.Location.Declination.attr = { 'epoch': self.Site.Location.declination_epoch} return xml_obj def _xml_set_field_notes(self, xml_obj): """ Set the FieldNotes attributes of the xml object """ xml_obj.FieldNotes.Instrument.Type.value = self.FieldNotes.DataLogger.type xml_obj.FieldNotes.Instrument.Id.value = self.FieldNotes.DataLogger.id xml_obj.FieldNotes.Instrument.Manufacturer.value = self.FieldNotes.DataLogger.manufacturer # EX xml_obj.FieldNotes.Dipole.Type.value = self.FieldNotes.Electrode_ex.type xml_obj.FieldNotes.Dipole.Id.value = self.FieldNotes.Electrode_ex.id xml_obj.FieldNotes.Dipole.Manufacturer.value = self.FieldNotes.Electrode_ex.manufacturer xml_obj.FieldNotes.Dipole.attr = {'name': 'EX'} length = np.sqrt((self.FieldNotes.Electrode_ex.x2 - self.FieldNotes.Electrode_ex.x) ** 2 + (self.FieldNotes.Electrode_ex.y2 - self.FieldNotes.Electrode_ex.y) ** 2) xml_obj.FieldNotes.Dipole.Length.value = length try: azm = np.arctan((self.FieldNotes.Electrode_ex.y2 - self.FieldNotes.Electrode_ex.y) / (self.FieldNotes.Electrode_ex.x2 - self.FieldNotes.Electrode_ex.x)) except ZeroDivisionError: azm = 0.0 xml_obj.FieldNotes.Dipole.Azimuth.value = np.degrees(azm) xml_obj.FieldNotes.Dipole.Channel.value = self.FieldNotes.Electrode_ex.acqchan # EY xml_obj.FieldNotes.Dipole_00.Type.value = self.FieldNotes.Electrode_ey.type xml_obj.FieldNotes.Dipole_00.Id.value = self.FieldNotes.Electrode_ey.id xml_obj.FieldNotes.Dipole_00.Manufacturer.value = self.FieldNotes.Electrode_ey.manufacturer xml_obj.FieldNotes.Dipole_00.attr = {'name': 'EY'} length = np.sqrt((self.FieldNotes.Electrode_ey.x2 - self.FieldNotes.Electrode_ey.x) ** 2 + (self.FieldNotes.Electrode_ey.y2 - self.FieldNotes.Electrode_ey.y) ** 2) xml_obj.FieldNotes.Dipole_00.Length.value = length try: azm = np.arctan((self.FieldNotes.Electrode_ey.y2 - self.FieldNotes.Electrode_ey.y) / (self.FieldNotes.Electrode_ey.x2 - self.FieldNotes.Electrode_ey.x)) except ZeroDivisionError: azm = 90.0 xml_obj.FieldNotes.Dipole_00.Azimuth.value = np.degrees(min([np.pi/2, azm])) xml_obj.FieldNotes.Dipole_00.Channel.value = self.FieldNotes.Electrode_ey.acqchan # HX xml_obj.FieldNotes.Magnetometer.Type.value = self.FieldNotes.Magnetometer_hx.type xml_obj.FieldNotes.Magnetometer.Id.value = self.FieldNotes.Magnetometer_hx.id xml_obj.FieldNotes.Magnetometer.Manufacturer.value = self.FieldNotes.Magnetometer_hx.manufacturer xml_obj.FieldNotes.Magnetometer.attr = {'name': 'HX'} xml_obj.FieldNotes.Magnetometer.Azimuth.value = self.FieldNotes.Magnetometer_hx.azm xml_obj.FieldNotes.Magnetometer.Channel.value = self.FieldNotes.Magnetometer_hx.acqchan # HY xml_obj.FieldNotes.Magnetometer_00.Type.value = self.FieldNotes.Magnetometer_hy.type xml_obj.FieldNotes.Magnetometer_00.Id.value = self.FieldNotes.Magnetometer_hy.id xml_obj.FieldNotes.Magnetometer_00.Manufacturer.value = self.FieldNotes.Magnetometer_hy.manufacturer xml_obj.FieldNotes.Magnetometer_00.attr = {'name': 'HY'} xml_obj.FieldNotes.Magnetometer_00.Azimuth.value = self.FieldNotes.Magnetometer_hy.azm xml_obj.FieldNotes.Magnetometer_00.Channel.value = self.FieldNotes.Magnetometer_hy.acqchan # HZ xml_obj.FieldNotes.Magnetometer_01.Type.value = self.FieldNotes.Magnetometer_hz.type xml_obj.FieldNotes.Magnetometer_01.Id.value = self.FieldNotes.Magnetometer_hz.id xml_obj.FieldNotes.Magnetometer_01.Manufacturer.value = self.FieldNotes.Magnetometer_hz.manufacturer xml_obj.FieldNotes.Magnetometer_01.attr = {'name': 'HZ'} xml_obj.FieldNotes.Magnetometer_01.Azimuth.value = self.FieldNotes.Magnetometer_hz.azm xml_obj.FieldNotes.Magnetometer_01.Channel.value = self.FieldNotes.Magnetometer_hz.acqchan # Data Quality Notes xml_obj.FieldNotes.DataQualityNotes.Rating.value = self.FieldNotes.DataQuality.rating xml_obj.FieldNotes.DataQualityNotes.GoodFromPeriod.value = self.FieldNotes.DataQuality.good_from_period xml_obj.FieldNotes.DataQualityNotes.GoodToPeriod.value = self.FieldNotes.DataQuality.good_to_period xml_obj.FieldNotes.DataQualityNotes.Comments.value = self.FieldNotes.DataQuality.comments xml_obj.FieldNotes.DataQualityNotes.Comments.attr = { 'author': self.FieldNotes.DataQuality.author} # Data Quality Warnings xml_obj.FieldNotes.DataQualityWarnings.Flag.value = self.FieldNotes.DataQuality.warnings_flag xml_obj.FieldNotes.DataQualityWarnings.Comments.value = self.FieldNotes.DataQuality.warnings_comments xml_obj.FieldNotes.DataQualityWarnings.Comments.attr = { 'author': self.FieldNotes.DataQuality.author} return xml_obj def _xml_set_processing(self, xml_obj): """ Set the Processing attributes of the xml object """ xml_obj.ProcessingInfo.ProcessedBy.value = self.Processing.processed_by xml_obj.ProcessingInfo.ProcessingSoftware.Name.value = self.Processing.Software.name xml_obj.ProcessingInfo.ProcessingSoftware.Author.value = self.Processing.Software.Author.name xml_obj.ProcessingInfo.ProcessingSoftware.Version.value = self.Processing.Software.version # TODO: Need to find a way to put in processing parameters. xml_obj.ProcessingInfo.SignConvention.value = self.Processing.sign_convention xml_obj.ProcessingInfo.RemoteRef.value = self.Processing.remote_reference xml_obj.ProcessingInfo.RemoteInfo.Project.value = self.Processing.RemoteSite.project xml_obj.ProcessingInfo.RemoteInfo.Survey.value = self.Processing.RemoteSite.survey xml_obj.ProcessingInfo.RemoteInfo.ID.value = self.Processing.RemoteSite.id xml_obj.ProcessingInfo.RemoteInfo.YearCollected.value = self.Processing.RemoteSite.year_collected xml_obj.ProcessingInfo.RemoteInfo.AcquiredBy.value = self.Processing.RemoteSite.acquired_by xml_obj.ProcessingInfo.RemoteInfo.Location.Latitude.value = self.Processing.RemoteSite.Location.latitude xml_obj.ProcessingInfo.RemoteInfo.Location.Longitude.value = self.Processing.RemoteSite.Location.longitude xml_obj.ProcessingInfo.RemoteInfo.Location.Elevation.value = self.Processing.RemoteSite.Location.elevation xml_obj.ProcessingInfo.RemoteInfo.Location.Elevation.attr = { 'units': self.Processing.RemoteSite.Location.elev_units} xml_obj.ProcessingInfo.RemoteInfo.Location.attr = { 'datum': self.Processing.RemoteSite.Location.datum} return xml_obj def _xml_set_provenance(self, xml_obj): """ Set the Provenance attributes of the xml object """ xml_obj.Provenance.CreatingApplication.value = 'MTpy 0.1.0' xml_obj.Provenance.Submitter.Name.value = self.Provenance.Submitter.name xml_obj.Provenance.Submitter.Email.value = self.Provenance.Submitter.email xml_obj.Provenance.Submitter.Org.value = self.Provenance.Submitter.organization xml_obj.Provenance.Submitter.OrgURL.value = self.Provenance.Submitter.organization_url xml_obj.Provenance.Creator.Name.value = self.Provenance.Creator.name xml_obj.Provenance.Creator.Email.value = self.Provenance.Creator.email xml_obj.Provenance.Creator.Org.value = self.Provenance.Creator.organization xml_obj.Provenance.Creator.OrgURL.value = self.Provenance.Creator.organization_url return xml_obj def _xml_set_copyright(self, xml_obj): """ Set the Copyright attributes of the xml object """ xml_obj.Copyright.Citation.Authors.value = self.Copyright.Citation.author xml_obj.Copyright.Citation.Title.value = self.Copyright.Citation.title xml_obj.Copyright.Citation.Year.value = self.Copyright.Citation.year xml_obj.Copyright.Citation.Journal.value = self.Copyright.Citation.journal xml_obj.Copyright.Citation.Volume.value = self.Copyright.Citation.volume xml_obj.Copyright.Citation.DOI.value = self.Copyright.Citation.doi if type(self.Copyright.conditions_of_use) is list: xml_obj.Copyright.ConditionsOfUse.value = ''.join(self.Copyright.conditions_of_use) else: xml_obj.Copyright.ConditionsOfUse.value = self.Copyright.conditions_of_use xml_obj.Copyright.ReleaseStatus.value = self.Copyright.release_status xml_obj.Copyright.AdditionalInfo.value = self.Copyright.additional_info return xml_obj def _xml_set_site_layout(self, xml_obj): """ set the site layout from define measurement """ xml_obj.SiteLayout.InputChannels.Magnetic_hx.attr = {'name':"Hx", 'orientation':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hx.azm), 'x':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hx.x), 'y':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hx.y), 'z':'0.00'} xml_obj.SiteLayout.InputChannels.Magnetic_hy.attr = {'name':"Hy", 'orientation':'{0:.2f}'.format(max([90, self.FieldNotes.Magnetometer_hy.azm])), 'x':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hy.x), 'y':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hy.y), 'z':'0.00'} xml_obj.SiteLayout.OutputChannels.Magnetic_hz.attr = {'name':"Hz", 'orientation':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hz.azm), 'x':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hz.x), 'y':'{0:.2f}'.format(self.FieldNotes.Magnetometer_hz.y), 'z':'0.00'} try: azm = np.arctan((self.FieldNotes.Electrode_ex.y2 - self.FieldNotes.Electrode_ex.y) / (self.FieldNotes.Electrode_ex.x2 - self.FieldNotes.Electrode_ex.x)) except ZeroDivisionError: azm = 90.0 xml_obj.SiteLayout.OutputChannels.Electric_ex.attr = {'name':"Ex", 'orientation':'{0:.2f}'.format(azm), 'x':'{0:.2f}'.format(self.FieldNotes.Electrode_ex.x), 'y':'{0:.2f}'.format(self.FieldNotes.Electrode_ex.y), 'z':'0.00', 'x2':'{0:.2f}'.format(self.FieldNotes.Electrode_ex.x2), 'y2':'{0:.2f}'.format(self.FieldNotes.Electrode_ex.y2), 'z2':'0.00'} try: azm = np.arctan((self.FieldNotes.Electrode_ey.y2 - self.FieldNotes.Electrode_ey.y) / (self.FieldNotes.Electrode_ey.x2 - self.FieldNotes.Electrode_ey.x)) except ZeroDivisionError: azm = 90.0 xml_obj.SiteLayout.OutputChannels.Electric_ey.attr = {'name':"Ey", 'orientation':'{0:.2f}'.format(azm), 'x':'{0:.2f}'.format(self.FieldNotes.Electrode_ey.x), 'y':'{0:.2f}'.format(self.FieldNotes.Electrode_ey.y), 'z':'0.00', 'x2':'{0:.2f}'.format(self.FieldNotes.Electrode_ey.x2), 'y2':'{0:.2f}'.format(self.FieldNotes.Electrode_ey.y2), 'z2':'0.00'} return xml_obj
[docs] def read_cfg_file(self, cfg_fn): """ Read in a configuration file and populate attributes accordingly. The configuration file should be in the form: | Site.Location.latitude = 46.5 | Site.Location.longitude = 122.7 | Site.Location.datum = 'WGS84' | | Processing.Software.name = BIRRP | Processing.Software.version = 5.2.1 | | Provenance.Creator.name = L. Cagniard | Provenance.Submitter.name = I. Larionov :param cfg_fn: full path to configuration file :type cfg_fn: string .. note:: The best way to make a configuration file would be to save a configuration file first from MT, then filling in the fields. :Make configuration file: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT() >>> mt_obj.write_cfg_file(r"/mt/generic_config.cfg") :Read in configuration file: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT() >>> mt_obj.read_cfg_file(r"/home/mt/survey_config.cfg") """ with open(cfg_fn, 'r') as fid: cfg_lines = fid.readlines() for line in cfg_lines: if line[0] == '#': continue line_list = line.strip().split('=') if len(line) < 2: continue else: name_list = line_list[0].strip().split('.') cl_name = name_list[0] if cl_name.lower() == 'fieldnotes': cl_name = 'FieldNotes' else: cl_name = cl_name.capitalize() cl_obj = getattr(self, cl_name) cl_attr = name_list[-1] cl_value = line_list[1].strip() if cl_value in ['None', 'none']: cl_value = None try: cl_value = float(cl_value) except ValueError: if cl_value.find(',') >= 0: cl_value = cl_value.replace('[', '').replace(']', '') cl_value = cl_value.strip().split(',') try: cl_value = [float(vv) for vv in cl_value] except ValueError: pass except TypeError: cl_value = None count = 1 while count < len(name_list) - 1: cl_name = name_list[count].lower() if cl_name == 'dataquality': cl_name = 'DataQuality' elif cl_name == 'datalogger': cl_name = 'DataLogger' elif cl_name == 'remotesite': cl_name = 'RemoteSite' try: cl_obj = getattr(cl_obj, cl_name) except AttributeError: try: cl_obj = getattr(cl_obj, cl_name.capitalize()) cl_name = cl_name.capitalize() except AttributeError: print('Could not get {0}'.format(cl_name)) count += 1 setattr(cl_obj, cl_attr, cl_value)
[docs] def write_cfg_file(self, cfg_fn): """ Write a configuration file for the MT sections :param cfg_fn: full path to configuration file to write to :type cfg_fn: string :return cfg_fn: full path to configuration file :rtype cfg_fn: string :Write configuration file: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT() >>> mt_obj.read_mt_file(r"/home/mt/edi_files/mt01.edi") >>> mt_obj.write_cfg_file(r"/home/mt/survey_config.cfg") """ cfg_lines = [] for obj_name in sorted(['Site', 'FieldNotes', 'Provenance', 'Processing', 'Copyright']): obj = getattr(self, obj_name) l_key = obj_name for obj_key in sorted(obj.__dict__.keys()): obj_attr = getattr(obj, obj_key) l_key = '{0}.{1}'.format(obj_name, obj_key) if not isinstance(obj_attr, (str, float, int, list)) and \ obj_attr is not None: for a_key in sorted(obj_attr.__dict__.keys()): if a_key in ['_kw_list', '_fmt_list', '_logger']: continue obj_attr_01 = getattr(obj_attr, a_key) l_key = '{0}.{1}.{2}'.format(obj_name, obj_key, a_key) if not isinstance(obj_attr_01, (str, float, int, list, np.float64)) and \ obj_attr_01 is not None: for b_key in sorted(obj_attr_01.__dict__.keys()): obj_attr_02 = getattr(obj_attr_01, b_key) l_key = '{0}.{1}.{2}.{3}'.format(obj_name, obj_key, a_key, b_key) cfg_lines.append('{0} = {1}'.format(l_key, obj_attr_02)) else: cfg_lines.append('{0} = {1}'.format(l_key, obj_attr_01)) else: cfg_lines.append('{0} = {1}'.format(l_key, obj_attr)) cfg_lines.append('') with open(cfg_fn, 'w') as fid: fid.write('\n'.join(cfg_lines)) print('--> Wrote MT configuration file to {0}'.format(cfg_fn)) return cfg_fn
[docs] def remove_distortion(self, num_freq=None): """ remove distortion following Bibby et al. [2005]. :param num_freq: number of frequencies to look for distortion from the highest frequency :type num_freq: int :returns: Distortion matrix :rtype: np.ndarray(2, 2, dtype=real) :returns: Z with distortion removed :rtype: mtpy.core.z.Z :Remove distortion and write new .edi file: :: >>> import mtpy.core.mt as mt >>> mt1 = mt.MT(fn=r"/home/mt/edi_files/mt01.edi") >>> D, new_z = mt1.remove_distortion() >>> mt1.write_mt_file(new_fn=r"/home/mt/edi_files/mt01_dr.edi",\ >>> new_Z=new_z) """ dummy_z_obj = MTz.copy.deepcopy(self.Z) D, new_z_object = MTdistortion.remove_distortion(z_object=dummy_z_obj, num_freq=num_freq) return D, new_z_object
[docs] def remove_static_shift(self, ss_x=1.0, ss_y=1.0): """ Remove static shift from the apparent resistivity Assume the original observed tensor Z is built by a static shift S and an unperturbated "correct" Z0 : * Z = S * Z0 therefore the correct Z will be : * Z0 = S^(-1) * Z :param ss_x: correction factor for x component :type ss_x: float :param ss_y: correction factor for y component :type ss_y: float :returns: new Z object with static shift removed :rtype: mtpy.core.z.Z .. note:: The factors are in resistivity scale, so the entries of the matrix "S" need to be given by their square-roots! :Remove Static Shift: :: >>> import mtpy.core.mt as mt >>> mt_obj = mt.MT(r"/home/mt/mt01.edi") >>> new_z_obj = mt.remove_static_shift(ss_x=.5, ss_y=1.2) >>> mt_obj.write_mt_file(new_fn=r"/home/mt/mt01_ss.edi", >>> ... new_Z_obj=new_z_obj) """ s_array, new_z = self.Z.remove_ss(reduce_res_factor_x=ss_x, reduce_res_factor_y=ss_y) new_z_obj = MTz.Z(z_array=new_z, z_err_array=self.Z.z_err.copy(), freq=self.Z.freq.copy()) return new_z_obj
[docs] def interpolate(self, new_freq_array, interp_type='slinear', bounds_error=True, period_buffer=None): """ Interpolate the impedance tensor onto different frequencies :param new_freq_array: a 1-d array of frequencies to interpolate on to. Must be with in the bounds of the existing frequency range, anything outside and an error will occur. :type new_freq_array: np.ndarray :param period_buffer: maximum ratio of a data period and the closest interpolation period. Any points outside this ratio will be excluded from the interpolated impedance array. :returns: a new impedance object with the corresponding frequencies and components. :rtype: mtpy.core.z.Z :returns: a new tipper object with the corresponding frequencies and components. :rtype: mtpy.core.z.Tipper :Interpolate: :: >>> import mtpy.core.mt as mt >>> edi_fn = r"/home/edi_files/mt_01.edi" >>> mt_obj = mt.MT(edi_fn) >>> # create a new frequency range to interpolate onto >>> new_freq = np.logspace(-3, 3, 24) >>> new_z_object, new_tipper_obj = mt_obj.interpolate(new_freq) >>> mt_obj.write_mt_file(new_fn=r"/home/edi_files/mt_01_interp.edi", >>> ... new_Z_obj=new_z_object, >>> ... new_Tipper_obj=new_tipper_object) """ # if the interpolation module has not been loaded return if interp_import is False: raise ImportError('could not interpolate, need to install scipy') # make sure the input is a numpy array if not isinstance(new_freq_array, np.ndarray): new_freq_array = np.array(new_freq_array) if period_buffer is not None: if 0. < period_buffer < 1.: period_buffer += 1. print("Warning: period buffer must be > 1. Updating to",period_buffer) # check the bounds of the new frequency array if bounds_error: # YG: the commented block below seems no longer necessary. # floater = 1.e-8 # FZ: a small offset to avoid out-of-bound error in spi interpolation module. # self._logger.info("massage the new_freq_array's min and max to avoid out-of-bound interp") # minindex = np.argmin(new_freq_array) # maxindex = np.argmax(new_freq_array) # new_freq_array[minindex] += floater # new_freq_array[maxindex] -= floater # logger.debug("new freq array %s", new_freq_array) if self.Z.freq.min() > new_freq_array.min(): raise ValueError('New frequency minimum of {0:.5g}'.format(new_freq_array.min()) + \ ' is smaller than old frequency minimum of {0:.5g}'.format(self.Z.freq.min()) + \ '. The new frequency range needs to be within the ' + 'bounds of the old one.') if self.Z.freq.max() < new_freq_array.max(): raise ValueError('New frequency maximum of {0:.5g}'.format(new_freq_array.max()) + \ 'is smaller than old frequency maximum of {0:.5g}'.format(self.Z.freq.max()) + \ '. The new frequency range needs to be within the ' + 'bounds of the old one.') # make a new Z object new_Z = MTz.Z(z_array=np.zeros((new_freq_array.shape[0], 2, 2), dtype='complex'), z_err_array=np.zeros((new_freq_array.shape[0], 2, 2)), freq=new_freq_array) new_Tipper = MTz.Tipper(tipper_array=np.zeros((new_freq_array.shape[0], 1, 2), dtype='complex'), tipper_err_array=np.zeros( (new_freq_array.shape[0], 1, 2)), freq=new_freq_array) # interpolate the impedance tensor for ii in range(2): for jj in range(2): # need to look out for zeros in the impedance # get the indicies of non-zero components nz_index = np.nonzero(self.Z.z[:, ii, jj]) if len(nz_index[0]) == 0: continue # get the non-zero components z_real = self.Z.z[nz_index, ii, jj].real z_imag = self.Z.z[nz_index, ii, jj].imag z_err = self.Z.z_err[nz_index, ii, jj] # get the frequencies of non-zero components f = self.Z.freq[nz_index] # get frequencies to interpolate on to, making sure the # bounds are with in non-zero components new_nz_index = np.where((new_freq_array >= f.min()) & (new_freq_array <= f.max()))[0] new_f = new_freq_array[new_nz_index] # apply period buffer if type(period_buffer) in [float, int]: new_f_update = [] new_nz_index_update = [] for ifidx,ifreq in enumerate(new_f): # find nearest data period difference = np.abs(np.log10(ifreq) - np.log10(f)) fidx = np.where(difference == np.amin(difference))[0][0] if max(f[fidx] / ifreq, ifreq / f[fidx]) < period_buffer: new_f_update.append(ifreq) new_nz_index_update.append(new_nz_index[ifidx]) new_f = np.array(new_f_update) new_nz_index = np.array(new_nz_index_update) # create a function that does 1d interpolation z_func_real = spi.interp1d(f, z_real, kind=interp_type) z_func_imag = spi.interp1d(f, z_imag, kind=interp_type) z_func_err = spi.interp1d(f, z_err, kind=interp_type) # interpolate onto new frequency range new_Z.z[new_nz_index, ii, jj] = z_func_real( new_f) + 1j * z_func_imag(new_f) new_Z.z_err[new_nz_index, ii, jj] = z_func_err(new_f) # compute resistivity and phase for new Z object new_Z.compute_resistivity_phase() # if there is not tipper than skip if self.Tipper.tipper is None: return new_Z, new_Tipper # interpolate the Tipper for jj in range(2): # get indicies of non-zero components nz_index = np.nonzero(self.Tipper.tipper[:, 0, jj]) if len(nz_index[0]) == 0: continue # get non-zero components t_real = self.Tipper.tipper[nz_index, 0, jj].real t_imag = self.Tipper.tipper[nz_index, 0, jj].imag t_err = self.Tipper.tipper_err[nz_index, 0, jj] # get frequencies for non-zero components f = self.Tipper.freq[nz_index] # create interpolation functions t_func_real = spi.interp1d(f, t_real, kind=interp_type) t_func_imag = spi.interp1d(f, t_imag, kind=interp_type) t_func_err = spi.interp1d(f, t_err, kind=interp_type) # get new frequency to interpolate over, making sure bounds are # for non-zero components new_nz_index = np.where((new_freq_array >= f.min()) & (new_freq_array <= f.max())) new_f = new_freq_array[new_nz_index] # interpolate onto new frequency range new_Tipper.tipper[new_nz_index, 0, jj] = t_func_real(new_f) + \ 1j * t_func_imag(new_f) new_Tipper.tipper_err[new_nz_index, 0, jj] = t_func_err(new_f) new_Tipper.compute_mag_direction() return new_Z, new_Tipper
[docs] def plot_mt_response(self, **kwargs): """ Returns a mtpy.imaging.plotresponse.PlotResponse object :Plot Response: :: >>> mt_obj = mt.MT(edi_file) >>> pr = mt.plot_mt_response() >>> # if you need more info on plot_mt_response >>> help(pr) """ from mtpy.imaging import plot_mt_response # todo change this to the format of the new imaging API plot_obj = plot_mt_response.PlotMTResponse(z_object=self.Z, t_object=self.Tipper, pt_obj=self.pt, station=self.station, **kwargs) return plot_obj
# raise NotImplementedError # ============================================================================== # Site details # ==============================================================================
[docs]class Site(object): """ Information on the site, including location, id, etc. Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= aqcuired_by string name of company or person whom aqcuired the data. id string station name Location object Holds location information, lat, lon, elev Location datum, easting, northing see Location class start_date string YYYY-MM-DD start date of measurement end_date string YYYY-MM-DD end date of measurement year_collected string year data collected survey string survey name project string project name run_list string list of measurment runs ex. [mt01a, mt01b] ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> Site(**{'state':'Nevada', 'Operator':'MTExperts'}) """ def __init__(self, **kwargs): self.acquired_by = None self._end_dt = None self.id = None self.Location = Location() self.project = None self.run_list = None self._start_dt = None self.survey = None self._date_fmt = '%Y-%m-%d' for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) @property def year_collected(self): try: return self._start_dt.year except TypeError: return None @year_collected.setter def year_collected(self, value): pass @property def start_date(self): try: return self._start_dt.strftime(self._date_fmt) except AttributeError: return None @start_date.setter def start_date(self, start_str): self._start_dt = self._read_date(start_str) @property def end_date(self): try: return self._end_dt.strftime(self._date_fmt) except AttributeError: return None @end_date.setter def end_date(self, end_str): self._end_dt = self._read_date(end_str) def _read_date(self, date_str): """ read a date string """ if date_str in [None, 'None', 'none', 'NONE']: return None try: return dt_parser.parse(date_str) except dt_parser.ParserError: try: return dt_parser.parse(date_str, dayfirst=True) except dt_parser.ParserError as error: raise ValueError(error)
# ============================================================================== # Location class, be sure to put locations in decimal degrees, and note datum # ==============================================================================
[docs]class Location(object): """ location details """ def __init__(self, **kwargs): self.datum = 'WGS84' self.declination = None self.declination_epoch = None self._elevation = None self._latitude = None self._longitude = None self._northing = None self._easting = None self.utm_zone = None self.elev_units = 'm' self.coordinate_system = 'Geographic North' for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) @property def latitude(self): return self._latitude @latitude.setter def latitude(self, lat): self._latitude = gis_tools.assert_lat_value(lat) @property def longitude(self): return self._longitude @longitude.setter def longitude(self, lon): self._longitude = gis_tools.assert_lon_value(lon) @property def elevation(self): return self._elevation @elevation.setter def elevation(self, elev): self._elevation = gis_tools.assert_elevation_value(elev) @property def easting(self): return self._easting @easting.setter def easting(self, easting): try: self._easting = float(easting) except TypeError: self._easting = None @property def northing(self): return self._northing @northing.setter def northing(self, northing): try: self._northing = float(northing) except TypeError: self._northing = None
[docs] def project_location2utm(self): """ project location coordinates into meters given the reference ellipsoid, for now that is constrained to WGS84 Returns East, North, Zone """ utm_point = gis_tools.project_point_ll2utm(self._latitude, self._longitude, datum=self.datum) self.easting = utm_point[0] self.northing = utm_point[1] self.utm_zone = utm_point[2]
[docs] def project_location2ll(self): """ project location coordinates into meters given the reference ellipsoid, for now that is constrained to WGS84 Returns East, North, Zone """ ll_point = gis_tools.project_point_utm2ll(self.easting, self.northing, self.utm_zone, datum=self.datum) self.latitude = ll_point[0] self.longitude = ll_point[1]
# ============================================================================== # Field Notes # ==============================================================================
[docs]class FieldNotes(object): """ Field note information. Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= data_quality DataQuality notes on data quality electrode Instrument type of electrode used data_logger Instrument type of data logger magnetometer Instrument type of magnetotmeter ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> FieldNotes(**{'electrode_ex':'Ag-AgCl 213', 'magnetometer_hx':'102'}) """ def __init__(self, **kwargs): null_emeas = MTedi.EMeasurement() null_hmeas = MTedi.HMeasurement() self.DataQuality = DataQuality() self.DataLogger = Instrument() self.Electrode_ex = Instrument(**null_emeas.__dict__) self.Electrode_ey = Instrument(**null_emeas.__dict__) self.Magnetometer_hx = Instrument(**null_hmeas.__dict__) self.Magnetometer_hy = Instrument(**null_hmeas.__dict__) self.Magnetometer_hz = Instrument(**null_hmeas.__dict__) self.Electrode_ex.chtype = 'ex' self.Electrode_ey.chtype = 'ey' self.Magnetometer_hx.chtype = 'hx' self.Magnetometer_hy.chtype = 'hy' self.Magnetometer_hz.chtype = 'hz' for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Instrument # ==============================================================================
[docs]class Instrument(object): """ Information on an instrument that was used. Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= id string serial number or id number of data logger manufacturer string company whom makes the instrument type string Broadband, long period, something else ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> Instrument(**{'ports':'5', 'gps':'time_stamped'}) """ def __init__(self, **kwargs): self.id = None self.manufacturer = None self.type = None for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Data Quality # ==============================================================================
[docs]class DataQuality(object): """ Information on data quality. Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= comments string comments on data quality good_from_period float minimum period data are good good_to_period float maximum period data are good rating int [1-5]; 1 = poor, 5 = excellent warrning_comments string any comments on warnings in the data warnings_flag int [0-#of warnings] ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>>DataQuality(**{'time_series_comments':'Periodic Noise'}) """ def __init__(self, **kwargs): self.comments = None self.good_from_period = None self.good_to_period = None self.rating = None self.warnings_comments = None self.warnings_flag = 0 self.author = None for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Citation # ==============================================================================
[docs]class Citation(object): """ Information for a citation. Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= author string Author names title string Title of article, or publication journal string Name of journal doi string DOI number (doi:10.110/sf454) year int year published ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> Citation(**{'volume':56, 'pages':'234--214'}) """ def __init__(self, **kwargs): self.author = None self.title = None self.journal = None self.volume = None self.doi = None self.year = None for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Copyright # ============================================================================== # ============================================================================== # Provenance # ==============================================================================
[docs]class Provenance(object): """ Information of the file history, how it was made Holds the following information: ====================== =========== ======================================== Attributes Type Explanation ====================== =========== ======================================== creation_time string creation time of file YYYY-MM-DD,hh:mm:ss creating_application string name of program creating the file creator Person person whom created the file submitter Person person whom is submitting file for archiving ====================== =========== ======================================== More attributes can be added by inputing a key word dictionary >>> Provenance(**{'archive':'IRIS', 'reprocessed_by':'grad_student'}) """ def __init__(self, **kwargs): self.creation_time = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime()) self.creating_application = 'MTpy' self.Creator = Person() self.Submitter = Person() for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Person # ==============================================================================
[docs]class Person(object): """ Information for a person Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= email string email of person name string name of person organization string name of person's organization organization_url string organizations web address ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> Person(**{'phone':'650-888-6666'}) """ def __init__(self, **kwargs): self.email = None self.name = None self.organization = None self.organization_url = None for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
# ============================================================================== # Processing # ==============================================================================
[docs]class Processing(object): """ Information for a processing Holds the following information: ================= =========== ============================================= Attributes Type Explanation ================= =========== ============================================= email string email of person name string name of person organization string name of person's organization organization_url string organizations web address ================= =========== ============================================= More attributes can be added by inputing a key word dictionary >>> Person(**{'phone':'888-867-5309'}) """ def __init__(self, **kwargs): self.Software = Software() self.notes = None self.processed_by = None self.sign_convention = 'exp(+i \omega t)' self.remote_reference = None self.RemoteSite = Site() for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
[docs]class Software(object): """ software """ def __init__(self, **kwargs): self.name = None self.version = None self.Author = Person() for key in kwargs: setattr(self, key, kwargs[key])
# ============================================================================== # Error # ==============================================================================
[docs]class MTError(Exception): pass