Class: Batsd::Handler::Timer
- Inherits: Batsd::Handler
  - Object
    - Batsd::Handler
      - Batsd::Handler::Timer
- Defined in: lib/batsd/handlers/timer.rb
Overview
Handles timer measurements ("|ms").
Timer measurements are aggregated (mean, count, min, max, upper_90, and stddev) over each configured retention interval.
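The aggregates named in the flush code (mean, count, min, max, upper_90, stddev) can be illustrated with a small standalone sketch. The `summarize` helper is hypothetical and not part of batsd. The nearest-rank percentile and the sample standard deviation used here are assumptions, so batsd's own Array extensions may compute slightly different values.

```ruby
# Hypothetical helper, not part of batsd: summarizes one interval of timer values.
def summarize(values)
  sorted = values.sort
  count  = sorted.size
  mean   = sorted.sum / count.to_f

  # Nearest-rank 90th percentile (assumed definition): rank = ceil(0.9 * count),
  # computed with integer arithmetic to avoid floating point rounding.
  rank     = (9 * count + 9) / 10
  upper_90 = sorted[[rank - 1, 0].max]

  summary = {
    count: count,
    mean: mean,
    min: sorted.first,
    max: sorted.last,
    upper_90: upper_90
  }

  # As in the handler, a standard deviation is only reported for more than one value.
  if count > 1
    variance = sorted.sum { |v| (v - mean)**2 } / (count - 1)
    summary[:stddev] = Math.sqrt(variance)
  end

  summary
end

summary = summarize([10.0, 20.0, 30.0, 40.0])
puts summary.inspect
```

A single-value interval yields no `:stddev` entry, which mirrors the `count > 1` guard in the handler.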
Instance Method Summary
- (Object) flush
  Flush timers to redis and disk.
- (Object) handle(key, value, sample_rate)
  Handle the key, value, and sample rate for a timer.
- (Timer) initialize(options) constructor
  Set up a new handler to handle timers.
Methods inherited from Batsd::Handler
Constructor Details
- (Timer) initialize(options)
Set up a new handler to handle timers:
- Set up a redis client.
- Set up a diskstore client to write aggregates to disk.
- Initialize the last-flush time of every retention to the current time.
# File 'lib/batsd/handlers/timer.rb', line 16
def initialize(options)
  @redis = Batsd::Redis.new(options)
  @diskstore = Batsd::Diskstore.new(options[:root])
  @retentions = options[:retentions].keys
  @flush_interval = @retentions.first
  @active_timers = {}
  @timers = {}
  now = Time.now.to_i
  @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l }
  super
end
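As a rough illustration of the state the constructor derives from its options, the sketch below rebuilds the retention list, flush interval, and last-flush table outside the class. The `options` hash, its `:root` path, and the retention values are hypothetical. The only thing assumed from the source is that `options[:retentions]` is a hash whose keys are intervals in seconds, ordered shortest first.

```ruby
# Hypothetical options; the path and retention values are illustrative only.
options = {
  root: "/tmp/batsd",
  retentions: { 10 => 360, 60 => 10_080, 600 => 52_594 }
}

# The keys are the aggregation intervals; the first one drives the flush cycle.
retentions     = options[:retentions].keys
flush_interval = retentions.first

# Every retention starts with "last flushed now", so no disk write is
# triggered before a full retention interval has nearly elapsed.
now          = Time.now.to_i
last_flushes = retentions.inject({}) { |l, r| l[r] = now; l }

puts "flush every #{flush_interval}s, tracking #{last_flushes.keys.inspect}"
```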
Instance Method Details
- (Object) flush
Flush timers to redis and disk.
1) At every flush interval, flush to redis and clear active timers. Also store raw values for later use.
2) If the time since the last disk write for a given aggregation is about to exceed that aggregation's retention interval, flush to disk.
3) If flushing the terminal aggregation, flush the set of datapoints to Redis and reset that in-process tracking.
# File 'lib/batsd/handlers/timer.rb', line 49
def flush
  puts "Current threadpool queue for timers: #{@threadpool.size}" if ENV["VVERBOSE"]
  # Flushing is usually very fast, but always fix it so that the
  # entire thing is based on a constant start time
  # Saves on time syscalls too
  flush_start = Time.now.to_i

  n = @active_timers.size
  t = Benchmark.measure do
    ts = (flush_start - flush_start % @flush_interval)
    timers = @active_timers.dup
    @active_timers = {}
    timers.each_slice(50) do |keys|
      @threadpool.queue ts, keys do |timestamp, keys|
        keys.each do |key, values|
          puts "Storing #{values.size} values to redis for #{key} at #{timestamp}" if ENV["VVERBOSE"]
          # Store all the aggregates for the flush interval level
          count = values.count
          @redis.store_timer timestamp, "#{key}:mean", values.mean
          @redis.store_timer timestamp, "#{key}:count", count
          @redis.store_timer timestamp, "#{key}:min", values.min
          @redis.store_timer timestamp, "#{key}:max", values.max
          @redis.store_timer timestamp, "#{key}:upper_90", values.percentile_90
          if count > 1
            @redis.store_timer timestamp, "#{key}:stddev", values.standard_dev
          end
          @redis.store_raw_timers_for_aggregations key, values
        end
      end
    end
  end
  puts "Flushed #{n} timers in #{t.real} seconds" if ENV["VERBOSE"]

  # If it's time for the latter aggregation to be written to disk, queue
  # those up
  @retentions.each_with_index do |retention, index|
    # First retention is always just flushed to redis on the flush interval
    next if index.zero?

    # Only if we're in need of a write to disk - if the next flush will be
    # past the threshold
    if (flush_start + @flush_interval) > @last_flushes[retention] + retention.to_i
      puts "Starting disk writing for timers@#{retention}" if ENV["VERBOSE"]
      t = Benchmark.measure do
        ts = (flush_start - flush_start % retention.to_i)
        @timers.keys.each_slice(400) do |keys|
          @threadpool.queue ts, keys, retention do |timestamp, keys, retention|
            keys.each do |key|
              values = @redis.extract_values_from_string("#{key}:#{retention}")
              if values
                values = values.collect(&:to_f)
                puts "Writing the aggregates for #{values.count} values for #{key} at the #{retention} level to disk." if ENV["VVERBOSE"]
                count = values.count
                ["mean", "count", "min", "max", ["upper_90", "percentile_90"], ["stddev", "standard_dev"]].each do |aggregation|
                  if aggregation.is_a? Array
                    name = aggregation[0]
                    aggregation = aggregation[1]
                  else
                    name = aggregation
                  end
                  val = (count > 1 ? values.send(aggregation.to_sym) : values.first)
                  @diskstore.append_value_to_file(@diskstore.build_filename("#{key}:#{name}:#{retention}"), "#{timestamp} #{val}")
                end
              end
            end
          end
        end
        @last_flushes[retention] = flush_start
      end
      puts "#{Time.now}: Handled disk writing for timers@#{retention} in #{t.real}" if ENV["VERBOSE"]

      # If this is the last retention we're handling, flush the
      # times list to redis and reset it
      if retention == @retentions.last
        puts "Clearing the timers list. Current state is: #{@timers}" if ENV["VVERBOSE"]
        t = Benchmark.measure do
          @redis.add_datapoint @timers.keys
          @timers = {}
        end
        puts "#{Time.now}: Flushed datapoints for timers in #{t.real}" if ENV["VERBOSE"]
      end
    end
  end
end
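Two pieces of arithmetic in flush are easy to misread: rounding the flush time down to the start of its interval, and deciding whether a retention is due for a disk write. The sketch below isolates both. The helper names `bucket_start` and `disk_write_due?` are hypothetical and do not exist in batsd; the expressions inside them are taken from the listing above.

```ruby
# Hypothetical helper: round a Unix time down to the start of its interval,
# as in `flush_start - flush_start % @flush_interval`.
def bucket_start(time, interval)
  time - time % interval
end

# Hypothetical helper: a retention is written to disk when the *next* flush
# would land past the end of its current window.
def disk_write_due?(flush_start, flush_interval, last_flush, retention)
  (flush_start + flush_interval) > last_flush + retention
end

flush_start = 1_000_007
puts bucket_start(flush_start, 10)
puts bucket_start(flush_start, 60)
puts disk_write_due?(1_050, 10, 1_000, 60)
puts disk_write_due?(1_051, 10, 1_000, 60)
```

Because the comparison is strict, a flush at 1050 with a last disk write at 1000 is not yet due for a 60-second retention, while a flush one second later is.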
- (Object) handle(key, value, sample_rate)
Handle the key, value, and sample rate for a timer.
Store timer values in a hash of arrays ({a: [], b: []}) and track the set of known timer keys in a hash of nil values.
# File 'lib/batsd/handlers/timer.rb', line 32
def handle(key, value, sample_rate)
  key = "timers:#{key}"
  if value
    @active_timers[key] ||= []
    @active_timers[key].push value.to_f
    @timers[key] = nil
  end
end
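The bookkeeping done by handle can be tried in isolation with a minimal stand-in. `TimerBuffer` is a hypothetical class written for this illustration, not part of batsd; it copies only the two hashes and the handle logic shown above, and omits redis, disk, and the thread pool.

```ruby
# Hypothetical stand-in for the handler's in-memory state.
class TimerBuffer
  attr_reader :active_timers, :timers

  def initialize
    @active_timers = {} # key => array of values seen since the last flush
    @timers = {}        # key => nil, used as a set of known timer keys
  end

  # Mirrors the handler: prefix the key, ignore nil values, store floats.
  # sample_rate is accepted but unused, as in the original method.
  def handle(key, value, sample_rate = nil)
    key = "timers:#{key}"
    return unless value

    (@active_timers[key] ||= []) << value.to_f
    @timers[key] = nil
  end
end

buffer = TimerBuffer.new
buffer.handle("db.query", "12.5")
buffer.handle("db.query", 7)
buffer.handle("db.query", nil) # ignored: no value
puts buffer.active_timers.inspect
puts buffer.timers.keys.inspect
```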