diff --git a/cara/apps/calculator/__init__.py b/cara/apps/calculator/__init__.py index f0463f15..674a6818 100644 --- a/cara/apps/calculator/__init__.py +++ b/cara/apps/calculator/__init__.py @@ -1,7 +1,7 @@ import html import json from pathlib import Path -from typing import Optional, Awaitable +from typing import Coroutine, Any, Optional, Awaitable import jinja2 import mistune @@ -14,7 +14,7 @@ from .user import AuthenticatedUser, AnonymousUser class BaseRequestHandler(RequestHandler): - async def prepare(self) -> Optional[Awaitable[None]]: + async def prepare(self): """Called at the beginning of a request before `get`/`post`/etc.""" username = self.request.headers.get("X-ADFS-LOGIN", None) if username: diff --git a/cara/apps/calculator/model_generator.py b/cara/apps/calculator/model_generator.py index b5c95ca7..c6ff4508 100644 --- a/cara/apps/calculator/model_generator.py +++ b/cara/apps/calculator/model_generator.py @@ -171,7 +171,7 @@ class FormData: def build_model(self) -> models.ExposureModel: return model_from_form(self) - def ventilation(self) -> models.Ventilation: + def ventilation(self) -> typing.Union[models.Ventilation, models.MultipleVentilation]: always_on = models.PeriodicInterval(period=120, duration=120) # Initializes a ventilation instance as a window if 'natural' is selected, or as a HEPA-filter otherwise if self.ventilation_type == 'natural': @@ -189,10 +189,12 @@ class FormData: inside_temp = models.PiecewiseConstant((0, 24), (293,)) outside_temp = data.GenevaTemperatures[month] + ventilation: models.Ventilation if self.window_type == 'sliding': ventilation = models.SlidingWindow( active=window_interval, - inside_temp=inside_temp, outside_temp=outside_temp, + inside_temp=inside_temp, + outside_temp=outside_temp, window_height=self.window_height, opening_length=self.opening_distance, number_of_windows=self.windows_number, @@ -200,7 +202,8 @@ class FormData: elif self.window_type == 'hinged': ventilation = models.HingedWindow( active=window_interval, - inside_temp=inside_temp, outside_temp=outside_temp, + inside_temp=inside_temp, + outside_temp=outside_temp, window_height=self.window_height, window_width=self.window_width, opening_length=self.opening_distance, @@ -218,7 +221,7 @@ class FormData: if self.hepa_option: hepa = models.HEPAFilter(active=always_on, q_air_mech=self.hepa_amount) - return models.MultipleVentilation((ventilation,hepa)) + return models.MultipleVentilation((ventilation, hepa)) else: return ventilation @@ -301,7 +304,7 @@ class FormData: ) return exposed - def _compute_breaks_in_interval(self, start, finish, n_breaks) -> typing.Tuple[typing.Tuple[int, int]]: + def _compute_breaks_in_interval(self, start, finish, n_breaks) -> models.BoundarySequence_t: break_delay = ((finish - start) - (n_breaks * self.coffee_duration)) // (n_breaks+1) break_times = [] end = start @@ -311,13 +314,13 @@ class FormData: break_times.append((begin, end)) return tuple(break_times) - def lunch_break_times(self) -> typing.Tuple[typing.Tuple[int, int]]: + def lunch_break_times(self) -> models.BoundarySequence_t: result = [] if self.lunch_option: result.append((self.lunch_start, self.lunch_finish)) return tuple(result) - def coffee_break_times(self) -> typing.Tuple[typing.Tuple[int, int]]: + def coffee_break_times(self) -> models.BoundarySequence_t: if not self.coffee_breaks: return () if self.lunch_option: @@ -475,10 +478,10 @@ def expiration_blend(expiration_weights: typing.Dict[models.Expiration, int]) -> ejection_factor += np.array(expiration.ejection_factor) * weight particle_sizes += np.array(expiration.particle_sizes) * weight - return models.Expiration( - ejection_factor=tuple(ejection_factor/total_weight), - particle_sizes=tuple(particle_sizes/total_weight), - ) + r_ejection_factor: typing.Tuple[float, float, float, float] = tuple(ejection_factor/total_weight) # type: ignore + r_particle_sizes: typing.Tuple[float, float, float, float] = tuple(particle_sizes/total_weight) # type: ignore + + return models.Expiration(ejection_factor=r_ejection_factor, particle_sizes=r_particle_sizes) def model_from_form(form: FormData) -> models.ExposureModel: diff --git a/cara/apps/calculator/report_generator.py b/cara/apps/calculator/report_generator.py index 27abbf80..11eab671 100644 --- a/cara/apps/calculator/report_generator.py +++ b/cara/apps/calculator/report_generator.py @@ -113,9 +113,9 @@ def minutes_to_time(minutes: int) -> str: return f"{hour_string}:{minute_string}" -def readable_minutes(minutes: int) -> str: - time = minutes +def readable_minutes(minutes: int) -> str: + time = float(minutes) unit = " minute" if time % 60 == 0: time = minutes/60 @@ -124,19 +124,20 @@ def readable_minutes(minutes: int) -> str: unit += "s" if time.is_integer(): - time = "{:0.0f}".format(time) + time_str = "{:0.0f}".format(time) else: - time = "{0:.2f}".format(time) + time_str = "{0:.2f}".format(time) - return time + unit + return time_str + unit -def non_zero_percentage (percentage: int) -> str: +def non_zero_percentage(percentage: int) -> str: if percentage < 0.01: return "<0.01%" else: return "{:0.0f}%".format(percentage) + def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, models.ExposureModel]: scenarios = {} @@ -243,7 +244,7 @@ def build_report(model: models.ExposureModel, form: FormData): cara_templates = Path(__file__).parent.parent / "templates" calculator_templates = Path(__file__).parent / "templates" env = jinja2.Environment( - loader=jinja2.FileSystemLoader([cara_templates, calculator_templates]), + loader=jinja2.FileSystemLoader([str(cara_templates), str(calculator_templates)]), undefined=jinja2.StrictUndefined, ) env.filters['non_zero_percentage'] = non_zero_percentage diff --git a/cara/apps/expert.py b/cara/apps/expert.py index 3dbd634d..a9b0223f 100644 --- a/cara/apps/expert.py +++ b/cara/apps/expert.py @@ -32,9 +32,9 @@ WidgetPairType = typing.Tuple[widgets.Widget, widgets.Widget] class WidgetGroup: - def __init__(self, label_widget_pairs: typing.Sequence[WidgetPairType]): - self.labels = [] - self.widgets = [] + def __init__(self, label_widget_pairs: typing.Iterable[WidgetPairType]): + self.labels: typing.List[widgets.Widget] = [] + self.widgets: typing.List[widgets.Widget] = [] self.add_pairs(label_widget_pairs) def set_visible(self, visible: bool): @@ -46,10 +46,10 @@ class WidgetGroup: widget.layout.visible = False widget.layout.display = 'none' - def pairs(self) -> typing.Sequence[WidgetPairType]: + def pairs(self) -> typing.Iterable[WidgetPairType]: return zip(*[self.labels, self.widgets]) - def add_pairs(self, label_widget_pairs: typing.Sequence[WidgetPairType]): + def add_pairs(self, label_widget_pairs: typing.Iterable[WidgetPairType]): labels, widgets_ = zip(*label_widget_pairs) self.labels.extend(labels) self.widgets.extend(widgets_) @@ -190,13 +190,13 @@ class ExposureComparissonResult(View): return ax def scenarios_updated(self, scenarios: typing.Sequence[ScenarioType], _): - labels, models = zip(*scenarios) - conc_models: typing.Tuple[models.ConcentrationModel] = tuple( - model.concentration_model.dcs_instance() for model in models + updated_labels, updated_models = zip(*scenarios) + conc_models: typing.Tuple[models.ConcentrationModel, ...] = tuple( + model.concentration_model.dcs_instance() for model in updated_models ) - self.update_plot(conc_models, labels) + self.update_plot(conc_models, updated_labels) - def update_plot(self, conc_models: typing.Tuple[models.ConcentrationModel], labels: typing.Tuple[str]): + def update_plot(self, conc_models: typing.Tuple[models.ConcentrationModel, ...], labels: typing.Tuple[str, ...]): self.ax.lines.clear() start, finish = models_start_end(conc_models) ts = np.linspace(start, finish, num=250) @@ -268,12 +268,12 @@ class ModelWidgets(View): outside_temp.observe(outsidetemp_change, names=['value']) auto_width = widgets.Layout(width='auto') - return WidgetGroup([ - [ + return WidgetGroup(( + ( widgets.Label('Outside temperature (℃)', layout=auto_width,), outside_temp, - ], - ]) + ), + )) def _build_window(self, node) -> WidgetGroup: period = widgets.IntSlider(value=node.active.period, min=0, max=240) @@ -314,24 +314,24 @@ class ModelWidgets(View): toggle_outsidetemp(outsidetemp_w.value) auto_width = widgets.Layout(width='auto') - result = WidgetGroup([ - [ + result = WidgetGroup(( + ( widgets.Label('Interval between openings (minutes)', layout=auto_width), period, - ], - [ + ), + ( widgets.Label('Duration of opening (minutes)', layout=auto_width), interval, - ], - [ + ), + ( widgets.Label('Inside temperature (℃)', layout=auto_width), inside_temp, - ], - [ + ), + ( widgets.Label('Outside temperature scheme', layout=auto_width), outsidetemp_w, - ] - ]) + ) + )) for sub_group in outsidetemp_widgets.values(): result.add_pairs(sub_group.pairs()) return result @@ -362,9 +362,9 @@ class ModelWidgets(View): month_choice.observe(on_month_change, names=['value']) return WidgetGroup( - [ - [widgets.Label("Month"), month_choice], - ] + ( + (widgets.Label("Month"), month_choice), + ) ) def _build_activity(self, node): diff --git a/cara/dataclass_utils.py b/cara/dataclass_utils.py index 50654d43..cdfd867b 100644 --- a/cara/dataclass_utils.py +++ b/cara/dataclass_utils.py @@ -2,13 +2,9 @@ import dataclasses import typing -DCInst = typing.TypeVar('T') - - -def nested_replace(obj: DCInst, new_values: typing.Dict[str, typing.Any]) -> DCInst: - """ - Replace an attribute on a dataclass, much like dataclasses.replace, except it - supports nested replacement definitions. For example: +def nested_replace(obj, new_values: typing.Dict[str, typing.Any]): + """Replace an attribute on a dataclass, much like dataclasses.replace, + except it supports nested replacement definitions. For example: >>> new_obj = nested_replace(obj, {'attr1.sub_attr2.sub_sub_attr3': 4}) >>> new_obj.attr1.sub_attr2.sub_sub_attr3 diff --git a/cara/models.py b/cara/models.py index 4478c1ce..dbdaa329 100644 --- a/cara/models.py +++ b/cara/models.py @@ -12,6 +12,11 @@ class Room: volume: float +Time_t = typing.TypeVar('Time_t', float, int) +BoundaryPair_t = typing.Tuple[Time_t, Time_t] +BoundarySequence_t = typing.Union[typing.Tuple[BoundaryPair_t, ...], typing.Tuple] + + @dataclass(frozen=True) class Interval: """ @@ -26,7 +31,7 @@ class Interval: start < t <= end """ - def boundaries(self) -> typing.Tuple[typing.Tuple[float, float], ...]: + def boundaries(self) -> BoundarySequence_t: return () def transition_times(self) -> typing.Set[float]: @@ -48,23 +53,23 @@ class SpecificInterval(Interval): #: A sequence of times (start, stop), in hours, that the infected person #: is present. The flattened list of times must be strictly monotonically #: increasing. - present_times: typing.Tuple[typing.Tuple[float, float], ...] + present_times: BoundarySequence_t - def boundaries(self): + def boundaries(self) -> BoundarySequence_t: return self.present_times @dataclass(frozen=True) class PeriodicInterval(Interval): #: How often does the interval occur (minutes). - period: int + period: float #: How long does the interval occur for (minutes). #: A value greater than :data:`period` signifies the event is permanently #: occurring, a value of 0 signifies that the event never happens. - duration: int + duration: float - def boundaries(self) -> typing.Tuple[typing.Tuple[float, float], ...]: + def boundaries(self) -> BoundarySequence_t: if self.period == 0 or self.duration == 0: return tuple() result = [] @@ -91,16 +96,17 @@ class PiecewiseConstant: if tuple(sorted(set(self.transition_times))) != self.transition_times: raise ValueError("transition_times should not contain duplicated elements and should be sorted") - def value(self,time) -> float: + def value(self, time) -> float: if time <= self.transition_times[0]: return self.values[0] - if time > self.transition_times[-1]: + elif time > self.transition_times[-1]: return self.values[-1] - for t1,t2,value in zip(self.transition_times[:-1], - self.transition_times[1:],self.values): - if time > t1 and time <= t2: - return value + for t1, t2, value in zip(self.transition_times[:-1], + self.transition_times[1:], self.values): + if t1 < time <= t2: + break + return value def interval(self) -> Interval: # build an Interval object @@ -109,7 +115,7 @@ class PiecewiseConstant: self.transition_times[1:],self.values): if value: present_times.append((t1,t2)) - return SpecificInterval(present_times=present_times) + return SpecificInterval(present_times=tuple(present_times)) def refine(self,refine_factor=10): # build a new PiecewiseConstant object with a refined mesh, @@ -258,10 +264,10 @@ class HingedWindow(WindowOpening): horizontal plane). """ #: Window width (m). - window_width: float = None + window_width: float = 0.0 def __post_init__(self): - if self.window_width is None: + if not self.window_width > 0: raise ValueError('window_width must be set') @property @@ -387,7 +393,10 @@ class Mask: #: Filtration efficiency of masks when inhaling. η_inhale: float - particle_sizes: typing.Tuple[float] = (0.8e-4, 1.8e-4, 3.5e-4, 5.5e-4) # In cm. + #: Particle sizes in cm. + particle_sizes: typing.Tuple[float, float, float, float] = ( + 0.8e-4, 1.8e-4, 3.5e-4, 5.5e-4 + ) #: Pre-populated examples of Masks. types: typing.ClassVar[typing.Dict[str, "Mask"]] @@ -542,7 +551,7 @@ class InfectedPopulation(Population): @dataclass(frozen=True) class ConcentrationModel: room: Room - ventilation: Ventilation + ventilation: typing.Union[Ventilation, MultipleVentilation] infected: InfectedPopulation @property diff --git a/cara/state.py b/cara/state.py index 145a1c2b..00cfb3bc 100644 --- a/cara/state.py +++ b/cara/state.py @@ -278,7 +278,7 @@ class DataclassStatePredefined(DataclassInstanceState): """ def __init__(self, dataclass: Datamodel_T, - choices: typing.Dict[typing.Hashable, dataclass_instance], + choices: typing.Dict[str, dataclass_instance], **kwargs, ): super().__init__(dataclass=dataclass, **kwargs) diff --git a/cara/tests/apps/calculator/test_report_generator.py b/cara/tests/apps/calculator/test_report_generator.py index 5bb7d719..3ba1a007 100644 --- a/cara/tests/apps/calculator/test_report_generator.py +++ b/cara/tests/apps/calculator/test_report_generator.py @@ -19,3 +19,17 @@ def test_generate_report(baseline_form): report = report_generator.build_report(model, baseline_form) assert report != "" + + +@pytest.mark.parametrize( + ["test_input", "expected"], + [ + [1, '1 minute'], + [2, '2 minutes'], + [60, '1 hour'], + [120, '2 hours'], + [150, '150 minutes'], + ], +) +def test_readable_minutes(test_input, expected): + assert report_generator.readable_minutes(test_input) == expected \ No newline at end of file diff --git a/cara/tests/test_state.py b/cara/tests/test_state.py index 2102b96e..814d2ae4 100644 --- a/cara/tests/test_state.py +++ b/cara/tests/test_state.py @@ -25,7 +25,7 @@ class DCSimpleSubclass(DCSimple): @dataclass class DCOverrideSubclass(DCSimple): - attr1: float + attr1: float # type: ignore @dataclass diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 00000000..75cae8ff --- /dev/null +++ b/setup.cfg @@ -0,0 +1,14 @@ +[mypy] +no_warn_no_return = True + +[mypy-matplotlib.*] +ignore_missing_imports = True + +[mypy-ipympl.*] +ignore_missing_imports = True + +[mypy-ipywidgets.*] +ignore_missing_imports = True + +[mypy-mistune.*] +ignore_missing_imports = True