Introduced short_range in ExposureModel

This commit is contained in:
Luis Aleixo 2022-02-15 16:13:13 +00:00
parent 444fde14b7
commit 01820fa4e0
5 changed files with 122 additions and 203 deletions

View file

@ -234,7 +234,7 @@ class FormData:
raise ValueError("mechanical_ventilation_type cannot be 'not-applicable' if "
"ventilation_type is 'mechanical_ventilation'")
def build_mc_model(self) -> mc.SimulationModel:
def build_mc_model(self) -> mc.ExposureModel:
# Initializes room with volume either given directly or as product of area and height
if self.volume_type == 'room_volume_explicit':
volume = self.room_volume
@ -256,26 +256,22 @@ class FormData:
short_range_expirations = [short_range_expiration_distributions[activity] for activity in sr_activities]
# Initializes and returns a model with the attributes defined above
return mc.SimulationModel(
mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
ventilation=self.ventilation(),
infected=self.infected_population(),
evaporation_factor=0.3,
),
exposed=self.exposed_population(),
return mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
ventilation=self.ventilation(),
infected=self.infected_population(),
evaporation_factor=0.3,
),
mc.ShortRangeModel(
short_range = mc.ShortRangeModel(
presence=sr_presence,
long_range_presence=self.long_range_intervals(),
activities=sr_activities,
expirations=short_range_expirations,
dilutions=dilution_factor(activities=sr_activities, distance=np.random.uniform(0.5, 1.5, 250000)),
),
exposed=self.exposed_population(),
)
def build_model(self, sample_size=_DEFAULT_MC_SAMPLE_SIZE) -> models.SimulationModel:
def build_model(self, sample_size=_DEFAULT_MC_SAMPLE_SIZE) -> models.ExposureModel:
return self.build_mc_model().build_model(size=sample_size)
def tz_name_and_utc_offset(self) -> typing.Tuple[str, float]:
@ -644,7 +640,7 @@ class FormData:
def infected_present_interval(self) -> models.Interval:
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times()
breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times(),
)
def short_range_intervals(self) -> typing.List[models.SpecificInterval]:
@ -658,22 +654,10 @@ class FormData:
else:
return []
def long_range_intervals(self) -> models.Interval:
short_range_intervals = []
for interval in self.short_range_interactions:
short_range_intervals.append(
(time_string_to_minutes(interval['start_time']),
time_string_to_minutes(interval['start_time']) + float(interval['duration'])),
)
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=self.infected_lunch_break_times() + self.infected_coffee_break_times() + tuple(short_range_intervals),
)
def exposed_present_interval(self) -> models.Interval:
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
breaks=self.exposed_lunch_break_times() + self.exposed_coffee_break_times(),
)
def short_range_activities(self) -> typing.List[str]:

View file

