From 74e742bb3aa915e4be7735f579f15403418fbc14 Mon Sep 17 00:00:00 2001 From: Dennis <45356478+DennisMoschina@users.noreply.github.com> Date: Thu, 30 Apr 2026 14:23:28 +0200 Subject: [PATCH 1/4] feat(parser): support oe header parse schemes --- src/open_wearables/data/sensor_dataset.py | 97 +++- src/open_wearables/dataset.py | 392 ++-------------- src/open_wearables/parser.py | 545 ++-------------------- src/open_wearables/parsing/__init__.py | 20 + src/open_wearables/parsing/base.py | 2 + src/open_wearables/parsing/headers.py | 303 ++++++++++++ src/open_wearables/parsing/stream.py | 63 ++- src/open_wearables/schema/types.py | 13 +- tests/test_oe_headers.py | 235 ++++++++++ 9 files changed, 799 insertions(+), 871 deletions(-) create mode 100644 src/open_wearables/parsing/headers.py create mode 100644 tests/test_oe_headers.py diff --git a/src/open_wearables/data/sensor_dataset.py b/src/open_wearables/data/sensor_dataset.py index cf8c7bd..0ddace7 100644 --- a/src/open_wearables/data/sensor_dataset.py +++ b/src/open_wearables/data/sensor_dataset.py @@ -1,13 +1,21 @@ import os import tempfile -from typing import Dict, List, Optional, Sequence +from typing import Dict, List, Optional, Sequence, Union import numpy as np import pandas as pd from IPython.display import Audio, display from scipy.io.wavfile import write -from open_wearables.parsing import MicPayloadParser, ParseResult, Parser, mic_packet_to_stereo_frames +from open_wearables.parsing import ( + MicPayloadParser, + OeFileHeader, + ParseResult, + Parser, + mic_packet_to_stereo_frames, + read_oe_header, + sensor_scheme_labels, +) from open_wearables.schema import build_default_sensor_schemes from .accessors import SensorAccessor @@ -25,6 +33,7 @@ def __init__(self, filename: str, verbose: bool = False): self.filename = filename self.verbose = verbose self.parse_result: ParseResult = ParseResult(sensor_dfs={}, mic_samples=[]) + self.file_header: Optional[OeFileHeader] = None self.sensor_dfs: Dict[int, pd.DataFrame] = {} self.audio_stereo: Optional[np.ndarray] = None self.audio_df: pd.DataFrame = pd.DataFrame() @@ -39,14 +48,23 @@ def __init__(self, filename: str, verbose: bool = False): SensorAccessor(pd.DataFrame(columns=labels), labels), ) - self.parser: Parser = self._build_parser(verbose=verbose) + self.parser: Optional[Parser] = None self.parse() self._build_accessors() @classmethod - def _build_parser(cls, verbose: bool = False) -> Parser: - sensor_schemes = build_default_sensor_schemes(cls.SENSOR_SID) + def _build_parser( + cls, + file_header: Optional[OeFileHeader], + verbose: bool = False, + ) -> Parser: + """Build a packet parser from the file header metadata.""" + if file_header is not None and file_header.sensor_schemes: + sensor_schemes = file_header.sensor_schemes + else: + sensor_schemes = build_default_sensor_schemes(cls.SENSOR_SID) + dataset_parser = Parser.from_sensor_schemes( sensor_schemes=sensor_schemes, verbose=verbose, @@ -58,19 +76,32 @@ def _build_parser(cls, verbose: bool = False) -> Parser: return dataset_parser def parse(self) -> None: + """Read the file header, build the matching parser, and parse records.""" with open(self.filename, "rb") as stream: - self.parse_result = self.parser.parse(stream) + header_result = read_oe_header(stream) + self.file_header = header_result.header + self.parser = self._build_parser( + file_header=self.file_header, + verbose=self.verbose, + ) + self.parse_result = self.parser.parse_packets( + stream, + file_header=self.file_header, + initial_packet_bytes=header_result.initial_packet_bytes, + ) def _build_accessors(self) -> None: self.audio_stereo = self.parse_result.audio_stereo + self.file_header = self.parse_result.file_header self.audio_df = pd.DataFrame() self._audio_df_sampling_rate = None self.sensor_dfs = {} data_dict = self.parse_result.sensor_dfs for name, sid in self.SENSOR_SID.items(): - labels = LABELS.get(name, []) + labels = self._labels_for_sensor(name, sid) if name == "microphone": + labels = LABELS.get(name, []) df = self.get_audio_dataframe() elif sid in data_dict and isinstance(data_dict[sid], pd.DataFrame): df = data_dict[sid] @@ -83,6 +114,58 @@ def _build_accessors(self) -> None: self.df = pd.DataFrame() + def _labels_for_sensor(self, name: str, sid: int) -> List[str]: + """Return labels from the embedded scheme when available.""" + embedded_scheme = self.parser.sensor_schemes.get(sid) if self.parser else None + if embedded_scheme is not None: + return sensor_scheme_labels(embedded_scheme) + return LABELS.get(name, []) + + def get_sampling_rate(self, sensor: Union[str, int]) -> Optional[float]: + """Return the default sampling rate for one sensor, if available. + + Parameters + ---------- + sensor: + Sensor name such as ``"imu"`` or a numeric sensor ID. + + Returns + ------- + Optional[float] + The default sampling rate recorded in a v3 file header, or ``None`` + when the file does not contain sampling-rate metadata for the sensor. + """ + if isinstance(sensor, str): + if sensor not in self.SENSOR_SID: + raise KeyError( + f"Unknown sensor name: {sensor!r}. " + f"Known sensors: {sorted(self.SENSOR_SID.keys())}" + ) + sid = self.SENSOR_SID[sensor] + else: + sid = sensor + + if self.file_header is None: + return None + + options = self.file_header.sensor_config_options.get(sid) + if options is None or options.frequency_options is None: + return None + + frequency_options = options.frequency_options + default_index = frequency_options.default_frequency_index + if default_index >= len(frequency_options.frequencies): + return None + + return frequency_options.frequencies[default_index] + + def get_sampling_rates(self) -> Dict[str, Optional[float]]: + """Return default sampling rates for all known dataset sensors.""" + return { + sensor_name: self.get_sampling_rate(sensor_name) + for sensor_name in self.SENSOR_SID + } + def list_sensors(self) -> List[str]: available_sensors = [] for name in self.SENSOR_SID: diff --git a/src/open_wearables/dataset.py b/src/open_wearables/dataset.py index b18bb77..268237f 100644 --- a/src/open_wearables/dataset.py +++ b/src/open_wearables/dataset.py @@ -1,362 +1,30 @@ -import os -import tempfile -from collections import defaultdict -from typing import Dict, List, Optional, Sequence - -import numpy as np -import pandas as pd -from open_wearables import parser -import open_wearables.scheme as scheme -from IPython.display import Audio, display -from scipy.io.wavfile import write - -LABELS: Dict[str, List[str]] = { - "imu": [ - "acc.x", "acc.y", "acc.z", - "gyro.x", "gyro.y", "gyro.z", - "mag.x", "mag.y", "mag.z", - ], - "barometer": ["barometer.temperature", "barometer.pressure"], - "ppg": ["ppg.red", "ppg.ir", "ppg.green", "ppg.ambient"], - "bone_acc": ["bone_acc.x", "bone_acc.y", "bone_acc.z"], - "optical_temp": ["optical_temp"], - "microphone": ["mic.inner", "mic.outer"], -} - -COLORS: Dict[str, List[str]] = { - "ppg": ["red", "darkred", "green", "gray"], -} - - -class _SensorAccessor: - """Convenience wrapper around a pandas DataFrame to provide grouped access - to sensor channels. - - For IMU data with columns: - - acc.x, acc.y, acc.z - - gyro.x, gyro.y, gyro.z - - mag.x, mag.y, mag.z - - Access patterns: - - - accessor["imu"] or accessor.imu -> sub-DataFrame - - accessor.acc["x"] or accessor.acc.x -> Series - """ - - def __init__(self, df: pd.DataFrame, labels: Sequence[str]): - self._df = df - self._data: Dict[str, pd.DataFrame] = {} - - groups: Dict[str, List[str]] = defaultdict(list) - - for label in labels: - parts = label.split(".") - if len(parts) == 2: - group, _field = parts - if label in df: - groups[group].append(label) - elif label in df: - # Single-level column names are exposed directly. - self._data[label] = df[label] - - for group, columns in groups.items(): - short_names = [label.split(".")[1] for label in columns] - subdf = df[columns].copy() - subdf.columns = short_names - self._data[group] = subdf - - # Preserve the original column names to avoid collisions between groups - # with identical short names (e.g., acc.x vs gyro.x). - self._full_df = df.copy() - - @property - def df(self) -> pd.DataFrame: - """Return the underlying full DataFrame view.""" - return self._full_df - - def to_dataframe(self) -> pd.DataFrame: - """Alias for :attr:`df` for convenience.""" - return self._full_df - - def __getitem__(self, key): - if key in self._data: - return self._data[key] - - if key in self._full_df.columns: - return self._full_df[key] - - raise KeyError(f"{key!r} not found in available sensor groups or channels") - - def __getattr__(self, name): - if name in self._data: - return self._data[name] - - if hasattr(self._full_df, name): - return getattr(self._full_df, name) - - raise AttributeError(f"'SensorAccessor' object has no attribute '{name}'") - - def __repr__(self) -> str: - return repr(self._full_df) - - -class SensorDataset: - """High-level representation of an OpenEarable sensor recording file.""" - - SENSOR_SID: Dict[str, int] = { - "imu": 0, - "barometer": 1, - "microphone": 2, - "ppg": 4, - "optical_temp": 6, - "bone_acc": 7, - } - - SID_NAMES: Dict[int, str] = { - 0: "imu", - 1: "barometer", - 2: "microphone", - 4: "ppg", - 6: "optical_temp", - 7: "bone_acc", - } - - sensor_formats: Dict[int, str] = { - SENSOR_SID["imu"]: "<9f", - SENSOR_SID["barometer"]: "<2f", - SENSOR_SID["ppg"]: "<4I", - SENSOR_SID["optical_temp"]: " parser.Parser: - sensor_schemes = scheme.build_default_sensor_schemes(cls.SENSOR_SID) - dataset_parser = parser.Parser.from_sensor_schemes( - sensor_schemes=sensor_schemes, - verbose=verbose, - ) - dataset_parser.parsers[cls.SENSOR_SID["microphone"]] = parser.MicPayloadParser( - sample_count=48000, - verbose=verbose, - ) - return dataset_parser - - def parse(self) -> None: - """Parse the binary recording file into structured sensor data.""" - with open(self.filename, "rb") as f: - parse_result = self.parser.parse(f) - self.parse_result = parse_result - - def _build_accessors(self) -> None: - """Construct per-sensor accessors and per-SID DataFrames. - - Each sensor's data is stored in its own DataFrame in ``self.sensor_dfs``. - The combined DataFrame over all sensors is built lazily in - :meth:`get_dataframe`. - """ - self.audio_stereo = self.parse_result.audio_stereo - self.audio_df = pd.DataFrame() - self._audio_df_sampling_rate = None - self.sensor_dfs = {} - - data_dict = self.parse_result.sensor_dfs - for name, sid in self.SENSOR_SID.items(): - labels = LABELS.get(name, []) - if name == "microphone": - df = self.get_audio_dataframe() - elif sid in data_dict and isinstance(data_dict[sid], pd.DataFrame): - df = data_dict[sid] - df = df[~df.index.duplicated(keep="first")] - else: - df = pd.DataFrame(columns=labels) - - # Store per-SID dataframe - self.sensor_dfs[sid] = df - - # Create/update SensorAccessor for this sensor name - setattr(self, name, _SensorAccessor(df, labels)) - - # Clear combined dataframe; it will be built lazily on demand - self.df = pd.DataFrame() - - def list_sensors(self) -> List[str]: - """Return a list of available sensor names in the dataset.""" - available_sensors = [] - for name, sid in self.SENSOR_SID.items(): - accessor = getattr(self, name, None) - if isinstance(accessor, _SensorAccessor) and not accessor.df.empty: - available_sensors.append(name) - return available_sensors - - def get_sensor_dataframe(self, name: str) -> pd.DataFrame: - """Return the DataFrame for a single sensor. - - Parameters - ---------- - name: - Sensor name, e.g. "imu", "barometer", "ppg", "bone_acc", "optical_temp". - - Returns - ------- - pandas.DataFrame - The time-indexed DataFrame for the requested sensor. - """ - if name not in self.SENSOR_SID: - raise KeyError(f"Unknown sensor name: {name!r}. " - f"Known sensors: {sorted(self.SENSOR_SID.keys())}") - - accessor = getattr(self, name, None) - if isinstance(accessor, _SensorAccessor): - return accessor.to_dataframe() - - # Fallback: should not normally happen, but return an empty DataFrame - # instead of crashing. - return pd.DataFrame() - - def get_dataframe(self) -> pd.DataFrame: - """Return the combined, time-indexed DataFrame of all sensors. - - The merged DataFrame is built lazily from the per-SID DataFrames in - :attr:`sensor_dfs` and cached in :attr:`df`. - """ - # If we've already built a non-empty combined DataFrame, reuse it - if not self.df.empty: - return self.df - - # If per-SID dataframes are not available, nothing to merge - if not getattr(self, "sensor_dfs", None): - return self.df - - # Collect all non-empty per-SID dataframes - dfs = [df for df in self.sensor_dfs.values() if not df.empty] - if not dfs: - return self.df - - # Build a common time index over all sensors - common_index = pd.Index([]) - for df in dfs: - common_index = common_index.union(df.index) - common_index = common_index.sort_values() - - # Reindex each DataFrame to the common index and concatenate - reindexed_dfs = [df.reindex(common_index) for df in dfs] - self.df = pd.concat(reindexed_dfs, axis=1) - - return self.df - - def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: - """Return microphone audio as a timestamp-indexed stereo DataFrame. - - The returned DataFrame has: - - index: ``timestamp`` in seconds - - columns: ``mic.inner`` and ``mic.outer`` (int16 PCM) - """ - if sampling_rate <= 0: - raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}") - - if self._audio_df_sampling_rate == sampling_rate: - return self.audio_df - - mic_packets = getattr(self.parse_result, "mic_packets", []) - if not mic_packets: - self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) - self.audio_df.index.name = "timestamp" - self._audio_df_sampling_rate = sampling_rate - return self.audio_df - - timestamps: List[np.ndarray] = [] - stereo_frames: List[np.ndarray] = [] - - for packet in mic_packets: - ts, stereo = parser.mic_packet_to_stereo_frames( - packet=packet, - sampling_rate=sampling_rate, - ) - if stereo.size == 0: - continue - timestamps.append(ts) - stereo_frames.append(stereo) - - if not timestamps: - self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) - self.audio_df.index.name = "timestamp" - self._audio_df_sampling_rate = sampling_rate - return self.audio_df - - all_ts = np.concatenate(timestamps) - all_stereo = np.vstack(stereo_frames) - - self.audio_df = pd.DataFrame( - { - "mic.inner": all_stereo[:, 0], - "mic.outer": all_stereo[:, 1], - }, - index=all_ts, - ) - self.audio_df.index.name = "timestamp" - self.audio_df = self.audio_df[~self.audio_df.index.duplicated(keep="first")] - self._audio_df_sampling_rate = sampling_rate - - if sampling_rate == 48000: - self.sensor_dfs[self.SENSOR_SID["microphone"]] = self.audio_df - - return self.audio_df - - def export_csv(self) -> None: - base_filename, _ = os.path.splitext(self.filename) - self.save_csv(base_filename + ".csv") - - def save_csv(self, path: str) -> None: - if not self.df.empty: - self.df.to_csv(path) - - def play_audio(self, sampling_rate: int = 48000) -> None: - if self.audio_stereo is None: - print("❌ No microphone data available.") - return - - with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: - write(tmp.name, sampling_rate, self.audio_stereo) - display(Audio(tmp.name)) - - def save_audio(self, path: str, sampling_rate: int = 48000) -> None: - if self.audio_stereo is None: - print("❌ No microphone data available to save.") - return - try: - write(path, sampling_rate, self.audio_stereo) - print(f"✅ Audio saved successfully to {path}") - except Exception as e: - print(f"❌ Error saving audio to {path}: {e}") - - -def load_recordings(file_paths: Sequence[str]) -> List[SensorDataset]: - return [SensorDataset(path) for path in file_paths if os.path.isfile(path)] +"""Backward-compatible dataset module. + +New code should import from :mod:`open_wearables.data`; this module keeps the +historic ``open_wearables.dataset`` import path working. +""" + +from open_wearables.data import ( + COLORS, + LABELS, + SENSOR_FORMATS, + SENSOR_SID, + SID_NAMES, + SensorAccessor, + SensorDataset, + load_recordings, +) + +_SensorAccessor = SensorAccessor + +__all__ = [ + "COLORS", + "LABELS", + "SENSOR_FORMATS", + "SENSOR_SID", + "SID_NAMES", + "SensorAccessor", + "SensorDataset", + "_SensorAccessor", + "load_recordings", +] diff --git a/src/open_wearables/parser.py b/src/open_wearables/parser.py index 925f008..b6fa82c 100644 --- a/src/open_wearables/parser.py +++ b/src/open_wearables/parser.py @@ -1,497 +1,48 @@ -import struct -from open_wearables.scheme import SensorScheme, ParseType -import pandas as pd -from typing import BinaryIO, Dict, List, Optional, Tuple, TypedDict, Union -from dataclasses import dataclass, field -import numpy as np - - -def interleaved_mic_to_stereo( - samples: Union[np.ndarray, List[int], tuple[int, ...]], -) -> np.ndarray: - """Convert interleaved [outer, inner, ...] int16 samples to [inner, outer] frames.""" - interleaved = np.asarray(samples, dtype=np.int16) - if interleaved.size < 2: - return np.empty((0, 2), dtype=np.int16) - - frame_count = interleaved.size // 2 - interleaved = interleaved[: frame_count * 2] - return np.column_stack((interleaved[1::2], interleaved[0::2])) - - -class PayloadParser: - """Abstract base class for payload parsers. - - Subclasses must set ``expected_size`` and implement :meth:`parse`. - """ - - expected_size: int - - def parse(self, data: bytes, **kwargs) -> List[dict]: - """Parse a payload into one or more decoded samples. - - Parameters - ---------- - data: - Raw payload bytes (without header). - """ - raise NotImplementedError - - def should_build_df(self) -> bool: - """Whether this parser's output should be included in the final DataFrame. - - By default, all parsers are included. Subclasses can override this method - to exclude certain parsers (e.g., microphone parsers). - """ - return True - - -# MARK: - ParseResult dataclass - -class MicPacket(TypedDict): - timestamp: float - samples: tuple[int, ...] - - -@dataclass -class ParseResult: - """Result of parsing a stream. - - - `sensor_dfs`: per-SID DataFrames (timestamp-indexed) - - `mic_samples`: interleaved int16 samples accumulated across mic packets - - `audio_stereo`: (N,2) int16 array [inner, outer] if microphone data was present - """ - - sensor_dfs: Dict[int, pd.DataFrame] - mic_samples: List[int] - mic_packets: List[MicPacket] = field(default_factory=list) - audio_stereo: Optional[np.ndarray] = None - - @staticmethod - def mic_samples_to_stereo(mic_samples: List[int]) -> Optional[np.ndarray]: - if not mic_samples: - return None - stereo = interleaved_mic_to_stereo(mic_samples) - if stereo.size == 0: - return None - return stereo - - -def mic_packet_to_stereo_frames( - packet: MicPacket, - sampling_rate: int, -) -> Tuple[np.ndarray, np.ndarray]: - """Return timestamps and stereo frames for a parsed microphone packet.""" - if sampling_rate <= 0: - raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}") - - stereo = interleaved_mic_to_stereo(packet["samples"]) - if stereo.size == 0: - return np.empty((0,), dtype=np.float64), stereo - - timestamps = float(packet["timestamp"]) + ( - np.arange(stereo.shape[0], dtype=np.float64) / sampling_rate - ) - return timestamps, stereo - -class Parser: - def __init__(self, parsers: dict[int, PayloadParser], verbose: bool = False): - """Create a Parser from a mapping of SID -> PayloadParser.""" - self.parsers = parsers - self.verbose = verbose - - @classmethod - def from_sensor_schemes( - cls, - sensor_schemes: dict[int, SensorScheme], - verbose: bool = False, - ) -> "Parser": - """Construct a Parser where each SID uses a SchemePayloadParser. - - This does **not** add a special microphone parser; callers can - override or extend the parser mapping for microphone SIDs as needed. - """ - parsers: dict[int, PayloadParser] = { - sid: SchemePayloadParser(scheme) for sid, scheme in sensor_schemes.items() - } - return cls(parsers=parsers, verbose=verbose) - - def parse( - self, - data_stream: BinaryIO, - *, - chunk_size: int = 4096, - max_resync_scan_bytes: int = 256, - ) -> ParseResult: - """Parse a binary byte stream into per-SID DataFrames. - - This function reads from `data_stream` incrementally in chunks and keeps an - internal buffer so the entire stream does not need to be loaded into memory. - - Parameters - ---------- - data_stream: - A binary stream (file-like object) positioned at the beginning of packet data. - Note: If this is an .oe file, the caller should have already consumed the - file header before passing the stream here. - chunk_size: - Number of bytes to read per chunk. - max_resync_scan_bytes: - How many bytes ahead to scan when attempting to resynchronize after a corrupted - header/payload. - - Returns - ------- - ParseResult - Contains per-SID DataFrames, microphone samples, and stereo PCM audio if present. - """ - rows_by_sid: dict[int, list[dict]] = {} - - header_size = 10 - buffer = bytearray() - packet_idx = 0 - mic_samples: List[int] = [] - mic_packets: List[MicPacket] = [] - - def flush_to_dataframes() -> Dict[int, pd.DataFrame]: - result: Dict[int, pd.DataFrame] = {} - for sid, rows in rows_by_sid.items(): - df = pd.DataFrame(rows) - if not df.empty and "timestamp" in df.columns: - df.set_index("timestamp", inplace=True) - result[sid] = df - return result - - # Main read/parse loop - while True: - # Ensure we have enough data for at least a header; if not, read more - if len(buffer) < header_size: - chunk = data_stream.read(chunk_size) - if not chunk: - # End of stream - if self.verbose and buffer: - print( - f"End of stream with {len(buffer)} leftover bytes (incomplete header/payload)." - ) - break - buffer.extend(chunk) - continue - - # We have at least a header - header = bytes(buffer[:header_size]) - sid, size, time = self._parse_header(header) - - timestamp_s = time / 1e6 - - if self.verbose: - print( - f"Packet #{packet_idx}: SID={sid}, size={size}, time={timestamp_s:.6f}s " - f"(buffer_len={len(buffer)})" - ) - - # Basic sanity checks - if sid not in self.parsers: - if self.verbose: - print(f"Warning: No parser registered for SID={sid}. Attempting resync...") - new_offset = self._attempt_resync(bytes(buffer), 0, packet_idx, max_scan_bytes=max_resync_scan_bytes) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] - continue - - if size <= 0: - if self.verbose: - print(f"Invalid size={size} for SID={sid}. Attempting resync...") - new_offset = self._attempt_resync(bytes(buffer), 0, packet_idx, max_scan_bytes=max_resync_scan_bytes) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] - continue - - parser = self.parsers[sid] - - needed = header_size + size - if len(buffer) < needed: - chunk = data_stream.read(chunk_size) - if not chunk: - if self.verbose: - print( - f"Truncated payload at packet #{packet_idx}: need {needed} bytes, " - f"have {len(buffer)} bytes and stream ended." - ) - break - buffer.extend(chunk) - continue - - payload = bytes(buffer[header_size:needed]) - try: - values_list = parser.parse(payload) - # Accumulate microphone samples in a single interleaved buffer - if isinstance(parser, MicPayloadParser): - for item in values_list: - samples = item.get("samples") - if samples is None: - continue - # `samples` is a tuple of int16; extend global list - mic_samples.extend(list(samples)) - mic_packets.append({ - "timestamp": timestamp_s, - "samples": samples, - }) - if self.verbose: - if isinstance(parser, MicPayloadParser): - print( - f"Parsed mic packet #{packet_idx} (SID={sid}) successfully: " - f"{len(values_list[0].get('samples', [])) if values_list else 0} samples" - ) - else: - print( - f"Parsed packet #{packet_idx} (SID={sid}) successfully: {values_list}" - ) - except Exception as e: - if self.verbose: - print( - f"struct.error while parsing payload at packet #{packet_idx} " - f"(SID={sid}, size={size}): {e}. Attempting resync..." - ) - # Resync within the current buffer - new_offset = self._attempt_resync(bytes(buffer), 0, packet_idx, max_scan_bytes=max_resync_scan_bytes) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] - continue - - if parser.should_build_df(): - for values in values_list: - # Flatten nested group structure (group.component -> value) - flat_values: dict[str, object] = {} - for key, val in values.items(): - if key == "t_delta": - timestamp_s += val / 1e6 - continue - if isinstance(val, dict): - for sub_key, sub_val in val.items(): - flat_values[f"{key}.{sub_key}"] = sub_val - else: - flat_values[key] = val - - row = { - "timestamp": timestamp_s, - **flat_values, - } - rows_by_sid.setdefault(sid, []).append(row) - - # Consume this packet from the buffer - del buffer[:needed] - packet_idx += 1 - - sensor_dfs = flush_to_dataframes() - audio_stereo = ParseResult.mic_samples_to_stereo(mic_samples) - return ParseResult( - sensor_dfs=sensor_dfs, - mic_samples=mic_samples, - mic_packets=mic_packets, - audio_stereo=audio_stereo, - ) - - def _parse_header(self, header: bytes) -> tuple[int, int, int]: - """Parse a 10-byte packet header into (sid, size, time).""" - sid, size, time = struct.unpack(" bool: - """Heuristic check whether a (sid, size) looks like a valid header. - - - SID must have a registered PayloadParser - - size must be positive, not exceed remaining bytes, and match the - expected payload size from the SensorScheme - """ - if sid not in self.parsers: - return False - if size <= 0 or size > remaining: - return False - - parser = self.parsers[sid] - if hasattr(parser, "expected_size") and parser.expected_size is not None: - if size != parser.expected_size: - return False - - return True - - def _attempt_resync( - self, - data: bytes, - packet_start: int, - packet_idx: int, - max_scan_bytes: int = 64, - ) -> Optional[int]: - """Try to recover from a corrupted header by scanning forward. - - Returns a new offset where a plausible header was found, or ``None`` - if no suitable header was located within ``max_scan_bytes``. - """ - total_len = len(data) - header_size = 10 - - if self.verbose: - print( - f"Attempting resync after packet #{packet_idx} from offset {packet_start} " - f"(scan up to {max_scan_bytes} bytes ahead)..." - ) - - for delta in range(1, max_scan_bytes + 1): - candidate = packet_start + delta - if candidate + header_size > total_len: - break - - header = data[candidate : candidate + header_size] - try: - sid, size, time = self._parse_header(header) - except struct.error: - continue - - remaining = total_len - (candidate + header_size) - if not self._is_plausible_header(sid, size, remaining): - continue - - if self.verbose: - timestamp_s = time / 1e6 - print( - f"Resynced at offset {candidate} (skipped {delta} bytes): " - f"SID={sid}, size={size}, time={timestamp_s:.6f}s" - ) - - return candidate - - if self.verbose: - print( - f"Resync failed within {max_scan_bytes} bytes after packet #{packet_idx}; " - f"giving up on this buffer." - ) - return None - -# MARK: - MicParser - -class MicPayloadParser(PayloadParser): - """Payload parser for microphone packets (int16 PCM samples).""" - - def __init__(self, sample_count: int, verbose: bool = False): - self.sample_count = sample_count - self.expected_size = sample_count * 2 # int16 samples - self.verbose = verbose - - def parse(self, data: bytes, **kwargs) -> List[dict]: - # Allow slight deviations in size but warn if unexpected - if len(data) != self.expected_size and self.verbose: - print( - f"Mic payload size {len(data)} bytes does not match expected " - f"{self.expected_size} bytes (sample_count={self.sample_count})." - ) - - if len(data) % 2 != 0 and self.verbose: - print( - f"Mic payload has odd size {len(data)}; last byte will be ignored." - ) - - n_samples = len(data) // 2 - format_str = f"<{n_samples}h" - samples = struct.unpack_from(format_str, data, 0) - return [{"samples": samples}] - - def should_build_df(self) -> bool: - return False - -# MARK: - SchemePayloadParser - -class SchemePayloadParser(PayloadParser): - def __init__(self, sensor_scheme: SensorScheme): - self.sensor_scheme = sensor_scheme - - # Precompute expected payload size in bytes for a single packet - size = 0 - for group in self.sensor_scheme.groups: - for component in group.components: - if component.data_type == ParseType.UINT8 or component.data_type == ParseType.INT8: - size += 1 - elif component.data_type in (ParseType.UINT16, ParseType.INT16): - size += 2 - elif component.data_type == ParseType.UINT32 or component.data_type == ParseType.INT32 or component.data_type == ParseType.FLOAT: - size += 4 - elif component.data_type == ParseType.DOUBLE: - size += 8 - else: - raise ValueError(f"Unsupported data type in scheme: {component.data_type}") - self.expected_size = size - - def check_size(self, data: bytes) -> None: - size = len(data) - if size != self.expected_size and not (size > self.expected_size and (size - 2) % self.expected_size == 0): - raise ValueError( - f"Payload size {size} bytes does not match expected size " - f"{self.expected_size} bytes for sensor '{self.sensor_scheme.name}'" - ) - - def is_buffered(self, data: bytes) -> bool: - size = len(data) - return size > self.expected_size and (size - 2) % self.expected_size == 0 - - def parse(self, data: bytes, **kwargs) -> List[dict]: - self.check_size(data) - if self.is_buffered(data): - results = [] - # get the t_delta as an uint16 from the last two bytes - t_delta = struct.unpack_from(" dict: - parsed_data = {} - offset = 0 - - for group in self.sensor_scheme.groups: - group_data = {} - for component in group.components: - if component.data_type == ParseType.UINT8: - value = struct.unpack_from(" Optional[np.ndarray]: diff --git a/src/open_wearables/parsing/headers.py b/src/open_wearables/parsing/headers.py new file mode 100644 index 0000000..727081c --- /dev/null +++ b/src/open_wearables/parsing/headers.py @@ -0,0 +1,303 @@ +"""Utilities for reading OpenEarable ``.oe`` file headers.""" + +from __future__ import annotations + +import struct +from dataclasses import dataclass, field +from typing import BinaryIO, Dict, Iterable, List, Optional, Union + +from open_wearables.schema import ( + ParseType, + SensorComponentGroupScheme, + SensorComponentScheme, + SensorScheme, +) + + +OE_HEADER_VERSION_2 = 0x0002 +OE_HEADER_VERSION_3 = 0x0003 +OE_V2_HEADER_SIZE = 10 +OE_V3_FIXED_HEADER_SIZE = 27 +FREQUENCIES_DEFINED = 0x10 + + +_FIRMWARE_PARSE_TYPES: Dict[int, ParseType] = { + 0: ParseType.INT8, + 1: ParseType.UINT8, + 2: ParseType.INT16, + 3: ParseType.UINT16, + 4: ParseType.INT32, + 5: ParseType.UINT32, + 6: ParseType.FLOAT, + 7: ParseType.DOUBLE, +} + + +@dataclass(frozen=True) +class FrequencyOptions: + """Available sample rates encoded in a v3 ParseInfo sensor scheme.""" + + frequency_count: int + default_frequency_index: int + max_ble_frequency_index: int + frequencies: tuple[float, ...] + + +@dataclass(frozen=True) +class SensorConfigOptions: + """Configuration metadata appended to a single sensor scheme.""" + + available_options: int + frequency_options: Optional[FrequencyOptions] = None + + +@dataclass(frozen=True) +class OeFileHeader: + """Parsed OpenEarable file-level header metadata.""" + + version: int + timestamp: int + header_size: int + parse_info_size: int = 0 + device_id: Optional[int] = None + side: Optional[int] = None + sensor_ids: tuple[int, ...] = () + sensor_schemes: Dict[int, SensorScheme] = field(default_factory=dict) + sensor_config_options: Dict[int, SensorConfigOptions] = field(default_factory=dict) + + +@dataclass(frozen=True) +class HeaderReadResult: + """Result of attempting to consume an ``.oe`` header from a stream.""" + + header: Optional[OeFileHeader] + initial_packet_bytes: bytes = b"" + + +class _ByteReader: + """Bounds-checked reader for little-endian ParseInfo byte blobs.""" + + def __init__(self, data: bytes): + self._data = data + self.offset = 0 + + @property + def remaining(self) -> int: + """Number of unread bytes.""" + return len(self._data) - self.offset + + def read_uint8(self) -> int: + """Read an unsigned 8-bit integer.""" + return int(self._unpack(" int: + """Read an unsigned 16-bit integer.""" + return int(self._unpack(" float: + """Read a 32-bit IEEE-754 float.""" + return float(self._unpack(" bytes: + """Read exactly ``size`` bytes.""" + if size < 0: + raise ValueError(f"Cannot read a negative byte count: {size}") + if self.offset + size > len(self._data): + raise ValueError( + f"ParseInfo blob ended early at offset {self.offset}; " + f"need {size} bytes, have {self.remaining}" + ) + result = self._data[self.offset : self.offset + size] + self.offset += size + return result + + def read_text(self) -> str: + """Read a length-prefixed UTF-8 string from the blob.""" + size = self.read_uint8() + return self.read_bytes(size).decode("utf-8") + + def _unpack(self, fmt: str, size: int) -> Union[int, float]: + if self.offset + size > len(self._data): + raise ValueError( + f"ParseInfo blob ended early at offset {self.offset}; " + f"need {size} bytes, have {self.remaining}" + ) + value = struct.unpack_from(fmt, self._data, self.offset)[0] + self.offset += size + return value + + +def read_oe_header(data_stream: BinaryIO) -> HeaderReadResult: + """Read a supported ``.oe`` file header from the stream.""" + + first_two = data_stream.read(2) + if len(first_two) < 2: + raise ValueError("Stream ended before a valid OE header could be read") + + version = struct.unpack(" ParseInfo: + """Decode the v3 ParseInfo blob into sensor IDs and schemes.""" + + reader = _ByteReader(blob) + sensor_count = reader.read_uint8() + sensor_ids = tuple(reader.read_uint8() for _ in range(sensor_count)) + sensor_schemes: Dict[int, SensorScheme] = {} + config_options: Dict[int, SensorConfigOptions] = {} + + for expected_sid in sensor_ids: + scheme_size = reader.read_uint16() + scheme_blob = reader.read_bytes(scheme_size) + scheme, options = parse_single_sensor_scheme(scheme_blob) + if scheme.sid != expected_sid: + raise ValueError( + f"ParseInfo sensor list expected SID {expected_sid}, " + f"but scheme encoded SID {scheme.sid}" + ) + sensor_schemes[scheme.sid] = scheme + config_options[scheme.sid] = options + + if reader.remaining: + raise ValueError(f"ParseInfo blob has {reader.remaining} trailing bytes") + + return ParseInfo( + sensor_ids=sensor_ids, + sensor_schemes=sensor_schemes, + sensor_config_options=config_options, + ) + + +def parse_single_sensor_scheme(blob: bytes) -> tuple[SensorScheme, SensorConfigOptions]: + """Decode one firmware ``Single Sensor Scheme`` payload.""" + + reader = _ByteReader(blob) + sensor_id = reader.read_uint8() + sensor_name = reader.read_text() + component_count = reader.read_uint8() + groups_by_name: Dict[str, List[SensorComponentScheme]] = {} + group_order: List[str] = [] + + for _ in range(component_count): + parse_type_id = reader.read_uint8() + try: + parse_type = _FIRMWARE_PARSE_TYPES[parse_type_id] + except KeyError as exc: + raise ValueError(f"Unsupported firmware parse type: {parse_type_id}") from exc + + group_name = reader.read_text() + component_name = reader.read_text() + reader.read_text() # Unit metadata is not needed for payload decoding. + + if group_name not in groups_by_name: + groups_by_name[group_name] = [] + group_order.append(group_name) + groups_by_name[group_name].append( + SensorComponentScheme(name=component_name, data_type=parse_type) + ) + + options = _read_sensor_config_options(reader) + if reader.remaining: + raise ValueError( + f"Single sensor scheme for SID {sensor_id} has {reader.remaining} trailing bytes" + ) + + groups = [ + SensorComponentGroupScheme(name=name, components=groups_by_name[name]) + for name in group_order + ] + return SensorScheme(name=sensor_name, sid=sensor_id, groups=groups), options + + +def _read_sensor_config_options(reader: _ByteReader) -> SensorConfigOptions: + available_options = reader.read_uint8() + frequency_options = None + if available_options & FREQUENCIES_DEFINED: + frequency_count = reader.read_uint8() + default_frequency_index = reader.read_uint8() + max_ble_frequency_index = reader.read_uint8() + frequencies = tuple(reader.read_float() for _ in range(frequency_count)) + frequency_options = FrequencyOptions( + frequency_count=frequency_count, + default_frequency_index=default_frequency_index, + max_ble_frequency_index=max_ble_frequency_index, + frequencies=frequencies, + ) + return SensorConfigOptions( + available_options=available_options, + frequency_options=frequency_options, + ) + + +def _read_exact(data_stream: BinaryIO, size: int) -> bytes: + data = data_stream.read(size) + if len(data) != size: + raise ValueError(f"Unexpected end of stream while reading {size} header bytes") + return data + + +def sensor_scheme_labels(scheme: SensorScheme) -> List[str]: + """Return flattened DataFrame labels produced by ``SchemePayloadParser``.""" + + return [ + f"{group.name}.{component.name}" + for group in scheme.groups + for component in group.components + ] + + +def iter_sensor_scheme_labels(schemes: Iterable[SensorScheme]) -> Dict[int, List[str]]: + """Build flattened labels for each scheme keyed by sensor ID.""" + + return {scheme.sid: sensor_scheme_labels(scheme) for scheme in schemes} diff --git a/src/open_wearables/parsing/stream.py b/src/open_wearables/parsing/stream.py index 368d7e6..564b2ba 100644 --- a/src/open_wearables/parsing/stream.py +++ b/src/open_wearables/parsing/stream.py @@ -7,13 +7,17 @@ from .audio import MicPacket, mic_samples_to_stereo from .base import ParseResult, PayloadParser +from .headers import OeFileHeader, read_oe_header from .payload_parsers import MicPayloadParser, SchemePayloadParser class Parser: + """Incremental parser for OpenEarable packet streams and ``.oe`` files.""" + def __init__(self, parsers: dict[int, PayloadParser], verbose: bool = False): self.parsers = parsers self.verbose = verbose + self.sensor_schemes: dict[int, SensorScheme] = {} @classmethod def from_sensor_schemes( @@ -24,7 +28,9 @@ def from_sensor_schemes( parsers: dict[int, PayloadParser] = { sid: SchemePayloadParser(scheme) for sid, scheme in sensor_schemes.items() } - return cls(parsers=parsers, verbose=verbose) + parser = cls(parsers=parsers, verbose=verbose) + parser.sensor_schemes = dict(sensor_schemes) + return parser def parse( self, @@ -33,10 +39,49 @@ def parse( chunk_size: int = 4096, max_resync_scan_bytes: int = 256, ) -> ParseResult: + """Parse a stream from the beginning of an OE file.""" + header_result = read_oe_header(data_stream) + file_header = header_result.header + if file_header is not None: + self._apply_file_header(file_header) + if self.verbose: + print( + f"Parsed OE header v{file_header.version}: " + f"timestamp={file_header.timestamp}, header_size={file_header.header_size}" + ) + + return self.parse_packets( + data_stream, + file_header=file_header, + initial_packet_bytes=header_result.initial_packet_bytes, + chunk_size=chunk_size, + max_resync_scan_bytes=max_resync_scan_bytes, + ) + + def parse_packets( + self, + data_stream: BinaryIO, + *, + file_header: Optional[OeFileHeader] = None, + initial_packet_bytes: bytes = b"", + chunk_size: int = 4096, + max_resync_scan_bytes: int = 256, + ) -> ParseResult: + """Parse packet data from a stream positioned at the first packet. + + Parameters + ---------- + data_stream: + Binary stream positioned at packet data, not at the file header. + file_header: + Optional file-level header metadata to attach to the result. + initial_packet_bytes: + Bytes already consumed by a caller before packet parsing starts. + """ rows_by_sid: dict[int, list[dict]] = {} header_size = 10 - buffer = bytearray() + buffer = bytearray(initial_packet_bytes) packet_idx = 0 mic_samples: List[int] = [] mic_packets: List[MicPacket] = [] @@ -188,8 +233,22 @@ def flush_to_dataframes() -> Dict[int, pd.DataFrame]: mic_samples=mic_samples, mic_packets=mic_packets, audio_stereo=audio_stereo, + file_header=file_header, ) + def _apply_file_header(self, file_header: OeFileHeader) -> None: + """Update parser registrations with schemes embedded in a v3 file header.""" + if not file_header.sensor_schemes: + return + + for sid, scheme in file_header.sensor_schemes.items(): + existing_parser = self.parsers.get(sid) + if isinstance(existing_parser, MicPayloadParser): + continue + self.parsers[sid] = SchemePayloadParser(scheme) + + self.sensor_schemes.update(file_header.sensor_schemes) + def _parse_header(self, header: bytes) -> tuple[int, int, int]: sid, size, time = struct.unpack(" str: class SensorScheme: """Schema definition for one OpenEarable sensor stream.""" - def __init__(self, name: str, sid: int, groups: list[SensorComponentGroupScheme]): + def __init__( + self, + name: str, + sid: int, + groups: list[SensorComponentGroupScheme], + sampling_rate: Optional[float] = None, + ): self.name = name self.sid = sid self.groups = groups + self.sampling_rate = sampling_rate def __repr__(self) -> str: - return f"SensorScheme(name={self.name}, sid={self.sid}, groups={self.groups})" + return f"SensorScheme(name={self.name}, sid={self.sid}, groups={self.groups}, sampling_rate={self.sampling_rate})" def group( diff --git a/tests/test_oe_headers.py b/tests/test_oe_headers.py new file mode 100644 index 0000000..334dde7 --- /dev/null +++ b/tests/test_oe_headers.py @@ -0,0 +1,235 @@ +import io +import os +import struct +import sys +import tempfile +import types +import unittest +from unittest import mock + +if "websockets" not in sys.modules: + websockets_module = types.ModuleType("websockets") + websockets_module.ConnectionClosed = Exception + websockets_client_module = types.ModuleType("websockets.client") + websockets_client_module.WebSocketClientProtocol = object + websockets_client_module.connect = object + sys.modules["websockets"] = websockets_module + sys.modules["websockets.client"] = websockets_client_module + +from open_wearables.parsing import Parser, parse_single_sensor_scheme +from open_wearables.data.sensor_dataset import SensorDataset +from open_wearables.schema import ParseType, SensorScheme +from open_wearables.schema.types import group + + +def _text(value: str) -> bytes: + encoded = value.encode("utf-8") + return struct.pack(" bytes: + options = struct.pack(" bytes: + blob = bytearray() + blob.extend(struct.pack(" bytes: + fixed_size = 27 + header_size = fixed_size + len(parse_info_blob) + return ( + struct.pack( + " Date: Thu, 30 Apr 2026 14:23:39 +0200 Subject: [PATCH 2/4] docs(parser): explain parser internals --- docs/README.md | 1 + docs/api-reference.md | 11 ++ docs/parser-internals.md | 355 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 docs/parser-internals.md diff --git a/docs/README.md b/docs/README.md index 924f646..8c052ed 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,6 +7,7 @@ - [Getting started](getting-started.md) - [Data model and sensor channels](data-model.md) - [API reference](api-reference.md) +- [Parser internals](parser-internals.md) ## Package Scope diff --git a/docs/api-reference.md b/docs/api-reference.md index 24c8448..084c90a 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -146,6 +146,17 @@ Returns one sensor DataFrame by name. - Valid names: `imu`, `barometer`, `microphone`, `ppg`, `optical_temp`, `bone_acc` - Raises `KeyError` for unknown names. +#### `get_sampling_rate(sensor: str | int) -> Optional[float]` + +Returns the default sampling rate for a sensor name or SID when the file header +contains v3 frequency metadata. Returns `None` for v2 files or sensors without +frequency metadata. + +#### `get_sampling_rates() -> Dict[str, Optional[float]]` + +Returns default sampling rates for all known dataset sensors keyed by sensor +name. Values are `None` when metadata is unavailable. + #### `get_dataframe() -> pandas.DataFrame` Builds and caches a merged DataFrame across all non-empty sensor streams. diff --git a/docs/parser-internals.md b/docs/parser-internals.md new file mode 100644 index 0000000..e6e4013 --- /dev/null +++ b/docs/parser-internals.md @@ -0,0 +1,355 @@ +# Parser Internals + +This document explains how `open-wearables` turns OpenEarable `.oe` files into +timestamp-indexed pandas DataFrames, and how the package is structured for +developers who want to maintain or extend the parser. + +## Package Structure + +The package is split into small layers: + +| Module | Responsibility | +|--------|----------------| +| `open_wearables.data` | User-facing dataset API, sensor constants, and DataFrame accessors. | +| `open_wearables.parsing` | Binary file/header parsing, packet parsing, payload parsers, and audio conversion helpers. | +| `open_wearables.schema` | In-memory schema model used by payload parsers. | +| `open_wearables.ipc` | WebSocket IPC client models and transport logic. | +| `open_wearables.dataset` | Backward-compatible facade for older imports. | +| `open_wearables.parser` | Backward-compatible facade for older parser imports. | +| `open_wearables.scheme` | Backward-compatible facade for older schema imports. | + +New parser code should generally live in `open_wearables.parsing`. New schema +types should live in `open_wearables.schema`. User-facing dataset behavior +belongs in `open_wearables.data`. + +```plantuml +@startuml +skinparam packageStyle rectangle + +package "open_wearables.data" { + class SensorDataset + class SensorAccessor +} + +package "open_wearables.parsing" { + class Parser + class ParseResult + class OeFileHeader + class SchemePayloadParser + class MicPayloadParser +} + +package "open_wearables.schema" { + class SensorScheme + class SensorComponentGroupScheme + class SensorComponentScheme + enum ParseType +} + +SensorDataset --> Parser +SensorDataset --> SensorAccessor +Parser --> ParseResult +Parser --> OeFileHeader +Parser --> SchemePayloadParser +Parser --> MicPayloadParser +SchemePayloadParser --> SensorScheme +SensorScheme --> SensorComponentGroupScheme +SensorComponentGroupScheme --> SensorComponentScheme +SensorComponentScheme --> ParseType +@enduml +``` + +## High-Level Dataset Flow + +`SensorDataset` owns the file-level flow. It checks the file header before it +builds a parser, so version `0x0003` files can provide their own embedded parse +schemes. + +The flow is: + +1. `SensorDataset(filename)` initializes empty state and placeholder accessors. +2. `SensorDataset.parse()` opens the file as binary. +3. `read_oe_header(stream)` probes and consumes the `.oe` file header if present. +4. `SensorDataset._build_parser(file_header)` chooses parser schemes: + - v3 header with embedded schemes: use `file_header.sensor_schemes`. + - v2 header: use `build_default_sensor_schemes(...)`. +5. `Parser.parse_packets(...)` parses packet records from the current stream position. +6. `SensorDataset._build_accessors()` wraps DataFrames in `SensorAccessor` objects. + +```plantuml +@startuml +actor User +participant "SensorDataset" as Dataset +participant "read_oe_header" as Header +participant "Parser" as Parser +participant "PayloadParser" as Payload +participant "SensorAccessor" as Accessor + +User -> Dataset: SensorDataset("recording.oe") +Dataset -> Header: read_oe_header(stream) +Header --> Dataset: HeaderReadResult +Dataset -> Dataset: build parser from header +Dataset -> Parser: parse_packets(stream, file_header) +loop each packet + Parser -> Parser: read SID, size, timestamp + Parser -> Payload: parse(payload) + Payload --> Parser: decoded values +end +Parser --> Dataset: ParseResult +Dataset -> Accessor: build per-sensor accessors +Dataset --> User: dataset +@enduml +``` + +## File Header Handling + +Header parsing lives in `open_wearables.parsing.headers`. + +`read_oe_header(stream)` returns a `HeaderReadResult`: + +- `header`: an `OeFileHeader` when a supported file header is present. +- `initial_packet_bytes`: bytes that were already read before packet parsing starts. + +`SensorDataset` expects supported `.oe` file headers. Unsupported header +versions raise `ValueError`. + +### Header Version `0x0002` + +Version `0x0002` contains only: + +- `version: uint16` +- `timestamp: uint64` + +The header has no embedded parse scheme. `SensorDataset` therefore uses the +default hard-coded sensor schemes for v2 files. + +### Header Version `0x0003` + +Version `0x0003` contains: + +- `version: uint16` +- `timestamp: uint64` +- `header_size: uint32` +- `parse_info_size: uint32` +- `device_id: uint64` +- `side: uint8` +- `parse_info_blob: bytes` + +The ParseInfo blob contains the active sensor list and each referenced sensor +scheme. `parse_parse_info_blob(...)` converts that blob into: + +- `sensor_ids` +- `sensor_schemes` +- `sensor_config_options` + +Those schemes become the source of truth for packet payload decoding. + +```plantuml +@startuml +start +:Read first 2 bytes; +if (version == 0x0002?) then (yes) + :Read timestamp; + :Use default schemes; +elseif (version == 0x0003?) then (yes) + :Read fixed v3 header; + :Read ParseInfo blob; + :Decode embedded SensorScheme objects; + :Use embedded schemes; +else (no) + :Raise ValueError; +endif +:Parse packet records; +stop +@enduml +``` + +## ParseInfo Decoding + +The v3 ParseInfo blob starts with a sensor list: + +| Field | Type | +|-------|------| +| Sensor Count | `uint8` | +| Sensor IDs | `uint8[]` | + +It then stores each sensor scheme in the same order as the sensor IDs: + +| Field | Type | +|-------|------| +| Sensor Scheme Size | `uint16` | +| Sensor Scheme | `byte[]` | + +Each single sensor scheme contains: + +- sensor ID +- sensor name +- component count +- one component record per component +- sensor configuration options + +The parser converts the firmware enum values into `ParseType` values: + +| Firmware Value | Python `ParseType` | +|----------------|--------------------| +| `0` | `INT8` | +| `1` | `UINT8` | +| `2` | `INT16` | +| `3` | `UINT16` | +| `4` | `INT32` | +| `5` | `UINT32` | +| `6` | `FLOAT` | +| `7` | `DOUBLE` | + +Frequency metadata, when present, is stored in +`OeFileHeader.sensor_config_options[sid].frequency_options`. + +Example: + +```python +dataset = SensorDataset("recording.oe") +sid = dataset.SENSOR_SID["imu"] + +options = dataset.file_header.sensor_config_options[sid] +sampling_rates = options.frequency_options.frequencies +default_rate = sampling_rates[options.frequency_options.default_frequency_index] +``` + +For v2 files, this metadata is not available in the file. + +## Packet Parsing + +Packet parsing lives in `open_wearables.parsing.stream.Parser`. + +There are two entry points: + +- `Parser.parse(stream)`: convenience method for streams positioned at the start of an OE file. +- `Parser.parse_packets(stream, ...)`: packet-only method for streams already positioned at packet data. + +`SensorDataset` uses `parse_packets(...)` because it has already read the file +header before building the parser. + +Each packet starts with a 10-byte packet header: + +| Field | Type | +|-------|------| +| Sensor ID | `uint8` | +| Payload Size | `uint8` | +| Timestamp | `uint64` | + +After reading a packet header, `Parser`: + +1. Looks up a payload parser by sensor ID. +2. Reads `Payload Size` bytes. +3. Parses the payload. +4. Flattens grouped values into DataFrame columns. +5. Adds rows into a per-SID row buffer. +6. Converts row buffers into pandas DataFrames at the end. + +```plantuml +@startuml +start +:Read packet header; +:sid, size, timestamp; +if (parser for sid exists?) then (yes) + :Read payload; + :payload parser decodes bytes; + if (microphone?) then (yes) + :Accumulate PCM samples; + else (no) + :Flatten group.component values; + :Append row to per-SID rows; + endif +else (no) + :Attempt resynchronization; +endif +if (more data?) then (yes) + :Read next packet header; +else (no) + :Build DataFrames; + :Build stereo audio if present; + stop +endif +@enduml +``` + +## Payload Parsers + +Payload parser implementations live in `open_wearables.parsing.payload_parsers`. + +### `SchemePayloadParser` + +`SchemePayloadParser` decodes structured sensor payloads from `SensorScheme` +objects. It precomputes the expected payload size from the scheme component +types. + +It supports: + +- fixed-size single samples +- buffered payloads with a trailing `uint16` time delta + +Parsed values are returned as nested dictionaries: + +```python +{ + "acc": {"x": 0.1, "y": 0.2, "z": 0.3}, + "gyro": {"x": 1.0, "y": 2.0, "z": 3.0}, +} +``` + +`Parser` flattens these into DataFrame columns: + +```text +acc.x +acc.y +acc.z +gyro.x +gyro.y +gyro.z +``` + +### `MicPayloadParser` + +`MicPayloadParser` decodes microphone payloads as little-endian `int16` PCM +samples. Microphone samples are not directly added as normal sensor DataFrame +rows. Instead, the parser stores microphone packets and builds stereo audio +helpers from them. + +## Accessors and DataFrames + +`SensorDataset._build_accessors()` creates one `SensorAccessor` per known sensor +name. Each accessor exposes: + +- `.df` or `.to_dataframe()` for the full sensor DataFrame. +- grouped sub-DataFrames for columns named `group.component`. +- direct access for single-level columns. + +For v3 files, labels come from embedded sensor schemes when available. For v2 +files, labels come from the default constants in `open_wearables.data.constants`. + +## Compatibility Facades + +The project keeps older import paths working: + +- `open_wearables.parser` re-exports parser types from `open_wearables.parsing`. +- `open_wearables.dataset` re-exports dataset types from `open_wearables.data`. +- `open_wearables.scheme` re-exports schema types from `open_wearables.schema`. + +New code should prefer the layered modules directly, but compatibility facades +should remain stable unless the project intentionally makes a breaking release. + +## Extension Points + +When adding a new sensor type or payload format: + +1. Add or update schema definitions in `open_wearables.schema`. +2. Add a payload parser in `open_wearables.parsing.payload_parsers` if the + generic `SchemePayloadParser` is not enough. +3. Register the parser in `SensorDataset._build_parser(...)` if it needs special + behavior. +4. Add constants/accessor labels in `open_wearables.data.constants` only for + v2/default-scheme compatibility. +5. Add tests with synthetic binary streams in `tests/`. + +For v3-compatible sensors, prefer relying on the embedded ParseInfo scheme +instead of adding hard-coded labels. From e21d8394c5d833fbbdba961e92bcddbd7ae10194 Mon Sep 17 00:00:00 2001 From: Dennis <45356478+DennisMoschina@users.noreply.github.com> Date: Mon, 4 May 2026 11:03:42 +0200 Subject: [PATCH 3/4] refactor(parsing): structure packet and payload parsing --- docs/parser-internals.md | 25 +- src/open_wearables/parsing/payload_parsers.py | 99 +++---- src/open_wearables/parsing/stream.py | 245 +++++++++++------- src/open_wearables/schema/types.py | 22 ++ tests/test_payload_parsers.py | 66 +++++ 5 files changed, 315 insertions(+), 142 deletions(-) create mode 100644 tests/test_payload_parsers.py diff --git a/docs/parser-internals.md b/docs/parser-internals.md index e6e4013..d89d87c 100644 --- a/docs/parser-internals.md +++ b/docs/parser-internals.md @@ -33,6 +33,7 @@ package "open_wearables.data" { package "open_wearables.parsing" { class Parser + class PacketHeader class ParseResult class OeFileHeader class SchemePayloadParser @@ -49,6 +50,7 @@ package "open_wearables.schema" { SensorDataset --> Parser SensorDataset --> SensorAccessor Parser --> ParseResult +Parser --> PacketHeader Parser --> OeFileHeader Parser --> SchemePayloadParser Parser --> MicPayloadParser @@ -239,12 +241,18 @@ Each packet starts with a 10-byte packet header: After reading a packet header, `Parser`: -1. Looks up a payload parser by sensor ID. -2. Reads `Payload Size` bytes. -3. Parses the payload. -4. Flattens grouped values into DataFrame columns. -5. Adds rows into a per-SID row buffer. -6. Converts row buffers into pandas DataFrames at the end. +1. Decodes the packet prefix into an internal `PacketHeader`. +2. Looks up a payload parser by sensor ID. +3. Reads `Payload Size` bytes. +4. Parses the payload. +5. Sends microphone payloads to the microphone accumulator. +6. Flattens grouped sensor values into DataFrame columns. +7. Adds rows into a per-SID row buffer. +8. Converts row buffers into pandas DataFrames at the end. + +The parser keeps these internal steps in separate helpers so packet framing, +resynchronization, microphone accumulation, value flattening, and DataFrame +creation can evolve independently without changing the public API. ```plantuml @startuml @@ -280,8 +288,9 @@ Payload parser implementations live in `open_wearables.parsing.payload_parsers`. ### `SchemePayloadParser` `SchemePayloadParser` decodes structured sensor payloads from `SensorScheme` -objects. It precomputes the expected payload size from the scheme component -types. +objects. It uses a single parse-type registry to derive both struct formats and +component byte widths, then precomputes the expected payload size from the +scheme component types. It supports: diff --git a/src/open_wearables/parsing/payload_parsers.py b/src/open_wearables/parsing/payload_parsers.py index fca0742..06aefc5 100644 --- a/src/open_wearables/parsing/payload_parsers.py +++ b/src/open_wearables/parsing/payload_parsers.py @@ -6,15 +6,37 @@ from .base import PayloadParser +_PARSE_TYPE_FORMATS: dict[ParseType, tuple[str, int]] = { + ParseType.UINT8: (" List[dict]: + """Decode microphone payload bytes into a sample tuple.""" if len(data) != self.expected_size and self.verbose: print( f"Mic payload size {len(data)} bytes does not match expected " @@ -34,31 +56,24 @@ def should_build_df(self) -> bool: class SchemePayloadParser(PayloadParser): + """Payload parser driven by an OpenEarable ``SensorScheme`` definition.""" + def __init__(self, sensor_scheme: SensorScheme): + """Create a parser for payloads matching ``sensor_scheme``.""" self.sensor_scheme = sensor_scheme + self.expected_size = self._calculate_expected_size(sensor_scheme) - size = 0 - for group in self.sensor_scheme.groups: - for component in group.components: - if component.data_type in (ParseType.UINT8, ParseType.INT8): - size += 1 - elif component.data_type in (ParseType.UINT16, ParseType.INT16): - size += 2 - elif component.data_type in ( - ParseType.UINT32, - ParseType.INT32, - ParseType.FLOAT, - ): - size += 4 - elif component.data_type == ParseType.DOUBLE: - size += 8 - else: - raise ValueError( - f"Unsupported data type in scheme: {component.data_type}" - ) - self.expected_size = size + @staticmethod + def _calculate_expected_size(sensor_scheme: SensorScheme) -> int: + """Return the byte size for one unbuffered sample in ``sensor_scheme``.""" + return sum( + _format_for(component.data_type)[1] + for group in sensor_scheme.groups + for component in group.components + ) def check_size(self, data: bytes) -> None: + """Validate that ``data`` is either one sample or a buffered payload.""" size = len(data) if size != self.expected_size and not ( size > self.expected_size and (size - 2) % self.expected_size == 0 @@ -69,10 +84,12 @@ def check_size(self, data: bytes) -> None: ) def is_buffered(self, data: bytes) -> bool: + """Return whether ``data`` contains multiple samples and a time delta.""" size = len(data) return size > self.expected_size and (size - 2) % self.expected_size == 0 def parse(self, data: bytes, **kwargs) -> List[dict]: + """Decode a single-sample or buffered structured sensor payload.""" self.check_size(data) if self.is_buffered(data): results = [] @@ -80,7 +97,9 @@ def parse(self, data: bytes, **kwargs) -> List[dict]: payload = data[:-2] n_packets = len(payload) // self.expected_size for i in range(n_packets): - packet_data = payload[i * self.expected_size : (i + 1) * self.expected_size] + packet_data = payload[ + i * self.expected_size : (i + 1) * self.expected_size + ] parsed_packet = self.parse_packet(packet_data) parsed_packet["t_delta"] = t_delta results.append(parsed_packet) @@ -88,40 +107,26 @@ def parse(self, data: bytes, **kwargs) -> List[dict]: return [self.parse_packet(data)] def parse_packet(self, data: bytes) -> dict: + """Decode one unbuffered structured sensor sample.""" parsed_data = {} offset = 0 for group in self.sensor_scheme.groups: group_data = {} for component in group.components: - if component.data_type == ParseType.UINT8: - value = struct.unpack_from(" tuple[str, int]: + """Return the struct format and byte width for ``parse_type``.""" + try: + return _PARSE_TYPE_FORMATS[parse_type] + except KeyError as exc: + raise ValueError(f"Unsupported data type: {parse_type}") from exc diff --git a/src/open_wearables/parsing/stream.py b/src/open_wearables/parsing/stream.py index 564b2ba..77744d3 100644 --- a/src/open_wearables/parsing/stream.py +++ b/src/open_wearables/parsing/stream.py @@ -1,4 +1,5 @@ import struct +from dataclasses import dataclass from typing import BinaryIO, Dict, List, Optional import pandas as pd @@ -11,10 +12,28 @@ from .payload_parsers import MicPayloadParser, SchemePayloadParser +PACKET_HEADER_SIZE = 10 + + +@dataclass(frozen=True) +class PacketHeader: + """Decoded OpenEarable packet header.""" + + sid: int + payload_size: int + timestamp_us: int + + @property + def timestamp_s(self) -> float: + """Packet timestamp in seconds.""" + return self.timestamp_us / 1e6 + + class Parser: """Incremental parser for OpenEarable packet streams and ``.oe`` files.""" def __init__(self, parsers: dict[int, PayloadParser], verbose: bool = False): + """Create a parser from per-SID payload parser registrations.""" self.parsers = parsers self.verbose = verbose self.sensor_schemes: dict[int, SensorScheme] = {} @@ -25,6 +44,7 @@ def from_sensor_schemes( sensor_schemes: dict[int, SensorScheme], verbose: bool = False, ) -> "Parser": + """Create a parser that decodes every SID with ``SchemePayloadParser``.""" parsers: dict[int, PayloadParser] = { sid: SchemePayloadParser(scheme) for sid, scheme in sensor_schemes.items() } @@ -78,25 +98,14 @@ def parse_packets( initial_packet_bytes: Bytes already consumed by a caller before packet parsing starts. """ - rows_by_sid: dict[int, list[dict]] = {} - - header_size = 10 + rows_by_sid: dict[int, list[dict[str, object]]] = {} buffer = bytearray(initial_packet_bytes) packet_idx = 0 mic_samples: List[int] = [] mic_packets: List[MicPacket] = [] - def flush_to_dataframes() -> Dict[int, pd.DataFrame]: - result: Dict[int, pd.DataFrame] = {} - for sid, rows in rows_by_sid.items(): - df = pd.DataFrame(rows) - if not df.empty and "timestamp" in df.columns: - df.set_index("timestamp", inplace=True) - result[sid] = df - return result - while True: - if len(buffer) < header_size: + if len(buffer) < PACKET_HEADER_SIZE: chunk = data_stream.read(chunk_size) if not chunk: if self.verbose and buffer: @@ -107,48 +116,39 @@ def flush_to_dataframes() -> Dict[int, pd.DataFrame]: buffer.extend(chunk) continue - header = bytes(buffer[:header_size]) - sid, size, time = self._parse_header(header) - timestamp_s = time / 1e6 + header = self._read_packet_header(buffer) if self.verbose: print( - f"Packet #{packet_idx}: SID={sid}, size={size}, time={timestamp_s:.6f}s " + f"Packet #{packet_idx}: SID={header.sid}, " + f"size={header.payload_size}, time={header.timestamp_s:.6f}s " f"(buffer_len={len(buffer)})" ) - if sid not in self.parsers: + if header.sid not in self.parsers: if self.verbose: - print(f"Warning: No parser registered for SID={sid}. Attempting resync...") - new_offset = self._attempt_resync( - bytes(buffer), - 0, - packet_idx, - max_scan_bytes=max_resync_scan_bytes, + print( + f"Warning: No parser registered for SID={header.sid}. " + "Attempting resync..." + ) + self._discard_until_resync( + buffer, packet_idx, max_scan_bytes=max_resync_scan_bytes ) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] continue - if size <= 0: + if header.payload_size <= 0: if self.verbose: - print(f"Invalid size={size} for SID={sid}. Attempting resync...") - new_offset = self._attempt_resync( - bytes(buffer), - 0, - packet_idx, - max_scan_bytes=max_resync_scan_bytes, + print( + f"Invalid size={header.payload_size} for SID={header.sid}. " + "Attempting resync..." + ) + self._discard_until_resync( + buffer, packet_idx, max_scan_bytes=max_resync_scan_bytes ) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] continue - parser = self.parsers[sid] - needed = header_size + size + parser = self.parsers[header.sid] + needed = PACKET_HEADER_SIZE + header.payload_size if len(buffer) < needed: chunk = data_stream.read(chunk_size) if not chunk: @@ -161,72 +161,51 @@ def flush_to_dataframes() -> Dict[int, pd.DataFrame]: buffer.extend(chunk) continue - payload = bytes(buffer[header_size:needed]) + payload = bytes(buffer[PACKET_HEADER_SIZE:needed]) try: values_list = parser.parse(payload) if isinstance(parser, MicPayloadParser): - for item in values_list: - samples = item.get("samples") - if samples is None: - continue - mic_samples.extend(list(samples)) - mic_packets.append( - { - "timestamp": timestamp_s, - "samples": samples, - } - ) + self._append_mic_values( + values_list=values_list, + timestamp_s=header.timestamp_s, + mic_samples=mic_samples, + mic_packets=mic_packets, + ) if self.verbose: if isinstance(parser, MicPayloadParser): print( - f"Parsed mic packet #{packet_idx} (SID={sid}) successfully: " + f"Parsed mic packet #{packet_idx} (SID={header.sid}) successfully: " f"{len(values_list[0].get('samples', [])) if values_list else 0} samples" ) else: print( - f"Parsed packet #{packet_idx} (SID={sid}) successfully: {values_list}" + f"Parsed packet #{packet_idx} (SID={header.sid}) successfully: " + f"{values_list}" ) except Exception as exc: if self.verbose: print( f"struct.error while parsing payload at packet #{packet_idx} " - f"(SID={sid}, size={size}): {exc}. Attempting resync..." + f"(SID={header.sid}, size={header.payload_size}): {exc}. " + "Attempting resync..." ) - new_offset = self._attempt_resync( - bytes(buffer), - 0, - packet_idx, - max_scan_bytes=max_resync_scan_bytes, + self._discard_until_resync( + buffer, packet_idx, max_scan_bytes=max_resync_scan_bytes ) - if new_offset is None: - del buffer[:1] - else: - del buffer[:new_offset] continue if parser.should_build_df(): - for values in values_list: - flat_values: dict[str, object] = {} - for key, val in values.items(): - if key == "t_delta": - timestamp_s += val / 1e6 - continue - if isinstance(val, dict): - for sub_key, sub_val in val.items(): - flat_values[f"{key}.{sub_key}"] = sub_val - else: - flat_values[key] = val - - row = { - "timestamp": timestamp_s, - **flat_values, - } - rows_by_sid.setdefault(sid, []).append(row) + self._append_sensor_rows( + rows_by_sid=rows_by_sid, + sid=header.sid, + timestamp_s=header.timestamp_s, + values_list=values_list, + ) del buffer[:needed] packet_idx += 1 - sensor_dfs = flush_to_dataframes() + sensor_dfs = self._rows_to_dataframes(rows_by_sid) audio_stereo = mic_samples_to_stereo(mic_samples) return ParseResult( sensor_dfs=sensor_dfs, @@ -253,6 +232,99 @@ def _parse_header(self, header: bytes) -> tuple[int, int, int]: sid, size, time = struct.unpack(" PacketHeader: + """Decode the packet header at the start of ``buffer``.""" + sid, size, time = self._parse_header(bytes(buffer[:PACKET_HEADER_SIZE])) + return PacketHeader(sid=sid, payload_size=size, timestamp_us=time) + + def _append_mic_values( + self, + *, + values_list: List[dict], + timestamp_s: float, + mic_samples: List[int], + mic_packets: List[MicPacket], + ) -> None: + """Accumulate decoded microphone samples and packet timestamps.""" + for item in values_list: + samples = item.get("samples") + if samples is None: + continue + mic_samples.extend(list(samples)) + mic_packets.append( + { + "timestamp": timestamp_s, + "samples": samples, + } + ) + + def _append_sensor_rows( + self, + *, + rows_by_sid: dict[int, list[dict[str, object]]], + sid: int, + timestamp_s: float, + values_list: List[dict], + ) -> None: + """Flatten parsed sensor values and append DataFrame-ready rows.""" + for values in values_list: + flat_values, timestamp_s = self._flatten_values(values, timestamp_s) + row = { + "timestamp": timestamp_s, + **flat_values, + } + rows_by_sid.setdefault(sid, []).append(row) + + @staticmethod + def _flatten_values( + values: dict, + timestamp_s: float, + ) -> tuple[dict[str, object], float]: + """Flatten grouped parser output into ``group.component`` columns.""" + flat_values: dict[str, object] = {} + for key, val in values.items(): + if key == "t_delta": + timestamp_s += val / 1e6 + continue + if isinstance(val, dict): + for sub_key, sub_val in val.items(): + flat_values[f"{key}.{sub_key}"] = sub_val + else: + flat_values[key] = val + return flat_values, timestamp_s + + @staticmethod + def _rows_to_dataframes( + rows_by_sid: dict[int, list[dict[str, object]]], + ) -> Dict[int, pd.DataFrame]: + """Convert accumulated parser rows into timestamp-indexed DataFrames.""" + result: Dict[int, pd.DataFrame] = {} + for sid, rows in rows_by_sid.items(): + df = pd.DataFrame(rows) + if not df.empty and "timestamp" in df.columns: + df.set_index("timestamp", inplace=True) + result[sid] = df + return result + + def _discard_until_resync( + self, + buffer: bytearray, + packet_idx: int, + *, + max_scan_bytes: int, + ) -> None: + """Discard corrupt bytes up to the next plausible packet header.""" + new_offset = self._attempt_resync( + bytes(buffer), + 0, + packet_idx, + max_scan_bytes=max_scan_bytes, + ) + if new_offset is None: + del buffer[:1] + else: + del buffer[:new_offset] + def _is_plausible_header(self, sid: int, size: int, remaining: int) -> bool: if sid not in self.parsers: return False @@ -274,7 +346,6 @@ def _attempt_resync( max_scan_bytes: int = 64, ) -> Optional[int]: total_len = len(data) - header_size = 10 if self.verbose: print( @@ -284,16 +355,16 @@ def _attempt_resync( for delta in range(1, max_scan_bytes + 1): candidate = packet_start + delta - if candidate + header_size > total_len: + if candidate + PACKET_HEADER_SIZE > total_len: break - header = data[candidate : candidate + header_size] + header = data[candidate : candidate + PACKET_HEADER_SIZE] try: sid, size, time = self._parse_header(header) except struct.error: continue - remaining = total_len - (candidate + header_size) + remaining = total_len - (candidate + PACKET_HEADER_SIZE) if not self._is_plausible_header(sid, size, remaining): continue diff --git a/src/open_wearables/schema/types.py b/src/open_wearables/schema/types.py index 4b93ef8..78c0315 100644 --- a/src/open_wearables/schema/types.py +++ b/src/open_wearables/schema/types.py @@ -3,6 +3,8 @@ class ParseType(enum.Enum): + """Binary scalar types supported by OpenEarable sensor schemes.""" + UINT8 = "uint8" UINT16 = "uint16" UINT32 = "uint32" @@ -14,7 +16,10 @@ class ParseType(enum.Enum): class SensorComponentScheme: + """Schema entry for one named scalar value in a sensor payload.""" + def __init__(self, name: str, data_type: ParseType): + """Create a component with a display name and binary parse type.""" self.name = name self.data_type = data_type @@ -23,7 +28,10 @@ def __repr__(self) -> str: class SensorComponentGroupScheme: + """Named group of related payload components.""" + def __init__(self, name: str, components: list[SensorComponentScheme]): + """Create a component group in payload order.""" self.name = name self.components = components @@ -41,6 +49,19 @@ def __init__( groups: list[SensorComponentGroupScheme], sampling_rate: Optional[float] = None, ): + """Create a sensor scheme. + + Parameters + ---------- + name: + Human-readable sensor name. + sid: + Numeric sensor stream ID encoded in packet headers. + groups: + Ordered payload component groups. + sampling_rate: + Optional default sampling rate for the sensor. + """ self.name = name self.sid = sid self.groups = groups @@ -54,6 +75,7 @@ def group( name: str, components: Sequence[tuple[str, ParseType]], ) -> SensorComponentGroupScheme: + """Build a ``SensorComponentGroupScheme`` from component tuples.""" return SensorComponentGroupScheme( name=name, components=[ diff --git a/tests/test_payload_parsers.py b/tests/test_payload_parsers.py new file mode 100644 index 0000000..625d52a --- /dev/null +++ b/tests/test_payload_parsers.py @@ -0,0 +1,66 @@ +import struct +import unittest + +from open_wearables.parsing import SchemePayloadParser +from open_wearables.schema import ParseType, SensorScheme +from open_wearables.schema.types import group + + +class SchemePayloadParserTests(unittest.TestCase): + def test_parse_packet_supports_all_scalar_types(self): + scheme = SensorScheme( + name="all_types", + sid=1, + groups=[ + group( + "sample", + [ + ("uint8", ParseType.UINT8), + ("uint16", ParseType.UINT16), + ("uint32", ParseType.UINT32), + ("int8", ParseType.INT8), + ("int16", ParseType.INT16), + ("int32", ParseType.INT32), + ("float", ParseType.FLOAT), + ("double", ParseType.DOUBLE), + ], + ) + ], + ) + payload = struct.pack( + " Date: Mon, 4 May 2026 11:03:56 +0200 Subject: [PATCH 4/4] feat(data): use header microphone sampling rate --- docs/api-reference.md | 11 ++-- docs/data-model.md | 7 ++- docs/getting-started.md | 2 +- src/open_wearables/data/sensor_dataset.py | 62 +++++++++++++++++------ src/open_wearables/parsing/audio.py | 2 +- tests/test_oe_headers.py | 55 ++++++++++++++++++++ 6 files changed, 117 insertions(+), 22 deletions(-) diff --git a/docs/api-reference.md b/docs/api-reference.md index 084c90a..c5e2706 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -149,7 +149,7 @@ Returns one sensor DataFrame by name. #### `get_sampling_rate(sensor: str | int) -> Optional[float]` Returns the default sampling rate for a sensor name or SID when the file header -contains v3 frequency metadata. Returns `None` for v2 files or sensors without +contains v3+ frequency metadata. Returns `None` for v2 files or sensors without frequency metadata. #### `get_sampling_rates() -> Dict[str, Optional[float]]` @@ -161,7 +161,7 @@ name. Values are `None` when metadata is unavailable. Builds and caches a merged DataFrame across all non-empty sensor streams. -#### `get_audio_dataframe(sampling_rate: int = 48000) -> pandas.DataFrame` +#### `get_audio_dataframe(sampling_rate: Optional[float] = None) -> pandas.DataFrame` Returns timestamp-indexed audio DataFrame with columns: @@ -170,6 +170,9 @@ Returns timestamp-indexed audio DataFrame with columns: Behavior: +- Uses the microphone sampling rate from v3+ file headers when available. +- Falls back to 48 kHz when no microphone sampling-rate metadata is present. +- Uses the caller-provided `sampling_rate` as an explicit override. - Raises `ValueError` if `sampling_rate <= 0`. - Returns empty DataFrame with expected columns if no mic packets exist. - Caches by sampling rate. @@ -184,11 +187,11 @@ Saves the combined DataFrame to CSV if `self.df` is non-empty. Call `get_dataframe()` first to ensure `self.df` is populated. -#### `play_audio(sampling_rate: int = 48000) -> None` +#### `play_audio(sampling_rate: Optional[float] = None) -> None` Plays audio in IPython/Jupyter via `IPython.display.Audio`. -#### `save_audio(path: str, sampling_rate: int = 48000) -> None` +#### `save_audio(path: str, sampling_rate: Optional[float] = None) -> None` Writes WAV audio with `scipy.io.wavfile.write`. diff --git a/docs/data-model.md b/docs/data-model.md index 59ae273..393b9c6 100644 --- a/docs/data-model.md +++ b/docs/data-model.md @@ -26,7 +26,7 @@ All sensor DataFrames are indexed by `timestamp` in seconds (`float`), derived f ## Accessor Semantics -Each sensor is exposed as a `_SensorAccessor` object: +Each sensor is exposed as a `SensorAccessor` object: - `sensor.df` or `sensor.to_dataframe()` returns the full sensor DataFrame with original column names. - Group columns are available as sub-DataFrames: @@ -59,3 +59,8 @@ The audio DataFrame generated by `get_audio_dataframe()` uses: - index: `timestamp` in seconds - columns: `mic.inner`, `mic.outer` + +When a v3+ file header provides microphone frequency metadata, the default audio +timestamps use that sampling rate. Recordings without microphone frequency +metadata fall back to 48 kHz, and callers can still pass `sampling_rate` to +override the default. diff --git a/docs/getting-started.md b/docs/getting-started.md index 0ceab96..217a81f 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -65,7 +65,7 @@ mag_z = dataset.imu.mag.z ```python # Timestamp-indexed stereo audio DataFrame -audio_df = dataset.get_audio_dataframe() # default 48_000 Hz +audio_df = dataset.get_audio_dataframe() # header mic rate, or 48_000 Hz fallback print(audio_df.columns) # mic.inner, mic.outer # Save WAV diff --git a/src/open_wearables/data/sensor_dataset.py b/src/open_wearables/data/sensor_dataset.py index 0ddace7..ef0a8a6 100644 --- a/src/open_wearables/data/sensor_dataset.py +++ b/src/open_wearables/data/sensor_dataset.py @@ -22,6 +22,9 @@ from .constants import COLORS, LABELS, SENSOR_FORMATS, SENSOR_SID, SID_NAMES +DEFAULT_MICROPHONE_SAMPLING_RATE = 48000 + + class SensorDataset: """High-level representation of an OpenEarable sensor recording file.""" @@ -37,7 +40,7 @@ def __init__(self, filename: str, verbose: bool = False): self.sensor_dfs: Dict[int, pd.DataFrame] = {} self.audio_stereo: Optional[np.ndarray] = None self.audio_df: pd.DataFrame = pd.DataFrame() - self._audio_df_sampling_rate: Optional[int] = None + self._audio_df_sampling_rate: Optional[float] = None self.bone_sound: Optional[np.ndarray] = None self.df: pd.DataFrame = pd.DataFrame() @@ -132,7 +135,7 @@ def get_sampling_rate(self, sensor: Union[str, int]) -> Optional[float]: Returns ------- Optional[float] - The default sampling rate recorded in a v3 file header, or ``None`` + The default sampling rate recorded in a v3+ file header, or ``None`` when the file does not contain sampling-rate metadata for the sensor. """ if isinstance(sensor, str): @@ -208,18 +211,40 @@ def get_dataframe(self) -> pd.DataFrame: return self.df - def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: - if sampling_rate <= 0: - raise ValueError(f"sampling_rate must be > 0, got {sampling_rate}") + def _resolve_microphone_sampling_rate( + self, + sampling_rate: Optional[float], + ) -> float: + """Return the explicit or file-header microphone sampling rate.""" + if sampling_rate is not None: + resolved_rate = float(sampling_rate) + elif self.file_header is not None and self.file_header.version >= 3: + resolved_rate = ( + self.get_sampling_rate("microphone") + or DEFAULT_MICROPHONE_SAMPLING_RATE + ) + else: + resolved_rate = DEFAULT_MICROPHONE_SAMPLING_RATE + + if resolved_rate <= 0: + raise ValueError(f"sampling_rate must be > 0, got {resolved_rate}") + return resolved_rate + + def get_audio_dataframe( + self, + sampling_rate: Optional[float] = None, + ) -> pd.DataFrame: + """Return microphone samples indexed with the resolved sampling rate.""" + resolved_sampling_rate = self._resolve_microphone_sampling_rate(sampling_rate) - if self._audio_df_sampling_rate == sampling_rate: + if self._audio_df_sampling_rate == resolved_sampling_rate: return self.audio_df mic_packets = getattr(self.parse_result, "mic_packets", []) if not mic_packets: self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) self.audio_df.index.name = "timestamp" - self._audio_df_sampling_rate = sampling_rate + self._audio_df_sampling_rate = resolved_sampling_rate return self.audio_df timestamps: List[np.ndarray] = [] @@ -228,7 +253,7 @@ def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: for packet in mic_packets: ts, stereo = mic_packet_to_stereo_frames( packet=packet, - sampling_rate=sampling_rate, + sampling_rate=resolved_sampling_rate, ) if stereo.size == 0: continue @@ -238,7 +263,7 @@ def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: if not timestamps: self.audio_df = pd.DataFrame(columns=["mic.inner", "mic.outer"]) self.audio_df.index.name = "timestamp" - self._audio_df_sampling_rate = sampling_rate + self._audio_df_sampling_rate = resolved_sampling_rate return self.audio_df all_ts = np.concatenate(timestamps) @@ -253,9 +278,12 @@ def get_audio_dataframe(self, sampling_rate: int = 48000) -> pd.DataFrame: ) self.audio_df.index.name = "timestamp" self.audio_df = self.audio_df[~self.audio_df.index.duplicated(keep="first")] - self._audio_df_sampling_rate = sampling_rate + self._audio_df_sampling_rate = resolved_sampling_rate - if sampling_rate == 48000: + if ( + sampling_rate is None + or resolved_sampling_rate == DEFAULT_MICROPHONE_SAMPLING_RATE + ): self.sensor_dfs[self.SENSOR_SID["microphone"]] = self.audio_df return self.audio_df @@ -268,21 +296,25 @@ def save_csv(self, path: str) -> None: if not self.df.empty: self.df.to_csv(path) - def play_audio(self, sampling_rate: int = 48000) -> None: + def play_audio(self, sampling_rate: Optional[float] = None) -> None: + """Play microphone audio using explicit or file-header sampling rate.""" if self.audio_stereo is None: print("❌ No microphone data available.") return + resolved_sampling_rate = self._resolve_microphone_sampling_rate(sampling_rate) with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: - write(tmp.name, sampling_rate, self.audio_stereo) + write(tmp.name, int(resolved_sampling_rate), self.audio_stereo) display(Audio(tmp.name)) - def save_audio(self, path: str, sampling_rate: int = 48000) -> None: + def save_audio(self, path: str, sampling_rate: Optional[float] = None) -> None: + """Save microphone audio using explicit or file-header sampling rate.""" if self.audio_stereo is None: print("❌ No microphone data available to save.") return try: - write(path, sampling_rate, self.audio_stereo) + resolved_sampling_rate = self._resolve_microphone_sampling_rate(sampling_rate) + write(path, int(resolved_sampling_rate), self.audio_stereo) print(f"✅ Audio saved successfully to {path}") except Exception as exc: print(f"❌ Error saving audio to {path}: {exc}") diff --git a/src/open_wearables/parsing/audio.py b/src/open_wearables/parsing/audio.py index f9faf4e..9047824 100644 --- a/src/open_wearables/parsing/audio.py +++ b/src/open_wearables/parsing/audio.py @@ -23,7 +23,7 @@ def interleaved_mic_to_stereo( def mic_packet_to_stereo_frames( packet: MicPacket, - sampling_rate: int, + sampling_rate: float, ) -> Tuple[np.ndarray, np.ndarray]: """Return timestamps and stereo frames for a parsed microphone packet.""" if sampling_rate <= 0: diff --git a/tests/test_oe_headers.py b/tests/test_oe_headers.py index 334dde7..20542cb 100644 --- a/tests/test_oe_headers.py +++ b/tests/test_oe_headers.py @@ -182,6 +182,61 @@ def test_sensor_dataset_builds_parser_after_reading_v3_header(self): self.assertEqual(dataset.get_sampling_rates()["imu"], 50.0) self.assertIsNone(dataset.get_sampling_rates()["ppg"]) + def test_sensor_dataset_uses_v3_microphone_sampling_rate_by_default(self): + sensor_scheme = _single_sensor_scheme( + sid=2, + sensor_name="Microphone", + group_name="mic", + component_name="sample", + parse_type=2, + frequencies=(16000.0,), + ) + parse_info = _parse_info_blob([2], [sensor_scheme]) + content = ( + _v3_header(parse_info) + + struct.pack("