diff --git a/cara/montecarlo.py b/cara/montecarlo.py index e772781c..81067d4c 100644 --- a/cara/montecarlo.py +++ b/cara/montecarlo.py @@ -29,6 +29,92 @@ class MCVirus: return np.log(2) / self.halflife +@dataclass(frozen=True) +class MCInfectedPopulation(models.Population): + #: The virus with which the population is infected. + virus: MCVirus + + #: An integer signifying the expiratory activity of the infected subject + # (1 = breathing, 2 = speaking, 3 = speaking loudly) + expiratory_activity: int + + # The total number of samples to be generated + samples: int + + # The quantum infectious dose to be used in the calculations + qid: int + + viral_load: typing.Optional[float] = None + + def emission_rate_when_present(self) -> np.ndarray: + """ + Randomly samples values for the quantum generation rate + :return: A numpy array of length = samples, containing randomly generated qr-values + """ + + # Extracting only the needed information from the pre-existing Mask class + masked = self.mask.exhale_efficiency != 0 + + (e_k, e_lambda), (d_k, d_lambda) = weibull_parameters[self.expiratory_activity] + emissions = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), e_k, loc=0, scale=e_lambda) + diameters = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), d_k, loc=0, scale=d_lambda) + if self.viral_load is None: + viral_loads = np.random.normal(loc=7.8, scale=1.7, size=self.samples) + else: + viral_loads = np.full(self.samples, self.viral_load) + + mask_efficiency = [0.75, 0.81, 0.81][self.expiratory_activity] if masked else 0 + qr_func = np.vectorize(self._calculate_qr) + + # TODO: Add distributions for parameters + breathing_rate = 1 + + return qr_func(viral_loads, emissions, diameters, mask_efficiency, self.qid) + + @staticmethod + def _calculate_qr(viral_load: float, emission: float, diameter: float, mask_efficiency: float, + copies_per_quantum: float, breathing_rate: typing.Optional[float] = None) -> float: + """ + Calculates the quantum generation rate given a set of parameters. + """ + # Unit conversions + diameter *= 1e-4 + viral_load = 10 ** viral_load + emission = (emission * 3600) if breathing_rate is None else (emission * 1e6) + + volume = (4 * np.pi * (diameter / 2) ** 3) / 3 + if breathing_rate is None: + breathing_rate = 1 + + return viral_load * emission * volume * (1 - mask_efficiency) * breathing_rate / copies_per_quantum + + def individual_emission_rate(self, time) -> np.ndarray: + """ + The emission rate of a single individual in the population. + + """ + # Note: The original model avoids time dependence on the emission rate + # at the cost of implementing a piecewise (on time) concentration function. + + if not self.person_present(time): + return np.zeros(self.samples) + + # Note: It is essential that the value of the emission rate is not + # itself a function of time. Any change in rate must be accompanied + # with a declaration of state change time, as is the case for things + # like Ventilation. + + return self.emission_rate_when_present() + + @functools.lru_cache() + def emission_rate(self, time) -> float: + """ + The emission rate of the entire population. + + """ + return self.individual_emission_rate(time) * self.number + + def logscale_hist(x: typing.Iterable, bins: int) -> None: """ Plots the data of x as a log-scale histogram