Merge branch 'master' into fix/synology-files-update
Freika authored Jan 21, 2025
2 parents 06b113a + 4ecc565 commit cbced59
Showing 10 changed files with 172 additions and 79 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -11,7 +11,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 - Synology-related files are now up to date. #684
 
+# 0.23.2 - 2025-01-21
+### Fixed
+
 - Drastically improved performance for Google's Records.json import: importing 500,000 points now takes under 5 minutes, down from a few hours.
 
 ### Fixed
7 changes: 3 additions & 4 deletions app/jobs/import/google_takeout_job.rb
@@ -4,11 +4,10 @@ class Import::GoogleTakeoutJob < ApplicationJob
   queue_as :imports
   sidekiq_options retry: false
 
-  def perform(import_id, json_string)
+  def perform(import_id, locations, current_index)
+    locations_batch = Oj.load(locations)
     import = Import.find(import_id)
 
-    json = Oj.load(json_string)
-
-    GoogleMaps::RecordsParser.new(import).call(json)
+    GoogleMaps::RecordsImporter.new(import, current_index).call(locations_batch)
   end
 end
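
The reworked job receives a pre-serialized batch and a running index instead of a single point, so one Sidekiq payload now carries up to 1,000 locations. A minimal sketch of the round trip, assuming an existing import record and a locations array of Records.json hashes (both names are illustrative):

    # Enqueue one job per batch: Oj.dump serializes the slice once,
    # and the worker loads it back into an array of location hashes.
    batch = locations.first(1_000)
    Import::GoogleTakeoutJob.perform_later(import.id, Oj.dump(batch), 1_000)

    # What perform does on the other side:
    locations_batch = Oj.load(Oj.dump(batch)) # => array of plain hashes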
2 changes: 2 additions & 0 deletions app/models/import.rb
@@ -4,6 +4,8 @@ class Import < ApplicationRecord
   belongs_to :user
   has_many :points, dependent: :destroy
 
+  delegate :count, to: :points, prefix: true
+
   include ImportUploader::Attachment(:raw)
 
   enum :source, {
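
With prefix: true, the delegated count is exposed as points_count. A one-line sketch of the API this adds, assuming any persisted Import record:

    import.points_count # equivalent to import.points.count

This is presumably in support of the progress reporting used by the new importer.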
84 changes: 84 additions & 0 deletions app/services/google_maps/records_importer.rb
@@ -0,0 +1,84 @@
+# frozen_string_literal: true
+
+class GoogleMaps::RecordsImporter
+  include Imports::Broadcaster
+
+  BATCH_SIZE = 1000
+  attr_reader :import, :current_index
+
+  def initialize(import, current_index = 0)
+    @import = import
+    @batch = []
+    @current_index = current_index
+  end
+
+  def call(locations)
+    Array(locations).each_slice(BATCH_SIZE) do |location_batch|
+      batch = location_batch.map { prepare_location_data(_1) }
+      bulk_insert_points(batch)
+      broadcast_import_progress(import, current_index)
+    end
+  end
+
+  private
+
+  # rubocop:disable Metrics/MethodLength
+  def prepare_location_data(location)
+    {
+      latitude: location['latitudeE7'].to_f / 10**7,
+      longitude: location['longitudeE7'].to_f / 10**7,
+      timestamp: parse_timestamp(location),
+      altitude: location['altitude'],
+      velocity: location['velocity'],
+      raw_data: location,
+      topic: 'Google Maps Timeline Export',
+      tracker_id: 'google-maps-timeline-export',
+      import_id: @import.id,
+      user_id: @import.user_id,
+      created_at: Time.current,
+      updated_at: Time.current
+    }
+  end
+  # rubocop:enable Metrics/MethodLength
+
+  def bulk_insert_points(batch)
+    unique_batch = deduplicate_batch(batch)
+
+    # rubocop:disable Rails/SkipsModelValidations
+    Point.upsert_all(
+      unique_batch,
+      unique_by: %i[latitude longitude timestamp user_id],
+      returning: false,
+      on_duplicate: :skip
+    )
+    # rubocop:enable Rails/SkipsModelValidations
+  rescue StandardError => e
+    create_notification("Failed to process location batch: #{e.message}")
+  end
+
+  def deduplicate_batch(batch)
+    batch.uniq do |record|
+      [
+        record[:latitude].round(7),
+        record[:longitude].round(7),
+        record[:timestamp],
+        record[:user_id]
+      ]
+    end
+  end
+
+  def parse_timestamp(location)
+    Timestamps.parse_timestamp(
+      location['timestamp'] || location['timestampMs']
+    )
+  end
+
+  def create_notification(message)
+    Notification.create!(
+      user: @import.user,
+      title: 'Google\'s Records.json Import Error',
+      content: message,
+      kind: :error
+    )
+  end
+end
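
For reference, a minimal driving sketch for the new importer, assuming a persisted import record; the location hash mirrors the Records.json shape exercised by the specs below:

    locations = [
      {
        'latitudeE7' => 123_456_789,      # divided by 10**7 => 12.3456789
        'longitudeE7' => 123_456_789,
        'timestampMs' => '1736424000000', # milliseconds since the Unix epoch
        'altitude' => 100,
        'velocity' => 0
      }
    ]

    GoogleMaps::RecordsImporter.new(import, 0).call(locations)
    # Upserts up to BATCH_SIZE rows at a time; rows matching an existing
    # (latitude, longitude, timestamp, user_id) tuple are skipped.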
44 changes: 0 additions & 44 deletions app/services/google_maps/records_parser.rb

This file was deleted.

36 changes: 25 additions & 11 deletions app/services/tasks/imports/google_records.rb
@@ -1,9 +1,10 @@
 # frozen_string_literal: true
 
-# This class is named based on Google Takeout's Records.json file,
-# the main source of user's location history data.
+# This class is named based on Google Takeout's Records.json file
 
 class Tasks::Imports::GoogleRecords
+  BATCH_SIZE = 1000 # Adjust based on your needs
+
   def initialize(file_path, user_email)
     @file_path = file_path
     @user = User.find_by(email: user_email)
@@ -14,10 +15,11 @@ def call
 
     import_id = create_import
     log_start
-    file_content = read_file
-    json_data = Oj.load(file_content)
-    schedule_import_jobs(json_data, import_id)
+    process_file_in_batches(import_id)
     log_success
+  rescue Oj::ParseError => e
+    Rails.logger.error("JSON parsing error: #{e.message}")
+    raise
   end
 
   private
@@ -26,14 +28,26 @@ def create_import
     @user.imports.create(name: @file_path, source: :google_records).id
   end
 
-  def read_file
-    File.read(@file_path)
-  end
-
-  def schedule_import_jobs(json_data, import_id)
-    json_data['locations'].each do |json|
-      Import::GoogleTakeoutJob.perform_later(import_id, json.to_json)
-    end
-  end
+  def process_file_in_batches(import_id)
+    batch = []
+    index = 0
+
+    Oj.load_file(@file_path, mode: :compat) do |record|
+      next unless record.is_a?(Hash) && record['locations']
+
+      record['locations'].each do |location|
+        batch << location
+
+        next unless batch.size >= BATCH_SIZE
+
+        index += BATCH_SIZE
+        Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index)
+        batch = []
+      end
+    end
+
+    Import::GoogleTakeoutJob.perform_later(import_id, Oj.dump(batch), index) if batch.any?
+  end

   def log_start
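
Net effect: the task parses the file directly with Oj.load_file instead of reading it into a string first, and fans out one job per BATCH_SIZE locations, so a 500,000-point Records.json yields roughly 500 enqueues instead of 500,000. A usage sketch (path and email are illustrative; the import rake task covered in spec/tasks/import_spec.rb drives this same class):

    Tasks::Imports::GoogleRecords.new('/path/to/Records.json', 'user@example.com').call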
1 change: 1 addition & 0 deletions config/initializers/sidekiq.rb
@@ -2,6 +2,7 @@
 
 Sidekiq.configure_server do |config|
   config.redis = { url: ENV['REDIS_URL'] }
+  config.logger = Sidekiq::Logger.new($stdout)
 
   if ENV['PROMETHEUS_EXPORTER_ENABLED'].to_s == 'true'
     require 'prometheus_exporter/instrumentation'
67 changes: 51 additions & 16 deletions spec/services/google_maps/records_importer_spec.rb
@@ -2,35 +2,62 @@

 require 'rails_helper'
 
-RSpec.describe GoogleMaps::RecordsParser do
+RSpec.describe GoogleMaps::RecordsImporter do
   describe '#call' do
-    subject(:parser) { described_class.new(import).call(json) }
+    subject(:parser) { described_class.new(import).call(locations) }
 
     let(:import) { create(:import) }
     let(:time) { DateTime.new(2025, 1, 1, 12, 0, 0) }
-    let(:json) do
-      {
-        'latitudeE7' => 123_456_789,
-        'longitudeE7' => 123_456_789,
-        'altitude' => 0,
-        'velocity' => 0
-      }
+    let(:locations) do
+      [
+        {
+          'timestampMs' => (time.to_f * 1000).to_i.to_s,
+          'latitudeE7' => 123_456_789,
+          'longitudeE7' => 123_456_789,
+          'accuracy' => 10,
+          'altitude' => 100,
+          'verticalAccuracy' => 5,
+          'activity' => [
+            {
+              'timestampMs' => (time.to_f * 1000).to_i.to_s,
+              'activity' => [
+                {
+                  'type' => 'STILL',
+                  'confidence' => 100
+                }
+              ]
+            }
+          ]
+        }
+      ]
     end

     context 'with regular timestamp' do
-      let(:json) { super().merge('timestamp' => time.to_s) }
+      let(:locations) { [super()[0].merge('timestamp' => time.to_s)] }
 
       it 'creates a point' do
        expect { parser }.to change(Point, :count).by(1)
       end
     end

     context 'when point already exists' do
-      let(:json) { super().merge('timestamp' => time.to_s) }
+      let(:locations) do
+        [
+          super()[0].merge(
+            'timestamp' => time.to_s,
+            'latitudeE7' => 123_456_789,
+            'longitudeE7' => 123_456_789
+          )
+        ]
+      end
 
       before do
         create(
-          :point, user: import.user, import:, latitude: 12.3456789, longitude: 12.3456789,
+          :point,
+          user: import.user,
+          import: import,
+          latitude: 12.3456789,
+          longitude: 12.3456789,
           timestamp: time.to_i
         )
       end
@@ -41,15 +68,19 @@
     end
 
     context 'with timestampMs in milliseconds' do
-      let(:json) { super().merge('timestampMs' => (time.to_f * 1000).to_i.to_s) }
+      let(:locations) do
+        [super()[0].merge('timestampMs' => (time.to_f * 1000).to_i.to_s)]
+      end
 
       it 'creates a point using milliseconds timestamp' do
         expect { parser }.to change(Point, :count).by(1)
       end
     end
 
     context 'with ISO 8601 timestamp' do
-      let(:json) { super().merge('timestamp' => time.iso8601) }
+      let(:locations) do
+        [super()[0].merge('timestamp' => time.iso8601)]
+      end
 
       it 'parses ISO 8601 timestamp correctly' do
         expect { parser }.to change(Point, :count).by(1)
@@ -59,7 +90,9 @@
     end
 
     context 'with timestamp in milliseconds' do
-      let(:json) { super().merge('timestamp' => (time.to_f * 1000).to_i.to_s) }
+      let(:locations) do
+        [super()[0].merge('timestamp' => (time.to_f * 1000).to_i.to_s)]
+      end
 
       it 'parses millisecond timestamp correctly' do
         expect { parser }.to change(Point, :count).by(1)
@@ -69,7 +102,9 @@
     end
 
     context 'with timestamp in seconds' do
-      let(:json) { super().merge('timestamp' => time.to_i.to_s) }
+      let(:locations) do
+        [super()[0].merge('timestamp' => time.to_i.to_s)]
+      end
 
       it 'parses second timestamp correctly' do
         expect { parser }.to change(Point, :count).by(1)
4 changes: 2 additions & 2 deletions spec/services/tasks/imports/google_records_spec.rb
@@ -5,10 +5,10 @@
 RSpec.describe Tasks::Imports::GoogleRecords do
   describe '#call' do
     let(:user) { create(:user) }
-    let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') }
+    let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s }
 
     it 'schedules the Import::GoogleTakeoutJob' do
-      expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(3).times
+      expect(Import::GoogleTakeoutJob).to receive(:perform_later).exactly(1).time
 
       described_class.new(file_path, user.email).call
     end
2 changes: 1 addition & 1 deletion spec/tasks/import_spec.rb
@@ -3,7 +3,7 @@
 require 'rails_helper'
 
 describe 'import.rake' do
-  let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json') }
+  let(:file_path) { Rails.root.join('spec/fixtures/files/google/records.json').to_s }
   let(:user) { create(:user) }
 
   it 'calls importing class' do