@ -104,8 +104,8 @@ def short_range_interesting_times(model: models.ExposureModel, times: typing.Lis
return short_range_times
def calculate_report_data(model: models.SimulationModel):
times = interesting_times(model.exposure_model)
def calculate_report_data(model: models.ExposureModel):
times = interesting_times(model)
short_range_intervals = []
for interval in model.short_range.presence:
short_range_intervals.append(list(interval.boundaries()))
@ -116,9 +116,9 @@ def calculate_report_data(model: models.SimulationModel):
]
highest_const = max(concentrations)
prob = np.array(model.infection_probability()).mean()
er = np.array(model.exposure_model.concentration_model.infected.emission_rate_when_present()).mean()
exposed_occupants = model.exposure_model.exposed.number
expected_new_cases = np.array(model.exposure_model.expected_new_cases()).mean()
er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean()
exposed_occupants = model.exposed.number
expected_new_cases = np.array(model.expected_new_cases()).mean()
cumulative_doses = np.cumsum([
np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(times[:-1], times[1:])
@ -127,7 +127,7 @@ def calculate_report_data(model: models.SimulationModel):
return {
"times": list(times),
"short_range_intervals": short_range_intervals,
"exposed_presence_intervals": [list(interval) for interval in model.exposure_model.exposed.presence.boundaries()],
"exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()],
"cumulative_doses": list(cumulative_doses),
"concentrations": concentrations,
"highest_const": highest_const,
@ -246,20 +246,20 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.Exp
return scenarios
def scenario_statistics(mc_model: mc.SimulationModel, sample_times: np.ndarray):
def scenario_statistics(mc_model: mc.ExposureModel, sample_times: np.ndarray):
model = mc_model.build_model(size=_DEFAULT_MC_SAMPLE_SIZE)
return {
'probability_of_infection': np.mean(model.exposure_model.infection_probability()),
'expected_new_cases': np.mean(model.exposure_model.expected_new_cases()),
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': [
np.mean(model.exposure_model.concentration_model.concentration(time))
np.mean(model.concentration(time))
for time in sample_times
],
}
def comparison_report(
scenarios: typing.Dict[str, mc.SimulationModel],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
@ -297,7 +297,7 @@ class ReportGenerator:
def prepare_context(
self,
base_url: str,
model: models.SimulationModel,
model: models.ExposureModel,
form: FormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
@ -305,12 +305,12 @@ class ReportGenerator:
time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
context = {
'model': model.exposure_model,
'model': model,
'form': form,
'creation_date': time,
}
scenario_sample_times = interesting_times(model.exposure_model)
scenario_sample_times = interesting_times(model)
context.update(calculate_report_data(model))
alternative_scenarios = manufacture_alternative_scenarios(form)

View file

@ -503,6 +503,7 @@ baseline_model = models.ExposureModel(
),
evaporation_factor=0.3,
),
short_range=[],
exposed=models.Population(
number=10,
presence=models.SpecificInterval(((8., 12.), (13., 17.))),

View file

@ -634,6 +634,9 @@ class _ExpirationBase:
"""
raise NotImplementedError("Subclass must implement")
def jet_origin_concentration(self):
raise NotImplementedError("Subclass must implement")
@dataclass(frozen=True)
class Expiration(_ExpirationBase):
@ -670,7 +673,9 @@ class Expiration(_ExpirationBase):
def jet_origin_concentration(self):
def volume(d):
return (np.pi * d**3) / 6.
return self.cn * volume(self.diameter)
# final result converted from microns^3/cm3 to mL/m3
return self.cn * volume(self.diameter) * 1e-6
@dataclass(frozen=True)
@ -887,27 +892,6 @@ class InfectedPopulation(_PopulationWithVirus):
return self.expiration.particle
@dataclass(frozen=True)
class ShortRangeModel:
#: Short range interactions
presence: typing.List[Interval]
#: Long range presence intervals
long_range_presence: Interval
#: The type of expiractory activities in the short range interactions
activities: typing.List[str]
#: Expiration type
expirations: typing.List[Expiration]
#: The dilution factors for each of the expiratory activity in the short range interactions
dilutions: _VectorisedFloat
def short_range_concentration(self, long_range_concentration: _VectorisedFloat, dilution: _VectorisedFloat, jet_origin_concentration: _VectorisedFloat) -> _VectorisedFloat:
return long_range_concentration + ((1/dilution)*(jet_origin_concentration - long_range_concentration))
@dataclass(frozen=True)
class ConcentrationModel:
room: Room
@ -1053,7 +1037,6 @@ class ConcentrationModel:
Get the integrated concentration of viruses in the air between the times start and stop,
normalized by the emission rate.
"""
if stop <= self._first_presence_time():
return 0.0
state_change_times = self.state_change_times()
@ -1086,6 +1069,40 @@ class ConcentrationModel:
self.infected.emission_rate_when_present())
@dataclass(frozen=True)
class ShortRangeModel:
#: Short range interactions
presence: typing.List[Interval]
#: Expiration type
expirations: typing.List[Expiration]
#: The dilution factors for each of the expiratory activity in the short range interactions
dilutions: _VectorisedFloat
def _normed_concentration(self, concentration_model: ConcentrationModel, time: float) -> _VectorisedFloat:
# normalized only by the viral load
for index, period in enumerate(self.presence):
start, finish = tuple(period.boundaries())
if start < time <= finish:
dilution = self.dilutions[index]
jet_origin_concentration = concentration_model.infected.expiration.jet_origin_concentration()
background_concentration = concentration_model.concentration(time) / concentration_model.virus.viral_load_in_sputum
long_range_normed_concentration=np.interp(self.expirations[index].particle.diameter, concentration_model.infected.particle.diameter, background_concentration)
return ((1/dilution)*(jet_origin_concentration - long_range_normed_concentration))
return 0.
def short_range_concentration(self, concentration_model: ConcentrationModel, time: float):
return (self._normed_concentration(concentration_model, time) *
concentration_model.virus.viral_load_in_sputum)
def normed_exposure_between_bounds(self, concentration_model, time1, time2):
return scipy.integrate.quad_vec(lambda t: self._normed_concentration(concentration_model, t), time1, time2)[0]
@dataclass(frozen=True)
class ExposureModel:
"""
@ -1098,13 +1115,16 @@ class ExposureModel:
#: The virus concentration model which this exposure model should consider.
concentration_model: ConcentrationModel
#: The short range model which this simulation model should consider.
short_range: ShortRangeModel
#: The population of non-infected people to be used in the model.
exposed: Population
#: The number of times the exposure event is repeated (default 1).
repeats: int = 1
def fraction_deposited(self) -> _VectorisedFloat:
def long_range_fraction_deposited(self) -> _VectorisedFloat:
"""
The fraction of particles actually deposited in the respiratory
tract (over the total number of particles). It depends on the
@ -1113,7 +1133,7 @@ class ExposureModel:
return self.concentration_model.infected.particle.fraction_deposited(
self.concentration_model.evaporation_factor)
def _normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
def _long_range_normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
"""The number of virions per meter^3 between any two times, normalized
by the emission rate of the infected population"""
exposure = 0.
@ -1131,50 +1151,75 @@ class ExposureModel:
elif time1 <= start and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(start, stop)
return exposure
def _normed_exposure(self) -> _VectorisedFloat:
"""
The number of virions per meter^3, normalized by the emission rate
of the infected population.
"""
normed_exposure = 0.0
for start, stop in self.exposed.presence.boundaries():
normed_exposure += self.concentration_model.normed_integrated_concentration(start, stop)
return normed_exposure * self.repeats
def concentration(self, time: float) -> _VectorisedFloat:
return (self.concentration_model.concentration(time) +
self.short_range.short_range_concentration(self.concentration_model, time))
def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
"""
The number of virus per m^3 deposited on the respiratory tract
between any two times.
"""
deposited_exposure = 0.
for index, period in enumerate(self.short_range.presence):
start, stop = tuple(period.boundaries())
if stop < time1:
continue
elif start > time2:
break
elif start <= time1 and time2<= stop:
short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, time1, time2)
elif start <= time1 and stop < time2:
short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, time1, stop)
elif time1 < start and time2 <= stop:
short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, start, time2)
elif time1 <= start and stop < time2:
short_range_exposure = self.short_range.normed_exposure_between_bounds(self.concentration_model, start, stop)
fdep = self.short_range.expirations[index].particle.fraction_deposited(evaporation_factor=1.0) #ASK for evaporation factor
diameter = self.short_range.expirations[index].particle.diameter
if not np.isscalar(diameter) and diameter is not None:
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
deposited_exposure += np.array(short_range_exposure *
#aerosols *
fdep).mean()
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
deposited_exposure += short_range_exposure*fdep#*aerosols
deposited_exposure *= self.concentration_model.virus.viral_load_in_sputum
# Background concentration
emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present()
aerosols = self.concentration_model.infected.aerosols()
fdep = self.fraction_deposited()
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
aerosols = self.concentration_model.infected.aerosols()
fdep = self.long_range_fraction_deposited()
diameter = self.concentration_model.infected.particle.diameter
if not np.isscalar(diameter) and diameter is not None:
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
dep_exposure_integrated = np.array(self._normed_exposure_between_bounds(time1, time2) *
aerosols *
fdep).mean()
dep_exposure_integrated = np.array(self._long_range_normed_exposure_between_bounds(time1, time2) *
aerosols *
fdep).mean()
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
dep_exposure_integrated = self._normed_exposure_between_bounds(time1, time2)*aerosols*fdep
dep_exposure_integrated = self._long_range_normed_exposure_between_bounds(time1, time2)*aerosols*fdep
# then we multiply by the diameter-independent quantity emission_rate_per_aerosol,
# and parameters of the vD equation (i.e. f_inf, BR_k and n_in).
return (dep_exposure_integrated * emission_rate_per_aerosol *
f_inf * self.exposed.activity.inhalation_rate *
deposited_exposure += (dep_exposure_integrated * emission_rate_per_aerosol *
self.exposed.activity.inhalation_rate *
(1 - self.exposed.mask.inhale_efficiency()))
return f_inf * deposited_exposure
def deposited_exposure(self) -> _VectorisedFloat:
"""
The number of virus per m^3 deposited on the respiratory tract.
@ -1189,7 +1234,7 @@ class ExposureModel:
def infection_probability(self) -> _VectorisedFloat:
# viral dose (vD)
vD = self.deposited_exposure()
# oneoverln2 multiplied by ID_50 corresponds to ID_63.
infectious_dose = oneoverln2 * self.concentration_model.virus.infectious_dose
@ -1218,111 +1263,4 @@ class ExposureModel:
)
return single_exposure_model.expected_new_cases()
@dataclass(frozen=True)
class SimulationModel:
exposure_model: ExposureModel
short_range: ShortRangeModel
def normed_integrated_concentration(self, time1, time2):
return scipy.integrate.quad_vec(lambda t: self._normed_concentration(t), time1, time2)[0]
def _normed_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
"""The number of virions per meter^3 between any two times, normalized
by the emission rate of the infected population"""
exposure = 0.
for start, stop in self.exposure_model.exposed.presence.boundaries():
if stop < time1:
continue
elif start > time2:
break
elif start <= time1 and time2<= stop:
exposure += self.normed_integrated_concentration(time1, time2)
elif start <= time1 and stop < time2:
exposure += self.normed_integrated_concentration(time1, stop)
elif time1 < start and time2 <= stop:
exposure += self.normed_integrated_concentration(start, time2)
elif time1 <= start and stop < time2:
exposure += self.normed_integrated_concentration(start, stop)
return exposure
def _normed_concentration(self, time: float) -> _VectorisedFloat:
for index, period in enumerate(self.short_range.presence):
start, finish = tuple(period.boundaries())
if start <= time <= finish:
model = nested_replace(
self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]}
)
dilution = self.short_range.dilutions[index]
jet_origin_concentration = (model.exposure_model.concentration_model.infected.expiration.jet_origin_concentration()
* 1e-6 * model.exposure_model.concentration_model.virus.viral_load_in_sputum)
long_range_normed_concentration=np.interp(model.short_range.expirations[index].particle.diameter, self.exposure_model.concentration_model.infected.particle.diameter, self.exposure_model.concentration_model._normed_concentration(time))
return self.short_range.short_range_concentration(long_range_normed_concentration, dilution, jet_origin_concentration)
return self.exposure_model.concentration_model._normed_concentration(time)
def concentration(self, time: float) -> _VectorisedFloat:
for index, period in enumerate(self.short_range.presence):
start, finish = tuple(period.boundaries())
if start <= time <= finish:
model = nested_replace(
self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]}
)
return model._normed_concentration(time) * model.exposure_model.concentration_model.infected.emission_rate_when_present()
return self._normed_concentration(time) * self.exposure_model.concentration_model.infected.emission_rate_when_present()
def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
for index, period in enumerate(self.short_range.presence):
start, finish = tuple(period.boundaries())
if (start <= time1 <= finish and start <= time2 <= finish): # What if one is SR and another LR?
# Check if the given times are within the short range interactions
model = nested_replace(
self, {'exposure_model.concentration_model.infected.expiration': self.short_range.expirations[index]}
)
emission_rate_per_aerosol = model.exposure_model.concentration_model.infected.emission_rate_per_aerosol_when_present()
aerosols = model.exposure_model.concentration_model.infected.aerosols()
fdep = model.exposure_model.fraction_deposited()
f_inf = model.exposure_model.concentration_model.infected.fraction_of_infectious_virus()
diameter = self.exposure_model.concentration_model.infected.particle.diameter
if not np.isscalar(diameter) and diameter is not None:
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
dep_exposure_integrated = np.array(self._normed_exposure_between_bounds(time1, time2) *
aerosols *
fdep).mean()
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
dep_exposure_integrated = self._normed_exposure_between_bounds(time1, time2)*aerosols*fdep
# then we multiply by the diameter-independent quantity emission_rate_per_aerosol,
# and parameters of the vD equation (i.e. f_inf, BR_k and n_in).
return (dep_exposure_integrated * emission_rate_per_aerosol *
f_inf * model.exposure_model.exposed.activity.inhalation_rate *
(1 - self.exposure_model.exposed.mask.inhale_efficiency()))
return self.exposure_model.deposited_exposure_between_bounds(time1, time2)
def infection_probability(self):
dose = 0.0
for start, stop in self.short_range.long_range_presence.boundaries():
dose += self.exposure_model.deposited_exposure_between_bounds(start, stop)
for presence in self.short_range.presence:
start, stop = presence.boundaries()
dose += self.deposited_exposure_between_bounds(start, stop)
vD = dose * self.exposure_model.repeats
# oneoverln2 multiplied by ID_50 corresponds to ID_63.
infectious_dose = oneoverln2 * self.exposure_model.concentration_model.virus.infectious_dose
# Probability of infection.
return (1 - np.exp(-((vD * (1 - self.exposure_model.exposed.host_immunity))/(infectious_dose *
self.exposure_model.concentration_model.virus.transmissibility_factor)))) * 100

View file

@ -3,7 +3,6 @@ import typing
import numpy as np
from scipy import special as sp
import scipy.integrate
import cara.monte_carlo as mc
from cara.monte_carlo.sampleable import Normal,LogNormal,LogCustomKernel,CustomKernel,Uniform
@ -52,9 +51,6 @@ class BLOmodel:
np.exp(-(np.log(d) - mu) ** 2 / (2 * sigma ** 2))
for A,cn,mu,sigma in zip(self.BLO_factors, self.cn,
self.mu, self.sigma) )
def volume(self, d):
return(np.pi * d**3) / 6
def integrate(self, dmin, dmax):
"""