Encheapening Cernan Internal Metrics

In the new 0.7.x series of cernan we stumbled on a neat, cheap approach for making internal metrics available inside a Rust codebase, one that I'd say has legs in other projects. This is a quick note describing what cernan is, what we were doing before, and how our current approach works.

What is Cernan?

Cernan is a telemetry and logging shipping daemon I work on at Postmates. The idea is that cernan picks up the slack Mozilla left when heka was deprecated, and extends its utility. Cernan intentionally blurs the lines between 'logs' – unstructured text – and 'telemetry' – time series points – so that devs can emit data and ops folks can come along behind and build an understanding of running software systems. It's my understanding that some folks go real deep on building chains of Lua programmable filters to do log analysis in real-time and build aggregations of time series, and that others treat cernan like an over-engineered statsd server. Both are valid and I'm happy to see that the multiplexing / manipulation we needed inside Postmates is useful to other organizations.

As the architect of cernan, my view of it is a little different. What I see is this: a router with a directed acyclic topology between nodes – mapped one-to-one onto operating system threads – and overload protection along the routes in the form of disk buffering. Sources are nodes that produce Events, Filters manipulate Events and Sinks terminate Events, either by shipping them to some remote system, printing them to stdout or otherwise. Cernan makes the promise of being both memory efficient and tender toward your CPU. I'm always on the lookout to optimize cernan along these lines.
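
To make that shape concrete, here is a minimal sketch of a three-node route, one OS thread per node. It is not cernan's code: the `Event` enum and `run_pipeline` function are hypothetical, and in-memory `std::sync::mpsc` channels stand in for cernan's disk-buffered routes.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, simplified event type; cernan's real `Event` is richer.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    Telemetry { name: String, value: f64 },
    Log(String),
}

/// Wire source -> filter -> sink, one OS thread per node, and return
/// whatever the sink received.
pub fn run_pipeline(inputs: Vec<Event>) -> Vec<Event> {
    let (src_tx, filter_rx) = mpsc::channel::<Event>();
    let (filter_tx, sink_rx) = mpsc::channel::<Event>();

    // Source: produces Events.
    let source = thread::spawn(move || {
        for ev in inputs {
            src_tx.send(ev).expect("filter hung up");
        }
        // `src_tx` drops here, which closes the route to the filter.
    });

    // Filter: manipulates Events; this one doubles telemetry values and
    // passes everything else through untouched.
    let filter = thread::spawn(move || {
        for ev in filter_rx {
            let out = match ev {
                Event::Telemetry { name, value } => Event::Telemetry {
                    name,
                    value: value * 2.0,
                },
                other => other,
            };
            filter_tx.send(out).expect("sink hung up");
        }
    });

    // Sink: terminates Events; this one just collects them.
    let sink = thread::spawn(move || sink_rx.into_iter().collect::<Vec<Event>>());

    source.join().expect("source panicked");
    filter.join().expect("filter panicked");
    sink.join().expect("sink panicked")
}
```

Calling `run_pipeline` with a telemetry point and a log line hands back the same two events in order, with the telemetry value doubled by the filter. A real route would also need the overload protection described above, which this sketch leaves out entirely.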

Self-Telemetry

Way back in PR #235 I introduced into cernan the ability to "self-telemeter". Self-telemetry serves the same purpose as telemetry sent by applications: to give devs and ops folks insight into the running software system, in this case cernan itself. Now, logging had been a feature of cernan since its early, unstable days but we couldn't bring cernan's ability to convert logs into an appropriate Event for a telemetry sink to bear because of feedback concerns. PR #235 added a new Source to cernan, Internal, one which was only accessible to cernan code. You give that Source a name, a value and some metadata tags and you've got an Event::Telemetry. Internal is routable just like every other Source and disables itself when there's nothing to route to. For the most part the names that Internal receives are known statically at compilation time, baked right into the source code. The first cut of Internal required all names to be String because some self-telemetry is dynamic. Telemetry originating in the File sources, for instance, includes the filenames cernan is polling. That means allocations. Every time a self-telemetry point is created the name has to be allocated, along with all the metadata tags and whatever aggregation structure is appropriate.

Which, I mean, that's fine but I wasn't pleased.

Encheapened Self-Telemetry

There are two classes of self-telemetry: those with a static name and those without. While tracking down a memory leak in hopper it occurred to me that the technique we use to keep track of "now" in cernan would be useful for statically named self-telemetry too. Here's what "now" looks like in cernan:

lazy_static! {
    static ref NOW: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(Utc::now().timestamp() as usize));
}

Dependent code reads this value through time::now. It's a common enough trick where hardware concurrency primitives are available: every so often, store the value of some OS time lookup in an atomic number, then read that value in other threads without locking or serialization. Saves on syscalls.
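
For anyone who hasn't seen the trick, here is a rough sketch of it using only the standard library. This is not cernan's time module: `os_now`, `update_time` and `spawn_updater` are hypothetical names, a plain `static` replaces the `lazy_static!` `Arc`, and the use of `Ordering::Relaxed` here is my assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Cached seconds since the Unix epoch. A plain static is enough in this
/// sketch because `AtomicUsize::new` is a const fn.
static NOW: AtomicUsize = AtomicUsize::new(0);

/// Ask the OS for the current time in seconds since the epoch.
/// Falls back to 0 if the system clock is set before the epoch.
pub fn os_now() -> usize {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs() as usize)
        .unwrap_or(0)
}

/// Store a fresh OS time in the cache.
pub fn update_time() {
    NOW.store(os_now(), Ordering::Relaxed);
}

/// Read the cached time: one atomic load, no syscall, no lock.
pub fn now() -> usize {
    NOW.load(Ordering::Relaxed)
}

/// Prime the cache, then spawn a background thread that refreshes it
/// once per `period`. The thread runs until the process exits.
pub fn spawn_updater(period: Duration) -> thread::JoinHandle<()> {
    update_time();
    thread::spawn(move || loop {
        thread::sleep(period);
        update_time();
    })
}
```

After a single `spawn_updater(Duration::from_millis(500))` at startup, any thread can call `now()` as often as it likes. The price is staleness: a reader can be behind the real clock by up to one refresh period.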

If you squint at it just right time::NOW is a time series with a static name. Now, take a look at this diff:

diff --git a/src/sink/wavefront.rs b/src/sink/wavefront.rs
index 631c4d4..c41f6cc 100644
--- a/src/sink/wavefront.rs
+++ b/src/sink/wavefront.rs
@@ -1,7 +1,8 @@
+//! Wavefront is a proprietary aggregation and alerting product
+
 use buckets;
 use metric::{AggregationMethod, LogLine, TagMap, Telemetry};
 use sink::{Sink, Valve};
-use source::report_telemetry;
 use std::cmp;
 use std::collections::{HashMap, HashSet};
 use std::io::Write as IoWrite;
@@ -10,8 +11,29 @@ use std::net::TcpStream;
 use std::net::ToSocketAddrs;
 use std::string;
 use std::sync;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use time;

+lazy_static! {
+    /// Total histograms emitted
+    pub static ref WAVEFRONT_AGGR_HISTO: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total sums emitted
+    pub static ref WAVEFRONT_AGGR_SUM: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total sets emitted
+    pub static ref WAVEFRONT_AGGR_SET: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total summarize emitted
+    pub static ref WAVEFRONT_AGGR_SUMMARIZE: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total percentiles for summarize emitted
+    pub static ref WAVEFRONT_AGGR_TOT_PERCENT: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total delivery attempts made
+    pub static ref WAVEFRONT_DELIVERY_ATTEMPTS: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total valve closed
+    pub static ref WAVEFRONT_VALVE_CLOSED: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+    /// Total valve open
+    pub static ref WAVEFRONT_VALVE_OPEN: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+}
+
 /// The `wavefront` sink emits into [Wavefront](http://wavefront.com), a
 /// proprietary metric aggregation and alerting product.
 pub struct Wavefront {
@@ -386,28 +408,16 @@ impl Wavefront {
                     }

                     match value.kind() {
-                        AggregationMethod::Histogram => report_telemetry(
-                            "cernan.sinks.wavefront.aggregation.histogram",
-                            1.0,
-                        ),
-                        AggregationMethod::Sum => report_telemetry(
-                            "cernan.sinks.wavefront.aggregation.sum",
-                            1.0,
-                        ),
-                        AggregationMethod::Set => report_telemetry(
-                            "cernan.sinks.wavefront.aggregation.set",
-                            1.0,
-                        ),
+                        AggregationMethod::Histogram => {
+                            WAVEFRONT_AGGR_HISTO.fetch_add(1, Ordering::Relaxed)
+                        }
+                        AggregationMethod::Sum | AggregationMethod::Set => {
+                            WAVEFRONT_AGGR_SUM.fetch_add(1, Ordering::Relaxed)
+                        }
                         AggregationMethod::Summarize => {
-                            report_telemetry(
-                                "cernan.sinks.wavefront.aggregation.summarize",
-                                1.0,
-                            );
-                            report_telemetry(
-                                "cernan.sinks.wavefront.aggregation.\
-                                 summarize.total_percentiles",
-                                self.percentiles.len() as f64,
-                            );
+                            WAVEFRONT_AGGR_SUMMARIZE.fetch_add(1, Ordering::Relaxed);
+                            WAVEFRONT_AGGR_TOT_PERCENT
+                                .fetch_add(self.percentiles.len(), Ordering::Relaxed)
                         }
                     };

@@ -522,10 +532,6 @@ impl Sink for Wavefront {
     fn flush(&mut self) {
         self.format_stats();
         loop {
-            report_telemetry(
-                "cernan.sinks.wavefront.delivery_attempts",
-                self.delivery_attempts as f64,
-            );
             if self.delivery_attempts > 0 {
                 debug!("delivery attempts: {}", self.delivery_attempts);
             }
@@ -539,6 +545,7 @@ impl Sink for Wavefront {
                     self.flush_number += 1;
                     return;
                 } else {
+                    WAVEFRONT_DELIVERY_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
                     self.delivery_attempts = self.delivery_attempts.saturating_add(1);
                     delivery_failure = true;
                 }
@@ -563,8 +570,10 @@ impl Sink for Wavefront {

     fn valve_state(&self) -> Valve {
         if self.aggrs.len() > 10_000 {
+            WAVEFRONT_VALVE_CLOSED.fetch_add(1, Ordering::Relaxed);
             Valve::Closed
         } else {
+            WAVEFRONT_VALVE_OPEN.fetch_add(1, Ordering::Relaxed);
             Valve::Open
         }
     }

All those calls to source::report_telemetry disappear – and with them, their allocations – in favor of fiddling with the new lazy_static Arc<AtomicUsize> values at the top of the module. It's okay for the wavefront Sink if counts are a little off and so we fetch_add with Ordering::Relaxed. If this is unfamiliar stuff for you, the Rust docs on Ordering are, as usual, real good, as are the LLVM docs they reference. The Internal source had to change to consume and reset these values, which is, uh, a little on the chatty side. But! The good news is we've avoided several thousand small allocations a second, have increased the documentation of cernan – if only in the generated rustdocs – and opened an avenue for tests to inspect self-telemetry without doing something especially goofy.
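
The consume-and-reset step might look something like the following. Treat it as a sketch and not as the Internal source's actual code: `drain_counters` is a hypothetical helper, the statics are plain `AtomicUsize` stand-ins for the `lazy_static!` values in the diff, and reusing the old `report_telemetry` names for them is my assumption.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Stand-ins for two of the counters declared in the diff.
pub static WAVEFRONT_AGGR_HISTO: AtomicUsize = AtomicUsize::new(0);
pub static WAVEFRONT_DELIVERY_ATTEMPTS: AtomicUsize = AtomicUsize::new(0);

/// Read and reset each counter, handing every non-zero count to `emit`
/// together with its static name. Nothing here allocates: the names are
/// `&'static str` and the table lives on the stack.
pub fn drain_counters<F: FnMut(&'static str, usize)>(mut emit: F) {
    let counters: [(&'static str, &AtomicUsize); 2] = [
        (
            "cernan.sinks.wavefront.aggregation.histogram",
            &WAVEFRONT_AGGR_HISTO,
        ),
        (
            "cernan.sinks.wavefront.delivery_attempts",
            &WAVEFRONT_DELIVERY_ATTEMPTS,
        ),
    ];
    for &(name, counter) in counters.iter() {
        // `swap` reads the old value and writes zero in one atomic step,
        // so an increment racing with the drain lands in this report or
        // the next one and is never lost.
        let count = counter.swap(0, Ordering::Relaxed);
        if count > 0 {
            emit(name, count);
        }
    }
}
```

The chatty part is that table: every new static needs a matching line in it. A test, on the other hand, only has to poke a counter, call `drain_counters` with a closure that pushes into a `Vec`, and assert on what comes out.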

Neat trick.