datagenerator.wavelets

Generate wavelets from filters derived from real data

  1"""Generate wavelets from filters derived from real data"""
  2import numpy as np
  3from scipy.fftpack import fft, fftfreq, ifft
  4from scipy.interpolate import UnivariateSpline
  5from datagenerator.util import import_matplotlib
  6
  7
  8def fs_arr(fs):
  9    """Split input data into array of frequencies and array of amplitudes"""
 10    freq_values = fs[:, 0]
 11    db_levels = fs[:, 1]
 12    freq_list_arr = np.array(list(freq_values))
 13    db_list_arr = np.array(list(db_levels))
 14    for i in range(db_list_arr.shape[0]):
 15        db_list_arr[i, :] = 10.0 * np.log10(db_list_arr[i, :])
 16        db_list_arr[i, :] -= db_list_arr[i, :].max()
 17    return freq_list_arr, db_list_arr
 18
 19
 20def percentiles(freq_list_arr, db_list_arr, percentile_list):
 21    """Calculate percentiles of frequencies and amplitudes"""
 22    plot_freqs = []
 23    plot_dbs = []
 24    for ipctile in percentile_list:
 25        plot_freqs.append(np.mean(freq_list_arr, axis=0))
 26        dbs = np.percentile(db_list_arr, ipctile, axis=0)
 27        dbs -= dbs.max()
 28        plot_dbs.append(dbs)
 29    return plot_freqs, plot_dbs
 30
 31
 32def passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose):
 33    """Get passband frequencies"""
 34    upper_passband_index = np.argmax(np.std(plot_dbs_arr, axis=0))
 35    mid_freq_index = np.argmin(
 36        np.std(plot_dbs_arr[:, : upper_passband_index + 1], axis=0)
 37    )
 38    low_bandlimit_index = np.argmax(
 39        np.std(plot_dbs_arr[:, : mid_freq_index + 1], axis=0)
 40    )
 41    hi_std = np.max(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0))
 42    lo_std = np.min(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0))
 43    clipped_std = np.std(plot_dbs_arr[:, upper_passband_index:], axis=0).clip(
 44        lo_std + 0.1 * (hi_std - lo_std), hi_std
 45    )
 46    hi_bandlimit_index = np.argmin(clipped_std) + upper_passband_index
 47    passband_low_freq = plot_freqs_arr[0, low_bandlimit_index]
 48    passband_mid_freq = plot_freqs_arr[0, mid_freq_index]
 49    passband_hi_freq = plot_freqs_arr[0, hi_bandlimit_index]
 50
 51    if verbose:
 52        print(
 53            f"(low, mid, hi) frequencies = {passband_low_freq:.4f}, {passband_mid_freq:.4f}, {passband_hi_freq:.4f}"
 54        )
 55    return passband_low_freq, passband_mid_freq, passband_hi_freq
 56
 57
 58def hanflat(inarray, pctflat):
 59    """
 60    #   Function applies a Hanning taper to ends of "inarray".
 61    #   Center "pctflat" of samples remain unchanged.
 62    #
 63    #   Parameters:
 64    #   array :       array of values to have ends tapered
 65    #   pctflat :     this percent of  samples to remain unchanged (e.g. 0.80)
 66    """
 67    numsamples = len(inarray)
 68    lowflatindex = int(round(numsamples * (1.0 - pctflat) / 2.0))
 69    hiflatindex = numsamples - lowflatindex
 70
 71    # print 'length of array, start, end ',len(inarray),lowflatindex,hiflatindex
 72
 73    # get hanning for numsamples*(1.0-pctflat)
 74    hanwgt = np.hanning(len(inarray) - (hiflatindex - lowflatindex))
 75
 76    # piece together hanning weights at ends and weights=1.0 in flat portion
 77    outarray = np.ones(len(inarray), dtype=float)
 78    outarray[:lowflatindex] = hanwgt[:lowflatindex]
 79    outarray[hiflatindex:] = hanwgt[numsamples - hiflatindex :]
 80
 81    return outarray
 82
 83
 84def ricker(f, dt, convolutions=2):
 85    """
 86    input frequency and sampling interval (sec)
 87    """
 88
 89    lenhalf = 1250.0 / f
 90    halfsmp = int(lenhalf / dt) + 1
 91    lenhalf = halfsmp * dt
 92    t = np.arange(-lenhalf, lenhalf + dt, dt) / 1000.0
 93    s = (1 - 2 * np.pi ** 2 * f ** 2 * t ** 2) * np.exp(-(np.pi ** 2) * f ** 2 * t ** 2)
 94    t *= 1000.0
 95    for _ in range(convolutions - 1):
 96        s = np.convolve(s, s, mode="full")
 97    # i_shorten1,i_shorten2 = shortenwavlet(s,.9999,method="strip")
 98    # print "sinc length = ", s[i_shorten1:i_shorten2].shape
 99    # print "sinc peak at ", np.argmax(s[i_shorten1:i_shorten2])
