Source code for mtpy.core.mt_xml

# -*- coding: utf-8 -*-
"""
.. module:: mt_xml
   :synopsis: Deal with XML MT files 

.. moduleauthor:: Jared Peacock <jpeacock@usgs.gov>

.. note:: This module is written to align with the tools written by
          Anna Kelbert <akelbert@usgs.gov>
"""

#==============================================================================
# Imports
#==============================================================================
import os
import datetime
import copy

import numpy as np
import xml.etree.cElementTree as ET
from xml.dom import minidom

import mtpy.core.z as mtz

dt_fmt = '%Y-%m-%d %H:%M:%S'

#==============================================================================
# Generic object to hold information
#==============================================================================
[docs]class XML_element(object): """ Basically an ET element. The key components are * 'name' --> name of the element * 'attr' --> attribute information of the element * 'value' --> value of the element Used the property function here to be sure that these 3 cannot be set through the common k.value = 10, just in case there are similar names in the xml file. This seemed to be the safest to avoid those cases. """ def __init__(self, name, attr, value, **kwargs): self._name = name self._attr = attr self._value = value for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) @property def value(self): return self._value @value.setter def value(self, value): try: self._value = str(value) except ValueError: print('Cannot set {0} to string, set to None'.format(value)) self._value = None @property def attr(self): return self._attr @attr.setter def attr(self, attr): if type(attr) is not dict: raise ValueError('attr needs to be a dictionary, not {0}'.format(type(attr))) else: self._attr = attr @property def name(self): return self._name
#============================================================================== # General information #============================================================================== conditions_of_use = ''.join(['All data and metadata for this survey are ', 'available free of charge and may be copied ', 'freely, duplicated and further distributed ', 'provided this data set is cited as the ', 'reference. While the author(s) strive to ', 'provide data and metadata of best possible ', 'quality, neither the author(s) of this data ', 'set, not IRIS make any claims, promises, or ', 'guarantees about the accuracy, completeness, ', 'or adequacy of this information, and expressly ', 'disclaim liability for errors and omissions in ', 'the contents of this file. Guidelines about ', 'the quality or limitations of the data and ', 'metadata, as obtained from the author(s), are ', 'included for informational purposes only.']) estimates = [XML_element('Estimate', {'type':'real', 'name':'VAR'}, None, **{'Description':XML_element('Description', None, 'Variance'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Error Estimate'), 'Tag':XML_element('Tag', None, 'Variance')}), XML_element('Estimate', {'type':'complex', 'name':'COV'}, None, **{'Description':XML_element('Description', None, 'Full covariance between each two TF components'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Error Estimate'), 'Tag':XML_element('Tag', None, 'Covariance')}), XML_element('Estimate', {'type':'complex', 'name':'INVSIGCOV'}, None, **{'Description':XML_element('Description', None, 'Inverse Coherent Signal Power Matrix S'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Signal Power Estimate'), 'Tag':XML_element('Tag', None, 'inverse_signal_covariance')}), XML_element('Estimate',{'type':'complex', 'name':'RESIDCOV'}, None, **{'Description':XML_element('Description',None, 'Residual Covariance N'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Error Estimate'), 'Tag':XML_element('Tag', None, 'Coherence')}), XML_element('Estimate', {'type':'complex', 'name':'COH'}, None, **{'Description':XML_element('Description', None, 'Coherence'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Signal Coherence'), 'Tag':XML_element('Tag', None, 'Coherence')}), XML_element('Estimate', {'type':'complex', 'name':'PREDCOH'}, None, **{'Description':XML_element('Description', None, 'Multiple Coherence'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Signal Coherence'), 'Tag':XML_element('Tag', None, 'Multiple_Coherence')}), XML_element('Estimate', {'type':'complex', 'name':'SIGAMP'}, None, **{'Description':XML_element('Description', None, 'Signal Amplitude'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Signal Power Estimates'), 'Tag':XML_element('Tag', None, 'Signal_Amplitude')}), XML_element('Estimate', {'type':'complex', 'name':'SIGNOISE'}, None, **{'Description':XML_element('Description', None, 'Signal Noise'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'Error Estimates'), 'Tag':XML_element('Tag', None, 'Signal_Noise')})] data_types = [XML_element('DataType', {'units':'[mV/km]/[nT]', 'name':'Z', 'input':'H', 'output':'E'}, None, **{'Description':XML_element('Description', None, 'MT impedance'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'primary data type'), 'Tag':XML_element('Tag', None, 'impedance')}), XML_element('DataType', {'units':'[]', 'name':'T', 'input':'H', 'output':'H'}, None, **{'Description':XML_element('Description', None, 'Tipper-Vertical Field Transfer Function'), 'ExternalUrl':XML_element('ExternalUrl', None, None), 'Intention':XML_element('Intention', None, 'primary data type'), 'Tag':XML_element('Tag', None, 'tipper')})] #============================================================================== # Useful Functions #==============================================================================
[docs]class XML_Config(object): """ Class to deal with configuration files for xml. Includes all the important information for the station and how data was processed. Key Information includes: ======================== ================================================== Name Purpose ======================== ================================================== ProductID Station name ExternalUrl External URL to link to data Notes Any important information on station, data collection. TimeSeriesArchived Information on Archiving time series including URL. Image A location to an image of the station or the MT response. ======================== ================================================== * ProductID --> station name * ExternalUrl --> external url to link to data * Notes --> any """ def __init__(self, **kwargs): self.cfg_fn = None # Initialize the default attributes and values self.Description = XML_element('Description', None, 'Magnetotelluric Transfer Functions') self.ProductId = XML_element('ProductID', None, None) self.Project = XML_element('Project', None, None) self.Survey = XML_element('Survey', None, None) self.Country = XML_element('Country', None, None) self.SubType = XML_element('SubType', None, 'MT_FT') self.Notes = XML_element('Notes', None, None) self.Tags = XML_element('Tags', None, 'impedance, tipper') self.Image = XML_element('Image', None, None, **{'PrimaryData':XML_element('PrimaryData', None, None), 'Filename':XML_element('Filename', None, None)}) self.Original = XML_element('Original', None, None, **{'Attachment':XML_element('Attachment', None, None), 'Filename':XML_element('Filename', None, None)}) self.TimeSeriesArchived = XML_element('TimeSeriesArchived', None, None, **{'Value':XML_element('Value', None, 0), 'URL':XML_element('URL', None, None)}) self.ExternalUrl = XML_element('ExternalUrl', None, None, **{'Description':XML_element('Description', None, None), 'Url':XML_element('Url', None, None)}) self.PrimaryData = XML_element('PrimaryData', None, None, **{'Filename':XML_element('Filename', None, None)}) self.Attachment = XML_element('Attachment', None, None, **{'Filename':XML_element('Filename', None, None), 'Description':XML_element('Description', None, 'Original file use to produce XML')}) self.Provenance = XML_element('Provenance', None, None, **{'CreationTime':XML_element('CreationTime', None, datetime.datetime.strftime( datetime.datetime.utcnow(), dt_fmt)), 'CreatingApplication':XML_element('CreatingApplication', None, 'MTpy.core.mtxml'), 'Submitter':XML_element('Submitter', None, None, **{'Name':XML_element('Name', None, None), 'Email':XML_element('Email', None, None), 'Org':XML_element('Org', None, None), 'OrgURL':XML_element('OrgURL', None, None)}), 'Creator':XML_element('Creator', None, None, **{'Name':XML_element('Name', None, None), 'Email':XML_element('Email', None, None), 'Org':XML_element('Org', None, None), 'OrgURL':XML_element('OrgURL', None, None)})}) self.Copyright = XML_element('Copyright', None, None, **{'Citation':XML_element('Citation', None, None, **{'Title':XML_element('Title', None, None), 'Authors':XML_element('Authors', None, None), 'Year':XML_element('Year', None, None), 'Journal':XML_element('Journal', None, None), 'Volume':XML_element('Volume', None, None), 'DOI':XML_element('DOI', None, None)}), 'ReleaseStatus':XML_element('ReleaseStatus', None, 'Closed'), 'ConditionsOfUse':XML_element('ConditionsOfUse', None, conditions_of_use), 'AdditionalInfo':XML_element('AdditionalInfo', None, None)}) self.Site = XML_element('Site', None, None, **{'Project':XML_element('Project', None, None), 'Survey':XML_element('Survey', None, None), 'YearCollected':XML_element('YearCollected', None, None), 'Id':XML_element('Id', None, None), 'Location':XML_element('Location', None, None, **{'Latitude':XML_element('Latitude', None, None), 'Longitude':XML_element('Longitude', None, None), 'Elevation':XML_element('Elevation', {'units':'meters'}, None), 'Declination':XML_element('Declination', {'epoch':'1995'}, None)}), 'Orientation':XML_element('Orientation', {'angle_to_geographic_north':'0.0'}, None), 'AcquiredBy':XML_element('AcquiredBy', None, None), 'Start':XML_element('Start', None, None), 'End':XML_element('End', None, None), 'RunList':XML_element('RunList', None, None)}) self.FieldNotes = XML_element('FieldNotes', None, None, **{'Instrument':XML_element('Instrument', None, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Settings':XML_element('Settings', None, None)}), 'Dipole':XML_element('Dipole', {'name':'EX'}, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Length':XML_element('Length', {'units':'meters'}, None), 'Azimuth':XML_element('Azimuth', {'units':'degrees'}, None), 'Channel':XML_element('Channel', None, None)}), 'Dipole_00':XML_element('Dipole', {'name':'EY'}, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Length':XML_element('Length', {'units':'meters'}, None), 'Azimuth':XML_element('Azimuth', {'units':'degrees'}, None), 'Channel':XML_element('Channel', None, None)}), 'Magnetometer':XML_element('Magnetometer', {'name':'HX'}, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Azimuth':XML_element('Azimuth', {'units':'degrees'}, None), 'Channel':XML_element('Channel', None, None)}), 'Magnetometer_00':XML_element('Magnetometer', {'name':'HY'}, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Azimuth':XML_element('Azimuth', {'units':'degrees'}, None), 'Channel':XML_element('Channel', None, None)}), 'Magnetometer_01':XML_element('Magnetometer', {'name':'HZ'}, None, **{'Type':XML_element('Type', None, None), 'Manufacturer':XML_element('Manufacturer', None, None), 'Id':XML_element('Id', None, None), 'Azimuth':XML_element('Azimuth', {'units':'degrees'}, None), 'Channel':XML_element('Channel', None, None)}), 'DataQualityNotes':XML_element('DataQualityNotes', None, None, **{'Rating':XML_element('Rating', None, None), 'GoodFromPeriod':XML_element('GoodFromPeriod', None, None), 'GoodToPeriod':XML_element('GoodToPeriod', None, None), 'Comments':XML_element('Comments', None, None)}), 'DataQualityWarnings':XML_element('DataQualityWarnings', None, None, **{'Flag':XML_element('Flag', None, 0), 'Comments':XML_element('Comments', None, None)})}) self.ProcessingInfo = XML_element('ProcessingInfo', None, None, **{'ProcessedBy':XML_element('ProcessedBy', None, None), 'ProcessingSoftware':XML_element('ProcessingSoftware', None, None, **{'Name':XML_element('Name', None, None), 'LastMod':XML_element('LastMod', None, None), 'Version':XML_element('Version', None, None), 'Author':XML_element('Author', None, None)}), 'SignConvention':XML_element('SignConvention', None, r'exp(+i\omega t)'), 'RemoteRef':XML_element('RemoteRef', {'type':'Robust Remote Processing'}, None), 'RemoteInfo':XML_element('RemoteInfo', None, None, **{'Project':XML_element('Project', None, None), 'Survey':XML_element('Survey', None, None), 'ID':XML_element('ID', None, None), 'AcquiredBy':XML_element('AcquiredBy', None, None), 'Name':XML_element('Name', None, None), 'YearCollected':XML_element('YearCollected', None, None), 'Location':XML_element('Location', {'datum':'WGS84'}, None, **{'Latitude':XML_element('Latitude', None, None), 'Longitude':XML_element('Longitude', None, None), 'Elevation':XML_element('Elevation', {'units':'meters'}, None)}) })}) self.SiteLayout = XML_element('SiteLayout', None, None, **{'InputChannels': XML_element('InputChannels', {'ref':'site', 'units':'m'}, None, **{'Magnetic_hx':XML_element('Magnetic', {'name':"Hx", 'orientation': '0.0', 'x':'0.0', 'y':'0.0', 'z':'0.0'}, None), 'Magnetic_hy':XML_element('Magnetic', {'name':"Hy", 'orientation': '0.0', 'x':'0.0', 'y':'0.0', 'z':'0.0'}, None)}), 'OutputChannels': XML_element('OutputChannels', {'ref':'site', 'units':'m'}, None, **{'Magnetic_hz':XML_element('Magnetic', {'name':"Hz", 'orientation': '0.0', 'x':'0.0', 'y':'0.0', 'z':'0.0'}, None), 'Electric_ex':XML_element('Electric', {'name':"Ex", 'orientation': '0.0', 'x':'0.0', 'y':'0.0', 'z':'0.0', 'x2':'0.0', 'y2':'0.0', 'z2':'0.0'}, None), 'Electric_ey':XML_element('Electric', {'name':"Ey", 'orientation': '0.0', 'x':'0.0', 'y':'0.0', 'z':'0.0', 'x2':'0.0', 'y2':'0.0', 'z2':'0.0'}, None)})}) # self.InputChannels = XML_element('InputChannels', {'ref':'site', 'units':'m'}, None) # self.OutputChannels = XML_element('OutputChannels', {'ref':'site', 'units':'m'}, None) self.Data = XML_element('Data', {'count':0}, None) self.PeriodRange = XML_element('PeriodRange', None, None) self.Datum = XML_element('Datum', None, 'WGS84') self.Declination = XML_element('Declination', None, None) self.StatisticalEstimates = XML_element('StatisticalEstimates', None, None) for ii, estimate in enumerate(estimates): setattr(self.StatisticalEstimates, 'Estimate_{0:02}'.format(ii), estimate) self.DataTypes = XML_element('DataTypes', None, None) for ii, d_type in enumerate(data_types): setattr(self.DataTypes, 'DataType_{0:02}'.format(ii), d_type) for key in list(kwargs.keys()): setattr(self, key, kwargs[key])
[docs] def read_cfg_file(self, cfg_fn=None): """ Read in a cfg file making all key = value pairs attribures of XML_Config. Being sure all new attributes are XML_element objects. The assumed structure of the xml.cfg file is similar to: ``# XML Configuration File MTpy Attachement.Description = Original file use to produce XML Attachment.Filename = None Copyright.Citation.Authors = None Copyright.Citation.DOI = None Copyright.Citation.Journal = None Copyright.Citation.Title = None Copyright.Citation.Volume = None Copyright.Citation.Year = None PeriodRange(max=0)(min=0) = None`` where the heirarchy of information is separated by a . and if the information has attribures they are in the name with (key=value) syntax. Arguments ------------- **cfg_fn** : string full path to cfg file to read in Example --------- :Read in xml.cfg file: :: >>> import mtpy.core.mtxml as mtxml >>> cfg_obj = mtxml.XML_Config() >>> cfg_obj.read_cfg_file(r"/home/MT/xml.cfg") """ if cfg_fn is not None: self.cfg_fn = cfg_fn if not os.path.isfile(self.cfg_fn): raise NameError('Could not find {0}'.format(self.cfg_fn)) with open(self.cfg_fn, 'r') as fid: lines = fid.readlines() for line in lines: # skip comments if line[0] == '#': pass # skip blank lines elif len(line.strip()) < 3: pass # else assume the line is metadata separated by = else: self._read_cfg_line(line)
def _read_cfg_line(self, line): """ read a configuration file line to make the appropriate attribute have the correct values and attributes. porbably should think of a better name for XML_element objects that are attributes of self. """ # split the line by the last = line_list = self._split_cfg_line(line) # split the keys by . to get the different attributes key_list = line_list[0].strip().split('.') value = line_list[1].strip() if value in ['none', 'None']: value = None # loop over the keys to set them appropriately for ii, key in enumerate(key_list): # get the name of the key and any attributes it might have name, attr = self._read_cfg_key(key) # if its the first key, see if its been made an attribute yet if ii == 0: if not hasattr(self, name): setattr(self, name, XML_element(name, None, None)) # for looping purposes we need to get the current XML_element object cfg_attr = getattr(self, name) # be sure to set any attributes, need to do this here because # the test for hasattr will only make a new one if there # isn't one already, but since most things in the cfg file # are already attributes of self, they already exist. cfg_attr._attr = attr else: if not hasattr(cfg_attr, name): setattr(cfg_attr, name, XML_element(name, None, None)) cfg_attr = getattr(cfg_attr, name) cfg_attr._attr = attr # set the value of the current XML_element object cfg_attr._value = value def _split_cfg_line(self, line): """ split a cfg line by the last =, otherwise you get strings that are split in the wrong place. For instance k.l(a=b) = None will be split at ['k.l(a', 'b)', None] but we want [k.l(a=b), None] """ equal_find = -(line[::-1].find('=')) line_list = [line[0:equal_find-1], line[equal_find+1:]] return line_list def _read_cfg_key(self, key): """ read a key from a cfg file and check to see if has any attributes in the form of: parent.name(attribute=0)(attribute=2) = value """ attr = {} if '(' and ')' in key: key_list = key.split('(') key = key_list[0] for key_attr in key_list[1:]: k_list = key_attr.replace(')', '').split('=') attr[k_list[0].strip()] = k_list[1].strip() return key, attr
[docs] def write_cfg_file(self, cfg_fn=None): """ Write out configuration file in the style of: parent.attribute = value Arguments -------------- **cfg_fn** : string full path to write the configuration file """ if cfg_fn is not None: self.cfg_fn = cfg_fn # a list of lines that will be joined to write the file line_list = ['# XML Configuration File MTpy'] # loop over attribute names for attr_00_name in sorted(self.__dict__.keys()): # skip the data attribute cause we don't need that now if attr_00_name in ['Data', 'DataTypes', 'StatisticalEstimates']: continue # get the given attribute attr_00 = getattr(self, attr_00_name) # make sure it is of XML_element instance if isinstance(attr_00, XML_element): # be sure to add a new line for each parent attribute line_list.append(' ') # get the attributes associated with the parent attr_00_keys = self._get_attr_keys(attr_00) # if there are no attributes write the line if len(attr_00_keys) == 0: line_list.append(self._write_cfg_line(attr_00)) # otherwise loop through each attribute checking if there # are more attributes or no else: for attr_01_name in attr_00_keys: attr_01 = getattr(attr_00, attr_01_name) attr_01_keys = self._get_attr_keys(attr_01) if len(attr_01_keys) == 0: line_list.append(self._write_cfg_line(attr_01, attr_00)) else: for attr_02_name in attr_01_keys: attr_02 = getattr(attr_01, attr_02_name) attr_02_keys = self._get_attr_keys(attr_02) if len(attr_02_keys) == 0: line_list.append(self._write_cfg_line(attr_02, [attr_00, attr_01])) else: for attr_03_name in attr_02_keys: attr_03 = getattr(attr_02, attr_03_name) attr_03_keys = self._get_attr_keys(attr_03) if len(attr_03_keys) == 0: line_list.append(self._write_cfg_line(attr_03, [attr_00, attr_01, attr_02])) else: for attr_04_name in attr_03_keys: attr_04 = getattr(attr_03, attr_04_name) line_list.append(self._write_cfg_line(attr_04, [attr_00, attr_01, attr_02, attr_03])) else: print('Not including: {0}'.format(attr_00_name)) # write the file with open(self.cfg_fn, 'w') as fid: fid.write('\n'.join(line_list)) # show the user something happened print('-'*50) print(' Wrote xml configuration file to {0}'.format(self.cfg_fn)) print('-'*50)
def _write_cfg_line(self, XML_element_obj, parent=None): """ write a configuration file line in the format of: parent.attribute = value """ if parent is None: parent_str = '' elif type(parent) is list: parent_str = '.'.join([p._name for p in parent]+['']) elif isinstance(parent, XML_element): parent_str = '{0}.'.format(parent._name) if XML_element_obj._attr is not None: attr_str = ''.join(['({0}={1})'.format(a_key, XML_element_obj._attr[a_key]) for a_key in list(XML_element_obj._attr.keys())]) else: attr_str = '' return '{0}{1}{2} = {3}'.format(parent_str, XML_element_obj._name, attr_str, XML_element_obj._value) def _get_attr_keys(self, attribute): return [a_key for a_key in sorted(attribute.__dict__.keys()) if a_key not in ['_name', '_value', '_attr']]
#============================================================================== # EDI to XML #==============================================================================
[docs]class MT_XML(XML_Config): """ Class to read and write MT information from XML format. This tries to follow the format put forward by Anna Kelbert for archiving MT response data. A configuration file can be read in that might make it easier to write multiple files for the same survey. .. seealso:: mtpy.core.mt_xml.XML_Config =============== =========================================================== Attributes Description =============== =========================================================== Z object of type mtpy.core.z.Z Tipper object of type mtpy.core.z.Tipper =============== =========================================================== .. note:: All other attributes are of the same name and of type XML_element, where attributes are name, value and attr. Attr contains any tag information. This is left this way so that mtpy.core.mt.MT can read in the information. **Use mtpy.core.mt.MT for conversion between data formats.** =============== =========================================================== Methods Description =============== =========================================================== read_cfg_file Read a configuration file in the format of XML_Config read_xml_file Read an xml file write_xml_file Write an xml file =============== =========================================================== :Example: :: >>> import mtpy.core.mt_xml as mtxml >>> x = mtxml.read_xml_file(r"/home/mt_data/mt01.xml") >>> x.read_cfg_file(r"/home/mt_data/survey_xml.cfg") >>> x.write_xml_file(r"/home/mt_data/xml/mt01.xml") """ def __init__(self, **kwargs): XML_Config.__init__(self, **kwargs ) self.edi_fn = None self.xml_fn = None self.cfg_fn = None self.parent_element = None self._Z = mtz.Z() self._Tipper = mtz.Tipper() self._order_list = ['Description', 'ProductId', 'SubType', 'Notes', 'Tags', 'ExternalUrl', 'PrimaryData', #'TimeSeriesArchived', #'Image', #'Original', 'Attachment', 'Provenance', 'Copyright', 'Site', 'FieldNotes', 'ProcessingInfo', 'StatisticalEstimates', 'DataTypes', 'SiteLayout', 'Data', 'PeriodRange'] for key in list(kwargs.keys()): setattr(self, key, kwargs[key]) def _get_name(self, name): if name.find('(') > 0: l = name.split('(') name = l[0] meta_dict = {} for ll in l[1:]: ll = ll.split('=') meta_dict[ll[0]] = ll[1].replace(')', '') else: meta_dict = {} return name, meta_dict def _format_data(self): """ format the Z and tipper data apporpriately """ # --> useful variables comp_dict_z = {(0, 0):('Zxx', 'Hx', 'Ex'), (0, 1):('Zxy', 'Hy', 'Ex'), (1, 0):('Zyx', 'Hx', 'Ey'), (1, 1):('Zyy', 'Hy', 'Ey')} header_dict = {} header_dict['Z'] = XML_element('Z',{'units':'[mV/km]/[nT]', 'type':'complex', 'size':'2 2'}, None) header_dict['Z.VAR'] = XML_element('Z.VAR', {'type':'real', 'size':'2 2'}, None) header_dict['Z.INVSIGCOV'] = XML_element('Z.INVSIGCOV',{'type':'complex', 'size':'2 2'}, None) header_dict['Z.RESIDCOV'] = XML_element('Z.RESIDCOV',{'type':'complex', 'size':'2 2'}, None) header_dict['T'] = XML_element('T', {'units':'[]', 'type':'complex', 'size':'1 2'}, None) header_dict['T.VAR'] = XML_element('T.VAR', {'type':'real', 'size':'1 2'}, None) header_dict['T.INVSIGCOV'] = XML_element('T.INVSIGCOV',{'type':'complex', 'size':'2 2'}, None) header_dict['T.RESIDCOV'] = XML_element('T.RESIDCOV',{'type':'complex', 'size':'1 1'}, None) nf = self.Z.freq.size # determine whether to write tipper data or not if self.Tipper.tipper is not None: nz_tipper = np.any(self.Tipper.tipper) == 0 if nz_tipper == True: write_tipper = False else: write_tipper = True else: write_tipper = False # set the estimates to write out if write_tipper == True: estimates = ['Z', 'Z.VAR', 'Z.INVSIGCOV', 'Z.RESIDCOV', 'T', 'T.VAR', 'T.INVSIGCOV', 'T.RESIDCOV'] else: estimates = ['Z', 'Z.VAR', 'Z.INVSIGCOV', 'Z.RESIDCOV'] # make the data element self.Data = XML_element('Data', {'count':str(nf)}, None) # loop through each period and add appropriate information for f_index, freq in enumerate(self.Z.freq): f_name = 'Period_{0:02}'.format(f_index) # set attribute period name with the index value # we are setting _name to have the necessary information so # we can name the attribute whatever we want. setattr(self.Data, f_name, XML_element('Period', {'value':'{0:.6g}'.format(1./freq), 'units':'seconds'}, None)) d_attr = getattr(self.Data, f_name) # Get information from data for estimate in estimates: attr_name = estimate.replace('.', '_').replace('VAR', 'err').lower() estimate_name = estimate.replace('.', '') # need to make sure the attribute value is a copy otherwise it # will continue to rewrite itself. setattr(d_attr, estimate_name, copy.deepcopy(header_dict[estimate])) c_attr = getattr(d_attr, estimate_name) if 'z' in attr_name: count = 0 try: z_arr = getattr(self.Z, attr_name) except AttributeError: #print 'No {0} information'.format(attr_name) continue for e_index in range(2): for h_index in range(2): c = comp_dict_z[(e_index, h_index)] c_dict = {'name':c[0], 'input':c[1], 'output':c[2]} z_value = z_arr[f_index, e_index, h_index] if attr_name == 'z_err': c_value = '{0:<+.6e}'.format(z_value) else: c_value = '{0:<+.6e} {1:<+.6e}'.format(z_value.real, z_value.imag) setattr(c_attr, 'value_{0:02}'.format(count), XML_element('value', c_dict, c_value)) count += 1 if 't' in attr_name and write_tipper == True: attr_name = attr_name.replace('t', 'tipper') count = 0 if attr_name.lower() in ['tipper', 'tipper_err']: tx = 1 ty = 2 comp_dict_t = {(0, 0):('Tx', 'Hx', 'Hz'), (0, 1):('Ty', 'Hy', 'Hz')} elif attr_name.lower() == 'tipper_invsigcov': tx = 2 ty = 2 comp_dict_t = {(0, 0):('', 'Hx', 'Hx'), (0, 1):('', 'Hx', 'Hy'), (1, 0):('', 'Hy', 'Hx'), (1, 1):('', 'Hy', 'Hy')} elif attr_name.lower() == 'tipper_residcov': tx = 1 ty = 1 comp_dict_t = {(0, 0):('', 'Hz', 'Hz')} for e_index in range(tx): for h_index in range(ty): c = comp_dict_t[(e_index, h_index)] c_dict = {'name':c[0], 'input':c[1], 'output':c[2]} try: t_arr = getattr(self.Tipper, attr_name) except AttributeError: continue t_value = t_arr[f_index, e_index, h_index] if attr_name == 'tipper_err': c_value = '{0:<+.6e}'.format(t_value) else: c_value = '{0:<+.6e} {1:<+.6e}'.format(t_value.real, t_value.imag) setattr(c_attr, 'value_{0:02}'.format(count), XML_element('value', c_dict, c_value)) count += 1 self.PeriodRange._attr = {'min':'{0:.6e}'.format(1./self.Z.freq.min()), 'max':'{0:.6e}'.format(1./self.Z.freq.max())} def _write_element(self, parent_et, XML_element_obj): """ make a new element """ if XML_element_obj._attr is None: XML_element_obj._attr = {} else: for key in list(XML_element_obj._attr.keys()): XML_element_obj._attr[key] = str(XML_element_obj._attr[key]) # if XML_element_obj._name is None: # XML_element_obj._name = 'None' # if XML_element_obj._value is None: # XML_element_obj.value = 'None' new_element = ET.SubElement(parent_et, XML_element_obj._name, XML_element_obj._attr) new_element.text = XML_element_obj._value #new_element.tail = '\n' return new_element
[docs] def write_xml_file(self, xml_fn, cfg_fn=None): """ write xml from edi """ if cfg_fn is not None: self.cfg_fn = cfg_fn self.read_cfg_file() self.xml_fn = xml_fn # get data inot xml format self._format_data() # make the top of the tree element emtf = ET.Element('EM_TF') # loop over the important information sections for element in self._order_list: # get the information for the given element d_00_obj = getattr(self, element) element_00 = self._write_element(emtf, d_00_obj) key_00_list = self._get_attr_keys(d_00_obj) if len(key_00_list) !=0: for key_00 in key_00_list: d_01_obj = getattr(d_00_obj, key_00) element_01 = self._write_element(element_00, d_01_obj) key_01_list = self._get_attr_keys(d_01_obj) if len(key_01_list) !=0: for key_01 in key_01_list: d_02_obj = getattr(d_01_obj, key_01) element_02 = self._write_element(element_01, d_02_obj) key_02_list = self._get_attr_keys(d_02_obj) if len(key_02_list) !=0: for key_02 in key_02_list: d_03_obj = getattr(d_02_obj, key_02) element_03 = self._write_element(element_02, d_03_obj) key_03_list = self._get_attr_keys(d_03_obj) if len(key_03_list) !=0: for key_03 in key_03_list: d_04_obj = getattr(d_03_obj, key_03) element_04 = self._write_element(element_03, d_04_obj) key_04_list = self._get_attr_keys(d_04_obj) if len(key_04_list) !=0: for key_04 in key_04_list: d_05_obj = getattr(d_04_obj, key_04) element_05 = self._write_element(element_04, d_05_obj) #--> write xml file xmlstr = minidom.parseString(ET.tostring(emtf)).toprettyxml(indent=" ") with open(self.xml_fn, 'w') as fid: fid.write(xmlstr) print('-'*72) print(' Wrote xml file to: {0}'.format(self.xml_fn)) print('-'*72) return self.xml_fn
[docs] def read_xml_file(self, xml_fn): """ read in an xml file and set attributes appropriately. Arguments -------------- **xml_fn** : string full path of xml file to read in """ self.xml_fn = xml_fn et_xml = ET.parse(xml_fn) root = et_xml.getroot() for element_00 in root.getchildren(): setattr(self, element_00.tag, self._read_element(element_00)) try: setattr(self.FieldNotes, 'DataQualityNotes', self.Site.DataQualityNotes) delattr(self.Site, 'DataQualityNotes') except AttributeError: pass try: setattr(self.FieldNotes, 'DataQualityWarnings', self.Site.DataQualityWarnings) delattr(self.Site, 'DataQualityWarnings') except AttributeError: pass # set Z and Tipper nf = int(self.Data.attr['count']) period = np.zeros(nf, dtype=np.float) z = np.zeros((nf, 2, 2), dtype=np.complex) z_err = np.zeros((nf, 2, 2), dtype=np.float) self._Z = mtz.Z(z_array=z, z_err_array=z_err, freq=period) self._Z.z_invsigcov = np.zeros((nf, 2, 2), dtype=np.complex) self._Z.z_residcov = np.zeros((nf, 2, 2), dtype=np.complex) input_dict = {'hx': 0, 'hy': 1, 'ex': 0, 'ey': 1} output_dict = input_dict p_count = 0 for per_attr in dir(self.Data): if 'period' in per_attr.lower(): p_obj = getattr(self.Data, per_attr) p1 = 1./float(p_obj.attr['value']) self._Z.freq[p_count] = 1./p1 if hasattr(p_obj, 'Z'): z_block = getattr(p_obj, 'Z') for z_attr in dir(z_block): if 'value_' in z_attr: z_comp = getattr(z_block, z_attr) ii = output_dict[z_comp.attr['output'].lower()] jj = input_dict[z_comp.attr['input'].lower()] z_value = [float(zz) for zz in z_comp.value.strip().split()] z_value = complex(z_value[0], z_value[1]) self._Z.z[p_count, ii, jj] = z_value if hasattr(p_obj, 'ZVAR'): z_block = getattr(p_obj, 'ZVAR') for z_attr in dir(z_block): if 'value_' in z_attr: z_comp = getattr(z_block, z_attr) ii = output_dict[z_comp.attr['output'].lower()] jj = input_dict[z_comp.attr['input'].lower()] z_value = float(z_comp.value) self._Z.z_err[p_count, ii, jj] = z_value if hasattr(p_obj, 'ZINVSIGCOV'): z_block = getattr(p_obj, 'ZINVSIGCOV') for z_attr in dir(z_block): if 'value_' in z_attr: z_comp = getattr(z_block, z_attr) ii = output_dict[z_comp.attr['output'].lower()] jj = input_dict[z_comp.attr['input'].lower()] z_value = [float(zz) for zz in z_comp.value.strip().split()] z_value = complex(z_value[0], z_value[1]) self._Z.z_invsigcov[p_count, ii, jj] = z_value if hasattr(p_obj, 'ZRESIDCOV'): z_block = getattr(p_obj, 'ZRESIDCOV') for z_attr in dir(z_block): if 'value_' in z_attr: z_comp = getattr(z_block, z_attr) ii = output_dict[z_comp.attr['output'].lower()] jj = input_dict[z_comp.attr['input'].lower()] z_value = [float(zz) for zz in z_comp.value.strip().split()] z_value = complex(z_value[0], z_value[1]) self._Z.z_residcov[p_count, ii, jj] = z_value p_count += 1 # Fill Tipper t = np.zeros((nf, 1, 2), dtype=np.complex) t_err = np.zeros((nf, 1, 2), dtype=np.float) self._Tipper = mtz.Tipper(tipper_array=t, tipper_err_array=t_err, freq=period) self._Tipper.tipper_invsigcov = np.zeros((nf, 2, 2), dtype=np.complex) self._Tipper.tipper_residcov = np.zeros((nf, 1, 1), dtype=np.complex) input_dict = {'hz':0, 'hx': 0, 'hy': 1} output_dict = {'hz':0, 'hx':0, 'hy':1} p_count = 0 for per_attr in dir(self.Data): if 'period' in per_attr.lower(): p_obj = getattr(self.Data, per_attr) p1 = float(p_obj.attr['value']) self._Tipper.freq[p_count] = 1./p1 if hasattr(p_obj, 'T'): t_block = getattr(p_obj, 'T') for t_attr in dir(t_block): if 'value_' in t_attr: t_comp = getattr(t_block, t_attr) ii = output_dict[t_comp.attr['output'].lower()] jj = input_dict[t_comp.attr['input'].lower()] t_value = [float(tt) for tt in t_comp.value.strip().split()] t_value = complex(t_value[0], t_value[1]) self._Tipper.tipper[p_count, ii, jj] = t_value if hasattr(p_obj, 'TVAR'): t_block = getattr(p_obj, 'TVAR') for t_attr in dir(t_block): if 'value_' in t_attr: t_comp = getattr(t_block, t_attr) ii = output_dict[t_comp.attr['output'].lower()] jj = input_dict[t_comp.attr['input'].lower()] t_value = float(t_comp.value) self._Tipper.tipper_err[p_count, ii, jj] = t_value if hasattr(p_obj, 'TINVSIGCOV'): t_block = getattr(p_obj, 'TINVSIGCOV') for t_attr in dir(t_block): if 'value_' in t_attr: t_comp = getattr(t_block, t_attr) ii = output_dict[t_comp.attr['output'].lower()] jj = input_dict[t_comp.attr['input'].lower()] t_value = [float(tt) for tt in t_comp.value.strip().split()] t_value = complex(t_value[0], t_value[1]) self._Tipper.tipper_invsigcov[p_count, ii, jj] = t_value if hasattr(p_obj, 'TRESIDCOV'): t_block = getattr(p_obj, 'TRESIDCOV') for t_attr in dir(t_block): if 'value_' in t_attr: t_comp = getattr(t_block, t_attr) ii = output_dict[t_comp.attr['output'].lower()] jj = input_dict[t_comp.attr['input'].lower()] t_value = [float(tt) for tt in t_comp.value.strip().split()] t_value = complex(t_value[0], t_value[1]) self._Tipper.tipper_residcov[p_count, ii, jj] = t_value p_count += 1
def _get_info_from_element(self, element): """ Get information from an element, including name, attr, value Arguments ------------ **element** : ET.Element Returns --------- **XML_element** XML_element Object """ return_obj = XML_element(None, None, None) return_obj._name = element.tag try: return_obj._value = element.text.strip() except AttributeError: return_obj._value = None return_obj._attr = element.attrib return return_obj def _get_attr_name(self, parent, attr_name): """ make attribute name, if one already exists, then add a number to it ex. attribute_01 """ attr_name = attr_name.replace('.', '') if attr_name == 'Period': if not hasattr(parent, 'Period_00'): return 'Period_00' else: for ii in range(0, 100): new_attr_name = '{0}_{1:02}'.format(attr_name, ii) if not hasattr(parent, new_attr_name): break return new_attr_name if hasattr(parent, attr_name): for ii in range(0, 100): new_attr_name = '{0}_{1:02}'.format(attr_name, ii) if not hasattr(parent, new_attr_name): break else: new_attr_name = attr_name return new_attr_name def _read_element(self, element): """ read a given element and return something useful """ child = self._get_info_from_element(element) children = element.getchildren() if len(children) > 0: for child_00 in children: attr_name = self._get_attr_name(child, child_00.tag) setattr(child, attr_name, self._get_info_from_element(child_00)) children_01 = child_00.getchildren() if len(children_01) > 0: parent_01 = getattr(child, attr_name) for child_01 in children_01: attr_01_name = self._get_attr_name(parent_01, child_01.tag) setattr(parent_01, attr_01_name, self._get_info_from_element(child_01)) children_02 = child_01.getchildren() if len(children_02) > 0: parent_02 = getattr(parent_01, attr_01_name) for child_02 in children_02: attr_02_name = self._get_attr_name(parent_02, child_02.tag) setattr(parent_02, attr_02_name, self._get_info_from_element(child_02)) children_03 = child_02.getchildren() if len(children_03) > 0: parent_03 = getattr(parent_02, attr_02_name) for child_03 in children_03: attr_03_name = self._get_attr_name(parent_03, child_03.tag) setattr(parent_03, attr_03_name, self._get_info_from_element(child_03)) return child @property def Z(self): """ get z information """ return self._Z @Z.setter def Z(self, z_object): """ set z object """ if type(z_object) is not mtz.Z: raise MT_XML_Error('To set Z, input needs to be an mtpy.core.z.Z object') self._Z = z_object @property def Tipper(self): """ get Tipper information """ return self._Tipper @Tipper.setter def Tipper(self, t_object): """ set z object """ if type(t_object) is not mtz.Tipper: raise MT_XML_Error('To set Z, input needs to be an mtpy.core.z.Z object') self._Tipper = t_object
#============================================================================== # exceptions #==============================================================================
[docs]class MT_XML_Error(Exception): pass