-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.rb
115 lines (90 loc) · 3.02 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
$:.unshift "."
require "lib/teleport"
require "lib/stabilizer_service"
require "lib/merger_service"
require "lib/uploader"
require "lib/push_delivery"
require "mongoid"
require "aws-sdk"
if ARGV.empty?
STDERR.puts "Missing queue id (A or B)"
exit
end
Aws.config.update({
region: 'us-east-1',
credentials: Aws::Credentials.new(ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])
})
QUEUE_A_URL = ENV['QUEUE_A_URL']
QUEUE_B_URL = ENV['QUEUE_B_URL']
if ARGV[0].upcase == 'B'
QUEUE_URL = QUEUE_B_URL
else
QUEUE_URL = QUEUE_A_URL
end
Mongoid.load!(File.expand_path(File.join(".", "mongoid.yml")))
poller = Aws::SQS::QueuePoller.new(QUEUE_URL)
puts "Listening on: #{QUEUE_URL}"
begin
poller.poll do |message|
begin
params = JSON.parse(message.body, symbolize_names: true)
puts "[PROCESSING] #{params[:command]}"
case params[:command]
when "post_process"
# Set to stabilizing
teleport = Teleport.find(params[:id])
teleport.status = Teleport::Status::STABILIZING
teleport.save
# Submit stabilize job for each side
sqs = Aws::SQS::Client.new
{ left: QUEUE_A_URL, right: QUEUE_B_URL }.each do |side, queue_url|
sqs.send_message(
queue_url: queue_url,
message_body: { command: "stabilize", id: teleport.id, side: side }.to_json
)
end
when "stabilize"
teleport = Teleport.find(params[:id])
side = params[:side]
puts "Stabilizing"
stabilizer = StabilizerService.new(teleport.source_url, side)
path = stabilizer.stabilize!
puts "Updating"
teleport.send("stabilized_#{side}_path=", path)
teleport.save
when "upload"
teleport = Teleport.find(params[:id])
puts "Merging"
merger = MergerService.new(teleport.source_url,
teleport.stabilized_left_path,
teleport.stabilized_right_path)
path = merger.merge!
puts "Uploading"
uploader = Uploader.new(teleport.id, path)
url = uploader.upload!
puts "Updating"
teleport.url = url
teleport.status = Teleport::Status::ENABLED
teleport.save
puts "Notifying"
if teleport.push_token
push = PushDelivery.new(teleport.push_token)
push.title = "Ready to watch"
push.body = teleport.title
push.deliver!
end
puts "Cleaning"
StabilizerService.cleanup(teleport.stabilized_left_path)
StabilizerService.cleanup(teleport.stabilized_right_path)
teleport.stabilized_left_path = nil
teleport.stabilized_right_path = nil
teleport.save
end
puts "[PROCESSED] #{params[:command]}"
rescue Exception => e
STDERR.puts "[ERROR] #{params[:command]}: #{e.message}"
throw :skip_delete
end
end
rescue SystemExit, Interrupt
end