Skip to content
136 changes: 110 additions & 26 deletions lib/stathat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,95 @@ class Common
CLASSIC_VALUE_URL = "https://api.stathat.com/v"
CLASSIC_COUNT_URL = "https://api.stathat.com/c"
EZ_URL = "https://api.stathat.com/ez"
EZ_URI = URI(EZ_URL)

class << self
def send_to_stathat(url, args)
def stathat_uri(url, args)
uri = URI.parse(url)

begin
uri.query = URI.encode_www_form(args)
rescue NoMethodError => e
rescue NoMethodError
# backwards compatability for pre 1.9.x
uri.query = args.map { |arg, val| arg.to_s + "=" + CGI::escape(val.to_s) }.join('&')
end

resp = Net::HTTP.get(uri)
return uri
end

def send_to_stathat(url, args)
uri = stathat_uri(url, args)
resp = Net::HTTP.get_response(uri)
return Response.new(resp)
end

def send_ez_batch_to_stathat(batch_args, ezkey)
resp = Net::HTTP.start(EZ_URI.host, EZ_URI.port, :use_ssl => true) do |http|
http.post EZ_URI.path,
{ :ezkey => ezkey, :data => batch_args }.to_json,
"Content-Type" => "application/json"
end
return Response.new(resp)
end
end
end

class Export
class Error < StandardError
attr_reader :response

def initialize(msg, response)
@response = response
super(msg)
end
end

EXPORT_URL = "https://www.stathat.com/x"

class << self
def get(access_token, paths, args)
uri = Common::stathat_uri(([EXPORT_URL, access_token] + paths).join('/'), args)
resp = Net::HTTP.get_response(uri)

if resp.kind_of?(Net::HTTPSuccess) && resp.body
return JSON.parse(resp.body)
else
raise Error.new("export error #{resp.inspect} for #{uri.inspect}", resp)
end
end
end
end

class SyncAPI
class << self
def ez_post_value(stat_name, ezkey, value, timestamp=nil)
args = { :stat => stat_name,
:ezkey => ezkey,
:value => value }
:value => Float(value) }
args[:t] = timestamp unless timestamp.nil?
Common::send_to_stathat(Common::EZ_URL, args)
end

def ez_post_count(stat_name, ezkey, count, timestamp=nil)
args = { :stat => stat_name,
:ezkey => ezkey,
:count => count }
:count => Integer(count) }
args[:t] = timestamp unless timestamp.nil?
Common::send_to_stathat(Common::EZ_URL, args)
end

def post_count(stat_key, user_key, count, timestamp=nil)
args = { :key => stat_key,
:ukey => user_key,
:count => count }
:count => Integer(count) }
args[:t] = timestamp unless timestamp.nil?
Common::send_to_stathat(Common::CLASSIC_COUNT_URL, args)
end

def post_value(stat_key, user_key, value, timestamp=nil)
args = { :key => stat_key,
:ukey => user_key,
:value => value }
:value => Float(value) }
args[:t] = timestamp unless timestamp.nil?
Common::send_to_stathat(Common::CLASSIC_VALUE_URL, args)
end
Expand All @@ -66,20 +107,25 @@ def post_value(stat_key, user_key, value, timestamp=nil)

class API
class << self
attr_accessor :max_batch
attr_accessor :pool_size
attr_accessor :max_queue_size
attr_accessor :batch_sleep_seconds

def ez_post_value(stat_name, ezkey, value, timestamp=nil, &block)
Reporter.instance.ez_post_value(stat_name, ezkey, value, timestamp, block)
Reporter.instance.ez_post_value(stat_name, ezkey, Float(value), timestamp, block)
end

def ez_post_count(stat_name, ezkey, count, timestamp=nil, &block)
Reporter.instance.ez_post_count(stat_name, ezkey, count, timestamp, block)
Reporter.instance.ez_post_count(stat_name, ezkey, Integer(count), timestamp, block)
end

def post_count(stat_key, user_key, count, timestamp=nil, &block)
Reporter.instance.post_count(stat_key, user_key, count, timestamp, block)
Reporter.instance.post_count(stat_key, user_key, Integer(count), timestamp, block)
end

def post_value(stat_key, user_key, value, timestamp=nil, &block)
Reporter.instance.post_value(stat_key, user_key, value, timestamp, block)
Reporter.instance.post_value(stat_key, user_key, Float(value), timestamp, block)
end
end
end
Expand Down Expand Up @@ -134,22 +180,51 @@ def ez_post_count(stat_name, ezkey, count, timestamp, cb)
def run_pool
@runlock.synchronize { @running = true }
@pool = []
5.times do |i|
(API.pool_size || 5).times do |i|
@pool[i] = Thread.new do
while true do
point = @que.pop
# XXX check for error?
begin
resp = Common::send_to_stathat(point[:url], point[:args])
if point[:cb]
point[:cb].call(resp)
points = [@que.pop]
while points.size < (API.max_batch || 1) && @que.length > 0
points << @que.pop(true) rescue ThreadError
end

groups = points.group_by{|point| point[:args][:ezkey]}
groups.each do |ezkey, batch|
if ezkey.nil?
batch.each do |point|
# XXX check for error?
begin
resp = Common::send_to_stathat(point[:url], point[:args])
if point[:cb]
point[:cb].call(resp)
end
rescue
puts "Exception in StatHat encoding or callback #{$!.inspect}"
end
end

else
batch_args = batch.map{|point| point[:args]}
batch_args.each{|args| args.delete(:ezkey)}

begin
resp = Common::send_ez_batch_to_stathat(batch_args, ezkey)
batch.each do |point|
if point[:cb]
point[:cb].call(resp)
end
end
rescue
puts "Exception in StatHat encoding or callback #{$!.inspect}"
end
rescue
pp $!
end
end

@runlock.synchronize {
break unless @running
}

sleep API.batch_sleep_seconds unless API.batch_sleep_seconds.nil?
end
end
end
Expand All @@ -166,36 +241,45 @@ def stop_pool()

def enqueue(url, args, cb=nil)
return false unless @running
if API.max_queue_size && @que.length > API.max_queue_size
puts "Dropping StatHat queue"
@que.clear
end
point = {:url => url, :args => args, :cb => cb}
@que << point
true
end
end

class Response
def initialize(body)
@body = body
def initialize(resp)
@resp = resp
@parsed = nil
end

def success?
return valid? && (@resp.body.nil? || msg == "ok")
end

def valid?
return status == 200
return @resp.body.nil? ? @resp.kind_of?(Net::HTTPSuccess) : status == 200
end

def status
parse
return @parsed['status']
return @resp.body.nil? ? @resp.status : @parsed['status']
end

def msg
parse
return @parsed['msg']
return @resp.body.nil? ? nil : @parsed['msg']
end

private
def parse
return unless @parsed.nil?
@parsed = JSON.parse(@body)
return if @resp.body.nil?
@parsed = JSON.parse(@resp.body)
end
end
end
Loading