Rewrite the "sync_local" query.
rkistner committed Feb 3, 2025
1 parent 024fda8 commit 4c988e2
Showing 1 changed file with 72 additions and 24 deletions.
crates/core/src/sync_local.rs
@@ -55,50 +55,98 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteEr

// Query for updated objects

+// Be careful with modifying this query - it is critical for performance. When modifying, make sure to check
+// performance of the query with a large number of rows, and also with a large number of duplicate rows (same row_id).
+//
+// This form uses a subquery with max(r.op_id) instead of a JOIN to get the latest oplog entry for each updated row.
+// The subquery is because:
+// 1. We need the GROUP BY to execute _before_ looking up the latest op_id for each row, otherwise
+//    we get terrible performance if there are lots of duplicate ids (O(N^2) performance).
+// 2. We want to avoid using a second GROUP BY, which would use a secondary TEMP B-TREE.
+//
+// It does not appear to be feasible to avoid the single TEMP B-TREE here.
+//
+// The query roughly does the following:
+// 1. Filter oplog by the ops added but not applied yet (oplog b). These are not unique.
+// 2. Use GROUP BY to get unique rows. This adds some overhead because of the TEMP B-TREE, but is necessary
+//    to cover cases of duplicate rows.
+// 3. For each op, find the latest version of the data. This is done using a subquery, with `max(r.op_id)` to
+//    select the latest version.
+//
+// The subquery instead of a JOIN is because:
+// 1. We need the GROUP BY to execute _before_ looking up the latest op_id for each row, otherwise
+//    we get terrible performance if there are lots of duplicate ids (O(N^2) performance).
+// 2. We want to avoid using a second GROUP BY, which would use a second TEMP B-TREE.
+//
+// The `ifnull(data, max(op_id))` clause is a hack to pick the row with the largest op_id, but only select the data.
+//
+// QUERY PLAN
+// |--CO-ROUTINE updated_rows
+// |  `--COMPOUND QUERY
+// |     |--LEFT-MOST SUBQUERY
+// |     |  |--SCAN buckets USING COVERING INDEX ps_buckets_name
+// |     |  `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
+// |     `--UNION ALL
+// |        `--SCAN ps_updated_rows
+// |--SCAN b
+// |--USE TEMP B-TREE FOR GROUP BY
+// `--CORRELATED SCALAR SUBQUERY 3
+//    `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?)
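
The `ifnull(r.data, max(r.op_id))` expression leans on SQLite's documented bare-column behavior: when an aggregate query uses max(), bare columns in the result take their values from the row that produced the maximum. A minimal standalone sketch of that behavior (the demo table and values are hypothetical, not part of this commit):

-- Hypothetical stand-in for ps_oplog, small enough to eyeball.
CREATE TABLE oplog_demo (row_id TEXT, op_id INTEGER, data TEXT);
INSERT INTO oplog_demo VALUES
    ('a', 1, 'old'),
    ('a', 2, 'new'),   -- latest op for 'a': bare `data` is read from this row
    ('b', 3, NULL);    -- latest op for 'b' carries no data

SELECT row_id, ifnull(data, max(op_id)) AS data
FROM oplog_demo
GROUP BY row_id;
-- 'a' -> 'new' (data taken from the max-op_id row); 'b' -> 3 (NULL data falls back to op_id).

In the real query the subquery is correlated per updated row; when no ps_oplog rows match at all, the aggregate yields NULL, which is what the data.is_err() branches below treat as a DELETE.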

// language=SQLite
let statement = db
.prepare_v2(
"\
--- 1. Filter oplog by the ops added but not applied yet (oplog b).
--- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
WITH updated_rows AS (
-  SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
-    CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
-    AND (b.op_id > buckets.last_applied_op)
-  UNION SELECT row_type, row_id FROM ps_updated_rows
+  SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
+    CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
+    AND (b.op_id > buckets.last_applied_op)
+  UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)
--- 3. Group the objects from different buckets together into a single one (ops).
-SELECT b.row_type as type,
-    b.row_id as id,
-    r.data as data,
-    count(r.bucket) as buckets,
-    /* max() affects which row is used for 'data' */
-    max(r.op_id) as op_id
--- 2. Find *all* current ops over different buckets for those objects (oplog r).
-FROM updated_rows b
-    LEFT OUTER JOIN ps_oplog AS r
-        ON r.row_type = b.row_type
-        AND r.row_id = b.row_id
--- Group for (3)
-GROUP BY b.row_type, b.row_id",
+SELECT
+    b.row_type,
+    b.row_id,
+    (
+        SELECT ifnull(r.data, max(r.op_id))
+        FROM ps_oplog r
+        WHERE r.row_type = b.row_type
+          AND r.row_id = b.row_id
+    ) as data
+FROM updated_rows b
+GROUP BY b.row_type, b.row_id;
+",
)
.into_db_result(db)?;
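
To sanity-check the plan comments when touching this query, one can replay it under EXPLAIN QUERY PLAN against a scratch database. A sketch with a simplified stand-in schema — the table and index definitions below are assumptions for illustration (the real DDL lives in the crate's migrations), so the exact plan output may differ:

-- Assumed minimal schema, just enough to run EXPLAIN QUERY PLAN.
CREATE TABLE ps_buckets (id INTEGER PRIMARY KEY, name TEXT NOT NULL, last_applied_op INTEGER NOT NULL DEFAULT 0);
CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name);
CREATE TABLE ps_oplog (bucket INTEGER NOT NULL, op_id INTEGER NOT NULL, row_type TEXT, row_id TEXT, data TEXT);
CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id);
CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id);
CREATE TABLE ps_updated_rows (row_type TEXT NOT NULL, row_id TEXT NOT NULL);

EXPLAIN QUERY PLAN
WITH updated_rows AS (
    SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
        CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
        AND (b.op_id > buckets.last_applied_op)
    UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)
SELECT
    b.row_type,
    b.row_id,
    (
        SELECT ifnull(r.data, max(r.op_id))
        FROM ps_oplog r
        WHERE r.row_type = b.row_type
          AND r.row_id = b.row_id
    ) as data
FROM updated_rows b
GROUP BY b.row_type, b.row_id;

Populating the tables with a large number of rows, including heavy row_id duplication, before checking the plan and timing is what the warning at the top of the comment asks for.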

-// TODO: cache statements
+// An alternative form of the query is this:
+//
+// SELECT r.row_type as type,
+//     r.row_id as id,
+//     r.data as data,
+//     max(r.op_id) as op_id
+// FROM ps_oplog r
+// GROUP BY r.row_type, r.row_id;
+//
+// This form is simple and fast, but does not filter only on updated rows. It also ignores ps_updated_rows.
+// We could later add heuristics to use this form on initial sync, or when a large number of rows have been re-synced.
+//
+// QUERY PLAN
+// `--SCAN r USING INDEX ps_oplog_row
+
+// TODO: cache individual statements
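
One possible shape for such a heuristic (hypothetical, not in this commit): compare how much of the local state is still unapplied before choosing which statement to prepare. For example:

-- Hypothetical pre-check: if (nearly) every bucket is still at
-- last_applied_op = 0, this is effectively an initial sync and the
-- simple full-scan form would touch the same rows anyway.
SELECT
    (SELECT count(*) FROM ps_buckets WHERE last_applied_op = 0) AS fresh_buckets,
    (SELECT count(*) FROM ps_buckets) AS total_buckets;

The trade-off: the full-scan form skips the updated_rows CTE and its temp b-tree, but re-applies every row in the oplog, so it only wins when most rows are pending anyway.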

while statement.step().into_db_result(db)? == ResultCode::ROW {
let type_name = statement.column_text(0)?;
let id = statement.column_text(1)?;
-let buckets = statement.column_int(3)?;
let data = statement.column_text(2);

let table_name = internal_table_name(type_name);

if tables.contains(&table_name) {
let quoted = quote_internal_name(type_name, false);

-if buckets == 0 {
+if data.is_err() {
// DELETE
let delete_statement = db
.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
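
For context on `quoted`: PowerSync views are backed by internal tables following the ps_data__<type> naming convention (ps_data_local__<type> for local-only tables), and quote_internal_name yields the quoted identifier for the given row type. Assuming a hypothetical row_type of 'lists', the prepared statement would be:

DELETE FROM "ps_data__lists" WHERE id = ?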
@@ -115,7 +163,7 @@ GROUP BY b.row_type, b.row_id",
insert_statement.exec()?;
}
} else {
-if buckets == 0 {
+if data.is_err() {
// DELETE
// language=SQLite
let delete_statement = db
