
Source Model

Component Class and subclasses for source model.

A SourceModel instance inherits from three superclasses:
  • Component: this is the general superclass for ELQModel components, which prototypes generic methods.
  • A type of SourceGrouping: this class type implements an allocation of sources to different categories (e.g. slab or spike), and sets up a sampler for estimating the classification of each source within the source map. Inheriting from the NullGrouping class ensures that the allocation of all sources is fixed during the inversion, and is not updated.
  • A type of SourceDistribution: this class type implements a particular type of response distribution (mostly Normal, but also allows for cases where we have e.g. exp(log_s) or a non-Gaussian prior).

ParameterMapping dataclass

Class for defining mapping variable/parameter labels needed for creating an analysis.

In instances where we want to include multiple source_model instances in an MCMC analysis, we can apply a suffix to all of the parameter names in the mapping dictionary. This allows us to create separate variables for different source map types, so that these can be associated with different sampler types in the MCMC analysis.

Attributes:

Name Type Description
map dict

dictionary containing mapping between variable types and MCMC parameters.

Source code in src/pyelq/component/source_model.py
@dataclass
class ParameterMapping:
    """Class for defining mapping variable/parameter labels needed for creating an analysis.

    In instances where we want to include multiple source_model instances in an MCMC analysis, we can apply a suffix to
    all of the parameter names in the mapping dictionary. This allows us to create separate variables for different
    source map types, so that these can be associated with different sampler types in the MCMC analysis.

    Attributes:
        map (dict): dictionary containing mapping between variable types and MCMC parameters.

    """

    map: dict = field(
        default_factory=lambda: {
            "source": "s",
            "coupling_matrix": "A",
            "emission_rate_mean": "mu_s",
            "emission_rate_precision": "lambda_s",
            "allocation": "alloc_s",
            "source_prob": "s_prob",
            "precision_prior_shape": "a_lam_s",
            "precision_prior_rate": "b_lam_s",
            "source_location": "z_src",
            "number_sources": "n_src",
            "number_source_rate": "rho",
        }
    )

    def append_string(self, string: str = None):
        """Apply the supplied string as a suffix to all of the values in the mapping dictionary.

        For example: {'source': 's'} would become {'source': 's_fixed'} when string = 'fixed' is passed as the argument.
        If string is None, nothing is appended.

        Args:
            string (str): string to append to the variable names.

        """
        if string is None:
            return
        for key, value in self.map.items():
            self.map[key] = value + "_" + string

append_string(string=None)

Apply the supplied string as a suffix to all of the values in the mapping dictionary.

For example: {'source': 's'} would become {'source': 's_fixed'} when string = 'fixed' is passed as the argument. If string is None, nothing is appended.

Parameters:

Name Type Description Default
string str

string to append to the variable names.

None
Source code in src/pyelq/component/source_model.py
def append_string(self, string: str = None):
    """Apply the supplied string as a suffix to all of the values in the mapping dictionary.

    For example: {'source': 's'} would become {'source': 's_fixed'} when string = 'fixed' is passed as the argument.
    If string is None, nothing is appended.

    Args:
        string (str): string to append to the variable names.

    """
    if string is None:
        return
    for key, value in self.map.items():
        self.map[key] = value + "_" + string
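As a minimal, self-contained sketch of the suffixing behaviour (the mapping is trimmed to two keys for brevity, and the documented "if string is None, nothing is appended" case is guarded explicitly):

```python
from dataclasses import dataclass, field


@dataclass
class ParameterMapping:
    """Trimmed-down sketch of the mapping class above, for illustration only."""

    map: dict = field(default_factory=lambda: {"source": "s", "coupling_matrix": "A"})

    def append_string(self, string: str = None):
        # If string is None, nothing is appended (the documented behaviour).
        if string is None:
            return
        for key, value in self.map.items():
            self.map[key] = value + "_" + string


mapping = ParameterMapping()
mapping.append_string("fixed")
print(mapping.map)  # {'source': 's_fixed', 'coupling_matrix': 'A_fixed'}
```

Because the suffix is applied in place, two `ParameterMapping` instances with different suffixes yield disjoint MCMC parameter names, which is what allows multiple source models in one analysis.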

SourceGrouping dataclass

Bases: ParameterMapping

Superclass for source grouping approach.

Source grouping method determines the group allocation of each source in the model, e.g: slab and spike distribution makes an on/off allocation for each source.

Attributes:

Name Type Description
nof_sources int

number of sources in the model.

emission_rate_mean Union[float, ndarray]

prior mean parameter for the emission rate distribution.

Source code in src/pyelq/component/source_model.py
@dataclass
class SourceGrouping(ParameterMapping):
    """Superclass for source grouping approach.

    Source grouping method determines the group allocation of each source in the model, e.g: slab and spike
    distribution makes an on/off allocation for each source.

    Attributes:
        nof_sources (int): number of sources in the model.
        emission_rate_mean (Union[float, np.ndarray]): prior mean parameter for the emission rate distribution.

    """

    nof_sources: int = field(init=False)
    emission_rate_mean: Union[float, np.ndarray] = field(init=False)

    @abstractmethod
    def make_allocation_model(self, model: list) -> list:
        """Initialise the source allocation part of the model, and the parameters of the source response distribution.

        Args:
            model (list): overall model, consisting of list of distributions.

        Returns:
            list: overall model list, updated with allocation distribution.

        """

    @abstractmethod
    def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
        """Initialise the allocation part of the sampler.

        Args:
            model (Model): overall model, consisting of list of distributions.
            sampler_list (list): list of samplers for individual parameters.

        Returns:
            list: sampler_list updated with sampler for the source allocation.

        """

    @abstractmethod
    def make_allocation_state(self, state: dict) -> dict:
        """Initialise the allocation part of the state.

        Args:
            state (dict): dictionary containing current state information.

        Returns:
            dict: state updated with parameters related to the source grouping.

        """

    @abstractmethod
    def from_mcmc_group(self, store: dict):
        """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

        Args:
            store (dict): dictionary containing samples from the MCMC.

        """

make_allocation_model(model) abstractmethod

Initialise the source allocation part of the model, and the parameters of the source response distribution.

Parameters:

Name Type Description Default
model list

overall model, consisting of list of distributions.

required

Returns:

Name Type Description
list list

overall model list, updated with allocation distribution.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_allocation_model(self, model: list) -> list:
    """Initialise the source allocation part of the model, and the parameters of the source response distribution.

    Args:
        model (list): overall model, consisting of list of distributions.

    Returns:
        list: overall model list, updated with allocation distribution.

    """

make_allocation_sampler(model, sampler_list) abstractmethod

Initialise the allocation part of the sampler.

Parameters:

Name Type Description Default
model Model

overall model, consisting of list of distributions.

required
sampler_list list

list of samplers for individual parameters.

required

Returns:

Name Type Description
list list

sampler_list updated with sampler for the source allocation.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
    """Initialise the allocation part of the sampler.

    Args:
        model (Model): overall model, consisting of list of distributions.
        sampler_list (list): list of samplers for individual parameters.

    Returns:
        list: sampler_list updated with sampler for the source allocation.

    """

make_allocation_state(state) abstractmethod

Initialise the allocation part of the state.

Parameters:

Name Type Description Default
state dict

dictionary containing current state information.

required

Returns:

Name Type Description
dict dict

state updated with parameters related to the source grouping.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_allocation_state(self, state: dict) -> dict:
    """Initialise the allocation part of the state.

    Args:
        state (dict): dictionary containing current state information.

    Returns:
        dict: state updated with parameters related to the source grouping.

    """

from_mcmc_group(store) abstractmethod

Extract posterior allocation samples from the MCMC sampler, attach them to the class.

Parameters:

Name Type Description Default
store dict

dictionary containing samples from the MCMC.

required
Source code in src/pyelq/component/source_model.py
@abstractmethod
def from_mcmc_group(self, store: dict):
    """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

    Args:
        store (dict): dictionary containing samples from the MCMC.

    """

NullGrouping dataclass

Bases: SourceGrouping

Null grouping: the grouping of the sources will not change during the course of the inversion.

Note that this is intended to support two distinct cases:

1) The case where the source map is fixed, and a given prior mean and prior precision value are assigned to each source (can be a common value for all sources, or can be a distinct allocation to each element of the source map).
2) The case where the dimensionality of the source map is changing during the inversion, and a common prior mean and precision term are used for all sources.

Attributes:

Name Type Description
number_on_sources ndarray

number of sources switched on in the solution, per iteration. Extracted as a property from the MCMC samples in self.from_mcmc_group().

Source code in src/pyelq/component/source_model.py
@dataclass
class NullGrouping(SourceGrouping):
    """Null grouping: the grouping of the sources will not change during the course of the inversion.

    Note that this is intended to support two distinct cases:
        1) The case where the source map is fixed, and a given prior mean and prior precision value are assigned to
            each source (can be a common value for all sources, or can be a distinct allocation to each element of the
            source map).
        2) The case where the dimensionality of the source map is changing during the inversion, and a common prior
            mean and precision term are used for all sources.

    Attributes:
        number_on_sources (np.ndarray): number of sources switched on in the solution, per iteration. Extracted as a
            property from the MCMC samples in self.from_mcmc_group().

    """

    number_on_sources: np.ndarray = field(init=False)

    def make_allocation_model(self, model: list) -> list:
        """Initialise the source allocation part of the model.

        In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply
        returns the existing model un-modified).

        Args:
            model (list): model as constructed so far, consisting of list of distributions.

        Returns:
            list: overall model list, updated with allocation distribution.

        """
        return model

    def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
        """Initialise the allocation part of the sampler.

        In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply
        returns the existing sampler list un-modified).

        Args:
            model (Model): overall model set for the problem.
            sampler_list (list): list of samplers for individual parameters.

        Returns:
            list: sampler_list updated with sampler for the source allocation.

        """
        return sampler_list

    def make_allocation_state(self, state: dict) -> dict:
        """Initialise the allocation part of the state.

        The prior mean parameter and the fixed source allocation are added to the state.

        Args:
            state (dict): dictionary containing current state information.

        Returns:
            dict: state updated with parameters related to the source grouping.

        """
        state[self.map["emission_rate_mean"]] = np.array(self.emission_rate_mean, ndmin=1)
        state[self.map["allocation"]] = np.zeros((self.nof_sources, 1), dtype="int")
        return state

    def from_mcmc_group(self, store: dict):
        """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

        Gets the number of sources present in each iteration of the MCMC sampler, and attaches this as a class property.

        Args:
            store (dict): dictionary containing samples from the MCMC.

        """
        self.number_on_sources = np.count_nonzero(np.logical_not(np.isnan(store[self.map["source"]])), axis=0)
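The counting logic in `from_mcmc_group` can be sketched with a hypothetical sample store (shape and NaN convention assumed here: one row per source, one column per iteration, with NaN marking an empty source slot at that iteration):

```python
import numpy as np

# Hypothetical store of emission-rate samples, shape (n_sources, n_iterations);
# NaN marks a source slot that is empty at that iteration.
store = {"s": np.array([[1.2, np.nan, 0.7],
                        [np.nan, np.nan, 2.1]])}

# Count non-NaN entries per iteration, as in NullGrouping.from_mcmc_group.
number_on_sources = np.count_nonzero(np.logical_not(np.isnan(store["s"])), axis=0)
print(number_on_sources)  # [1 0 2]
```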

make_allocation_model(model)

Initialise the source allocation part of the model.

In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply returns the existing model un-modified).

Parameters:

Name Type Description Default
model list

model as constructed so far, consisting of list of distributions.

required

Returns:

Name Type Description
list list

overall model list, updated with allocation distribution.

Source code in src/pyelq/component/source_model.py
def make_allocation_model(self, model: list) -> list:
    """Initialise the source allocation part of the model.

    In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply
    returns the existing model un-modified).

    Args:
        model (list): model as constructed so far, consisting of list of distributions.

    Returns:
        list: overall model list, updated with allocation distribution.

    """
    return model

make_allocation_sampler(model, sampler_list)

Initialise the allocation part of the sampler.

In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply returns the existing sampler list un-modified).

Parameters:

Name Type Description Default
model Model

overall model set for the problem.

required
sampler_list list

list of samplers for individual parameters.

required

Returns:

Name Type Description
list list

sampler_list updated with sampler for the source allocation.

Source code in src/pyelq/component/source_model.py
def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
    """Initialise the allocation part of the sampler.

    In the NullGrouping case, the source allocation is fixed throughout, so this function does nothing (simply
    returns the existing sampler list un-modified).

    Args:
        model (Model): overall model set for the problem.
        sampler_list (list): list of samplers for individual parameters.

    Returns:
        list: sampler_list updated with sampler for the source allocation.

    """
    return sampler_list

make_allocation_state(state)

Initialise the allocation part of the state.

The prior mean parameter and the fixed source allocation are added to the state.

Parameters:

Name Type Description Default
state dict

dictionary containing current state information.

required

Returns:

Name Type Description
dict dict

state updated with parameters related to the source grouping.

Source code in src/pyelq/component/source_model.py
def make_allocation_state(self, state: dict) -> dict:
    """Initialise the allocation part of the state.

    The prior mean parameter and the fixed source allocation are added to the state.

    Args:
        state (dict): dictionary containing current state information.

    Returns:
        dict: state updated with parameters related to the source grouping.

    """
    state[self.map["emission_rate_mean"]] = np.array(self.emission_rate_mean, ndmin=1)
    state[self.map["allocation"]] = np.zeros((self.nof_sources, 1), dtype="int")
    return state

from_mcmc_group(store)

Extract posterior allocation samples from the MCMC sampler, attach them to the class.

Gets the number of sources present in each iteration of the MCMC sampler, and attaches this as a class property.

Parameters:

Name Type Description Default
store dict

dictionary containing samples from the MCMC.

required
Source code in src/pyelq/component/source_model.py
def from_mcmc_group(self, store: dict):
    """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

    Gets the number of sources present in each iteration of the MCMC sampler, and attaches this as a class property.

    Args:
        store (dict): dictionary containing samples from the MCMC.

    """
    self.number_on_sources = np.count_nonzero(np.logical_not(np.isnan(store[self.map["source"]])), axis=0)

SlabAndSpike dataclass

Bases: SourceGrouping

Slab and spike source model, special case for the source grouping.

Slab and spike: the prior for the emission rates is a two-component mixture, and the allocation is to be estimated as part of the inversion.

Attributes:

Name Type Description
slab_probability float

prior probability of allocation to the slab component. Defaults to 0.05.

allocation ndarray

set of allocation samples, with shape=(n_sources, n_iterations). Attached to the class by self.from_mcmc_group().

number_on_sources ndarray

number of sources switched on in the solution, per iteration.

Source code in src/pyelq/component/source_model.py
@dataclass
class SlabAndSpike(SourceGrouping):
    """Slab and spike source model, special case for the source grouping.

    Slab and spike: the prior for the emission rates is a two-component mixture, and the allocation is to be
    estimated as part of the inversion.

    Attributes:
        slab_probability (float): prior probability of allocation to the slab component. Defaults to 0.05.
        allocation (np.ndarray): set of allocation samples, with shape=(n_sources, n_iterations). Attached to
            the class by self.from_mcmc_group().
        number_on_sources (np.ndarray): number of sources switched on in the solution, per iteration.

    """

    slab_probability: float = 0.05
    allocation: np.ndarray = field(init=False)
    number_on_sources: np.ndarray = field(init=False)

    def make_allocation_model(self, model: list) -> list:
        """Initialise the source allocation part of the model.

        Args:
            model (list): model as constructed so far, consisting of list of distributions.

        Returns:
            list: overall model list, updated with allocation distribution.

        """
        model.append(Categorical(self.map["allocation"], prob=self.map["source_prob"]))
        return model

    def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
        """Initialise the allocation part of the sampler.

        Args:
            model (Model): overall model set for the problem.
            sampler_list (list): list of samplers for individual parameters.

        Returns:
            list: sampler_list updated with sampler for the source allocation.

        """
        sampler_list.append(
            MixtureAllocation(param=self.map["allocation"], model=model, response_param=self.map["source"])
        )
        return sampler_list

    def make_allocation_state(self, state: dict) -> dict:
        """Initialise the allocation part of the state.

        Args:
            state (dict): dictionary containing current state information.

        Returns:
            dict: state updated with parameters related to the source grouping.

        """
        state[self.map["emission_rate_mean"]] = np.array(self.emission_rate_mean, ndmin=1)
        state[self.map["source_prob"]] = np.tile(
            np.array([self.slab_probability, 1 - self.slab_probability]), (self.nof_sources, 1)
        )
        state[self.map["allocation"]] = np.ones((self.nof_sources, 1), dtype="int")
        return state

    def from_mcmc_group(self, store: dict):
        """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

        Args:
            store (dict): dictionary containing samples from the MCMC.

        """
        self.allocation = store[self.map["allocation"]]
        self.number_on_sources = self.allocation.shape[0] - np.sum(self.allocation, axis=0)
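The on-source count in `from_mcmc_group` follows from the mixture indexing: the initialisation in `make_allocation_state` puts the slab probability in column 0, so allocation value 0 is taken here to mean slab ("on") and 1 spike ("off"). A small sketch with made-up allocation samples:

```python
import numpy as np

# Hypothetical allocation samples, shape (n_sources, n_iterations);
# 0 = slab ("on"), 1 = spike ("off"), matching the two-component mixture above.
allocation = np.array([[0, 1, 0],
                       [1, 1, 0],
                       [0, 0, 1]])

# Sources "on" per iteration = total sources minus those allocated to the spike.
number_on_sources = allocation.shape[0] - np.sum(allocation, axis=0)
print(number_on_sources)  # [2 1 2]
```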

make_allocation_model(model)

Initialise the source allocation part of the model.

Parameters:

Name Type Description Default
model list

model as constructed so far, consisting of list of distributions.

required

Returns:

Name Type Description
list list

overall model list, updated with allocation distribution.

Source code in src/pyelq/component/source_model.py
def make_allocation_model(self, model: list) -> list:
    """Initialise the source allocation part of the model.

    Args:
        model (list): model as constructed so far, consisting of list of distributions.

    Returns:
        list: overall model list, updated with allocation distribution.

    """
    model.append(Categorical(self.map["allocation"], prob=self.map["source_prob"]))
    return model

make_allocation_sampler(model, sampler_list)

Initialise the allocation part of the sampler.

Parameters:

Name Type Description Default
model Model

overall model set for the problem.

required
sampler_list list

list of samplers for individual parameters.

required

Returns:

Name Type Description
list list

sampler_list updated with sampler for the source allocation.

Source code in src/pyelq/component/source_model.py
def make_allocation_sampler(self, model: Model, sampler_list: list) -> list:
    """Initialise the allocation part of the sampler.

    Args:
        model (Model): overall model set for the problem.
        sampler_list (list): list of samplers for individual parameters.

    Returns:
        list: sampler_list updated with sampler for the source allocation.

    """
    sampler_list.append(
        MixtureAllocation(param=self.map["allocation"], model=model, response_param=self.map["source"])
    )
    return sampler_list

make_allocation_state(state)

Initialise the allocation part of the state.

Parameters:

Name Type Description Default
state dict

dictionary containing current state information.

required

Returns:

Name Type Description
dict dict

state updated with parameters related to the source grouping.

Source code in src/pyelq/component/source_model.py
def make_allocation_state(self, state: dict) -> dict:
    """Initialise the allocation part of the state.

    Args:
        state (dict): dictionary containing current state information.

    Returns:
        dict: state updated with parameters related to the source grouping.

    """
    state[self.map["emission_rate_mean"]] = np.array(self.emission_rate_mean, ndmin=1)
    state[self.map["source_prob"]] = np.tile(
        np.array([self.slab_probability, 1 - self.slab_probability]), (self.nof_sources, 1)
    )
    state[self.map["allocation"]] = np.ones((self.nof_sources, 1), dtype="int")
    return state
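The `np.tile` call above builds one `(slab, spike)` probability row per source. A sketch with hypothetical values (`nof_sources = 3` is assumed for illustration):

```python
import numpy as np

slab_probability = 0.05
nof_sources = 3  # hypothetical

# One (slab, spike) probability row per source, as in make_allocation_state above.
source_prob = np.tile(np.array([slab_probability, 1 - slab_probability]), (nof_sources, 1))
print(source_prob.shape)  # (3, 2)
print(source_prob[0])     # [0.05 0.95]
```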

from_mcmc_group(store)

Extract posterior allocation samples from the MCMC sampler, attach them to the class.

Parameters:

Name Type Description Default
store dict

dictionary containing samples from the MCMC.

required
Source code in src/pyelq/component/source_model.py
def from_mcmc_group(self, store: dict):
    """Extract posterior allocation samples from the MCMC sampler, attach them to the class.

    Args:
        store (dict): dictionary containing samples from the MCMC.

    """
    self.allocation = store[self.map["allocation"]]
    self.number_on_sources = self.allocation.shape[0] - np.sum(self.allocation, axis=0)

SourceDistribution dataclass

Bases: ParameterMapping

Superclass for source emission rate distribution.

Source distribution determines the type of prior to be used for the source emission rates, and the transformation linking the source parameters and the data.

Elements related to transformation of source parameters are also specified at the model level.

Attributes:

Name Type Description
nof_sources int

number of sources in the model.

emission_rate ndarray

set of emission rate samples, with shape=(n_sources, n_iterations). Attached to the class by self.from_mcmc_dist().

Source code in src/pyelq/component/source_model.py
@dataclass
class SourceDistribution(ParameterMapping):
    """Superclass for source emission rate distribution.

    Source distribution determines the type of prior to be used for the source emission rates, and the transformation
    linking the source parameters and the data.

    Elements related to transformation of source parameters are also specified at the model level.

    Attributes:
        nof_sources (int): number of sources in the model.
        emission_rate (np.ndarray): set of emission rate samples, with shape=(n_sources, n_iterations). Attached to
            the class by self.from_mcmc_dist().

    """

    nof_sources: int = field(init=False)
    emission_rate: np.ndarray = field(init=False)

    @abstractmethod
    def make_source_model(self, model: list) -> list:
        """Add distributional component to the overall model corresponding to the source emission rate distribution.

        Args:
            model (list): model as constructed so far, consisting of list of distributions.

        Returns:
            list: overall model list, updated with distributions related to source prior.

        """

    @abstractmethod
    def make_source_sampler(self, model: Model, sampler_list: list) -> list:
        """Initialise the source prior distribution part of the sampler.

        Args:
            model (Model): overall model set for the problem.
            sampler_list (list): list of samplers for individual parameters.

        Returns:
            list: sampler_list updated with sampler for the emission rate parameters.

        """

    @abstractmethod
    def make_source_state(self, state: dict) -> dict:
        """Initialise the emission rate parts of the state.

        Args:
            state (dict): dictionary containing current state information.

        Returns:
            dict: state updated with parameters related to the source emission rates.

        """

    @abstractmethod
    def from_mcmc_dist(self, store: dict):
        """Extract posterior emission rate samples from the MCMC, attach them to the class.

        Args:
            store (dict): dictionary containing samples from the MCMC.

        """

make_source_model(model) abstractmethod

Add distributional component to the overall model corresponding to the source emission rate distribution.

Parameters:

Name Type Description Default
model list

model as constructed so far, consisting of list of distributions.

required

Returns:

Name Type Description
list list

overall model list, updated with distributions related to source prior.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_source_model(self, model: list) -> list:
    """Add distributional component to the overall model corresponding to the source emission rate distribution.

    Args:
        model (list): model as constructed so far, consisting of list of distributions.

    Returns:
        list: overall model list, updated with distributions related to source prior.

    """

make_source_sampler(model, sampler_list) abstractmethod

Initialise the source prior distribution part of the sampler.

Parameters:

Name Type Description Default
model Model

overall model set for the problem.

required
sampler_list list

list of samplers for individual parameters.

required

Returns:

Name Type Description
list list

sampler_list updated with sampler for the emission rate parameters.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_source_sampler(self, model: Model, sampler_list: list) -> list:
    """Initialise the source prior distribution part of the sampler.

    Args:
        model (Model): overall model set for the problem.
        sampler_list (list): list of samplers for individual parameters.

    Returns:
        list: sampler_list updated with sampler for the emission rate parameters.

    """

make_source_state(state) abstractmethod

Initialise the emission rate parts of the state.

Parameters:

Name Type Description Default
state dict

dictionary containing current state information.

required

Returns:

Name Type Description
dict dict

state updated with parameters related to the source emission rates.

Source code in src/pyelq/component/source_model.py
@abstractmethod
def make_source_state(self, state: dict) -> dict:
    """Initialise the emission rate parts of the state.

    Args:
        state (dict): dictionary containing current state information.

    Returns:
        dict: state updated with parameters related to the source emission rates.

    """

from_mcmc_dist(store) abstractmethod

Extract posterior emission rate samples from the MCMC, attach them to the class.

Parameters:

Name Type Description Default
store dict

dictionary containing samples from the MCMC.

required
Source code in src/pyelq/component/source_model.py
@abstractmethod
def from_mcmc_dist(self, store: dict):
    """Extract posterior emission rate samples from the MCMC, attach them to the class.

    Args:
        store (dict): dictionary containing samples from the MCMC.

    """

NormalResponse dataclass

Bases: SourceDistribution

(Truncated) Gaussian prior for sources.

No transformation applied to parameters, i.e.:
  • Prior distribution: s ~ N(mu, 1/precision)
  • Likelihood contribution: y = A*s + b + ...

Attributes:

Name Type Description
truncation bool

indication of whether the emission rate prior should be truncated at 0. Defaults to True.

emission_rate_lb Union[float, ndarray]

lower bound for the source emission rates. Defaults to 0.

emission_rate_mean Union[float, ndarray]

prior mean for the emission rate distribution. Defaults to 0.
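The truncated prior s ~ N(mu, 1/precision), restricted to s >= emission_rate_lb, can be sketched with simple rejection sampling. This is an illustration of the distribution only, not the openMCMC sampling scheme; the function name is hypothetical.

```python
import numpy as np


def sample_truncated_normal(mu, precision, lb, size, rng=None):
    """Draw from N(mu, 1/precision) truncated below at lb, via rejection sampling."""
    rng = np.random.default_rng() if rng is None else rng
    sd = 1.0 / np.sqrt(precision)  # precision is the inverse variance
    samples = np.empty(0)
    while samples.size < size:
        draw = rng.normal(mu, sd, size=size)
        samples = np.concatenate([samples, draw[draw >= lb]])  # keep draws above the bound
    return samples[:size]


# Defaults from the attributes above: mean 0, lower bound 0 (truncation at zero).
s = sample_truncated_normal(mu=0.0, precision=4.0, lb=0.0, size=1000)
print(s.min() >= 0.0)  # True: all emission rates respect the lower bound
```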

Source code in src/pyelq/component/source_model.py
@dataclass
class NormalResponse(SourceDistribution):
    """(Truncated) Gaussian prior for sources.

    No transformation applied to parameters, i.e.:
    - Prior distribution: s ~ N(mu, 1/precision)
    - Likelihood contribution: y = A*s + b + ...

    Attributes:
        truncation (bool): indication of whether the emission rate prior should be truncated at 0. Defaults to True.
        emission_rate_lb (Union[float, np.ndarray]): lower bound for the source emission rates. Defaults to 0.
        emission_rate_mean (Union[float, np.ndarray]): prior mean for the emission rate distribution. Defaults to 0.

    """

    truncation: bool = True
    emission_rate_lb: Union[float, np.ndarray] = 0
    emission_rate_mean: Union[float, np.ndarray] = 0

    def make_source_model(self, model: list) -> list:
        """Add distributional component to the overall model corresponding to the source emission rate distribution.

        Args:
            model (list): model as constructed so far, consisting of list of distributions.

        Returns:
            list: model, updated with distributions related to source prior.

        """
        domain_response_lower = None
        if self.truncation:
            domain_response_lower = self.emission_rate_lb

        model.append(
            mcmcNormal(
                self.map["source"],
                mean=parameter.MixtureParameterVector(
                    param=self.map["emission_rate_mean"], allocation=self.map["allocation"]
                ),
                precision=parameter.MixtureParameterMatrix(
                    param=self.map["emission_rate_precision"], allocation=self.map["allocation"]
                ),
                domain_response_lower=domain_response_lower,
            )
        )
        return model

    def make_source_sampler(self, model: Model, sampler_list: list = None) -> list:
        """Initialise the source prior distribution part of the sampler.

        Args:
            model (Model): overall model set for the problem.
            sampler_list (list): list of samplers for individual parameters.

        Returns:
            list: sampler_list updated with sampler for the emission rate parameters.

        """
        if sampler_list is None:
            sampler_list = []
        sampler_list.append(NormalNormal(self.map["source"], model))
        return sampler_list

    def make_source_state(self, state: dict) -> dict:
        """Initialise the emission rate part of the state.

        Args:
            state (dict): dictionary containing current state information.

        Returns:
            dict: state updated with initial emission rate vector.

        """
        state[self.map["source"]] = np.zeros((self.nof_sources, 1))
        return state

    def from_mcmc_dist(self, store: dict):
        """Extract posterior emission rate samples from the MCMC sampler, attach them to the class.

        Args:
            store (dict): dictionary containing samples from the MCMC.

        """
        self.emission_rate = store[self.map["source"]]

make_source_model(model)

Add distributional component to the overall model corresponding to the source emission rate distribution.

Parameters:

Name Type Description Default
model list

model as constructed so far, consisting of list of distributions.

required

Returns:

Name Type Description
list list

model, updated with distributions related to source prior.

Source code in src/pyelq/component/source_model.py
def make_source_model(self, model: list) -> list:
    """Add distributional component to the overall model corresponding to the source emission rate distribution.

    Args:
        model (list): model as constructed so far, consisting of list of distributions.

    Returns:
        list: model, updated with distributions related to source prior.

    """
    domain_response_lower = None
    if self.truncation:
        domain_response_lower = self.emission_rate_lb

    model.append(
        mcmcNormal(
            self.map["source"],
            mean=parameter.MixtureParameterVector(
                param=self.map["emission_rate_mean"], allocation=self.map["allocation"]
            ),
            precision=parameter.MixtureParameterMatrix(
                param=self.map["emission_rate_precision"], allocation=self.map["allocation"]
            ),
            domain_response_lower=domain_response_lower,
        )
    )
    return model

make_source_sampler(model, sampler_list=None)

Initialise the source prior distribution part of the sampler.

Parameters:

Name Type Description Default
model Model

overall model set for the problem.

required
sampler_list list

list of samplers for individual parameters.

None

Returns:

Name Type Description
list list

sampler_list updated with sampler for the emission rate parameters.

Source code in src/pyelq/component/source_model.py
def make_source_sampler(self, model: Model, sampler_list: list = None) -> list:
    """Initialise the source prior distribution part of the sampler.

    Args:
        model (Model): overall model set for the problem.
        sampler_list (list): list of samplers for individual parameters.

    Returns:
        list: sampler_list updated with sampler for the emission rate parameters.

    """
    if sampler_list is None:
        sampler_list = []
    sampler_list.append(NormalNormal(self.map["source"], model))
    return sampler_list

make_source_state(state)

Initialise the emission rate part of the state.

Parameters:

Name Type Description Default
state dict

dictionary containing current state information.

required

Returns:

Name Type Description
dict dict

state updated with initial emission rate vector.

Source code in src/pyelq/component/source_model.py
def make_source_state(self, state: dict) -> dict:
    """Initialise the emission rate part of the state.

    Args:
        state (dict): dictionary containing current state information.

    Returns:
        dict: state updated with initial emission rate vector.

    """
    state[self.map["source"]] = np.zeros((self.nof_sources, 1))
    return state

from_mcmc_dist(store)

Extract posterior emission rate samples from the MCMC sampler, attach them to the class.

Parameters:

Name Type Description Default
store dict

dictionary containing samples from the MCMC.

required
Source code in src/pyelq/component/source_model.py
def from_mcmc_dist(self, store: dict):
    """Extract posterior emission rate samples from the MCMC sampler, attach them to the class.

    Args:
        store (dict): dictionary containing samples from the MCMC.

    """
    self.emission_rate = store[self.map["source"]]
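Once from_mcmc_dist has attached the samples, posterior summaries follow directly. A minimal sketch, assuming the stored array has shape (n_sources, n_iterations) and using synthetic samples in place of a real MCMC store (both assumptions, not documented above):

```python
import numpy as np

# Hypothetical MCMC store: 3 sources, 500 iterations of emission rate samples.
rng = np.random.default_rng(0)
store = {"s": rng.gamma(shape=2.0, scale=1.5, size=(3, 500))}

emission_rate = store["s"]  # what from_mcmc_dist attaches to the class

# Posterior mean and a 95% credible interval per source.
posterior_mean = emission_rate.mean(axis=1)
lower, upper = np.percentile(emission_rate, [2.5, 97.5], axis=1)

print(posterior_mean.shape)  # (3,)
```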

SourceModel dataclass

Bases: Component, SourceGrouping, SourceDistribution

Superclass for the specification of the source model in an inversion run.

Various different types of source model can be derived from this superclass. A SourceModel is an optional component of a model, and thus inherits from Component.

A subclass instance of SourceModel must inherit from
  • an INSTANCE of SourceDistribution, which specifies a prior emission rate distribution for all sources in the source map.
  • an INSTANCE of SourceGrouping, which specifies a type of mixture prior specification for the sources (for which the allocation is to be estimated as part of the inversion).

If the flag reversible_jump == True, then the number of sources and their locations are also estimated as part of the inversion, in addition to the emission rates. If this flag is set to true, the sensor_object, meteorology and gas_species objects are all attached to the class, as they will be required in the repeated computation of updates to the coupling matrix during the inversion.

Attributes:

Name Type Description
dispersion_model GaussianPlume

dispersion model used to generate the couplings between source locations and sensor observations.

coupling ndarray

coupling matrix generated using dispersion_model.

sensor_object SensorGroup

stores sensor information for reversible jump coupling updates.

meteorology MeteorologyGroup

stores meteorology information for reversible jump coupling updates.

gas_species GasSpecies

stores gas species information for reversible jump coupling updates.

reversible_jump bool

logical indicating whether the reversible jump algorithm for estimation of the number of sources and their locations should be run. Defaults to False.

distribution_number_sources str

distribution for the number of sources in the solution. Can be either "Poisson" or "Uniform". Defaults to "Poisson".

random_walk_step_size ndarray

(3 x 1) array specifying the standard deviations of the distributions from which the random walk sampler draws new source locations. Defaults to np.array([1.0, 1.0, 0.1]).

site_limits ndarray

(3 x 2) array specifying the lower (column 0) and upper (column 1) limits of the analysis site. Only relevant for cases where reversible_jump == True (where sources are free to move in the solution).

rate_num_sources int

specification for the parameter for the Poisson prior distribution for the total number of sources. Only relevant for cases where reversible_jump == True (where the number of sources in the solution can change). Unused in the case of a Uniform prior (self.distribution_number_sources == "Uniform").

n_sources_max int

maximum number of sources that can feature in the solution. Only relevant for cases where reversible_jump == True (where the number of sources in the solution can change).

emission_proposal_std float

standard deviation of the truncated Gaussian distribution used to propose the new source emission rate in case of a birth move.

update_precision bool

logical indicating whether the prior precision parameter for emission rates should be updated as part of the inversion. Defaults to false.

prior_precision_shape Union[float, ndarray]

shape parameters for the prior Gamma distribution for the source precision parameter.

prior_precision_rate Union[float, ndarray]

rate parameters for the prior Gamma distribution for the source precision parameter.

initial_precision Union[float, ndarray]

initial value for the source emission rate precision parameter.

precision_scalar ndarray

precision values generated by MCMC inversion.

all_source_locations ENU

ENU object containing the locations of the sources after the MCMC has been run. When reversible_jump == True, this will be the locations of the sources in the solution over all iterations; when reversible_jump == False, this will be the fixed locations of the sources in the source map, which do not change during the course of the inversion.

individual_source_labels list

list of labels for each source in the source map, defaults to None.

coverage_detection float

sensor detection threshold (in ppm) to be used for coverage calculations.

coverage_test_source float

test source (in kg/hr) which we wish to be able to see in coverage calculation.

threshold_function Callable

Callable function which returns a single value that defines the threshold for the coupling in a lambda function form. Examples: lambda x: np.quantile(x, 0.95, axis=0), lambda x: np.max(x, axis=0), lambda x: np.mean(x, axis=0). Defaults to np.quantile.

label_string str

string to append to the parameter mapping, e.g. for fixed sources, defaults to None.
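The coverage attributes above combine as coverage_threshold = coverage_test_source / coverage_detection, and a source is within coverage when the emission rate implied by its couplings (1 / threshold_function(couplings)) falls below that threshold. A minimal numpy sketch of this logic, using the default-style quantile threshold and illustrative coupling values:

```python
import numpy as np

coverage_detection = 0.1    # sensor detection threshold (ppm)
coverage_test_source = 6.0  # test source we wish to be able to see (kg/hr)
coverage_threshold = coverage_test_source / coverage_detection  # 60 kg/hr

threshold_function = lambda x: np.quantile(x, 0.95, axis=0)

# Couplings (hr/kg): n_data points x n_sources.
# The second source barely couples to any sensor (e.g. it is always downwind).
couplings = np.array([[0.5, 1e-6],
                      [0.8, 2e-6],
                      [0.6, 1e-6]])

coupling_threshold = threshold_function(couplings)
estimated_emission_rates = 1.0 / coupling_threshold
coverage = estimated_emission_rates < coverage_threshold
print(coverage)  # [ True False]
```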

Source code in src/pyelq/component/source_model.py
@dataclass
class SourceModel(Component, SourceGrouping, SourceDistribution):
    """Superclass for the specification of the source model in an inversion run.

    Various different types of source model can be derived from this superclass. A SourceModel is an optional
    component of a model, and thus inherits from Component.

    A subclass instance of SourceModel must inherit from:
        - an INSTANCE of SourceDistribution, which specifies a prior emission rate distribution for all sources in the
            source map.
        - an INSTANCE of SourceGrouping, which specifies a type of mixture prior specification for the sources (for
            which the allocation is to be estimated as part of the inversion).

    If the flag reversible_jump == True, then the number of sources and their locations are also estimated as part of
    the inversion, in addition to the emission rates. If this flag is set to true, the sensor_object, meteorology and
    gas_species objects are all attached to the class, as they will be required in the repeated computation of updates
    to the coupling matrix during the inversion.

    Attributes:
        dispersion_model (GaussianPlume): dispersion model used to generate the couplings between source locations and
            sensor observations.
        coupling (np.ndarray): coupling matrix generated using dispersion_model.

        sensor_object (SensorGroup): stores sensor information for reversible jump coupling updates.
        meteorology (MeteorologyGroup): stores meteorology information for reversible jump coupling updates.
        gas_species (GasSpecies): stores gas species information for reversible jump coupling updates.

        reversible_jump (bool): logical indicating whether the reversible jump algorithm for estimation of the number
            of sources and their locations should be run. Defaults to False.
        distribution_number_sources (str): distribution for the number of sources in the solution. Can be either
            "Poisson" or "Uniform". Defaults to "Poisson".
        random_walk_step_size (np.ndarray): (3 x 1) array specifying the standard deviations of the distributions
            from which the random walk sampler draws new source locations. Defaults to np.array([1.0, 1.0, 0.1]).
        site_limits (np.ndarray): (3 x 2) array specifying the lower (column 0) and upper (column 1) limits of the
            analysis site. Only relevant for cases where reversible_jump == True (where sources are free to move in
            the solution).
        rate_num_sources (int): specification for the parameter for the Poisson prior distribution for the total number
            of sources. Only relevant for cases where reversible_jump == True (where the number of sources in the
            solution can change). Unused in the case of a Uniform prior (self.distribution_number_sources == "Uniform").
        n_sources_max (int): maximum number of sources that can feature in the solution. Only relevant for cases where
            reversible_jump == True (where the number of sources in the solution can change).
        emission_proposal_std (float): standard deviation of the truncated Gaussian distribution used to propose the
            new source emission rate in case of a birth move.

        update_precision (bool): logical indicating whether the prior precision parameter for emission rates should be
            updated as part of the inversion. Defaults to false.
        prior_precision_shape (Union[float, np.ndarray]): shape parameters for the prior Gamma distribution for the
            source precision parameter.
        prior_precision_rate (Union[float, np.ndarray]): rate parameters for the prior Gamma distribution for the
            source precision parameter.
        initial_precision (Union[float, np.ndarray]): initial value for the source emission rate precision parameter.
        precision_scalar (np.ndarray): precision values generated by MCMC inversion.

        all_source_locations (ENU): ENU object containing the locations of the sources after the MCMC has been run.
            When reversible_jump == True, this will be the locations of the sources in the solution over all
            iterations; when reversible_jump == False, this will be the fixed locations of the sources in the
            source map, which do not change during the course of the inversion.
        individual_source_labels (list, optional): list of labels for each source in the source map, defaults to None.

        coverage_detection (float): sensor detection threshold (in ppm) to be used for coverage calculations.
        coverage_test_source (float): test source (in kg/hr) which we wish to be able to see in coverage calculation.

        threshold_function (Callable): Callable function which returns a single value that defines the threshold
            for the coupling in a lambda function form. Examples: lambda x: np.quantile(x, 0.95, axis=0),
            lambda x: np.max(x, axis=0), lambda x: np.mean(x, axis=0). Defaults to np.quantile.
        label_string (str): string to append to the parameter mapping, e.g. for fixed sources, defaults to None.

    """

    dispersion_model: GaussianPlume = field(init=False, default=None)
    coupling: np.ndarray = field(init=False)

    sensor_object: SensorGroup = field(init=False, default=None)
    meteorology: Meteorology = field(init=False, default=None)
    gas_species: GasSpecies = field(init=False, default=None)

    reversible_jump: bool = False
    distribution_number_sources: str = "Poisson"
    random_walk_step_size: np.ndarray = field(default_factory=lambda: np.array([1.0, 1.0, 0.1], ndmin=2).T)
    site_limits: np.ndarray = None
    rate_num_sources: int = 5
    n_sources_max: int = 20
    emission_proposal_std: float = 0.5

    update_precision: bool = False
    prior_precision_shape: Union[float, np.ndarray] = 1e-3
    prior_precision_rate: Union[float, np.ndarray] = 1e-3
    initial_precision: Union[float, np.ndarray] = 1.0
    precision_scalar: np.ndarray = field(init=False)

    all_source_locations: np.ndarray = field(init=False)
    individual_source_labels: Optional[list] = None

    coverage_detection: float = 0.1
    coverage_test_source: float = 6.0

    threshold_function: callable = lambda x: np.quantile(x, 0.95, axis=0)

    label_string: Optional[str] = None

    def __post_init__(self):
        """Post-initialisation of the class.

        This function is called after the class has been initialised, and is used to set up the mapping dictionary for
        the class by applying the append_string function to the mapping dictionary.

        """
        if self.label_string is not None:
            self.append_string(self.label_string)

    @property
    def nof_sources(self):
        """Get number of sources in the source map."""
        return self.dispersion_model.source_map.nof_sources

    @property
    def coverage_threshold(self):
        """Compute coverage threshold from detection threshold and test source strength."""
        return self.coverage_test_source / self.coverage_detection

    def initialise(self, sensor_object: SensorGroup, meteorology: Meteorology, gas_species: GasSpecies):
        """Set up the source model.

        Extract required information from the sensor, meteorology and gas species objects:
            - Attach coupling calculated using self.dispersion_model.
            - (If self.reversible_jump == True) Attach objects to source model which will be used in RJMCMC sampler,
                they will be required when we need to update the couplings when new source locations are proposed when
                we move/birth/death.

        Args:
            sensor_object (SensorGroup): object containing sensor data.
            meteorology (MeteorologyGroup): object containing meteorology data.
            gas_species (GasSpecies): object containing gas species information.

        """
        self.initialise_dispersion_model(sensor_object)
        self.coupling = self.dispersion_model.compute_coupling(
            sensor_object, meteorology, gas_species, output_stacked=True
        )

        self.sensor_object = sensor_object
        self.screen_coverage()
        if self.reversible_jump:
            self.meteorology = meteorology
            self.gas_species = gas_species

    def initialise_dispersion_model(self, sensor_object: SensorGroup):
        """Initialise the dispersion model.

        If a dispersion_model has already been attached to this instance, then this function takes no action.

        If a dispersion_model has not already been attached to the instance, then this function adds a GaussianPlume
        dispersion model, with a default source map that has limits set based on the sensor locations.

        Args:
            sensor_object (SensorGroup): object containing sensor data.

        """
        if self.dispersion_model is None:
            source_map = SourceMap()
            sensor_locations = sensor_object.location.to_enu()
            location_object = ENU(
                ref_latitude=sensor_locations.ref_latitude,
                ref_longitude=sensor_locations.ref_longitude,
                ref_altitude=sensor_locations.ref_altitude,
            )
            source_map.generate_sources(
                coordinate_object=location_object,
                sourcemap_limits=np.array(
                    [
                        [np.min(sensor_locations.east), np.max(sensor_locations.east)],
                        [np.min(sensor_locations.north), np.max(sensor_locations.north)],
                        [np.min(sensor_locations.up), np.max(sensor_locations.up)],
                    ]
                ),
                sourcemap_type="grid",
            )
            self.dispersion_model = GaussianPlume(source_map)

    def screen_coverage(self):
        """Screen the initial source map for coverage."""
        in_coverage_area = self.compute_coverage(self.coupling)
        self.coupling = self.coupling[:, in_coverage_area]
        all_locations = self.dispersion_model.source_map.location.to_array()
        screened_locations = all_locations[in_coverage_area, :]
        self.dispersion_model.source_map.location.from_array(screened_locations)

    def compute_coverage(self, couplings: np.ndarray, **kwargs) -> np.ndarray:
        """Returns a logical vector that indicates which sources in the couplings are, or are not, within the coverage.

        The 'coverage' is the area inside which all sources are well covered by wind data. E.g. If wind exclusively
        blows towards East, then all sources to the East of any sensor are 'invisible', and are not within the coverage.

        Couplings are returned in hr/kg. Some threshold function defines the largest allowed coupling value. This is
        used to calculate estimated emission rates in kg/hr. Any emissions which are greater than the value of
        'self.coverage_threshold' are defined as not within the coverage.

        If sensor_object.source_on is being used, only the parts where the coupling is computed are used in the
        coverage check. This avoids the threshold_function being affected by large numbers of zero values.

        Args:
            couplings (np.ndarray): Array of coupling values. Dimensions: n_data points x n_sources.
            kwargs (dict, optional): Keyword arguments required for the threshold function.

        Returns:
            coverage (np.ndarray): A logical array specifying which sources are within the coverage.

        """
        if self.sensor_object.source_on is not None:
            couplings = deepcopy(couplings)
            index_keep = self.sensor_object.source_on > 0
            couplings = couplings[index_keep]

        coupling_threshold = self.threshold_function(couplings, **kwargs)
        no_warning_threshold = np.where(coupling_threshold <= 1e-100, 1, coupling_threshold)
        no_warning_estimated_emission_rates = np.where(coupling_threshold <= 1e-100, np.inf, 1 / no_warning_threshold)
        coverage = no_warning_estimated_emission_rates < self.coverage_threshold

        return coverage

    def update_coupling_column(self, state: dict, update_column: int) -> dict:
        """Update the coupling, based on changes to the source locations as part of inversion.

        To be used in two different situations:
            - movement of source locations (e.g. Metropolis Hastings, random walk).
            - adding of new source locations (e.g. reversible jump birth move).
        If [update_column < A.shape[1]]: an existing column of the A matrix is updated.
        If [update_column == A.shape[1]]: a new column is appended to the right-hand side of the A matrix
        (corresponding to a new source).

        A central assumption of this function is that the sensor information and meteorology information
        have already been interpolated onto the same space/time points.

        If an update_column is supplied, the coupling for that source location only is calculated to save on
        computation time. If update_column is None, then we just re-compute the whole coupling matrix.

        Args:
            state (dict): dictionary containing state parameters.
            update_column (int): index of the coupling column to be updated.

        Returns:
            state (dict): state dictionary containing updated coupling information.

        """
        self.dispersion_model.source_map.location.from_array(state[self.map["source_location"]][:, [update_column]].T)
        new_coupling = self.dispersion_model.compute_coupling(
            self.sensor_object, self.meteorology, self.gas_species, output_stacked=True, run_interpolation=False
        )

        if update_column == state[self.map["coupling_matrix"]].shape[1]:
            state[self.map["coupling_matrix"]] = np.concatenate(
                (state[self.map["coupling_matrix"]], new_coupling), axis=1
            )
        elif update_column < state[self.map["coupling_matrix"]].shape[1]:
            state[self.map["coupling_matrix"]][:, [update_column]] = new_coupling
        else:
            raise ValueError("Invalid column specification for updating.")
        return state

    def birth_function(self, current_state: dict, prop_state: dict) -> Tuple[dict, float, float]:
        """Update MCMC state based on source birth proposal.

        Proposed state updated as follows:
            1- Add column to coupling matrix for new source location.
            2- If required, adjust other components of the state which correspond to the sources.
        The source emission rate vector will be adjusted using the standardised functionality
        in the openMCMC package.

        After the coupling has been updated, a coverage test is applied for the new source
        location. If the max coupling is too small, a large contribution is added to the
        log-proposal density for the new state, to force the sampler to reject it.

        A central assumption of this function is that the sensor information and meteorology information
        have already been interpolated onto the same space/time points.

        This function assumes that the new source location has been added as the final column of
        the source location matrix, and so will correspondingly append the new coupling column to the right
        hand side of the current state coupling, and append an emission rate as the last element of the
        current state emission rate vector.

        Args:
            current_state (dict): dictionary containing parameters of the current state.
            prop_state (dict): dictionary containing the parameters of the proposed state.

        Returns:
            prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
            logp_pr_g_cr (float): log-transition density of the proposed state given the current state
                (i.e. log[p(proposed | current)])
            logp_cr_g_pr (float): log-transition density of the current state given the proposed state
                (i.e. log[p(current | proposed)])

        """
        prop_state = self.update_coupling_column(prop_state, int(prop_state[self.map["number_sources"]].item()) - 1)
        prop_state[self.map["allocation"]] = np.concatenate(
            (prop_state[self.map["allocation"]], np.array([0], ndmin=2)), axis=0
        )
        in_cov_area = self.compute_coverage(prop_state[self.map["coupling_matrix"]][:, -1])
        if not in_cov_area:
            logp_pr_g_cr = 1e10
        else:
            logp_pr_g_cr = 0.0
        logp_cr_g_pr = 0.0

        return prop_state, logp_pr_g_cr, logp_cr_g_pr

    def death_function(self, current_state: dict, prop_state: dict, deletion_index: int) -> Tuple[dict, float, float]:
        """Update MCMC state based on source death proposal.

        Proposed state updated as follows:
            1- Remove column from coupling for deleted source.
            2- If required, adjust other components of the state which correspond to the sources.
        The source emission rate vector will be adjusted using the standardised functionality in the openMCMC package.

        A central assumption of this function is that the sensor information and meteorology information have already
        been interpolated onto the same space/time points.

        Args:
            current_state (dict): dictionary containing parameters of the current state.
            prop_state (dict): dictionary containing the parameters of the proposed state.
            deletion_index (int): index of the source to be deleted in the overall set of sources.

        Returns:
            prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
            logp_pr_g_cr (float): log-transition density of the proposed state given the current state
                (i.e. log[p(proposed | current)])
            logp_cr_g_pr (float): log-transition density of the current state given the proposed state
                (i.e. log[p(current | proposed)])

        """
        prop_state[self.map["coupling_matrix"]] = np.delete(
            prop_state[self.map["coupling_matrix"]], obj=deletion_index, axis=1
        )
        prop_state[self.map["allocation"]] = np.delete(prop_state[self.map["allocation"]], obj=deletion_index, axis=0)
        logp_pr_g_cr = 0.0
        logp_cr_g_pr = 0.0

        return prop_state, logp_pr_g_cr, logp_cr_g_pr

    def move_function(self, prop_state: dict, update_column: int) -> Tuple[dict, float, float]:
        """Re-compute the coupling after a source location move.

        Function first updates the coupling column, and then checks whether the location passes a coverage test. If the
        location does not have good enough coverage, a large contribution is added to the log-proposal density so that
        the sampler rejects the move.

        Args:
            prop_state (dict): dictionary containing parameters of the proposed state.
            update_column (int): index of the coupling column to be updated.

        Returns:
            prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
            logp_pr_g_cr (float): log-transition density of the proposed state given the current state
                (i.e. log[p(proposed | current)])
            logp_cr_g_pr (float): log-transition density of the current state given the proposed state
                (i.e. log[p(current | proposed)])

        """
        prop_state = self.update_coupling_column(prop_state, update_column)
        in_cov_area = self.compute_coverage(prop_state[self.map["coupling_matrix"]][:, update_column])

        if not in_cov_area:
            logp_pr_g_cr = 1e10
        else:
            logp_pr_g_cr = 0.0
        logp_cr_g_pr = 0.0

        return prop_state, logp_pr_g_cr, logp_cr_g_pr

    def make_model(self, model: list) -> list:
        """Take model list and append new elements from current model component.

        Args:
            model (list): Current list of model elements.

        Returns:
            list: model list updated with source-related distributions.

        """
        model = self.make_allocation_model(model)
        model = self.make_source_model(model)
        if self.update_precision:
            model.append(
                Gamma(
                    self.map["emission_rate_precision"],
                    shape=self.map["precision_prior_shape"],
                    rate=self.map["precision_prior_rate"],
                )
            )
        if self.reversible_jump:
            model.append(
                Uniform(
                    response=self.map["source_location"],
                    domain_response_lower=self.site_limits[:, [0]],
                    domain_response_upper=self.site_limits[:, [1]],
                )
            )
            if self.distribution_number_sources == "Uniform":
                model.append(
                    Uniform(
                        response=self.map["number_sources"],
                        domain_response_lower=1,
                        domain_response_upper=self.n_sources_max,
                    )
                )
            elif self.distribution_number_sources == "Poisson":
                model.append(Poisson(response=self.map["number_sources"], rate=self.map["number_source_rate"]))
            else:
                raise ValueError("Invalid distribution type for number of sources.")
        return model

    def make_sampler(self, model: Model, sampler_list: list) -> list:
        """Take sampler list and append new elements from current model component.

        Args:
            model (Model): Full model list of distributions.
            sampler_list (list): Current list of samplers.

        Returns:
            list: sampler list updated with source-related samplers.

        """
        sampler_list = self.make_source_sampler(model, sampler_list)
        sampler_list = self.make_allocation_sampler(model, sampler_list)
        if self.update_precision:
            sampler_list.append(NormalGamma(self.map["emission_rate_precision"], model))
        if self.reversible_jump:
            sampler_list = self.make_sampler_rjmcmc(model, sampler_list)
        return sampler_list

    def make_state(self, state: dict) -> dict:
        """Take state dictionary and append initial values from model component.

        Args:
            state (dict): current state vector.

        Returns:
            dict: current state vector with source-related parameters added.

        """
        state = self.make_allocation_state(state)
        state = self.make_source_state(state)
        state[self.map["coupling_matrix"]] = self.coupling
        state[self.map["emission_rate_precision"]] = np.array(self.initial_precision, ndmin=1)
        if self.update_precision:
            state[self.map["precision_prior_shape"]] = np.ones_like(self.initial_precision) * self.prior_precision_shape
            state[self.map["precision_prior_rate"]] = np.ones_like(self.initial_precision) * self.prior_precision_rate
        if self.reversible_jump:
            state[self.map["source_location"]] = self.dispersion_model.source_map.location.to_array().T
            state[self.map["number_sources"]] = np.array(
                state[self.map["source_location"]].shape[1], ndmin=2, dtype=int
            )
            state[self.map["number_source_rate"]] = self.rate_num_sources
        return state

    def make_sampler_rjmcmc(self, model: Model, sampler_list: list) -> list:
        """Create the parts of the sampler related to the reversible jump MCMC scheme.

        RJ MCMC scheme:
            - create the RandomWalkLoop sampler object which updates the source locations one-at-a-time.
            - create the ReversibleJump sampler which proposes birth/death moves to add/remove sources from the source
                map.

        Args:
            model (Model): model object containing probability density objects for all uncertain
                parameters.
            sampler_list (list): list of existing samplers.

        Returns:
            sampler_list (list): list of samplers updated with samplers corresponding to RJMCMC routine.

        """
        for sampler in sampler_list:
            if sampler.param == self.map["source"]:
                sampler.max_variable_size = self.n_sources_max

        sampler_list.append(
            RandomWalkLoop(
                self.map["source_location"],
                model,
                step=self.random_walk_step_size,
                max_variable_size=(3, self.n_sources_max),
                domain_limits=self.site_limits,
                state_update_function=self.move_function,
            )
        )
        matching_params = {
            "variable": self.map["source"],
            "matrix": self.map["coupling_matrix"],
            "scale": 1.0,
            "limits": [0.0, 1e6],
        }
        sampler_list.append(
            ReversibleJump(
                self.map["number_sources"],
                model,
                step=np.array([1.0], ndmin=2),
                associated_params=self.map["source_location"],
                n_max=self.n_sources_max,
                state_birth_function=self.birth_function,
                state_death_function=self.death_function,
                matching_params=matching_params,
            )
        )
        return sampler_list

    def from_mcmc(self, store: dict):
        """Extract results of mcmc from mcmc.store and attach to components.

        For the reversible jump case we extract all estimated source locations
        per iteration. For the fixed sources case we grab the source locations
        from the input source map and repeat those for all iterations.

        Args:
            store (dict): mcmc result dictionary.

        """
        self.from_mcmc_group(store)
        self.from_mcmc_dist(store)
        if self.individual_source_labels is None:
            self.individual_source_labels = list(np.repeat(None, store[self.map["source"]].shape[0]))

        if self.update_precision:
            self.precision_scalar = store[self.map["emission_rate_precision"]]

        if self.reversible_jump:
            reference_latitude = self.dispersion_model.source_map.location.ref_latitude
            reference_longitude = self.dispersion_model.source_map.location.ref_longitude
            ref_altitude = self.dispersion_model.source_map.location.ref_altitude
            self.all_source_locations = ENU(
                ref_latitude=reference_latitude,
                ref_longitude=reference_longitude,
                ref_altitude=ref_altitude,
                east=store[self.map["source_location"]][0, :, :],
                north=store[self.map["source_location"]][1, :, :],
                up=store[self.map["source_location"]][2, :, :],
            )

        else:
            location_temp = self.dispersion_model.source_map.location.to_enu()
            location_temp.east = np.repeat(location_temp.east[:, np.newaxis], store["log_post"].shape[0], axis=1)
            location_temp.north = np.repeat(location_temp.north[:, np.newaxis], store["log_post"].shape[0], axis=1)
            location_temp.up = np.repeat(location_temp.up[:, np.newaxis], store["log_post"].shape[0], axis=1)
            self.all_source_locations = location_temp

    def plot_iterations(self, plot: "Plot", burn_in_value: int, y_axis_type: str = "linear") -> "Plot":
        """Plot the emission rate estimates source model object against MCMC iteration.

        Args:
            burn_in_value (int): Burn in value to show in plot.
            y_axis_type (str, optional): String to indicate whether the y-axis should be linear or log scale.
            plot (Plot): Plot object to which this figure will be added in the figure dictionary.

        Returns:
            plot (Plot): Plot object to which the figures added in the figure dictionary with
                keys 'estimated_values_plot'/'log_estimated_values_plot' and 'number_of_sources_plot'

        """
        plot.plot_emission_rate_estimates(source_model_object=self, burn_in=burn_in_value, y_axis_type=y_axis_type)
        plot.plot_single_trace(object_to_plot=self)
        return plot
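
The move, birth, and death functions above share one contract: each returns the updated proposal state together with the two log-transition densities, and each vetoes out-of-coverage locations by inflating the forward log-proposal density. A standalone sketch of that veto pattern (illustrative only; it assumes a simple max-over-data coupling summary, whereas the class delegates to its configurable threshold machinery):

```python
import numpy as np

def coverage_penalty(coupling_column: np.ndarray, coverage_threshold: float = 10.0) -> float:
    """Sketch of the rejection pattern used in birth_function/move_function.

    Couplings are in hr/kg, so 1 / max(coupling) is the smallest emission
    rate (kg/hr) the sensors could resolve for this source. If that rate
    exceeds the coverage threshold, return 1e10 as the forward log-proposal
    density contribution, which forces the sampler to reject the move.
    """
    max_coupling = np.max(coupling_column)
    implied_rate = np.inf if max_coupling <= 1e-100 else 1.0 / max_coupling
    return 0.0 if implied_rate < coverage_threshold else 1e10

print(coverage_penalty(np.array([0.5, 0.2, 0.8])))  # well-covered source -> 0.0
print(coverage_penalty(np.array([0.0, 1e-120])))    # invisible source -> 1e10
```
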

nof_sources property

Get number of sources in the source map.

coverage_threshold property

Compute coverage threshold from detection threshold and test source strength.

__post_init__()

Post-initialisation of the class.

This function is called after the class has been initialised, and is used to set up the mapping dictionary for the class by applying the append_string function to the mapping dictionary.

Source code in src/pyelq/component/source_model.py
def __post_init__(self):
    """Post-initialisation of the class.

    This function is called after the class has been initialised, and is used to set up the mapping dictionary for
    the class by applying the append_string function to the mapping dictionary.

    """
    if self.label_string is not None:
        self.append_string(self.label_string)

initialise(sensor_object, meteorology, gas_species)

Set up the source model.

Extract required information from the sensor, meteorology and gas species objects:

    - Attach coupling calculated using self.dispersion_model.
    - (If self.reversible_jump == True) Attach objects to the source model which will be used in the RJMCMC sampler; they will be required when the couplings need to be updated as new source locations are proposed in move/birth/death steps.

Parameters:

    sensor_object (SensorGroup): object containing sensor data. [required]
    meteorology (MeteorologyGroup): object containing meteorology data. [required]
    gas_species (GasSpecies): object containing gas species information. [required]
Source code in src/pyelq/component/source_model.py
def initialise(self, sensor_object: SensorGroup, meteorology: Meteorology, gas_species: GasSpecies):
    """Set up the source model.

    Extract required information from the sensor, meteorology and gas species objects:
        - Attach coupling calculated using self.dispersion_model.
        - (If self.reversible_jump == True) Attach objects to the source model which will be used in the RJMCMC
            sampler; they will be required when the couplings need to be updated as new source locations are
            proposed in move/birth/death steps.

    Args:
        sensor_object (SensorGroup): object containing sensor data.
        meteorology (MeteorologyGroup): object containing meteorology data.
        gas_species (GasSpecies): object containing gas species information.

    """
    self.initialise_dispersion_model(sensor_object)
    self.coupling = self.dispersion_model.compute_coupling(
        sensor_object, meteorology, gas_species, output_stacked=True
    )

    self.sensor_object = sensor_object
    self.screen_coverage()
    if self.reversible_jump:
        self.meteorology = meteorology
        self.gas_species = gas_species

initialise_dispersion_model(sensor_object)

Initialise the dispersion model.

If a dispersion_model has already been attached to this instance, then this function takes no action.

If a dispersion_model has not already been attached to the instance, then this function adds a GaussianPlume dispersion model, with a default source map that has limits set based on the sensor locations.

Parameters:

    sensor_object (SensorGroup): object containing sensor data. [required]
Source code in src/pyelq/component/source_model.py
def initialise_dispersion_model(self, sensor_object: SensorGroup):
    """Initialise the dispersion model.

    If a dispersion_model has already been attached to this instance, then this function takes no action.

    If a dispersion_model has not already been attached to the instance, then this function adds a GaussianPlume
    dispersion model, with a default source map that has limits set based on the sensor locations.

    Args:
        sensor_object (SensorGroup): object containing sensor data.

    """
    if self.dispersion_model is None:
        source_map = SourceMap()
        sensor_locations = sensor_object.location.to_enu()
        location_object = ENU(
            ref_latitude=sensor_locations.ref_latitude,
            ref_longitude=sensor_locations.ref_longitude,
            ref_altitude=sensor_locations.ref_altitude,
        )
        source_map.generate_sources(
            coordinate_object=location_object,
            sourcemap_limits=np.array(
                [
                    [np.min(sensor_locations.east), np.max(sensor_locations.east)],
                    [np.min(sensor_locations.north), np.max(sensor_locations.north)],
                    [np.min(sensor_locations.up), np.max(sensor_locations.up)],
                ]
            ),
            sourcemap_type="grid",
        )
        self.dispersion_model = GaussianPlume(source_map)
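
The default source map limits amount to the axis-aligned bounding box of the sensor positions in the local ENU frame. With hypothetical sensor coordinates (the real values come from `sensor_object.location.to_enu()`), the limits array passed to `generate_sources` looks like:

```python
import numpy as np

# Hypothetical sensor coordinates in a local ENU frame (metres).
east = np.array([-10.0, 5.0, 30.0])
north = np.array([0.0, 40.0, 20.0])
up = np.array([1.0, 3.0, 2.0])

# Per-axis [min, max] limits for the default grid source map:
# rows are [east, north, up] -> [[-10., 30.], [0., 40.], [1., 3.]].
sourcemap_limits = np.array(
    [
        [np.min(east), np.max(east)],
        [np.min(north), np.max(north)],
        [np.min(up), np.max(up)],
    ]
)
print(sourcemap_limits)
```
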

screen_coverage()

Screen the initial source map for coverage.

Source code in src/pyelq/component/source_model.py
def screen_coverage(self):
    """Screen the initial source map for coverage."""
    in_coverage_area = self.compute_coverage(self.coupling)
    self.coupling = self.coupling[:, in_coverage_area]
    all_locations = self.dispersion_model.source_map.location.to_array()
    screened_locations = all_locations[in_coverage_area, :]
    self.dispersion_model.source_map.location.from_array(screened_locations)
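
Screening is boolean-mask column selection: the same mask filters both the coupling columns and the source-map locations, keeping the two aligned. A toy illustration with made-up numbers:

```python
import numpy as np

# Toy coupling matrix (n_data x n_sources) and source locations (n_sources x 3).
coupling = np.array([[0.5, 1e-9, 0.9],
                     [0.4, 2e-9, 0.7]])
locations = np.array([[0.0, 0.0, 1.0],
                      [100.0, 0.0, 1.0],
                      [0.0, 50.0, 1.0]])

# Mask as returned by compute_coverage: the middle source fails the test.
in_coverage_area = np.array([True, False, True])

coupling = coupling[:, in_coverage_area]    # drop coupling columns
locations = locations[in_coverage_area, :]  # drop the matching locations
print(coupling.shape, locations.shape)  # (2, 2) (2, 3)
```
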

compute_coverage(couplings, **kwargs)

Returns a logical vector that indicates which sources in the couplings are, or are not, within the coverage.

The 'coverage' is the area inside which all sources are well covered by wind data. E.g. If wind exclusively blows towards East, then all sources to the East of any sensor are 'invisible', and are not within the coverage.

Couplings are returned in hr/kg. Some threshold function defines the largest allowed coupling value. This is used to calculate estimated emission rates in kg/hr. Any sources for which the estimated emission rate is greater than the value of 'self.coverage_threshold' are defined as not within the coverage.

If sensor_object.source_on is being used, only the parts where the coupling is computed are used in the coverage check. This avoids the threshold_function being affected by large numbers of zero values.

Parameters:

    couplings (ndarray): Array of coupling values. Dimensions: n_data points x n_sources. [required]
    kwargs (dict): Keyword arguments required for the threshold function. [default: {}]

Returns:

    coverage (ndarray): A logical array specifying which sources are within the coverage.

Source code in src/pyelq/component/source_model.py
def compute_coverage(self, couplings: np.ndarray, **kwargs) -> np.ndarray:
    """Returns a logical vector that indicates which sources in the couplings are, or are not, within the coverage.

    The 'coverage' is the area inside which all sources are well covered by wind data. E.g. If wind exclusively
    blows towards East, then all sources to the East of any sensor are 'invisible', and are not within the coverage.

    Couplings are returned in hr/kg. Some threshold function defines the largest allowed coupling value. This is
    used to calculate estimated emission rates in kg/hr. Any sources for which the estimated emission rate is greater
    than the value of 'self.coverage_threshold' are defined as not within the coverage.

    If sensor_object.source_on is being used, only the parts where the coupling is computed are used in the coverage
    check. This avoids the threshold_function being affected by large numbers of zero values.

    Args:
        couplings (np.ndarray): Array of coupling values. Dimensions: n_data points x n_sources.
        kwargs (dict, optional): Keyword arguments required for the threshold function.

    Returns:
        coverage (np.ndarray): A logical array specifying which sources are within the coverage.

    """
    if self.sensor_object.source_on is not None:
        couplings = deepcopy(couplings)
        index_keep = self.sensor_object.source_on > 0
        couplings = couplings[index_keep]

    coupling_threshold = self.threshold_function(couplings, **kwargs)
    no_warning_threshold = np.where(coupling_threshold <= 1e-100, 1, coupling_threshold)
    no_warning_estimated_emission_rates = np.where(coupling_threshold <= 1e-100, np.inf, 1 / no_warning_threshold)
    coverage = no_warning_estimated_emission_rates < self.coverage_threshold

    return coverage
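
Numerically, the test inverts a per-source coupling summary (hr/kg) into the smallest resolvable emission rate (kg/hr) and compares it against the coverage threshold. A minimal sketch, assuming the threshold function is a per-source maximum over the data axis (the real class lets `threshold_function` be configured):

```python
import numpy as np

def compute_coverage_sketch(couplings: np.ndarray, coverage_threshold: float = 10.0) -> np.ndarray:
    """couplings: n_data x n_sources array of coupling values in hr/kg."""
    coupling_threshold = np.max(couplings, axis=0)
    # Guard against divide-by-zero warnings for effectively zero couplings.
    safe = np.where(coupling_threshold <= 1e-100, 1.0, coupling_threshold)
    implied_rates = np.where(coupling_threshold <= 1e-100, np.inf, 1.0 / safe)
    return implied_rates < coverage_threshold

couplings = np.array([[0.5, 0.01, 0.0],
                      [0.8, 0.02, 0.0]])
# Source 0: rate 1/0.8 = 1.25 kg/hr (covered); source 1: 1/0.02 = 50 kg/hr
# (too large); source 2: zero coupling -> infinite rate (not covered).
print(compute_coverage_sketch(couplings))  # [ True False False]
```
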

update_coupling_column(state, update_column)

Update the coupling, based on changes to the source locations as part of inversion.

To be used in two different situations
  • movement of source locations (e.g. Metropolis Hastings, random walk).
  • adding of new source locations (e.g. reversible jump birth move).

If [update_column < A.shape[1]]: an existing column of the A matrix is updated. If [update_column == A.shape[1]]: a new column is appended to the right-hand side of the A matrix (corresponding to a new source).

A central assumption of this function is that the sensor information and meteorology information have already been interpolated onto the same space/time points.

If an update_column is supplied, the coupling for that source location only is calculated to save on computation time. If update_column is None, then we just re-compute the whole coupling matrix.

Parameters:

    state (dict): dictionary containing state parameters. [required]
    update_column (int): index of the coupling column to be updated. [required]

Returns:

    state (dict): state dictionary containing updated coupling information.

Source code in src/pyelq/component/source_model.py
def update_coupling_column(self, state: dict, update_column: int) -> dict:
    """Update the coupling, based on changes to the source locations as part of inversion.

    To be used in two different situations:
        - movement of source locations (e.g. Metropolis Hastings, random walk).
        - adding of new source locations (e.g. reversible jump birth move).
    If [update_column < A.shape[1]]: an existing column of the A matrix is updated.
    If [update_column == A.shape[1]]: a new column is appended to the right-hand side of the A matrix
    (corresponding to a new source).

    A central assumption of this function is that the sensor information and meteorology information
    have already been interpolated onto the same space/time points.

    If an update_column is supplied, the coupling for that source location only is calculated to save on
    computation time. If update_column is None, then we just re-compute the whole coupling matrix.

    Args:
        state (dict): dictionary containing state parameters.
        update_column (int): index of the coupling column to be updated.

    Returns:
        state (dict): state dictionary containing updated coupling information.

    """
    self.dispersion_model.source_map.location.from_array(state[self.map["source_location"]][:, [update_column]].T)
    new_coupling = self.dispersion_model.compute_coupling(
        self.sensor_object, self.meteorology, self.gas_species, output_stacked=True, run_interpolation=False
    )

    if update_column == state[self.map["coupling_matrix"]].shape[1]:
        state[self.map["coupling_matrix"]] = np.concatenate(
            (state[self.map["coupling_matrix"]], new_coupling), axis=1
        )
    elif update_column < state[self.map["coupling_matrix"]].shape[1]:
        state[self.map["coupling_matrix"]][:, [update_column]] = new_coupling
    else:
        raise ValueError("Invalid column specification for updating.")
    return state
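
The update-versus-append branch can be exercised in isolation. A sketch with a dummy coupling matrix (in the real method the new column comes from `dispersion_model.compute_coupling`):

```python
import numpy as np

def set_coupling_column(coupling: np.ndarray, new_column: np.ndarray, update_column: int) -> np.ndarray:
    """update_column < n_cols overwrites in place (move); == n_cols appends (birth)."""
    if update_column == coupling.shape[1]:
        return np.concatenate((coupling, new_column), axis=1)
    if update_column < coupling.shape[1]:
        coupling[:, [update_column]] = new_column
        return coupling
    raise ValueError("Invalid column specification for updating.")

coupling = np.zeros((4, 2))
new_column = np.ones((4, 1))
coupling = set_coupling_column(coupling, new_column, 1)  # move: overwrite column 1
coupling = set_coupling_column(coupling, new_column, 2)  # birth: append a third column
print(coupling.shape)  # (4, 3)
```
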

birth_function(current_state, prop_state)

Update MCMC state based on source birth proposal.

Proposed state updated as follows:

    1. Add column to coupling matrix for new source location.
    2. If required, adjust other components of the state which correspond to the sources.

The source emission rate vector will be adjusted using the standardised functionality in the openMCMC package.

After the coupling has been updated, a coverage test is applied for the new source location. If the max coupling is too small, a large contribution is added to the log-proposal density for the new state, to force the sampler to reject it.

A central assumption of this function is that the sensor information and meteorology information have already been interpolated onto the same space/time points.

This function assumes that the new source location has been added as the final column of the source location matrix, and so will correspondingly append the new coupling column to the right hand side of the current state coupling, and append an emission rate as the last element of the current state emission rate vector.

Parameters:

    current_state (dict): dictionary containing parameters of the current state. [required]
    prop_state (dict): dictionary containing the parameters of the proposed state. [required]

Returns:

    prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
    logp_pr_g_cr (float): log-transition density of the proposed state given the current state (i.e. log[p(proposed | current)])
    logp_cr_g_pr (float): log-transition density of the current state given the proposed state (i.e. log[p(current | proposed)])

Source code in src/pyelq/component/source_model.py
def birth_function(self, current_state: dict, prop_state: dict) -> Tuple[dict, float, float]:
    """Update MCMC state based on source birth proposal.

    Proposed state updated as follows:
        1- Add column to coupling matrix for new source location.
        2- If required, adjust other components of the state which correspond to the sources.
    The source emission rate vector will be adjusted using the standardised functionality
    in the openMCMC package.

    After the coupling has been updated, a coverage test is applied for the new source
    location. If the max coupling is too small, a large contribution is added to the
    log-proposal density for the new state, to force the sampler to reject it.

    A central assumption of this function is that the sensor information and meteorology information
    have already been interpolated onto the same space/time points.

    This function assumes that the new source location has been added as the final column of
    the source location matrix, and so will correspondingly append the new coupling column to the right
    hand side of the current state coupling, and append an emission rate as the last element of the
    current state emission rate vector.

    Args:
        current_state (dict): dictionary containing parameters of the current state.
        prop_state (dict): dictionary containing the parameters of the proposed state.

    Returns:
        prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
        logp_pr_g_cr (float): log-transition density of the proposed state given the current state
            (i.e. log[p(proposed | current)])
        logp_cr_g_pr (float): log-transition density of the current state given the proposed state
            (i.e. log[p(current | proposed)])

    """
    prop_state = self.update_coupling_column(prop_state, int(prop_state[self.map["number_sources"]].item()) - 1)
    prop_state[self.map["allocation"]] = np.concatenate(
        (prop_state[self.map["allocation"]], np.array([0], ndmin=2)), axis=0
    )
    in_cov_area = self.compute_coverage(prop_state[self.map["coupling_matrix"]][:, -1])
    if not in_cov_area:
        logp_pr_g_cr = 1e10
    else:
        logp_pr_g_cr = 0.0
    logp_cr_g_pr = 0.0

    return prop_state, logp_pr_g_cr, logp_cr_g_pr

death_function(current_state, prop_state, deletion_index)

Update MCMC state based on source death proposal.

Proposed state updated as follows:

    1. Remove column from coupling for deleted source.
    2. If required, adjust other components of the state which correspond to the sources.

The source emission rate vector will be adjusted using the standardised functionality in the openMCMC package.

A central assumption of this function is that the sensor information and meteorology information have already been interpolated onto the same space/time points.

Parameters:

    current_state (dict): dictionary containing parameters of the current state. [required]
    prop_state (dict): dictionary containing the parameters of the proposed state. [required]
    deletion_index (int): index of the source to be deleted in the overall set of sources. [required]

Returns:

    prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
    logp_pr_g_cr (float): log-transition density of the proposed state given the current state (i.e. log[p(proposed | current)])
    logp_cr_g_pr (float): log-transition density of the current state given the proposed state (i.e. log[p(current | proposed)])

Source code in src/pyelq/component/source_model.py
def death_function(self, current_state: dict, prop_state: dict, deletion_index: int) -> Tuple[dict, float, float]:
    """Update MCMC state based on source death proposal.

    Proposed state updated as follows:
        1- Remove column from coupling for deleted source.
        2- If required, adjust other components of the state which correspond to the sources.
    The source emission rate vector will be adjusted using the standardised functionality in the openMCMC package.

    A central assumption of this function is that the sensor information and meteorology information have already
    been interpolated onto the same space/time points.

    Args:
        current_state (dict): dictionary containing parameters of the current state.
        prop_state (dict): dictionary containing the parameters of the proposed state.
        deletion_index (int): index of the source to be deleted in the overall set of sources.

    Returns:
        prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
        logp_pr_g_cr (float): log-transition density of the proposed state given the current state
            (i.e. log[p(proposed | current)])
        logp_cr_g_pr (float): log-transition density of the current state given the proposed state
            (i.e. log[p(current | proposed)])

    """
    prop_state[self.map["coupling_matrix"]] = np.delete(
        prop_state[self.map["coupling_matrix"]], obj=deletion_index, axis=1
    )
    prop_state[self.map["allocation"]] = np.delete(prop_state[self.map["allocation"]], obj=deletion_index, axis=0)
    logp_pr_g_cr = 0.0
    logp_cr_g_pr = 0.0

    return prop_state, logp_pr_g_cr, logp_cr_g_pr
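
The array bookkeeping for a death move is a pair of `np.delete` calls: the coupling matrix loses the column of the deleted source, and the allocation loses the matching row. With toy arrays:

```python
import numpy as np

coupling = np.arange(6.0).reshape(2, 3)   # n_data x n_sources: [[0, 1, 2], [3, 4, 5]]
allocation = np.zeros((3, 1), dtype=int)  # n_sources x 1

deletion_index = 1
coupling = np.delete(coupling, obj=deletion_index, axis=1)      # remove source column
allocation = np.delete(allocation, obj=deletion_index, axis=0)  # remove matching row
print(coupling.shape, allocation.shape)  # (2, 2) (2, 1)
```
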

move_function(prop_state, update_column)

Re-compute the coupling after a source location move.

Function first updates the coupling column, and then checks whether the location passes a coverage test. If the location does not have good enough coverage, a large contribution is added to the log-proposal density so that the sampler rejects the move.

Parameters:

    prop_state (dict): dictionary containing parameters of the proposed state. [required]
    update_column (int): index of the coupling column to be updated. [required]

Returns:

    prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
    logp_pr_g_cr (float): log-transition density of the proposed state given the current state (i.e. log[p(proposed | current)])
    logp_cr_g_pr (float): log-transition density of the current state given the proposed state (i.e. log[p(current | proposed)])

Source code in src/pyelq/component/source_model.py
def move_function(self, prop_state: dict, update_column: int) -> Tuple[dict, float, float]:
    """Re-compute the coupling after a source location move.

    Function first updates the coupling column, and then checks whether the location passes a coverage test. If the
    location does not have good enough coverage, we return a high log-probability of the move to reject.

    Args:
        prop_state (dict): dictionary containing parameters of the proposed state.
        update_column (int): index of the coupling column to be updated.

    Returns:
        prop_state (dict): proposed state, with coupling matrix and source emission rate vector updated.
        logp_pr_g_cr (float): log-transition density of the proposed state given the current state
            (i.e. log[p(proposed | current)])
        logp_cr_g_pr (float): log-transition density of the current state given the proposed state
            (i.e. log[p(current | proposed)])

    """
    prop_state = self.update_coupling_column(prop_state, update_column)
    in_cov_area = self.compute_coverage(prop_state[self.map["coupling_matrix"]][:, update_column])

    if not in_cov_area:
        logp_pr_g_cr = 1e10
    else:
        logp_pr_g_cr = 0.0
    logp_cr_g_pr = 0.0

    return prop_state, logp_pr_g_cr, logp_cr_g_pr
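The coverage check works through the Metropolis-Hastings acceptance ratio: the proposal density enters as exp(logp_cr_g_pr - logp_pr_g_cr), so returning `logp_pr_g_cr = 1e10` drives the acceptance probability to zero. A minimal sketch of this rejection mechanism (generic MH bookkeeping, not the pyelq sampler itself):

```python
import numpy as np

def acceptance_probability(log_post_prop, log_post_curr, logp_pr_g_cr, logp_cr_g_pr):
    """Standard Metropolis-Hastings acceptance probability on the log scale."""
    log_ratio = (log_post_prop - log_post_curr) + (logp_cr_g_pr - logp_pr_g_cr)
    return min(1.0, np.exp(log_ratio))

# Ordinary move: symmetric proposal, slightly better posterior -> accepted with probability 1.
print(acceptance_probability(-10.0, -10.5, 0.0, 0.0))  # 1.0

# Move outside the coverage area: logp_pr_g_cr = 1e10 forces rejection.
print(acceptance_probability(-10.0, -10.5, 1e10, 0.0))  # 0.0
```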

make_model(model)

Take model list and append new elements from current model component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `list` | Current list of model elements. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list` | model list updated with source-related distributions. |

Source code in src/pyelq/component/source_model.py
def make_model(self, model: list) -> list:
    """Take model list and append new elements from current model component.

    Args:
        model (list): Current list of model elements.

    Returns:
        list: model list updated with source-related distributions.

    """
    model = self.make_allocation_model(model)
    model = self.make_source_model(model)
    if self.update_precision:
        model.append(
            Gamma(
                self.map["emission_rate_precision"],
                shape=self.map["precision_prior_shape"],
                rate=self.map["precision_prior_rate"],
            )
        )
    if self.reversible_jump:
        model.append(
            Uniform(
                response=self.map["source_location"],
                domain_response_lower=self.site_limits[:, [0]],
                domain_response_upper=self.site_limits[:, [1]],
            )
        )
        if self.distribution_number_sources == "Uniform":
            model.append(
                Uniform(
                    response=self.map["number_sources"],
                    domain_response_lower=1,
                    domain_response_upper=self.n_sources_max,
                )
            )
        elif self.distribution_number_sources == "Poisson":
            model.append(Poisson(response=self.map["number_sources"], rate=self.map["number_source_rate"]))
        else:
            raise ValueError("Invalid distribution type for number of sources.")
    return model

make_sampler(model, sampler_list)

Take sampler list and append new elements from current model component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Model` | Full model list of distributions. | *required* |
| `sampler_list` | `list` | Current list of samplers. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list` | sampler list updated with source-related samplers. |

Source code in src/pyelq/component/source_model.py
def make_sampler(self, model: Model, sampler_list: list) -> list:
    """Take sampler list and append new elements from current model component.

    Args:
        model (Model): Full model list of distributions.
        sampler_list (list): Current list of samplers.

    Returns:
        list: sampler list updated with source-related samplers.

    """
    sampler_list = self.make_source_sampler(model, sampler_list)
    sampler_list = self.make_allocation_sampler(model, sampler_list)
    if self.update_precision:
        sampler_list.append(NormalGamma(self.map["emission_rate_precision"], model))
    if self.reversible_jump:
        sampler_list = self.make_sampler_rjmcmc(model, sampler_list)
    return sampler_list

make_state(state)

Take state dictionary and append initial values from model component.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `state` | `dict` | current state vector. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict` | current state vector with source-related parameters added. |

Source code in src/pyelq/component/source_model.py
def make_state(self, state: dict) -> dict:
    """Take state dictionary and append initial values from model component.

    Args:
        state (dict): current state vector.

    Returns:
        dict: current state vector with source-related parameters added.

    """
    state = self.make_allocation_state(state)
    state = self.make_source_state(state)
    state[self.map["coupling_matrix"]] = self.coupling
    state[self.map["emission_rate_precision"]] = np.array(self.initial_precision, ndmin=1)
    if self.update_precision:
        state[self.map["precision_prior_shape"]] = np.ones_like(self.initial_precision) * self.prior_precision_shape
        state[self.map["precision_prior_rate"]] = np.ones_like(self.initial_precision) * self.prior_precision_rate
    if self.reversible_jump:
        state[self.map["source_location"]] = self.dispersion_model.source_map.location.to_array().T
        state[self.map["number_sources"]] = np.array(
            state[self.map["source_location"]].shape[1], ndmin=2, dtype=int
        )
        state[self.map["number_source_rate"]] = self.rate_num_sources
    return state
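`make_state` relies on `np.array(..., ndmin=...)` to guarantee consistent array shapes in the state dictionary; for example, the number of sources is stored as a (1, 1) integer array rather than a bare scalar. A quick illustration of the shape handling (dummy values, not a full pyelq state):

```python
import numpy as np

n_sources = 5

# ndmin=2 promotes the scalar to a (1, 1) array, matching the shapes
# the MCMC samplers expect for scalar-valued state entries.
number_sources = np.array(n_sources, ndmin=2, dtype=int)
print(number_sources.shape)  # (1, 1)

# ndmin=1 promotes a scalar precision to a length-1 vector.
initial_precision = np.array(0.01, ndmin=1)
print(initial_precision.shape)  # (1,)
```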

make_sampler_rjmcmc(model, sampler_list)

Create the parts of the sampler related to the reversible jump MCMC scheme.

RJ MCMC scheme:
  • create the RandomWalkLoop sampler object which updates the source locations one-at-a-time.
  • create the ReversibleJump sampler which proposes birth/death moves to add/remove sources from the source map.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Model` | model object containing probability density objects for all uncertain parameters. | *required* |
| `sampler_list` | `list` | list of existing samplers. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `sampler_list` | `list` | list of samplers updated with samplers corresponding to the RJMCMC routine. |

Source code in src/pyelq/component/source_model.py
def make_sampler_rjmcmc(self, model: Model, sampler_list: list) -> list:
    """Create the parts of the sampler related to the reversible jump MCMC scheme.

    RJ MCMC scheme:
        - create the RandomWalkLoop sampler object which updates the source locations one-at-a-time.
        - create the ReversibleJump sampler which proposes birth/death moves to add/remove sources from the source
            map.

    Args:
        model (Model): model object containing probability density objects for all uncertain
            parameters.
        sampler_list (list): list of existing samplers.

    Returns:
        sampler_list (list): list of samplers updated with samplers corresponding to RJMCMC routine.

    """
    for sampler in sampler_list:
        if sampler.param == self.map["source"]:
            sampler.max_variable_size = self.n_sources_max

    sampler_list.append(
        RandomWalkLoop(
            self.map["source_location"],
            model,
            step=self.random_walk_step_size,
            max_variable_size=(3, self.n_sources_max),
            domain_limits=self.site_limits,
            state_update_function=self.move_function,
        )
    )
    matching_params = {
        "variable": self.map["source"],
        "matrix": self.map["coupling_matrix"],
        "scale": 1.0,
        "limits": [0.0, 1e6],
    }
    sampler_list.append(
        ReversibleJump(
            self.map["number_sources"],
            model,
            step=np.array([1.0], ndmin=2),
            associated_params=self.map["source_location"],
            n_max=self.n_sources_max,
            state_birth_function=self.birth_function,
            state_death_function=self.death_function,
            matching_params=matching_params,
        )
    )
    return sampler_list

from_mcmc(store)

Extract results of mcmc from mcmc.store and attach to components.

For the reversible jump case we extract all estimated source locations per iteration. For the fixed-sources case we take the source locations from the input source map and repeat them for all iterations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `store` | `dict` | mcmc result dictionary. | *required* |
Source code in src/pyelq/component/source_model.py
def from_mcmc(self, store: dict):
    """Extract results of mcmc from mcmc.store and attach to components.

    For the reversible jump case we extract all estimated source locations
    per iteration. For the fixed sources case we take the source locations
    from the input source map and repeat them for all iterations.

    Args:
        store (dict): mcmc result dictionary.

    """
    self.from_mcmc_group(store)
    self.from_mcmc_dist(store)
    if self.individual_source_labels is None:
        self.individual_source_labels = list(np.repeat(None, store[self.map["source"]].shape[0]))

    if self.update_precision:
        self.precision_scalar = store[self.map["emission_rate_precision"]]

    if self.reversible_jump:
        reference_latitude = self.dispersion_model.source_map.location.ref_latitude
        reference_longitude = self.dispersion_model.source_map.location.ref_longitude
        ref_altitude = self.dispersion_model.source_map.location.ref_altitude
        self.all_source_locations = ENU(
            ref_latitude=reference_latitude,
            ref_longitude=reference_longitude,
            ref_altitude=ref_altitude,
            east=store[self.map["source_location"]][0, :, :],
            north=store[self.map["source_location"]][1, :, :],
            up=store[self.map["source_location"]][2, :, :],
        )

    else:
        location_temp = self.dispersion_model.source_map.location.to_enu()
        location_temp.east = np.repeat(location_temp.east[:, np.newaxis], store["log_post"].shape[0], axis=1)
        location_temp.north = np.repeat(location_temp.north[:, np.newaxis], store["log_post"].shape[0], axis=1)
        location_temp.up = np.repeat(location_temp.up[:, np.newaxis], store["log_post"].shape[0], axis=1)
        self.all_source_locations = location_temp
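In the fixed-sources branch, each ENU coordinate vector of shape (n_sources,) is tiled across iterations into an (n_sources, n_iterations) array, so downstream plotting code can treat the fixed and reversible jump cases uniformly. The `np.repeat` pattern in isolation (dummy coordinates):

```python
import numpy as np

east = np.array([10.0, 20.0, 30.0])  # fixed easting per source, shape (n_sources,)
n_iterations = 4                     # e.g. store["log_post"].shape[0]

# Repeat the fixed locations along a new iteration axis.
east_all = np.repeat(east[:, np.newaxis], n_iterations, axis=1)
print(east_all.shape)  # (3, 4)
print(east_all[1])     # [20. 20. 20. 20.]
```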

plot_iterations(plot, burn_in_value, y_axis_type='linear')

Plot the emission rate estimates source model object against MCMC iteration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `burn_in_value` | `int` | Burn in value to show in plot. | *required* |
| `y_axis_type` | `str` | String to indicate whether the y-axis should be linear or log scale. | `'linear'` |
| `plot` | `Plot` | Plot object to which this figure will be added in the figure dictionary. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `plot` | `Plot` | Plot object to which the figures are added in the figure dictionary, with keys 'estimated_values_plot'/'log_estimated_values_plot' and 'number_of_sources_plot'. |

Source code in src/pyelq/component/source_model.py
def plot_iterations(self, plot: "Plot", burn_in_value: int, y_axis_type: str = "linear") -> "Plot":
    """Plot the emission rate estimates source model object against MCMC iteration.

    Args:
        burn_in_value (int): Burn in value to show in plot.
        y_axis_type (str, optional): String to indicate whether the y-axis should be linear or log scale.
        plot (Plot): Plot object to which this figure will be added in the figure dictionary.

    Returns:
        plot (Plot): Plot object to which the figures are added in the figure dictionary, with
            keys 'estimated_values_plot'/'log_estimated_values_plot' and 'number_of_sources_plot'.

    """
    plot.plot_emission_rate_estimates(source_model_object=self, burn_in=burn_in_value, y_axis_type=y_axis_type)
    plot.plot_single_trace(object_to_plot=self)
    return plot

Normal dataclass

Bases: SourceModel, NullGrouping, NormalResponse

Normal model, with null allocation.

(Truncated) Gaussian prior for emission rates, no grouping/allocation; no transformation applied to emission rate parameters.

Can be used in the following cases:
  • Fixed set of sources (grid or specific locations), all with the same Gaussian prior distribution.
  • Variable number of sources, with a common prior distribution, estimated using reversible jump MCMC.
  • Fixed set of sources with a bespoke prior per source (using the allocation to map prior parameters onto sources).
Source code in src/pyelq/component/source_model.py
@dataclass
class Normal(SourceModel, NullGrouping, NormalResponse):
    """Normal model, with null allocation.

    (Truncated) Gaussian prior for emission rates, no grouping/allocation; no transformation applied to emission rate
    parameters.

    Can be used in the following cases:
        - Fixed set of sources (grid or specific locations), all with the same Gaussian prior distribution.
        - Variable number of sources, with a common prior distribution, estimated using reversible jump MCMC.
        - Fixed set of sources with a bespoke prior per source (using the allocation to map prior parameters onto
            sources).

    """

NormalSlabAndSpike dataclass

Bases: SourceModel, SlabAndSpike, NormalResponse

Normal Slab and Spike model.

(Truncated) Gaussian prior for emission rates, slab and spike prior, with allocation estimation; no transformation applied to emission rate parameters.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `initial_precision` | `ndarray` | initial precision parameter for a slab and spike case. shape=(2, 1). |
| `emission_rate_mean` | `ndarray` | emission rate prior mean for a slab and spike case. shape=(2, 1). |
Source code in src/pyelq/component/source_model.py
@dataclass
class NormalSlabAndSpike(SourceModel, SlabAndSpike, NormalResponse):
    """Normal Slab and Spike model.

    (Truncated) Gaussian prior for emission rates, slab and spike prior, with allocation estimation; no transformation
    applied to emission rate parameters.

    Attributes:
        initial_precision (np.ndarray): initial precision parameter for a slab and spike case. shape=(2, 1).
        emission_rate_mean (np.ndarray): emission rate prior mean for a slab and spike case. shape=(2, 1).

    """

    initial_precision: np.ndarray = field(default_factory=lambda: np.array([1 / (10**2), 1 / (0.01**2)], ndmin=2).T)
    emission_rate_mean: np.ndarray = field(default_factory=lambda: np.array([0, 0], ndmin=2).T)
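The default factories above encode a diffuse "slab" component (standard deviation 10, precision 0.01) and a narrow "spike" concentrated at zero (standard deviation 0.01, precision 10000), stacked into a (2, 1) column vector. Evaluating the factory expression directly:

```python
import numpy as np

# Reproduces the default initial_precision factory from the class above.
initial_precision = np.array([1 / (10**2), 1 / (0.01**2)], ndmin=2).T
print(initial_precision.shape)  # (2, 1)

# Standard deviations implied by the two precisions:
# slab sd = 10 (diffuse prior), spike sd = 0.01 (concentrated at zero).
implied_sd = 1 / np.sqrt(initial_precision.ravel())
print(implied_sd)
```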