-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstreaming.rb
More file actions
92 lines (83 loc) · 2.91 KB
/
streaming.rb
File metadata and controls
92 lines (83 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# -*- coding: utf-8 -*-
require 'set'
Plugin.create :streaming do
thread = nil
@fail_count = @wait_time = 0
reconnect_request_flag = false
on_filter_stream_force_retry do
if UserConfig[:filter_realtime_rewind]
thread.kill rescue nil if thread
thread = start end end
on_filter_stream_reconnect_request do
if not reconnect_request_flag
reconnect_request_flag = true
Reserver.new(30, thread: Delayer) {
reconnect_request_flag = false
Plugin.call(:filter_stream_force_retry) } end end
def start
twitter = Enumerator.new{|y|
Plugin.filtering(:worlds, y)
}.find{|world|
world.class.slug == :twitter
}
return unless twitter
@success_flag = false
@fail = MikuTwitter::StreamingFailedActions.new("Filter Stream", self)
Thread.new{
loop{
begin
follow = Plugin.filtering(:filter_stream_follow, Set.new).first || Set.new
track = Plugin.filtering(:filter_stream_track, "").first || ""
if follow.empty? && track.empty?
sleep(60)
else
param = {}
param[:follow] = follow.to_a[0, 5000].map(&:id).join(',') if not follow.empty?
param[:track] = track if not track.empty?
r = twitter.streaming(:filter_stream, param){ |json|
json.strip!
case json
when /\A\{.*\}\Z/
unless @success_flag
@fail.success
@success_flag = true end
parsed = JSON.parse(json).symbolize
if not (UserConfig[:filter_dont_exclude_retweet] == false and parsed[:retweeted_status])
MikuTwitter::ApiCallSupport::Request::Parser.streaming_message(parsed) rescue nil
end
end }
raise r if r.is_a? Exception
notice "filter stream: disconnected #{r}"
streamerror r
end
rescue Net::HTTPError => exception
warn "filter stream: disconnected: #{exception.code} #{exception.body}"
streamerror exception
warn exception
rescue Net::ReadTimeout => exception
streamerror exception
rescue Exception => exception
warn "filter stream: disconnected: exception #{exception}"
streamerror exception
warn exception end
notice "retry wait #{@fail.wait_time}, fail_count #{@fail.fail_count}"
sleep @fail.wait_time } }
end
def streamerror(exception)
@success_flag = false
@fail.notify(exception) end
on_userconfig_modify do |key, new_val|
next if key != :filter_realtime_rewind
if new_val
notice 'filter stream: enable'
thread = start unless thread.is_a? Thread
else
notice 'filter stream: disable'
thread.kill if thread.is_a? Thread
thread = nil
end
end
Delayer.new do
thread = start if UserConfig[:filter_realtime_rewind]
end
end