Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ relay-hk = { path = "../relay/crates/relay-hk" }
relay-tbl = { path = "../relay/crates/relay-tbl" }
relay-cs = { path = "../relay/crates/relay-cs" }
relay-to = { path = "../relay/crates/relay-to" }
relay-ccsds = { path = "../relay/crates/relay-ccsds" }
wohl-leak = { path = "crates/wohl-leak" }
wohl-temp = { path = "crates/wohl-temp" }
wohl-air = { path = "crates/wohl-air" }
Expand Down
1 change: 1 addition & 0 deletions crates/wohl-hub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ relay-hk.workspace = true
relay-tbl.workspace = true
relay-cs.workspace = true
relay-to.workspace = true
relay-ccsds.workspace = true

# Runtime
serde = { version = "1", features = ["derive"] }
Expand Down
210 changes: 205 additions & 5 deletions crates/wohl-hub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
//!
//! Reads sensor data from stdin (JSON lines), routes through monitors,
//! dispatches alerts through dedup/rate-limiter, prints alerts to stdout.
//!
//! With `--ccsds` (or `WOHL_INPUT=ccsds`), reads binary CCSDS sensor packets
//! instead — 14 bytes per reading, matching what Wohl sensor nodes emit.

use std::io::BufRead;
use std::io::{BufRead, Read};

use relay_ccsds::sensor_wire;
use serde::{Deserialize, Serialize};

// ── Configuration types ────────────────────────────────────────
Expand Down Expand Up @@ -627,12 +631,96 @@ fn config_from_str(s: &str) -> Result<HubConfig, toml::de::Error> {
toml::from_str(s)
}

// ── Main ───────────────────────────────────────────────────────
// ── CCSDS sensor packet → SensorEvent mapping ──────────────────

/// Translate a decoded CCSDS sensor packet into a wohl-hub SensorEvent.
///
/// `time` is supplied by the caller (usually wall-clock seconds on arrival)
/// since CCSDS packets don't carry a timestamp field.
/// Returns None for sensor types the hub does not currently handle.
fn packet_to_event(pkt: &sensor_wire::SensorPacket, time: u64) -> Option<SensorEvent> {
match pkt.sensor_type {
sensor_wire::SENSOR_TEMP => Some(SensorEvent::Temp {
zone: pkt.zone_id as u32,
value: pkt.value,
time,
}),
sensor_wire::SENSOR_WATER => Some(SensorEvent::Water {
zone: pkt.zone_id as u32,
wet: pkt.value != 0,
time,
}),
sensor_wire::SENSOR_CONTACT => Some(SensorEvent::Contact {
// device_id is the physical contact identifier
id: pkt.device_id as u32,
open: pkt.value != 0,
time,
}),
sensor_wire::SENSOR_CO2 => Some(SensorEvent::Air {
zone: pkt.zone_id as u32,
co2: pkt.value.max(0) as u32,
pm25: None,
voc: None,
time,
}),
sensor_wire::SENSOR_POWER => Some(SensorEvent::Power {
// device_id is the circuit identifier
circuit: pkt.device_id as u32,
watts: (pkt.value.max(0) / 10) as u32, // value is watts × 10
time,
}),
// PM25, VOC, HUMIDITY, MOTION, ENERGY, LUX, PRESSURE, WIND, RAIN
// not yet wired to monitors — dropped silently.
_ => None,
}
}

fn main() {
let config = load_config();
let mut hub = WohlHub::new(&config);
fn now_seconds() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}

fn run_ccsds_mode(hub: &mut WohlHub) {
eprintln!("[wohl-hub] ready — reading CCSDS sensor packets from stdin (14 bytes each)");

let mut stdin = std::io::stdin().lock();
let mut buf = [0u8; sensor_wire::PACKET_SIZE];

loop {
match stdin.read_exact(&mut buf) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => {
eprintln!("[wohl-hub] stdin error: {}", e);
break;
}
}

let pkt = match sensor_wire::decode_packet(&buf) {
Ok(p) => p,
Err(e) => {
eprintln!("[wohl-hub] ccsds decode error: {:?}", e);
continue;
}
};

let Some(event) = packet_to_event(&pkt, now_seconds()) else {
continue;
};

let alerts = hub.process_event(event);
for alert in &alerts {
match serde_json::to_string(alert) {
Ok(json) => println!("{}", json),
Err(e) => eprintln!("[wohl-hub] serialize error: {}", e),
}
}
}
}

