-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemail_cleaner.rb
402 lines (340 loc) · 11.9 KB
/
email_cleaner.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# coding: utf-8
# frozen_string_literal: true
require 'clamp'
require 'byebug'
require 'fileutils'
require 'ruby-progressbar'
require 'concurrent-edge'
require 'connection_pool'
require 'mysql2'
require './header_parser.rb'
require 'json'
require 'mail'
require 'csv'
require 'RubySpamAssassin'
include RubySpamAssassin
# Actor that runs submitted jobs concurrently and keeps simple counters.
#
# Messages it understands:
#   [:run, job]  -- +job+ is a callable executed in a Concurrent::Promises
#                   future; the asker's future is resolved later, when the
#                   job settles.
#   [:done, job, fulfilled, value, reason]
#                -- sent by the actor to itself when a job settles.
#   :status      -- replies { running_jobs:, finished_count: } and resets
#                   finished_count to 0.
# Any other message is handed on with `pass`.
class Computer < Concurrent::Actor::RestartingContext
  def initialize
    super()
    # job callable => future of the :run message that submitted it
    @jobs = {}
    # sum of value[:total] over jobs finished since the last :status
    @finished_count = 0
  end

  def on_message(msg)
    command, *args = msg
    case command
    when :run
      job = args[0]
      @jobs[job] = envelope.future
      # Run the job off the actor and report back to ourselves via :done.
      Concurrent::Promises.future(&job).chain(job) do |fulfilled, value, reason, finished_job|
        tell [:done, finished_job, fulfilled, value, reason]
      end
      # Do not answer with this method's return value; the answer is given in
      # :done by resolving the future kept in @jobs.
      Concurrent::Actor::Behaviour::MESSAGE_PROCESSED
    when :done
      job, fulfilled, value, reason = *args
      future = @jobs.delete job
      # A rejected job has value == nil. Indexing it raised NoMethodError inside
      # the actor (which, for a RestartingContext, may reset its state and lose
      # @jobs), so only successful jobs returning a Hash are counted.
      @finished_count += value.fetch(:total, 0) if fulfilled && value.is_a?(Hash)
      # future is nil when :run arrived via tell (no envelope future) or the
      # entry is otherwise missing; answering is then skipped.
      future&.resolve fulfilled, value, reason
      nil
    when :status
      status = { running_jobs: @jobs.size, finished_count: @finished_count }
      @finished_count = 0
      status
    else
      # Continue to fail with unknown message.
      pass
    end
  end
