Class: Batsd::Handler::Counter
- Inherits:
-
Batsd::Handler
- Object
- Batsd::Handler
- Batsd::Handler::Counter
- Defined in:
- lib/batsd/handlers/counter.rb
Overview
Handles counter measurements ("|c")
Counter measurements are summed together across aggregation intervals
Instance Method Summary (collapse)
-
- (Object) flush
Flushes the accumulated counters that are pending in @active_counters.
-
- (Object) handle(key, value, sample_rate)
Processes an incoming counter measurement.
-
- (Counter) initialize(options)
constructor
Set up a new handler to handle counters.
Methods inherited from Batsd::Handler
Constructor Details
- (Counter) initialize(options)
Set up a new handler to handle counters
-
Set up a redis client
-
Set up a diskstore client to write aggregates to disk
-
Initialize last flush timers to now
16 17 18 19 20 21 22 23 24 25 |
# File 'lib/batsd/handlers/counter.rb', line 16 def initialize() @redis = Batsd::Redis.new() @diskstore = Batsd::Diskstore.new([:root]) @counters = @active_counters = {} @retentions = [:retentions].keys @flush_interval = @retentions.first now = Time.now.to_i @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } super end |
Instance Method Details
- (Object) flush
Flushes the accumulated counters that are pending in @active_counters.
Each counter is pushed into the threadpool queue, which will update all of the counters for all of the aggregations in Redis
flush is also used to write the latter aggregations from redis to disk. It does this by tracking the last time they were written. If that was a sufficient time ago, the value will be retrieved from redis, cleared, and written to disk in another thread.
When the last level of aggregation (least granularity) is written, the @counters will be flushed to the 'datapoints' set in redis and reset
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/batsd/handlers/counter.rb', line 61 def flush puts "Current threadpool queue for counters: #{@threadpool.size}" if ENV["VVERBOSE"] # Flushing is usually very fast, but always fix it so that the # entire thing is based on a constant start time # Saves on time syscalls too flush_start = Time.now.to_i n = @active_counters.size t = Benchmark.measure do ts = (flush_start - flush_start % @flush_interval) counters = @active_counters.dup @active_counters = {} counters.each_slice(50) do |keys| @threadpool.queue ts, keys do |, keys| keys.each do |key, value| @redis.store_and_update_all_counters(, key, value) end end end end puts "Flushed #{n} counters in #{t.real} seconds" if ENV["VERBOSE"] # If it's time for the latter aggregation to be written to disk, queue # those up @retentions.each_with_index do |retention, index| # First retention is always just flushed to redis on the flush interval next if index.zero? # Only if we're in need of a write to disk - if the next flush will be # past the threshold if (flush_start + @flush_interval) > @last_flushes[retention] + retention.to_i puts "Starting disk writing for timers@#{retention}" if ENV["VERBOSE"] t = Benchmark.measure do ts = (flush_start - flush_start % retention.to_i) @counters.keys.each_slice(400) do |keys| @threadpool.queue ts, keys, retention do |, keys, retention| keys.each do |key| key = "#{key}:#{retention}" value = @redis.get_and_clear_key(key) if value value = "#{ts} #{value}" @diskstore.append_value_to_file(@diskstore.build_filename(key), value) end end end end @last_flushes[retention] = flush_start end puts "#{Time.now}: Handled disk writing for counters@#{retention} in #{t.real}" if ENV["VERBOSE"] # If this is the last retention we're handling, flush the # counters list to redis and reset it if retention == @retentions.last puts "Clearing the counters list. Current state is: #{@counters}" if ENV["VVERBOSE"] t = Benchmark.measure do @redis.add_datapoint @counters.keys @counters = {} end puts "#{Time.now}: Flushed datapoints for counters in #{t.real}" if ENV["VERBOSE"] end end end end |
- (Object) handle(key, value, sample_rate)
Processes an incoming counter measurement
-
Normalize for sample rate provided
-
Adds the value to any existing values by the same key and stores it in @active_counters
-
Add the key and a nil value to @counters in order to track the set of counters that have been handled "recently". This is a relatively memory efficient, relatively fast way of storing a unique set of keys.
37 38 39 40 41 42 43 44 |
# File 'lib/batsd/handlers/counter.rb', line 37 def handle(key, value, sample_rate) if sample_rate value = value.to_f / sample_rate.gsub("@", "").to_f end key = "counters:#{key}" @active_counters[key] = @active_counters[key].to_i + value.to_i @counters[key] = nil end |