fn run_json_mode(hub: &mut WohlHub) {
eprintln!("[wohl-hub] ready — reading sensor events from stdin");

let stdin = std::io::stdin();
Expand Down Expand Up @@ -666,6 +754,26 @@ fn main() {
}
}
}
}

fn input_mode_is_ccsds() -> bool {
if std::env::args().any(|a| a == "--ccsds") {
return true;
}
matches!(std::env::var("WOHL_INPUT").as_deref(), Ok("ccsds"))
}

// ── Main ───────────────────────────────────────────────────────

fn main() {
let config = load_config();
let mut hub = WohlHub::new(&config);

if input_mode_is_ccsds() {
run_ccsds_mode(&mut hub);
} else {
run_json_mode(&mut hub);
}

eprintln!("[wohl-hub] done");
}
Expand Down Expand Up @@ -1105,4 +1213,96 @@ dedup_cooldown_sec = 60
serde_json::from_str(r#"{"type":"tick","time":2000}"#).unwrap();
assert!(matches!(tick, SensorEvent::Tick { time: 2000 }));
}

#[test]
fn test_packet_to_event_temp() {
let pkt = sensor_wire::SensorPacket {
device_id: 10,
sequence: 0,
sensor_type: sensor_wire::SENSOR_TEMP,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 1,
value: -100,
};
let ev = packet_to_event(&pkt, 500).unwrap();
assert!(matches!(ev, SensorEvent::Temp { zone: 1, value: -100, time: 500 }));
}

#[test]
fn test_packet_to_event_water() {
let pkt = sensor_wire::SensorPacket {
device_id: 7,
sequence: 0,
sensor_type: sensor_wire::SENSOR_WATER,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 2,
value: 1,
};
let ev = packet_to_event(&pkt, 900).unwrap();
assert!(matches!(ev, SensorEvent::Water { zone: 2, wet: true, time: 900 }));
}

#[test]
fn test_packet_to_event_contact_uses_device_id() {
let pkt = sensor_wire::SensorPacket {
device_id: 42,
sequence: 0,
sensor_type: sensor_wire::SENSOR_CONTACT,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 1,
value: 1,
};
let ev = packet_to_event(&pkt, 0).unwrap();
assert!(matches!(ev, SensorEvent::Contact { id: 42, open: true, .. }));
}

#[test]
fn test_packet_to_event_power_scales() {
let pkt = sensor_wire::SensorPacket {
device_id: 3,
sequence: 0,
sensor_type: sensor_wire::SENSOR_POWER,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 1,
value: 15230, // 1523.0W × 10
};
let ev = packet_to_event(&pkt, 0).unwrap();
assert!(matches!(ev, SensorEvent::Power { circuit: 3, watts: 1523, .. }));
}

#[test]
fn test_packet_to_event_unknown_returns_none() {
let pkt = sensor_wire::SensorPacket {
device_id: 1,
sequence: 0,
sensor_type: sensor_wire::SENSOR_WIND,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 1,
value: 0,
};
assert!(packet_to_event(&pkt, 0).is_none());
}

#[test]
fn test_end_to_end_ccsds_freeze() {
// Encode a below-freeze temp reading, decode, translate, feed to hub.
let pkt = sensor_wire::SensorPacket {
device_id: 10,
sequence: 0,
sensor_type: sensor_wire::SENSOR_TEMP,
quality: sensor_wire::QUALITY_GOOD,
zone_id: 1,
value: -100,
};
let mut buf = [0u8; sensor_wire::PACKET_SIZE];
sensor_wire::encode_packet(&pkt, &mut buf);
let decoded = sensor_wire::decode_packet(&buf).unwrap();
let event = packet_to_event(&decoded, 1000).unwrap();

let mut hub = test_hub();
let alerts = hub.process_event(event);
assert!(!alerts.is_empty());
assert_eq!(alerts[0].alert, "freeze");
assert_eq!(alerts[0].zone, Some(1));
}
}
Loading