# -*- coding: utf-8 -*-
"""
Description:
To compute and encapsulate the properties of a set of EDI files
Author: fei.zhang@ga.gov.au
CreateDate: 2017-04-20
"""
import csv
import glob
import os
import sys
from logging import INFO,DEBUG,WARNING,ERROR
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from shapely.geometry import Point # , Polygon, LineString, LinearRing
import mtpy.core.mt as mt
import mtpy.imaging.mtplottools as mtplottools
from mtpy.utils.mtpy_decorator import deprecated
from mtpy.utils.matplotlib_utils import gen_hist_bins
from mtpy.utils.mtpylog import MtPyLog
import mtpy.analysis.pt as MTpt
import mtpy.imaging.penetration
def is_num_in_seq(anum, aseq, atol=0.0001):
    """
    Check whether a number occurs in a sequence, within an absolute tolerance.

    :param anum: the number to be checked
    :param aseq: a sequence or a list of values
    :param atol: absolute tolerance (default 0.0001)
    :return: True if any element of aseq is within atol of anum, else False
    """
    # any() short-circuits on the first match, exactly like the original loop.
    return any(abs(anum - an_number) < atol for an_number in aseq)
class EdiCollection(object):
    """
    A super class to encapsulate the properties pertinent to a set of EDI files

    :param edilist: a list of edifiles with full path, for read-only
    :param outdir: computed result to be stored in outdir
    :param ptol: period tolerance considered as equal, default 0.05 means 5 percent
        The ptol parameter controls what freqs/periods are grouped together:
        10 percent may result more double counting of freq/period data than 5 pct.
        (eg: MT_Datasets/WPJ_EDI)
    """

    def __init__(self, edilist=None, mt_objs=None, outdir=None, ptol=0.05):
        """
        Build the collection either from a list of EDI file paths or from a
        list of pre-built mtpy MT objects (edilist takes precedence).

        :param edilist: list of EDI file paths
        :param mt_objs: list of mtpy.core.mt.MT objects, used when edilist is None
        :param outdir: optional output directory (created if it does not exist)
        :param ptol: relative period tolerance used to group nearby periods
        :raises ValueError: if neither edilist nor mt_objs is provided
        """
        # __name__ will be path.to.module OR __main__
        self._logger = MtPyLog.get_mtpy_logger(__name__)
        self._logger.setLevel(INFO)

        if edilist is not None:
            # if edilist is provided, always create MT objects from the list
            self.edifiles = edilist
            self._logger.info("number of edi files in this collection: %s",
                              len(self.edifiles))
            self._logger.debug("constructing MT objects from edi files")
            self.mt_obj_list = [mt.MT(edi) for edi in self.edifiles]
        elif mt_objs is not None:
            # use the supplied mt_objs and derive the file list from them
            self.mt_obj_list = list(mt_objs)
            self.edifiles = [mt_obj.fn for mt_obj in self.mt_obj_list]
        else:
            # the original code logged this and then crashed later with an
            # AttributeError; fail fast with a clear exception instead
            self._logger.error("None Edi file set")
            raise ValueError("Either edilist or mt_objs must be provided")

        assert len(self.edifiles) > 0
        self.num_of_edifiles = len(self.edifiles)  # number of stations
        print("number of stations/edifiles = %s" % self.num_of_edifiles)

        self.ptol = ptol

        # get all frequencies from all edi files
        self.all_frequencies = None
        self.mt_periods = None
        self.all_unique_periods = self._get_all_periods()

        self.geopdf = self.create_mt_station_gdf()
        self.bound_box_dict = self.get_bounding_box()  # in orginal projection

        # ensure that outdir is created if not exists.
        if outdir is None:
            pass
        elif not os.path.exists(outdir):
            os.mkdir(outdir)
        self.outdir = outdir

        return
