Skip to content

Background

Model components for background modelling.

Background dataclass

Bases: Component

Superclass for background models.

Attributes:

Name Type Description
n_obs int

total number of observations in the background model (across all sensors).

n_parameter int

number of parameters in the background model

bg ndarray

array of sampled background values, populated in self.from_mcmc() after the MCMC run is completed.

precision_scalar ndarray

array of sampled background precision values, populated in self.from_mcmc() after the MCMC run is completed. Only populated if update_precision is True.

precision_matrix Union[ndarray, csr_array]

un-scaled precision matrix for the background parameter vector.

mean_bg float

global mean background value. Should be populated from the value specified in the GasSpecies object.

update_precision bool

logical determining whether the background (scalar) precision parameter should be updated as part of the MCMC. Defaults to False.

prior_precision_shape float

shape parameter for the prior gamma distribution for the scalar precision parameter(s).

prior_precision_rate float

rate parameter for the prior gamma distribution for the scalar precision parameter(s).

initial_precision float

initial value for the scalar precision parameter.

basis_matrix csr_array

[n_obs x n_time] matrix mapping the background model parameters on to the observations.

Source code in src/pyelq/component/background.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
@dataclass
class Background(Component):
    """Superclass for background models.

    Attributes:
        n_obs (int): total number of observations in the background model (across all sensors).
        n_parameter (int): number of parameters in the background model
        bg (np.ndarray): array of sampled background values, populated in self.from_mcmc() after the MCMC run is
            completed.
        precision_scalar (np.ndarray): array of sampled background precision values, populated in self.from_mcmc() after
            the MCMC run is completed. Only populated if update_precision is True.
        precision_matrix (Union[np.ndarray, sparse.csr_array]): un-scaled precision matrix for the background parameter
            vector.
        mean_bg (float): global mean background value. Should be populated from the value specified in the GasSpecies
            object.
        update_precision (bool): logical determining whether the background (scalar) precision parameter should be
            updated as part of the MCMC. Defaults to False.
        prior_precision_shape (float): shape parameter for the prior gamma distribution for the scalar precision
            parameter(s).
        prior_precision_rate (float): rate parameter for the prior gamma distribution for the scalar precision
            parameter(s).
        initial_precision (float): initial value for the scalar precision parameter.
        basis_matrix (sparse.csr_array): [n_obs x n_time] matrix mapping the background model parameters on to the
            observations.

    """

    n_obs: int = field(init=False)
    n_parameter: int = field(init=False)
    bg: np.ndarray = field(init=False)
    precision_scalar: np.ndarray = field(init=False)
    precision_matrix: Union[np.ndarray, sparse.csc_matrix] = field(init=False)
    mean_bg: Union[float, None] = None
    update_precision: bool = False
    prior_precision_shape: float = 1e-3
    prior_precision_rate: float = 1e-3
    initial_precision: float = 1.0
    basis_matrix: sparse.csr_array = field(init=False)

    @abstractmethod
    def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
        """Take data inputs and extract relevant properties.

        Args:
            sensor_object (SensorGroup): sensor data
            meteorology (MeteorologyGroup): meteorology data
            gas_species (GasSpecies): gas species information

        """

    def make_model(self, model: list = None) -> list:
        """Take model list and append new elements from current model component.

        Args:
            model (list, optional): Current list of model elements. Defaults to None.

        Returns:
            list: model output list.

        """
        bg_precision_predictor = parameter.ScaledMatrix(matrix="P_bg", scalar="lambda_bg")
        model.append(Normal("bg", mean="mu_bg", precision=bg_precision_predictor))
        if self.update_precision:
            model.append(Gamma("lambda_bg", shape="a_lam_bg", rate="b_lam_bg"))
        return model

    def make_sampler(self, model: Model, sampler_list: list = None) -> list:
        """Take sampler list and append new elements from current model component.

        Args:
            model (Model): Full model list of distributions.
            sampler_list (list, optional): Current list of samplers. Defaults to None.

        Returns:
            list: sampler output list.

        """
        if sampler_list is None:
            sampler_list = []
        sampler_list.append(NormalNormal("bg", model))
        if self.update_precision:
            sampler_list.append(NormalGamma("lambda_bg", model))
        return sampler_list

    def make_state(self, state: dict = None) -> dict:
        """Take state dictionary and append initial values from model component.

        Args:
            state (dict, optional): current state vector. Defaults to None.

        Returns:
            dict: current state vector with components added.

        """
        state["mu_bg"] = np.ones((self.n_parameter, 1)) * self.mean_bg
        state["B_bg"] = self.basis_matrix
        state["bg"] = np.ones((self.n_parameter, 1)) * self.mean_bg
        state["P_bg"] = self.precision_matrix
        state["lambda_bg"] = self.initial_precision
        if self.update_precision:
            state["a_lam_bg"] = self.prior_precision_shape
            state["b_lam_bg"] = self.prior_precision_rate
        return state

    def from_mcmc(self, store: dict):
        """Extract results of mcmc from mcmc.store and attach to components.

        Args:
            store (dict): mcmc result dictionary.

        """
        self.bg = store["bg"]
        if self.update_precision:
            self.precision_scalar = store["lambda_bg"]

