diff --git a/caimira/models.py b/caimira/models.py index 9ef7eece..ed0767f3 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -1494,9 +1494,17 @@ class ExposureModel: c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): raise ValueError("If the diameter is an array, none of the ventilation parameters " "or virus decay constant can be arrays at the same time.") - if not isinstance(self.exposed.number, int): - raise NotImplementedError("Cannot use dynamic occupancy for" - " the exposed population") + + @method_cache + def population_state_change_times(self) -> typing.List[float]: + """ + All time dependent population entities on this model must provide information + about the times at which their state changes. + """ + state_change_times = set(self.concentration_model.infected.presence_interval().transition_times()) + state_change_times.update(self.exposed.presence_interval().transition_times()) + + return sorted(state_change_times) def long_range_fraction_deposited(self) -> _VectorisedFloat: """ @@ -1681,6 +1689,59 @@ class ExposureModel: else: return 0 + + def dynamic_total_probability_rule(self): + if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0): + total_probability_rule = [] + + state_change_times = self.population_state_change_times() + for interval in zip(state_change_times[:-1], state_change_times[1:]): + exposed_present = self.exposed.people_present(interval[1]) + infected_present = self.concentration_model.infected.people_present(interval[1]) + + # Create an equivalent exposure model but changing the number of infected cases. + total_people = exposed_present + infected_present + max_num_infected = (total_people if total_people < 10 else 10) + # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. + # To be on the safe side, a hard coded limit with a safety margin of 2x was set. + # Therefore we decided a hard limit of 10 infected people. + for num_infected in range(1, max_num_infected + 1): + exposure_model = nested_replace( + self, { + 'concentration_model.infected': InfectedPopulation( + number=num_infected, + presence=SpecificInterval(present_times = (interval, )), + mask=self.concentration_model.infected.mask, + activity=self.concentration_model.infected.activity, + host_immunity=self.concentration_model.infected.host_immunity, + virus=self.concentration_model.infected.virus, + expiration=self.concentration_model.infected.expiration, + ), + 'exposed': Population( + number=exposed_present, + presence=SpecificInterval(present_times=(interval,)), + mask=self.exposed.mask, + activity=self.exposed.activity, + host_immunity=self.exposed.host_immunity, + ), + } + ) + + prob_ind = exposure_model.infection_probability().mean() / 100 + n = total_people - num_infected + # By means of the total probability rule + prob_at_least_one_infected = 1 - (1 - prob_ind)**n + total_probability_rule.append(prob_at_least_one_infected * + self.geographical_data.probability_meet_infected_person( + self.concentration_model.infected.virus, + num_infected, total_people)) + + if (isinstance(self.exposed.number, IntPiecewiseConstant) or + isinstance(self.concentration_model.infected.number, IntPiecewiseConstant)): + return (1 - np.prod([(1 - prob) for prob in total_probability_rule])) * 100 + else: + return 0 + def expected_new_cases(self) -> _VectorisedFloat: # Create an equivalent exposure model without short-range interactions, if any. if (len(self.short_range) == 0): @@ -1689,8 +1750,7 @@ class ExposureModel: else: prob = self.infection_probability() - exposed_occupants = self.exposed.number - return prob * exposed_occupants / 100 + return prob * self.exposed.number / 100 def reproduction_number(self) -> _VectorisedFloat: """