This commit is contained in:
markus 2021-01-22 14:23:49 +01:00
parent af9edf94ab
commit 34a035927a

View file

@ -254,221 +254,6 @@ def print_qr_info(qr_values: np.ndarray) -> None:
for quantile in (0.01, 0.05, 0.25, 0.50, 0.75, 0.95, 0.99):
print(f"qR_{quantile} = {np.quantile(qr_values, quantile)}")
@dataclass(frozen=True)
class MCVirus:
#: Biological decay (inactivation of the virus in air)
halflife: float
#: RNA copies / mL
viral_load_in_sputum: typing.Tuple[float, float]
@property
def decay_constant(self):
# Viral inactivation per hour (h^-1)
return np.log(2) / self.halflife
@dataclass(frozen=True)
class MCInfectedPopulation(models.Population):
#: The virus with which the population is infected.
virus: MCVirus
#: An integer signifying the expiratory activity of the infected subject
# (1 = breathing, 2 = speaking, 3 = speaking loudly)
expiratory_activity: int
# The total number of samples to be generated
samples: int
# The quantum infectious dose to be used in the calculations
qid: int
viral_load: typing.Optional[float] = None
def emission_rate_when_present(self) -> np.ndarray:
"""
Randomly samples values for the quantum generation rate
:return: A numpy array of length = samples, containing randomly generated qr-values
"""
# Extracting only the needed information from the pre-existing Mask class
masked = self.mask.exhale_efficiency != 0
(e_k, e_lambda), (d_k, d_lambda) = weibull_parameters[self.expiratory_activity]
emissions = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), e_k, loc=0, scale=e_lambda)
diameters = sct.weibull_min.isf(sct.norm.sf(np.random.normal(size=self.samples)), d_k, loc=0, scale=d_lambda)
if self.viral_load is None:
viral_loads = np.random.normal(loc=7.8, scale=1.7, size=self.samples)
else:
viral_loads = np.full(self.samples, self.viral_load)
mask_efficiency = [0.75, 0.81, 0.81][self.expiratory_activity] if masked else 0
qr_func = np.vectorize(self._calculate_qr)
# TODO: Add distributions for parameters
breathing_rate = 1
return qr_func(viral_loads, emissions, diameters, mask_efficiency, self.qid)
@staticmethod
def _calculate_qr(viral_load: float, emission: float, diameter: float, mask_efficiency: float,
copies_per_quantum: float, breathing_rate: typing.Optional[float] = None) -> float:
"""
Calculates the quantum generation rate given a set of parameters.
"""
# Unit conversions
diameter *= 1e-4
viral_load = 10 ** viral_load
emission = (emission * 3600) if breathing_rate is None else (emission * 1e6)
volume = (4 * np.pi * (diameter / 2) ** 3) / 3
if breathing_rate is None:
breathing_rate = 1
return viral_load * emission * volume * (1 - mask_efficiency) * breathing_rate / copies_per_quantum
def individual_emission_rate(self, time) -> np.ndarray:
"""
The emission rate of a single individual in the population.
"""
# Note: The original model avoids time dependence on the emission rate
# at the cost of implementing a piecewise (on time) concentration function.
if not self.person_present(time):
return np.zeros(self.samples)
# Note: It is essential that the value of the emission rate is not
# itself a function of time. Any change in rate must be accompanied
# with a declaration of state change time, as is the case for things
# like Ventilation.
return self.emission_rate_when_present()
@functools.lru_cache()
def emission_rate(self, time) -> float:
"""
The emission rate of the entire population.
"""
return self.individual_emission_rate(time) * self.number
@dataclass(frozen=True)
class MCConcentrationModel:
room: models.Room
ventilation: models.Ventilation
infected: MCInfectedPopulation
@property
def virus(self):
return self.infected.virus
def infectious_virus_removal_rate(self, time: float) -> float:
# Particle deposition on the floor
vg = 1 * 10 ** -4
# Height of the emission source to the floor - i.e. mouth/nose (m)
h = 1.5
# Deposition rate (h^-1)
k = (vg * 3600) / h
return k + self.virus.decay_constant + self.ventilation.air_exchange(self.room, time)
@functools.lru_cache()
def state_change_times(self):
"""
All time dependent entities on this model must provide information about
the times at which their state changes.
"""
state_change_times = set()
state_change_times.update(self.infected.presence.transition_times())
state_change_times.update(self.ventilation.transition_times())
return sorted(state_change_times)
def last_state_change(self, time: float):
"""
Find the most recent state change.
"""
for change_time in self.state_change_times()[::-1]:
if change_time < time:
return change_time
return 0
@functools.lru_cache()
def concentration(self, time: float) -> np.ndarray:
if time == 0:
return np.zeros(self.infected.samples)
IVRR = self.infectious_virus_removal_rate(time)
V = self.room.volume
t_last_state_change = self.last_state_change(time)
concentration_at_last_state_change = self.concentration(t_last_state_change)
delta_time = time - t_last_state_change
fac = np.exp(-IVRR * delta_time)
concentration_limit = (self.infected.emission_rate(time)) / (IVRR * V)
return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac
@dataclass(frozen=True)
class MCExposureModel:
#: The virus concentration model which this exposure model should consider.
concentration_model: MCConcentrationModel
#: The population of non-infected people to be used in the model.
exposed: models.Population
#: The number of times the exposure event is repeated (default 1).
repeats: int = 1
def quanta_exposure(self) -> np.ndarray:
"""The number of virus quanta per meter^3."""
exposure = np.zeros(self.concentration_model.infected.samples)
for start, stop in self.exposed.presence.boundaries():
concentrations = np.asarray([self.concentration_model.concentration(t) for t in np.linspace(start, stop)])
integrals = np.trapz(concentrations, axis=0)
exposure += integrals
return exposure * self.repeats
def infection_probability(self):
exposure = self.quanta_exposure()
inf_aero = (
self.exposed.activity.inhalation_rate *
(1 - self.exposed.mask.η_inhale) *
exposure
)
# Probability of infection.
return (1 - np.exp(-inf_aero)) * 100
def expected_new_cases(self):
prob = self.infection_probability()
exposed_occupants = self.exposed.number
return prob * exposed_occupants / 100
# def reproduction_number(self):
# """
# The reproduction number can be thought of as the expected number of
# cases directly generated by one infected case in a population.
#
# """
# if self.concentration_model.infected.number == 1:
# return self.expected_new_cases()
#
# # Create an equivalent exposure model but with precisely
# # one infected case.
# single_exposure_model = nested_replace(
# self, {'concentration_model.infected.number': 1}
# )
#
# return single_exposure_model.expected_new_cases()
baseline_mc_exposure_model = MCExposureModel(
concentration_model=MCConcentrationModel(