initialise(sensor_object, meteorology, gas_species) abstractmethod

Take data inputs and extract relevant properties.

Parameters:

Name Type Description Default
sensor_object SensorGroup

sensor data

required
meteorology MeteorologyGroup

meteorology data

required
gas_species GasSpecies

gas species information

required
Source code in src/pyelq/component/background.py
69
70
71
72
73
74
75
76
77
78
@abstractmethod
def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
    """Take data inputs and extract relevant properties.

    Args:
        sensor_object (SensorGroup): sensor data
        meteorology (MeteorologyGroup): meteorology data
        gas_species (GasSpecies): gas species information

    """

make_model(model=None)

Take model list and append new elements from current model component.

Parameters:

Name Type Description Default
model list

Current list of model elements. Defaults to None.

None

Returns:

Name Type Description
list list

model output list.

Source code in src/pyelq/component/background.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def make_model(self, model: list = None) -> list:
    """Take model list and append new elements from current model component.

    Args:
        model (list, optional): Current list of model elements. Defaults to None.

    Returns:
        list: model output list.

    """
    bg_precision_predictor = parameter.ScaledMatrix(matrix="P_bg", scalar="lambda_bg")
    model.append(Normal("bg", mean="mu_bg", precision=bg_precision_predictor))
    if self.update_precision:
        model.append(Gamma("lambda_bg", shape="a_lam_bg", rate="b_lam_bg"))
    return model

make_sampler(model, sampler_list=None)

Take sampler list and append new elements from current model component.

Parameters:

Name Type Description Default
model Model

Full model list of distributions.

required
sampler_list list

Current list of samplers. Defaults to None.

None

Returns:

Name Type Description
list list

sampler output list.

Source code in src/pyelq/component/background.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def make_sampler(self, model: Model, sampler_list: list = None) -> list:
    """Take sampler list and append new elements from current model component.

    Args:
        model (Model): Full model list of distributions.
        sampler_list (list, optional): Current list of samplers. Defaults to None.

    Returns:
        list: sampler output list.

    """
    if sampler_list is None:
        sampler_list = []
    sampler_list.append(NormalNormal("bg", model))
    if self.update_precision:
        sampler_list.append(NormalGamma("lambda_bg", model))
    return sampler_list

make_state(state=None)

Take state dictionary and append initial values from model component.

Parameters:

Name Type Description Default
state dict

current state vector. Defaults to None.

None

Returns:

Name Type Description
dict dict

current state vector with components added.

Source code in src/pyelq/component/background.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def make_state(self, state: dict = None) -> dict:
    """Take state dictionary and append initial values from model component.

    Args:
        state (dict, optional): current state vector. Defaults to None.

    Returns:
        dict: current state vector with components added.

    """
    state["mu_bg"] = np.ones((self.n_parameter, 1)) * self.mean_bg
    state["B_bg"] = self.basis_matrix
    state["bg"] = np.ones((self.n_parameter, 1)) * self.mean_bg
    state["P_bg"] = self.precision_matrix
    state["lambda_bg"] = self.initial_precision
    if self.update_precision:
        state["a_lam_bg"] = self.prior_precision_shape
        state["b_lam_bg"] = self.prior_precision_rate
    return state