def _get_all_periods(self):
"""
from the list of edi files get a list of all unique periods from the frequencies.
"""
if self.all_frequencies is not None: # already initialized
return
# get all frequencies from all edi files
all_freqs = []
for mt_obj in self.mt_obj_list:
all_freqs.extend(list(mt_obj.Z.freq))
self.mt_periods = 1.0 / np.array(all_freqs)
# sort all frequencies so that they are in ascending order,
# use set to remove repeats and make an array
self.all_frequencies = sorted(list(set(all_freqs)))
self._logger.debug("Number of MT Frequencies: %s", len(self.all_frequencies))
all_periods = 1.0 / np.array(sorted(self.all_frequencies, reverse=True))
self._logger.debug("Type of all_periods %s", type(all_periods))
self._logger.info("Number of MT Periods: %s", len(all_periods))
self._logger.debug("Periods List: %s", str(all_periods))
return sorted(all_periods)
[docs] def get_period_occurance(self,aper):
"""
For a given aperiod, compute its occurance frequencies among the stations/edi
:param aper: a float value of the period
:return:
"""
station_list = []
afreq = 1.0 / aper
acount = 0
for mt_obj in self.mt_obj_list:
# if afreq in mt_obj.Z.freq:
if is_num_in_seq(afreq, mt_obj.Z.freq):
acount = acount + 1
station_list.append(mt_obj.station)
# print (station_list)
occ_percentage = (100.0*acount)/self.num_of_edifiles
return occ_percentage
[docs] def get_periods_by_stats(self, percentage=10.0):
"""
check the presence of each period in all edi files, keep a list of periods which are at least percentage present
:return: a list of periods which are present in at least percentage edi files
"""
adict = {}
for aper in self.all_unique_periods:
station_list = []
afreq = 1.0 / aper
acount = 0
for mt_obj in self.mt_obj_list:
# if afreq in mt_obj.Z.freq:
if is_num_in_seq(afreq, mt_obj.Z.freq):
acount = acount + 1
station_list.append(mt_obj.station)
if (100.0 * acount) / self.num_of_edifiles >= percentage:
adict.update({aper: acount})
# print (aper, acount)
else:
self._logger.info("Period=%s is excluded. it is from stations: %s ", aper, station_list)
mydict_ordered = sorted(
list(adict.items()), key=lambda value: value[1], reverse=True)
# for apair in mydict_ordered:
# print (apair)
selected_periods = [pc[0] for pc in mydict_ordered]
print("Selected periods %s out of the total %s:" % (len(selected_periods), len(self.all_unique_periods)))
return selected_periods
    def select_periods(self, show=True, period_list=None, percentage=10.0):
        """
        Use edi_collection to analyse the whole set of EDI files and select
        a subset of the unique periods.

        :param show: True to first display a histogram of period occurrence
        :param period_list: optional list whose items are float periods
            (matched with np.isclose) and/or (min, max) tuples (matched by
            inclusive range); if None/empty, selection falls back to
            occurrence statistics via get_periods_by_stats()
        :param percentage: minimum occurrence percentage used when
            period_list is not given
        :return: select_period_list as a numpy array
        """
        uniq_period_list = self.all_unique_periods  # filtered list of periods ?
        print("Unique periods", len(uniq_period_list))

        if show:
            plt.figure()
            plt.clf()
            bins = gen_hist_bins(uniq_period_list)
            plt.hist(self.mt_periods, bins=bins)
            # plt.hist(self.mt_periods, bins=1000)
            plt.title("Histogram with uniq_periods bins")
            plt.xlabel("Periods")
            plt.ylabel("Occurance in number of MT stations")
            plt.show()

        if period_list:
            # 1 ASK user to input a Pmin and Pmax
            # assume uniq_period_list is sorted; index_start lets each scan
            # resume where the previous period's scan stopped
            select_period_list = []
            index_start = 0
            for period in period_list:
                for index in range(index_start, len(uniq_period_list)):
                    # match either an exact float (within np.isclose default
                    # tolerance) or an inclusive (min, max) tuple range
                    if (isinstance(period, float) and np.isclose(uniq_period_list[index], period)) or \
                            (isinstance(period, tuple) and period[0] <= uniq_period_list[index] <= period[1]):
                        select_period_list.append(uniq_period_list[index])
                    elif (isinstance(period, float) and uniq_period_list[index] > period) or \
                            (isinstance(period, tuple) and period[1] < uniq_period_list[index]):
                        # passed the target; remember position and move to the
                        # next requested period
                        index_start = index
                        break
            select_period_list = np.array(select_period_list)
        else:
            # 2 percetage stats
            # select commonly occured frequencies from all stations.
            # This could miss some slightly varied frequencies in the middle range.
            select_period_list = np.array(self.get_periods_by_stats(percentage=percentage))

        print("Selected periods ", len(select_period_list))
        return select_period_list
