diff --git a/cara/montecarlo.py b/cara/montecarlo.py index a28cff1f..e772781c 100644 --- a/cara/montecarlo.py +++ b/cara/montecarlo.py @@ -15,44 +15,18 @@ weibull_parameters = [((0.5951563631241763, 0.027071715346754264), # em (7.348365409721486, 1.1158159287760463 * 3.33))] # particle_diameter_speaking_loudly -def calculate_qr(viral_load: float, emission: float, diameter: float, mask_efficiency: float, - copies_per_quantum: float, breathing_rate: typing.Optional[float] = None) -> float: - """ - Calculates the quantum generation rate given a set of parameters. - """ - # Unit conversions - diameter *= 1e-4 - viral_load = 10 ** viral_load - emission = (emission * 3600) if breathing_rate is None else (emission * 1e6) +@dataclass(frozen=True) +class MCVirus: + #: Biological decay (inactivation of the virus in air) + halflife: float - volume = (4 * np.pi * (diameter / 2)**3) / 3 - if breathing_rate is None: - breathing_rate = 1 - return viral_load * emission * volume * (1 - mask_efficiency) * breathing_rate / copies_per_quantum + #: RNA copies / mL + viral_load_in_sputum: typing.Tuple[float, float] - -def generate_qr_values(samples: int, expiratory_activity: int, masked: bool, qid: int = 100) -> np.ndarray: - """ - Randomly samples values for the quantum generation rate - :param samples: The total number of samples to be generated - :param expiratory_activity: An integer signifying the expiratory activity of the infected subject - (1 = breathing, 2 = speaking, 3 = speaking loudly) - :param masked: True if infected subject is wearing a mask, False otherwise - :param qid: The quantum infectious dose to be used in the calculations - :return: A numpy array of length = samples, containing randomly generated qr-values - """ - (e_k, e_lambda), (d_k, d_lambda) = weibull_parameters[expiratory_activity] - emissions = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=samples)), e_k, loc=0, scale=e_lambda) - diameters = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=samples)), d_k, loc=0, scale=d_lambda) - viral_loads = np.random.normal(size=samples, loc=7.8, scale=1.7) - - mask_efficiency = [0.75, 0.81, 0.81][expiratory_activity] if masked else 0 - qr_func = np.vectorize(calculate_qr) - - # TODO: Add distributions for parameters - breathing_rate = 1 - - return qr_func(viral_loads, emissions, diameters, mask_efficiency, qid) + @property + def decay_constant(self): + # Viral inactivation per hour (h^-1) + return np.log(2) / self.halflife def logscale_hist(x: typing.Iterable, bins: int) -> None: @@ -76,3 +50,279 @@ def print_qr_info(qr_values: np.ndarray) -> None: for quantile in (0.01, 0.05, 0.25, 0.50, 0.75, 0.95, 0.99): print(f"qR_{quantile} = {np.quantile(qr_values, quantile)}") + +@dataclass(frozen=True) +class MCVirus: + #: Biological decay (inactivation of the virus in air) + halflife: float + + #: RNA copies / mL + viral_load_in_sputum: typing.Tuple[float, float] + + @property + def decay_constant(self): + # Viral inactivation per hour (h^-1) + return np.log(2) / self.halflife + + +@dataclass(frozen=True) +class MCInfectedPopulation(models.Population): + #: The virus with which the population is infected. + virus: MCVirus + + #: An integer signifying the expiratory activity of the infected subject + # (1 = breathing, 2 = speaking, 3 = speaking loudly) + expiratory_activity: int + + # The total number of samples to be generated + samples: int + + # The quantum infectious dose to be used in the calculations + qid: int + + viral_load: typing.Optional[float] = None + + def emission_rate_when_present(self) -> np.ndarray: + """ + Randomly samples values for the quantum generation rate + :return: A numpy array of length = samples, containing randomly generated qr-values + """ + + # Extracting only the needed information from the pre-existing Mask class + masked = self.mask.exhale_efficiency != 0 + + (e_k, e_lambda), (d_k, d_lambda) = weibull_parameters[self.expiratory_activity] + emissions = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), e_k, loc=0, scale=e_lambda) + diameters = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), d_k, loc=0, scale=d_lambda) + if self.viral_load is None: + viral_loads = np.random.normal(loc=7.8, scale=1.7, size=self.samples) + else: + viral_loads = np.full(self.samples, self.viral_load) + + mask_efficiency = [0.75, 0.81, 0.81][self.expiratory_activity] if masked else 0 + qr_func = np.vectorize(self._calculate_qr) + + # TODO: Add distributions for parameters + breathing_rate = 1 + + return qr_func(viral_loads, emissions, diameters, mask_efficiency, self.qid) + + @staticmethod + def _calculate_qr(viral_load: float, emission: float, diameter: float, mask_efficiency: float, + copies_per_quantum: float, breathing_rate: typing.Optional[float] = None) -> float: + """ + Calculates the quantum generation rate given a set of parameters. + """ + # Unit conversions + diameter *= 1e-4 + viral_load = 10 ** viral_load + emission = (emission * 3600) if breathing_rate is None else (emission * 1e6) + + volume = (4 * np.pi * (diameter / 2) ** 3) / 3 + if breathing_rate is None: + breathing_rate = 1 + + return viral_load * emission * volume * (1 - mask_efficiency) * breathing_rate / copies_per_quantum + + def individual_emission_rate(self, time) -> np.ndarray: + """ + The emission rate of a single individual in the population. + + """ + # Note: The original model avoids time dependence on the emission rate + # at the cost of implementing a piecewise (on time) concentration function. + + if not self.person_present(time): + return np.zeros(self.samples) + + # Note: It is essential that the value of the emission rate is not + # itself a function of time. Any change in rate must be accompanied + # with a declaration of state change time, as is the case for things + # like Ventilation. + + return self.emission_rate_when_present() + + @functools.lru_cache() + def emission_rate(self, time) -> float: + """ + The emission rate of the entire population. + + """ + return self.individual_emission_rate(time) * self.number + + +@dataclass(frozen=True) +class MCConcentrationModel: + room: models.Room + ventilation: models.Ventilation + infected: MCInfectedPopulation + + @property + def virus(self): + return self.infected.virus + + def infectious_virus_removal_rate(self, time: float) -> float: + # Particle deposition on the floor + vg = 1 * 10 ** -4 + # Height of the emission source to the floor - i.e. mouth/nose (m) + h = 1.5 + # Deposition rate (h^-1) + k = (vg * 3600) / h + + return k + self.virus.decay_constant + self.ventilation.air_exchange(self.room, time) + + @functools.lru_cache() + def state_change_times(self): + """ + All time dependent entities on this model must provide information about + the times at which their state changes. + + """ + state_change_times = set() + state_change_times.update(self.infected.presence.transition_times()) + state_change_times.update(self.ventilation.transition_times()) + + return sorted(state_change_times) + + def last_state_change(self, time: float): + """ + Find the most recent state change. + + """ + for change_time in self.state_change_times()[::-1]: + if change_time < time: + return change_time + return 0 + + @functools.lru_cache() + def concentration(self, time: float) -> np.ndarray: + if time == 0: + return np.zeros(self.infected.samples) + IVRR = self.infectious_virus_removal_rate(time) + V = self.room.volume + + t_last_state_change = self.last_state_change(time) + concentration_at_last_state_change = self.concentration(t_last_state_change) + + delta_time = time - t_last_state_change + fac = np.exp(-IVRR * delta_time) + concentration_limit = (self.infected.emission_rate(time)) / (IVRR * V) + return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac + + +@dataclass(frozen=True) +class MCExposureModel: + #: The virus concentration model which this exposure model should consider. + concentration_model: MCConcentrationModel + + #: The population of non-infected people to be used in the model. + exposed: models.Population + + #: The number of times the exposure event is repeated (default 1). + repeats: int = 1 + + def quanta_exposure(self) -> np.ndarray: + """The number of virus quanta per meter^3.""" + exposure = np.zeros(self.concentration_model.infected.samples) + + for start, stop in self.exposed.presence.boundaries(): + concentrations = np.asarray([self.concentration_model.concentration(t) for t in np.linspace(start, stop)]) + integrals = np.trapz(concentrations, axis=0) + exposure += integrals + + return exposure * self.repeats + + def infection_probability(self): + exposure = self.quanta_exposure() + + inf_aero = ( + self.exposed.activity.inhalation_rate * + (1 - self.exposed.mask.η_inhale) * + exposure + ) + + # Probability of infection. + return (1 - np.exp(-inf_aero)) * 100 + + def expected_new_cases(self): + prob = self.infection_probability() + exposed_occupants = self.exposed.number + return prob * exposed_occupants / 100 + + # def reproduction_number(self): + # """ + # The reproduction number can be thought of as the expected number of + # cases directly generated by one infected case in a population. + # + # """ + # if self.concentration_model.infected.number == 1: + # return self.expected_new_cases() + # + # # Create an equivalent exposure model but with precisely + # # one infected case. + # single_exposure_model = nested_replace( + # self, {'concentration_model.infected.number': 1} + # ) + # + # return single_exposure_model.expected_new_cases() + + +baseline_mc_exposure_model = MCExposureModel( + concentration_model=MCConcentrationModel( + room=models.Room(volume=75), + ventilation=models.SlidingWindow( + active=models.PeriodicInterval(period=120, duration=120), + inside_temp=models.PiecewiseConstant((0,24),(293,)), + outside_temp=models.PiecewiseConstant((0,24),(283,)), + window_height=1.6, opening_length=0.6, + ), + infected=MCInfectedPopulation( + number=1, + presence=models.SpecificInterval(((0, 4), (5, 8))), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Light activity'], + virus=MCVirus(halflife=1.1, viral_load_in_sputum=(7.8, 1.7)), + expiratory_activity=1, + samples=200000, + qid=100 + ) + ), + exposed=models.Population( + number=1, + presence=models.SpecificInterval(((0, 4), (5, 8))), + activity=models.Activity.types['Light activity'], + mask=models.Mask.types['No mask'] + ) +) + + +models = [ +MCExposureModel( + concentration_model=MCConcentrationModel( + room=models.Room(volume=75), + ventilation=models.SlidingWindow( + active=models.PeriodicInterval(period=120, duration=120), + inside_temp=models.PiecewiseConstant((0,24),(293,)), + outside_temp=models.PiecewiseConstant((0,24),(283,)), + window_height=1.6, opening_length=0.6, + ), + infected=MCInfectedPopulation( + number=1, + presence=models.SpecificInterval(((0, 4), (5, 8))), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Light activity'], + virus=MCVirus(halflife=1.1, viral_load_in_sputum=(7.8, 1.7)), + expiratory_activity=1, + samples=2000000, + qid=100, + viral_load=vl + ) + ), + exposed=models.Population( + number=1, + presence=models.SpecificInterval(((0, 4), (5, 8))), + activity=models.Activity.types['Light activity'], + mask=models.Mask.types['No mask'] + ) +) for vl in (None, 7, 8, 9, 10) +]