from_mcmc(store)

Extract results of mcmc from mcmc.store and attach to components.

Parameters:

Name Type Description Default
store dict

mcmc result dictionary.

required
Source code in src/pyelq/component/background.py
134
135
136
137
138
139
140
141
142
143
def from_mcmc(self, store: dict):
    """Extract results of mcmc from mcmc.store and attach to components.

    Args:
        store (dict): mcmc result dictionary.

    """
    self.bg = store["bg"]
    if self.update_precision:
        self.precision_scalar = store["lambda_bg"]

TemporalBackground dataclass

Bases: Background

Model which imposes only temporal correlation on the background parameters.

Assumes that the prior mean concentration of the background at every location/time point is the global average background concentration as defined in the input GasSpecies object.

Generates the (un-scaled) prior background precision matrix using the function gmrf.precision_temporal: this precision matrix imposes first-oder Markov structure for the temporal dependence.

By default, the times used for the model definition are the set of unique times in the observation set.

This background model only requires the initialise function, and does not require the implementation of any further methods.

Attributes:

Name Type Description
time Union[ndarray, DatetimeArray]

vector of times used in defining the model.

Source code in src/pyelq/component/background.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
@dataclass
class TemporalBackground(Background):
    """Model which imposes only temporal correlation on the background parameters.

    Assumes that the prior mean concentration of the background at every location/time point is the global average
    background concentration as defined in the input GasSpecies object.

    Generates the (un-scaled) prior background precision matrix using the function gmrf.precision_temporal: this
    precision matrix imposes first-oder Markov structure for the temporal dependence.

    By default, the times used for the model definition are the set of unique times in the observation set.

    This background model only requires the initialise function, and does not require the implementation of any further
    methods.

    Attributes:
        time (Union[np.ndarray, pd.arrays.DatetimeArray]): vector of times used in defining the model.

    """

    time: Union[np.ndarray, pd.arrays.DatetimeArray] = field(init=False)

    def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
        """Create temporal background model from sensor, meteorology and gas species inputs.

        Args:
            sensor_object (SensorGroup): sensor data object.
            meteorology (MeteorologyGroup): meteorology data object.
            gas_species (GasSpecies): gas species data object.

        """
        self.n_obs = sensor_object.nof_observations
        self.time, unique_inverse = np.unique(sensor_object.time, return_inverse=True)
        self.time = pd.array(self.time, dtype="datetime64[ns]")
        self.n_parameter = len(self.time)
        self.basis_matrix = sparse.csr_array((np.ones(self.n_obs), (np.array(range(self.n_obs)), unique_inverse)))
        self.precision_matrix = gmrf.precision_temporal(time=self.time)
        if self.mean_bg is None:
            self.mean_bg = gas_species.global_background

initialise(sensor_object, meteorology, gas_species)

Create temporal background model from sensor, meteorology and gas species inputs.

Parameters:

Name Type Description Default
sensor_object SensorGroup

sensor data object.

required
meteorology MeteorologyGroup

meteorology data object.

required
gas_species GasSpecies

gas species data object.

required
Source code in src/pyelq/component/background.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
    """Create temporal background model from sensor, meteorology and gas species inputs.

    Args:
        sensor_object (SensorGroup): sensor data object.
        meteorology (MeteorologyGroup): meteorology data object.
        gas_species (GasSpecies): gas species data object.

    """
    self.n_obs = sensor_object.nof_observations
    self.time, unique_inverse = np.unique(sensor_object.time, return_inverse=True)
    self.time = pd.array(self.time, dtype="datetime64[ns]")
    self.n_parameter = len(self.time)
    self.basis_matrix = sparse.csr_array((np.ones(self.n_obs), (np.array(range(self.n_obs)), unique_inverse)))
    self.precision_matrix = gmrf.precision_temporal(time=self.time)
    if self.mean_bg is None:
        self.mean_bg = gas_species.global_background