[docs] def create_mt_station_gdf(self, outshpfile=None):
"""
create station location geopandas dataframe, and output to shape file
:param outshpfile: output file
:return: gdf
"""
mt_stations = []
for mtobj in self.mt_obj_list:
mt_stations.append(
[mtobj.station, mtobj.lon, mtobj.lat, mtobj.elev, mtobj.utm_zone])
pdf = pd.DataFrame(mt_stations, columns=['StationId', 'Lon', 'Lat', 'Elev', 'UtmZone'])
# print (pdf.head())
mt_points = [Point(xy) for xy in zip(pdf.Lon, pdf.Lat)]
# OR pdf['geometry'] = pdf.apply(lambda z: Point(z.Lon, z.Lat), axis=1)
# if you want to df = df.drop(['Lon', 'Lat'], axis=1)
# crs0 = {'init': 'epsg:4326'} # WGS84
crs0 = {'init': 'epsg:4283'} # GDA94
gdf = gpd.GeoDataFrame(pdf, crs=crs0, geometry=mt_points)
if outshpfile is not None:
gdf.to_file(outshpfile, driver='ESRI Shapefile')
return gdf
[docs] def plot_stations(self, savefile=None, showfig=True):
"""
Visualise the geopandas df of MT stations
:param savefile:
:param showfig:
:return:
"""
gdf = self.geopdf
gdf.plot(figsize=(10, 6), marker='o', color='blue', markersize=5)
if savefile is not None:
fig = plt.gcf()
fig.savefig(savefile, dpi=300)
if showfig is True:
plt.show()
return savefile
[docs] def display_on_basemap(self):
"""
display MT stations which are in stored in geopandas dataframe in a base map.
:return: plot object
"""
world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres'))
self._logger.debug(world.shape)
myax = world.plot(alpha=0.5)
# myax.set_xlim([149,150])
# myax.set_xlim([110, 155])
# myax.set_ylim([-40, -10])
myax.set_xlim(
(self.bound_box_dict['MinLon'], self.bound_box_dict['MaxLon']))
myax.set_ylim(
(self.bound_box_dict['MinLat'], self.bound_box_dict['MaxLat']))
myax2 = self.geopdf.plot(ax=myax, figsize=(
10, 6), marker='o', color='blue', markersize=8)
plt.show()
return myax2
[docs] def display_on_image(self):
"""
display/overlay the MT properties on a background geo-referenced map image
:return: plot object
"""
import mtpy.utils.plot_geotiff_imshow as plotegoimg
myax = plotegoimg.plot_geotiff(
geofile='data/PM_Gravity.tif', show=False)
margin = 0.02 # degree
myax.set_xlim(
(self.bound_box_dict['MinLon'] - margin, self.bound_box_dict['MaxLon'] + margin))
myax.set_ylim(
(self.bound_box_dict['MinLat'] - margin, self.bound_box_dict['MaxLat'] + margin))
myax2 = self.geopdf.plot(ax=myax, figsize=(
10, 6), marker='o', color='r', markersize=10)
plt.show()
return myax2
[docs] def get_phase_tensor_tippers(self, period, interpolate=True):
"""
For a given MT period (s) value, compute the phase tensor and tippers etc.
:param period: MT_period (s)
:param interpolate: Boolean to indicate whether to interpolate on to the given period
:return: dictionary pt_dict_list
pt_dict keys ['station', 'freq', 'lon', 'lat', 'phi_min', 'phi_max', 'azimuth', 'skew', 'n_skew', 'elliptic',
'tip_mag_re', 'tip_mag_im', 'tip_ang_re', 'tip_ang_im']
"""
pt_dict_list = []
plot_per = period
#plot_per = self.all_unique_periods[1] # test first
print("The plot period is ", plot_per)
for mt_obj in self.mt_obj_list:
pt_dict = {}
pt = None
ti = None
if(interpolate == False):
p_index = [ff for ff, f2 in enumerate(1.0/mt_obj.Z.freq)
if (f2 > plot_per * (1 - self.ptol)) and
(f2 < plot_per * (1 + self.ptol))]
pt = mt_obj.pt
ti = mt_obj.Tipper
else:
p_index = [0]
newZ, newTipper = mt_obj.interpolate([1./plot_per], bounds_error=False)
pt = MTpt.PhaseTensor(z_object=newZ)
ti = newTipper
# end if
if len(p_index) >= 1:
p_index = p_index[0]
pt_dict['station']=mt_obj.station
pt_dict['period'] =plot_per
pt_dict['lon'] = mt_obj.lon
pt_dict['lat'] = mt_obj.lat
pt_dict['phi_min'] = pt.phimin[p_index]
pt_dict['phi_max'] = pt.phimax[p_index]
pt_dict['azimuth']= pt.azimuth[p_index]
pt_dict['skew'] = pt.beta[p_index]
pt_dict['n_skew'] = 2 * pt.beta[p_index]
pt_dict['elliptic'] = pt.ellipticity[p_index]
pt_dict['tip_mag_re']= ti.mag_real[p_index]
pt_dict['tip_mag_im']= ti.mag_imag[p_index]
pt_dict['tip_ang_re']= ti.angle_real[p_index]
pt_dict['tip_ang_im']= ti.angle_imag[p_index]
pt_dict_list.append(pt_dict)
else:
self._logger.warn(" the period %s is NOT found for this station %s. Skipping!!!" % (plot_per, mt_obj.station))
return pt_dict_list
[docs] def create_phase_tensor_csv(self, dest_dir, period_list=None,
interpolate=True,
file_name="phase_tensor.csv"):
"""
create phase tensor ellipse and tipper properties.
Implementation based on mtpy.utils.shapefiles_creator.ShapeFilesCreator.create_csv_files
:param dest_dir: output directory
:param period_list: list of periods; default=None, in which data for all available
frequencies are output
:param interpolate: Boolean to indicate whether to interpolate data onto given period_list
:param file_name: output file name
:return: pt_dict
"""
csvfname = os.path.join(dest_dir, file_name)
pt_dict = {}
csv_header = ['station', 'freq', 'lon', 'lat', 'phi_min', 'phi_max', 'azimuth', 'skew',
'n_skew', 'elliptic', 'tip_mag_re', 'tip_mag_im', 'tip_ang_re', 'tip_ang_im']
freq_list = None
if(period_list is None):
freq_list = self.all_frequencies
else:
freq_list = 1./np.array(period_list)
# end if
#with open(csvfname, "wb") as csvf:
with open(csvfname, "w",newline="") as csvf:
writer = csv.writer(csvf)
writer.writerow(csv_header)
for freq in freq_list:
ptlist = []
for mt_obj in self.mt_obj_list:
f_index_list = None
pt = None
ti = None
if(interpolate):
f_index_list = [0]
newZ = None
newTipper = None
newZ, newTipper = mt_obj.interpolate([freq], bounds_error=False)
pt = MTpt.PhaseTensor(z_object=newZ)
ti = newTipper
else:
freq_min = freq * (1 - self.ptol)
freq_max = freq * (1 + self.ptol)
f_index_list = [ff for ff, f2 in enumerate(mt_obj.Z.freq)
if (f2 > freq_min) and (f2 < freq_max)]
pt = mt_obj.pt
ti = mt_obj.Tipper
#end if
if len(f_index_list) > 1:
self._logger.warn("more than one freq found %s", f_index_list)
if len(f_index_list) >= 1:
p_index = f_index_list[0]
# geographic coord lat long and elevation
# long, lat, elev = (mt_obj.lon, mt_obj.lat, 0)
station, lon, lat = (mt_obj.station, mt_obj.lon, mt_obj.lat)
pt_stat = [station, freq, lon, lat,
pt.phimin[p_index],
pt.phimax[p_index],
pt.azimuth[p_index],
pt.beta[p_index],
2 * pt.beta[p_index],
pt.ellipticity[p_index], # FZ: get ellipticity begin here
ti.mag_real[p_index],
ti.mag_imag[p_index],
ti.angle_real[p_index],
ti.angle_imag[p_index]]
ptlist.append(pt_stat)
else:
self._logger.warn("Freq %s NOT found for this station %s", freq, mt_obj.station)
csv_freq_file = os.path.join(dest_dir,
'{name[0]}_{freq}Hz{name[1]}'.format(
freq=str(freq), name=os.path.splitext(file_name)))
#with open(csv_freq_file, "wb") as freq_csvf:
with open(csv_freq_file, "w",newline="") as freq_csvf:
writer_freq = csv.writer(freq_csvf)
writer_freq.writerow(csv_header)
writer_freq.writerows(ptlist)
writer.writerows(ptlist)
pt_dict[freq] = ptlist
return pt_dict
# 2020-10: FZ started this new function on request of WPJ
[docs] def create_penetration_depth_csv(self, dest_dir, period_list=None, interpolate=False, file_name="penetration_depth.csv"):
"""
create penetration depth csv file for each frequency corresponding to the given input 1.0/period_list.
of course subject to a tolerance. Note that frequencies values are usually provided in MT EDI files.
:param dest_dir: output directory
:param period_list: list of periods; default=None all available periods will be output
:param interpolate: Boolean to indicate whether to interpolate data onto given period_list
:param file_name: output files basename
:return: pt_dict
"""
csvfname = os.path.join(dest_dir, file_name)
p_dict = {}
csv_header = ['FREQ','STATION', 'LON', 'LAT', 'pen_depth_det', 'pen_depth_zxy', 'pen_depth_zyx']
# convert the period_list into freq array
freq_list = None
if(period_list is None):
freq_list = self.all_frequencies
else:
freq_list = 1./np.array(period_list)
# end if
with open(csvfname, "w",newline="") as csvf:
writer = csv.writer(csvf)
writer.writerow(csv_header)
for freq in freq_list:
pdlist = []
stations, periods, pen_depth_det, latlons = mtpy.imaging.penetration.get_penetration_depth_by_period( self.mt_obj_list, 1.0/freq) # whichrho='det')
stations, periods, pen_depth_zxy, latlons = mtpy.imaging.penetration.get_penetration_depth_by_period( self.mt_obj_list, 1.0/freq,whichrho='zxy')
stations, periods, pen_depth_zyx, latlons = mtpy.imaging.penetration.get_penetration_depth_by_period( self.mt_obj_list, 1.0/freq,whichrho='zyx')
for iter in range(len(stations)):
pdlist.append([freq, stations[iter], latlons[iter][1], latlons[iter][0], pen_depth_det[iter], pen_depth_zxy[iter], pen_depth_zyx[iter]])
csv_freq_file = os.path.join(dest_dir,
'{name[0]}_{freq}Hz{name[1]}'.format(
freq=str(freq), name=os.path.splitext(file_name)))
with open(csv_freq_file, "w",newline="") as freq_csvf:
writer_freq = csv.writer(freq_csvf)
writer_freq.writerow(csv_header)
writer_freq.writerows(pdlist)
writer.writerows(pdlist)
p_dict[freq] = pdlist
return csvfname
[docs] @deprecated("This function is more expensive compared with the method create_phase_tensor_csv(self,)")
def create_phase_tensor_csv_with_image(self, dest_dir):
"""
Using PlotPhaseTensorMaps class to generate csv file of phase tensor attributes, etc.
Only for comparison. This method is more expensive because it will create plot object first.
:return:
"""
from mtpy.imaging.phase_tensor_maps import PlotPhaseTensorMaps
for freq in self.all_frequencies:
ptm = PlotPhaseTensorMaps(fn_list=self.edifiles, plot_freq=freq, fig_dpi=80, plot_yn='n')
ptm.export_params_to_file(save_path=dest_dir)
return
[docs] def create_measurement_csv(self, dest_dir, period_list=None, interpolate=True):
"""
create csv file from the data of EDI files: IMPEDANCE, APPARENT RESISTIVITIES AND PHASES
see also utils/shapefiles_creator.py
:param dest_dir: output directory
:param period_list: list of periods; default=None, in which data for all available
frequencies are output
:param interpolate: Boolean to indicate whether to interpolate data onto given period_list
:return: csvfname
"""
if dest_dir is None:
dest_dir = self.outdir
else:
self._logger.info("result will be in the dir %s", dest_dir)
if not os.path.exists(dest_dir):
os.mkdir(dest_dir)
# summary csv file
csv_basename = "edi_measurement"
csvfname = os.path.join(dest_dir, "%s.csv" % csv_basename)
pt_dict = {}
csv_header = [
'FREQ', 'STATION', 'LON', 'LAT','ZXXre', 'ZXXim',
'ZXYre', 'ZXYim', 'ZYXre', 'ZYXim', 'ZYYre', 'ZYYim', 'TXre', 'TXim', 'TYre', 'TYim',
'RHOxx', 'RHOxy', 'RHOyx', 'RHOyy', 'PHSxx', 'PHSxy', 'PHSyx', 'PHSyy'
]
freq_list = None
if(period_list is None):
freq_list = self.all_frequencies
else:
freq_list = 1./np.array(period_list)
# end if
with open(csvfname, "w",newline="") as csvf:
writer = csv.writer(csvf)
writer.writerow(csv_header)
for freq in freq_list:
mtlist = []
for mt_obj in self.mt_obj_list:
f_index_list = None
pt = None
ti = None
zobj = None
if (interpolate):
f_index_list = [0]
newZ = None
newTipper = None
self._logger.debug("Interpolating frequency ....%s", freq)
newZ, newTipper = mt_obj.interpolate([freq], bounds_error=False)
pt = MTpt.PhaseTensor(z_object=newZ)
ti = newTipper
zobj = newZ
else: # interpolate is False
freq_max = freq * (1 + self.ptol)
freq_min = freq * (1 - self.ptol)
f_index_list = np.where((mt_obj.Z.freq < freq_max) & (mt_obj.Z.freq > freq_min))
f_index_list = f_index_list[0] # reduce from 3d [f,2,2] to 1d [f]
pt = mt_obj.pt
ti = mt_obj.Tipper
zobj = mt_obj.Z
# end if
if len(f_index_list) > 1:
self._logger.warn("more than one freq found %s", f_index_list)
if len(f_index_list) >= 1:
the_index = f_index_list[0] # a single value
self._logger.debug("The freqs index %s", the_index)
# geographic coord lat long and elevation
# long, lat, elev = (mt_obj.lon, mt_obj.lat, 0)
station, lat, lon = (
mt_obj.station, mt_obj.lat, mt_obj.lon)
resist_phase = mtplottools.ResPhase(z_object=zobj)
# resist_phase.compute_res_phase()
mt_stat = [freq, station, lon, lat,
zobj.z[the_index, 0, 0].real,
zobj.z[the_index, 0, 0].imag,
zobj.z[the_index, 0, 1].real,
zobj.z[the_index, 0, 1].imag,
zobj.z[the_index, 1, 0].real,
zobj.z[the_index, 1, 0].imag,
zobj.z[the_index, 1, 1].real,
zobj.z[the_index, 1, 1].imag,
ti.tipper[the_index, 0, 0].real,
ti.tipper[the_index, 0, 0].imag,
ti.tipper[the_index, 0, 1].real,
ti.tipper[the_index, 0, 1].imag,
resist_phase.resxx[the_index], resist_phase.resxy[the_index],
resist_phase.resyx[the_index], resist_phase.resyy[the_index],
resist_phase.phasexx[the_index], resist_phase.phasexy[the_index],
resist_phase.phaseyx[the_index], resist_phase.phaseyy[the_index]
]
mtlist.append(mt_stat)
else:
self._logger.warn(
'Freq %s NOT found for this station %s *** Skipping it in CSV file', freq, mt_obj.station)
with open(csvfname, "a",newline="") as csvf: # summary csv for all freqs
writer = csv.writer(csvf)
writer.writerows(mtlist)
csv_basename2 = "%s_%sHz.csv" % (csv_basename, str(freq))
csvfile2 = os.path.join(dest_dir, csv_basename2)
with open(csvfile2, "w", newline="") as csvf: # individual csvfile for each freq
writer = csv.writer(csvf)
writer.writerow(csv_header)
writer.writerows(mtlist)
pt_dict[freq] = mtlist
return csvfname
    def export_edi_files(self, dest_dir, period_list=None,
                         interpolate=True, period_buffer=None, longitude_format='LON'):
        """
        Export edi files, with each station interpolated onto a common period list.

        :param dest_dir: output directory; files are only written when this is an
            existing directory
        :param period_list: list of periods; default=None, in which case a
            period_list is obtained from get_periods_by_stats()
        :param interpolate: Boolean flag; NOTE(review): currently unused in the
            body — interpolation is always performed
        :param period_buffer: buffer so that interpolation doesn't stretch too far
            over periods. Provide a float or integer factor,
            greater than which interpolation will not stretch.
            e.g. 1.5 means only interpolate to a maximum of
            1.5 times each side of each frequency value
        :param longitude_format: keyword passed to write_mt_file (e.g. 'LON')
        :return: None
        """
        if period_list is None:
            period_list = np.array(self.get_periods_by_stats())
        # end if

        for mt_obj in self.mt_obj_list:
            # interpolate each station onto the period list,
            # restricted to the station's own period range
            interp_periods = period_list[np.where(
                (period_list >= 1. / mt_obj.Z.freq.max()) &
                (period_list <= 1. / mt_obj.Z.freq.min()))]

            interp_periods = np.sort(interp_periods)

            # if specified, apply a buffer so that interpolation doesn't
            # stretch too far over periods
            if type(period_buffer) in [float, int]:
                interp_periods_new = []
                dperiods = 1. / mt_obj.Z.freq
                for iperiod in interp_periods:
                    # find nearest data period
                    difference = np.abs(iperiod - dperiods)
                    nearestdperiod = dperiods[difference == np.amin(difference)][0]
                    # keep only periods within the buffer factor of a real data period
                    if max(nearestdperiod / iperiod, iperiod / nearestdperiod) < period_buffer:
                        interp_periods_new.append(iperiod)

                interp_periods = np.array(interp_periods_new)

            self._logger.debug("station_name and its original period: %s %s %s",
                               mt_obj.station, len(mt_obj.Z.freq), 1.0 / mt_obj.Z.freq)
            self._logger.debug("station_name and interpolation period: %s %s %s",
                               mt_obj.station, len(interp_periods), interp_periods)

            if len(interp_periods) > 0:  # not empty
                interp_z, interp_t = mt_obj.interpolate(1. / interp_periods)

                if dest_dir is not None and os.path.isdir(dest_dir):
                    mt_obj.write_mt_file(
                        save_dir=dest_dir,
                        fn_basename=mt_obj.station,
                        file_type='edi',
                        new_Z_obj=interp_z,
                        new_Tipper_obj=interp_t,
                        longitude_format=longitude_format)
            else:
                pass
        # end for
        # end func
        return
