Class: Batsd::Receiver::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/batsd/receiver.rb

Overview

Interface to run the receiver from the binary

Constant Summary

MAX_TRUNCATION_INTERVAL =

Force a truncation at all intervals at least every MAX_TRUNCATION_INTERVAL seconds. This prevents overflow issues in EventMachine, but is largely theoretical; it's unlikely any instance of the daemon would run this long.

2000000

Instance Method Summary (collapse)

Constructor Details

- (Daemon) initialize(handlers, options = {})

Create a new daemon and set up it's options and register the handlers provided



60
61
62
63
64
# File 'lib/batsd/receiver.rb', line 60

def initialize(handlers, options={})
  @options = options
  @handlers = handlers
  Batsd::Receiver.handlers = handlers
end

Instance Method Details

- (Object) run

Run the event machine server. This will:

  • Bind to an address and port and process incoming messages

  • Establish flush timers for each handler that has implemented a flush method. These timers will happen at the lowest retention level (typically 10 seconds)



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/batsd/receiver.rb', line 73

def run
  EventMachine::run do
    if RUBY_PLATFORM == "java"
      Thread.current.priority = 10
    end
    bind = @options[:bind] || '0.0.0.0'
    port = @options[:port] || 8125
    puts "#{Time.now}: Starting receiver on batsd://#{bind}:#{port}"
    EventMachine::open_datagram_socket(bind, port, Batsd::Receiver)
    # Have to run the statistics service as part of this process so that
    # it has access to the handler objects, which contain their own
    # statistics
    EventMachine::start_server(bind, port + 1, Batsd::Statistics)

    @handlers.each do |type, handler|
      if handler.respond_to? :flush
        puts "#{Time.now}: Adding flush timer to #{handler}"
        EventMachine.add_periodic_timer(@options[:retentions].keys[0].to_i) do
           Thread.new { handler.flush }
        end
      end
    end

    if @options[:autotruncate]
      puts "Enabling autotruncation"
      @options[:retentions].each do |interval, n|
        frequency = [interval.to_i * n.to_i, MAX_TRUNCATION_INTERVAL].min 
        EventMachine.add_periodic_timer(frequency) do
          Thread.new { Batsd::Truncator.new(@options).run(interval) }
        end
        puts "Truncator added for #{interval} second aggregations every #{frequency} seconds"
      end
    end

  end
end