SpatioTemporalBackground dataclass

Bases: Background

Model which imposes both spatial and temporal correlation on the background parameters.

Defines a grid in time, and assumes a correlated time-series per sensor using the defined time grid.

The background parameter is an [n_location * n_time x 1] (if self.spatial_dependence is True) or an [n_time x 1] vector (if self.spatial_dependence is False). In the spatio-temporal case, the background vector is assumed to unwrap over space and time as follows: bg = [b_1(t_1), b_2(t_1),..., b_nlct(t_1),...,b_1(t_k),..., b_nlct(t_k),...].T where nlct is the number of sensor locations. This unwrapping mechanism is chosen as it greatly speeds up the sparse matrix operations in the solver (vs. the alternative).

self.basis_matrix is set up to map the elements of the full background vector onto the observations, on the basis of spatial location and nearest time knot.

The temporal background correlation is computed using gmrf.precision_temporal, and the spatial correlation is computed using a squared exponential correlation function, parametrized by self.spatial_correlation_param (spatial correlation, measured in metres). The full precision matrix is simply a Kronecker product between the two component precision matrices.

Attributes:

Name Type Description
n_time int

number of time knots for which the model is defined. Note that this does not need to be the same as the number of concentration observations in the analysis.

n_location int

number of spatial knots in the model.

time DatetimeArray

vector of times used in defining the model.

spatial_dependence bool

flag indicating whether the background parameters should be spatially correlated. If True, the model assumes a separate background time-series per sensor location, and assumes these time-series to be spatially correlated. If False (default), the background parameters are assumed to be common between sensors (only temporally correlated).

spatial_correlation_param float

correlation length parameter, determining the degree of spatial correlation imposed on the background time-series. Units are metres. Assumes equal correlation in all spatial directions. Defaults to 1.0.

location ndarray

[n_location x 3] array of sensor locations, used for calculating the spatial correlation between the sensor background values. If self.spatial_dependence is False, this attribute is simply set to be the location of the first sensor in the sensor object.

temporal_precision_matrix Union[ndarray, csc_matrix]

temporal component of the precision matrix. The full model precision matrix is the Kronecker product of this matrix with self.spatial_precision_matrix.

spatial_precision_matrix ndarray

spatial component of the precision matrix. The full model precision matrix is the Kronecker product of this matrix with the self.temporal_precision_matrix. Simply set to 1 if self.spatial_dependence is False.

precision_time_0 float

precision relating to the first time stamp in the model. Defaults to 0.01.

