added methods to read and parse dynamic occupancy. Probabilistic exposure and expected new cases were removed from results

This commit is contained in:
lrdossan 2024-07-10 15:59:54 +02:00
parent fbdeccd7c0
commit 527dd5ffa2
2 changed files with 87 additions and 18 deletions

View file

@ -171,13 +171,19 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(
prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(
model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [
list(interval) for interval in model.exposed.presence_interval().boundaries()]
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
if form.exposure_option == "p_probabilistic_exposure":
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
else: prob_probabilistic_exposure = None
if ((isinstance(form.dynamic_infected_occupancy, typing.List) and len(form.dynamic_infected_occupancy) > 0) or
(isinstance(form.dynamic_exposed_occupancy, typing.List) and len(form.dynamic_exposed_occupancy) > 0)):
expected_new_cases = None
else:
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
conditional_probability_data = None
uncertainties_plot_src = None

View file

@ -439,6 +439,56 @@ class VirusFormData(FormData):
] = respiratory_activity['percentage']
return (self.precise_activity['physical_activity'], respiratory_dict)
def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]):
### Data format validation ###
for occupancy in dynamic_occupancy:
# Check if each occupancy entry is a dictionary
if not isinstance(occupancy, typing.Dict):
raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}."')
# Check for required keys in each occupancy entry
dict_keys = list(occupancy.keys())
if "total_people" not in dict_keys:
raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys[0]}".')
else:
value = occupancy["total_people"]
# Check if the total_people value is a non-negative integer
if not isinstance(value, int):
raise ValueError(f"Total number of people should be integer. Got {value}.")
elif not value >= 0:
raise ValueError(f"Total number of people should be non-negative. Got {value}.")
if "start_time" not in dict_keys:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[1]}".')
if "finish_time" not in dict_keys:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[2]}".')
# Validate time format for start_time and finish_time
for time_key in ["start_time", "finish_time"]:
time = occupancy[time_key]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
transition_times = []
values = []
for occupancy in dynamic_occupancy:
start_time = time_string_to_minutes(occupancy['start_time'])/60
finish_time = time_string_to_minutes(occupancy['finish_time'])/60
transition_times.extend([start_time, finish_time])
values.append(occupancy['total_people'])
unique_transition_times_sorted = np.array(sorted(set(transition_times)))
if len(values) != len(unique_transition_times_sorted) - 1:
raise ValueError("Cannot compute dynamic occupancy with the inputs provided.")
population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant(
transition_times=tuple(unique_transition_times_sorted),
values=tuple(values)
)
population_presence: typing.Union[None, models.Interval] = None
return population_occupancy, population_presence
def infected_population(self) -> mc.InfectedPopulation:
# Initializes the virus
@ -453,18 +503,26 @@ class VirusFormData(FormData):
expiration_defn = {'Speaking': 1,
'Breathing': self.total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
activity_defn, expiration_defn = self.generate_precise_activity_expiration() # TODO: what to do here?
activity = activity_distributions(self.data_registry)[activity_defn]
expiration = build_expiration(self.data_registry, expiration_defn)
infected_occupants = self.infected_people
if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - IntPiecewiseConstant.
infected_occupancy, infected_presence = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
infected_occupancy = self.infected_people
infected_presence = self.infected_present_interval()
infected = mc.InfectedPopulation(
data_registry=self.data_registry,
number=infected_occupants,
number=infected_occupancy,
presence=infected_presence,
virus=virus,
presence=self.infected_present_interval(),
mask=self.mask(),
activity=activity,
expiration=expiration,
@ -478,11 +536,16 @@ class VirusFormData(FormData):
if self.activity_type == 'precise'
else str(self.data_registry.population_scenario_activity[self.activity_type]['activity']))
activity = activity_distributions(self.data_registry)[activity_defn]
infected_occupants = self.infected_people
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupants = self.total_people - infected_occupants
if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - IntPiecewiseConstant.
exposed_occupancy, exposed_presence = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy)
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupancy = self.total_people - self.infected_people
exposed_presence = self.exposed_present_interval()
if (self.vaccine_option):
if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'):
@ -495,8 +558,8 @@ class VirusFormData(FormData):
host_immunity = 0.
exposed = mc.Population(
number=exposed_occupants,
presence=self.exposed_present_interval(),
number=exposed_occupancy,
presence=exposed_presence,
activity=activity,
mask=self.mask(),
host_immunity=host_immunity,