diff --git a/cara/apps/calculator/model_generator.py b/cara/apps/calculator/model_generator.py index b05da082..19596e0a 100644 --- a/cara/apps/calculator/model_generator.py +++ b/cara/apps/calculator/model_generator.py @@ -234,7 +234,7 @@ class FormData: raise ValueError("mechanical_ventilation_type cannot be 'not-applicable' if " "ventilation_type is 'mechanical_ventilation'") - def build_mc_model(self) -> mc.SimulationModel: + def build_mc_model(self) -> mc.ExposureModel: # Initializes room with volume either given directly or as product of area and height if self.volume_type == 'room_volume_explicit': volume = self.room_volume @@ -256,26 +256,22 @@ class FormData: short_range_expirations = [short_range_expiration_distributions[activity] for activity in sr_activities] # Initializes and returns a model with the attributes defined above - return mc.SimulationModel( - mc.ExposureModel( - concentration_model=mc.ConcentrationModel( - room=room, - ventilation=self.ventilation(), - infected=self.infected_population(), - evaporation_factor=0.3, - ), - exposed=self.exposed_population(), + return mc.ExposureModel( + concentration_model=mc.ConcentrationModel( + room=room, + ventilation=self.ventilation(), + infected=self.infected_population(), + evaporation_factor=0.3, ), - mc.ShortRangeModel( + short_range = mc.ShortRangeModel( presence=sr_presence, - long_range_presence=self.long_range_intervals(), - activities=sr_activities, expirations=short_range_expirations, dilutions=dilution_factor(activities=sr_activities, distance=np.random.uniform(0.5, 1.5, 250000)), ), + exposed=self.exposed_population(), ) - def build_model(self, sample_size=_DEFAULT_MC_SAMPLE_SIZE) -> models.SimulationModel: + def build_model(self, sample_size=_DEFAULT_MC_SAMPLE_SIZE) -> models.ExposureModel: return self.build_mc_model().build_model(size=sample_size) def tz_name_and_utc_offset(self) -> typing.Tuple[str, float]: @@ -644,7 +640,7 @@ class FormData: def infected_present_interval(self) -> models.Interval: return self.present_interval( self.infected_start, self.infected_finish, - breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times() + breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times(), ) def short_range_intervals(self) -> typing.List[models.SpecificInterval]: @@ -658,22 +654,10 @@ class FormData: else: return [] - def long_range_intervals(self) -> models.Interval: - short_range_intervals = [] - for interval in self.short_range_interactions: - short_range_intervals.append( - (time_string_to_minutes(interval['start_time']), - time_string_to_minutes(interval['start_time']) + float(interval['duration'])), - ) - return self.present_interval( - self.exposed_start, self.exposed_finish, - breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times() + tuple(short_range_intervals), - ) - def exposed_present_interval(self) -> models.Interval: return self.present_interval( self.exposed_start, self.exposed_finish, - breaks=self.exposed_lunch_break_times() + self.exposed_coffee_break_times() + breaks=self.exposed_lunch_break_times() + self.exposed_coffee_break_times(), ) def short_range_activities(self) -> typing.List[str]: diff --git a/cara/apps/calculator/report_generator.py b/cara/apps/calculator/report_generator.py index ef8bafc4..b1fc09fc 100644 --- a/cara/apps/calculator/report_generator.py +++ b/cara/apps/calculator/report_generator.py @@ -104,8 +104,8 @@ def short_range_interesting_times(model: models.ExposureModel, times: typing.Lis return short_range_times -def calculate_report_data(model: models.SimulationModel): - times = interesting_times(model.exposure_model) +def calculate_report_data(model: models.ExposureModel): + times = interesting_times(model) short_range_intervals = [] for interval in model.short_range.presence: short_range_intervals.append(list(interval.boundaries())) @@ -116,9 +116,9 @@ def calculate_report_data(model: models.SimulationModel): ] highest_const = max(concentrations) prob = np.array(model.infection_probability()).mean() - er = np.array(model.exposure_model.concentration_model.infected.emission_rate_when_present()).mean() - exposed_occupants = model.exposure_model.exposed.number - expected_new_cases = np.array(model.exposure_model.expected_new_cases()).mean() + er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean() + exposed_occupants = model.exposed.number + expected_new_cases = np.array(model.expected_new_cases()).mean() cumulative_doses = np.cumsum([ np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean() for time1, time2 in zip(times[:-1], times[1:]) @@ -127,7 +127,7 @@ def calculate_report_data(model: models.SimulationModel): return { "times": list(times), "short_range_intervals": short_range_intervals, - "exposed_presence_intervals": [list(interval) for interval in model.exposure_model.exposed.presence.boundaries()], + "exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()], "cumulative_doses": list(cumulative_doses), "concentrations": concentrations, "highest_const": highest_const, @@ -246,20 +246,20 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.Exp return scenarios -def scenario_statistics(mc_model: mc.SimulationModel, sample_times: np.ndarray): +def scenario_statistics(mc_model: mc.ExposureModel, sample_times: np.ndarray): model = mc_model.build_model(size=_DEFAULT_MC_SAMPLE_SIZE) return { - 'probability_of_infection': np.mean(model.exposure_model.infection_probability()), - 'expected_new_cases': np.mean(model.exposure_model.expected_new_cases()), + 'probability_of_infection': np.mean(model.infection_probability()), + 'expected_new_cases': np.mean(model.expected_new_cases()), 'concentrations': [ - np.mean(model.exposure_model.concentration_model.concentration(time)) + np.mean(model.concentration(time)) for time in sample_times ], } def comparison_report( - scenarios: typing.Dict[str, mc.SimulationModel], + scenarios: typing.Dict[str, mc.ExposureModel], sample_times: typing.List[float], executor_factory: typing.Callable[[], concurrent.futures.Executor], ): @@ -297,7 +297,7 @@ class ReportGenerator: def prepare_context( self, base_url: str, - model: models.SimulationModel, + model: models.ExposureModel, form: FormData, executor_factory: typing.Callable[[], concurrent.futures.Executor], ) -> dict: @@ -305,12 +305,12 @@ class ReportGenerator: time = now.strftime("%Y-%m-%d %H:%M:%S UTC") context = { - 'model': model.exposure_model, + 'model': model, 'form': form, 'creation_date': time, } - scenario_sample_times = interesting_times(model.exposure_model) + scenario_sample_times = interesting_times(model) context.update(calculate_report_data(model)) alternative_scenarios = manufacture_alternative_scenarios(form) diff --git a/cara/apps/expert.py b/cara/apps/expert.py index bba12b6b..31f589dc 100644 --- a/cara/apps/expert.py +++ b/cara/apps/expert.py @@ -503,6 +503,7 @@ baseline_model = models.ExposureModel( ), evaporation_factor=0.3, ), + short_range=[], exposed=models.Population( number=10, presence=models.SpecificInterval(((8., 12.), (13., 17.))), diff --git a/cara/models.py b/cara/models.py index 74844cd3..f95dc1d8 100644 --- a/cara/models.py +++ b/cara/models.py @@ -634,6 +634,9 @@ class _ExpirationBase: """ raise NotImplementedError("Subclass must implement") + def jet_origin_concentration(self): + raise NotImplementedError("Subclass must implement") + @dataclass(frozen=True) class Expiration(_ExpirationBase): @@ -670,7 +673,9 @@ class Expiration(_ExpirationBase): def jet_origin_concentration(self): def volume(d): return (np.pi * d**3) / 6. - return self.cn * volume(self.diameter) + + # final result converted from microns^3/cm3 to mL/m3 + return self.cn * volume(self.diameter) * 1e-6 @dataclass(frozen=True) @@ -887,27 +892,6 @@ class InfectedPopulation(_PopulationWithVirus): return self.expiration.particle -@dataclass(frozen=True) -class ShortRangeModel: - #: Short range interactions - presence: typing.List[Interval] - - #: Long range presence intervals - long_range_presence: Interval - - #: The type of expiractory activities in the short range interactions - activities: typing.List[str] - - #: Expiration type - expirations: typing.List[Expiration] - - #: The dilution factors for each of the expiratory activity in the short range interactions - dilutions: _VectorisedFloat - - def short_range_concentration(self, long_range_concentration: _VectorisedFloat, dilution: _VectorisedFloat, jet_origin_concentration: _VectorisedFloat) -> _VectorisedFloat: - return long_range_concentration + ((1/dilution)*(jet_origin_concentration - long_range_concentration)) - - @dataclass(frozen=True) class ConcentrationModel: room: Room @@ -1053,7 +1037,6 @@ class ConcentrationModel: Get the integrated concentration of viruses in the air between the times start and stop, normalized by the emission rate. """ - if stop <= self._first_presence_time(): return 0.0 state_change_times = self.state_change_times() @@ -1086,6 +1069,40 @@ class ConcentrationModel: self.infected.emission_rate_when_present()) +@dataclass(frozen=True) +class ShortRangeModel: + #: Short range interactions + presence: typing.List[Interval] + + #: Expiration type + expirations: typing.List[Expiration] + + #: The dilution factors for each of the expiratory activity in the short range interactions + dilutions: _VectorisedFloat + + def _normed_concentration(self, concentration_model: ConcentrationModel, time: float) -> _VectorisedFloat: + # normalized only by the viral load + for index, period in enumerate(self.presence): + start, finish = tuple(period.boundaries()) + if start < time <= finish: + dilution = self.dilutions[index] + jet_origin_concentration = concentration_model.infected.expiration.jet_origin_concentration() + + background_concentration = concentration_model.concentration(time) / concentration_model.virus.viral_load_in_sputum + long_range_normed_concentration=np.interp(self.expirations[index].particle.diameter, concentration_model.infected.particle.diameter, background_concentration) + + return ((1/dilution)*(jet_origin_concentration - long_range_normed_concentration)) + + return 0. + + def short_range_concentration(self, concentration_model: ConcentrationModel, time: float): + return (self._normed_concentration(concentration_model, time) * + concentration_model.virus.viral_load_in_sputum) + + def normed_exposure_between_bounds(self, concentration_model, time1, time2): + return scipy.integrate.quad_vec(lambda t: self._normed_concentration(concentration_model, t), time1, time2)[0] + + @dataclass(frozen=True) class ExposureModel: """ @@ -1098,13 +1115,16 @@ class ExposureModel: #: The virus concentration model which this exposure model should consider. concentration_model: ConcentrationModel + #: The short range model which this simulation model should consider. + short_range: ShortRangeModel + #: The population of non-infected people to be used in the model. exposed: Population #: The number of times the exposure event is repeated (default 1). repeats: int = 1 - def fraction_deposited(self) -> _VectorisedFloat: + def long_range_fraction_deposited(self) -> _VectorisedFloat: """ The fraction of particles actually deposited in the respiratory tract (over the total number of particles). It depends on the @@ -1113,7 +1133,7 @@ class ExposureModel: return self.concentration_model.infected.particle.fraction_deposited( self.concentration_model.evaporation_factor) - def _normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: + def _long_range_normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: """The number of virions per meter^3 between any two times, normalized by the emission rate of the infected population""" exposure = 0. @@ -1131,50 +1151,75 @@ class ExposureModel: elif time1 <= start and stop < time2: exposure += self.concentration_model.normed_integrated_concentration(start, stop) return exposure - - def _normed_exposure(self) -> _VectorisedFloat: - """ - The number of virions per meter^3, normalized by the emission rate - of the infected population. - """ - normed_exposure = 0.0 - for start, stop in self.exposed.presence.boundaries(): - normed_exposure += self.concentration_model.normed_integrated_concentration(start, stop) - - return normed_exposure * self.repeats + def concentration(self, time: float) -> _VectorisedFloat: + return (self.concentration_model.concentration(time) + + self.short_range.short_range_concentration(self.concentration_model, time)) def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: - """ - The number of virus per m^3 deposited on the respiratory tract - between any two times. - """ + deposited_exposure = 0. + for index, period in enumerate(self.short_range.presence): + start, stop = tuple(period.boundaries()) + if stop < time1: + continue + elif start > time2: + break + elif start <= time1 and time2<= stop: + short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, time1, time2) + elif start <= time1 and stop < time2: + short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, time1, stop) + elif time1 < start and time2 <= stop: + short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, start, time2) + elif time1 <= start and stop < time2: + short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, start, stop) + + fdep = self.short_range.expirations[index].particle.fraction_deposited(evaporation_factor=1.0) #ASK for evaporation factor + diameter = self.short_range.expirations[index].particle.diameter + + if not np.isscalar(diameter) and diameter is not None: + # we compute first the mean of all diameter-dependent quantities + # to perform properly the Monte-Carlo integration over + # particle diameters (doing things in another order would + # lead to wrong results). + deposited_exposure += np.array(short_range_exposure * + #aerosols * + fdep).mean() + else: + # in the case of a single diameter or no diameter defined, + # one should not take any mean at this stage. + deposited_exposure += short_range_exposure*fdep#*aerosols + + deposited_exposure *= self.concentration_model.virus.viral_load_in_sputum + + # Background concentration emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present() - aerosols = self.concentration_model.infected.aerosols() - fdep = self.fraction_deposited() f_inf = self.concentration_model.infected.fraction_of_infectious_virus() + aerosols = self.concentration_model.infected.aerosols() + fdep = self.long_range_fraction_deposited() diameter = self.concentration_model.infected.particle.diameter - + if not np.isscalar(diameter) and diameter is not None: # we compute first the mean of all diameter-dependent quantities # to perform properly the Monte-Carlo integration over # particle diameters (doing things in another order would # lead to wrong results). - dep_exposure_integrated = np.array(self._normed_exposure_between_bounds(time1, time2) * - aerosols * - fdep).mean() + dep_exposure_integrated = np.array(self._long_range_normed_exposure_between_bounds(time1, time2) * + aerosols * + fdep).mean() else: # in the case of a single diameter or no diameter defined, # one should not take any mean at this stage. - dep_exposure_integrated = self._normed_exposure_between_bounds(time1, time2)*aerosols*fdep + dep_exposure_integrated = self._long_range_normed_exposure_between_bounds(time1, time2)*aerosols*fdep # then we multiply by the diameter-independent quantity emission_rate_per_aerosol, # and parameters of the vD equation (i.e. f_inf, BR_k and n_in). - return (dep_exposure_integrated * emission_rate_per_aerosol * - f_inf * self.exposed.activity.inhalation_rate * + deposited_exposure += (dep_exposure_integrated * emission_rate_per_aerosol * + self.exposed.activity.inhalation_rate * (1 - self.exposed.mask.inhale_efficiency())) + return f_inf * deposited_exposure + def deposited_exposure(self) -> _VectorisedFloat: """ The number of virus per m^3 deposited on the respiratory tract. @@ -1189,7 +1234,7 @@ class ExposureModel: def infection_probability(self) -> _VectorisedFloat: # viral dose (vD) vD = self.deposited_exposure() - + # oneoverln2 multiplied by ID_50 corresponds to ID_63. infectious_dose = oneoverln2 * self.concentration_model.virus.infectious_dose @@ -1218,111 +1263,4 @@ class ExposureModel: ) return single_exposure_model.expected_new_cases() - - -@dataclass(frozen=True) -class SimulationModel: - exposure_model: ExposureModel - short_range: ShortRangeModel - - def normed_integrated_concentration(self, time1, time2): - return scipy.integrate.quad_vec(lambda t: self._normed_concentration(t), time1, time2)[0] - - def _normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: - """The number of virions per meter^3 between any two times, normalized - by the emission rate of the infected population""" - exposure = 0. - for start, stop in self.exposure_model.exposed.presence.boundaries(): - if stop < time1: - continue - elif start > time2: - break - elif start <= time1 and time2<= stop: - exposure += self.normed_integrated_concentration(time1, time2) - elif start <= time1 and stop < time2: - exposure += self.normed_integrated_concentration(time1, stop) - elif time1 < start and time2 <= stop: - exposure += self.normed_integrated_concentration(start, time2) - elif time1 <= start and stop < time2: - exposure += self.normed_integrated_concentration(start, stop) - return exposure - - def _normed_concentration(self, time: float) -> _VectorisedFloat: - for index, period in enumerate(self.short_range.presence): - start, finish = tuple(period.boundaries()) - if start <= time <= finish: - model = nested_replace( - self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]} - ) - dilution = self.short_range.dilutions[index] - jet_origin_concentration = (model.exposure_model.concentration_model.infected.expiration.jet_origin_concentration() - * 1e-6 * model.exposure_model.concentration_model.virus.viral_load_in_sputum) - long_range_normed_concentration=np.interp(model.short_range.expirations[index].particle.diameter, self.exposure_model.concentration_model.infected.particle.diameter, self.exposure_model.concentration_model._normed_concentration(time)) - return self.short_range.short_range_concentration(long_range_normed_concentration, dilution, jet_origin_concentration) - - return self.exposure_model.concentration_model._normed_concentration(time) - - def concentration(self, time: float) -> _VectorisedFloat: - for index, period in enumerate(self.short_range.presence): - start, finish = tuple(period.boundaries()) - if start <= time <= finish: - model = nested_replace( - self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]} - ) - return model._normed_concentration(time) * model.exposure_model.concentration_model.infected.emission_rate_when_present() - return self._normed_concentration(time) * self.exposure_model.concentration_model.infected.emission_rate_when_present() - - def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat: - for index, period in enumerate(self.short_range.presence): - start, finish = tuple(period.boundaries()) - if (start <= time1 <= finish and start <= time2 <= finish): # What if one is SR and another LR? - # Check if the given times are within the short range interactions - model = nested_replace( - self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]} - ) - emission_rate_per_aerosol = model.exposure_model.concentration_model.infected.emission_rate_per_aerosol_when_present() - aerosols = model.exposure_model.concentration_model.infected.aerosols() - fdep = model.exposure_model.fraction_deposited() - f_inf = model.exposure_model.concentration_model.infected.fraction_of_infectious_virus() - - diameter = self.exposure_model.concentration_model.infected.particle.diameter - - if not np.isscalar(diameter) and diameter is not None: - # we compute first the mean of all diameter-dependent quantities - # to perform properly the Monte-Carlo integration over - # particle diameters (doing things in another order would - # lead to wrong results). - dep_exposure_integrated = np.array(self._normed_exposure_between_bounds(time1, time2) * - aerosols * - fdep).mean() - else: - # in the case of a single diameter or no diameter defined, - # one should not take any mean at this stage. - dep_exposure_integrated = self._normed_exposure_between_bounds(time1, time2)*aerosols*fdep - - # then we multiply by the diameter-independent quantity emission_rate_per_aerosol, - # and parameters of the vD equation (i.e. f_inf, BR_k and n_in). - return (dep_exposure_integrated * emission_rate_per_aerosol * - f_inf * model.exposure_model.exposed.activity.inhalation_rate * - (1 - self.exposure_model.exposed.mask.inhale_efficiency())) - - return self.exposure_model.deposited_exposure_between_bounds(time1, time2) - - def infection_probability(self): - dose = 0.0 - for start, stop in self.short_range.long_range_presence.boundaries(): - dose += self.exposure_model.deposited_exposure_between_bounds(start, stop) - - for presence in self.short_range.presence: - start, stop = presence.boundaries() - dose += self.deposited_exposure_between_bounds(start, stop) - - vD = dose * self.exposure_model.repeats - - # oneoverln2 multiplied by ID_50 corresponds to ID_63. - infectious_dose = oneoverln2 * self.exposure_model.concentration_model.virus.infectious_dose - - # Probability of infection. - return (1 - np.exp(-((vD * (1 - self.exposure_model.exposed.host_immunity))/(infectious_dose * - self.exposure_model.concentration_model.virus.transmissibility_factor)))) * 100 \ No newline at end of file diff --git a/cara/monte_carlo/data.py b/cara/monte_carlo/data.py index 8a7cae0f..832e1137 100644 --- a/cara/monte_carlo/data.py +++ b/cara/monte_carlo/data.py @@ -3,7 +3,6 @@ import typing import numpy as np from scipy import special as sp -import scipy.integrate import cara.monte_carlo as mc from cara.monte_carlo.sampleable import Normal,LogNormal,LogCustomKernel,CustomKernel,Uniform @@ -52,9 +51,6 @@ class BLOmodel: np.exp(-(np.log(d) - mu) ** 2 / (2 * sigma ** 2)) for A,cn,mu,sigma in zip(self.BLO_factors, self.cn, self.mu, self.sigma) ) - - def volume(self, d): - return(np.pi * d**3) / 6 def integrate(self, dmin, dmax): """