Source code in src/pyelq/component/background.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
@dataclass
class SpatioTemporalBackground(Background):
    """Model which imposes both spatial and temporal correlation on the background parameters.

    Defines a grid in time, and assumes a correlated time-series per sensor using the defined time grid.

    The background parameter is an [n_location * n_time x 1] (if self.spatial_dependence is True) or an [n_time x 1]
    vector (if self.spatial_dependence is False). In the spatio-temporal case, the background vector is assumed to
    unwrap over space and time as follows:
    bg = [b_1(t_1), b_2(t_1),..., b_nlct(t_1),...,b_1(t_k),..., b_nlct(t_k),...].T
    where nlct is the number of sensor locations.
    This unwrapping mechanism is chosen as it greatly speeds up the sparse matrix operations in the solver (vs. the
    alternative).

    self.basis_matrix is set up to map the elements of the full background vector onto the observations, on the basis
    of spatial location and nearest time knot.

    The temporal background correlation is computed using gmrf.precision_temporal, and the spatial correlation is
    computed using a squared exponential correlation function, parametrized by self.spatial_correlation_param (spatial
    correlation, measured in metres). The full precision matrix is simply a Kronecker product between the two
    component precision matrices.

    Attributes:
        n_time (int): number of time knots for which the model is defined. Note that this does not need to be the same
            as the number of concentration observations in the analysis.
        n_location (int): number of spatial knots in the model.
        time (pd.arrays.DatetimeArray): vector of times used in defining the model.
        spatial_dependence (bool): flag indicating whether the background parameters should be spatially correlated. If
            True, the model assumes a separate background time-series per sensor location, and assumes these
            time-series to be spatially correlated. If False (default), the background parameters are assumed to be
            common between sensors (only temporally correlated).
        spatial_correlation_param (float): correlation length parameter, determining the degree of spatial correlation
            imposed on the background time-series. Units are metres. Assumes equal correlation in all spatial
            directions. Defaults to 1.0.
        location (np.ndarray): [n_location x 3] array of sensor locations, used for calculating the spatial correlation
            between the sensor background values. If self.spatial_dependence is False, this attribute is simply set to
            be the location of the first sensor in the sensor object.
        temporal_precision_matrix (Union[np.ndarray, sparse.csc_matrix]): temporal component of the precision matrix.
            The full model precision matrix is the Kronecker product of this matrix with self.spatial_precision_matrix.
        spatial_precision_matrix (np.ndarray): spatial component of the precision matrix. The full model precision
            matrix is the Kronecker product of this matrix with the self.temporal_precision_matrix. Simply set to 1 if
            self.spatial_dependence is False.
        precision_time_0 (float): precision relating to the first time stamp in the model. Defaults to 0.01.

    """

    n_time: Union[int, None] = None
    n_location: int = field(init=False)
    time: pd.arrays.DatetimeArray = field(init=False)
    spatial_dependence: bool = False
    spatial_correlation_param: float = field(init=False, default=1.0)
    location: Coordinate = field(init=False)
    temporal_precision_matrix: Union[np.ndarray, sparse.csc_matrix] = field(init=False)
    spatial_precision_matrix: np.ndarray = field(init=False)
    precision_time_0: float = field(init=False, default=0.01)

    def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
        """Take data inputs and extract relevant properties.

        Args:
            sensor_object (SensorGroup): sensor data
            meteorology (MeteorologyGroup): meteorology data wind data
            gas_species (GasSpecies): gas species information

        """
        self.make_temporal_knots(sensor_object)
        self.make_spatial_knots(sensor_object)
        self.n_parameter = self.n_time * self.n_location
        self.n_obs = sensor_object.nof_observations

        self.make_precision_matrix()
        self.make_parameter_mapping(sensor_object)

        if self.mean_bg is None:
            self.mean_bg = gas_species.global_background

    def make_parameter_mapping(self, sensor_object: SensorGroup):
        """Create the mapping of parameters onto observations, through creation of the associated basis matrix.

        The background vector unwraps first over the spatial (sensor) location dimension, then over the temporal
        dimension. For more detail, see the main class docstring.

        The data vector in the solver state is assumed to consist of the individual sensor data vectors stacked
        consecutively.

        Args:
            sensor_object (SensorGroup): group of sensor objects.

        """
        nn_object = NearestNeighbors(n_neighbors=1, algorithm="kd_tree").fit(self.time.to_numpy().reshape(-1, 1))
        for k, sensor in enumerate(sensor_object.values()):
            _, time_index = nn_object.kneighbors(sensor.time.to_numpy().reshape(-1, 1))
            basis_matrix = sparse.csr_array(
                (np.ones(sensor.nof_observations), (np.array(range(sensor.nof_observations)), time_index.flatten())),
                shape=(sensor.nof_observations, self.n_time),
            )
            if self.spatial_dependence:
                basis_matrix = sparse.kron(basis_matrix, np.eye(N=self.n_location, M=1, k=-k).T)

            if k == 0:
                self.basis_matrix = basis_matrix
            else:
                self.basis_matrix = sparse.vstack([self.basis_matrix, basis_matrix])

    def make_temporal_knots(self, sensor_object: SensorGroup):
        """Create the temporal grid for the model.

        If self.n_time is not specified, then the model will use the unique set of times from the sensor data.

        If self.n_time is specified, then the model will define a time grid with self.n_time elements.

        Args:
            sensor_object (SensorGroup): group of sensor objects.

        """
        if self.n_time is None:
            self.time = pd.array(np.unique(sensor_object.time), dtype="datetime64[ns]")
            self.n_time = len(self.time)
        else:
            self.time = pd.array(
                pd.date_range(start=np.min(sensor_object.time), end=np.max(sensor_object.time), periods=self.n_time),
                dtype="datetime64[ns]",
            )

    def make_spatial_knots(self, sensor_object: SensorGroup):
        """Create the spatial grid for the model.

        If self.spatial_dependence is False, the code assumes that only a single (arbitrary) location is used, thereby
        eliminating any spatial dependence.

        If self.spatial_dependence is True, a separate but correlated time-series of background parameters is assumed
        for each sensor location.

        Args:
            sensor_object (SensorGroup): group of sensor objects.

        """
        if self.spatial_dependence:
            self.n_location = sensor_object.nof_sensors
            self.get_locations_from_sensors(sensor_object)
        else:
            self.n_location = 1
            self.location = sensor_object[list(sensor_object.keys())[0]].location

    def make_precision_matrix(self):
        """Create the full precision matrix for the background parameters.

        Defined as the Kronecker product of the temporal precision matrix and the spatial precision matrix.

        """
        self.temporal_precision_matrix = gmrf.precision_temporal(time=self.time)
        lam = self.temporal_precision_matrix[0, 0]
        self.temporal_precision_matrix[0, 0] = lam * (2.0 - lam / (self.precision_time_0 + lam))

        if self.spatial_dependence:
            self.make_spatial_precision_matrix()
            self.precision_matrix = sparse.kron(self.temporal_precision_matrix, self.spatial_precision_matrix)
        else:
            self.precision_matrix = self.temporal_precision_matrix
        if (self.n_parameter == 1) and sparse.issparse(self.precision_matrix):
            self.precision_matrix = self.precision_matrix.toarray()

    def make_spatial_precision_matrix(self):
        """Create the spatial precision matrix for the model.

        The spatial precision matrix is simply calculated as the inverse of a squared exponential covariance matrix
        calculated using the sensor locations.

        """
        location_array = self.location.to_array()
        spatial_covariance_matrix = np.exp(
            -(1 / (2 * np.power(self.spatial_correlation_param, 2)))
            * (
                np.power(location_array[:, [0]] - location_array[:, [0]].T, 2)
                + np.power(location_array[:, [1]] - location_array[:, [1]].T, 2)
                + np.power(location_array[:, [2]] - location_array[:, [2]].T, 2)
            )
        )
        self.spatial_precision_matrix = np.linalg.inv(
            spatial_covariance_matrix + (1e-6) * np.eye(spatial_covariance_matrix.shape[0])
        )

    def get_locations_from_sensors(self, sensor_object: SensorGroup):
        """Extract the location information from the sensor object.

        Attaches a Coordinate.ENU object as the self.location attribute, with all the sensor locations stored on the
        same object.

        Args:
            sensor_object (SensorGroup): group of sensor objects.

        """
        self.location = deepcopy(sensor_object[list(sensor_object.keys())[0]].location.to_enu())
        self.location.east = np.full(shape=(self.n_location,), fill_value=np.nan)
        self.location.north = np.full(shape=(self.n_location,), fill_value=np.nan)
        self.location.up = np.full(shape=(self.n_location,), fill_value=np.nan)
        for k, sensor in enumerate(sensor_object.values()):
            if isinstance(sensor, Beam):
                self.location.east[k] = np.mean(sensor.location.to_enu().east, axis=0)
                self.location.north[k] = np.mean(sensor.location.to_enu().north, axis=0)
                self.location.up[k] = np.mean(sensor.location.to_enu().up, axis=0)
            else:
                self.location.east[k] = sensor.location.to_enu().east
                self.location.north[k] = sensor.location.to_enu().north
                self.location.up[k] = sensor.location.to_enu().up