end # => :on_message
Clamp do
  option ['-s', '--size'],
         'SIZE',
         'the number of lines to processes concurrently',
         default: 5,
         attribute_name: :size do |s|
    Integer(s)
  end

  option ['--mysql-timeout'],
         'MYSQL_TIMEOUT',
         'the timeout for any mysql transaction',
         default: 5,
         attribute_name: :mysql_timeout do |t|
    Integer(t)
  end

  option ['-u', '--user'],
         'USER',
         'username for mysql local database',
         attribute_name: :mysql_user

  option ['-p', '--password'],
         'PASSWORD',
         'password for mysql local database',
         default: nil,
         attribute_name: :mysql_password

  option ['-h', '--host'],
         'HOST',
         'host for mysql server',
         default: 'localhost',
         attribute_name: :mysql_host

  option ['-v', '--verbose'],
         :flag,
         'be talky'

  option ['-e', '--eml'],
         'EML OUTPUT DIRECTORY',
         'output directory for eml files, no out put if not set',
         default: nil,
         attribute_name: :eml_output_directory

  # NOTE: rows are paged by id (see --id); the offset only reduces the
  # progress-bar total.
  option ['-o', '--offset'],
         'DB OFFSET',
         'offset to start from when getting rows',
         default: 0,
         attribute_name: :offset do |o|
    Integer(o)
  end

  # Coerced to Integer because the value is interpolated into SQL (`id > ...`).
  option ['-i', '--id'],
         'START ID',
         'id to start with',
         default: 0,
         attribute_name: :start_id do |i|
    Integer(i)
  end

  option ['--debug'],
         :flag,
         'debug mode, no threading, verbose'

  parameter 'DATABASE', 'the database to convert everything into', attribute_name: :database

  # Main entry point (invoked by Clamp).
  #
  # Pages through `user_emails` in ascending id order, starting after --id.
  # Slices of @size rows are handed to the Computer actor, or processed inline
  # in --debug mode. At the end it reports progress, elapsed time and the
  # error count.
  def execute
    puts "Processing database #{database} ⚙️ ⚙️= ⚙️ "
    @verbose = verbose?
    @size = size
    @mysql_timeout = mysql_timeout
    @database = database
    @eml_output_directory = eml_output_directory
    @mysql_host = mysql_host
    @mysql_password = mysql_password
    @offset = offset
    # Leave a core for this coordinating thread, but keep at least one worker
    # slot: on a single-core host `processor_count - 1` is 0 and nothing would
    # ever be fetched.
    @cores = [Concurrent.processor_count - 1, 1].max
    @last_id = start_id
    # Incremented from worker threads in process_mysql_result, hence atomic.
    @error_count = Concurrent::AtomicFixnum.new(0)
    if debug?
      @debug = true
      @verbose = true
      @cores = 1
    end

    mysql_connection_pool = start_mysql_connection_pool @cores * 5,
                                                        mysql_timeout,
                                                        mysql_user,
                                                        mysql_password,
                                                        database,
                                                        mysql_host
    mysql_count = mysql_connection_pool.with { |mysql_client| get_row_count(mysql_client) }
    # --offset may exceed the row count, and ruby-progressbar rejects a total
    # below the current progress (0), so clamp at zero.
    expected = [mysql_count - offset, 0].max
    puts "Found about #{expected} records to process"
    puts "Processing on #{@cores} cores"
    puts 'Starting processing 🏁 🏁 🏁'
    @progressbar = ProgressBar.create total: expected,
                                      throttle_rate: 0.5,
                                      format: '%t: |%B| : %c/%C : %p%% : %a : %E '
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    computer = Concurrent::Actor.spawn Computer, :computer
    client = new_mysql_client mysql_user, mysql_password, database, mysql_host
    @finished_count = 0 if @debug
    # Pause between polls when every worker slot is busy, or when the table is
    # drained but jobs are still running, instead of spinning on the actor and
    # MySQL.
    poll_interval = 0.05

    loop do
      if @debug
        running_jobs = 0
      else
        status = computer.ask(:status).value!
        running_jobs = status[:running_jobs]
        advance_progress status[:finished_count]
        @progressbar.title = "#{running_jobs} jobs running, last_id #{@last_id}"
      end

      # Each job handles @size rows, so fetch just enough rows to fill the idle
      # slots. The previous `(@cores * @size) - (running_jobs * @size)` went
      # negative (an invalid LIMIT) whenever running_jobs exceeded @cores.
      free_slots = @cores - running_jobs
      if free_slots <= 0
        sleep poll_interval
        next
      end

      # Materialize once: the rows are counted, read for the last id and sliced.
      rows = get_database_results(client, free_slots * @size, @last_id).to_a
      if rows.empty?
        break if running_jobs.zero?

        sleep poll_interval
        next
      end
      @last_id = rows.last['id']
      @offset += rows.size

      rows.each_slice(@size) do |results|
        @progressbar.log @last_id.to_s if @verbose
        if @debug
          begin
            process_mysql_result results, mysql_user, mysql_password, database, mysql_host
            @finished_count += results.count
            advance_progress results.count
            @progressbar.title = "Debug Mode: last_id #{@last_id}"
          rescue StandardError => e
            @progressbar.log "Error processing slice: #{e}"
            @progressbar.log "Backtrace: #{e.backtrace}"
          end
        else
          computer.ask [:run, -> { process_mysql_result results, mysql_user, mysql_password, database, mysql_host }]
        end
      end
    end

    @progressbar.finish
    elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
    puts 'Finished! 🏁 🙌 🎉'
    puts "Total time: #{elapsed_time}"
    puts "Number of records: #{mysql_count}"
    puts "Number of errors: #{@error_count.value}"
  ensure
    # The paging client was previously never closed.
    client&.close
  end

  # Adds +count+ to the progress bar, growing its total when needed.
  # The initial total comes from information_schema TABLE_ROWS, which may be an
  # estimate, and ruby-progressbar raises if progress is set above the total.
  def advance_progress(count)
    return if count.nil? || count.zero?

    new_progress = @progressbar.progress + count
    @progressbar.total = new_progress if @progressbar.total && new_progress > @progressbar.total
    @progressbar.progress = new_progress
  end

  # Builds a ConnectionPool of +size+ Mysql2 clients.
  # +timeout+ is passed as the pool's :timeout option. The previous version
  # ignored it and read @mysql_timeout instead.
  def start_mysql_connection_pool(size, timeout, user, password, database, host)
    ConnectionPool.new(size: size, timeout: timeout) do
      new_mysql_client(user, password, database, host)
    end
  end

  # Opens a new Mysql2 connection with row caching disabled.
  def new_mysql_client(user, password, database, host)
    Mysql2::Client.new(host: host,
                       database: database,
                       username: user,
                       password: password,
                       cache_rows: false)
  end

  # Returns information_schema's TABLE_ROWS for `user_emails` in @database,
  # or 0 when no such base table is listed.
  # The previous query had no TABLE_NAME filter and returned the first table
  # of the schema, whichever it was.
  def get_row_count(client)
    schema = client.escape(@database.to_s)
    results = client.query("SELECT TABLE_NAME AS 'User Emails', TABLE_ROWS AS 'Rows' FROM information_schema.TABLES WHERE TABLES.TABLE_SCHEMA = '#{schema}' AND TABLES.TABLE_NAME = 'user_emails' AND TABLES.TABLE_TYPE = 'BASE TABLE';")
    row = results.first
    row ? row['Rows'].to_i : 0
  end

  # Fetches up to +count+ rows of user_emails with id > +id+, in id order.
  # Both values pass through Integer() before being interpolated into SQL.
  # A negative count is clamped to 0, because MySQL rejects a negative LIMIT.
  def get_database_results(client, count, id)
    limit = [Integer(count), 0].max
    client.query("SELECT * FROM user_emails WHERE id > #{Integer(id)} ORDER BY id LIMIT #{limit}", cache_rows: false)
  end

  # Processes a slice of user_emails rows on its own MySQL connection.
  # For each row it parses the headers and writes `to`/`from`/`json` back.
  # It writes <id>.eml into --eml for rows not judged spam.
  # Per-row failures are logged and counted in @error_count; they do not
  # abort the slice.
  #
  # @return [Hash] { total: number of rows in the slice }
  def process_mysql_result(mysql_results, mysql_user, mysql_password, database, mysql_host)
    mysql_conn = new_mysql_client mysql_user,
                                  mysql_password,
                                  database,
                                  mysql_host
    mysql_results.each do |mysql_result|
      begin
        headers = parse_headers mysql_result
        id = headers[:id]
        results = headers[:parsed_results]
        # A missing to/from leaves the local nil, which interpolates as ''.
        to = mysql_conn.escape results['to'] unless results['to'].nil?
        from = mysql_conn.escape results['from'] unless results['from'].nil?
        json_headers = mysql_conn.escape results.to_json
        email_format = eml_format(mysql_result, headers)
        spam = check_if_spam(headers, email_format)
        mysql_conn.query("UPDATE `user_emails` SET `to` = '#{to}', `from` = '#{from}', `json` = '#{json_headers}' WHERE `id` = #{id};")
        next if spam

        unless @eml_output_directory.nil?
          File.open "#{@eml_output_directory}/#{mysql_result['id']}.eml", 'w' do |f|
            f.write email_format
          end
        end
      rescue StandardError => e
        @error_count&.increment
        @progressbar.log "Error processing slice: #{e}"
        @progressbar.log "Backtrace: #{e.backtrace}"
        # Only stop in the debugger in --debug mode, where rows run on the main
        # thread; an unconditional byebug would block a worker thread.
        byebug if @debug
      end
    end
    { total: mysql_results.count }
  ensure
    # Close even if connecting or a query outside the per-row rescue raised.
    mysql_conn&.close
  end

  # Decides whether a row is spam. Spam rows are not written out as .eml.
  #
  # @param headers [Hash] result of parse_headers
  # @param eml [String, nil] rendered message; nil (rendering failed) counts as spam
  # @return [Boolean] true when the X-Spam-Status score parses to >= 2
  def check_if_spam(headers, eml)
    return true if eml.nil?

    spam_status_raw = headers[:parsed_results]['x-spam-status']
    # No X-Spam-Status header: treated as not spam. A SpamAssassin (SpamClient)
    # fallback was sketched here but is disabled; such rows have so far not
    # looked like spam in general.
    return false if spam_status_raw.nil?

    parse_header_spam_status(spam_status_raw).to_f >= 2
  end

  # Extracts a score from a raw X-Spam-Status value by dropping the first 14
  # characters and the last one, then taking the absolute value.
  # NOTE(review): the fixed offsets assume a single header layout, and `.abs`
  # turns negative scores positive. Confirm both against real data.
  #
  # @return [Float, nil] nil when the value is shorter than 14 characters
  def parse_header_spam_status(spam_status)
    extracted_string = spam_status[14..-2]
    return nil if extracted_string.nil?

    extracted_string.to_f.abs
  end

  # Renders a row as a message string with plain-text and HTML parts.
  # Every parsed header is copied onto the message.
  #
  # @return [String, nil] nil when Mail fails to build or render the message
  def eml_format(result, headers)
    encoding = clean_encoding result['encoding']
    # Only add a charset parameter when the row records an encoding.
    charset_param = encoding.empty? ? '' : "; charset=#{encoding}"
    begin
      mail = Mail.new do
        headers[:parsed_results].each do |key, value|
          header[key] = value
        end
        text_part do
          # Was `content_type encoding`, which set the MIME type to the charset name.
          content_type "text/plain#{charset_param}"
          body result['plain_text']
        end
        html_part do
          content_type "text/html#{charset_param}"
          body result['html']
        end
        # The former `charset = encoding` here only assigned a block-local
        # variable. The charset is carried on each part's Content-Type instead.
      end
      mail.to_s
    rescue StandardError => e
      @progressbar.log "Could not build eml for id #{result['id']}: #{e}" if @verbose
      nil
    end
  end

  # Returns the stored encoding up to (not including) the first '$', or ""
  # for nil. For example, "utf-8$x" becomes "utf-8".
  # String#partition replaces `encoding[0...index]`, which raises on
  # Ruby < 2.6 when no '$' is present (index is nil).
  def clean_encoding(encoding)
    return '' if encoding.nil?

    encoding.partition('$').first
  end

  # Parses a row's raw `headers` column with HeaderParser.
  #
  # @return [Hash] { id: the row's id, parsed_results: HeaderParser#parse output }
  def parse_headers(mysql_result)
    parsed_results = HeaderParser.new(mysql_result['headers']).parse
    { id: mysql_result['id'], parsed_results: parsed_results }
  end
end
# Error describing a database row that could not be processed.
class RowProcessingError < StandardError
  # @return [Hash, nil] the offending row (read for its 'id' key)
  attr_reader :row

  # @param row [Hash, nil] the row being processed
  # @param message [String] what went wrong
  def initialize(row, message)
    @row = row
    @message = message
    super(message)
  end

  # Previously referenced an undefined `row`, so to_s (and therefore #message)
  # raised NameError. A nil row renders an empty id.
  def to_s
    "RowProcessingError: id: #{row && row['id']}, '#{@message}'"
  end
end