100    # return t,s[i_shorten1:i_shorten2+1]
101    hanningwindow = hanflat(s, 0.50)
102    return t, s * hanningwindow
103
104
105def default_fft(digi=4, convolutions=4):
106    """Setup a default fft"""
107    Nyquist = 0.5 * 1000.0 / digi
108    _, s = ricker(0.12 * Nyquist, digi, convolutions=convolutions)
109    return s, fftfreq(len(s), d=digi / 1000.0)
110
111
112def splineinfill(x, y, newx, damp=0.0):
113    """
114    #   Function to return data spaced at locations specified by input variable 'newx' after
115    #   fitting cubic spline function. Input data specified by 'x' and 'y' so data
116    #   can be irregularly sampled.
117    """
118    s = UnivariateSpline(x, y, s=damp)
119    return s(newx)
120
121
122def calculate_wavelet_in_time_domain(ampls):
123    """Generate wavelet in time domain from amplitudes and frequencies"""
124    ampli_spectrum_linear = (10.0 ** (ampls / 10.0)).astype("complex")
125    ampli_spectrum_linear = np.roll(
126        ampli_spectrum_linear, ampli_spectrum_linear.shape[0] // 2
127    )
128    new_wavelet = np.real(ifft(ampli_spectrum_linear))
129    new_wavelet = np.roll(new_wavelet, new_wavelet.size // 2)
130    # apply hanning taper at beginning and end of time-domain wavelet
131    new_wavelet = new_wavelet * hanflat(new_wavelet, 0.60)
132    return ampli_spectrum_linear, new_wavelet
133
134
135def generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False):
136    """Generate a wavelet using interpolated percentiles of the spectrum"""
137    freq_list_arr, db_list_arr = fs_arr(fs)
138    percentile_list = [5, 10, 25, 50, 75, 90, 95]
139    plot_freqs, plot_dbs = percentiles(freq_list_arr, db_list_arr, percentile_list)
140    plot_freqs_arr = np.array(plot_freqs)
141    plot_dbs_arr = np.array(plot_dbs)
142
143    passbands = passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose=verbose)
144
145    # set up default fft
146    s, freq = default_fft(digi=digi, convolutions=convolutions)
147    s /= np.sum(np.sqrt(s ** 2))  # normalize
148    ampli_spectrum = abs(fft(s))
149    ampli_spectrum = fft(s)
150    freq = np.roll(freq, freq.shape[0] // 2)
151    ampli_spectrum = np.roll(ampli_spectrum, ampli_spectrum.shape[0] // 2)
152
153    __freqs = plot_freqs[0] * 1.0
154    ___freqs = np.hstack((0.0, plot_freqs[0], (freq[-2] + freq[-1]) / 2.0, freq[-1]))
155    ___freqs = np.hstack((-___freqs[1:][::-1], ___freqs))
156
157    # low_freq_pctile = np.random.uniform(0, 100)
158    # mid_freq_pctile = np.random.uniform(0, 100)
159    # high_freq_pctile = np.random.uniform(0, 100)
160    freq_pctiles = np.interp(__freqs, passbands, pcts)
161    ampls = []
162    for ifreq in range(__freqs.size):
163        ampls.append(np.percentile(db_list_arr[:, ifreq], freq_pctiles[ifreq], axis=0))
164
165    ampls = np.array(ampls)
166    ampls -= ampls.max()
167    ampls_with_edges = np.hstack(
168        (-40.0, -40.0, ampls[::-1], -40.0, ampls, -40.0, -40.0)
169    )
170
171    # interpolate onto freqs in fft
172    try:
173        ____dbs = splineinfill(___freqs, ampls_with_edges, freq, damp=0.0)
174    except:
175        ____dbs = np.interp(freq, ___freqs, ampls_with_edges)
176
177    # generate wavelet in time domain
178    ampli_spectrum, new_wavelet = calculate_wavelet_in_time_domain(____dbs)
179    freqs = [freq, __freqs, ___freqs]
180    amps = [ampli_spectrum, ampls, ampls_with_edges]
181    return freqs, amps, new_wavelet
182
183
184def plot_wavelets(freqs, ampls, wavs, labels, savepng=None):
185    """Plot wavelet and spectra"""
186    plt = import_matplotlib()
187    fig, axs = plt.subplots(1, 2, figsize=(20, 8))
188    for f, a, w, l in zip(freqs, ampls, wavs, labels):
189        axs[0].plot(w, label=l)
190        axs[1].plot(
191            f[0][f[0].size // 2 :],
192            10.0 * np.log10(np.real(a[0])[f[0].size // 2 :])[::-1],
193            label=l,
194        )
195    for a in axs.ravel():
196        a.grid()
197        a.legend(loc="upper right")
198    if savepng is not None:
199        fig.savefig(savepng)
def fs_arr(fs):
 9def fs_arr(fs):
10    """Split input data into array of frequencies and array of amplitudes"""
11    freq_values = fs[:, 0]
12    db_levels = fs[:, 1]
13    freq_list_arr = np.array(list(freq_values))
14    db_list_arr = np.array(list(db_levels))
15    for i in range(db_list_arr.shape[0]):
16        db_list_arr[i, :] = 10.0 * np.log10(db_list_arr[i, :])
17        db_list_arr[i, :] -= db_list_arr[i, :].max()
18    return freq_list_arr, db_list_arr

Split input data into array of frequencies and array of amplitudes

def percentiles(freq_list_arr, db_list_arr, percentile_list):
21def percentiles(freq_list_arr, db_list_arr, percentile_list):
22    """Calculate percentiles of frequencies and amplitudes"""
23    plot_freqs = []
24    plot_dbs = []
25    for ipctile in percentile_list:
26        plot_freqs.append(np.mean(freq_list_arr, axis=0))
27        dbs = np.percentile(db_list_arr, ipctile, axis=0)
28        dbs -= dbs.max()
29        plot_dbs.append(dbs)
30    return plot_freqs, plot_dbs

Calculate percentiles of frequencies and amplitudes

def passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose):
33def passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose):
34    """Get passband frequencies"""
35    upper_passband_index = np.argmax(np.std(plot_dbs_arr, axis=0))
36    mid_freq_index = np.argmin(
37        np.std(plot_dbs_arr[:, : upper_passband_index + 1], axis=0)
38    )
39    low_bandlimit_index = np.argmax(
40        np.std(plot_dbs_arr[:, : mid_freq_index + 1], axis=0)
41    )
42    hi_std = np.max(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0))
43    lo_std = np.min(np.std(plot_dbs_arr[:, upper_passband_index:], axis=0))
44    clipped_std = np.std(plot_dbs_arr[:, upper_passband_index:], axis=0).clip(
45        lo_std + 0.1 * (hi_std - lo_std), hi_std
46    )
47    hi_bandlimit_index = np.argmin(clipped_std) + upper_passband_index
48    passband_low_freq = plot_freqs_arr[0, low_bandlimit_index]
49    passband_mid_freq = plot_freqs_arr[0, mid_freq_index]
50    passband_hi_freq = plot_freqs_arr[0, hi_bandlimit_index]
51
52    if verbose:
53        print(
54            f"(low, mid, hi) frequencies = {passband_low_freq:.4f}, {passband_mid_freq:.4f}, {passband_hi_freq:.4f}"
55        )
56    return passband_low_freq, passband_mid_freq, passband_hi_freq

Get passband frequencies

def hanflat(inarray, pctflat):
59def hanflat(inarray, pctflat):
60    """
61    #   Function applies a Hanning taper to ends of "inarray".
62    #   Center "pctflat" of samples remain unchanged.
63    #
64    #   Parameters:
65    #   array :       array of values to have ends tapered
66    #   pctflat :     this percent of  samples to remain unchanged (e.g. 0.80)
67    """
68    numsamples = len(inarray)
69    lowflatindex = int(round(numsamples * (1.0 - pctflat) / 2.0))
70    hiflatindex = numsamples - lowflatindex
71
72    # print 'length of array, start, end ',len(inarray),lowflatindex,hiflatindex
73
74    # get hanning for numsamples*(1.0-pctflat)
75    hanwgt = np.hanning(len(inarray) - (hiflatindex - lowflatindex))
76
77    # piece together hanning weights at ends and weights=1.0 in flat portion
78    outarray = np.ones(len(inarray), dtype=float)
79    outarray[:lowflatindex] = hanwgt[:lowflatindex]
80    outarray[hiflatindex:] = hanwgt[numsamples - hiflatindex :]
81
82    return outarray

Function applies a Hanning taper to ends of "inarray".

Center "pctflat" of samples remain unchanged.

#

Parameters:

array : array of values to have ends tapered

pctflat : this percent of samples to remain unchanged (e.g. 0.80)

def ricker(f, dt, convolutions=2):
 85def ricker(f, dt, convolutions=2):
 86    """
 87    input frequency and sampling interval (sec)
 88    """
 89
 90    lenhalf = 1250.0 / f
 91    halfsmp = int(lenhalf / dt) + 1
 92    lenhalf = halfsmp * dt
 93    t = np.arange(-lenhalf, lenhalf + dt, dt) / 1000.0
 94    s = (1 - 2 * np.pi ** 2 * f ** 2 * t ** 2) * np.exp(-(np.pi ** 2) * f ** 2 * t ** 2)
 95    t *= 1000.0
 96    for _ in range(convolutions - 1):
 97        s = np.convolve(s, s, mode="full")
 98    # i_shorten1,i_shorten2 = shortenwavlet(s,.9999,method="strip")
 99    # print "sinc length = ", s[i_shorten1:i_shorten2].shape
100    # print "sinc peak at ", np.argmax(s[i_shorten1:i_shorten2])
101    # return t,s[i_shorten1:i_shorten2+1]
102    hanningwindow = hanflat(s, 0.50)
103    return t, s * hanningwindow

input frequency and sampling interval (sec)

def default_fft(digi=4, convolutions=4):
106def default_fft(digi=4, convolutions=4):
107    """Setup a default fft"""
108    Nyquist = 0.5 * 1000.0 / digi
109    _, s = ricker(0.12 * Nyquist, digi, convolutions=convolutions)
110    return s, fftfreq(len(s), d=digi / 1000.0)

Setup a default fft

def splineinfill(x, y, newx, damp=0.0):
113def splineinfill(x, y, newx, damp=0.0):
114    """
115    #   Function to return data spaced at locations specified by input variable 'newx' after
116    #   fitting cubic spline function. Input data specified by 'x' and 'y' so data
117    #   can be irregularly sampled.
118    """
119    s = UnivariateSpline(x, y, s=damp)
120    return s(newx)

Function to return data spaced at locations specified by input variable 'newx' after

fitting cubic spline function. Input data specified by 'x' and 'y' so data

can be irregularly sampled.

def calculate_wavelet_in_time_domain(ampls):
123def calculate_wavelet_in_time_domain(ampls):
124    """Generate wavelet in time domain from amplitudes and frequencies"""
125    ampli_spectrum_linear = (10.0 ** (ampls / 10.0)).astype("complex")
126    ampli_spectrum_linear = np.roll(
127        ampli_spectrum_linear, ampli_spectrum_linear.shape[0] // 2
128    )
129    new_wavelet = np.real(ifft(ampli_spectrum_linear))
130    new_wavelet = np.roll(new_wavelet, new_wavelet.size // 2)
131    # apply hanning taper at beginning and end of time-domain wavelet
132    new_wavelet = new_wavelet * hanflat(new_wavelet, 0.60)
133    return ampli_spectrum_linear, new_wavelet

Generate wavelet in time domain from amplitudes and frequencies

def generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False):
136def generate_wavelet(fs, pcts, digi=4.0, convolutions=4, verbose=False):
137    """Generate a wavelet using interpolated percentiles of the spectrum"""
138    freq_list_arr, db_list_arr = fs_arr(fs)
139    percentile_list = [5, 10, 25, 50, 75, 90, 95]
140    plot_freqs, plot_dbs = percentiles(freq_list_arr, db_list_arr, percentile_list)
141    plot_freqs_arr = np.array(plot_freqs)
142    plot_dbs_arr = np.array(plot_dbs)
143
144    passbands = passband_freqs(plot_freqs_arr, plot_dbs_arr, verbose=verbose)
145
146    # set up default fft
147    s, freq = default_fft(digi=digi, convolutions=convolutions)
148    s /= np.sum(np.sqrt(s ** 2))  # normalize
149    ampli_spectrum = abs(fft(s))
150    ampli_spectrum = fft(s)
151    freq = np.roll(freq, freq.shape[0] // 2)
152    ampli_spectrum = np.roll(ampli_spectrum, ampli_spectrum.shape[0] // 2)
153
154    __freqs = plot_freqs[0] * 1.0
155    ___freqs = np.hstack((0.0, plot_freqs[0], (freq[-2] + freq[-1]) / 2.0, freq[-1]))
156    ___freqs = np.hstack((-___freqs[1:][::-1], ___freqs))
157
158    # low_freq_pctile = np.random.uniform(0, 100)
159    # mid_freq_pctile = np.random.uniform(0, 100)
160    # high_freq_pctile = np.random.uniform(0, 100)
161    freq_pctiles = np.interp(__freqs, passbands, pcts)
162    ampls = []
163    for ifreq in range(__freqs.size):
164        ampls.append(np.percentile(db_list_arr[:, ifreq], freq_pctiles[ifreq], axis=0))
165
166    ampls = np.array(ampls)
167    ampls -= ampls.max()
168    ampls_with_edges = np.hstack(
169        (-40.0, -40.0, ampls[::-1], -40.0, ampls, -40.0, -40.0)
170    )
171
172    # interpolate onto freqs in fft
173    try:
174        ____dbs = splineinfill(___freqs, ampls_with_edges, freq, damp=0.0)
175    except:
176        ____dbs = np.interp(freq, ___freqs, ampls_with_edges)
177
178    # generate wavelet in time domain
179    ampli_spectrum, new_wavelet = calculate_wavelet_in_time_domain(____dbs)
180    freqs = [freq, __freqs, ___freqs]
181    amps = [ampli_spectrum, ampls, ampls_with_edges]
182    return freqs, amps, new_wavelet

Generate a wavelet using interpolated percentiles of the spectrum

def plot_wavelets(freqs, ampls, wavs, labels, savepng=None):
185def plot_wavelets(freqs, ampls, wavs, labels, savepng=None):
186    """Plot wavelet and spectra"""
187    plt = import_matplotlib()
188    fig, axs = plt.subplots(1, 2, figsize=(20, 8))
189    for f, a, w, l in zip(freqs, ampls, wavs, labels):
190        axs[0].plot(w, label=l)
191        axs[1].plot(
192            f[0][f[0].size // 2 :],
193            10.0 * np.log10(np.real(a[0])[f[0].size // 2 :])[::-1],
194            label=l,
195        )
196    for a in axs.ravel():
197        a.grid()
198        a.legend(loc="upper right")
199    if savepng is not None:
200        fig.savefig(savepng)

Plot wavelet and spectra