initialise(sensor_object, meteorology, gas_species)

Take data inputs and extract relevant properties.

Parameters:

Name Type Description Default
sensor_object SensorGroup

sensor data

required
meteorology MeteorologyGroup

meteorology data wind data

required
gas_species GasSpecies

gas species information

required
Source code in src/pyelq/component/background.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def initialise(self, sensor_object: SensorGroup, meteorology: MeteorologyGroup, gas_species: GasSpecies):
    """Take data inputs and extract relevant properties.

    Args:
        sensor_object (SensorGroup): sensor data
        meteorology (MeteorologyGroup): meteorology data wind data
        gas_species (GasSpecies): gas species information

    """
    self.make_temporal_knots(sensor_object)
    self.make_spatial_knots(sensor_object)
    self.n_parameter = self.n_time * self.n_location
    self.n_obs = sensor_object.nof_observations

    self.make_precision_matrix()
    self.make_parameter_mapping(sensor_object)

    if self.mean_bg is None:
        self.mean_bg = gas_species.global_background

make_parameter_mapping(sensor_object)

Create the mapping of parameters onto observations, through creation of the associated basis matrix.

The background vector unwraps first over the spatial (sensor) location dimension, then over the temporal dimension. For more detail, see the main class docstring.

The data vector in the solver state is assumed to consist of the individual sensor data vectors stacked consecutively.

