diff --git a/cara/montecarlo.py b/cara/montecarlo.py index 29998fa5..2c43421d 100644 --- a/cara/montecarlo.py +++ b/cara/montecarlo.py @@ -82,6 +82,10 @@ class MCInfectedPopulation(models.Population): # (1 = breathing, 2 = speaking, 3 = speaking loudly) expiratory_activity: int + #: An integer signifying the breathing category of the infected subject + # (1 = seated, 2 = standing, 3 = light exercise, 4 = moderate exercise, 5 = heavy exercise) + breathing_category: int + # The total number of samples to be generated samples: int @@ -90,8 +94,6 @@ class MCInfectedPopulation(models.Population): viral_load: typing.Optional[float] = None - breathing_category: typing.Optional[int] = None - def _generate_viral_loads(self) -> np.ndarray: kde_model = KernelDensity(kernel='gaussian', bandwidth=0.1) kde_model.fit(np.asarray(log_viral_load_frequencies)[0, :][:, np.newaxis], @@ -108,48 +110,31 @@ class MCInfectedPopulation(models.Population): # Extracting only the needed information from the pre-existing Mask class masked = self.mask.exhale_efficiency != 0 - (e_k, e_lambda), (d_k, d_lambda) = weibull_parameters[self.expiratory_activity] - emissions = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), e_k, loc=0, scale=e_lambda) - diameters = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), d_k, loc=0, scale=d_lambda) if self.viral_load is None: viral_loads = self._generate_viral_loads() else: viral_loads = np.full(self.samples, self.viral_load) - mask_efficiency = [0.75, 0.81, 0.81][self.expiratory_activity] if masked else 0 + emission_concentration = emission_concentrations[self.expiratory_activity - 1] + + mask_efficiency = [0.75, 0.81, 0.81][self.expiratory_activity - 1] if masked else 0 qr_func = np.vectorize(self._calculate_qr) - if self.expiratory_activity == 0: - assert self.breathing_category is not None, \ - "expiratory_activity specified as 0 (breathing) without specified 'breathing_category'" - else: - if self.breathing_category is not None: - print("'breathing_category' unused as 0 (breathing) was not chosen as 'expiratory_activity'") + br_params = lognormal_parameters[self.breathing_category - 1] + (self.samples,) + breathing_rates = lognormal(*br_params) - if self.breathing_category is not None: - csi, lamb = lognormal_parameters[self.breathing_category] - breathing_rates = lognormal(csi, lamb, self.samples) - else: - breathing_rates = None - - return qr_func(viral_loads, emissions, diameters, mask_efficiency, self.qid, breathing_rates) + return qr_func(viral_loads, emission_concentration, mask_efficiency, self.qid, breathing_rates) @staticmethod - def _calculate_qr(viral_load: float, emission: float, diameter: float, mask_efficiency: float, - copies_per_quantum: float, breathing_rate: typing.Optional[float] = None) -> float: + def _calculate_qr(viral_load: float, emission_concentration: float, mask_efficiency: float, + copies_per_quantum: float, breathing_rate: float) -> float: """ Calculates the quantum generation rate given a set of parameters. """ # Unit conversions - diameter *= 1e-4 viral_load = 10 ** viral_load - emission = (emission * 3600) if breathing_rate is None else (emission * 1e6) - volume = (4 * np.pi * (diameter / 2) ** 3) / 3 - if breathing_rate is None: - breathing_rate = 1 - - return viral_load * emission * volume * (1 - mask_efficiency) * breathing_rate / copies_per_quantum + return viral_load * emission_concentration * (1 - mask_efficiency) * breathing_rate / copies_per_quantum def individual_emission_rate(self, time) -> np.ndarray: """