[docs] def get_bounding_box(self, epsgcode=None):
""" compute bounding box
:return: bounding box in given proj coord system
"""
if epsgcode is None:
new_gdf = self.geopdf
else: # reproj
new_gdf = self.geopdf.to_crs(epsg=epsgcode)
tup = new_gdf.total_bounds
bdict = {"MinLon": tup[0],
"MinLat": tup[1],
"MaxLon": tup[2],
"MaxLat": tup[3]}
self._logger.debug(bdict)
return bdict
[docs] def get_station_utmzones_stats(self):
"""
A simple method to find what UTM zones these (edi files) MT stations belong to
are they in a single UTM zone, which corresponds to a unique EPSG code?
or do they belong to multiple UTM zones?
:return: a_dict like {UTMZone:Number_of_MT_sites}
"""
def get_utm_zone(latitude, longitude):
zone_num = int(1 + (longitude + 180.0) / 6.0)
if latitude >= 0:
return "%s%s"%(zone_num,'N')
else:
return "%s%s"%(zone_num,'S')
utm_zones={}
for mt_obj in self.mt_obj_list:
utmz = get_utm_zone(mt_obj.lat, mt_obj.lon)
utm_zones[utmz] = utm_zones.get(utmz,0)+1
return utm_zones
[docs] def get_stations_distances_stats(self):
"""
get the min max statistics of the distances between stations.
useful for determining the ellipses tipper sizes etc
:return: dict={}
"""
import math
mt_stations = []
for mtobj in self.mt_obj_list:
mt_stations.append( (mtobj.station, mtobj.lat, mtobj.lon, mtobj.utm_zone) )
pdf = pd.DataFrame(mt_stations, columns=['Station', 'Lat', 'Lon', 'UtmZone'])
mt_distances = []
for i in range(len(pdf)):
xi=pdf.iloc[i]['Lat']
yi=pdf.iloc[i]['Lon']
for j in range(i+1, len(pdf)):
xj = pdf.iloc[j]['Lat']
yj = pdf.iloc[j]['Lon']
dist = math.sqrt((xi-xj)**2 + (yi - yj)**2)
mt_distances.append(dist)
if(dist <0.004): # 0.004 is about 400 meters
self._logger.info("Small distances occurred between stations: %s %s", pdf.iloc[i].Station, pdf.iloc[j].Station)
# print (mt_distances)
anarray = pd.Series(mt_distances)
print(anarray.describe())
q01 = anarray.quantile(q=0.01)
q02 = anarray.quantile(q=0.02)
q03 = anarray.quantile(q=0.03)
q04 = anarray.quantile(q=0.04)
q05 = anarray.quantile(q=0.05)
self._logger.info("1,2,3,4 5 Percentile distances: %s, %s, %s, %s, %s", q01, q02,q03,q04,q05)
# anarray.plot()
# plt.show()
min_d = anarray.min() # cold be very small due to two close stations, skew the result
max_d = anarray.max()
self._logger.debug("Minimum = %s", min_d )
self._logger.debug("Maximum = %s", max_d )
return {"MIN_DIST":min_d, "Q1PERCENT":q01, "Q2PERCENT":q02, "Q3PERCENT":q03, "MAX_DIST":max_d}
[docs] def show_obj(self, dest_dir=None):
"""
test call object's methods and show it's properties
:return:
"""
print(len(self.all_unique_periods), 'unique periods (s)', self.all_unique_periods)
print(len(self.all_frequencies),
'unique frequencies (Hz)', self.all_frequencies)
myper = self.get_periods_by_stats(percentage=20)
print(myper)
print(self.bound_box_dict)
print(self.get_bounding_box(epsgcode=28353))
if dest_dir is None:
self.plot_stations(savefile= os.path.join(self.outdir,'edi_collection_test.jpg'))
else:
self.plot_stations(savefile= os.path.join(dest_dir,'edi_collection_test.jpg'))
# self.display_on_basemap()
# self.display_on_image()
# self.display_folium()
utmzones=self.get_station_utmzones_stats()
number_zones = len(list(utmzones.items()))
self._logger.info("This Edi fileset has %s UTM Zone(s): %s ", number_zones, utmzones)
return
[docs] def get_min_max_distance(self):
"""
get the min and max distance between all possible pairs of stations.
:return: min_dist, max_dist
"""
mt_distances = self.get_stations_distances_stats()
min_dist = mt_distances.get("MIN_DIST")
max_dist = mt_distances.get("MAX_DIST")
return min_dist, max_dist
    def calculate_aver_impedance(self, dest_dir, component="det", rotation_angle=0, interpolate=True):
        """
        calculate the average impedance tensor Z (related to apparent resistivity) of
        all edi (MT-stations) for each period.

        algorithm:
        - 1 make sure the stations all have the same period range, if not, interpolate onto common periods
        - 2 rotate to strike if necessary
        - 3 calculate: the determinant of the impedance tensor, or the geometric mean, if necessary
        - 4 get the median resistivity for each period
        - 5 get the median resistivity overall by taking the median of the above

        :param dest_dir: output directory for the csv files (created if missing)
        :param component: =det - default, returns average for determinant of impedance tensor
            =geom_mean - returns average geometric mean of the off diagonals sqrt(ZxyXZyx)
            =separate - returns a 2x2 array containing average for each component of the impedance tensor.
            NOTE(review): component and rotation_angle are currently unused in this body.
        :param rotation_angle: angle to rotate the data by before calculating mean.
        :param interpolate: whether to interpolate each station's Z onto each frequency
        :return: dict mapping each frequency to its list of per-station csv rows
        """
        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)
        self._logger.info("result will be in the dir %s", dest_dir)

        # summary csv file
        csv_basename = "z_average_impedance"
        csvfname = os.path.join(dest_dir, "%s.csv" % csv_basename)

        pt_dict = {}

        csv_header = [
            'FREQ', 'STATION', 'LON', 'LAT', 'ZXXre', 'ZXXim',
            'ZXYre', 'ZXYim', 'ZYXre', 'ZYXim', 'ZYYre', 'ZYYim',
            "DETERM"
        ]

        freq_list = self.all_frequencies

        # write the header once; per-frequency rows are appended below
        with open(csvfname, "w", newline="") as csvf:
            writer = csv.writer(csvf)
            writer.writerow(csv_header)

        for freq in freq_list:
            mtlist = []
            for mt_obj in self.mt_obj_list:
                f_index_list = None
                zobj = None
                if (interpolate):
                    f_index_list = [0]
                    newZ, newTipper = mt_obj.interpolate([freq], bounds_error=False)
                    zobj = newZ
                else:
                    # select the station's own frequencies within +/- ptol of freq
                    freq_max = freq * (1 + self.ptol)
                    freq_min = freq * (1 - self.ptol)
                    f_index_list = np.where((mt_obj.Z.freq < freq_max) & (mt_obj.Z.freq > freq_min))
                    f_index_list = f_index_list[0]  # slice to get the dimension_freq.
                    zobj = mt_obj.Z
                    self._logger.debug("Debug interpolate=False f_index_list: %s,%s ", f_index_list, len(f_index_list))
                # end if

                self._logger.debug("Debug zobj.det ****** %s, %s,%s,%s", type(zobj.det), len(zobj.det), zobj.det[0], np.abs(zobj.det[0]))

                if len(f_index_list) > 1:
                    self._logger.warn("more than one freq found %s", f_index_list)
                if len(f_index_list) >= 1:
                    the_index = f_index_list[0]
                    self._logger.debug("The freqs index %s", the_index)
                    # geographic coord lat long and elevation
                    # long, lat, elev = (mt_obj.lon, mt_obj.lat, 0)
                    station, lat, lon = (
                        mt_obj.station, mt_obj.lat, mt_obj.lon)
                    mt_stat = [freq, station, lon, lat,
                               zobj.z[the_index, 0, 0].real,
                               zobj.z[the_index, 0, 0].imag,
                               zobj.z[the_index, 0, 1].real,
                               zobj.z[the_index, 0, 1].imag,
                               zobj.z[the_index, 1, 0].real,
                               zobj.z[the_index, 1, 0].imag,
                               zobj.z[the_index, 1, 1].real,
                               zobj.z[the_index, 1, 1].imag,
                               np.abs(zobj.det[the_index])
                               ]
                    mtlist.append(mt_stat)
                else:
                    self._logger.warn(
                        'Freq %s NOT found for this station %s ***** Skipping it in csv file', freq, mt_obj.station)

            with open(csvfname, "a", newline="") as csvf:  # summary csv for all freqs
                writer = csv.writer(csvf)
                writer.writerows(mtlist)

            csv_basename2 = "%s_%sHz.csv" % (csv_basename, str(freq))
            csvfile2 = os.path.join(dest_dir, csv_basename2)
            with open(csvfile2, "w", newline="") as csvf:  # individual csvfile for each freq
                writer = csv.writer(csvf)
                writer.writerow(csv_header)
                writer.writerows(mtlist)

            pt_dict[freq] = mtlist

        return pt_dict