Parameters:

Name Type Description Default
sensor_object SensorGroup

group of sensor objects.

required
Source code in src/pyelq/component/background.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def make_parameter_mapping(self, sensor_object: SensorGroup):
    """Create the mapping of parameters onto observations, through creation of the associated basis matrix.

    The background vector unwraps first over the spatial (sensor) location dimension, then over the temporal
    dimension. For more detail, see the main class docstring.

    The data vector in the solver state is assumed to consist of the individual sensor data vectors stacked
    consecutively.

    Args:
        sensor_object (SensorGroup): group of sensor objects.

    """
    nn_object = NearestNeighbors(n_neighbors=1, algorithm="kd_tree").fit(self.time.to_numpy().reshape(-1, 1))
    for k, sensor in enumerate(sensor_object.values()):
        _, time_index = nn_object.kneighbors(sensor.time.to_numpy().reshape(-1, 1))
        basis_matrix = sparse.csr_array(
            (np.ones(sensor.nof_observations), (np.array(range(sensor.nof_observations)), time_index.flatten())),
            shape=(sensor.nof_observations, self.n_time),
        )
        if self.spatial_dependence:
            basis_matrix = sparse.kron(basis_matrix, np.eye(N=self.n_location, M=1, k=-k).T)

        if k == 0:
            self.basis_matrix = basis_matrix
        else:
            self.basis_matrix = sparse.vstack([self.basis_matrix, basis_matrix])

make_temporal_knots(sensor_object)

Create the temporal grid for the model.

If self.n_time is not specified, then the model will use the unique set of times from the sensor data.

If self.n_time is specified, then the model will define a time grid with self.n_time elements.

Parameters:

Name Type Description Default
sensor_object SensorGroup

group of sensor objects.

required
Source code in src/pyelq/component/background.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def make_temporal_knots(self, sensor_object: SensorGroup):
    """Create the temporal grid for the model.

    If self.n_time is not specified, then the model will use the unique set of times from the sensor data.

    If self.n_time is specified, then the model will define a time grid with self.n_time elements.

    Args:
        sensor_object (SensorGroup): group of sensor objects.

    """
    if self.n_time is None:
        self.time = pd.array(np.unique(sensor_object.time), dtype="datetime64[ns]")
        self.n_time = len(self.time)
    else:
        self.time = pd.array(
            pd.date_range(start=np.min(sensor_object.time), end=np.max(sensor_object.time), periods=self.n_time),
            dtype="datetime64[ns]",
        )

make_spatial_knots(sensor_object)

Create the spatial grid for the model.

If self.spatial_dependence is False, the code assumes that only a single (arbitrary) location is used, thereby eliminating any spatial dependence.

If self.spatial_dependence is True, a separate but correlated time-series of background parameters is assumed for each sensor location.

Parameters:

Name Type Description Default
sensor_object SensorGroup

group of sensor objects.

