Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion oximeter/instruments/src/kstat/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const CPU_NSEC_PREFIX: &str = "cpu_nsec_";
/// These correspond to the `cpu_nsec_*` fields in the `cpu::sys` kstat.
const CPU_MICROSTATES: &[&str] = &["idle", "user", "kernel", "dtrace", "intr"];

/// The maximum cardinality of the data we produce, per sampling interval.
pub const fn max_cardinality() -> usize {
// Assume max of 256 CPUs
CPU_MICROSTATES.len() * 256
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to exceed 256 CPUs with dense Turin, right? Should we bump this even higher in anticipation of that?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes possibly. I debated making this method detect the actual number of cores. It's just a little more annoying.

}

/// CPU metrics for a sled, tracking microstate statistics across all cores.
#[derive(Clone, Debug)]
pub struct SledCpu {
Expand Down Expand Up @@ -244,7 +250,7 @@ mod tests {
sled_revision: SLED_REVISION,
};
let cpu = SledCpu::new(target, true);
let details = CollectionDetails::never(Duration::from_secs(1));
let details = CollectionDetails::never(Duration::from_secs(1), 512);
let id = sampler.add_target(cpu, details).await.unwrap();
let samples: Vec<_> = sampler.produce().unwrap().collect();
assert!(samples.is_empty());
Expand Down
72 changes: 58 additions & 14 deletions oximeter/instruments/src/kstat/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ mod tests {
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let details = CollectionDetails::never(Duration::from_secs(1));
let details = CollectionDetails::never(Duration::from_secs(1), 512);
let id = sampler.add_target(dl, details).await.unwrap();
let samples: Vec<_> = sampler.produce().unwrap().collect();
assert!(samples.is_empty());
Expand Down Expand Up @@ -385,8 +385,7 @@ mod tests {
#[tokio::test]
async fn test_kstat_sampler_with_overflow() {
let limit = 2;
let mut sampler =
KstatSampler::with_sample_limit(&test_logger(), limit).unwrap();
let mut sampler = KstatSampler::new(&test_logger()).unwrap();
let link = TestEtherstub::new();
let target = SledDataLinkTarget {
rack_id: RACK_ID,
Expand All @@ -399,8 +398,8 @@ mod tests {
zone_name: ZONE_NAME.into(),
};
let dl = SledDataLink::new(target, true);
let details = CollectionDetails::never(Duration::from_secs(1));
sampler.add_target(dl, details).await.unwrap();
let details = CollectionDetails::never(Duration::from_secs(1), limit);
sampler.add_target(dl.clone(), details).await.unwrap();
let samples: Vec<_> = sampler.produce().unwrap().collect();
assert!(samples.is_empty());

Expand Down Expand Up @@ -448,6 +447,47 @@ mod tests {
unreachable!();
};
assert_eq!(overflow.value(), expected_counts.overflow as u64);

// Now, resize the target queue, and test again.
//
// This is nearly the same test as above, just with a new value of
// limit. We also handle slightly different overflow conditions.
tokio::time::resume();
let limit = 4;
let details = CollectionDetails::never(Duration::from_secs(1), limit);
sampler.update_target(dl, details).await.unwrap();
tokio::time::pause();
let now = Instant::now();
let old_expected_counts = expected_counts;
let expected_counts = loop {
tokio::time::advance(STEP_DURATION).await;
if now.elapsed() > MAX_DURATION {
panic!("Waited too long for samples");
}
if let Some(counts) = sampler.sample_counts() {
break counts;
}
};
let samples: Vec<_> = sampler.produce().unwrap().collect();
let (link_samples, dropped_samples): (Vec<_>, Vec<_>) = samples
.iter()
.partition(|s| s.timeseries_name.contains("sled_data_link"));
println!("{link_samples:#?}");
assert_eq!(link_samples.len(), limit);
assert_eq!(
link_samples.len(),
expected_counts.total - expected_counts.overflow
);
println!("{dropped_samples:#?}");
assert_eq!(dropped_samples.len(), 1);
let oximeter::Datum::CumulativeU64(overflow) =
dropped_samples[0].measurement.datum()
else {
unreachable!();
};
let all_overflow =
old_expected_counts.overflow + expected_counts.overflow;
assert_eq!(overflow.value(), all_overflow as u64);
}

#[tokio::test]
Expand All @@ -471,7 +511,8 @@ mod tests {
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let details =
CollectionDetails::duration(collection_interval, expiry, 512);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
assert!(matches!(
Expand Down Expand Up @@ -531,7 +572,8 @@ mod tests {
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let details =
CollectionDetails::duration(collection_interval, expiry, 512);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
assert!(matches!(
Expand Down Expand Up @@ -583,7 +625,8 @@ mod tests {
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let details =
CollectionDetails::duration(collection_interval, expiry, 512);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
assert!(matches!(
Expand Down Expand Up @@ -633,7 +676,8 @@ mod tests {
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_secs(1);
let expiry = Duration::from_secs(1);
let details = CollectionDetails::duration(collection_interval, expiry);
let details =
CollectionDetails::duration(collection_interval, expiry, 512);
let id = sampler.add_target(dl, details).await.unwrap();
info!(log, "target added"; "id" => ?id);
assert!(matches!(
Expand Down Expand Up @@ -661,7 +705,7 @@ mod tests {
#[tokio::test]
async fn overflowing_self_stat_queue_does_not_block_sampler() {
let log = test_logger();
let mut sampler = KstatSampler::with_sample_limit(&log, 1).unwrap();
let mut sampler = KstatSampler::new(&log).unwrap();

// We'll create an actual link, so that we can generate valid samples
// and overflow the per-target queue. This will ensure we continually
Expand All @@ -680,7 +724,7 @@ mod tests {
};
let dl = SledDataLink::new(target, true);
let collection_interval = Duration::from_millis(10);
let details = CollectionDetails::never(collection_interval);
let details = CollectionDetails::never(collection_interval, 1);
let _id = sampler.add_target(dl, details).await.unwrap();

// Pause time long enough for the sampler to have produced a bunch of
Expand Down Expand Up @@ -760,13 +804,13 @@ mod tests {
};
let dl = SledDataLink::new(target.clone(), true);
let collection_interval = Duration::from_millis(10);
let details = CollectionDetails::never(collection_interval);
let details = CollectionDetails::never(collection_interval, 512);
let id = sampler.add_target(dl.clone(), details).await.unwrap();

// Update the target.
let new_duration = Duration::from_millis(15);
sampler
.update_target(dl, CollectionDetails::never(new_duration))
.update_target(dl, CollectionDetails::never(new_duration, 512))
.await
.unwrap();

Expand Down Expand Up @@ -799,7 +843,7 @@ mod tests {
};
let dl = SledDataLink::new(target.clone(), true);
let collection_interval = Duration::from_millis(100);
let details = CollectionDetails::never(collection_interval);
let details = CollectionDetails::never(collection_interval, 512);
let id = sampler.add_target(dl.clone(), details).await.unwrap();

// And remove right away.
Expand Down
114 changes: 58 additions & 56 deletions oximeter/instruments/src/kstat/sampler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub enum ExpirationBehavior {
Duration(Duration),
}

/// Details about the collection and expiration intervals for a target.
/// Details about how to collect from a single target.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct CollectionDetails {
/// The interval on which data from the target is collected.
Expand All @@ -123,22 +123,43 @@ pub struct CollectionDetails {
/// The latter can occur if the physical resource (such as a datalink) that
/// underlies the kstat has disappeared, for example.
pub expiration: ExpirationBehavior,
/// The number of samples to retain before dropping the oldest.
///
/// This avoids memory issues when data is produced more rapidly than it's
/// collected.
pub n_max_samples: usize,
}

impl CollectionDetails {
/// Return collection details with no expiration.
pub fn never(interval: Duration) -> Self {
Self { interval, expiration: ExpirationBehavior::Never }
pub fn never(interval: Duration, n_max_samples: usize) -> Self {
Self { interval, expiration: ExpirationBehavior::Never, n_max_samples }
}

/// Return collection details that expires after a number of attempts.
pub fn attempts(interval: Duration, count: usize) -> Self {
Self { interval, expiration: ExpirationBehavior::Attempts(count) }
pub fn attempts(
interval: Duration,
count: usize,
n_max_samples: usize,
) -> Self {
Self {
interval,
expiration: ExpirationBehavior::Attempts(count),
n_max_samples,
}
}

/// Return collection details that expires after a duration has elapsed.
pub fn duration(interval: Duration, duration: Duration) -> Self {
Self { interval, expiration: ExpirationBehavior::Duration(duration) }
pub fn duration(
interval: Duration,
duration: Duration,
n_max_samples: usize,
) -> Self {
Self {
interval,
expiration: ExpirationBehavior::Duration(duration),
n_max_samples,
}
}
}

Expand Down Expand Up @@ -348,10 +369,6 @@ struct KstatSamplerWorker {
/// Inbox channel on which the `KstatSampler` sends messages.
inbox: mpsc::Receiver<Request>,

/// The maximum number of samples we allow in each per-target buffer, to
/// avoid huge allocations when we produce data faster than it's collected.
sample_limit: usize,

/// Outbound queue on which to publish self statistics, which are expected to
/// be low-volume.
self_stat_queue: broadcast::Sender<Sample>,
Expand Down Expand Up @@ -399,7 +416,6 @@ impl KstatSamplerWorker {
inbox: mpsc::Receiver<Request>,
self_stat_queue: broadcast::Sender<Sample>,
samples: Arc<Mutex<BTreeMap<TargetId, BoundedQueue<Sample>>>>,
sample_limit: usize,
) -> Result<Self, Error> {
let ctl = Some(Ctl::new().map_err(Error::Kstat)?);
let self_stats = hostname().map(self_stats::SelfStats::new);
Expand All @@ -410,7 +426,6 @@ impl KstatSamplerWorker {
creation_times: BTreeMap::new(),
samples,
inbox,
sample_limit,
self_stat_queue,
self_stats,
sample_timeouts: FuturesUnordered::new(),
Expand Down Expand Up @@ -1152,31 +1167,36 @@ impl KstatSamplerWorker {
};
let _ = self.targets.insert(id, SampledObject::Kstat(item));

// Add to the per-target queues, making sure to keep any samples that
// were already there previously. This would be a bit odd, since it
// means that the target expired, but we hadn't been polled by oximeter.
// Nonetheless keep these samples anyway.
let n_samples = self
.samples
.lock()
.unwrap()
.entry(id)
.or_insert_with(|| BoundedQueue::new(self.sample_limit))
.len();
match n_samples {
0 => debug!(
self.log,
"inserted empty per-target sample queue";
"id" => ?id,
),
n => debug!(
self.log,
"per-target queue appears to have old samples";
"id" => ?id,
"n_samples" => n,
),
}

// Add to the per-target queues, possibly updating the _size_ of the
// queue if the caller requested it. Report any samples that were
// already there, and any we needed to drop to handle the new request.
let (n_samples_retained, n_samples_dropped) =
match self.samples.lock().unwrap().entry(id) {
Entry::Vacant(vacant) => {
vacant.insert(BoundedQueue::new(details.n_max_samples));
(0, 0)
}
Entry::Occupied(mut occupied) => {
// Swap the queues and drain anything that was there into the
// new one.
let mut old_q = occupied
.insert(BoundedQueue::new(details.n_max_samples));
let n_total_existing_samples = old_q.len();
let n_dropped =
occupied.get_mut().extend(old_q.drain().into());
let n_samples_retained =
n_total_existing_samples - n_dropped;
(n_samples_retained, n_dropped)
}
};
debug!(
self.log,
"inserted per-target sample queue";
"id" => ?id,
"n_samples_retained" => n_samples_retained,
"n_samples_dropped" => n_samples_dropped,
"n_max_samples" => details.n_max_samples,
);
Ok(id)
}
}
Expand All @@ -1202,25 +1222,8 @@ pub struct KstatSampler {
const SELF_STAT_QUEUE_SIZE: usize = 4096;

impl KstatSampler {
/// The maximum number of samples allowed in the internal buffer, before
/// oldest samples are dropped.
///
/// This is to avoid unbounded allocations in situations where data is
/// produced faster than it is collected.
///
/// Note that this is a _per-target_ sample limit!
pub const DEFAULT_SAMPLE_LIMIT: usize = 500;

/// Create a new sampler.
pub fn new(log: &Logger) -> Result<Self, Error> {
Self::with_sample_limit(log, Self::DEFAULT_SAMPLE_LIMIT)
}

/// Create a new sampler with a sample limit.
pub fn with_sample_limit(
log: &Logger,
limit: usize,
) -> Result<Self, Error> {
let samples = Arc::new(Mutex::new(BTreeMap::new()));
let (self_stat_tx, self_stat_rx) =
broadcast::channel(SELF_STAT_QUEUE_SIZE);
Expand All @@ -1230,7 +1233,6 @@ impl KstatSampler {
inbox,
self_stat_tx,
samples.clone(),
limit,
)?;
#[cfg(all(test, target_os = "illumos"))]
let (sample_count_rx, _worker_task) = {
Expand Down
6 changes: 6 additions & 0 deletions oximeter/instruments/src/kstat/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ const CPU_STATES: &[&str] = &["user", "sys", "waitrq"];
/// The prefix used for Omicron zone names.
const ZONE_PREFIX: &str = "oxz_";

/// Upper bound on the cardinality of the data we produce in a single
/// sampling interval.
///
/// Computed as the number of entries in `CPU_STATES` multiplied by an assumed
/// ceiling on the number of CPUs in a zone.
pub const fn max_cardinality() -> usize {
    // Sizing assumption: a zone has at most this many CPUs.
    const MAX_CPUS_PER_ZONE: usize = 256;
    MAX_CPUS_PER_ZONE * CPU_STATES.len()
}

/// Parsed zone metadata from a zone name formatted as "oxz_TYPE_UUID".
struct ZoneMetadata {
zone_type: String,
Expand Down
Loading
Loading