Merge branch 'feature/improved-validation' into 'master'

Improve the validation of FormData

See merge request cara/cara!122
This commit is contained in:
Nicolas Mounet 2021-01-06 14:36:40 +00:00
commit 718a08ef1b
2 changed files with 307 additions and 80 deletions

View file

@ -1,5 +1,6 @@
from dataclasses import dataclass
import html
import logging
import typing
import numpy as np
@ -8,6 +9,9 @@ from cara import models
from cara import data
LOG = logging.getLogger(__name__)
@dataclass
class FormData:
# Number of minutes after 00:00
@ -53,6 +57,8 @@ class FormData:
@classmethod
def from_dict(cls, form_data: typing.Dict) -> "FormData":
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
valid_na_values = ['windows_open', 'window_type', 'mechanical_ventilation_type']
for name in valid_na_values:
@ -63,26 +69,6 @@ class FormData:
if not form_data.get(name, ''):
form_data[name] = '00:00'
validation_tuples = [('activity_type', ACTIVITY_TYPES),
('event_type', EVENT_TYPES),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', MASK_TYPES),
('mask_wearing', MASK_WEARING),
('ventilation_type', VENTILATION_TYPES),
('volume_type', VOLUME_TYPES),
('windows_open', WINDOWS_OPEN),
('window_type', WINDOWS_TYPES)]
for key, valid_set in validation_tuples:
if key not in form_data:
raise ValueError(f"Missing key {key}")
if form_data[key] not in valid_set:
raise ValueError(f"{form_data[key]} is not a valid value for {key}")
if (form_data['ventilation_type'] == 'natural' and
form_data['window_type'] == 'not-applicable'):
raise ValueError("window_type cannot be ''not-applicable'' if "
"ventilation_type is ''natural''")
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
@ -93,9 +79,22 @@ class FormData:
if value == "":
form_data[key] = "0"
return cls(
activity_finish=time_string_to_minutes(form_data['activity_finish']),
activity_start=time_string_to_minutes(form_data['activity_start']),
time_attributes = [
'activity_start', 'activity_finish', 'lunch_start',
'lunch_finish', 'infected_start', 'infected_finish',
]
for attr_name in time_attributes:
form_data[attr_name] = time_string_to_minutes(form_data[attr_name])
boolean_attributes = [
'hepa_option', 'lunch_option',
]
for attr_name in boolean_attributes:
form_data[attr_name] = form_data[attr_name] == '1'
instance = cls(
activity_finish=form_data['activity_finish'],
activity_start=form_data['activity_start'],
activity_type=form_data['activity_type'],
air_changes=float(form_data['air_changes']),
air_supply=float(form_data['air_supply']),
@ -105,11 +104,11 @@ class FormData:
event_type=form_data['event_type'],
floor_area=float(form_data['floor_area']),
hepa_amount=float(form_data['hepa_amount']),
hepa_option=(form_data['hepa_option'] == '1'),
hepa_option=form_data['hepa_option'],
infected_people=int(form_data['infected_people']),
lunch_finish=time_string_to_minutes(form_data['lunch_finish']),
lunch_option=(form_data['lunch_option'] == '1'),
lunch_start=time_string_to_minutes(form_data['lunch_start']),
lunch_finish=form_data['lunch_finish'],
lunch_option=form_data['lunch_option'],
lunch_start=form_data['lunch_start'],
mask_type=form_data['mask_type'],
mask_wearing=form_data['mask_wearing'],
mechanical_ventilation_type=form_data['mechanical_ventilation_type'],
@ -130,9 +129,44 @@ class FormData:
window_width=float(form_data['window_width']),
windows_number=int(form_data['windows_number']),
windows_open=form_data['windows_open'],
infected_start=time_string_to_minutes(form_data['infected_start']),
infected_finish=time_string_to_minutes(form_data['infected_finish']),
infected_start=form_data['infected_start'],
infected_finish=form_data['infected_finish'],
)
instance.validate()
return instance
def validate(self):
time_intervals = [
['activity_start', 'activity_finish'],
['lunch_start', 'lunch_finish'],
['infected_start', 'infected_finish'],
]
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
validation_tuples = [('activity_type', ACTIVITY_TYPES),
('event_type', EVENT_TYPES),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', MASK_TYPES),
('mask_wearing', MASK_WEARING),
('ventilation_type', VENTILATION_TYPES),
('volume_type', VOLUME_TYPES),
('windows_open', WINDOWS_OPEN),
('window_type', WINDOWS_TYPES)]
for attr_name, valid_set in validation_tuples:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
if (
self.ventilation_type == 'natural'
and self.window_type == 'not-applicable'
):
raise ValueError("window_type cannot be 'not-applicable' if "
"ventilation_type is 'natural'")
def build_model(self) -> models.ExposureModel:
return model_from_form(self)
@ -277,6 +311,12 @@ class FormData:
break_times.append((begin, end))
return tuple(break_times)
def lunch_break_times(self) -> typing.Tuple[typing.Tuple[int, int]]:
result = []
if self.lunch_option:
result.append((self.lunch_start, self.lunch_finish))
return tuple(result)
def coffee_break_times(self) -> typing.Tuple[typing.Tuple[int, int]]:
if not self.coffee_breaks:
return ()
@ -297,61 +337,116 @@ class FormData:
breaks = self._compute_breaks_in_interval(self.activity_start, self.activity_finish, self.coffee_breaks)
return breaks
def present_interval(self, start, finish) -> models.Interval:
leave_times = []
enter_times = []
if self.lunch_option:
leave_times.append(self.lunch_start)
enter_times.append(self.lunch_finish)
def present_interval(
self,
start: int,
finish: int,
breaks: typing.Tuple[typing.Tuple[int, int], ...] = None,
) -> models.Interval:
"""
Calculate the presence interval given the start and end times (in minutes), and
a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
for coffee_start, coffee_end in self.coffee_break_times():
leave_times.append(coffee_start)
enter_times.append(coffee_end)
"""
if not breaks:
# If there are no breaks, the interval is the start and end.
return models.SpecificInterval(((start, finish),))
# These lists represent the times where the infected person leaves or enters the room, respectively, sorted in
# reverse order. Note that these lists allows the person to "leave" when they should not even be present in the
# room. The following loop handles this.
leave_times.sort(reverse=True)
enter_times.sort(reverse=True)
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
breaks = sorted(breaks, key=lambda break_pair: break_pair[0])
for break_start, break_end in breaks:
if break_start >= break_end:
raise ValueError("Break ends before it begins.")
prev_break_end = breaks[0][1]
for break_start, break_end in breaks[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
# This loop iterates through the lists above, populating present_intervals with (enter, leave) intervals
# representing the infected person entering and leaving the room. Note that if one of the evenly spaced coffee-
# breaks happens to coincide with the lunch-break, it is simply ignored.
present_intervals = []
time = start
is_present = True
while time < finish:
if is_present:
if not leave_times:
present_intervals.append((time / 60, finish / 60))
break
if leave_times[-1] < time:
leave_times.pop()
else:
new_time = leave_times.pop()
if time / 60 != min(new_time, finish) / 60 :
present_intervals.append((time / 60, min(new_time, finish) / 60))
is_present = False
time = new_time
def hours2time(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
else:
if not enter_times:
break
# def add_interval(start, end):
if enter_times[-1] < time:
enter_times.pop()
else:
is_present = True
time = enter_times.pop()
current_time = start
LOG.debug(f"starting time march at {hours2time(current_time/60)} to {hours2time(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
# 1. The interval is entirely before the break. S < E <= Bs < Be
# 2. The interval straddles the start of the break. S < Bs < E <= Be
# 3. The break is entirely inside the interval. S < Bs < Be <= E
# 4. The interval is entirely inside the break. Bs <= S < E <= Be
# 5. The interval straddles the end of the break. Bs <= S < Be <= E
# 6. The interval is entirely after the break. Bs < Be <= S < E
for current_break in breaks:
if current_time >= finish:
break
LOG.debug(f"handling break {hours2time(current_break[0]/60)}-{hours2time(current_break[1]/60)} "
f" (current time: {hours2time(current_time/60)})")
break_s, break_e = current_break
case1 = finish <= break_s
case2 = current_time < break_s < finish < break_e
case3 = current_time < break_s < break_e <= finish
case4 = break_s <= current_time < finish <= break_e
case5 = break_s <= current_time < break_e < finish
case6 = break_e <= current_time
if case1:
LOG.debug(f"case 1: interval entirely before break")
present_intervals.append((current_time / 60, finish / 60))
LOG.debug(f" + added interval {hours2time(present_intervals[-1][0])} "
f"- {hours2time(present_intervals[-1][1])}")
current_time = finish
elif case2:
LOG.debug(f"case 2: interval straddles start of break")
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {hours2time(present_intervals[-1][0])} "
f"- {hours2time(present_intervals[-1][1])}")
current_time = break_e
elif case3:
LOG.debug(f"case 3: break entirely inside interval")
# We add the bit before the break, but not the bit afterwards,
# as it may hit another break.
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {hours2time(present_intervals[-1][0])} "
f"- {hours2time(present_intervals[-1][1])}")
current_time = break_e
elif case4:
LOG.debug(f"case 4: interval entirely inside break")
current_time = finish
elif case5:
LOG.debug(f"case 5: interval straddles end of break")
current_time = break_e
elif case6:
LOG.debug(f"case 6: interval entirely after the break")
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
return self.present_interval(self.infected_start, self.infected_finish)
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=self.lunch_break_times() + self.coffee_break_times(),
)
def exposed_present_interval(self) -> models.Interval:
return self.present_interval(self.activity_start, self.activity_finish)
return self.present_interval(
self.activity_start, self.activity_finish,
breaks=self.lunch_break_times() + self.coffee_break_times(),
)
def build_expiration(expiration_definition) -> models.Expiration:

View file

@ -16,9 +16,8 @@ def baseline_form(baseline_form_data):
def test_model_from_dict(baseline_form_data):
model = model_generator.FormData.from_dict(baseline_form_data)
# TODO:
# assert model.ventilation == cara.models.Ventilation()
form = model_generator.FormData.from_dict(baseline_form_data)
assert isinstance(form.build_model(), models.ExposureModel)
def test_blend_expiration():
@ -156,21 +155,154 @@ def test_exposed_present_intervals(baseline_form):
baseline_form.activity_finish = 17 * 60
baseline_form.lunch_start = 12 * 60 + 30
baseline_form.lunch_finish = 13 * 60 + 30
baseline_form.infected_start = 10 * 60
baseline_form.infected_finish = 15 * 60
correct = ((9, 10+37/60), (10+52/60, 12.5), (13.5, 15+7/60), (15+22/60, 17.0))
assert baseline_form.exposed_present_interval().present_times == correct
def test_exposed_present_intervals_starting_with_lunch(baseline_form):
baseline_form.coffee_breaks = 0
baseline_form.activity_start = baseline_form.lunch_start = 13 * 60
baseline_form.activity_finish = 18 * 60
baseline_form.lunch_finish = 14 * 60
baseline_form.infected_start = 9 * 60
baseline_form.infected_finish = 13 * 60
correct = ((14.0, 18.0),)
correct = ((14.0, 18.0), )
assert baseline_form.exposed_present_interval().present_times == correct
def test_exposed_present_intervals_ending_with_lunch(baseline_form):
baseline_form.coffee_breaks = 0
baseline_form.activity_start = 11 * 60
baseline_form.activity_finish = baseline_form.lunch_start = 13 * 60
baseline_form.lunch_finish = 14 * 60
correct = ((11.0, 13.0),)
assert baseline_form.exposed_present_interval().present_times == correct
def test_exposed_present_lunch_end_before_beginning(baseline_form):
baseline_form.coffee_breaks = 0
baseline_form.lunch_start = 14 * 60
baseline_form.lunch_finish = 13 * 60
with pytest.raises(ValueError):
baseline_form.validate()
@pytest.fixture
def coffee_break_between_1045_and_1115(baseline_form):
baseline_form.coffee_breaks = 1
baseline_form.coffee_duration = 30
baseline_form.activity_start = 10 * 60
baseline_form.activity_finish = 12 * 60
baseline_form.lunch_option = False
coffee_breaks = baseline_form.coffee_break_times()
assert coffee_breaks == ((10.75 * 60, 11.25 * 60),)
return baseline_form
def test_present_before_coffee(coffee_break_between_1045_and_1115):
breaks = coffee_break_between_1045_and_1115.coffee_break_times()
interval = coffee_break_between_1045_and_1115.present_interval(
10.5 * 60, 11 * 60, breaks=breaks)
assert interval.boundaries() == ((10.5, 10.75),)
def test_present_after_coffee(coffee_break_between_1045_and_1115):
breaks = coffee_break_between_1045_and_1115.coffee_break_times()
interval = coffee_break_between_1045_and_1115.present_interval(
11 * 60, 11.5 * 60, breaks=breaks)
assert interval.boundaries() == ((11.25, 11.5),)
def test_present_when_coffee_starts(coffee_break_between_1045_and_1115):
breaks = coffee_break_between_1045_and_1115.coffee_break_times()
interval = coffee_break_between_1045_and_1115.present_interval(
10.75 * 60, 11.5 * 60, breaks=breaks)
assert interval.boundaries() == ((11.25, 11.5),)
def test_present_when_coffee_ends(coffee_break_between_1045_and_1115):
breaks = coffee_break_between_1045_and_1115.coffee_break_times()
interval = coffee_break_between_1045_and_1115.present_interval(
10.5 * 60, 11.25 * 60, breaks=breaks)
assert interval.boundaries() == ((10.5, 10.75), )
def test_present_only_for_coffee_ends(coffee_break_between_1045_and_1115):
breaks = coffee_break_between_1045_and_1115.coffee_break_times()
interval = coffee_break_between_1045_and_1115.present_interval(
10.75 * 60, 11.25 * 60, breaks=breaks)
assert interval.boundaries() == ()
def time2mins(time: str):
# Convert times like "14:30" to decimal, like 14.5 * 60.
return int(time.split(':')[0]) * 60 + int(time.split(':')[1])
def hours2time(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
def assert_boundaries(interval, boundaries_in_time_string_form):
boundaries = [(hours2time(start), hours2time(end))
for start, end in interval.boundaries()]
assert boundaries == boundaries_in_time_string_form
@pytest.fixture
def breaks_every_25_mins_for_20_mins(baseline_form):
baseline_form.coffee_breaks = 4
baseline_form.coffee_duration = 20
baseline_form.activity_start = time2mins("10:00")
baseline_form.activity_finish = time2mins("14:10")
baseline_form.lunch_start = time2mins("11:55")
baseline_form.lunch_finish = time2mins("12:15")
baseline_form.lunch_option = True
breaks = baseline_form.coffee_break_times() + baseline_form.lunch_break_times()
interval = baseline_form.present_interval(
baseline_form.activity_start, baseline_form.activity_finish, breaks=breaks,
)
assert_boundaries(interval, [
('10:00', '10:25'),
('10:45', '11:10'),
('11:30', '11:55'),
('12:15', '12:40'),
('13:00', '13:25'),
('13:45', '14:10')
])
return baseline_form
def test_present_after_two_breaks_for_small_interval(breaks_every_25_mins_for_20_mins):
breaks = breaks_every_25_mins_for_20_mins.coffee_break_times() + breaks_every_25_mins_for_20_mins.lunch_break_times()
# The first two breaks start at 10:25 and 11:10.
interval = breaks_every_25_mins_for_20_mins.present_interval(
time2mins("11:35"), time2mins("11:40"), breaks=breaks,
)
# Only present for a short duration of a presence period.
assert_boundaries(interval, [('11:35', '11:40')])
def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins):
breaks = breaks_every_25_mins_for_20_mins.coffee_break_times() + breaks_every_25_mins_for_20_mins.lunch_break_times()
# The first two breaks start at 10:25 and 11:10.
interval = breaks_every_25_mins_for_20_mins.present_interval(
time2mins("11:15"), time2mins("11:20"), breaks=breaks
)
# No presence.
assert_boundaries(interval, [])
def test_valid_no_lunch(baseline_form):
# Check that it is valid to have a 0 length lunch if no lunch is selected.
baseline_form.lunch_option = False
baseline_form.lunch_start = 0
baseline_form.lunch_finish = 0
assert baseline_form.validate() is None
def test_coffee_lunch_breaks(baseline_form):
baseline_form.coffee_duration = 30
baseline_form.coffee_breaks = 4
@ -190,7 +322,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form):
baseline_form.activity_finish = 13 * 60 + 30
baseline_form.lunch_start = 12 * 60 + 30
baseline_form.lunch_finish = 13 * 60 + 30
correct = ((9, 9+50/60), (10+20/60, 11+10/60), (11+40/60, 12+30/60) )
correct = ((9, 9+50/60), (10+20/60, 11+10/60), (11+40/60, 12+30/60))
np.testing.assert_allclose(baseline_form.exposed_present_interval().present_times, correct, rtol=1e-14)