required
Source code in src/pyelq/component/background.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def make_spatial_knots(self, sensor_object: SensorGroup):
    """Create the spatial grid for the model.

    If self.spatial_dependence is False, the code assumes that only a single (arbitrary) location is used, thereby
    eliminating any spatial dependence.

    If self.spatial_dependence is True, a separate but correlated time-series of background parameters is assumed
    for each sensor location.

    Args:
        sensor_object (SensorGroup): group of sensor objects.

    """
    if self.spatial_dependence:
        self.n_location = sensor_object.nof_sensors
        self.get_locations_from_sensors(sensor_object)
    else:
        self.n_location = 1
        self.location = sensor_object[list(sensor_object.keys())[0]].location

make_precision_matrix()

Create the full precision matrix for the background parameters.

Defined as the Kronecker product of the temporal precision matrix and the spatial precision matrix.

Source code in src/pyelq/component/background.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def make_precision_matrix(self):
    """Create the full precision matrix for the background parameters.

    Defined as the Kronecker product of the temporal precision matrix and the spatial precision matrix.

    """
    self.temporal_precision_matrix = gmrf.precision_temporal(time=self.time)
    lam = self.temporal_precision_matrix[0, 0]
    self.temporal_precision_matrix[0, 0] = lam * (2.0 - lam / (self.precision_time_0 + lam))

    if self.spatial_dependence:
        self.make_spatial_precision_matrix()
        self.precision_matrix = sparse.kron(self.temporal_precision_matrix, self.spatial_precision_matrix)
    else:
        self.precision_matrix = self.temporal_precision_matrix
    if (self.n_parameter == 1) and sparse.issparse(self.precision_matrix):
        self.precision_matrix = self.precision_matrix.toarray()

make_spatial_precision_matrix()

Create the spatial precision matrix for the model.

The spatial precision matrix is simply calculated as the inverse of a squared exponential covariance matrix calculated using the sensor locations.

Source code in src/pyelq/component/background.py
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def make_spatial_precision_matrix(self):
    """Create the spatial precision matrix for the model.

    The spatial precision matrix is simply calculated as the inverse of a squared exponential covariance matrix
    calculated using the sensor locations.

    """
    location_array = self.location.to_array()
    spatial_covariance_matrix = np.exp(
        -(1 / (2 * np.power(self.spatial_correlation_param, 2)))
        * (
            np.power(location_array[:, [0]] - location_array[:, [0]].T, 2)
            + np.power(location_array[:, [1]] - location_array[:, [1]].T, 2)
            + np.power(location_array[:, [2]] - location_array[:, [2]].T, 2)
        )
    )
    self.spatial_precision_matrix = np.linalg.inv(
        spatial_covariance_matrix + (1e-6) * np.eye(spatial_covariance_matrix.shape[0])
    )

get_locations_from_sensors(sensor_object)

Extract the location information from the sensor object.

Attaches a Coordinate.ENU object as the self.location attribute, with all the sensor locations stored on the same object.

Parameters:

Name Type Description Default
sensor_object SensorGroup

group of sensor objects.

required
Source code in src/pyelq/component/background.py
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def get_locations_from_sensors(self, sensor_object: SensorGroup):
    """Extract the location information from the sensor object.

    Attaches a Coordinate.ENU object as the self.location attribute, with all the sensor locations stored on the
    same object.

    Args:
        sensor_object (SensorGroup): group of sensor objects.

    """
    self.location = deepcopy(sensor_object[list(sensor_object.keys())[0]].location.to_enu())
    self.location.east = np.full(shape=(self.n_location,), fill_value=np.nan)
    self.location.north = np.full(shape=(self.n_location,), fill_value=np.nan)
    self.location.up = np.full(shape=(self.n_location,), fill_value=np.nan)
    for k, sensor in enumerate(sensor_object.values()):
        if isinstance(sensor, Beam):
            self.location.east[k] = np.mean(sensor.location.to_enu().east, axis=0)
            self.location.north[k] = np.mean(sensor.location.to_enu().north, axis=0)
            self.location.up[k] = np.mean(sensor.location.to_enu().up, axis=0)
        else:
            self.location.east[k] = sensor.location.to_enu().east
            self.location.north[k] = sensor.location.to_enu().north
            self.location.up[k] = sensor.location.to_enu().up