datagenerator.wavelets
Generate wavelets from filters derived from real data
1"""Generate wavelets from filters derived from real data""" 2import numpy as np 3from scipy.fftpack import fft, fftfreq, ifft 4from scipy.interpolate import UnivariateSpline 5from datagenerator.util import import_matplotlib 6 7 8def fs_arr(fs): 9 """Split input data into array of frequencies and array of amplitudes""" 10 freq_values = fs[:, 0] 11 db_levels = fs[:, 1] 12 freq_list_arr = np.array(list(freq_values)) 13 db_list_arr = np.array(list(db_levels)) 14 for i in range(db_list_arr.shape[0]): 15 db_list_arr[i, :] = 10.0 * np.log10(db_list_arr[i, :]) 16 db_list_arr[i, :] -= db_list_arr[i, :].max() 17 return freq_list_arr, db_list_arr 18 19 20def percentiles(freq_list_arr, db_list_arr, percentile_list): 21 """Calculate percentiles of frequencies and amplitudes""" 22 plot_freqs = [] 23 plot_dbs = [] 24 for ipctile in percentile_list: 25 plot_freqs.append(np.mean(freq_list_arr, axis=0)) 26 dbs = np.percentile(db_list_arr, ipctile, axis=0) 27 dbs -= dbs.max() 28 plot_dbs.append(dbs) 29 return plot_freqs, plot_dbs 30 31 32def passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose): 33 """Get passband frequencies""" 34 upper_passband_index = np.argmax(np.std(plot_dbs_arr, axis=0)) 35 mid_freq_index = np.argmin( 36 np.std(plot_dbs_arr[:, : upper_passband_index + 1], axis=0) 37 ) 38 low_bandlimit_index = np.argmax( 39 np.std(plot_dbs_arr[:, : mid_freq_index + 1], axis=0) 40 ) 41 hi_std = np.max(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0)) 42 lo_std = np.min(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0)) 43 clipped_std = np.std(plot_dbs_arr[:, upper_passband_index:], axis=0).clip( 44 lo_std + 0.1 * (hi_std - lo_std), hi_std 45 ) 46 hi_bandlimit_index = np.argmin(clipped_std) + upper_passband_index 47 passband_low_freq = plot_freqs_arr[0, low_bandlimit_index] 48 passband_mid_freq = plot_freqs_arr[0, mid_freq_index] 49 passband_hi_freq = plot_freqs_arr[0, hi_bandlimit_index] 50 51 if verbose: 52 print( 53 f"(low, mid, hi) frequencies = {passband_low_freq:.4f}, {passband_mid_freq:.4f}, {passband_hi_freq:.4f}" 54 ) 55 return passband_low_freq, passband_mid_freq, passband_hi_freq 56 57 58def hanflat(inarray, pctflat): 59 """ 60 # Function applies a Hanning taper to ends of "inarray". 61 # Center "pctflat" of samples remain unchanged. 62 # 63 # Parameters: 64 # array : array of values to have ends tapered 65 # pctflat : this percent of samples to remain unchanged (e.g. 0.80) 66 """ 67 numsamples = len(inarray) 68 lowflatindex = int(round(numsamples * (1.0 - pctflat) / 2.0)) 69 hiflatindex = numsamples - lowflatindex 70 71 # print 'length of array, start, end ',len(inarray),lowflatindex,hiflatindex 72 73 # get hanning for numsamples*(1.0-pctflat) 74 hanwgt = np.hanning(len(inarray) - (hiflatindex - lowflatindex)) 75 76 # piece together hanning weights at ends and weights=1.0 in flat portion 77 outarray = np.ones(len(inarray), dtype=float) 78 outarray[:lowflatindex] = hanwgt[:lowflatindex] 79 outarray[hiflatindex:] = hanwgt[numsamples - hiflatindex :] 80 81 return outarray 82 83 84def ricker(f, dt, convolutions=2): 85 """ 86 input frequency and sampling interval (sec) 87 """ 88 89 lenhalf = 1250.0 / f 90 halfsmp = int(lenhalf / dt) + 1 91 lenhalf = halfsmp * dt 92 t = np.arange(-lenhalf, lenhalf + dt, dt) / 1000.0 93 s = (1 - 2 * np.pi ** 2 * f ** 2 * t ** 2) * np.exp(-(np.pi ** 2) * f ** 2 * t ** 2) 94 t *= 1000.0 95 for _ in range(convolutions - 1): 96 s = np.convolve(s, s, mode="full") 97 # i_shorten1,i_shorten2 = shortenwavlet(s,.9999,method="strip") 98 # print "sinc length = ", s[i_shorten1:i_shorten2].shape 99 # print "sinc peak at ", np.argmax(s[i_shorten1:i_shorten2]) 100 # return t,s[i_shorten1:i_shorten2+1] 101 hanningwindow = hanflat(s, 0.50) 102 return t, s * hanningwindow 103 104 105def default_fft(digi=4, convolutions=4): 106 """Setup a default fft""" 107 Nyquist = 0.5 * 1000.0 / digi 108 _, s = ricker(0.12 * Nyquist, digi, convolutions=convolutions) 109 return s, fftfreq(len(s), d=digi / 1000.0) 110 111 112def splineinfill(x, y, newx, damp=0.0): 113 """ 114 # Function to return data spaced at locations specified by input variable 'newx' after 115 # fitting cubic spline function. Input data specified by 'x' and 'y' so data 116 # can be irregularly sampled. 117 """ 118 s = UnivariateSpline(x, y, s=damp) 119 return s(newx) 120 121 122def calculate_wavelet_in_time_domain(ampls): 123 """Generate wavelet in time domain from amplitudes and frequencies""" 124 ampli_spectrum_linear = (10.0 ** (ampls / 10.0)).astype("complex") 125 ampli_spectrum_linear = np.roll( 126 ampli_spectrum_linear, ampli_spectrum_linear.shape[0] // 2 127 ) 128 new_wavelet = np.real(ifft(ampli_spectrum_linear)) 129 new_wavelet = np.roll(new_wavelet, new_wavelet.size // 2) 130 # apply hanning taper at beginning and end of time-domain wavelet 131 new_wavelet = new_wavelet * hanflat(new_wavelet, 0.60) 132 return ampli_spectrum_linear, new_wavelet 133 134 135def generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False): 136 """Generate a wavelet using interpolated percentiles of the spectrum""" 137 freq_list_arr, db_list_arr = fs_arr(fs) 138 percentile_list = [5, 10, 25, 50, 75, 90, 95] 139 plot_freqs, plot_dbs = percentiles(freq_list_arr, db_list_arr, percentile_list) 140 plot_freqs_arr = np.array(plot_freqs) 141 plot_dbs_arr = np.array(plot_dbs) 142 143 passbands = passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose=verbose) 144 145 # set up default fft 146 s, freq = default_fft(digi=digi, convolutions=convolutions) 147 s /= np.sum(np.sqrt(s ** 2)) # normalize 148 ampli_spectrum = abs(fft(s)) 149 ampli_spectrum = fft(s) 150 freq = np.roll(freq, freq.shape[0] // 2) 151 ampli_spectrum = np.roll(ampli_spectrum, ampli_spectrum.shape[0] // 2) 152 153 __freqs = plot_freqs[0] * 1.0 154 ___freqs = np.hstack((0.0, plot_freqs[0], (freq[-2] + freq[-1]) / 2.0, freq[-1])) 155 ___freqs = np.hstack((-___freqs[1:][::-1], ___freqs)) 156 157 # low_freq_pctile = np.random.uniform(0, 100) 158 # mid_freq_pctile = np.random.uniform(0, 100) 159 # high_freq_pctile = np.random.uniform(0, 100) 160 freq_pctiles = np.interp(__freqs, passbands, pcts) 161 ampls = [] 162 for ifreq in range(__freqs.size): 163 ampls.append(np.percentile(db_list_arr[:, ifreq], freq_pctiles[ifreq], axis=0)) 164 165 ampls = np.array(ampls) 166 ampls -= ampls.max() 167 ampls_with_edges = np.hstack( 168 (-40.0, -40.0, ampls[::-1], -40.0, ampls, -40.0, -40.0) 169 ) 170 171 # interpolate onto freqs in fft 172 try: 173 ____dbs = splineinfill(___freqs, ampls_with_edges, freq, damp=0.0) 174 except: 175 ____dbs = np.interp(freq, ___freqs, ampls_with_edges) 176 177 # generate wavelet in time domain 178 ampli_spectrum, new_wavelet = calculate_wavelet_in_time_domain(____dbs) 179 freqs = [freq, __freqs, ___freqs] 180 amps = [ampli_spectrum, ampls, ampls_with_edges] 181 return freqs, amps, new_wavelet 182 183 184def plot_wavelets(freqs, ampls, wavs, labels, savepng=None): 185 """Plot wavelet and spectra""" 186 plt = import_matplotlib() 187 fig, axs = plt.subplots(1, 2, figsize=(20, 8)) 188 for f, a, w, l in zip(freqs, ampls, wavs, labels): 189 axs[0].plot(w, label=l) 190 axs[1].plot( 191 f[0][f[0].size // 2 :], 192 10.0 * np.log10(np.real(a[0])[f[0].size // 2 :])[::-1], 193 label=l, 194 ) 195 for a in axs.ravel(): 196 a.grid() 197 a.legend(loc="upper right") 198 if savepng is not None: 199 fig.savefig(savepng)
def
fs_arr(fs):
9def fs_arr(fs): 10 """Split input data into array of frequencies and array of amplitudes""" 11 freq_values = fs[:, 0] 12 db_levels = fs[:, 1] 13 freq_list_arr = np.array(list(freq_values)) 14 db_list_arr = np.array(list(db_levels)) 15 for i in range(db_list_arr.shape[0]): 16 db_list_arr[i, :] = 10.0 * np.log10(db_list_arr[i, :]) 17 db_list_arr[i, :] -= db_list_arr[i, :].max() 18 return freq_list_arr, db_list_arr
Split input data into array of frequencies and array of amplitudes
def
percentiles(freq_list_arr, db_list_arr, percentile_list):
21def percentiles(freq_list_arr, db_list_arr, percentile_list): 22 """Calculate percentiles of frequencies and amplitudes""" 23 plot_freqs = [] 24 plot_dbs = [] 25 for ipctile in percentile_list: 26 plot_freqs.append(np.mean(freq_list_arr, axis=0)) 27 dbs = np.percentile(db_list_arr, ipctile, axis=0) 28 dbs -= dbs.max() 29 plot_dbs.append(dbs) 30 return plot_freqs, plot_dbs
Calculate percentiles of frequencies and amplitudes
def
passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose):
33def passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose): 34 """Get passband frequencies""" 35 upper_passband_index = np.argmax(np.std(plot_dbs_arr, axis=0)) 36 mid_freq_index = np.argmin( 37 np.std(plot_dbs_arr[:, : upper_passband_index + 1], axis=0) 38 ) 39 low_bandlimit_index = np.argmax( 40 np.std(plot_dbs_arr[:, : mid_freq_index + 1], axis=0) 41 ) 42 hi_std = np.max(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0)) 43 lo_std = np.min(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0)) 44 clipped_std = np.std(plot_dbs_arr[:, upper_passband_index:], axis=0).clip( 45 lo_std + 0.1 * (hi_std - lo_std), hi_std 46 ) 47 hi_bandlimit_index = np.argmin(clipped_std) + upper_passband_index 48 passband_low_freq = plot_freqs_arr[0, low_bandlimit_index] 49 passband_mid_freq = plot_freqs_arr[0, mid_freq_index] 50 passband_hi_freq = plot_freqs_arr[0, hi_bandlimit_index] 51 52 if verbose: 53 print( 54 f"(low, mid, hi) frequencies = {passband_low_freq:.4f}, {passband_mid_freq:.4f}, {passband_hi_freq:.4f}" 55 ) 56 return passband_low_freq, passband_mid_freq, passband_hi_freq
Get passband frequencies
def
hanflat(inarray, pctflat):
59def hanflat(inarray, pctflat): 60 """ 61 # Function applies a Hanning taper to ends of "inarray". 62 # Center "pctflat" of samples remain unchanged. 63 # 64 # Parameters: 65 # array : array of values to have ends tapered 66 # pctflat : this percent of samples to remain unchanged (e.g. 0.80) 67 """ 68 numsamples = len(inarray) 69 lowflatindex = int(round(numsamples * (1.0 - pctflat) / 2.0)) 70 hiflatindex = numsamples - lowflatindex 71 72 # print 'length of array, start, end ',len(inarray),lowflatindex,hiflatindex 73 74 # get hanning for numsamples*(1.0-pctflat) 75 hanwgt = np.hanning(len(inarray) - (hiflatindex - lowflatindex)) 76 77 # piece together hanning weights at ends and weights=1.0 in flat portion 78 outarray = np.ones(len(inarray), dtype=float) 79 outarray[:lowflatindex] = hanwgt[:lowflatindex] 80 outarray[hiflatindex:] = hanwgt[numsamples - hiflatindex :] 81 82 return outarray
Function applies a Hanning taper to ends of "inarray".
Center "pctflat" of samples remain unchanged.
#
Parameters:
array : array of values to have ends tapered
pctflat : this percent of samples to remain unchanged (e.g. 0.80)
def
ricker(f, dt, convolutions=2):
85def ricker(f, dt, convolutions=2): 86 """ 87 input frequency and sampling interval (sec) 88 """ 89 90 lenhalf = 1250.0 / f 91 halfsmp = int(lenhalf / dt) + 1 92 lenhalf = halfsmp * dt 93 t = np.arange(-lenhalf, lenhalf + dt, dt) / 1000.0 94 s = (1 - 2 * np.pi ** 2 * f ** 2 * t ** 2) * np.exp(-(np.pi ** 2) * f ** 2 * t ** 2) 95 t *= 1000.0 96 for _ in range(convolutions - 1): 97 s = np.convolve(s, s, mode="full") 98 # i_shorten1,i_shorten2 = shortenwavlet(s,.9999,method="strip") 99 # print "sinc length = ", s[i_shorten1:i_shorten2].shape 100 # print "sinc peak at ", np.argmax(s[i_shorten1:i_shorten2]) 101 # return t,s[i_shorten1:i_shorten2+1] 102 hanningwindow = hanflat(s, 0.50) 103 return t, s * hanningwindow
input frequency and sampling interval (sec)
def
default_fft(digi=4, convolutions=4):
106def default_fft(digi=4, convolutions=4): 107 """Setup a default fft""" 108 Nyquist = 0.5 * 1000.0 / digi 109 _, s = ricker(0.12 * Nyquist, digi, convolutions=convolutions) 110 return s, fftfreq(len(s), d=digi / 1000.0)
Setup a default fft
def
splineinfill(x, y, newx, damp=0.0):
113def splineinfill(x, y, newx, damp=0.0): 114 """ 115 # Function to return data spaced at locations specified by input variable 'newx' after 116 # fitting cubic spline function. Input data specified by 'x' and 'y' so data 117 # can be irregularly sampled. 118 """ 119 s = UnivariateSpline(x, y, s=damp) 120 return s(newx)
Function to return data spaced at locations specified by input variable 'newx' after
fitting cubic spline function. Input data specified by 'x' and 'y' so data
can be irregularly sampled.
def
calculate_wavelet_in_time_domain(ampls):
123def calculate_wavelet_in_time_domain(ampls): 124 """Generate wavelet in time domain from amplitudes and frequencies""" 125 ampli_spectrum_linear = (10.0 ** (ampls / 10.0)).astype("complex") 126 ampli_spectrum_linear = np.roll( 127 ampli_spectrum_linear, ampli_spectrum_linear.shape[0] // 2 128 ) 129 new_wavelet = np.real(ifft(ampli_spectrum_linear)) 130 new_wavelet = np.roll(new_wavelet, new_wavelet.size // 2) 131 # apply hanning taper at beginning and end of time-domain wavelet 132 new_wavelet = new_wavelet * hanflat(new_wavelet, 0.60) 133 return ampli_spectrum_linear, new_wavelet
Generate wavelet in time domain from amplitudes and frequencies
def
generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False):
136def generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False): 137 """Generate a wavelet using interpolated percentiles of the spectrum""" 138 freq_list_arr, db_list_arr = fs_arr(fs) 139 percentile_list = [5, 10, 25, 50, 75, 90, 95] 140 plot_freqs, plot_dbs = percentiles(freq_list_arr, db_list_arr, percentile_list) 141 plot_freqs_arr = np.array(plot_freqs) 142 plot_dbs_arr = np.array(plot_dbs) 143 144 passbands = passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose=verbose) 145 146 # set up default fft 147 s, freq = default_fft(digi=digi, convolutions=convolutions) 148 s /= np.sum(np.sqrt(s ** 2)) # normalize 149 ampli_spectrum = abs(fft(s)) 150 ampli_spectrum = fft(s) 151 freq = np.roll(freq, freq.shape[0] // 2) 152 ampli_spectrum = np.roll(ampli_spectrum, ampli_spectrum.shape[0] // 2) 153 154 __freqs = plot_freqs[0] * 1.0 155 ___freqs = np.hstack((0.0, plot_freqs[0], (freq[-2] + freq[-1]) / 2.0, freq[-1])) 156 ___freqs = np.hstack((-___freqs[1:][::-1], ___freqs)) 157 158 # low_freq_pctile = np.random.uniform(0, 100) 159 # mid_freq_pctile = np.random.uniform(0, 100) 160 # high_freq_pctile = np.random.uniform(0, 100) 161 freq_pctiles = np.interp(__freqs, passbands, pcts) 162 ampls = [] 163 for ifreq in range(__freqs.size): 164 ampls.append(np.percentile(db_list_arr[:, ifreq], freq_pctiles[ifreq], axis=0)) 165 166 ampls = np.array(ampls) 167 ampls -= ampls.max() 168 ampls_with_edges = np.hstack( 169 (-40.0, -40.0, ampls[::-1], -40.0, ampls, -40.0, -40.0) 170 ) 171 172 # interpolate onto freqs in fft 173 try: 174 ____dbs = splineinfill(___freqs, ampls_with_edges, freq, damp=0.0) 175 except: 176 ____dbs = np.interp(freq, ___freqs, ampls_with_edges) 177 178 # generate wavelet in time domain 179 ampli_spectrum, new_wavelet = calculate_wavelet_in_time_domain(____dbs) 180 freqs = [freq, __freqs, ___freqs] 181 amps = [ampli_spectrum, ampls, ampls_with_edges] 182 return freqs, amps, new_wavelet
Generate a wavelet using interpolated percentiles of the spectrum
def
plot_wavelets(freqs, ampls, wavs, labels, savepng=None):
185def plot_wavelets(freqs, ampls, wavs, labels, savepng=None): 186 """Plot wavelet and spectra""" 187 plt = import_matplotlib() 188 fig, axs = plt.subplots(1, 2, figsize=(20, 8)) 189 for f, a, w, l in zip(freqs, ampls, wavs, labels): 190 axs[0].plot(w, label=l) 191 axs[1].plot( 192 f[0][f[0].size // 2 :], 193 10.0 * np.log10(np.real(a[0])[f[0].size // 2 :])[::-1], 194 label=l, 195 ) 196 for a in axs.ravel(): 197 a.grid() 198 a.legend(loc="upper right") 199 if savepng is not None: 200 fig.savefig(savepng)
Plot wavelet and spectra