##################################################################
if __name__ == "__main__":
    # Example usages:
    #   python mtpy/core/edi_collection.py examples/data/edi2/ /c/tmpedi2
    #   python mtpy/core/edi_collection.py examples/data/edi_files/ /c/tmp1128
    #   python mtpy/core/edi_collection.py examples/data/edi_files_2/ /c/tmp1127
    if len(sys.argv) < 3:
        print("\n USAGE: %s edi_dir out_Dir" % sys.argv[0])
        sys.exit(1)
    else:
        argv1 = sys.argv[1]
        if os.path.isdir(argv1):
            edis = glob.glob(argv1 + '/*.edi')
            assert len(edis) > 0  # must has edi files
            obj = EdiCollection(edis)
        elif os.path.isfile(argv1) and argv1.endswith('.edi'):
            # assume input is a list of EDI files followed by the output dir.
            # BUGFIX: was sys.argv[1:-2], which wrongly dropped the last EDI file.
            obj = EdiCollection(sys.argv[1:-1])
        else:
            sys.exit(2)

        # the last argument is the output dir in both the edi_dir form
        # (where it equals sys.argv[2]) and the file-list form
        outdir = sys.argv[-1]

        # obj.show_obj(dest_dir=outdir)
        # obj.create_mt_station_gdf(os.path.join(outdir, 'edi_collection_test.shp'))
        mt_distances = obj.get_stations_distances_stats()
        min_dist = mt_distances.get("MIN_DIST")
        max_dist = mt_distances.get("MAX_DIST")
        print(mt_distances)

        # todo: review this method to consistent column naming. create_phase_tensor_csv
        # obj.create_phase_tensor_csv(outdir)

        # When interpolate = True:
        # obj.create_measurement_csv(outdir, interpolate=True)
        # obj.calculate_aver_impedance(outdir, interpolate=True)

        # When interpolate = False:
        obj.create_measurement_csv(outdir, interpolate=False)
        obj.calculate_aver_impedance(outdir, interpolate=False)

        # obj.create_penetration_depth_csv(dest_dir=outdir, period_list=[0.1067, 95.33], interpolate=False)
        # obj.create_penetration_depth_csv(outdir, interpolate=False)
        # todo: interpolate=True; combine create_penetration_depth_csv into calculate_aver_impedance