Source code for bioevents.hypnogram

import abc
import datetime
from enum import IntEnum
from typing import Optional

import matplotlib.pyplot as plt
import numpy as np
from pydantic import (
    BaseModel,
    NonNegativeFloat,
    NonNegativeInt,
    confloat,
    model_validator,
)

from bioevents import event_handling

WAKE_PERSISTENCE_THRESHOLD_IN_MINUTES = 2


[docs] class SleepStages(IntEnum): W = 5 REM = 4 N1 = 3 N2 = 2 N3 = 1
[docs] class Labels(IntEnum): W = SleepStages.W.value REM = SleepStages.REM.value N1 = SleepStages.N1.value N2 = SleepStages.N2.value N3 = SleepStages.N3.value MISSING = event_handling.MissingDataLabels.MISSING.value UNSCORABLE = event_handling.MissingDataLabels.UNSCORABLE.value ARTIFACT = event_handling.MissingDataLabels.ARTIFACT.value DISAGREE = event_handling.MissingDataLabels.DISAGREE.value
[docs] class HypnogramReport(BaseModel, abc.ABC): """Used for the EDS reports""" duration_minutes: NonNegativeFloat latency_minutes_rem: Optional[NonNegativeFloat] = None latency_minutes_n1: Optional[NonNegativeFloat] = None latency_minutes_n2: Optional[NonNegativeFloat] = None latency_minutes_n3: Optional[NonNegativeFloat] = None latency_minutes_persistent_sleep: Optional[NonNegativeFloat] = None latency_minutes_rem_with_persistence: Optional[NonNegativeFloat] = None latency_minutes_sleep_onset: Optional[NonNegativeFloat] = None latency_minutes_sleep_onset_unequivocal: Optional[NonNegativeFloat] = None total_minutes_rem: NonNegativeFloat total_minutes_n1: NonNegativeFloat total_minutes_n2: NonNegativeFloat total_minutes_n3: NonNegativeFloat total_minutes_nrem: NonNegativeFloat total_minutes_sleep: NonNegativeFloat efficiency_rem: confloat(ge=0, le=1) efficiency_rem_first_two_hours: confloat(ge=0, le=1) efficiency_sleep: confloat(ge=0, le=1) event_count_rem: NonNegativeInt event_count_wake: NonNegativeInt event_count_wake_persistent: NonNegativeInt wake_after_sleep_onset_minutes: Optional[NonNegativeFloat] = None def __str__(self): rows = [ f"Latency to REM: {self.latency_minutes_rem:.1f} minutes", f"Latency to N1: {self.latency_minutes_n1:.1f} minutes", f"Latency to N2: {self.latency_minutes_n2:.1f} minutes", f"Latency to N3: {self.latency_minutes_n3:.1f} minutes", f"Latency to persistent sleep: {self.latency_minutes_persistent_sleep:.1f} minutes", f"Latency to REM from persistent sleep: {self.latency_minutes_rem_with_persistence:.1f} minutes", f"Latency to sleep onset (SOL) {self.latency_minutes_sleep_onset:.1f} minutes", f"Latency to sleep onset (SOL-U) {self.latency_minutes_sleep_onset_unequivocal:.1f} minutes", f"Total time in REM: {self.total_minutes_rem:.1f} minutes", f"Total time in N1: {self.total_minutes_n1:.1f} minutes", f"Total time in N2: {self.total_minutes_n2:.1f} minutes", f"Total time in N3: {self.total_minutes_n3:.1f} minutes", f"Total time in non-REM sleep: {self.total_minutes_nrem:.1f} minutes", f"Total time asleep: {self.total_minutes_sleep:.1f} minutes", f"Efficiency of REM: {100 * self.efficiency_rem:.1f} %", f"Efficiency of REM (first two hours): {100 * self.efficiency_rem_first_two_hours:.1f} %", f"Efficiency of sleep: {100 * self.efficiency_sleep:.1f} %", f"Number of REM cycles: {self.event_count_rem}", f"Number of awakenings: {self.event_count_wake}", f"Number of awakenings >120s: {self.event_count_wake_persistent}", ] return "\n".join(rows)
[docs] @model_validator(mode="before") @classmethod def check_total_times_add_up(cls, values): rem = values.get("total_minutes_rem") n1 = values.get("total_minutes_n1") n2 = values.get("total_minutes_n2") n3 = values.get("total_minutes_n3") nrem = values.get("total_minutes_nrem") sleep = values.get("total_minutes_sleep") if not np.isclose(n1 + n2 + n3, nrem): raise ValueError( "Total minutes in non-REM sleep doesn't match individual stage times." ) if not np.isclose(nrem + rem, sleep): raise ValueError("Total minutes in sleep doesn't match REM + NREM times.") return values
[docs] @model_validator(mode="before") @classmethod def check_efficiencies(cls, values): rem = values.get("efficiency_rem") rem_time = values.get("total_minutes_rem") sleep = values.get("efficiency_sleep") sleep_time = values.get("total_minutes_sleep") dur = values.get("duration_minutes") if not np.isclose(rem_time / dur, rem): raise ValueError("REM efficiency and total time don't match.") if not np.isclose(sleep_time / dur, sleep): raise ValueError("Sleep efficiency and total time don't match.") return values
[docs] @model_validator(mode="before") @classmethod def check_awakenings(cls, values): total = values.get("event_count_wake") persistent = values.get("event_count_wake_persistent") if total < persistent: raise ValueError("Persistent event count is greater than total count!") return values
[docs] @model_validator(mode="before") @classmethod def check_no_rem(cls, values): value_if_no_rem = { "efficiency_rem": 0, "total_minutes_rem": 0, "event_count_rem": 0, "latency_minutes_rem": None, } no_rem_indicators = [values[k] == value_if_no_rem[k] for k in value_if_no_rem] if not any(no_rem_indicators): return values if all(no_rem_indicators): return values raise ValueError("Some, but not all, REM metrics indicate REM is present!")
[docs] class Hypnogram(event_handling.EventClassSeries): def __init__(self, event_stack): super().__init__(event_stack) # make sure that we fill in any missing sleep stages with empty event series self.update( { c: event_handling.EventSeries( [], duration=self.duration, sampling_rate=self.sampling_rate ) for c in list(SleepStages) if c not in self.classes } )
[docs] @classmethod def read_json(cls, filepath, intenum=Labels): return super().read_json(filepath, intenum)
[docs] def total_sleep_time(self, bedtime_series: event_handling.EventSeries = None) -> float: """Find total sleep time (in samples) from a hypnogram Parameters ---------- bedtime_series : Optional[event_handling.EventSeries] An event series derived from clinical lights off/on annotations. In the clinical setting, this is the period of time when subjects are expected to attempt sleep. If given, only compute sleep time that occurs during bedtime. Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. """ if bedtime_series is not None: return sum([self.trim(ev.on, ev.off).total_sleep_time() for ev in bedtime_series]) return self.duration - self.time_in_stage(SleepStages.W)
[docs] def total_recording_time(self, bedtime_series: event_handling.EventSeries = None) -> float: """Find total recording time, i.e., total time in bed, from a hypnogram (in samples) Parameters ---------- bedtime_series : Optional[event_handling.EventSeries] An event series derived from clinical lights off/on annotations. In the clinical setting, this is the period of time when subjects are expected to attempt sleep. If given, only compute recording time that occurs during bedtime. Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. """ if bedtime_series is None: return self.duration return sum([ev.duration for ev in bedtime_series])
[docs] def sleep_efficiency_percent(self, bedtime_series: event_handling.EventSeries = None) -> float: eff = self.total_sleep_time(bedtime_series) / self.total_recording_time(bedtime_series) return 100 * eff
# NOTE: the hypnogram needs to be trimmed
[docs] def sleep_onset_latency(self, n1_tolerance: float = 0) -> Optional[float]: """Find sleep onset latency from a hypnogram (in samples). Assume the start of the hypnogram is the start of bedtime (light off). Parameters ---------- n1_tolerance : float, default 0 If 0 (default), return latency to sleep onset as the first non-Wake stage (used for overnight sleep) If positive, units are samples, and more rules are followed (see Notes) Returns ------- sleep_onset_latency : Optional(float) Time (in samples) from beginning of this hypnogram until the sleep onset. See Notes for more information on the definition of sleep onset. Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. There are multiple ways to report SOL: 1. The start of first scored epoch of any stage of sleep. Typically used in overnight PSG. 2. The start of the first epoch of "unequivocal sleep". Typically used in MWT for assessment of daytime sleepiness. "Unequivocal sleep" is "defined as 3 consecutive epochs of stage 1 sleep, or 1 epoch of any other stage of sleep" (see Reference #1). Epochs, in this case, are 30 seconds long. Thus, the first epoch of any of the following would be considered unequivocal sleep onset: - [N1, N1, N1, ...] three epochs of N1 - [N1, N1, S, ...] two epochs of N1, followed immediately by any other stage of sleep - [N1, S, ...] one epoch of N1, followed immediately by any other stage of sleep - [S, ...] one epoch of any other stage of sleep For algorithmic purposes, this reduces to the observation that unequivocal sleep occurs at the onset of ANY sleep other than short, N1-only sleep episodes lasting less than 90 seconds. References ---------- 1. Clinical study using SOL as a primary endpoint: https://clinicaltrials.gov/ct2/show/NCT03522506 """ asleep = ~self[SleepStages.W] if not len(asleep): return None if n1_tolerance == 0: # return onset of first non-wake event return asleep[0].on # first remove n1 events shorter than the tolerance from the "asleep" time series n1 = self[SleepStages.N1] n1_exclude = event_handling.EventSeries( [ev for ev in n1 if ev.duration <= n1_tolerance], duration=self.duration, sampling_rate=self.sampling_rate, ) asleep -= n1_exclude if not len(asleep): return None # IF there's ANY N1 event that preceded the first sleep event, we'll use it as the onset first_sleep_event = asleep[0] for n1_event in n1: if n1_event.off == first_sleep_event.on: first_sleep_event = n1_event return first_sleep_event.on
[docs] def wake_after_sleep_onset(self) -> Optional[float]: """Compute WASO (wake after sleep onset) from a hypnogram. Useful for overnight sleep Returns ------- WASO : Optional(float) If SOL is None, returns None. If SOL is not None, returns wake after sleep onset (in samples). Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. """ SOL = self.sleep_onset_latency(n1_tolerance=0) if SOL is None: return None TRT = self.total_recording_time() TST = self.total_sleep_time() WASO = TRT - TST - SOL if WASO < -0.1: raise RuntimeError("WASO is negative and exceeds precision error. This is a bug.") WASO = max(WASO, 0.0) # avoids precision errors return WASO
[docs] def time_in_stage(self, stage: SleepStages) -> float: """Total time spent in the given stage, in samples.""" return sum([ev.duration for ev in self[stage]])
[docs] def latency_to_first_persistent_sleep(self) -> Optional[float]: """Latency to first epoch of persistent (10 minutes) sleep. Returns ------- latency : Optional(float) If no persistent sleep occurs, returns None. If persistent sleep occurs, returns latency (in samples). Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. """ asleep = ~self[SleepStages.W] persistence_threshold_seconds = datetime.timedelta(minutes=10).seconds persistence_threshold_samples = persistence_threshold_seconds * asleep.sampling_rate persistent_sleep = [ev for ev in asleep if ev.duration >= persistence_threshold_samples] if len(persistent_sleep): return persistent_sleep[0].on return None
[docs] def latency_to_stage(self, stage: SleepStages) -> Optional[float]: """Time from lights off to first epoch of the given stage Parameters ---------- stage : SleepStages Returns ------- latency : Optional[float] If stage does not occur, returns None. Otherwise, returns latency (in samples). Notes ----- This metric is traditionally reported in minutes. Please take note of the sample_rate to convert if needed. """ events = self[stage] if len(events) != 0: return events[0].on return None
[docs] def generate_report(self) -> HypnogramReport: self_in_minutes = self.resample(1 / 60) self_first_two_hours = ( self_in_minutes if self_in_minutes.duration < 120 else self_in_minutes.trim(end=120) ) stages_nrem = [SleepStages.N1, SleepStages.N2, SleepStages.N3] sol = self_in_minutes.sleep_onset_latency(n1_tolerance=0.0) waso = self_in_minutes.wake_after_sleep_onset() # Although it might be safe to assume we have resampled 30-seconds epochs, # let's tolerate n1 episodes up to 2.5 30-s epochs. sol_unequivocal = self_in_minutes.sleep_onset_latency(n1_tolerance=1.25) latency_persistent_sleep = self_in_minutes.latency_to_first_persistent_sleep() if latency_persistent_sleep is not None: self_from_persistent = self_in_minutes.trim( latency_persistent_sleep, self_in_minutes.duration ) latency_rem_from_persistent = self_from_persistent.latency_to_stage(SleepStages.REM) else: latency_rem_from_persistent = None wake_time = self_in_minutes.time_in_stage(SleepStages.W) rem_time = self_in_minutes.time_in_stage(SleepStages.REM) rem_time_first_two_hours = self_first_two_hours.time_in_stage(SleepStages.REM) persistent_wake_events = [ ev for ev in self_in_minutes[SleepStages.W] if ev.duration > WAKE_PERSISTENCE_THRESHOLD_IN_MINUTES ] report = HypnogramReport( duration_minutes=self_in_minutes.duration, latency_minutes_rem=self_in_minutes.latency_to_stage(SleepStages.REM), latency_minutes_n1=self_in_minutes.latency_to_stage(SleepStages.N1), latency_minutes_n2=self_in_minutes.latency_to_stage(SleepStages.N2), latency_minutes_n3=self_in_minutes.latency_to_stage(SleepStages.N3), latency_minutes_persistent_sleep=latency_persistent_sleep, latency_minutes_rem_with_persistence=latency_rem_from_persistent, latency_minutes_sleep_onset=sol, latency_minutes_sleep_onset_unequivocal=sol_unequivocal, total_minutes_rem=rem_time, total_minutes_n1=self_in_minutes.time_in_stage(SleepStages.N1), total_minutes_n2=self_in_minutes.time_in_stage(SleepStages.N2), total_minutes_n3=self_in_minutes.time_in_stage(SleepStages.N3), total_minutes_nrem=sum([self_in_minutes.time_in_stage(s) for s in stages_nrem]), total_minutes_sleep=self_in_minutes.duration - wake_time, efficiency_rem=rem_time / self_in_minutes.duration, efficiency_rem_first_two_hours=rem_time_first_two_hours / 120, efficiency_sleep=1 - wake_time / self_in_minutes.duration, event_count_rem=len(self_in_minutes[SleepStages.REM]), event_count_wake=len(self_in_minutes[SleepStages.W]), event_count_wake_persistent=len(persistent_wake_events), wake_after_sleep_onset_minutes=0.0 if waso is None else waso, ) return report
[docs] def plot(self, unit="seconds", ax=None, color=None, alpha=1, label=None, **plt_kwargs): """Stacks individual series onto the same plot Parameters ---------- unit : str, Optional[default="seconds"] Must be one of the keyword args to datetime.timedelta This will affect the x-axis label's units. ax : plt.Axes, Optional color : plt.colors[Optional] Color to use for the hypnogram and any non-sleep labels alpha : float, Optional[default=1] Alpha to use for hypnogram. A gradient thereof will be used for any non-sleep labels. label : str, Optional Label to use for this hypnogram. Useful when plotting multiple hypnograms on the given axis "ax" plt_kwargs : kwargs Any additional keyword args meant for plt.plot AND plt.fill_betweenx Notes ----- This function is better aligned with the format of a hypnogram plot that is expected in sleep science. """ if ax is None: fig, ax = plt.subplots(figsize=(10, 0.5 * len(self))) # handle unit conversion try: unit_hz = 1 / datetime.timedelta(**{unit: 1}).total_seconds() except TypeError: raise ValueError("Unit argument must be one of those accepted by datetime.timedelta!") resampled = self.resample(unit_hz) ax.set_xlabel(f"Time ({unit})") events_and_labels = [] for key, values in resampled.items(): event_label = key if key in SleepStages else np.nan for event in values: events_and_labels.append((event, event_label)) events_and_labels = sorted(events_and_labels, key=lambda ev: ev[0].on) x, event_labels = [], [] for event, event_label in events_and_labels: x.append(event.on) event_labels.append(event_label) x.append(event.off) event_labels.append(event_label) # "typical" hypnogram p = ax.plot(x, event_labels, color=color, alpha=alpha, label=label, **plt_kwargs) stages = [s for s in resampled if isinstance(s, SleepStages)] ax.set_yticks([s.value for s in stages]) ax.set_yticklabels([s.name for s in stages]) # pull out the color used for the hypnogram itself in case we need to reuse it color = p[0].get_color() if color is None else color missing_data_labels = { event_label: events for event_label, events in resampled.items() if isinstance(event_label, event_handling.MissingDataLabels) } if len(missing_data_labels): top = max(event_labels) for k, v in missing_data_labels.items(): for on, off in v: ax.fill_betweenx( y=[0, top], x1=on, x2=off, color=color, # reuse color from line alpha=alpha / (1 - k), # make an alpha gradient by enumeration label=k if label is None else f"{label}: ({k.name})", **plt_kwargs, # any additional kwargs we'd like to reuse ) # reset the legend to remove any duplicate labels handles, event_labels = ax.get_legend_handles_labels() by_label = dict(zip(event_labels, handles)) ax.legend(by_label.values(), by_label.keys(), frameon=False, bbox_to_anchor=(1, 1)) ax.set_ylim(ax.get_ylim()) ax.set_xlabel(f"Time ({unit})")