From fe7cbf45b73b88cf16a23d56be647e3fbca3e4bc Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Tue, 4 Apr 2023 11:35:40 +0200 Subject: [PATCH 1/8] added expert apps validation for presence times --- caimira/apps/calculator/report_generator.py | 22 ++- caimira/apps/expert.py | 60 ++++--- caimira/apps/expert_co2.py | 20 ++- caimira/models.py | 155 +++++++++++++----- caimira/monte_carlo/models.py | 8 + .../tests/models/test_concentration_model.py | 3 +- 6 files changed, 189 insertions(+), 79 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index bd6d273f..6b122948 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -20,10 +20,18 @@ from ... import dataclass_utils def model_start_end(model: models.ExposureModel): - t_start = min(model.exposed.presence.boundaries()[0][0], - model.concentration_model.infected.presence.boundaries()[0][0]) - t_end = max(model.exposed.presence.boundaries()[-1][1], - model.concentration_model.infected.presence.boundaries()[-1][1]) + if (isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval) + and isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval)): + t_start = min(model.exposed.presence.boundaries()[0][0], + model.concentration_model.infected.presence.boundaries()[0][0]) + t_end = max(model.exposed.presence.boundaries()[-1][1], + model.concentration_model.infected.presence.boundaries()[-1][1]) + elif (isinstance(model.exposed.number, models.IntPiecewiseContant) + and isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant)): + t_start = min(model.exposed.number.interval().boundaries()[0][0], + model.concentration_model.infected.number.interval().boundaries()[0][0]) + t_end = max(model.exposed.number.interval().boundaries()[-1][1], + model.concentration_model.infected.number.interval().boundaries()[-1][1]) return t_start, t_end @@ -141,11 +149,15 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None + if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence.boundaries()] + elif isinstance(model.exposed.number, models.IntPiecewiseContant): + exposed_presence_intervals = [list(interval) for interval in model.exposed.number.interval().boundaries()] return { "model_repr": repr(model), "times": list(times), - "exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()], + "exposed_presence_intervals": exposed_presence_intervals, "short_range_intervals": short_range_intervals, "short_range_expirations": short_range_expirations, "concentrations": concentrations, diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index 77e7e82b..34a6a062 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -148,8 +148,12 @@ class ExposureModelResult(View): def update_plot(self, model: models.ExposureModel): resolution = 600 - ts = np.linspace(sorted(model.concentration_model.infected.presence.transition_times())[0], - sorted(model.concentration_model.infected.presence.transition_times())[-1], resolution) + if isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval): + infected_presence = model.concentration_model.infected.presence + elif isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant): + infected_presence = model.concentration_model.infected.number.interval() + ts = np.linspace(sorted(infected_presence.transition_times())[0], + sorted(infected_presence.transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] cumulative_doses = np.cumsum([ @@ -164,16 +168,21 @@ class ExposureModelResult(View): self.ax.ignore_existing_data_limits = False self.concentration_line.set_data(ts, concentration) + if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): + exposed_presence = model.exposed.presence + elif isinstance(model.exposed.number, models.IntPiecewiseContant): + exposed_presence = model.exposed.number.interval() + if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) | - (model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1]))) + where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) | + (exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1]))) else: self.concentration_area.remove() self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) | - (model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1]))) + where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) | + (exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1]))) if self.cumulative_line is None: [self.cumulative_line] = self.ax2.plot(ts[:-1], cumulative_doses, color='#0000c8', linestyle='dotted') @@ -187,8 +196,8 @@ class ExposureModelResult(View): cumulative_top = max(cumulative_doses) self.ax2.set_ylim(bottom=0., top=cumulative_top) - self.ax.set_xlim(left = min(min(model.concentration_model.infected.presence.boundaries()[0]), min(model.exposed.presence.boundaries()[0])), - right = max(max(model.concentration_model.infected.presence.boundaries()[1]), max(model.exposed.presence.boundaries()[1]))) + self.ax.set_xlim(left = min(min(infected_presence.boundaries()[0]), min(exposed_presence.boundaries()[0])), + right = max(max(infected_presence.boundaries()[1]), max(exposed_presence.boundaries()[1]))) figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='Mean concentration'), mlines.Line2D([], [], color='#0000c8', markersize=15, ls="dotted", label='Cumulative dose'), @@ -649,21 +658,10 @@ class ModelWidgets(View): number.observe(on_exposed_number_change, names=['value']) return widgets.HBox([widgets.Label('Number of exposed people in the room '), number], layout=widgets.Layout(justify_content='space-between')) - - def generate_presence_widget(self, min, max, node): - options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M')) - start_hour = float(node[0]) - end_hour = float(node[1]) - start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60)) - end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60)) - return widgets.SelectionRangeSlider( - options=options, - index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])), - ) def _build_exposed_presence(self, node): - presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) - presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) @@ -719,8 +717,8 @@ class ModelWidgets(View): return widgets.HBox([widgets.Label("Viral load (copies/ml)"), viral_load_in_sputum], layout=widgets.Layout(justify_content='space-between')) def _build_infected_presence(self, node, ventilation_node): - presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) - presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) @@ -1119,6 +1117,18 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu Returns the earliest start and latest end time of a collection of ConcentrationModel objects """ - infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) - infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) + infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) # type: ignore + infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) # type: ignore return infected_start, infected_finish + + +def generate_presence_widget(min, max, node): + options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M')) + start_hour = float(node[0]) + end_hour = float(node[1]) + start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60)) + end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60)) + return widgets.SelectionRangeSlider( + options=options, + index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])), + ) diff --git a/caimira/apps/expert_co2.py b/caimira/apps/expert_co2.py index c78a5b10..505870d9 100644 --- a/caimira/apps/expert_co2.py +++ b/caimira/apps/expert_co2.py @@ -8,7 +8,7 @@ import matplotlib import matplotlib.figure import matplotlib.lines as mlines import matplotlib.patches as patches -from .expert import collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder +from .expert import generate_presence_widget, collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder baseline_model = models.CO2ConcentrationModel( @@ -368,20 +368,26 @@ class ModelWidgets(View): return widgets.HBox([widgets.Label('Number of people in the room '), number], layout=widgets.Layout(justify_content='space-between')) def _build_population_presence(self, node, ventilation_node): - presence_start = widgets.FloatRangeSlider(value = node.present_times[0], min = 8., max=13., step=0.1) - presence_finish = widgets.FloatRangeSlider(value = node.present_times[1], min = 13., max=18., step=0.1) + presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0]) + presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1]) def on_presence_start_change(change): - ventilation_node.active.start = change['new'][0] - ventilation_node.active.duration / 60 - node.present_times = (change['new'], presence_finish.value) + new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) + ventilation_node.active.start = new_value[0] - ventilation_node.active.duration / 60 + node.present_times = (new_value, node.present_times[1]) def on_presence_finish_change(change): - node.present_times = (presence_start.value, change['new']) + new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']]) + node.present_times = (node.present_times[0], new_value) presence_start.observe(on_presence_start_change, names=['value']) presence_finish.observe(on_presence_finish_change, names=['value']) - return widgets.HBox([widgets.Label('Population presence'), presence_start, presence_finish], layout = widgets.Layout(justify_content='space-between')) + return widgets.VBox([ + widgets.Label('Exposed presence:'), + widgets.HBox([widgets.Label('Morning:', layout=widgets.Layout(width='15%')), presence_start]), + widgets.HBox([widgets.Label('Afternoon:', layout=widgets.Layout(width='15%')), presence_finish]) + ]) def present(self): return self.widget diff --git a/caimira/models.py b/caimira/models.py index 5483596b..2698ef02 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -186,6 +186,23 @@ class PiecewiseConstant: ) +@dataclass(frozen=True) +class IntPiecewiseContant(PiecewiseConstant): + + #: values of the function between transitions + values: typing.Tuple[int, ...] + + def value(self, time) -> _VectorisedFloat: + if time <= self.transition_times[0] or time > self.transition_times[-1]: + return 0 + + for t1, t2, value in zip(self.transition_times[:-1], + self.transition_times[1:], self.values): + if t1 < time <= t2: + break + return value + + @dataclass(frozen=True) class Room: #: The total volume of the room @@ -779,7 +796,7 @@ class Population: """ #: How many in the population. - number: int + number: typing.Union[int, IntPiecewiseContant] #: The times in which the people are in the room. presence: Interval @@ -795,9 +812,46 @@ class Population: # which might render inactive some RNA copies (virions). host_immunity: float - def person_present(self, time): - return self.presence.triggered(time) + def __post_init__(self): + if isinstance(self.number, int): + if not isinstance(self.presence, Interval): + raise TypeError(f'The presence argument must be an "Interval". Got {type(self.presence)}') + else: + if self.presence is not None: + raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') + + def _last_presence_state(self, time: float): + """ + Find the most recent/previous presence state change. + """ + times = set(self.number.transition_times) # type: ignore + t_index: int = np.searchsorted(sorted(times), time) # type: ignore + t_index = max([t_index - 1, 0]) + return sorted(times)[t_index] + def person_present(self, time: float): + # Allow back-compatibility + if isinstance(self.number, int) and isinstance(self.presence, Interval): + return self.presence.triggered(time) + elif isinstance(self.number, IntPiecewiseContant): + return self.number.value(time) != 0 + + def people_present(self, time: typing.Optional[float] = None): + # Allow back-compatibility + if isinstance(self.number, int) and isinstance(self.presence, Interval): + return self.number + + elif isinstance(self.number, IntPiecewiseContant): + # For a given scenario, time falls within a break, + # we should get the previous state occupancy profile. + if time and not self.person_present(time): + if time <= self.number.transition_times[0]: + number = self.number.values[0] + else: + number = int(self.number.value(self._last_presence_state(time))) + else: + number = int(self.number.value(time)) + return number @dataclass(frozen=True) class _PopulationWithVirus(Population): @@ -818,7 +872,7 @@ class _PopulationWithVirus(Population): """ raise NotImplementedError("Subclass must implement") - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -828,12 +882,12 @@ class _PopulationWithVirus(Population): raise NotImplementedError("Subclass must implement") @method_cache - def emission_rate_when_present(self) -> _VectorisedFloat: + def emission_rate_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate if the infected population is present (in virions / h). """ - return (self.emission_rate_per_aerosol_when_present() * + return (self.emission_rate_per_aerosol_when_present(time) * self.aerosols()) def emission_rate(self, time) -> _VectorisedFloat: @@ -851,7 +905,7 @@ class _PopulationWithVirus(Population): # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present() + return self.emission_rate_when_present(time) @property def particle(self) -> Particle: @@ -875,14 +929,14 @@ class EmittingPopulation(_PopulationWithVirus): return 1. @method_cache - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ - return self.known_individual_emission_rate * self.number + return self.known_individual_emission_rate * self.people_present(time) @dataclass(frozen=True) @@ -904,7 +958,7 @@ class InfectedPopulation(_PopulationWithVirus): return self.expiration.aerosols(self.mask) @method_cache - def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat: + def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -917,8 +971,11 @@ class InfectedPopulation(_PopulationWithVirus): ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - - return ER * self.number + + # if self.people_present(time) == 0: + # return ER + # else: + return ER * self.people_present(time) @property def particle(self) -> Particle: @@ -986,7 +1043,7 @@ class _ConcentrationModelBase: """ return 0. - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: """ Normalization factor (in the same unit as the concentration). This factor is applied to the normalized concentration only @@ -1006,12 +1063,12 @@ class _ConcentrationModelBase: dependence has been solved for. """ if not self.population.person_present(time): - return self.min_background_concentration()/self.normalization_factor() + return self.min_background_concentration()/self.normalization_factor(time) V = self.room.volume RR = self.removal_rate(time) return (1. / (RR * V) + self.min_background_concentration()/ - self.normalization_factor()) + self.normalization_factor(time)) @method_cache def state_change_times(self) -> typing.List[float]: @@ -1020,7 +1077,10 @@ class _ConcentrationModelBase: the times at which their state changes. """ state_change_times = {0.} - state_change_times.update(self.population.presence.transition_times()) + if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): + state_change_times.update(self.population.presence.transition_times()) + elif isinstance(self.population.number, IntPiecewiseContant): + state_change_times.update(self.population.number.interval().transition_times()) state_change_times.update(self.ventilation.transition_times(self.room)) return sorted(state_change_times) @@ -1029,7 +1089,11 @@ class _ConcentrationModelBase: """ First presence time. Before that, the concentration is zero. """ - return self.population.presence.boundaries()[0][0] + if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): + first_presence = self.population.presence.boundaries()[0][0] + elif isinstance(self.population.number, IntPiecewiseContant): + first_presence = self.population.number.interval().boundaries()[0][0] + return first_presence def last_state_change(self, time: float) -> float: """ @@ -1082,7 +1146,11 @@ class _ConcentrationModelBase: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - return self.min_background_concentration()/self.normalization_factor() + try: + return self.min_background_concentration()/self.normalization_factor(time) + except ZeroDivisionError: + return 0. + next_state_change_time = self._next_state_change(time) RR = self.removal_rate(next_state_change_time) # If RR is 0, conc_limit does not play a role but its computation @@ -1108,7 +1176,7 @@ class _ConcentrationModelBase: to this method. """ return (self._normed_concentration_cached(time) * - self.normalization_factor()) + self.normalization_factor(time)) @method_cache def normed_integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat: @@ -1145,7 +1213,7 @@ class _ConcentrationModelBase: Get the integrated concentration of viruses in the air between the times start and stop. """ return (self.normed_integrated_concentration(start, stop) * - self.normalization_factor()) + self.normalization_factor(start)) @dataclass(frozen=True) @@ -1170,9 +1238,9 @@ class ConcentrationModel(_ConcentrationModelBase): def virus(self) -> Virus: return self.infected.virus - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: # we normalize by the emission rate - return self.infected.emission_rate_when_present() + return self.infected.emission_rate_when_present(time) def removal_rate(self, time: float) -> _VectorisedFloat: # Equilibrium velocity of particle motion toward the floor @@ -1220,10 +1288,10 @@ class CO2ConcentrationModel(_ConcentrationModelBase): """ return self.CO2_atmosphere_concentration - def normalization_factor(self) -> _VectorisedFloat: + def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: # normalization by the CO2 exhaled. # CO2 concentration given in ppm, hence the 1e6 factor. - return (1e6*self.population.number + return (1e6*self.population.people_present(time) *self.population.activity.exhalation_rate *self.CO2_fraction_exhaled) @@ -1469,19 +1537,21 @@ class ExposureModel: by the emission rate of the infected population """ exposure = 0. - for start, stop in self.exposed.presence.boundaries(): - if stop < time1: - continue - elif start > time2: - break - elif start <= time1 and time2<= stop: - exposure += self.concentration_model.normed_integrated_concentration(time1, time2) - elif start <= time1 and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(time1, stop) - elif time1 < start and time2 <= stop: - exposure += self.concentration_model.normed_integrated_concentration(start, time2) - elif time1 <= start and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(start, stop) + if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): + # TODO: Implement the case for the IntPiecewiseConstant + for start, stop in self.exposed.presence.boundaries(): + if stop < time1: + continue + elif start > time2: + break + elif start <= time1 and time2<= stop: + exposure += self.concentration_model.normed_integrated_concentration(time1, time2) + elif start <= time1 and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(time1, stop) + elif time1 < start and time2 <= stop: + exposure += self.concentration_model.normed_integrated_concentration(start, time2) + elif time1 <= start and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(start, stop) return exposure def concentration(self, time: float) -> _VectorisedFloat: @@ -1589,9 +1659,10 @@ class ExposureModel: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - - for start, stop in self.exposed.presence.boundaries(): - deposited_exposure += self.deposited_exposure_between_bounds(start, stop) + if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): + # TODO: Implement the case for the IntPiecewiseConstant + for start, stop in self.exposed.presence.boundaries(): + deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1610,7 +1681,9 @@ class ExposureModel: if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. - total_people = self.concentration_model.infected.number + self.exposed.number + if isinstance(self.concentration_model.infected.number, int) and isinstance(self.exposed.number, int): + # TODO: Implement the case for the IntPiecewiseConstant + total_people = self.concentration_model.infected.number + self.exposed.number max_num_infected = (total_people if total_people < 10 else 10) # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. diff --git a/caimira/monte_carlo/models.py b/caimira/monte_carlo/models.py index 47e8ebab..ccaafc59 100644 --- a/caimira/monte_carlo/models.py +++ b/caimira/monte_carlo/models.py @@ -74,6 +74,14 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model elif new_field.type == typing.Tuple[caimira.models.SpecificInterval, ...]: SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...] + + elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseContant]: + IPC = getattr(sys.modules[__name__], "IntPiecewiseContant") + field_type = typing.Union[caimira.models.IntPiecewiseContant, IPC] + elif new_field.type == typing.Union[caimira.models.Interval, None]: + I = getattr(sys.modules[__name__], "Interval") + field_type = typing.Union[caimira.models.Interval, I] + else: # Check that we don't need to do anything with this type. for item in new_field.type.__args__: diff --git a/caimira/tests/models/test_concentration_model.py b/caimira/tests/models/test_concentration_model.py index 568e6757..5d152b52 100644 --- a/caimira/tests/models/test_concentration_model.py +++ b/caimira/tests/models/test_concentration_model.py @@ -4,6 +4,7 @@ import numpy as np import numpy.testing as npt import pytest from dataclasses import dataclass +import typing from caimira import models @@ -32,7 +33,7 @@ class KnownConcentrationModelBase(models._ConcentrationModelBase): def min_background_concentration(self) -> float: return self.known_min_background_concentration - def normalization_factor(self) -> float: + def normalization_factor(self, time: typing.Optional[float] = None) -> float: return self.known_normalization_factor From 70f7e865245041c830b378d1e3b0221af405547d Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 15 Mar 2023 15:42:57 +0100 Subject: [PATCH 2/8] test for dynamic occupants --- caimira/models.py | 4 - .../tests/models/test_dynamic_population.py | 98 +++++++++++++++++++ 2 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 caimira/tests/models/test_dynamic_population.py diff --git a/caimira/models.py b/caimira/models.py index 2698ef02..71fbdec3 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -971,10 +971,6 @@ class InfectedPopulation(_PopulationWithVirus): ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - - # if self.people_present(time) == 0: - # return ER - # else: return ER * self.people_present(time) @property diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py new file mode 100644 index 00000000..80808524 --- /dev/null +++ b/caimira/tests/models/test_dynamic_population.py @@ -0,0 +1,98 @@ +import numpy as np +import numpy.testing as npt +import pytest + +from caimira import models +import caimira.dataclass_utils as dc_utils + +@pytest.fixture +def full_exposure_model(): + return models.ExposureModel( + concentration_model=models.ConcentrationModel( + room=models.Room(volume=100), + ventilation=models.AirChange( + active=models.PeriodicInterval(120, 120), air_exch=0.25), + infected=models.InfectedPopulation( + number=1, + presence=models.SpecificInterval(((8, 12), (13, 17), )), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + expiration=models.Expiration.types['Breathing'], + virus=models.Virus.types['SARS_CoV_2'], + host_immunity=0. + ), + ), + short_range=(), + exposed=models.Population( + number=10, + presence=models.SpecificInterval(((8, 12), (13, 17), )), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0. + ), + geographical_data=(), + ) + + +@pytest.fixture +def baseline_infected_population_number(): + return models.InfectedPopulation( + number=models.IntPiecewiseContant( + (8, 12, 13, 17), (1, 0, 1)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + virus=models.Virus.types['SARS_CoV_2'], + expiration=models.Expiration.types['Breathing'], + host_immunity=0., + ) + + +@pytest.fixture +def baseline_exposed_population_number(): + return models.Population( + number=models.IntPiecewiseContant( + (8, 12, 13, 17), (1, 0, 1)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0., + ) + +@pytest.mark.parametrize( + "time", + [4., 8., 10., 12., 13., 14., 16., 20., 24.], +) +def test_population_number(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, time: float): + + int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected + assert isinstance(int_population_number.number, int) + assert isinstance(int_population_number.presence, models.Interval) + + piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number + assert isinstance(piecewise_population_number.number, + models.IntPiecewiseContant) + assert piecewise_population_number.presence is None + + assert int_population_number.person_present(time) == piecewise_population_number.person_present(time) + assert int_population_number.people_present(time) == piecewise_population_number.people_present(time) + + +@pytest.mark.parametrize( + "time", + [4., 8., 10., 12., 13., 14., 16., 20., 24.], +) +def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, + baseline_exposed_population_number: models.Population, + time: float): + + dynamic_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': baseline_infected_population_number, + 'exposed': baseline_exposed_population_number, + } + ) + assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) From 801955317faa435f64d5139af1846d7363a4635c Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Thu, 16 Mar 2023 16:07:35 +0100 Subject: [PATCH 3/8] added verifications for unexpected jumps --- caimira/models.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/caimira/models.py b/caimira/models.py index 71fbdec3..8fa1a7d4 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -847,6 +847,8 @@ class Population: if time and not self.person_present(time): if time <= self.number.transition_times[0]: number = self.number.values[0] + elif time >= self.number.transition_times[-1]: + number = self.number.values[-1] else: number = int(self.number.value(self._last_presence_state(time))) else: @@ -904,7 +906,6 @@ class _PopulationWithVirus(Population): # itself a function of time. Any change in rate must be accompanied # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present(time) @property @@ -1142,12 +1143,10 @@ class _ConcentrationModelBase: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - try: - return self.min_background_concentration()/self.normalization_factor(time) - except ZeroDivisionError: - return 0. - + return self.min_background_concentration()/self.normalization_factor(time) + next_state_change_time = self._next_state_change(time) + RR = self.removal_rate(next_state_change_time) # If RR is 0, conc_limit does not play a role but its computation # would raise an error -> we set it to zero. @@ -1157,11 +1156,12 @@ class _ConcentrationModelBase: conc_limit = 0. t_last_state_change = self.last_state_change(time) - conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change) + conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change)*self.normalization_factor(t_last_state_change) delta_time = time - t_last_state_change fac = np.exp(-RR * delta_time) - return conc_limit * (1 - fac) + conc_at_last_state_change * fac + + return conc_limit * (1 - fac) + conc_at_last_state_change/self.normalization_factor(time) * fac def concentration(self, time: float) -> _VectorisedFloat: """ From e494151d788b2cfdd43151a124e1628382b17a76 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 29 Mar 2023 12:04:36 +0200 Subject: [PATCH 4/8] added test for dynamic and static population --- .../tests/models/test_dynamic_population.py | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index 80808524..b1c139e9 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -96,3 +96,83 @@ def test_concentration_model_dynamic_population(full_exposure_model: models.Expo } ) assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) + + +@pytest.mark.parametrize( + "time, number_of_infected", + [ + [8., 1], + [10., 2], + [12., 3], + [13., 4], + [15., 5], + ]) +def test_sum_of_models(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation, + time: float, + number_of_infected: int): + + + static_multiple_exposure_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': number_of_infected, + } + ) + + dynamic_single_exposure_model: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': baseline_infected_population_number, + } + ) + + npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) + + +def test_dynamic_dose(full_exposure_model): + + dynamic_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected': models.InfectedPopulation( + number=models.IntPiecewiseContant( + (8, 10, 12, 13, 17), (1, 2, 0, 3)), + presence=None, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + virus=models.Virus.types['SARS_CoV_2'], + expiration=models.Expiration.types['Breathing'], + host_immunity=0., + ), + } + ) + + single_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 1, + 'concentration_model.infected.presence': models.SpecificInterval(((8, 10), )), + } + ) + + two_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 2, + 'concentration_model.infected.presence': models.SpecificInterval(((10, 12), )), + } + ) + + three_infected: models.ExposureModel = dc_utils.nested_replace( + full_exposure_model, + { + 'concentration_model.infected.number': 3, + 'concentration_model.infected.presence': models.SpecificInterval(((13, 17), )), + } + ) + + dynamic_exposure = dynamic_infected.deposited_exposure() + static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) + + npt.assert_almost_equal(dynamic_exposure, static_exposure) \ No newline at end of file From fe19cb5976916d6426f7ef36374748bad559f0a4 Mon Sep 17 00:00:00 2001 From: Nicolas Mounet Date: Thu, 6 Apr 2023 10:22:53 +0200 Subject: [PATCH 5/8] Change all normalization factors and emission rates (when present) into 'per person' quantities (i.e. do not normalize anymore by the number of infected people) - to allow dynamic number of infected people. Changing the test accordingly, and the documentation. Adding a __post_init__ check in ExposureModel to forbid the use of dynamic number of exposed people, for now. --- caimira/apps/calculator/report_generator.py | 6 +- caimira/apps/expert.py | 2 +- caimira/docs/full_diameter_dependence.rst | 26 +-- caimira/models.py | 150 ++++++++---------- .../tests/models/test_concentration_model.py | 6 +- .../tests/models/test_dynamic_population.py | 2 +- caimira/tests/models/test_exposure_model.py | 7 +- caimira/tests/test_monte_carlo_full_models.py | 12 +- 8 files changed, 98 insertions(+), 113 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 6b122948..0630c1cb 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -145,7 +145,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing prob = np.array(model.infection_probability()) prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean() + er = np.array(model.concentration_model.infected.emission_rate_per_person_when_present()).mean() exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None @@ -166,12 +166,12 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing "cumulative_doses": list(cumulative_doses), "long_range_cumulative_doses": list(long_range_cumulative_doses), "prob_inf": prob.mean(), - "prob_inf_sd": np.std(prob), + "prob_inf_sd": prob.std(), "prob_dist": list(prob), "prob_hist_count": list(prob_dist_count), "prob_hist_bins": list(prob_dist_bins), "prob_probabilistic_exposure": prob_probabilistic_exposure, - "emission_rate": er, + "emission_rate_per_person": er, "exposed_occupants": exposed_occupants, "expected_new_cases": expected_new_cases, "uncertainties_plot_src": uncertainties_plot_src, diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index 34a6a062..d32b1dca 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -209,7 +209,7 @@ class ExposureModelResult(View): def update_textual_result(self, model: models.ExposureModel): lines = [] P = np.array(model.infection_probability()).mean() - lines.append(f'Emission rate (virus/hr): {np.round(model.concentration_model.infected.emission_rate_when_present(),0)}') + lines.append(f'Emission rate per infected person (virus/hr): {np.round(model.concentration_model.infected.emission_rate_per_person_when_present(),0)}') lines.append(f'Probability of infection: {np.round(P, 0)}%') lines.append(f'Number of exposed: {model.exposed.number}') diff --git a/caimira/docs/full_diameter_dependence.rst b/caimira/docs/full_diameter_dependence.rst index 1e684ebb..219eb78f 100644 --- a/caimira/docs/full_diameter_dependence.rst +++ b/caimira/docs/full_diameter_dependence.rst @@ -26,7 +26,7 @@ provided the sample size is large enough. Example of the MC integration over the It is important to distinguish between 1) Monte-Carlo random variables (which are vectorised independently on its diameter-dependence) and 2) numerical Monte-Carlo integration for the diameter-dependence. Since the integral of the diameter-dependent variables are solved when computing the dose -- :math:`\mathrm{vD^{total}}` -- while performing some of the intermediate calculations, -we normalize the results by *dividing* by the Monte-Carlo variables that are diameter-independent, so that they are not considered in the Monte-Carlo integration (e.g. the **viral load** parameter, or the result of the :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present` method). +we normalize the results by *dividing* by the Monte-Carlo variables that are diameter-independent, so that they are not considered in the Monte-Carlo integration (e.g. the **viral load** parameter, or the result of the :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present` method). Expiration ========== @@ -62,9 +62,9 @@ Note that :math:`D_{\mathrm{max}}` value will differ, depending on the type of e In the code, for a given Expiration, we use different methods to perform the calculations *step-by-step*: -1. Calculate the non aerosol-dependent quantities in the emission rate, which is the multiplication of the diameter-**independent** variables: :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present`. This corresponds to the :math:`\mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` part of the :math:`\mathrm{vR}(D)` equation. +1. Calculate the non aerosol-dependent quantities in the emission rate per person infected, which is the multiplication of the diameter-**independent** variables: :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present`. This corresponds to the :math:`\mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` part of the :math:`\mathrm{vR}(D)` equation. 2. Calculate the diameter-**dependent** variable :meth:`caimira.models.InfectedPopulation.aerosols`, which is the result of :math:`E_{c,j}(D) = N_p(D) \cdot V_p(D) \cdot (1 − η_\mathrm{out}(D))` (in mL/(m\ :sup:`3` \.µm)), with :math:`N_p(D)` being the product of the BLO distribution by the scaling factor :math:`cn`. Note that this result is not integrated over the diameters at this stage, thus the units are still *'per aerosol diameter'*. -3. Calculate the full emission rate, which is the multiplication of the two previous methods, and corresponds to :math:`\mathrm{vR(D)}`: :meth:`caimira.models._PopulationWithVirus.emission_rate_when_present`. +3. Calculate the full emission rate (per person infected), which is the multiplication of the two previous methods, and corresponds to :math:`\mathrm{vR(D)}`: :meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present`. Note that the diameter-dependence is kept at this stage. Since other parameters downstream in code are also diameter-dependent, the Monte-Carlo integration over the aerosol sizes is computed at the level of the dose :math:`\mathrm{vD^{total}}`. In case one would like to have intermediate results for emission rate, perform the Monte-Carlo integration of :math:`E_{c, j}^{\mathrm{total}}` and compute :math:`\mathrm{vR^{total}} =\mathrm{vl_{in}} \cdot E_{c, j}^{\mathrm{total}} \cdot \mathrm{BR_k}`. @@ -97,8 +97,8 @@ the code computes first a normalized version of the concentration, i.e. divided To summarize, we can split the concentration in two different formulations: -* Normalized concentration :meth:`caimira.models._ConcentrationModelBase._normed_concentration`: :math:`\mathrm{C_\mathrm{LR, normed}}(t, D)` that computes the concentration without including the emission rate. -* Concentration :meth:`caimira.models._ConcentrationModelBase.concentration` : :math:`C_{\mathrm{LR}}(t, D) = \mathrm{C_\mathrm{LR, normed}}(t, D) \cdot \mathrm{vR}(D)`, where :math:`\mathrm{vR}(D)` is the result of the :meth:`caimira.models._PopulationWithVirus.emission_rate_when_present` method. +* Normalized concentration :meth:`caimira.models._ConcentrationModelBase._normed_concentration`: :math:`\mathrm{C_\mathrm{LR, normed}}(t, D)` that computes the concentration without including the emission rate per person infected. +* Concentration :meth:`caimira.models._ConcentrationModelBase.concentration` : :math:`C_{\mathrm{LR}}(t, D) = \mathrm{C_\mathrm{LR, normed}}(t, D) \cdot \mathrm{vR}(D)`, where :math:`\mathrm{vR}(D)` is the result of the :meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present` method. Note that in order to get the total concentration value in this stage, the final result should be averaged over the particle diameters (i.e. Monte-Carlo integration over diameters, see above). For the calculator app report, the total concentration (MC integral over the diameter) is performed only when generating the plot. @@ -106,8 +106,8 @@ Otherwise, the diameter-dependence continues until we compute the inhaled dose i The following methods calculate the integrated concentration between two times. They are mostly used when calculating the **dose**: -* :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`, :math:`\mathrm{C_\mathrm{normed}}(D)` that returns the integrated long-range concentration of viruses in the air, between any two times, normalized by the emission rate. Note that this method performs the integral between any two times of the previously mentioned :meth:`caimira.models._ConcentrationModelBase._normed_concentration` method. -* :meth:`caimira.models._ConcentrationModelBase.integrated_concentration`, :math:`C(D)`, that returns the same result as the previous one, but multiplied by the emission rate. +* :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`, :math:`\mathrm{C_\mathrm{normed}}(D)` that returns the integrated long-range concentration of viruses in the air, between any two times, normalized by the emission rate per person infected. Note that this method performs the integral between any two times of the previously mentioned :meth:`caimira.models._ConcentrationModelBase._normed_concentration` method. +* :meth:`caimira.models._ConcentrationModelBase.integrated_concentration`, :math:`C(D)`, that returns the same result as the previous one, but multiplied by the emission rate (per person infected). The integral over the exposure times is calculated directly in the class (integrated methods). @@ -221,14 +221,14 @@ Long-range approach ******************* Regarding the concentration part of the long-range exposure (concentration integrated over time, :math:`\int_{t1}^{t2}C_{\mathrm{LR}}(t, D)\;\mathrm{d}t`), the respective method is :meth:`caimira.models.ExposureModel._long_range_normed_exposure_between_bounds`, -which uses the long-range exposure (concentration) between two bounds (time1 and time2), normalized by the emission rate of the infected population, calculated from :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`. +which uses the long-range exposure (concentration) between two bounds (time1 and time2), normalized by the emission rate of the infected population (per person infected), calculated from :meth:`caimira.models._ConcentrationModelBase.normed_integrated_concentration`. The former method filters out the given bounds considering the breaks through the day (i.e. the time intervals during which there is no exposition to the virus) and retrieves the integrated long-range concentration of viruses in the air between any two times. After the calculations of the integrated concentration over the time, in order to calculate the final dose, we have to compute the remaining factors in the above equation. Note that the **Monte-Carlo integration over the diameters is performed at this stage**, where all the diameter-dependent parameters are grouped together to calculate the final average (:code:`np.mean()`). -Since, in the previous chapters, the quantities where normalised by the emission rate, one will need to re-incorporate it in the equations before performing the MC integrations over :math:`D`. -For that we need to split :math:`\mathrm{vR}(D)` (:meth:`caimira.models._PopulationWithVirus.emission_rate_when_present`) in diameter-dependent and diameter-independent quantities: +Since, in the previous chapters, the quantities where normalised by the emission rate per person infected, one will need to re-incorporate it in the equations before performing the MC integrations over :math:`D`. +For that we need to split :math:`\mathrm{vR}(D)` (:meth:`caimira.models._PopulationWithVirus.emission_rate_per_person_when_present`) in diameter-dependent and diameter-independent quantities: :math:`\mathrm{vR}(D) = \mathrm{vR}(D-\mathrm{dependent}) \times \mathrm{vR}(D-\mathrm{independent})` @@ -236,7 +236,7 @@ with :math:`\mathrm{vR}(D-\mathrm{dependent}) = \mathrm{cn} \cdot V_p(D) \cdot (1 − \mathrm{η_{out}}(D))` - :meth:`caimira.models.InfectedPopulation.aerosols` -:math:`\mathrm{vR}(D-\mathrm{independent}) = \mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` - :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_when_present` +:math:`\mathrm{vR}(D-\mathrm{independent}) = \mathrm{vl_{in}} \cdot \mathrm{BR_{k}}` - :meth:`caimira.models.InfectedPopulation.emission_rate_per_aerosol_per_person_when_present` In other words, in the code the procedure is the following (all performed in :meth:`caimira.models.ExposureModel.long_range_deposited_exposure_between_bounds` method): @@ -248,7 +248,7 @@ In other words, in the code the procedure is the following (all performed in :me * in order to complete the equation, multiply by the remaining diameter-independent variables in :math:`\mathrm{vD}` to obtain the total value: :math:`\mathrm{vD^{total}} = \mathrm{vD_{emission-rate}} \cdot \mathrm{BR}_{\mathrm{k}} \cdot (1-\eta_{\mathrm{in}}) \cdot f_{\mathrm{inf}}`; * in the end, the dose is a vectorized float used in the probability of infection formula. -**Note**: The aerosol volume concentration (*aerosols*) is introduced because the integrated concentration over the time was previously normalized by the emission rate. +**Note**: The aerosol volume concentration (*aerosols*) is introduced because the integrated concentration over the time was previously normalized by the emission rate (per person). Here, to calculate the integral over the diameters we also need to consider the diameter-dependent variables that are on the emission rate, represented by the aerosol volume concentration which depends on the diameter and on the mask type: :math:`\mathrm{aerosols} = \mathrm{cn} \cdot V_p(D) \cdot (1 − \mathrm{η_{out}}(D))` . @@ -339,4 +339,4 @@ REFERENCES ========== .. [1] Jia, Wei, et al. "Exposure and respiratory infection risk via the short-range airborne route." Building and environment 219 (2022): 109166. `doi.org/10.1016/j.buildenv.2022.109166 `_ -.. [2] Henriques, Andre, et al. "Modelling airborne transmission of SARS-CoV-2 using CARA: risk assessment for enclosed spaces." Interface Focus 12.2 (2022): 20210076. `doi.org/10.1098/rsfs.2021.0076 `_ \ No newline at end of file +.. [2] Henriques, Andre, et al. "Modelling airborne transmission of SARS-CoV-2 using CARA: risk assessment for enclosed spaces." Interface Focus 12.2 (2022): 20210076. `doi.org/10.1098/rsfs.2021.0076 `_ diff --git a/caimira/models.py b/caimira/models.py index 8fa1a7d4..38924f8c 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -819,15 +819,6 @@ class Population: else: if self.presence is not None: raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') - - def _last_presence_state(self, time: float): - """ - Find the most recent/previous presence state change. - """ - times = set(self.number.transition_times) # type: ignore - t_index: int = np.searchsorted(sorted(times), time) # type: ignore - t_index = max([t_index - 1, 0]) - return sorted(times)[t_index] def person_present(self, time: float): # Allow back-compatibility @@ -835,25 +826,15 @@ class Population: return self.presence.triggered(time) elif isinstance(self.number, IntPiecewiseContant): return self.number.value(time) != 0 - - def people_present(self, time: typing.Optional[float] = None): + + def people_present(self, time: float): # Allow back-compatibility - if isinstance(self.number, int) and isinstance(self.presence, Interval): - return self.number - - elif isinstance(self.number, IntPiecewiseContant): - # For a given scenario, time falls within a break, - # we should get the previous state occupancy profile. - if time and not self.person_present(time): - if time <= self.number.transition_times[0]: - number = self.number.values[0] - elif time >= self.number.transition_times[-1]: - number = self.number.values[-1] - else: - number = int(self.number.value(self._last_presence_state(time))) - else: - number = int(self.number.value(time)) - return number + if isinstance(self.number, int): + return self.number * self.person_present(time) + + else: + return int(self.number.value(time)) + @dataclass(frozen=True) class _PopulationWithVirus(Population): @@ -874,22 +855,22 @@ class _PopulationWithVirus(Population): """ raise NotImplementedError("Subclass must implement") - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, - if the infected population is present, in (virion.cm^3)/(mL.h). + per person, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ raise NotImplementedError("Subclass must implement") @method_cache - def emission_rate_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_person_when_present(self) -> _VectorisedFloat: """ - The emission rate if the infected population is present + The emission rate if the infected population is present, per person (in virions / h). """ - return (self.emission_rate_per_aerosol_when_present(time) * + return (self.emission_rate_per_aerosol_per_person_when_present() * self.aerosols()) def emission_rate(self, time) -> _VectorisedFloat: @@ -906,7 +887,7 @@ class _PopulationWithVirus(Population): # itself a function of time. Any change in rate must be accompanied # with a declaration of state change time, as is the case for things # like Ventilation. - return self.emission_rate_when_present(time) + return self.emission_rate_per_person_when_present() * self.people_present(time) @property def particle(self) -> Particle: @@ -930,14 +911,14 @@ class EmittingPopulation(_PopulationWithVirus): return 1. @method_cache - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, - if the infected population is present, in (virion.cm^3)/(mL.h). + per person, if the infected population is present, in (virion.cm^3)/(mL.h). This method includes only the diameter-independent variables within the emission rate. It should not be a function of time. """ - return self.known_individual_emission_rate * self.people_present(time) + return self.known_individual_emission_rate @dataclass(frozen=True) @@ -959,7 +940,7 @@ class InfectedPopulation(_PopulationWithVirus): return self.expiration.aerosols(self.mask) @method_cache - def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def emission_rate_per_aerosol_per_person_when_present(self) -> _VectorisedFloat: """ The emission rate of virions in the expired air per mL of respiratory fluid, if the infected population is present, in (virion.cm^3)/(mL.h). @@ -972,7 +953,7 @@ class InfectedPopulation(_PopulationWithVirus): ER = (self.virus.viral_load_in_sputum * self.activity.exhalation_rate * 10 ** 6) - return ER * self.people_present(time) + return ER @property def particle(self) -> Particle: @@ -1040,7 +1021,7 @@ class _ConcentrationModelBase: """ return 0. - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def normalization_factor(self) -> _VectorisedFloat: """ Normalization factor (in the same unit as the concentration). This factor is applied to the normalized concentration only @@ -1059,13 +1040,14 @@ class _ConcentrationModelBase: can be put back in front of the concentration after the time dependence has been solved for. """ - if not self.population.person_present(time): - return self.min_background_concentration()/self.normalization_factor(time) + #if not self.population.person_present(time): + # return self.min_background_concentration()/self.normalization_factor() + V = self.room.volume RR = self.removal_rate(time) - return (1. / (RR * V) + self.min_background_concentration()/ - self.normalization_factor(time)) + return (self.population.people_present(time) / (RR * V) + + self.min_background_concentration()/self.normalization_factor()) @method_cache def state_change_times(self) -> typing.List[float]: @@ -1143,7 +1125,7 @@ class _ConcentrationModelBase: # The model always starts at t=0, but we avoid running concentration calculations # before the first presence as an optimisation. if time <= self._first_presence_time(): - return self.min_background_concentration()/self.normalization_factor(time) + return self.min_background_concentration()/self.normalization_factor() next_state_change_time = self._next_state_change(time) @@ -1156,12 +1138,12 @@ class _ConcentrationModelBase: conc_limit = 0. t_last_state_change = self.last_state_change(time) - conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change)*self.normalization_factor(t_last_state_change) + conc_at_last_state_change = self._normed_concentration_cached(t_last_state_change) delta_time = time - t_last_state_change fac = np.exp(-RR * delta_time) - return conc_limit * (1 - fac) + conc_at_last_state_change/self.normalization_factor(time) * fac + return conc_limit * (1 - fac) + conc_at_last_state_change * fac def concentration(self, time: float) -> _VectorisedFloat: """ @@ -1172,7 +1154,7 @@ class _ConcentrationModelBase: to this method. """ return (self._normed_concentration_cached(time) * - self.normalization_factor(time)) + self.normalization_factor()) @method_cache def normed_integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat: @@ -1209,7 +1191,7 @@ class _ConcentrationModelBase: Get the integrated concentration of viruses in the air between the times start and stop. """ return (self.normed_integrated_concentration(start, stop) * - self.normalization_factor(start)) + self.normalization_factor()) @dataclass(frozen=True) @@ -1234,9 +1216,9 @@ class ConcentrationModel(_ConcentrationModelBase): def virus(self) -> Virus: return self.infected.virus - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: + def normalization_factor(self) -> _VectorisedFloat: # we normalize by the emission rate - return self.infected.emission_rate_when_present(time) + return self.infected.emission_rate_per_person_when_present() def removal_rate(self, time: float) -> _VectorisedFloat: # Equilibrium velocity of particle motion toward the floor @@ -1284,11 +1266,10 @@ class CO2ConcentrationModel(_ConcentrationModelBase): """ return self.CO2_atmosphere_concentration - def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat: - # normalization by the CO2 exhaled. + def normalization_factor(self) -> _VectorisedFloat: + # normalization by the CO2 exhaled per person. # CO2 concentration given in ppm, hence the 1e6 factor. - return (1e6*self.population.people_present(time) - *self.population.activity.exhalation_rate + return (1e6*self.population.activity.exhalation_rate *self.CO2_fraction_exhaled) @@ -1515,8 +1496,11 @@ class ExposureModel: and not ( all(np.isscalar(c_model.virus.decay_constant(c_model.room.humidity, c_model.room.inside_temp.value(time)) + c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): - raise ValueError("If the diameter is an array, none of the ventilation parameters " - "or virus decay constant can be arrays at the same time.") + raise ValueError("If the diameter is an array, none of the ventilation parameters " + "or virus decay constant can be arrays at the same time.") + if not isinstance(self.exposed.number, int): + raise NotImplementedError("Cannot use dynamic occupancy for" + " the exposed population") def long_range_fraction_deposited(self) -> _VectorisedFloat: """ @@ -1533,21 +1517,19 @@ class ExposureModel: by the emission rate of the infected population """ exposure = 0. - if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): - # TODO: Implement the case for the IntPiecewiseConstant - for start, stop in self.exposed.presence.boundaries(): - if stop < time1: - continue - elif start > time2: - break - elif start <= time1 and time2<= stop: - exposure += self.concentration_model.normed_integrated_concentration(time1, time2) - elif start <= time1 and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(time1, stop) - elif time1 < start and time2 <= stop: - exposure += self.concentration_model.normed_integrated_concentration(start, time2) - elif time1 <= start and stop < time2: - exposure += self.concentration_model.normed_integrated_concentration(start, stop) + for start, stop in self.exposed.presence.boundaries(): + if stop < time1: + continue + elif start > time2: + break + elif start <= time1 and time2<= stop: + exposure += self.concentration_model.normed_integrated_concentration(time1, time2) + elif start <= time1 and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(time1, stop) + elif time1 < start and time2 <= stop: + exposure += self.concentration_model.normed_integrated_concentration(start, time2) + elif time1 <= start and stop < time2: + exposure += self.concentration_model.normed_integrated_concentration(start, stop) return exposure def concentration(self, time: float) -> _VectorisedFloat: @@ -1565,7 +1547,8 @@ class ExposureModel: def long_range_deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: deposited_exposure = 0. - emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present() + emission_rate_per_aerosol_per_person = \ + self.concentration_model.infected.emission_rate_per_aerosol_per_person_when_present() aerosols = self.concentration_model.infected.aerosols() f_inf = self.concentration_model.infected.fraction_of_infectious_virus() fdep = self.long_range_fraction_deposited() @@ -1585,10 +1568,11 @@ class ExposureModel: # one should not take any mean at this stage. dep_exposure_integrated = self._long_range_normed_exposure_between_bounds(time1, time2)*aerosols*fdep - # Then we multiply by the diameter-independent quantity emission_rate_per_aerosol, + # Then we multiply by the diameter-independent quantity emission_rate_per_aerosol_per_person, # and parameters of the vD equation (i.e. BR_k and n_in). - deposited_exposure += (dep_exposure_integrated * emission_rate_per_aerosol * - self.exposed.activity.inhalation_rate * + deposited_exposure += (dep_exposure_integrated * + emission_rate_per_aerosol_per_person * + self.exposed.activity.inhalation_rate * (1 - self.exposed.mask.inhale_efficiency())) # In the end we multiply the final results by the fraction of infectious virus of the vD equation. @@ -1655,10 +1639,8 @@ class ExposureModel: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval): - # TODO: Implement the case for the IntPiecewiseConstant - for start, stop in self.exposed.presence.boundaries(): - deposited_exposure += self.deposited_exposure_between_bounds(start, stop) + for start, stop in self.exposed.presence.boundaries(): + deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1676,11 +1658,13 @@ class ExposureModel: def total_probability_rule(self) -> _VectorisedFloat: if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): sum_probability = 0.0 + if not isinstance(self.concentration_model.infected.number, int): + raise NotImplementedError("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy") + # Create an equivalent exposure model but changing the number of infected cases. - if isinstance(self.concentration_model.infected.number, int) and isinstance(self.exposed.number, int): - # TODO: Implement the case for the IntPiecewiseConstant - total_people = self.concentration_model.infected.number + self.exposed.number - max_num_infected = (total_people if total_people < 10 else 10) + total_people = self.concentration_model.infected.number + self.exposed.number + max_num_infected = (total_people if total_people < 10 else 10) # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. # Therefore we decided a hard limit of 10 infected people. diff --git a/caimira/tests/models/test_concentration_model.py b/caimira/tests/models/test_concentration_model.py index 5d152b52..f8f42106 100644 --- a/caimira/tests/models/test_concentration_model.py +++ b/caimira/tests/models/test_concentration_model.py @@ -33,7 +33,7 @@ class KnownConcentrationModelBase(models._ConcentrationModelBase): def min_background_concentration(self) -> float: return self.known_min_background_concentration - def normalization_factor(self, time: typing.Optional[float] = None) -> float: + def normalization_factor(self) -> float: return self.known_normalization_factor @@ -107,7 +107,7 @@ def simple_conc_model(): @pytest.fixture def dummy_population(simple_conc_model) -> models.Population: return models.Population( - number=10, + number=1, presence=simple_conc_model.infected.presence, mask=models.Mask.types['Type I'], activity=models.Activity.types['Seated'], @@ -268,7 +268,7 @@ def test_zero_ventilation_rate( ventilation = simple_conc_model.ventilation, known_population = dummy_population, known_removal_rate = known_removal_rate, - known_normalization_factor=10., + known_normalization_factor=1., known_min_background_concentration = known_min_background_concentration) normed_concentration = known_conc_model.concentration(1) diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index b1c139e9..e2737e10 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -175,4 +175,4 @@ def test_dynamic_dose(full_exposure_model): dynamic_exposure = dynamic_infected.deposited_exposure() static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_exposure, static_exposure) \ No newline at end of file + npt.assert_almost_equal(dynamic_exposure, static_exposure) diff --git a/caimira/tests/models/test_exposure_model.py b/caimira/tests/models/test_exposure_model.py index ef9d0c4e..4c48b480 100644 --- a/caimira/tests/models/test_exposure_model.py +++ b/caimira/tests/models/test_exposure_model.py @@ -24,7 +24,7 @@ class KnownNormedconcentration(models.ConcentrationModel): return 1.e50 def _normed_concentration_limit(self, time: float) -> models._VectorisedFloat: - return self.normed_concentration_function(time) + return self.normed_concentration_function(time) * self.infected.number def state_change_times(self): return [0., 24.] @@ -33,7 +33,7 @@ class KnownNormedconcentration(models.ConcentrationModel): return 24. def _normed_concentration(self, time: float) -> models._VectorisedFloat: # noqa - return self.normed_concentration_function(time) + return self.normed_concentration_function(time) * self.infected.number halftime = models.PeriodicInterval(120, 60) @@ -67,7 +67,8 @@ def known_concentrations(func): expiration=models.Expiration.types['Speaking'], host_immunity=0., ) - normed_func = lambda x: func(x) / dummy_infected_population.emission_rate_when_present() + normed_func = lambda x: (func(x) / + dummy_infected_population.emission_rate_per_person_when_present()) return KnownNormedconcentration(dummy_room, dummy_ventilation, dummy_infected_population, 0.3, normed_func) diff --git a/caimira/tests/test_monte_carlo_full_models.py b/caimira/tests/test_monte_carlo_full_models.py index 9ce540e0..b6e3948c 100644 --- a/caimira/tests/test_monte_carlo_full_models.py +++ b/caimira/tests/test_monte_carlo_full_models.py @@ -313,19 +313,19 @@ def waiting_room_mc(): @retry(tries=10) @pytest.mark.parametrize( - "mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER", + "mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER_per_person", [ ["shared_office_mc", 5.38, 0.16, 3.350, 1056], ["classroom_mc", 8.21, 1.56, 11.356, 7416], ["ski_cabin_mc", 12.92, 0.39, 21.796, 10231], ["skagit_chorale_mc",61.01, 36.53, 84.730, 190422], ["bus_ride_mc", 10.59, 7.06, 6.650, 5419], - ["gym_mc", 0.52, 0.14, 0.249, 1450], + ["gym_mc", 0.52, 0.14, 0.249, 1450/2.], # there are two infected in this case ["waiting_room_mc", 1.53, 0.21, 0.844, 929], ] ) def test_report_models(mc_model, expected_pi, expected_new_cases, - expected_dose, expected_ER, request): + expected_dose, expected_ER_per_person, request): mc_model = request.getfixturevalue(mc_model) exposure_model = mc_model.build_model(size=SAMPLE_SIZE) npt.assert_allclose(exposure_model.infection_probability().mean(), @@ -335,8 +335,8 @@ def test_report_models(mc_model, expected_pi, expected_new_cases, npt.assert_allclose(exposure_model.deposited_exposure().mean(), expected_dose, rtol=TOLERANCE) npt.assert_allclose( - exposure_model.concentration_model.infected.emission_rate_when_present().mean(), - expected_ER, rtol=TOLERANCE) + exposure_model.concentration_model.infected.emission_rate_per_person_when_present().mean(), + expected_ER_per_person, rtol=TOLERANCE) @retry(tries=10) @@ -397,5 +397,5 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi, npt.assert_allclose(exposure_model.deposited_exposure().mean(), expected_dose, rtol=TOLERANCE) npt.assert_allclose( - exposure_model.concentration_model.infected.emission_rate_when_present().mean(), + exposure_model.concentration_model.infected.emission_rate_per_person_when_present().mean(), expected_ER, rtol=TOLERANCE) From bcb1d1012a21e5728a652d1fa352de734f11dc58 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Thu, 6 Apr 2023 12:27:39 +0200 Subject: [PATCH 6/8] modified dynamic tests for InfectedPopulation --- caimira/models.py | 10 +-- .../tests/models/test_dynamic_population.py | 62 ++++++++++++++----- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/caimira/models.py b/caimira/models.py index 38924f8c..98e3e9a5 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -818,7 +818,7 @@ class Population: raise TypeError(f'The presence argument must be an "Interval". Got {type(self.presence)}') else: if self.presence is not None: - raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.') + raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number') def person_present(self, time: float): # Allow back-compatibility @@ -1656,11 +1656,13 @@ class ExposureModel: self.concentration_model.virus.transmissibility_factor)))) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): - sum_probability = 0.0 - if not isinstance(self.concentration_model.infected.number, int): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseContant) or + isinstance(self.exposed.number, IntPiecewiseContant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") + + if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): + sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. total_people = self.concentration_model.infected.number + self.exposed.number diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index e2737e10..41137c34 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -1,3 +1,5 @@ +import re + import numpy as np import numpy.testing as npt import pytest @@ -48,17 +50,6 @@ def baseline_infected_population_number(): ) -@pytest.fixture -def baseline_exposed_population_number(): - return models.Population( - number=models.IntPiecewiseContant( - (8, 12, 13, 17), (1, 0, 1)), - presence=None, - mask=models.Mask.types['No mask'], - activity=models.Activity.types['Seated'], - host_immunity=0., - ) - @pytest.mark.parametrize( "time", [4., 8., 10., 12., 13., 14., 16., 20., 24.], @@ -74,6 +65,22 @@ def test_population_number(full_exposure_model: models.ExposureModel, assert isinstance(piecewise_population_number.number, models.IntPiecewiseContant) assert piecewise_population_number.presence is None + + with pytest.raises( + TypeError, + match=f'The presence argument must be an "Interval". Got {type(None)}' + ): + dc_utils.nested_replace( + int_population_number, {'presence': None} + ) + + with pytest.raises( + TypeError, + match="The presence argument must be None for a IntPiecewiseConstant number" + ): + dc_utils.nested_replace( + piecewise_population_number, {'presence': models.SpecificInterval(((8, 12), ))} + ) assert int_population_number.person_present(time) == piecewise_population_number.person_present(time) assert int_population_number.people_present(time) == piecewise_population_number.people_present(time) @@ -85,14 +92,12 @@ def test_population_number(full_exposure_model: models.ExposureModel, ) def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, - baseline_exposed_population_number: models.Population, time: float): dynamic_model: models.ExposureModel = dc_utils.nested_replace( full_exposure_model, { 'concentration_model.infected': baseline_infected_population_number, - 'exposed': baseline_exposed_population_number, } ) assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) @@ -107,7 +112,7 @@ def test_concentration_model_dynamic_population(full_exposure_model: models.Expo [13., 4], [15., 5], ]) -def test_sum_of_models(full_exposure_model: models.ExposureModel, +def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, time: float, number_of_infected: int): @@ -128,9 +133,13 @@ def test_sum_of_models(full_exposure_model: models.ExposureModel, ) npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) + npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected) -def test_dynamic_dose(full_exposure_model): +@pytest.mark.parametrize( + "time", (8., 9., 10., 11., 12., 13., 14.), +) +def test_dynamic_dose(full_exposure_model, time): dynamic_infected: models.ExposureModel = dc_utils.nested_replace( full_exposure_model, @@ -172,7 +181,26 @@ def test_dynamic_dose(full_exposure_model): } ) + dynamic_concentration = dynamic_infected.concentration(time) dynamic_exposure = dynamic_infected.deposited_exposure() - static_exposure = np.sum([model.deposited_exposure() for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_exposure, static_exposure) + static_concentration, static_exposure = [], [] + for model in (single_infected, two_infected, three_infected): + static_concentration.append(model.concentration(time)) + static_exposure.append(model.deposited_exposure()) + + npt.assert_almost_equal(dynamic_concentration, np.sum(np.array(static_concentration))) + npt.assert_almost_equal(dynamic_exposure, np.sum(np.array(static_exposure))) + + +def test_dynamic_total_probability_rule(full_exposure_model: models.ExposureModel, + baseline_infected_population_number: models.InfectedPopulation): + + model = dc_utils.nested_replace(full_exposure_model, + {'concentration_model.infected': baseline_infected_population_number }) + with pytest.raises( + NotImplementedError, + match=re.escape("Cannot compute total probability " + "(including incidence rate) with dynamic occupancy") + ): + model.total_probability_rule() From 022abad186603e35bd14771f9388d5c70ba777ec Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Fri, 28 Apr 2023 12:05:30 +0200 Subject: [PATCH 7/8] updated branch according to review: - new presence_interval() method in the population class that avoids repetition of if conditions through the code. - typo fix. -tests were updated accordingly --- caimira/apps/calculator/report_generator.py | 21 ++---- caimira/apps/expert.py | 14 ++-- caimira/apps/expert_co2.py | 25 ++++---- caimira/models.py | 40 ++++++------ caimira/monte_carlo/models.py | 8 +-- .../tests/models/test_dynamic_population.py | 64 ++++++------------- 6 files changed, 64 insertions(+), 108 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 0630c1cb..432cf286 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -20,18 +20,10 @@ from ... import dataclass_utils def model_start_end(model: models.ExposureModel): - if (isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval) - and isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval)): - t_start = min(model.exposed.presence.boundaries()[0][0], - model.concentration_model.infected.presence.boundaries()[0][0]) - t_end = max(model.exposed.presence.boundaries()[-1][1], - model.concentration_model.infected.presence.boundaries()[-1][1]) - elif (isinstance(model.exposed.number, models.IntPiecewiseContant) - and isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant)): - t_start = min(model.exposed.number.interval().boundaries()[0][0], - model.concentration_model.infected.number.interval().boundaries()[0][0]) - t_end = max(model.exposed.number.interval().boundaries()[-1][1], - model.concentration_model.infected.number.interval().boundaries()[-1][1]) + t_start = min(model.exposed.presence_interval().boundaries()[0][0], + model.concentration_model.infected.presence_interval().boundaries()[0][0]) + t_end = max(model.exposed.presence_interval().boundaries()[-1][1], + model.concentration_model.infected.presence_interval().boundaries()[-1][1]) return t_start, t_end @@ -149,10 +141,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing exposed_occupants = model.exposed.number expected_new_cases = np.array(model.expected_new_cases()).mean() uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None - if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence.boundaries()] - elif isinstance(model.exposed.number, models.IntPiecewiseContant): - exposed_presence_intervals = [list(interval) for interval in model.exposed.number.interval().boundaries()] + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] return { "model_repr": repr(model), diff --git a/caimira/apps/expert.py b/caimira/apps/expert.py index d32b1dca..c15234c0 100644 --- a/caimira/apps/expert.py +++ b/caimira/apps/expert.py @@ -148,10 +148,7 @@ class ExposureModelResult(View): def update_plot(self, model: models.ExposureModel): resolution = 600 - if isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval): - infected_presence = model.concentration_model.infected.presence - elif isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant): - infected_presence = model.concentration_model.infected.number.interval() + infected_presence = model.concentration_model.infected.presence_interval() ts = np.linspace(sorted(infected_presence.transition_times())[0], sorted(infected_presence.transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] @@ -168,10 +165,7 @@ class ExposureModelResult(View): self.ax.ignore_existing_data_limits = False self.concentration_line.set_data(ts, concentration) - if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval): - exposed_presence = model.exposed.presence - elif isinstance(model.exposed.number, models.IntPiecewiseContant): - exposed_presence = model.exposed.number.interval() + exposed_presence = model.exposed.presence_interval() if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", @@ -1117,8 +1111,8 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu Returns the earliest start and latest end time of a collection of ConcentrationModel objects """ - infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) # type: ignore - infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) # type: ignore + infected_start = min(model.concentration_model.infected.presence_interval().boundaries()[0][0] for model in models) + infected_finish = min(model.concentration_model.infected.presence_interval().boundaries()[-1][1] for model in models) return infected_start, infected_finish diff --git a/caimira/apps/expert_co2.py b/caimira/apps/expert_co2.py index 505870d9..ba9d7f63 100644 --- a/caimira/apps/expert_co2.py +++ b/caimira/apps/expert_co2.py @@ -86,8 +86,8 @@ class ExposureModelResult(View): def update_plot(self, model: models.CO2ConcentrationModel): resolution = 600 - ts = np.linspace(sorted(model.CO2_emitters.presence.transition_times())[0], - sorted(model.CO2_emitters.presence.transition_times())[-1], resolution) + ts = np.linspace(sorted(model.CO2_emitters.presence_interval().transition_times())[0], + sorted(model.CO2_emitters.presence_interval().transition_times())[-1], resolution) concentration = [model.concentration(t) for t in ts] if self.concentration_line is None: @@ -99,19 +99,19 @@ class ExposureModelResult(View): if self.concentration_area is None: self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.CO2_emitters.presence.boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[0][1]) | - (model.CO2_emitters.presence.boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[1][1]))) + where = ((model.CO2_emitters.presence_interval().boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[0][1]) | + (model.CO2_emitters.presence_interval().boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[1][1]))) else: self.concentration_area.remove() self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", - where = ((model.CO2_emitters.presence.boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[0][1]) | - (model.CO2_emitters.presence.boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence.boundaries()[1][1]))) + where = ((model.CO2_emitters.presence_interval().boundaries()[0][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[0][1]) | + (model.CO2_emitters.presence_interval().boundaries()[1][0] < ts) & (ts < model.CO2_emitters.presence_interval().boundaries()[1][1]))) concentration_top = max(np.array(concentration)) self.ax.set_ylim(bottom=model.CO2_atmosphere_concentration * 0.9, top=concentration_top*1.1) - self.ax.set_xlim(left = min(model.CO2_emitters.presence.boundaries()[0])*0.95, - right = max(model.CO2_emitters.presence.boundaries()[1])*1.05) + self.ax.set_xlim(left = min(model.CO2_emitters.presence_interval().boundaries()[0])*0.95, + right = max(model.CO2_emitters.presence_interval().boundaries()[1])*1.05) figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='CO₂ concentration'), mlines.Line2D([], [], color='salmon', markersize=15, label='Insufficient level', linestyle='--'), @@ -122,7 +122,10 @@ class ExposureModelResult(View): self.ax.set_ylim(top=concentration_top*1.1) else: self.ax.set_ylim(top=1550) - self.ax.hlines([800, 1500], xmin=min(model.CO2_emitters.presence.boundaries()[0])*0.95, xmax=max(model.CO2_emitters.presence.boundaries()[1])*1.05, colors=['limegreen', 'salmon'], linestyles='dashed') + self.ax.hlines([800, 1500], xmin=min(model.CO2_emitters.presence_interval().boundaries()[0])*0.95, + xmax=max(model.CO2_emitters.presence_interval().boundaries()[1])*1.05, + colors=['limegreen', 'salmon'], + linestyles='dashed') self.figure.canvas.draw() @@ -788,6 +791,6 @@ def models_start_end(models: typing.Sequence[models.CO2ConcentrationModel]) -> t Returns the earliest start and latest end time of a collection of v objects """ - emitters_start = min(model.CO2_emitters.presence.boundaries()[0][0] for model in models) - emitters_finish = min(model.CO2_emitters.presence.boundaries()[-1][1] for model in models) + emitters_start = min(model.CO2_emitters.presence_interval().boundaries()[0][0] for model in models) + emitters_finish = min(model.CO2_emitters.presence_interval().boundaries()[-1][1] for model in models) return emitters_start, emitters_finish diff --git a/caimira/models.py b/caimira/models.py index 98e3e9a5..9db3fcad 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -187,7 +187,7 @@ class PiecewiseConstant: @dataclass(frozen=True) -class IntPiecewiseContant(PiecewiseConstant): +class IntPiecewiseConstant(PiecewiseConstant): #: values of the function between transitions values: typing.Tuple[int, ...] @@ -796,10 +796,10 @@ class Population: """ #: How many in the population. - number: typing.Union[int, IntPiecewiseContant] + number: typing.Union[int, IntPiecewiseConstant] #: The times in which the people are in the room. - presence: Interval + presence: typing.Union[None, Interval] #: The kind of mask being worn by the people. mask: Mask @@ -819,19 +819,24 @@ class Population: else: if self.presence is not None: raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number') + + def presence_interval(self): + if isinstance(self.presence, Interval): + return self.presence + elif isinstance(self.number, IntPiecewiseConstant): + return self.number.interval() def person_present(self, time: float): # Allow back-compatibility if isinstance(self.number, int) and isinstance(self.presence, Interval): return self.presence.triggered(time) - elif isinstance(self.number, IntPiecewiseContant): + elif isinstance(self.number, IntPiecewiseConstant): return self.number.value(time) != 0 def people_present(self, time: float): # Allow back-compatibility if isinstance(self.number, int): return self.number * self.person_present(time) - else: return int(self.number.value(time)) @@ -1040,9 +1045,6 @@ class _ConcentrationModelBase: can be put back in front of the concentration after the time dependence has been solved for. """ - #if not self.population.person_present(time): - # return self.min_background_concentration()/self.normalization_factor() - V = self.room.volume RR = self.removal_rate(time) @@ -1056,11 +1058,9 @@ class _ConcentrationModelBase: the times at which their state changes. """ state_change_times = {0.} - if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): - state_change_times.update(self.population.presence.transition_times()) - elif isinstance(self.population.number, IntPiecewiseContant): - state_change_times.update(self.population.number.interval().transition_times()) + state_change_times.update(self.population.presence_interval().transition_times()) state_change_times.update(self.ventilation.transition_times(self.room)) + return sorted(state_change_times) @method_cache @@ -1068,12 +1068,8 @@ class _ConcentrationModelBase: """ First presence time. Before that, the concentration is zero. """ - if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval): - first_presence = self.population.presence.boundaries()[0][0] - elif isinstance(self.population.number, IntPiecewiseContant): - first_presence = self.population.number.interval().boundaries()[0][0] - return first_presence - + return self.population.presence_interval().boundaries()[0][0] + def last_state_change(self, time: float) -> float: """ Find the most recent/previous state change. @@ -1517,7 +1513,7 @@ class ExposureModel: by the emission rate of the infected population """ exposure = 0. - for start, stop in self.exposed.presence.boundaries(): + for start, stop in self.exposed.presence_interval().boundaries(): if stop < time1: continue elif start > time2: @@ -1639,7 +1635,7 @@ class ExposureModel: The number of virus per m^3 deposited on the respiratory tract. """ deposited_exposure: _VectorisedFloat = 0.0 - for start, stop in self.exposed.presence.boundaries(): + for start, stop in self.exposed.presence_interval().boundaries(): deposited_exposure += self.deposited_exposure_between_bounds(start, stop) return deposited_exposure * self.repeats @@ -1656,8 +1652,8 @@ class ExposureModel: self.concentration_model.virus.transmissibility_factor)))) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (isinstance(self.concentration_model.infected.number, IntPiecewiseContant) or - isinstance(self.exposed.number, IntPiecewiseContant)): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or + isinstance(self.exposed.number, IntPiecewiseConstant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") diff --git a/caimira/monte_carlo/models.py b/caimira/monte_carlo/models.py index ccaafc59..7215db16 100644 --- a/caimira/monte_carlo/models.py +++ b/caimira/monte_carlo/models.py @@ -75,12 +75,12 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...] - elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseContant]: - IPC = getattr(sys.modules[__name__], "IntPiecewiseContant") - field_type = typing.Union[caimira.models.IntPiecewiseContant, IPC] + elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseConstant]: + IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant") + field_type = typing.Union[int, caimira.models.IntPiecewiseConstant, IPC] elif new_field.type == typing.Union[caimira.models.Interval, None]: I = getattr(sys.modules[__name__], "Interval") - field_type = typing.Union[caimira.models.Interval, I] + field_type = typing.Union[None, caimira.models.Interval, I] else: # Check that we don't need to do anything with this type. diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index 41137c34..e5bf9805 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -39,7 +39,7 @@ def full_exposure_model(): @pytest.fixture def baseline_infected_population_number(): return models.InfectedPopulation( - number=models.IntPiecewiseContant( + number=models.IntPiecewiseConstant( (8, 12, 13, 17), (1, 0, 1)), presence=None, mask=models.Mask.types['No mask'], @@ -50,6 +50,12 @@ def baseline_infected_population_number(): ) +@pytest.fixture +def dynamic_single_exposure_model(full_exposure_model, baseline_infected_population_number): + return dc_utils.nested_replace(full_exposure_model, + {'concentration_model.infected': baseline_infected_population_number, }) + + @pytest.mark.parametrize( "time", [4., 8., 10., 12., 13., 14., 16., 20., 24.], @@ -58,13 +64,7 @@ def test_population_number(full_exposure_model: models.ExposureModel, baseline_infected_population_number: models.InfectedPopulation, time: float): int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected - assert isinstance(int_population_number.number, int) - assert isinstance(int_population_number.presence, models.Interval) - piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number - assert isinstance(piecewise_population_number.number, - models.IntPiecewiseContant) - assert piecewise_population_number.presence is None with pytest.raises( TypeError, @@ -91,29 +91,16 @@ def test_population_number(full_exposure_model: models.ExposureModel, [4., 8., 10., 12., 13., 14., 16., 20., 24.], ) def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, + dynamic_single_exposure_model: models.ExposureModel, time: float): - dynamic_model: models.ExposureModel = dc_utils.nested_replace( - full_exposure_model, - { - 'concentration_model.infected': baseline_infected_population_number, - } - ) - assert full_exposure_model.concentration(time) == dynamic_model.concentration(time) + assert full_exposure_model.concentration(time) == dynamic_single_exposure_model.concentration(time) -@pytest.mark.parametrize( - "time, number_of_infected", - [ - [8., 1], - [10., 2], - [12., 3], - [13., 4], - [15., 5], - ]) +@pytest.mark.parametrize("number_of_infected",[1, 2, 3, 4, 5]) +@pytest.mark.parametrize("time",[9., 12.5, 16.]) def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, + dynamic_single_exposure_model: models.ExposureModel, time: float, number_of_infected: int): @@ -125,13 +112,6 @@ def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureM } ) - dynamic_single_exposure_model: models.ExposureModel = dc_utils.nested_replace( - full_exposure_model, - { - 'concentration_model.infected': baseline_infected_population_number, - } - ) - npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected) npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected) @@ -145,7 +125,7 @@ def test_dynamic_dose(full_exposure_model, time): full_exposure_model, { 'concentration_model.infected': models.InfectedPopulation( - number=models.IntPiecewiseContant( + number=models.IntPiecewiseConstant( (8, 10, 12, 13, 17), (1, 2, 0, 3)), presence=None, mask=models.Mask.types['No mask'], @@ -184,23 +164,17 @@ def test_dynamic_dose(full_exposure_model, time): dynamic_concentration = dynamic_infected.concentration(time) dynamic_exposure = dynamic_infected.deposited_exposure() - static_concentration, static_exposure = [], [] - for model in (single_infected, two_infected, three_infected): - static_concentration.append(model.concentration(time)) - static_exposure.append(model.deposited_exposure()) + static_concentration, static_exposure = zip(*[(model.concentration(time), model.deposited_exposure()) + for model in (single_infected, two_infected, three_infected)]) - npt.assert_almost_equal(dynamic_concentration, np.sum(np.array(static_concentration))) - npt.assert_almost_equal(dynamic_exposure, np.sum(np.array(static_exposure))) + npt.assert_almost_equal(dynamic_concentration, np.sum(static_concentration)) + npt.assert_almost_equal(dynamic_exposure, np.sum(static_exposure)) -def test_dynamic_total_probability_rule(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation): - - model = dc_utils.nested_replace(full_exposure_model, - {'concentration_model.infected': baseline_infected_population_number }) +def test_dynamic_total_probability_rule(dynamic_single_exposure_model: models.ExposureModel): with pytest.raises( NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") ): - model.total_probability_rule() + dynamic_single_exposure_model.total_probability_rule() From 7cbe9bb0507c8b039c7e44f7a4fb5bd101245251 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 3 May 2023 16:35:01 +0200 Subject: [PATCH 8/8] updated version --- caimira/apps/calculator/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py index f8be0a45..fef9ceaf 100644 --- a/caimira/apps/calculator/__init__.py +++ b/caimira/apps/calculator/__init__.py @@ -35,7 +35,7 @@ from .user import AuthenticatedUser, AnonymousUser # calculator version. If the calculator needs to make breaking changes (e.g. change # form attributes) then it can also increase its MAJOR version without needing to # increase the overall CAiMIRA version (found at ``caimira.__version__``). -__version__ = "4.8" +__version__ = "4.9" class BaseRequestHandler(RequestHandler):