Module: Batsd::Server

Defined in:
lib/batsd/server.rb

Overview

Makes data from statsd available over a TCP socket

Defined Under Namespace

Classes: Daemon

Class Method Summary

Instance Method Summary

Class Method Details

+ (Object) config

Access the server config



# File 'lib/batsd/server.rb', line 6

def self.config
  @config
end

+ (Object) config=(config)

Set the config for the server



# File 'lib/batsd/server.rb', line 11

def self.config=(config)
  @config=config
end
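
The two accessors above can be exercised with a small stand-in. This is a minimal sketch, not batsd itself: `SketchServer` is a hypothetical module that mirrors the accessors so the example runs without the gem. The keys `:port`, `:root`, and `:retentions` are the ones read elsewhere on this page; every value is an assumed example.

```ruby
# Stand-in module that mirrors the two accessors above, so this sketch
# runs without batsd installed. SketchServer is a hypothetical name.
module SketchServer
  def self.config
    @config
  end

  def self.config=(config)
    @config = config
  end
end

# The keys are the ones this page reads; every value is an assumed example.
SketchServer.config = {
  port: 8127,
  root: "/tmp/batsd",
  retentions: [[10, 360], [60, 10_080]]
}

puts SketchServer.config[:port]
```

Because `@config` is stored on the module itself, every connection handler reads the same hash.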

Instance Method Details

- (Object) post_init

Set up one Redis and one Diskstore instance per connection so that connections don't step on each other. EM-redis is intentionally not used, because the Redis commands already run inside a deferrable.



# File 'lib/batsd/server.rb', line 18

def post_init
  puts "batsd server ready and waiting on #{Batsd::Server.config[:port]} to ship data upon request\n"
  @redis = Batsd::Redis.new(Batsd::Server.config)
  @diskstore = Batsd::Diskstore.new(Batsd::Server.config[:root])
end

- (Object) receive_data(msg)

Handle a command received over the server port and return the datapoints, values, or a PONG as requested.
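
From the client side, the exchange is one text line per command and one line per reply. The following is a minimal sketch under stated assumptions: `batsd_request` and `batsd_values` are hypothetical helper names, not part of batsd, and the host and port in the usage comment are placeholders.

```ruby
require "json"
require "socket"

# Hypothetical client helper, not part of batsd: sends one command line
# and reads back a single newline-terminated reply.
def batsd_request(socket, command)
  socket.write("#{command}\n")
  reply = socket.gets
  reply && reply.chomp
end

# Hypothetical helper: asks for the values of one metric between two
# Unix timestamps and parses the JSON reply into a Hash.
def batsd_values(socket, metric, begin_time, end_time)
  reply = batsd_request(socket, "values #{metric} #{begin_time.to_i} #{end_time.to_i}")
  JSON.parse(reply)
end

# Usage against a running server (host and port are assumptions):
#   socket = TCPSocket.new("127.0.0.1", 8127)
#   batsd_request(socket, "ping")
#   batsd_values(socket, "counters:hits", Time.now - 3600, Time.now)
#   batsd_request(socket, "quit")
```

Replies to `available` and `values` are sent from deferred blocks, so this sketch issues one command at a time and waits for each reply before sending the next.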



# File 'lib/batsd/server.rb', line 26

def receive_data(msg)  
  msg.split("\n").each do |row|
    begin
      command = row.split(" ")[0]
      next unless command # skip blank rows without dropping the rest of the message
      case
        when command.match(/available/i)
          EM.defer { send_data "#{JSON(@redis.datapoints)}\n" }
        when command.match(/values/i)
          EM.defer do
            command, metric, begin_time, end_time = row.split(" ")
            datapoints = []
            if metric.match(/^gauge/)
              # Gauges are always read from the diskstore.
              datapoints = @diskstore.read(metric, begin_time, end_time)
            else
              Batsd::Server.config[:retentions].each_with_index do |retention, index|
                # Skip retention levels whose window does not reach back to begin_time.
                next if (Time.now.to_i - (retention[0] * retention[1]) > begin_time.to_i)
                if index.zero?
                  # The first retention level is read from Redis.
                  datapoints = @redis.values_from_zset(metric, begin_time, end_time)
                  break
                else
                  # Later retention levels are read from disk, keyed by interval.
                  datapoints = @diskstore.read("#{metric}:#{retention[0]}", begin_time, end_time)
                  break
                end
              end
            end
            send_data "#{JSON({"#{metric}" => datapoints})}\n"
          end
        when command.match(/ping/i)
          send_data "PONG\n"
        when command.match(/quit|exit/i)
          send_data "BYE\n"
          close_connection
        else
          send_data "#{JSON({error: "Unrecognized command #{command}"})}\n"
      end
    rescue StandardError => e
      # Report the failure only in verbose mode; a bad row should not end the loop.
      puts e if ENV["VERBOSE"]
    end
  end
end
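
The retention walk inside the `values` branch can be isolated as a pure function. This is a sketch of the selection rule only, under stated assumptions: `storage_for` is a hypothetical helper that does not exist in batsd, the retention pairs are assumed example values of the form [interval in seconds, number of points], and `now` is passed in so the result is deterministic.

```ruby
# Hypothetical helper, not part of batsd: returns which store and key the
# server would read for a non-gauge metric, or nil if no retention level
# reaches back as far as begin_time.
def storage_for(metric, retentions, begin_time, now = Time.now.to_i)
  retentions.each_with_index do |(interval, count), index|
    # Skip levels whose window (interval * count seconds) starts after begin_time.
    next if now - (interval * count) > begin_time.to_i
    return index.zero? ? [:redis, metric] : [:diskstore, "#{metric}:#{interval}"]
  end
  nil
end

# Assumed example retentions: one hour at 10s, one week at 60s.
retentions = [[10, 360], [60, 10_080]]
now = 1_000_000

p storage_for("counters:hits", retentions, now - 600, now)    # [:redis, "counters:hits"]
p storage_for("counters:hits", retentions, now - 86_400, now) # [:diskstore, "counters:hits:60"]
```

When `begin_time` is older than every retention window, the helper returns `nil`, which corresponds to the server replying with an empty datapoint list.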