Monitoring Production Systems – Part II

In Part I of this series, we talked about the challenges we faced in setting up a monitoring system for our production stack, which grew from a single EC2 instance to tiers of auto-scaling web servers, dedicated database clusters and other centralised systems. In this part we discuss the monitoring system in detail, and in particular our use of the TICK stack.

Why InfluxDB

Virtually any monitoring stack has a database at its heart, so the choice of monitoring stack depended on the choice of underlying database. A time series database was the obvious category; the other parameters were write/read performance, resource footprint (CPU, memory, disk I/O and space) and ease of provisioning & maintenance, since we have a very small devops/systems team. There are many resources comparing various time series databases (like this one); what attracted us the most about InfluxDB was its low entry barrier. We evaluated a few alternatives: we were already using ELK, so Elasticsearch was also an option. Although Elasticsearch can deliver high read/write performance, that performance comes at the cost of resource needs, and Elasticsearch is designed for text search, which doesn’t really match the nature of a time series. The underlying data structure of InfluxDB reflects this difference: it uses an LSM-tree variant (the TSM tree) for storage.

InfluxDB won the race on many counts:

  • Easy interface. InfluxDB uses an SQL-like query language, which was much faster to learn as we already had deep MySQL knowledge within the team.
  • Easy integration. InfluxDB exposes simple HTTP APIs for writing and querying data, which, besides a much shorter learning curve, makes it far easier to automate a lot of “reporting” tasks with simple bash scripts. For example, our deployment job in Jenkins records a deployment event in InfluxDB simply by making a curl request (see the sketch after this list).
  • Continuous queries and realtime streaming queries. These make realtime alerting with Kapacitor possible, and make it easy to keep “snapshots” of historical data, which is particularly helpful in reducing disk space and memory requirements.
  • Low resource consumption. We run InfluxDB on an r4.xlarge box which it shares with our ELK stack and a few more components, and with this simple setup it easily sustains ~1500 writes/sec during our peak traffic hours! InfluxDB also delivers a high compression ratio on disk; with ~1500 writes/sec at peak, millisecond resolution and a 120-day retention policy, InfluxDB uses only a couple of GBs of disk!
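
As an illustration of how thin the integration can be, here is a minimal sketch of the kind of curl call our Jenkins job makes against InfluxDB's HTTP write endpoint; the database name, measurement and tag/field values are illustrative, not our actual schema:

curl -i -XPOST 'http://influx.host:8086/write?db=deployments' \
  --data-binary 'deploys,app=web,env=production version="v1.2.3"'

The request body uses InfluxDB's line protocol, which we'll look at in the primer below.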

A Primer

Let’s take a quick look at Telegraf & InfluxDB first.
Telegraf employs a plugin-based architecture where input plugins perform measurements and output plugins relay them to storage. Let’s quickly generate a sample telegraf configuration and start the service:

telegraf -sample-config -input-filter cpu:mem -output-filter influxdb > /etc/telegraf/telegraf.conf
service telegraf start

The [[inputs]] sections define the entities to be instrumented: the address of the service, the metrics to collect, etc. For the [[outputs]] section, we’ll focus mainly on InfluxDB. With the above config, telegraf will start collecting CPU & memory metrics of your machine.
InfluxDB organizes time series data into ‘measurements’ (conceptually equivalent to a table in an RDBMS), with the timestamp being the primary key. Measurements have ‘fields’: numerical values used in aggregation functions (e.g. ‘average’ CPU utilization of a processor core); and ‘tags’, which are used for grouping data (e.g. CPU utilization per node).
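
To make the terminology concrete, a single point written in InfluxDB's line protocol looks like the following (values made up): ‘cpu’ is the measurement, ‘host’ and ‘cpu’ are tags, ‘usage_user’ and ‘usage_system’ are fields, and the trailing number is a nanosecond timestamp.

cpu,host=web-01,cpu=cpu0 usage_user=23.5,usage_system=4.2 1504569600000000000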

Now, let’s log in to the influx console using the CLI:

$ influx -precision=rfc3339 -pretty=true
Visit https://enterprise.influxdata.com to register for updates, InfluxDB server management, and monitoring.
Connected to http://localhost:8086 version 1.1.1
InfluxDB shell version: 1.1.1

By default, telegraf writes to the ‘telegraf’ database. Let’s explore it:
> use telegraf;
Using database telegraf
> show measurements;
name: measurements
------------------
name
cpu
disk
diskio
kernel
mem
processes
swap
system

All these measurements have been created by telegraf. If you are collecting data from several nodes, you’ll group/filter the data using the ‘host’ tag. Let’s run a query to see CPU usage metrics:
> select time, usage_user from cpu group by host, cpu limit 4;
name: cpu
tags: cpu=cpu-total, host=redacted-01
time usage_user
---- ----------
2017-09-05T00:00:00Z 19.63159222718702
2017-09-05T00:00:10Z 27.176381529906884
2017-09-05T00:00:20Z 24.43713635177763
2017-09-05T00:00:30Z 30.125944584854416
name: cpu
tags: cpu=cpu-total, host=redacted-02
time usage_user
---- ----------
2017-09-05T02:32:20Z 15.435222672064773
2017-09-05T02:32:30Z 21.82320441988954
2017-09-05T02:32:40Z 22.21105527638187
2017-09-05T02:32:50Z 38.15361521140854

Congratulations, telegraf is successfully collecting data from your system!

Monitoring TravelTriangle Stack

Armed with this basic knowledge of telegraf and influx, let’s dive into how the monitoring system works at TravelTriangle!

General System Metrics

We started with CloudWatch, as any system hosted on AWS would; however, there were a couple of shortcomings:

  • CloudWatch gives CPU data at 5-minute resolution unless you pay for detailed monitoring, and even then it’s still only 1-minute resolution.
  • Memory & disk stats are not available out of the box.

You can provision agents like collectd yourself to collect memory/disk metrics and sub-minute CPU metrics and use CloudWatch only as storage, but that essentially recreates what Telegraf & InfluxDB already give us.

At TravelTriangle, we now use telegraf for collecting system metrics, much like the example above. A couple of things to note here though:

  • Collecting metrics from entities like the OS & hardware requires telegraf to be deployed on the server node itself. Telegraf doesn’t have a typical agent-server model; thanks to its plugin-based architecture, the same telegraf service acts as both.
  • You should collect only the metrics you think you need. It is tempting to collect everything a plugin is capable of, but be aware that the resulting increase in the size of write batches may degrade InfluxDB’s performance (see the trimmed-down config sketch below).
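
As a rough illustration of trimming a plugin down, here is a sketch of a cpu input section using telegraf's metric filtering options; the exact fields you keep (fieldpass) should be driven by what your dashboards actually chart:

# # Hypothetical trimmed-down cpu input
[[inputs.cpu]]
  percpu = true
  totalcpu = true
  ## Drop everything except the fields we actually graph
  fieldpass = ["usage_user", "usage_system", "usage_iowait"]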

And the outcome is – stunning visualizations like this one!
[Figure: load average graph]

Application Servers/Services Layer

Rails/Passenger Stack

A large portion of TravelTriangle is built on the Ruby on Rails platform and is served by Passenger behind nginx, with nginx also acting as the load balancer. Passenger worker processes are single-threaded, which means other requests have to wait in a queue while a request is being processed. Thus, we found the most useful metric to be the length of this queue; correlating queue length with resource utilization (primarily CPU) can directly tell you whether your servers are overworked or simply waiting on I/O (db requests, etc). Passenger reports this metric via (rvm)sudo passenger-status --show=xml; we wrote a simple script to parse this output and feed the result to InfluxDB directly:

import commands  # Python 2 stdlib; use subprocess on Python 3
import socket
import time

from influxdb import InfluxDBClient
...
ret = commands.getoutput("rvmsudo passenger-status --show=xml")
queue_length = ...  # parse the request queue size out of `ret`
data_point = {
    'measurement': 'pql',  # passenger queue length
    'tags': {'host': socket.gethostname()},
    'time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),  # UTC timestamp
    'fields': {'value': queue_length}
}
InfluxDBClient('influx.host', 8086, 'influx_user', 'password_redacted', 'metrics').write_points([data_point])

Now, on one of our better days, the graph looks like this:

[Figure: passenger queue length graph]

Telegraf also ships with a built-in passenger plugin for the same purpose; however, if you are using rvm, you’ll need to make the rvm environment available to the telegraf user, which can be even trickier if your rvm installation is at the user level (a config sketch is below). We have started migrating towards an rvm-free production setup, but do let us know if you manage to make it work with rvm!
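
For reference, a hedged sketch of the plugin's configuration; the command option is where an rvm-aware wrapper would have to go, and the exact invocation is illustrative rather than something we run in production:

[[inputs.passenger]]
  ## Command used to fetch passenger status; must be runnable by the telegraf user
  command = "rvmsudo passenger-status -v --show=xml"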

For nginx, telegraf can collect the metrics reported by the ngx_http_stub_status_module using its nginx plugin.
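
A minimal sketch of the two sides of that setup, assuming a status endpoint on localhost (the path and addresses are illustrative):

# nginx: expose the stub_status endpoint inside an existing server block
location /nginx_status {
  stub_status on;
  allow 127.0.0.1;
  deny all;
}

# telegraf.conf: point the nginx input plugin at that endpoint
[[inputs.nginx]]
  urls = ["http://127.0.0.1/nginx_status"]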

Varnish

Telegraf ships with a bundled varnish plugin; just enabling it in telegraf.conf does the trick. Note that telegraf needs to be deployed on the same server node as varnish.
# # A plugin to collect stats from Varnish HTTP Cache
[[inputs.varnish]]
  ## The default location of the varnishstat binary can be overridden with:
  binary = "/usr/bin/varnishstat"
  ## Glob matching can be used, ie, stats = ["MAIN.*"]
  ## stats may also be set to ["*"], which will collect all stats
  stats = ["MAIN.cache_hit", "MAIN.cache_miss", "MAIN.uptime"]

Node.JS Stack

We couldn’t find a built-in solution for monitoring Node.JS servers with InfluxDB & Grafana; however, we are considering an approach like this to start with, or using PM2 APIs to put together a script… let us know if you have a better idea!

Sidekiq (async processing)

Like many RoR stacks, we rely heavily on Sidekiq for async job processing. As per the Sidekiq documentation, a single Sidekiq process should not serve more than a “few” queues or run more than 50 threads. We process millions of jobs per day, distributed across tens of queues for better isolation and control, and a single Sidekiq process can’t scale to that extent; so we ended up writing our own deployment scripts to start multiple Sidekiq clusters with different queue sets. Sidekiq provides a nice web UI for monitoring; however, at this scale we needed something more. We faced a few constraints:

  • There is no plugin available for measuring queue latency, workers per queue, etc.
  • Sidekiq provides APIs for fetching queue latency, but we needed more instrumentation around the performance of our own code – slowest jobs, etc.
  • Using queue length/latency metrics, we wanted to further build an autoscaling system to dynamically scale the Sidekiq clusters up and down.

For collecting queue & worker measurements, which are directly available from the Sidekiq APIs, we simply wrote some Ruby code:
def sidekiq_measurement
  m = []
  total_size = 0
  total_latency = 0

  # Count busy workers per queue
  wq = Hash.new(0)
  Sidekiq::Workers.new.each { |_, _, w| wq[w['queue']] += 1 }

  # One point per queue: latency (ms), busy workers and backlog length
  Sidekiq::Queue.all.each do |q|
    m << {
      series: 'sidekiq_queues',
      values: { latency: (q.latency * 1000).to_i, workers: wq[q.name], length: q.size },
      tags: { name: q.name },
      timestamp: (Time.now.to_f * 1000).to_i # ms precision
    }
    total_size += q.size
    total_latency += q.latency
  end

  # Retry, scheduled and dead sets are reported as queues too
  [Sidekiq::RetrySet.new, Sidekiq::ScheduledSet.new, Sidekiq::DeadSet.new].each do |set|
    m << {
      series: 'sidekiq_queues',
      values: { length: set.size },
      tags: { name: set.name },
      timestamp: (Time.now.to_f * 1000).to_i
    }
  end

  influx_client.write_points(m)
end

You will need to initialize the Sidekiq & Influx clients with your production configuration before using this snippet, and you’ll need to wrap it in a daemon process (a trivial loop like the one sketched below works).
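
For completeness, a minimal sketch of such a wrapper; the 10-second interval and the logging are illustrative, not our actual daemon:

loop do
  begin
    sidekiq_measurement
  rescue StandardError => e
    Sidekiq.logger.warn("sidekiq_measurement failed: #{e.message}")
  end
  sleep 10
end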
Now, for collecting job performance data, our first thought was to write a base worker with instrumentation baked in; but we had the Model.delay.<method> pattern spread throughout our codebase. So we did what any Ruby developer with NIH syndrome would do – we patched Sidekiq’s default delayed worker:
module Sidekiq
  module Extensions
    class DelayedClass
      def perform(yml)
        (target, method_name, args) = YAML.load(yml)
        Monitoring.with_benchmark('sidekiq', { 'target' => target.to_s, 'action' => method_name.to_s }) {
          target.__send__(method_name, *args)
        }
      end
    end
  end
end

You’ll need to duplicate this for DelayedModel and DelayedMailer as well. The Monitoring.with_benchmark method looks like this:
def self.with_benchmark(measurement, tags = {}, fields = {}, time_components = false)
  res = nil
  bm = Benchmark.measure { res = yield }
  # tags/fields may be lambdas that derive values from the block's result
  tags = tags.call(res) unless tags.kind_of?(Hash)
  fields = fields.call(res) unless fields.kind_of?(Hash)
  t = time_components ? { 'user_time' => bm.utime * 1000, 'system_time' => bm.stime * 1000, 'total_time' => bm.total * 1000 } : {}
  t['real_time'] = bm.real * 1000 # wall-clock time in ms
  self.push_to_influx(measurement, tags, fields.merge(t))
  res
end

As a result, this is how beautiful our Sidekiq dashboard looks:
[Figure: Sidekiq dashboard]

Data Storage Layer

Elasticsearch

Telegraf ships with a bundled elasticsearch plugin which internally uses the _cat, _nodes, etc. APIs to collect various measurements. For us, with only a basic understanding of Elasticsearch internals, the sheer volume of measurements was overwhelming, and putting everything on a visually interpretable dashboard seemed like a challenge. Fortunately, there are community dashboards available which saved us that effort. The dashboards are available as JSON downloads which can be imported directly into Grafana, and recent versions add the ability to import directly from grafana.com. We found one such dashboard which contained plots for most of the telegraf plugins.
One measurement was missing though – response latency, which was the one we wanted above all. Since only our Ruby backend makes Elasticsearch API calls, we, again, ended up patching the elasticsearch-api gem to inject our instrumentation. This measures latency from the client’s point of view, but within the same VPC network latency is practically predictable. Take a look at the lib/elasticsearch/api/namespace/common.rb file in the elasticsearch-api gem:
def perform_request(method, path, params={}, body=nil)
  client.perform_request method, path, params, body
end

Simply wrapping this in Monitoring.with_benchmark did the trick (see the sketch below)! Moreover, you can generate tags from method and path to get average latency on a per-index and per-operation (GET, POST) basis. Alternatively, you could patch lib/elasticsearch/api/actions/bulk.rb and the other actions.
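
A hedged sketch of what the wrapped method can look like; the measurement name 'elasticsearch' and the way the index tag is derived from the path are illustrative, not our exact patch:

def perform_request(method, path, params={}, body=nil)
  # Tag each call with the HTTP verb and the first path segment (usually the index)
  Monitoring.with_benchmark('elasticsearch',
    { 'method' => method.to_s, 'index' => path.to_s.split('/').first.to_s }) {
    client.perform_request method, path, params, body
  }
end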

Redis

The Redis input plugin does most of the job; however, a server latency measurement was missing. Ruby daemon to the rescue!
require 'benchmark'
require 'redis'

# `redis_hosts`, `influx_client` and `log` come from the daemon's own setup
m = []
t = (Time.now.to_f * 1000).to_i # ms timestamp shared by all points
redis_hosts.each do |h|
  inst = Redis.new(host: h)
  begin
    # Average round-trip time of 10 PINGs
    tm = (Benchmark.measure {
      10.times { inst.ping }
    }.real / 10).round(5)
  rescue Redis::BaseError => e
    log "#{h}: error in ping: #{e.message}"
    next
  end
  m << {
    series: 'redis_latency',
    values: { latency: tm },
    tags: { host: h },
    timestamp: t
  }
end
influx_client.write_points(m)

MySQL

Telegraf provides a plugin for MySQL; however, since we use RDS as our database service, a lot of metrics were already available, and simple scripting made it easy to collate & parse the slow query logs. We haven’t explored the plugin so far, but it seems to promise a lot – we’ll share our experience once we use it!

Learnings

We started using InfluxDB just over a year ago. Our traffic has grown more than ~5x in this time, with writes to InfluxDB going up to ~1500/sec. In this period we had just a couple of outages in the monitoring system, and a few noteworthy lessons from those incidents:

  1. Like any database, InfluxDB uses indexes for faster searching, and these are generally kept in memory for fast lookup. Each unique combination of tag values in a measurement constitutes a series (an index entry). Taking the example of the cpu measurement, assuming we have two tags, host & cpu, and we push data from 2 machines with 8 cores each, this creates a total of 2×8 = 16 series for indexing purposes. As the number of unique combinations grows, the number of series grows multiplicatively, which can make InfluxDB consume a large amount of memory and degrade its performance. This isn’t an outright problem (we have over 500 series in our sidekiq job performance measurement), but you do need to be careful when adding tags: for example, you don’t need to add ‘hostname’ as a tag in every measurement. We learned this the hard way – we started to push user_id and user_agent as tags in our visitors measurement, which ended up creating tens of millions of series. We were using an even smaller t2.large box at that time, and InfluxDB started to perform frequent data compactions on disk, resulting in high I/O. We didn’t experience a full-fledged outage, but query performance degraded substantially.
  2. When we started using telegraf, we enabled every possible metric in the input plugins, which made telegraf’s write requests extremely large and resulted in 5xx errors on both read & write queries. We stopped writing to InfluxDB temporarily, which was simple enough for us to do by rewriting an internal record set in Route53, and then tweaked the telegraf configuration. Interestingly, InfluxDB managed to retain the writes, so we didn’t experience any data loss during the outage; you will, however, need to resist the temptation of (force) restarting the InfluxDB server!
  3. InfluxDB seems to partition storage on the basis of retention policies, to the extent that you need to specify the policy name along with the measurement name in queries for any measurement not in the default policy. You need to design your schema with retention policies in mind, as changing the policy for a measurement means changing all the continuous queries, Kapacitor alerts, etc. involving that measurement. We keep only 120 days of data at millisecond resolution, and we use continuous queries to roll daily snapshots at 1-day resolution into a separate database with a 365-day default retention policy (a sketch of such a query follows this list). This enables us to do historical comparisons and analysis! Striking a balance between retention and resolution reduces resource needs and contributes to a stable & robust monitoring system.
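
For reference, a hedged sketch of the kind of continuous query we mean; the database names (‘telegraf’ and ‘history’), the measurement and the retention policy (‘autogen’) are illustrative, not our actual schema:

CREATE CONTINUOUS QUERY "cpu_daily" ON "telegraf"
BEGIN
  SELECT mean("usage_user") AS "usage_user"
  INTO "history"."autogen"."cpu_daily"
  FROM "cpu"
  GROUP BY time(1d), *
END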

This concludes Part II of the series. In the next part, we’ll take a deep dive into the two remaining components of our monitoring stack – Kapacitor & Grafana.
