diff --git a/oximeter/instruments/src/kstat/cpu.rs b/oximeter/instruments/src/kstat/cpu.rs index ca1da1b4b89..bfc99e92421 100644 --- a/oximeter/instruments/src/kstat/cpu.rs +++ b/oximeter/instruments/src/kstat/cpu.rs @@ -30,6 +30,12 @@ const CPU_NSEC_PREFIX: &str = "cpu_nsec_"; /// These correspond to the `cpu_nsec_*` fields in the `cpu::sys` kstat. const CPU_MICROSTATES: &[&str] = &["idle", "user", "kernel", "dtrace", "intr"]; +/// The maximum cardinality of the data we produce, per sampling interval. +pub const fn max_cardinality() -> usize { + // Assume max of 256 CPUs + CPU_MICROSTATES.len() * 256 +} + /// CPU metrics for a sled, tracking microstate statistics across all cores. #[derive(Clone, Debug)] pub struct SledCpu { @@ -244,7 +250,7 @@ mod tests { sled_revision: SLED_REVISION, }; let cpu = SledCpu::new(target, true); - let details = CollectionDetails::never(Duration::from_secs(1)); + let details = CollectionDetails::never(Duration::from_secs(1), 512); let id = sampler.add_target(cpu, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); assert!(samples.is_empty()); diff --git a/oximeter/instruments/src/kstat/link.rs b/oximeter/instruments/src/kstat/link.rs index be4cf6a0c0c..1c32fb111f1 100644 --- a/oximeter/instruments/src/kstat/link.rs +++ b/oximeter/instruments/src/kstat/link.rs @@ -348,7 +348,7 @@ mod tests { zone_name: ZONE_NAME.into(), }; let dl = SledDataLink::new(target, true); - let details = CollectionDetails::never(Duration::from_secs(1)); + let details = CollectionDetails::never(Duration::from_secs(1), 512); let id = sampler.add_target(dl, details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); assert!(samples.is_empty()); @@ -385,8 +385,7 @@ mod tests { #[tokio::test] async fn test_kstat_sampler_with_overflow() { let limit = 2; - let mut sampler = - KstatSampler::with_sample_limit(&test_logger(), limit).unwrap(); + let mut sampler = KstatSampler::new(&test_logger()).unwrap(); let link = TestEtherstub::new(); let target = SledDataLinkTarget { rack_id: RACK_ID, @@ -399,8 +398,8 @@ mod tests { zone_name: ZONE_NAME.into(), }; let dl = SledDataLink::new(target, true); - let details = CollectionDetails::never(Duration::from_secs(1)); - sampler.add_target(dl, details).await.unwrap(); + let details = CollectionDetails::never(Duration::from_secs(1), limit); + sampler.add_target(dl.clone(), details).await.unwrap(); let samples: Vec<_> = sampler.produce().unwrap().collect(); assert!(samples.is_empty()); @@ -448,6 +447,47 @@ mod tests { unreachable!(); }; assert_eq!(overflow.value(), expected_counts.overflow as u64); + + // Now, resize the target queue, and test again. + // + // This is nearly the same test as above, just with a new value of + // limit. We also handle slightly different overflow conditions. + tokio::time::resume(); + let limit = 4; + let details = CollectionDetails::never(Duration::from_secs(1), limit); + sampler.update_target(dl, details).await.unwrap(); + tokio::time::pause(); + let now = Instant::now(); + let old_expected_counts = expected_counts; + let expected_counts = loop { + tokio::time::advance(STEP_DURATION).await; + if now.elapsed() > MAX_DURATION { + panic!("Waited too long for samples"); + } + if let Some(counts) = sampler.sample_counts() { + break counts; + } + }; + let samples: Vec<_> = sampler.produce().unwrap().collect(); + let (link_samples, dropped_samples): (Vec<_>, Vec<_>) = samples + .iter() + .partition(|s| s.timeseries_name.contains("sled_data_link")); + println!("{link_samples:#?}"); + assert_eq!(link_samples.len(), limit); + assert_eq!( + link_samples.len(), + expected_counts.total - expected_counts.overflow + ); + println!("{dropped_samples:#?}"); + assert_eq!(dropped_samples.len(), 1); + let oximeter::Datum::CumulativeU64(overflow) = + dropped_samples[0].measurement.datum() + else { + unreachable!(); + }; + let all_overflow = + old_expected_counts.overflow + expected_counts.overflow; + assert_eq!(overflow.value(), all_overflow as u64); } #[tokio::test] @@ -471,7 +511,8 @@ mod tests { let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); - let details = CollectionDetails::duration(collection_interval, expiry); + let details = + CollectionDetails::duration(collection_interval, expiry, 512); let id = sampler.add_target(dl, details).await.unwrap(); info!(log, "target added"; "id" => ?id); assert!(matches!( @@ -531,7 +572,8 @@ mod tests { let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); - let details = CollectionDetails::duration(collection_interval, expiry); + let details = + CollectionDetails::duration(collection_interval, expiry, 512); let id = sampler.add_target(dl, details).await.unwrap(); info!(log, "target added"; "id" => ?id); assert!(matches!( @@ -583,7 +625,8 @@ mod tests { let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); - let details = CollectionDetails::duration(collection_interval, expiry); + let details = + CollectionDetails::duration(collection_interval, expiry, 512); let id = sampler.add_target(dl, details).await.unwrap(); info!(log, "target added"; "id" => ?id); assert!(matches!( @@ -633,7 +676,8 @@ mod tests { let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_secs(1); let expiry = Duration::from_secs(1); - let details = CollectionDetails::duration(collection_interval, expiry); + let details = + CollectionDetails::duration(collection_interval, expiry, 512); let id = sampler.add_target(dl, details).await.unwrap(); info!(log, "target added"; "id" => ?id); assert!(matches!( @@ -661,7 +705,7 @@ mod tests { #[tokio::test] async fn overflowing_self_stat_queue_does_not_block_sampler() { let log = test_logger(); - let mut sampler = KstatSampler::with_sample_limit(&log, 1).unwrap(); + let mut sampler = KstatSampler::new(&log).unwrap(); // We'll create an actual link, so that we can generate valid samples // and overflow the per-target queue. This will ensure we continually @@ -680,7 +724,7 @@ mod tests { }; let dl = SledDataLink::new(target, true); let collection_interval = Duration::from_millis(10); - let details = CollectionDetails::never(collection_interval); + let details = CollectionDetails::never(collection_interval, 1); let _id = sampler.add_target(dl, details).await.unwrap(); // Pause time long enough for the sampler to have produced a bunch of @@ -760,13 +804,13 @@ mod tests { }; let dl = SledDataLink::new(target.clone(), true); let collection_interval = Duration::from_millis(10); - let details = CollectionDetails::never(collection_interval); + let details = CollectionDetails::never(collection_interval, 512); let id = sampler.add_target(dl.clone(), details).await.unwrap(); // Update the target. let new_duration = Duration::from_millis(15); sampler - .update_target(dl, CollectionDetails::never(new_duration)) + .update_target(dl, CollectionDetails::never(new_duration, 512)) .await .unwrap(); @@ -799,7 +843,7 @@ mod tests { }; let dl = SledDataLink::new(target.clone(), true); let collection_interval = Duration::from_millis(100); - let details = CollectionDetails::never(collection_interval); + let details = CollectionDetails::never(collection_interval, 512); let id = sampler.add_target(dl.clone(), details).await.unwrap(); // And remove right away. diff --git a/oximeter/instruments/src/kstat/sampler.rs b/oximeter/instruments/src/kstat/sampler.rs index 32710e949b2..1480c8e5a9c 100644 --- a/oximeter/instruments/src/kstat/sampler.rs +++ b/oximeter/instruments/src/kstat/sampler.rs @@ -110,7 +110,7 @@ pub enum ExpirationBehavior { Duration(Duration), } -/// Details about the collection and expiration intervals for a target. +/// Details about how to collect from a single target. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct CollectionDetails { /// The interval on which data from the target is collected. @@ -123,22 +123,43 @@ pub struct CollectionDetails { /// The latter can occur if the physical resource (such as a datalink) that /// underlies the kstat has disappeared, for example. pub expiration: ExpirationBehavior, + /// The number of samples to retain before dropping the oldest. + /// + /// This avoids memory issues when data is produced more rapidly than it's + /// collected. + pub n_max_samples: usize, } impl CollectionDetails { /// Return collection details with no expiration. - pub fn never(interval: Duration) -> Self { - Self { interval, expiration: ExpirationBehavior::Never } + pub fn never(interval: Duration, n_max_samples: usize) -> Self { + Self { interval, expiration: ExpirationBehavior::Never, n_max_samples } } /// Return collection details that expires after a number of attempts. - pub fn attempts(interval: Duration, count: usize) -> Self { - Self { interval, expiration: ExpirationBehavior::Attempts(count) } + pub fn attempts( + interval: Duration, + count: usize, + n_max_samples: usize, + ) -> Self { + Self { + interval, + expiration: ExpirationBehavior::Attempts(count), + n_max_samples, + } } /// Return collection details that expires after a duration has elapsed. - pub fn duration(interval: Duration, duration: Duration) -> Self { - Self { interval, expiration: ExpirationBehavior::Duration(duration) } + pub fn duration( + interval: Duration, + duration: Duration, + n_max_samples: usize, + ) -> Self { + Self { + interval, + expiration: ExpirationBehavior::Duration(duration), + n_max_samples, + } } } @@ -348,10 +369,6 @@ struct KstatSamplerWorker { /// Inbox channel on which the `KstatSampler` sends messages. inbox: mpsc::Receiver, - /// The maximum number of samples we allow in each per-target buffer, to - /// avoid huge allocations when we produce data faster than it's collected. - sample_limit: usize, - /// Outbound queue on which to publish self statistics, which are expected to /// be low-volume. self_stat_queue: broadcast::Sender, @@ -399,7 +416,6 @@ impl KstatSamplerWorker { inbox: mpsc::Receiver, self_stat_queue: broadcast::Sender, samples: Arc>>>, - sample_limit: usize, ) -> Result { let ctl = Some(Ctl::new().map_err(Error::Kstat)?); let self_stats = hostname().map(self_stats::SelfStats::new); @@ -410,7 +426,6 @@ impl KstatSamplerWorker { creation_times: BTreeMap::new(), samples, inbox, - sample_limit, self_stat_queue, self_stats, sample_timeouts: FuturesUnordered::new(), @@ -1152,31 +1167,36 @@ impl KstatSamplerWorker { }; let _ = self.targets.insert(id, SampledObject::Kstat(item)); - // Add to the per-target queues, making sure to keep any samples that - // were already there previously. This would be a bit odd, since it - // means that the target expired, but we hadn't been polled by oximeter. - // Nonetheless keep these samples anyway. - let n_samples = self - .samples - .lock() - .unwrap() - .entry(id) - .or_insert_with(|| BoundedQueue::new(self.sample_limit)) - .len(); - match n_samples { - 0 => debug!( - self.log, - "inserted empty per-target sample queue"; - "id" => ?id, - ), - n => debug!( - self.log, - "per-target queue appears to have old samples"; - "id" => ?id, - "n_samples" => n, - ), - } - + // Add to the per-target queues, possibly updating the _size_ of the + // queue if the caller requested it. Report any samples that were + // already there, and any we needed to drop to handle the new request. + let (n_samples_retained, n_samples_dropped) = + match self.samples.lock().unwrap().entry(id) { + Entry::Vacant(vacant) => { + vacant.insert(BoundedQueue::new(details.n_max_samples)); + (0, 0) + } + Entry::Occupied(mut occupied) => { + // Swap the queues and drain anything that was there into the + // new one. + let mut old_q = occupied + .insert(BoundedQueue::new(details.n_max_samples)); + let n_total_existing_samples = old_q.len(); + let n_dropped = + occupied.get_mut().extend(old_q.drain().into()); + let n_samples_retained = + n_total_existing_samples - n_dropped; + (n_samples_retained, n_dropped) + } + }; + debug!( + self.log, + "inserted per-target sample queue"; + "id" => ?id, + "n_samples_retained" => n_samples_retained, + "n_samples_dropped" => n_samples_dropped, + "n_max_samples" => details.n_max_samples, + ); Ok(id) } } @@ -1202,25 +1222,8 @@ pub struct KstatSampler { const SELF_STAT_QUEUE_SIZE: usize = 4096; impl KstatSampler { - /// The maximum number of samples allowed in the internal buffer, before - /// oldest samples are dropped. - /// - /// This is to avoid unbounded allocations in situations where data is - /// produced faster than it is collected. - /// - /// Note that this is a _per-target_ sample limit! - pub const DEFAULT_SAMPLE_LIMIT: usize = 500; - /// Create a new sampler. pub fn new(log: &Logger) -> Result { - Self::with_sample_limit(log, Self::DEFAULT_SAMPLE_LIMIT) - } - - /// Create a new sampler with a sample limit. - pub fn with_sample_limit( - log: &Logger, - limit: usize, - ) -> Result { let samples = Arc::new(Mutex::new(BTreeMap::new())); let (self_stat_tx, self_stat_rx) = broadcast::channel(SELF_STAT_QUEUE_SIZE); @@ -1230,7 +1233,6 @@ impl KstatSampler { inbox, self_stat_tx, samples.clone(), - limit, )?; #[cfg(all(test, target_os = "illumos"))] let (sample_count_rx, _worker_task) = { diff --git a/oximeter/instruments/src/kstat/zone.rs b/oximeter/instruments/src/kstat/zone.rs index 5c5abd83fe5..f66f33f12e9 100644 --- a/oximeter/instruments/src/kstat/zone.rs +++ b/oximeter/instruments/src/kstat/zone.rs @@ -28,6 +28,12 @@ const CPU_STATES: &[&str] = &["user", "sys", "waitrq"]; /// The prefix used for Omicron zone names. const ZONE_PREFIX: &str = "oxz_"; +/// The maximum cardinality of the data we produce, per sampling interval. +pub const fn max_cardinality() -> usize { + // Assume maximum of 256 CPUs in a zone. + CPU_STATES.len() * 256 +} + /// Parsed zone metadata from a zone name formatted as "oxz_TYPE_UUID". struct ZoneMetadata { zone_type: String, diff --git a/sled-agent/src/metrics.rs b/sled-agent/src/metrics.rs index 11b069f6780..c2c7766cd31 100644 --- a/sled-agent/src/metrics.rs +++ b/sled-agent/src/metrics.rs @@ -11,6 +11,7 @@ use omicron_common::api::internal::shared::SledIdentifiers; use omicron_uuid_kinds::GenericUuid; use oximeter_instruments::http::HttpService; use oximeter_instruments::http::LatencyTracker; +use oximeter_instruments::kstat; use oximeter_instruments::kstat::CollectionDetails; use oximeter_instruments::kstat::Error as KstatError; use oximeter_instruments::kstat::KstatSampler; @@ -49,10 +50,19 @@ const METRIC_COLLECTION_INTERVAL: Duration = Duration::from_secs(30); // now. const LINK_SAMPLE_INTERVAL: Duration = Duration::from_secs(10); +/// The maximum number of samples retained for links. +const N_MAX_LINK_SAMPLES: usize = 512; + const CPU_SAMPLE_INTERVAL: Duration = Duration::from_secs(10); +/// The maximum number of CPU usage samples. +const N_MAX_CPU_SAMPLES: usize = kstat::cpu::max_cardinality() * 10; + const ZONE_SAMPLE_INTERVAL: Duration = Duration::from_secs(10); +/// The maximum number of zone CPU usage samples. +const N_MAX_ZONE_CPU_SAMPLES: usize = kstat::zone::max_cardinality() * 10; + /// The interval after which we expire kstat-based collection of transient /// links. /// @@ -135,9 +145,10 @@ fn get_collection_details(kind: &str) -> CollectionDetails { CollectionDetails::duration( LINK_SAMPLE_INTERVAL, TRANSIENT_LINK_EXPIRATION_INTERVAL, + N_MAX_LINK_SAMPLES, ) } else { - CollectionDetails::never(LINK_SAMPLE_INTERVAL) + CollectionDetails::never(LINK_SAMPLE_INTERVAL, N_MAX_LINK_SAMPLES) } } @@ -409,7 +420,8 @@ async fn add_zone( // We have one target per sled that samples all zones, so there's no // need to expire it. - let details = CollectionDetails::never(ZONE_SAMPLE_INTERVAL); + let details = + CollectionDetails::never(ZONE_SAMPLE_INTERVAL, N_MAX_ZONE_CPU_SAMPLES); match kstat_sampler.add_target(zone.clone(), details).await { Ok(_id) => { debug!(log, "added zone metrics to kstat sampler"); @@ -436,7 +448,8 @@ async fn sync_zone( }; zone.time_synced = true; - let details = CollectionDetails::never(ZONE_SAMPLE_INTERVAL); + let details = + CollectionDetails::never(ZONE_SAMPLE_INTERVAL, N_MAX_ZONE_CPU_SAMPLES); match kstat_sampler.update_target(zone.clone(), details).await { Ok(_) => { debug!(log, "updated zone metrics after time sync"); @@ -475,7 +488,8 @@ async fn add_sled_cpu( // We have one target per sled that samples all CPUs, so there's no // need to expire it. - let details = CollectionDetails::never(CPU_SAMPLE_INTERVAL); + let details = + CollectionDetails::never(CPU_SAMPLE_INTERVAL, N_MAX_CPU_SAMPLES); match kstat_sampler.add_target(cpu.clone(), details).await { Ok(_id) => { debug!(log, "added CPU metrics to kstat sampler"); @@ -502,7 +516,8 @@ async fn sync_sled_cpu( }; cpu.time_synced = true; - let details = CollectionDetails::never(CPU_SAMPLE_INTERVAL); + let details = + CollectionDetails::never(CPU_SAMPLE_INTERVAL, N_MAX_CPU_SAMPLES); match kstat_sampler.update_target(cpu.clone(), details).await { Ok(_) => { debug!(log, "updated sled CPU metrics after time sync");